changeset 104:327be9216006

Improved PML code
author Ivo Smits <Ivo@UCIS.nl>
date Sat, 11 Oct 2014 14:05:41 +0200
parents 8fe322656807
children 4ba4fd48e1da
files Pml/Channels/ActivePmlChannel.cs Pml/Channels/PassivePmlChannel.cs Pml/PmlCommunicator2.cs Util/AsyncResultBase.cs
diffstat 4 files changed, 91 insertions(+), 121 deletions(-) [+]
line wrap: on
line diff
--- a/Pml/Channels/ActivePmlChannel.cs	Sat Oct 11 14:03:31 2014 +0200
+++ b/Pml/Channels/ActivePmlChannel.cs	Sat Oct 11 14:05:41 2014 +0200
@@ -1,11 +1,11 @@
 using System;
 using UCIS.Pml;
+using UCIS.Util;
 using System.Collections.Generic;
 using System.Threading;
 
 namespace UCIS.Pml {
 	public abstract class ActivePmlChannel : IPmlChannel {
-		private AutoResetEvent _receiveEvent = new AutoResetEvent(false);
 		private ReadMessageAsyncResult _asyncWait = null;
 		private Queue<PmlElement> _queue = new Queue<PmlElement>();
 		private bool _isOpen = true;
@@ -14,41 +14,49 @@
 		public abstract void WriteMessage(PmlElement message);
 
 		public PmlElement ReadMessage() {
-			if (!IsOpen) throw new InvalidOperationException("The channel is not open");
-			if (_queue.Count == 0) {
-				_receiveEvent.WaitOne();
-				if (_queue.Count == 0) throw new OperationCanceledException("The operation did not complete");
-			} else if (_queue.Count == 1) {
-				_receiveEvent.Reset();
+			lock (_queue) {
+				if (!IsOpen) throw new InvalidOperationException("The channel is not open");
+				while (_queue.Count == 0) {
+					if (!IsOpen) throw new OperationCanceledException("The operation did not complete");
+					Monitor.Wait(_queue);
+				}
+				return _queue.Dequeue();
 			}
-			return _queue.Dequeue();
 		}
 
 		public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) {
-			ReadMessageAsyncResult ar = new ReadMessageAsyncResult(state, callback);
-			if (!IsOpen) throw new InvalidOperationException("The channel is not open");
-			if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress");
-			if (_queue.Count == 0) {
-				_asyncWait = ar;
-			} else {
-				if (_queue.Count == 1) _receiveEvent.Reset();
-				ar.Complete(true, _queue.Dequeue(), true);
+			ReadMessageAsyncResult ar;
+			Boolean completed = false;
+			lock (_queue) {
+				if (!IsOpen) throw new InvalidOperationException("The channel is not open");
+				if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress");
+				ar = new ReadMessageAsyncResult(callback, state);
+				if (_queue.Count == 0) {
+					_asyncWait = ar;
+				} else {
+					ar.Message = _queue.Dequeue();
+					completed = true;
+				}
 			}
+			if (completed) ar.SetCompleted(true, null);
 			return ar;
 		}
 
 		public PmlElement EndReadMessage(IAsyncResult asyncResult) {
 			ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult;
-			if (ar.Error) new OperationCanceledException("The operation did not complete");
+			ar.WaitForCompletion();
+			if (ar.Error != null) throw new OperationCanceledException("The asynchronous operation failed", ar.Error);
 			return ar.Message;
 		}
 
 		public virtual void Close() {
-			_isOpen = false;
-			ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
-			if (asyncWait != null) asyncWait.Complete(false, null, false);
-			_receiveEvent.Set();
-			_receiveEvent.Close();
+			ReadMessageAsyncResult asyncWait;
+			lock (_queue) {
+				_isOpen = false;
+				asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
+				Monitor.PulseAll(_queue);
+			}
+			if (asyncWait != null) asyncWait.SetCompleted(false, new ObjectDisposedException("ActivePmlChannel"));
 		}
 
 		public void Dispose() {
@@ -56,51 +64,25 @@
 		}
 
 		protected void PushReceivedMessage(PmlElement message) {
-			ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
+			ReadMessageAsyncResult asyncWait;
+			lock (_queue) {
+				asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
+				if (asyncWait == null) {
+					_queue.Enqueue(message);
+					Monitor.Pulse(_queue);
+				}
+			}
 			if (asyncWait != null) {
-				asyncWait.Complete(true, message, false);
-			} else {
-				_queue.Enqueue(message);
-				_receiveEvent.Set();
+				asyncWait.Message = message;
+				asyncWait.SetCompleted(false, null);
 			}
 		}
 
-		private class ReadMessageAsyncResult : IAsyncResult {
-			private object state;
-			private AsyncCallback callback;
-			private bool completed;
-			private bool synchronously;
-			private ManualResetEvent waitHandle;
-
+		class ReadMessageAsyncResult : AsyncResultBase {
 			internal PmlElement Message;
-			internal bool Error;
-
-			public bool CompletedSynchronously { get { return synchronously; } }
-			public object AsyncState { get { return state; } }
-			public WaitHandle AsyncWaitHandle {
-				get {
-					if (waitHandle == null) waitHandle = new ManualResetEvent(completed);
-					return waitHandle;
-				}
-			}
-			public bool IsCompleted { get { return completed; } }
-
-			internal ReadMessageAsyncResult(object state, AsyncCallback callback) {
-				this.state = state;
-				this.callback = callback;
-				this.completed = false;
-				this.synchronously = false;
-				this.Message = null;
-				this.Error = false;
-				this.waitHandle = null;
-			}
-			internal void Complete(bool success, PmlElement message, bool synchronously) {
-				this.Message = message;
-				this.Error = !success;
-				this.synchronously = synchronously;
-				this.completed = true;
-				if (waitHandle != null) waitHandle.Set();
-				if (this.callback != null) this.callback.Invoke(this);
+			public ReadMessageAsyncResult(AsyncCallback callback, Object state) : base(callback, state) { }
+			public void SetCompleted(Boolean synchronously, Exception error) {
+				base.SetCompleted(synchronously, error);
 			}
 		}
 	}
--- a/Pml/Channels/PassivePmlChannel.cs	Sat Oct 11 14:03:31 2014 +0200
+++ b/Pml/Channels/PassivePmlChannel.cs	Sat Oct 11 14:05:41 2014 +0200
@@ -1,5 +1,6 @@
 using System;
 using UCIS.Pml;
+using UCIS.Util;
 using System.Collections.Generic;
 using System.Threading;
 
@@ -20,40 +21,33 @@
 		public abstract PmlElement ReadMessage();
 
 		public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) {
-			ReadMessageAsyncResult ar = new ReadMessageAsyncResult();
-			ar.Callback = callback;
-			ar.State = state;
+			ReadMessageAsyncResult ar = new ReadMessageAsyncResult(callback, state);
 			UCIS.ThreadPool.RunCall(AsyncReadMessage, ar);
 			return ar;
 		}
 		public PmlElement EndReadMessage(IAsyncResult asyncResult) {
 			ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult;
-			if (ar.Error != null) throw new Exception("The asynchronous operation could not be completed", ar.Error);
+			ar.WaitForCompletion();
+			if (ar.Error != null) throw new Exception("The asynchronous operation failed", ar.Error);
 			return ar.Message;
 		}
 
-		private struct ReadMessageAsyncResult : IAsyncResult {
-			internal object State;
+		class ReadMessageAsyncResult : AsyncResultBase {
 			internal PmlElement Message;
-			internal AsyncCallback Callback;
-			internal Exception Error;
-			internal bool Completed;
-
-			public bool CompletedSynchronously { get { return false; } }
-			public object AsyncState { get { return State; } }
-			public WaitHandle AsyncWaitHandle { get { return null; } }
-			public bool IsCompleted { get { return Completed; } }
+			public ReadMessageAsyncResult(AsyncCallback callback, Object state) : base(callback, state) { }
+			public void SetCompleted(Boolean synchronously, Exception error, PmlElement message) {
+				this.Message = message;
+				base.SetCompleted(synchronously, error);
+			}
 		}
 		private void AsyncReadMessage(object state) {
 			ReadMessageAsyncResult ar = (ReadMessageAsyncResult)state;
 			try {
-				ar.Message = ReadMessage();
-				ar.Error = null;
+				PmlElement message = ReadMessage();
+				ar.SetCompleted(false, null, message);
 			} catch (Exception ex) {
-				ar.Error = ex;
+				ar.SetCompleted(false, ex, null);
 			}
-			ar.Completed = true;
-			ar.Callback.Invoke(ar);
 		}
 	}
 }
--- a/Pml/PmlCommunicator2.cs	Sat Oct 11 14:03:31 2014 +0200
+++ b/Pml/PmlCommunicator2.cs	Sat Oct 11 14:05:41 2014 +0200
@@ -7,7 +7,6 @@
 	public class PmlCommunicator {
 		private class CSyncRequest {
 			internal PmlElement Reply;
-			internal ManualResetEvent ResetEvent = new ManualResetEvent(false);
 		}
 		private interface ISession {
 			void MessageIn(PmlElement message);
@@ -144,9 +143,7 @@
 				_sessions.Clear();
 			}
 			lock (_invocations) {
-				foreach (CSyncRequest T in _invocations.Values) {
-					T.ResetEvent.Set();
-				}
+				foreach (CSyncRequest T in _invocations.Values) lock (T) Monitor.Pulse(T);
 				_invocations.Clear();
 			}
 			if (Closed != null) Closed(this, new EventArgs());
@@ -192,7 +189,7 @@
 					}
 					if (SRequest != null) {
 						SRequest.Reply = Message.GetChild("MSG");
-						SRequest.ResetEvent.Set();
+						lock (SRequest) Monitor.Pulse(SRequest);
 					}
 				} else if (Cmd.Equals("REQ") || Cmd.Equals("MSG")) {
 					UCIS.ThreadPool.RunCall(processCall, Message);
@@ -288,9 +285,7 @@
 		}
 
 		public void Call(PmlElement message) {
-			PmlDictionary Msg = new PmlDictionary();
-			Msg.Add("CMD", new PmlString("MSG"));
-			Msg.Add("MSG", message);
+			PmlDictionary Msg = new PmlDictionary() { { "CMD", "MSG" }, { "MSG", message } };
 			_WriteMessage(Msg);
 		}
 		public PmlElement Invoke(PmlElement message) {
@@ -299,14 +294,16 @@
 		public PmlElement Invoke(PmlElement message, int timeout) {
 			if (_closed) throw new InvalidOperationException("Sorry, we're closed.");
 			CSyncRequest SyncEvent = new CSyncRequest();
-			UInt32 SID = GetNextSessionId(false);
-			lock (_invocations) _invocations.Add(SID, SyncEvent);
+			UInt32 SID;
+			lock (_invocations) {
+				SID = GetNextSessionId(ref pNextSyncRequest, _invocations);
+				_invocations.Add(SID, SyncEvent);
+			}
 			try {
 				WriteSyncMessage(SID, false, message);
-				if (!SyncEvent.ResetEvent.WaitOne(timeout, false)) {
-					if (!_closed) lock (_invocations) _invocations.Remove(SID);
-					throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")");
-				}
+				Boolean success;
+				lock (SyncEvent) success = Monitor.Wait(SyncEvent, timeout);
+				if (!success) throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")");
 			} finally {
 				lock (_invocations) _invocations.Remove(SID);
 			}
@@ -314,7 +311,7 @@
 		}
 
 		public IPmlChannel CreateChannel(PmlElement data) {
-			UInt32 sid = GetNextSessionId(true);
+			UInt32 sid = GetNextSessionId(ref pNextSession, _sessions);
 			PmlSubChannel ch = new PmlSubChannel(this, sid, true);
 			WriteSessionMessage(sid, 0, data);
 			if (!ch.IsOpen) return null;
@@ -333,42 +330,33 @@
 			RemoveSession(session.ID);
 		}
 
-		private UInt32 GetNextSessionId(bool IsSession) {
-			if (IsSession) {
-				lock (_sessions) {
-					do {
-						unchecked { pNextSession++; }
-					} while (_sessions.ContainsKey(pNextSession));
-					return pNextSession;
-				}
-			} else {
-				lock (_invocations) {
-					do {
-						unchecked { pNextSyncRequest++; }
-					} while (_invocations.ContainsKey(pNextSyncRequest));
-					return pNextSyncRequest;
-				}
+		private static UInt32 GetNextSessionId<T>(ref UInt32 id, IDictionary<UInt32, T> dictionary) {
+			lock (dictionary) {
+				do {
+					id++;
+				} while (dictionary.ContainsKey(id));
+				return id;
 			}
 		}
 
 		protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) {
-			PmlDictionary Msg2 = new PmlDictionary();
-			Msg2.Add("CMD", new PmlString(RPL ? "RPL" : "REQ"));
-			Msg2.Add("SID", new PmlInteger(SID));
-			Msg2.Add("MSG", MSG);
+			PmlDictionary Msg2 = new PmlDictionary() {
+				{ "CMD", RPL ? "RPL" : "REQ" },
+				{ "SID", SID },
+				{ "MSG", MSG },
+			};
 			_WriteMessage(Msg2);
 		}
 		protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) {
-			PmlDictionary Msg2 = new PmlDictionary();
-			Msg2.Add("CMD", new PmlString("SES"));
-			Msg2.Add("SID", new PmlInteger(SID));
-			Msg2.Add("SCMD", new PmlInteger(CMD));
+			PmlDictionary Msg2 = new PmlDictionary() {
+				{ "CMD", "SES" },
+				{ "SID", SID },
+				{ "SCMD", CMD },
+			};
 			if (MSG != null) Msg2.Add("MSG", MSG);
 			_WriteMessage(Msg2);
 		}
 
-
-
 		/* LegacyPmlCommunicator compatibility */
 		public PmlElement SyncRequest(PmlElement Request) {
 			return Invoke(Request);
--- a/Util/AsyncResultBase.cs	Sat Oct 11 14:03:31 2014 +0200
+++ b/Util/AsyncResultBase.cs	Sat Oct 11 14:05:41 2014 +0200
@@ -8,6 +8,7 @@
 		static Boolean ThreadInCallback = false;
 		ManualResetEvent WaitEvent = null;
 		AsyncCallback Callback = null;
+		Object MonitorWaitHandle = new Object();
 		public object AsyncState { get; private set; }
 		public bool CompletedSynchronously { get; private set; }
 		public bool IsCompleted { get; private set; }
@@ -36,6 +37,7 @@
 			lock (this) {
 				IsCompleted = true;
 				if (WaitEvent != null) WaitEvent.Set();
+				if (MonitorWaitHandle != null) lock (MonitorWaitHandle) Monitor.Pulse(MonitorWaitHandle);
 			}
 			if (Callback != null) {
 				if (synchronously && !ThreadInCallback) {
@@ -51,6 +53,10 @@
 			}
 		}
 
+		public void WaitForCompletion() {
+			lock (this) if (!IsCompleted) lock (MonitorWaitHandle) Monitor.Wait(MonitorWaitHandle);
+		}
+
 		protected void ThrowError() {
 			if (Error != null) throw Error;
 		}