0
|
1 ???using System; |
|
2 using UCIS.Pml; |
|
3 using System.Collections.Generic; |
|
4 using System.Threading; |
|
5 |
|
6 namespace UCIS.Pml { |
|
7 public abstract class ActivePmlChannel : IPmlChannel { |
|
8 private AutoResetEvent _receiveEvent = new AutoResetEvent(false); |
|
9 private ReadMessageAsyncResult _asyncWait = null; |
|
10 private Queue<PmlElement> _queue = new Queue<PmlElement>(); |
|
11 private bool _isOpen = true; |
|
12 |
|
13 public virtual bool IsOpen { get { return _isOpen; } } |
|
14 public abstract void WriteMessage(PmlElement message); |
|
15 |
|
16 public PmlElement ReadMessage() { |
|
17 if (!IsOpen) throw new InvalidOperationException("The channel is not open"); |
|
18 if (_queue.Count == 0) { |
|
19 _receiveEvent.WaitOne(); |
|
20 if (_queue.Count == 0) throw new OperationCanceledException("The operation did not complete"); |
|
21 } else if (_queue.Count == 1) { |
|
22 _receiveEvent.Reset(); |
|
23 } |
|
24 return _queue.Dequeue(); |
|
25 } |
|
26 |
|
27 public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) { |
|
28 ReadMessageAsyncResult ar = new ReadMessageAsyncResult(state, callback); |
|
29 if (!IsOpen) throw new InvalidOperationException("The channel is not open"); |
|
30 if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); |
|
31 if (_queue.Count == 0) { |
|
32 _asyncWait = ar; |
|
33 } else { |
|
34 if (_queue.Count == 1) _receiveEvent.Reset(); |
|
35 ar.Complete(true, _queue.Dequeue(), true); |
|
36 } |
|
37 return ar; |
|
38 } |
|
39 |
|
40 public PmlElement EndReadMessage(IAsyncResult asyncResult) { |
|
41 ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult; |
|
42 if (ar.Error) new OperationCanceledException("The operation did not complete"); |
|
43 return ar.Message; |
|
44 } |
|
45 |
|
46 public virtual void Close() { |
|
47 _isOpen = false; |
|
48 ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); |
|
49 if (asyncWait != null) asyncWait.Complete(false, null, false); |
|
50 _receiveEvent.Set(); |
|
51 _receiveEvent.Close(); |
|
52 } |
|
53 |
|
54 public void Dispose() { |
|
55 Close(); |
|
56 } |
|
57 |
|
58 protected void PushReceivedMessage(PmlElement message) { |
|
59 ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); |
|
60 if (asyncWait != null) { |
|
61 asyncWait.Complete(true, message, false); |
|
62 } else { |
|
63 _queue.Enqueue(message); |
|
64 _receiveEvent.Set(); |
|
65 } |
|
66 } |
|
67 |
|
68 private class ReadMessageAsyncResult : IAsyncResult { |
|
69 private object state; |
|
70 private AsyncCallback callback; |
|
71 private bool completed; |
|
72 private bool synchronously; |
|
73 private ManualResetEvent waitHandle; |
|
74 |
|
75 internal PmlElement Message; |
|
76 internal bool Error; |
|
77 |
|
78 public bool CompletedSynchronously { get { return synchronously; } } |
|
79 public object AsyncState { get { return state; } } |
|
80 public WaitHandle AsyncWaitHandle { |
|
81 get { |
|
82 if (waitHandle == null) waitHandle = new ManualResetEvent(completed); |
|
83 return waitHandle; |
|
84 } |
|
85 } |
|
86 public bool IsCompleted { get { return completed; } } |
|
87 |
|
88 internal ReadMessageAsyncResult(object state, AsyncCallback callback) { |
|
89 this.state = state; |
|
90 this.callback = callback; |
|
91 this.completed = false; |
|
92 this.synchronously = false; |
|
93 this.Message = null; |
|
94 this.Error = false; |
|
95 this.waitHandle = null; |
|
96 } |
|
97 internal void Complete(bool success, PmlElement message, bool synchronously) { |
|
98 this.Message = message; |
|
99 this.Error = !success; |
|
100 this.synchronously = synchronously; |
|
101 this.completed = true; |
|
102 if (waitHandle != null) waitHandle.Set(); |
|
103 if (this.callback != null) this.callback.Invoke(this); |
|
104 } |
|
105 } |
|
106 } |
|
107 } |