Mercurial > hg > ucis.core
comparison Pml/Channels/ActivePmlChannel.cs @ 104:327be9216006
Improved PML code
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Sat, 11 Oct 2014 14:05:41 +0200 |
parents | 3ab940a0c7a0 |
children |
comparison
equal
deleted
inserted
replaced
103:8fe322656807 | 104:327be9216006 |
---|---|
1 using System; | 1 using System; |
2 using UCIS.Pml; | 2 using UCIS.Pml; |
| | 3 using UCIS.Util; |
3 using System.Collections.Generic; | 4 using System.Collections.Generic; |
4 using System.Threading; | 5 using System.Threading; |
5 | 6 |
6 namespace UCIS.Pml { | 7 namespace UCIS.Pml { |
7 public abstract class ActivePmlChannel : IPmlChannel { | 8 public abstract class ActivePmlChannel : IPmlChannel { |
8 private AutoResetEvent _receiveEvent = new AutoResetEvent(false); | |
9 private ReadMessageAsyncResult _asyncWait = null; | 9 private ReadMessageAsyncResult _asyncWait = null; |
10 private Queue<PmlElement> _queue = new Queue<PmlElement>(); | 10 private Queue<PmlElement> _queue = new Queue<PmlElement>(); |
11 private bool _isOpen = true; | 11 private bool _isOpen = true; |
12 | 12 |
13 public virtual bool IsOpen { get { return _isOpen; } } | 13 public virtual bool IsOpen { get { return _isOpen; } } |
14 public abstract void WriteMessage(PmlElement message); | 14 public abstract void WriteMessage(PmlElement message); |
15 | 15 |
16 public PmlElement ReadMessage() { | 16 public PmlElement ReadMessage() { |
17 if (!IsOpen) throw new InvalidOperationException("The channel is not open"); | 17 lock (_queue) { |
18 if (_queue.Count == 0) { | 18 if (!IsOpen) throw new InvalidOperationException("The channel is not open"); |
19 _receiveEvent.WaitOne(); | 19 while (_queue.Count == 0) { |
20 if (_queue.Count == 0) throw new OperationCanceledException("The operation did not complete"); | 20 if (!IsOpen) throw new OperationCanceledException("The operation did not complete"); |
21 } else if (_queue.Count == 1) { | 21 Monitor.Wait(_queue); |
22 _receiveEvent.Reset(); | 22 } |
| | 23 return _queue.Dequeue(); |
23 } | 24 } |
24 return _queue.Dequeue(); | |
25 } | 25 } |
26 | 26 |
27 public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) { | 27 public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) { |
28 ReadMessageAsyncResult ar = new ReadMessageAsyncResult(state, callback); | 28 ReadMessageAsyncResult ar; |
29 if (!IsOpen) throw new InvalidOperationException("The channel is not open"); | 29 Boolean completed = false; |
30 if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); | 30 lock (_queue) { |
31 if (_queue.Count == 0) { | 31 if (!IsOpen) throw new InvalidOperationException("The channel is not open"); |
32 _asyncWait = ar; | 32 if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); |
33 } else { | 33 ar = new ReadMessageAsyncResult(callback, state); |
34 if (_queue.Count == 1) _receiveEvent.Reset(); | 34 if (_queue.Count == 0) { |
35 ar.Complete(true, _queue.Dequeue(), true); | 35 _asyncWait = ar; |
| | 36 } else { |
| | 37 ar.Message = _queue.Dequeue(); |
| | 38 completed = true; |
| | 39 } |
36 } | 40 } |
| | 41 if (completed) ar.SetCompleted(true, null); |
37 return ar; | 42 return ar; |
38 } | 43 } |
39 | 44 |
40 public PmlElement EndReadMessage(IAsyncResult asyncResult) { | 45 public PmlElement EndReadMessage(IAsyncResult asyncResult) { |
41 ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult; | 46 ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult; |
42 if (ar.Error) new OperationCanceledException("The operation did not complete"); | 47 ar.WaitForCompletion(); |
| | 48 if (ar.Error != null) throw new OperationCanceledException("The asynchronous operation failed", ar.Error); |
43 return ar.Message; | 49 return ar.Message; |
44 } | 50 } |
45 | 51 |
46 public virtual void Close() { | 52 public virtual void Close() { |
47 _isOpen = false; | 53 ReadMessageAsyncResult asyncWait; |
48 ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); | 54 lock (_queue) { |
49 if (asyncWait != null) asyncWait.Complete(false, null, false); | 55 _isOpen = false; |
50 _receiveEvent.Set(); | 56 asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); |
51 _receiveEvent.Close(); | 57 Monitor.PulseAll(_queue); |
| | 58 } |
| | 59 if (asyncWait != null) asyncWait.SetCompleted(false, new ObjectDisposedException("ActivePmlChannel")); |
52 } | 60 } |
53 | 61 |
54 public void Dispose() { | 62 public void Dispose() { |
55 Close(); | 63 Close(); |
56 } | 64 } |
57 | 65 |
58 protected void PushReceivedMessage(PmlElement message) { | 66 protected void PushReceivedMessage(PmlElement message) { |
59 ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); | 67 ReadMessageAsyncResult asyncWait; |
| | 68 lock (_queue) { |
| | 69 asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); |
| | 70 if (asyncWait == null) { |
| | 71 _queue.Enqueue(message); |
| | 72 Monitor.Pulse(_queue); |
| | 73 } |
| | 74 } |
60 if (asyncWait != null) { | 75 if (asyncWait != null) { |
61 asyncWait.Complete(true, message, false); | 76 asyncWait.Message = message; |
62 } else { | 77 asyncWait.SetCompleted(false, null); |
63 _queue.Enqueue(message); | |
64 _receiveEvent.Set(); | |
65 } | 78 } |
66 } | 79 } |
67 | 80 |
68 private class ReadMessageAsyncResult : IAsyncResult { | 81 class ReadMessageAsyncResult : AsyncResultBase { |
69 private object state; | |
70 private AsyncCallback callback; | |
71 private bool completed; | |
72 private bool synchronously; | |
73 private ManualResetEvent waitHandle; | |
74 | |
75 internal PmlElement Message; | 82 internal PmlElement Message; |
76 internal bool Error; | 83 public ReadMessageAsyncResult(AsyncCallback callback, Object state) : base(callback, state) { } |
77 | 84 public void SetCompleted(Boolean synchronously, Exception error) { |
78 public bool CompletedSynchronously { get { return synchronously; } } | 85 base.SetCompleted(synchronously, error); |
79 public object AsyncState { get { return state; } } | |
80 public WaitHandle AsyncWaitHandle { | |
81 get { | |
82 if (waitHandle == null) waitHandle = new ManualResetEvent(completed); | |
83 return waitHandle; | |
84 } | |
85 } | |
86 public bool IsCompleted { get { return completed; } } | |
87 | |
88 internal ReadMessageAsyncResult(object state, AsyncCallback callback) { | |
89 this.state = state; | |
90 this.callback = callback; | |
91 this.completed = false; | |
92 this.synchronously = false; | |
93 this.Message = null; | |
94 this.Error = false; | |
95 this.waitHandle = null; | |
96 } | |
97 internal void Complete(bool success, PmlElement message, bool synchronously) { | |
98 this.Message = message; | |
99 this.Error = !success; | |
100 this.synchronously = synchronously; | |
101 this.completed = true; | |
102 if (waitHandle != null) waitHandle.Set(); | |
103 if (this.callback != null) this.callback.Invoke(this); | |
104 } | 86 } |
105 } | 87 } |
106 } | 88 } |
107 } | 89 } |