Mercurial > hg > ucis.core
comparison Pml/Channels/ActivePmlChannel.cs @ 0:3ab940a0c7a0
Initial commit
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Tue, 11 Sep 2012 16:28:53 +0200 |
parents | |
children | 327be9216006 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3ab940a0c7a0 |
---|---|
1 using System; | |
2 using UCIS.Pml; | |
3 using System.Collections.Generic; | |
4 using System.Threading; | |
5 | |
6 namespace UCIS.Pml { | |
7 public abstract class ActivePmlChannel : IPmlChannel { | |
8 private AutoResetEvent _receiveEvent = new AutoResetEvent(false); | |
9 private ReadMessageAsyncResult _asyncWait = null; | |
10 private Queue<PmlElement> _queue = new Queue<PmlElement>(); | |
11 private bool _isOpen = true; | |
12 | |
13 public virtual bool IsOpen { get { return _isOpen; } } | |
14 public abstract void WriteMessage(PmlElement message); | |
15 | |
16 public PmlElement ReadMessage() { | |
17 if (!IsOpen) throw new InvalidOperationException("The channel is not open"); | |
18 if (_queue.Count == 0) { | |
19 _receiveEvent.WaitOne(); | |
20 if (_queue.Count == 0) throw new OperationCanceledException("The operation did not complete"); | |
21 } else if (_queue.Count == 1) { | |
22 _receiveEvent.Reset(); | |
23 } | |
24 return _queue.Dequeue(); | |
25 } | |
26 | |
27 public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) { | |
28 ReadMessageAsyncResult ar = new ReadMessageAsyncResult(state, callback); | |
29 if (!IsOpen) throw new InvalidOperationException("The channel is not open"); | |
30 if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); | |
31 if (_queue.Count == 0) { | |
32 _asyncWait = ar; | |
33 } else { | |
34 if (_queue.Count == 1) _receiveEvent.Reset(); | |
35 ar.Complete(true, _queue.Dequeue(), true); | |
36 } | |
37 return ar; | |
38 } | |
39 | |
40 public PmlElement EndReadMessage(IAsyncResult asyncResult) { | |
41 ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult; | |
42 if (ar.Error) new OperationCanceledException("The operation did not complete"); | |
43 return ar.Message; | |
44 } | |
45 | |
46 public virtual void Close() { | |
47 _isOpen = false; | |
48 ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); | |
49 if (asyncWait != null) asyncWait.Complete(false, null, false); | |
50 _receiveEvent.Set(); | |
51 _receiveEvent.Close(); | |
52 } | |
53 | |
54 public void Dispose() { | |
55 Close(); | |
56 } | |
57 | |
58 protected void PushReceivedMessage(PmlElement message) { | |
59 ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); | |
60 if (asyncWait != null) { | |
61 asyncWait.Complete(true, message, false); | |
62 } else { | |
63 _queue.Enqueue(message); | |
64 _receiveEvent.Set(); | |
65 } | |
66 } | |
67 | |
68 private class ReadMessageAsyncResult : IAsyncResult { | |
69 private object state; | |
70 private AsyncCallback callback; | |
71 private bool completed; | |
72 private bool synchronously; | |
73 private ManualResetEvent waitHandle; | |
74 | |
75 internal PmlElement Message; | |
76 internal bool Error; | |
77 | |
78 public bool CompletedSynchronously { get { return synchronously; } } | |
79 public object AsyncState { get { return state; } } | |
80 public WaitHandle AsyncWaitHandle { | |
81 get { | |
82 if (waitHandle == null) waitHandle = new ManualResetEvent(completed); | |
83 return waitHandle; | |
84 } | |
85 } | |
86 public bool IsCompleted { get { return completed; } } | |
87 | |
88 internal ReadMessageAsyncResult(object state, AsyncCallback callback) { | |
89 this.state = state; | |
90 this.callback = callback; | |
91 this.completed = false; | |
92 this.synchronously = false; | |
93 this.Message = null; | |
94 this.Error = false; | |
95 this.waitHandle = null; | |
96 } | |
97 internal void Complete(bool success, PmlElement message, bool synchronously) { | |
98 this.Message = message; | |
99 this.Error = !success; | |
100 this.synchronously = synchronously; | |
101 this.completed = true; | |
102 if (waitHandle != null) waitHandle.Set(); | |
103 if (this.callback != null) this.callback.Invoke(this); | |
104 } | |
105 } | |
106 } | |
107 } |