0
|
1 ???using System; |
|
2 using UCIS.Pml; |
104
|
3 using UCIS.Util; |
0
|
4 using System.Collections.Generic; |
|
5 using System.Threading; |
|
6 |
|
namespace UCIS.Pml {
	/// <summary>
	/// Base class for a PML channel whose incoming messages are handed to it by the
	/// implementation through <see cref="PushReceivedMessage"/>. Received messages are
	/// buffered in an internal queue until a reader picks them up, either with the
	/// blocking <see cref="ReadMessage"/> call or with the
	/// <see cref="BeginReadMessage"/> / <see cref="EndReadMessage"/> pair.
	/// </summary>
	/// <remarks>
	/// All shared state (<c>_queue</c>, <c>_asyncWait</c>, <c>_isOpen</c>) is only
	/// modified while holding the monitor of <c>_queue</c>. Completion of asynchronous
	/// results is always signalled after the lock has been released, so a completion
	/// callback never runs while this class holds its lock.
	/// </remarks>
	public abstract class ActivePmlChannel : IPmlChannel {
		// The single outstanding asynchronous read, or null when there is none.
		private ReadMessageAsyncResult _asyncWait = null;
		// Buffered messages; this object also serves as the monitor for all locking,
		// so it must never be replaced - hence readonly.
		private readonly Queue<PmlElement> _queue = new Queue<PmlElement>();
		private bool _isOpen = true;

		/// <summary>Gets whether the channel is open; becomes false once <see cref="Close"/> has run.</summary>
		public virtual bool IsOpen { get { return _isOpen; } }

		/// <summary>Sends a message over the channel. Implemented by the concrete channel.</summary>
		/// <param name="message">The message to send.</param>
		public abstract void WriteMessage(PmlElement message);

		/// <summary>
		/// Returns the next received message, blocking the calling thread until one is available.
		/// </summary>
		/// <returns>The oldest buffered message.</returns>
		/// <exception cref="InvalidOperationException">The channel is not open when the call is made.</exception>
		/// <exception cref="OperationCanceledException">The channel was closed while waiting for a message.</exception>
		public PmlElement ReadMessage() {
			lock (_queue) {
				if (!IsOpen) throw new InvalidOperationException("The channel is not open");
				while (_queue.Count == 0) {
					// Re-checked after every wake-up: Close() pulses all waiters so that
					// they can observe the closed state here instead of waiting forever.
					if (!IsOpen) throw new OperationCanceledException("The operation did not complete");
					Monitor.Wait(_queue);
				}
				return _queue.Dequeue();
			}
		}

		/// <summary>
		/// Starts an asynchronous read. If a message is already buffered the returned
		/// result is completed immediately (flagged as synchronous); otherwise it is
		/// completed by the next <see cref="PushReceivedMessage"/> or by <see cref="Close"/>.
		/// </summary>
		/// <param name="callback">Callback passed on to the asynchronous result.</param>
		/// <param name="state">User state passed on to the asynchronous result.</param>
		/// <returns>The asynchronous result to hand to <see cref="EndReadMessage"/>.</returns>
		/// <exception cref="InvalidOperationException">
		/// The channel is not open, or another asynchronous read is still pending.
		/// </exception>
		public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) {
			ReadMessageAsyncResult ar;
			Boolean completed = false;
			lock (_queue) {
				if (!IsOpen) throw new InvalidOperationException("The channel is not open");
				if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress");
				ar = new ReadMessageAsyncResult(callback, state);
				if (_queue.Count == 0) {
					// Nothing buffered yet: park the request until a message arrives.
					_asyncWait = ar;
				} else {
					ar.Message = _queue.Dequeue();
					completed = true;
				}
			}
			// Signal completion outside the lock so the completion path does not run under it.
			if (completed) ar.SetCompleted(true, null);
			return ar;
		}

		/// <summary>
		/// Finishes an asynchronous read started with <see cref="BeginReadMessage"/>.
		/// Calls <c>WaitForCompletion</c> on the result first (presumably blocking until
		/// the operation has completed - behaviour is defined by <c>AsyncResultBase</c>).
		/// </summary>
		/// <param name="asyncResult">The result returned by <see cref="BeginReadMessage"/>.</param>
		/// <returns>The message that was read.</returns>
		/// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
		/// <exception cref="OperationCanceledException">
		/// The operation finished with an error (for example because the channel was closed);
		/// the original error is available as the inner exception.
		/// </exception>
		public PmlElement EndReadMessage(IAsyncResult asyncResult) {
			// Fail with a descriptive argument error instead of a NullReferenceException below.
			if (asyncResult == null) throw new ArgumentNullException("asyncResult");
			ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult;
			ar.WaitForCompletion();
			if (ar.Error != null) throw new OperationCanceledException("The asynchronous operation failed", ar.Error);
			return ar.Message;
		}

		/// <summary>
		/// Closes the channel: marks it as not open, wakes every thread blocked in
		/// <see cref="ReadMessage"/> and fails a pending asynchronous read with an
		/// <see cref="ObjectDisposedException"/>.
		/// </summary>
		public virtual void Close() {
			ReadMessageAsyncResult pendingRead;
			lock (_queue) {
				_isOpen = false;
				// Detach the pending read while holding the lock; a plain read-and-clear
				// is sufficient because _asyncWait is only ever touched under this lock.
				pendingRead = _asyncWait;
				_asyncWait = null;
				Monitor.PulseAll(_queue);
			}
			if (pendingRead != null) pendingRead.SetCompleted(false, new ObjectDisposedException("ActivePmlChannel"));
		}

		/// <summary>Disposes the channel by calling <see cref="Close"/>.</summary>
		public void Dispose() {
			Close();
		}

		/// <summary>
		/// Called by the concrete channel when a message has been received. The message
		/// is delivered to the pending asynchronous read if there is one; otherwise it
		/// is buffered and one thread blocked in <see cref="ReadMessage"/> is woken.
		/// </summary>
		/// <param name="message">The received message.</param>
		protected void PushReceivedMessage(PmlElement message) {
			ReadMessageAsyncResult pendingRead;
			lock (_queue) {
				// Claim the pending asynchronous read (if any) under the lock.
				pendingRead = _asyncWait;
				_asyncWait = null;
				if (pendingRead == null) {
					_queue.Enqueue(message);
					Monitor.Pulse(_queue);
				}
			}
			// Complete the asynchronous read outside the lock.
			if (pendingRead != null) {
				pendingRead.Message = message;
				pendingRead.SetCompleted(false, null);
			}
		}

		/// <summary>
		/// Asynchronous result for a single read; carries the message that was read.
		/// </summary>
		class ReadMessageAsyncResult : AsyncResultBase {
			// The message delivered to this read; set before the result is completed.
			internal PmlElement Message;
			public ReadMessageAsyncResult(AsyncCallback callback, Object state) : base(callback, state) { }
			/// <summary>Forwards to the base class completion so the enclosing channel can call it.</summary>
			/// <param name="synchronously">Whether the operation completed synchronously.</param>
			/// <param name="error">The error the operation failed with, or null on success.</param>
			public void SetCompleted(Boolean synchronously, Exception error) {
				base.SetCompleted(synchronously, error);
			}
		}
	}
}