diff Pml/PmlCommunicator.cs @ 0:3ab940a0c7a0

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 11 Sep 2012 16:28:53 +0200
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Pml/PmlCommunicator.cs	Tue Sep 11 16:28:53 2012 +0200
@@ -0,0 +1,342 @@
+using System;
+using System.Threading;
+using System.Collections.Generic;
+
+namespace UCIS.Pml {
+/*	class PmlCommunicator : IPmlCommunicator, IDisposable {
+		private IPmlChannel _channel;
+		private Dictionary<UInt32, IPmlSubChannel> _subchannels = new Dictionary<uint,IPmlSubChannel>();
+		private Random _random = new Random();
+
+		private enum CommandCode : int {
+			CallWithoutReply = 0,
+			CallWithReply = 1,
+			Message = 2,
+			ChannelRequest = 3,
+			ChannelAcknowledge = 4,
+			ChannelClose = 5,
+			Error = 6
+		}
+
+		private interface IPmlSubChannel {
+			void CloseIn();
+			void ErrorIn(PmlElement message);
+			void MessageIn(PmlElement message);
+		}
+
+		private class ChannelRequestWaitHandler : IAsyncResult {
+			internal AsyncCallback Callback = null;
+			internal Object CallbackState = null;
+			internal ManualResetEvent Event = null;
+			internal PmlSubChannel Channel = null;
+			internal bool Completed = false;
+
+			internal ChannelRequestWaitHandler(PmlSubChannel channel) {
+				Channel = channel;
+			}
+
+			internal void Complete() {
+				Completed = true;
+				if (Event != null) Event.Set();
+				if (Callback != null) Callback.Invoke(this);
+			}
+
+			public object AsyncState { get { return CallbackState; } }
+			public WaitHandle AsyncWaitHandle { get { return null; } }
+			public bool CompletedSynchronously { get { return false; } }
+			public bool IsCompleted { get { return Completed; } }
+		}
+		private class PmlSubChannel : ActivePmlChannel, IPmlSubChannel {
+			private enum ChannelState { Requesting, Acknowledged, Closed }
+
+			private PmlCommunicator _communicator;
+			private UInt32 _id;
+			private ChannelState _state;
+
+			internal PmlSubChannel(PmlCommunicator communicator, UInt32 sid) {
+				_communicator = communicator;
+				_id = sid;
+				_state = ChannelState.Requesting;
+			}
+
+			public override bool IsOpen { get { return _state == ChannelState.Acknowledged; } }
+
+			internal void AcknowledgeIn() {
+				if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement");
+				_state = ChannelState.Acknowledged;
+			}
+			void IPmlSubChannel.CloseIn() {
+				_state = ChannelState.Closed;
+				_communicator._subchannels.Remove(_id);
+				base.Close();
+			}
+			void IPmlSubChannel.ErrorIn(PmlElement message) {
+				(this as IPmlSubChannel).CloseIn();
+			}
+			void IPmlSubChannel.MessageIn(PmlElement message) {
+				base.PushReceivedMessage(message);
+			}
+
+			internal void AcknowledgeOut() {
+				if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement");
+				_state = ChannelState.Acknowledged;
+				_communicator.sendMessage(CommandCode.ChannelAcknowledge, _id, null);
+			}
+			internal void RejectOut() {
+				if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement");
+				_state = ChannelState.Closed;
+				_communicator.sendMessage(CommandCode.ChannelClose, _id, null);
+			}
+
+			public override void SendMessage(PmlElement message) {
+				if (_state != ChannelState.Acknowledged) throw new InvalidOperationException("The subchannel is not open");
+				_communicator.sendMessage(CommandCode.Message, _id, message);
+			}
+			public override void Close() {
+				if (_state != ChannelState.Acknowledged) return;
+				_state = ChannelState.Closed;
+				_communicator.sendMessage(CommandCode.ChannelClose, _id, null);
+				_communicator._subchannels.Remove(_id);
+				base.Close();
+			}
+		}
+		private class PmlChannelRequestReceivedEventArgsA : PmlChannelRequestReceivedEventArgs {
+			private PmlCommunicator _communicator;
+			private PmlElement _data;
+			private PmlSubChannel _channel;
+			private bool _accepted;
+			private bool _rejected;
+			internal PmlChannelRequestReceivedEventArgsA(PmlCommunicator communicator, UInt32 sid, PmlElement message) {
+				_communicator = communicator;
+				_channel = new PmlSubChannel(communicator, sid);
+				_data = message;
+			}
+			public override IPmlChannel Accept() {
+				if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected");
+				_accepted = true;
+				_channel.AcknowledgeOut();
+				return _channel;
+			}
+			public override void Reject() {
+				if (_accepted) throw new InvalidOperationException("The channel has already been accepted");
+				if (_rejected) return;
+				_rejected = true;
+				_channel.RejectOut();
+			}
+			internal void RejectIfNotAccepted() {
+				if (!_accepted) Reject();
+			}
+			public override PmlElement Data {
+				get {
+					return _data;
+				}
+			}
+		}
+
+		private class PmlInvocation : IAsyncResult, IPmlSubChannel {
+			internal PmlCommunicator Communicator = null;
+			internal AsyncCallback Callback = null;
+			internal Object CallbackState = null;
+			internal bool Error = false;
+			internal bool Completed = false;
+			internal PmlElement Message = null;
+			internal ManualResetEvent Event = null;
+			internal UInt32 ID;
+
+			internal PmlInvocation(PmlCommunicator communicator, UInt32 id) {
+				Communicator = communicator;
+				ID = id;
+			}
+
+			void IPmlSubChannel.CloseIn() {
+				(this as IPmlSubChannel).ErrorIn(null);
+			}
+			void IPmlSubChannel.ErrorIn(PmlElement message) {
+				Error = true;
+				Communicator._subchannels.Remove(ID);
+				(this as IPmlSubChannel).MessageIn(message);
+			}
+			void IPmlSubChannel.MessageIn(PmlElement message) {
+				Message = message;
+				Completed = true;
+				if (Event != null) Event.Set();
+				if (Callback != null) Callback.Invoke(this);
+			}
+
+			public object AsyncState { get { return CallbackState; } }
+			public WaitHandle AsyncWaitHandle { get { return null; } }
+			public bool CompletedSynchronously { get { return false; } }
+			public bool IsCompleted { get { return Completed; } }
+		}
+
+		public event EventHandler<PmlCallReceivedEventArgs> CallReceived;
+		public event EventHandler<PmlChannelRequestReceivedEventArgs> ChannelRequestReceived;
+
+		public PmlCommunicator(IPmlChannel channel) {
+			_channel = channel;
+			_channel.Closed += channelClosed;
+		}
+
+		public void Dispose() {
+			_channel.Close();
+			_channel = null;
+			IPmlSubChannel[] A = new IPmlSubChannel[_subchannels.Count];
+			_subchannels.Values.CopyTo(A, 0);
+			foreach (IPmlSubChannel S in A) S.CloseIn();
+			_subchannels.Clear();
+			_subchannels = null;
+			_random = null;
+		}
+
+		private void channelClosed(Object sender, EventArgs e) {
+			Dispose();
+		}
+
+		public IPmlChannel Channel { get { return _channel; } }
+
+		public void Call(PmlElement message) {
+			sendMessage(0, 0, message); //Call without reply
+		}
+		public PmlElement Invoke(PmlElement message) {
+			return Invoke(message, 60000);
+		}
+		public PmlElement Invoke(PmlElement message, int timeout) {
+			UInt32 sid = getSessionID();
+			PmlInvocation inv = new PmlInvocation(this, sid);
+			inv.Event = new ManualResetEvent(false);
+			_subchannels.Add(sid, inv);
+			sendMessage(CommandCode.CallWithReply, sid, message);
+			inv.Event.WaitOne(timeout);
+			if (inv.Error) throw new Exception(message.ToString());
+			return inv.Message;
+		}
+
+		public IAsyncResult BeginInvoke(PmlElement message, AsyncCallback callback, Object state) {
+			UInt32 sid = getSessionID();
+			PmlInvocation inv = new PmlInvocation(this, sid);
+			inv.Callback = callback;
+			inv.CallbackState = state;
+			_subchannels.Add(sid, inv);
+			sendMessage(CommandCode.CallWithReply, sid, message);
+			return inv;
+		}
+		public PmlElement EndInvoke(IAsyncResult result) {
+			PmlInvocation ar = (PmlInvocation)result;
+			if (!ar.Completed) {
+				(_subchannels as IList<IPmlSubChannel>).Remove(ar);
+				throw new InvalidOperationException("The asynchronous operation has not completed");
+			} else if (ar.Error) {
+				throw new Exception(ar.Message.ToString());
+			} else {
+				return ar.Message;
+			}
+		}
+
+		public IPmlChannel CreateChannel(PmlElement data) {
+			UInt32 sid = getSessionID();
+			PmlSubChannel ch = new PmlSubChannel(this, sid);
+			ChannelRequestWaitHandler wh = new ChannelRequestWaitHandler(ch);
+			wh.Event = new ManualResetEvent(false);
+			_subchannels.Add(sid, ch);
+			sendMessage(CommandCode.ChannelRequest, sid, data);
+			wh.Event.WaitOne();
+			if (!ch.IsOpen) return null;
+			return ch;
+		}
+		public IAsyncResult BeginCreateChannel(PmlElement data, AsyncCallback callback, Object state) {
+			UInt32 sid = getSessionID();
+			PmlSubChannel ch = new PmlSubChannel(this, sid);
+			ChannelRequestWaitHandler wh = new ChannelRequestWaitHandler(ch);
+			wh.Callback = callback;
+			wh.CallbackState = state;
+			_subchannels.Add(sid, ch);
+			sendMessage(CommandCode.ChannelRequest, sid, data);
+			if (!ch.IsOpen) return null;
+			return wh;
+		}
+		public IPmlChannel EndCreateChannel(IAsyncResult result) {
+			ChannelRequestWaitHandler ar = (ChannelRequestWaitHandler)result;
+			if (!ar.Channel.IsOpen) return null;
+			return ar.Channel;
+		}
+
+		private UInt32 getSessionID() {
+			return (uint)_random.Next();
+		}
+
+		private void sendMessage(CommandCode cmd, uint sid, PmlElement message) {
+			PmlDictionary msg = new PmlDictionary();
+			msg.Add("c", (int)cmd);
+			if (cmd > 0) msg.Add("s", sid);
+			if (message != null) msg.Add("m", message);
+			_channel.SendMessage(msg);
+		}
+
+		private void invokeCallReceived(Object state) {
+			PmlCallReceivedEventArgs e = (PmlCallReceivedEventArgs)state;
+			try {
+				if (CallReceived != null) CallReceived(this, e);
+				if (e.WantReply) sendMessage(CommandCode.Message, e.SID, e.Reply);
+			} catch (Exception ex) {
+				if (e.WantReply) sendMessage(CommandCode.Error, e.SID, new PmlString(ex.ToString()));
+			}
+		}
+		private void invokeChannelRequestReceived(Object state) {
+			PmlChannelRequestReceivedEventArgsA e = (PmlChannelRequestReceivedEventArgsA)state;
+			if (ChannelRequestReceived != null) ChannelRequestReceived(this, e);
+			e.RejectIfNotAccepted();
+		}
+
+		private void messageReceived(Object sender, EventArgs e) {
+			IPmlSubChannel subChannel = null;
+			UInt32 sid = 0;
+			bool subChannelExists = false;
+			if (!(e.Message is PmlDictionary)) return;
+			PmlDictionary msg = (PmlDictionary)e.Message;
+			PmlElement cmdElement = msg.GetChild("c");
+			PmlElement sidElement = msg.GetChild("i");
+			PmlElement msgElement = msg.GetChild("m");
+			if (cmdElement == null) return;
+			if (sidElement != null) sid = sidElement.ToUInt32();
+			if (sidElement != null) subChannelExists = _subchannels.TryGetValue(sid, out subChannel);
+			if (!subChannelExists) subChannel = null;
+			switch ((CommandCode)cmdElement.ToInt32()) {
+				case CommandCode.CallWithoutReply:
+					if (CallReceived != null) ThreadPool.RunCall(invokeCallReceived, new PmlCallReceivedEventArgs(msgElement, false, 0));
+					break;
+				case CommandCode.CallWithReply:
+					if (CallReceived != null) ThreadPool.RunCall(invokeCallReceived, new PmlCallReceivedEventArgs(msgElement, true, sid));
+					else sendMessage(CommandCode.Error, sid, null);
+					break;
+				case CommandCode.Message: //Reply to call | subchannel message
+					if (subChannelExists) subChannel.MessageIn(msgElement);
+					else sendMessage(CommandCode.Error, sid, null);
+					break;
+				case CommandCode.ChannelRequest:
+					if (subChannelExists) {
+						sendMessage(CommandCode.Error, sid, null);
+						subChannel.CloseIn();
+					} else {
+						if (ChannelRequestReceived == null) sendMessage(CommandCode.ChannelClose, sid, null);
+						else ThreadPool.RunCall(invokeChannelRequestReceived, new PmlChannelRequestReceivedEventArgsA(this, sid, msgElement));
+					}
+					break;
+				case CommandCode.ChannelAcknowledge:
+					if (subChannelExists) {
+						if (subChannel is PmlSubChannel) (subChannel as PmlSubChannel).AcknowledgeIn();
+						else {
+							sendMessage(CommandCode.Error, sid, null); //Error
+							subChannel.CloseIn();
+						}
+					} else sendMessage(CommandCode.Error, sid, null); //Error
+					break;
+				case CommandCode.ChannelClose:
+					if (subChannelExists) subChannel.CloseIn();
+					break;
+				case CommandCode.Error:
+					if (subChannelExists) subChannel.ErrorIn(msgElement);
+					break;
+			}
+		}
+	}*/
+}