diff Pml/Channels/ActivePmlChannel.cs @ 0:3ab940a0c7a0

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 11 Sep 2012 16:28:53 +0200
parents
children 327be9216006
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Pml/Channels/ActivePmlChannel.cs	Tue Sep 11 16:28:53 2012 +0200
@@ -0,0 +1,107 @@
+using System;
+using UCIS.Pml;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace UCIS.Pml {
+	public abstract class ActivePmlChannel : IPmlChannel {
+		private AutoResetEvent _receiveEvent = new AutoResetEvent(false);
+		private ReadMessageAsyncResult _asyncWait = null;
+		private Queue<PmlElement> _queue = new Queue<PmlElement>();
+		private bool _isOpen = true;
+
+		public virtual bool IsOpen { get { return _isOpen; } }
+		public abstract void WriteMessage(PmlElement message);
+
+		public PmlElement ReadMessage() {
+			if (!IsOpen) throw new InvalidOperationException("The channel is not open");
+			if (_queue.Count == 0) {
+				_receiveEvent.WaitOne();
+				if (_queue.Count == 0) throw new OperationCanceledException("The operation did not complete");
+			} else if (_queue.Count == 1) {
+				_receiveEvent.Reset();
+			}
+			return _queue.Dequeue();
+		}
+
+		public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) {
+			ReadMessageAsyncResult ar = new ReadMessageAsyncResult(state, callback);
+			if (!IsOpen) throw new InvalidOperationException("The channel is not open");
+			if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress");
+			if (_queue.Count == 0) {
+				_asyncWait = ar;
+			} else {
+				if (_queue.Count == 1) _receiveEvent.Reset();
+				ar.Complete(true, _queue.Dequeue(), true);
+			}
+			return ar;
+		}
+
+		public PmlElement EndReadMessage(IAsyncResult asyncResult) {
+			ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult;
+			if (ar.Error) new OperationCanceledException("The operation did not complete");
+			return ar.Message;
+		}
+
+		public virtual void Close() {
+			_isOpen = false;
+			ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
+			if (asyncWait != null) asyncWait.Complete(false, null, false);
+			_receiveEvent.Set();
+			_receiveEvent.Close();
+		}
+
+		public void Dispose() {
+			Close();
+		}
+
+		protected void PushReceivedMessage(PmlElement message) {
+			ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null);
+			if (asyncWait != null) {
+				asyncWait.Complete(true, message, false);
+			} else {
+				_queue.Enqueue(message);
+				_receiveEvent.Set();
+			}
+		}
+
+		private class ReadMessageAsyncResult : IAsyncResult {
+			private object state;
+			private AsyncCallback callback;
+			private bool completed;
+			private bool synchronously;
+			private ManualResetEvent waitHandle;
+
+			internal PmlElement Message;
+			internal bool Error;
+
+			public bool CompletedSynchronously { get { return synchronously; } }
+			public object AsyncState { get { return state; } }
+			public WaitHandle AsyncWaitHandle {
+				get {
+					if (waitHandle == null) waitHandle = new ManualResetEvent(completed);
+					return waitHandle;
+				}
+			}
+			public bool IsCompleted { get { return completed; } }
+
+			internal ReadMessageAsyncResult(object state, AsyncCallback callback) {
+				this.state = state;
+				this.callback = callback;
+				this.completed = false;
+				this.synchronously = false;
+				this.Message = null;
+				this.Error = false;
+				this.waitHandle = null;
+			}
+			internal void Complete(bool success, PmlElement message, bool synchronously) {
+				this.Message = message;
+				this.Error = !success;
+				this.synchronously = synchronously;
+				this.completed = true;
+				if (waitHandle != null) waitHandle.Set();
+				if (this.callback != null) this.callback.Invoke(this);
+			}
+		}
+	}
+}