diff Pml/Channels/ActivePmlChannel.cs @ 104:327be9216006

Improved PML code
author Ivo Smits <Ivo@UCIS.nl>
date Sat, 11 Oct 2014 14:05:41 +0200
parents 3ab940a0c7a0
children
line wrap: on
line diff
--- a/Pml/Channels/ActivePmlChannel.cs	Sat Oct 11 14:03:31 2014 +0200
+++ b/Pml/Channels/ActivePmlChannel.cs	Sat Oct 11 14:05:41 2014 +0200
@@ -1,11 +1,11 @@
 using System;
 using UCIS.Pml;
+using UCIS.Util;
 using System.Collections.Generic;
 using System.Threading;
 
 namespace UCIS.Pml {
 	public abstract class ActivePmlChannel : IPmlChannel {
-		private AutoResetEvent _receiveEvent = new AutoResetEvent(false);
 		private ReadMessageAsyncResult _asyncWait = null;
 		private Queue<PmlElement> _queue = new Queue<PmlElement>();
 		private bool _isOpen = true;
@@ -14,41 +14,49 @@
 		public abstract void WriteMessage(PmlElement message);
 
 		public PmlElement ReadMessage() {
-			if (!IsOpen) throw new InvalidOperationException("The channel is not open");
-			if (_queue.Count == 0) {
-				_receiveEvent.WaitOne();
-				if (_queue.Count == 0) throw new OperationCanceledException("The operation did not complete");
-			} else if (_queue.Count == 1) {
-				_receiveEvent.Reset();
+			lock (_queue) {
+				if (!IsOpen) throw new InvalidOperationException("The channel is not open");
+				while (_queue.Count == 0) {
+					if (!IsOpen) throw new OperationCanceledException("The operation did not complete");
+					Monitor.Wait(_queue);
+				}
+				return _queue.Dequeue();
 			}
-			return _queue.Dequeue();
 		}
 
 		public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) {
-			ReadMessageAsyncResult ar = new ReadMessageAsyncResult(state, callback);
-			if (!IsOpen) throw new InvalidOperationException("The channel is not open");
-			if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress");
-			if (_queue.Count == 0) {
-				_asyncWait = ar;
-			} else {
-				if (_queue.Count == 1) _receiveEvent.Reset();
-				ar.Complete(true, _queue.Dequeue(), true);
+			ReadMessageAsyncResult ar;
+			Boolean completed = false;
+			lock (_queue) {
+				if (!IsOpen) throw new InvalidOperationException("The channel is not open");
+				if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress");
+				ar = new ReadMessageAsyncResult(callback, state);
+				if (_queue.Count == 0) {
+					_asyncWait = ar;
+				} else {
+					ar.Message = _queue.Dequeue();
+					completed = true;
+				}
 			}
+			if (completed) ar.SetCompleted(true, null);
 			return ar;
 		}
 
 		public PmlElement EndReadMessage(IAsyncResult asyncResult) {
 			ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult;
-			if (ar.Error) new OperationCanceledException("The operation did not complete");
+			ar.WaitForCompletion();
+			if (ar.Error != null) throw new OperationCanceledException("The asynchronous operation failed", ar.Error);
 			return ar.Message;
 		}
 
 		public virtual void Close() {
-			_isOpen = false;
-			ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
-			if (asyncWait != null) asyncWait.Complete(false, null, false);
-			_receiveEvent.Set();
-			_receiveEvent.Close();
+			ReadMessageAsyncResult asyncWait;
+			lock (_queue) {
+				_isOpen = false;
+				asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
+				Monitor.PulseAll(_queue);
+			}
+			if (asyncWait != null) asyncWait.SetCompleted(false, new ObjectDisposedException("ActivePmlChannel"));
 		}
 
 		public void Dispose() {
@@ -56,51 +64,25 @@
 		}
 
 		protected void PushReceivedMessage(PmlElement message) {
-			ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
+			ReadMessageAsyncResult asyncWait;
+			lock (_queue) {
+				asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
+				if (asyncWait == null) {
+					_queue.Enqueue(message);
+					Monitor.Pulse(_queue);
+				}
+			}
 			if (asyncWait != null) {
-				asyncWait.Complete(true, message, false);
-			} else {
-				_queue.Enqueue(message);
-				_receiveEvent.Set();
+				asyncWait.Message = message;
+				asyncWait.SetCompleted(false, null);
 			}
 		}
 
-		private class ReadMessageAsyncResult : IAsyncResult {
-			private object state;
-			private AsyncCallback callback;
-			private bool completed;
-			private bool synchronously;
-			private ManualResetEvent waitHandle;
-
+		class ReadMessageAsyncResult : AsyncResultBase {
 			internal PmlElement Message;
-			internal bool Error;
-
-			public bool CompletedSynchronously { get { return synchronously; } }
-			public object AsyncState { get { return state; } }
-			public WaitHandle AsyncWaitHandle {
-				get {
-					if (waitHandle == null) waitHandle = new ManualResetEvent(completed);
-					return waitHandle;
-				}
-			}
-			public bool IsCompleted { get { return completed; } }
-
-			internal ReadMessageAsyncResult(object state, AsyncCallback callback) {
-				this.state = state;
-				this.callback = callback;
-				this.completed = false;
-				this.synchronously = false;
-				this.Message = null;
-				this.Error = false;
-				this.waitHandle = null;
-			}
-			internal void Complete(bool success, PmlElement message, bool synchronously) {
-				this.Message = message;
-				this.Error = !success;
-				this.synchronously = synchronously;
-				this.completed = true;
-				if (waitHandle != null) waitHandle.Set();
-				if (this.callback != null) this.callback.Invoke(this);
+			public ReadMessageAsyncResult(AsyncCallback callback, Object state) : base(callback, state) { }
+			public void SetCompleted(Boolean synchronously, Exception error) {
+				base.SetCompleted(synchronously, error);
 			}
 		}
 	}