comparison Pml/Channels/ActivePmlChannel.cs @ 104:327be9216006

Improved PML code
author Ivo Smits <Ivo@UCIS.nl>
date Sat, 11 Oct 2014 14:05:41 +0200
parents 3ab940a0c7a0
children
comparison
equal deleted inserted replaced
103:8fe322656807 104:327be9216006
1 using System; 1 using System;
2 using UCIS.Pml; 2 using UCIS.Pml;
3 using UCIS.Util;
3 using System.Collections.Generic; 4 using System.Collections.Generic;
4 using System.Threading; 5 using System.Threading;
5 6
6 namespace UCIS.Pml { 7 namespace UCIS.Pml {
7 public abstract class ActivePmlChannel : IPmlChannel { 8 public abstract class ActivePmlChannel : IPmlChannel {
8 private AutoResetEvent _receiveEvent = new AutoResetEvent(false);
9 private ReadMessageAsyncResult _asyncWait = null; 9 private ReadMessageAsyncResult _asyncWait = null;
10 private Queue<PmlElement> _queue = new Queue<PmlElement>(); 10 private Queue<PmlElement> _queue = new Queue<PmlElement>();
11 private bool _isOpen = true; 11 private bool _isOpen = true;
12 12
13 public virtual bool IsOpen { get { return _isOpen; } } 13 public virtual bool IsOpen { get { return _isOpen; } }
14 public abstract void WriteMessage(PmlElement message); 14 public abstract void WriteMessage(PmlElement message);
15 15
16 public PmlElement ReadMessage() { 16 public PmlElement ReadMessage() {
17 if (!IsOpen) throw new InvalidOperationException("The channel is not open"); 17 lock (_queue) {
18 if (_queue.Count == 0) { 18 if (!IsOpen) throw new InvalidOperationException("The channel is not open");
19 _receiveEvent.WaitOne(); 19 while (_queue.Count == 0) {
20 if (_queue.Count == 0) throw new OperationCanceledException("The operation did not complete"); 20 if (!IsOpen) throw new OperationCanceledException("The operation did not complete");
21 } else if (_queue.Count == 1) { 21 Monitor.Wait(_queue);
22 _receiveEvent.Reset(); 22 }
23 return _queue.Dequeue();
23 } 24 }
24 return _queue.Dequeue();
25 } 25 }
26 26
27 public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) { 27 public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) {
28 ReadMessageAsyncResult ar = new ReadMessageAsyncResult(state, callback); 28 ReadMessageAsyncResult ar;
29 if (!IsOpen) throw new InvalidOperationException("The channel is not open"); 29 Boolean completed = false;
30 if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); 30 lock (_queue) {
31 if (_queue.Count == 0) { 31 if (!IsOpen) throw new InvalidOperationException("The channel is not open");
32 _asyncWait = ar; 32 if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress");
33 } else { 33 ar = new ReadMessageAsyncResult(callback, state);
34 if (_queue.Count == 1) _receiveEvent.Reset(); 34 if (_queue.Count == 0) {
35 ar.Complete(true, _queue.Dequeue(), true); 35 _asyncWait = ar;
36 } else {
37 ar.Message = _queue.Dequeue();
38 completed = true;
39 }
36 } 40 }
41 if (completed) ar.SetCompleted(true, null);
37 return ar; 42 return ar;
38 } 43 }
39 44
40 public PmlElement EndReadMessage(IAsyncResult asyncResult) { 45 public PmlElement EndReadMessage(IAsyncResult asyncResult) {
41 ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult; 46 ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult;
42 if (ar.Error) new OperationCanceledException("The operation did not complete"); 47 ar.WaitForCompletion();
48 if (ar.Error != null) throw new OperationCanceledException("The asynchronous operation failed", ar.Error);
43 return ar.Message; 49 return ar.Message;
44 } 50 }
45 51
46 public virtual void Close() { 52 public virtual void Close() {
47 _isOpen = false; 53 ReadMessageAsyncResult asyncWait;
48 ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); 54 lock (_queue) {
49 if (asyncWait != null) asyncWait.Complete(false, null, false); 55 _isOpen = false;
50 _receiveEvent.Set(); 56 asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
51 _receiveEvent.Close(); 57 Monitor.PulseAll(_queue);
58 }
59 if (asyncWait != null) asyncWait.SetCompleted(false, new ObjectDisposedException("ActivePmlChannel"));
52 } 60 }
53 61
54 public void Dispose() { 62 public void Dispose() {
55 Close(); 63 Close();
56 } 64 }
57 65
58 protected void PushReceivedMessage(PmlElement message) { 66 protected void PushReceivedMessage(PmlElement message) {
59 ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); 67 ReadMessageAsyncResult asyncWait;
68 lock (_queue) {
69 asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
70 if (asyncWait == null) {
71 _queue.Enqueue(message);
72 Monitor.Pulse(_queue);
73 }
74 }
60 if (asyncWait != null) { 75 if (asyncWait != null) {
61 asyncWait.Complete(true, message, false); 76 asyncWait.Message = message;
62 } else { 77 asyncWait.SetCompleted(false, null);
63 _queue.Enqueue(message);
64 _receiveEvent.Set();
65 } 78 }
66 } 79 }
67 80
68 private class ReadMessageAsyncResult : IAsyncResult { 81 class ReadMessageAsyncResult : AsyncResultBase {
69 private object state;
70 private AsyncCallback callback;
71 private bool completed;
72 private bool synchronously;
73 private ManualResetEvent waitHandle;
74
75 internal PmlElement Message; 82 internal PmlElement Message;
76 internal bool Error; 83 public ReadMessageAsyncResult(AsyncCallback callback, Object state) : base(callback, state) { }
77 84 public void SetCompleted(Boolean synchronously, Exception error) {
78 public bool CompletedSynchronously { get { return synchronously; } } 85 base.SetCompleted(synchronously, error);
79 public object AsyncState { get { return state; } }
80 public WaitHandle AsyncWaitHandle {
81 get {
82 if (waitHandle == null) waitHandle = new ManualResetEvent(completed);
83 return waitHandle;
84 }
85 }
86 public bool IsCompleted { get { return completed; } }
87
88 internal ReadMessageAsyncResult(object state, AsyncCallback callback) {
89 this.state = state;
90 this.callback = callback;
91 this.completed = false;
92 this.synchronously = false;
93 this.Message = null;
94 this.Error = false;
95 this.waitHandle = null;
96 }
97 internal void Complete(bool success, PmlElement message, bool synchronously) {
98 this.Message = message;
99 this.Error = !success;
100 this.synchronously = synchronously;
101 this.completed = true;
102 if (waitHandle != null) waitHandle.Set();
103 if (this.callback != null) this.callback.Invoke(this);
104 } 86 }
105 } 87 }
106 } 88 }
107 } 89 }