diff Pml/PmlCommunicator2.cs @ 0:3ab940a0c7a0

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 11 Sep 2012 16:28:53 +0200
parents
children 8fe322656807
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Pml/PmlCommunicator2.cs	Tue Sep 11 16:28:53 2012 +0200
@@ -0,0 +1,479 @@
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using UCIS.Pml;
+
+namespace UCIS.Pml {
+	public class PmlCommunicator {
+		private class CSyncRequest {
+			internal PmlElement Reply;
+			internal ManualResetEvent ResetEvent = new ManualResetEvent(false);
+		}
+		private interface ISession {
+			void MessageIn(PmlElement message);
+			void CloseIn();
+			UInt32 ID { get; }
+		}
+		/*public abstract class SessionBase : ISession {
+			private bool pActive;
+			private PmlCommunicator _communicator;
+			private UInt32 _id;
+
+			public uint SID { get { return _id; } }
+			public bool Active { get { return pActive; } }
+			public PmlCommunicator Communicator { get { return _communicator; } }
+
+			protected SessionBase(PmlCommunicator Connection) {
+				_communicator = Connection;
+			}
+
+			protected void Accept(UInt32 sid) {
+				if (pActive) throw new InvalidOperationException("Session is active");
+				_id = sid;
+				lock (_communicator._sessions) _communicator._sessions.Add(_id, this);
+				pActive = true;
+			}
+			protected void Request() {
+				Request(null);
+			}
+			protected void Request(PmlElement Message) {
+				if (pActive) throw new InvalidOperationException("Session is active");
+				_id = _communicator.GetNextSessionId(true);
+				lock (_communicator._sessions) _communicator._sessions.Add(_id, this);
+				_communicator.WriteSessionMessage(_id, 0, Message);
+				pActive = true;
+			}
+
+			uint ISession.ID { get { return _id; } }
+			void ISession.MessageIn(PmlElement message) { this.MessageIn(message); }
+			void ISession.CloseIn() {
+				pActive = false;
+				_communicator.RemoveSession(this);
+				Closed(null);
+			}
+
+			protected internal abstract void MessageIn(PmlElement Message);
+
+			protected void SendMessage(PmlElement Message) {
+				if (!pActive) throw new InvalidOperationException("Session is not active");
+				_communicator.WriteSessionMessage(_id, 1, Message);
+			}
+
+			public void Close() {
+				if (!pActive) return;
+				pActive = false;
+				_communicator.WriteSessionMessage(_id, 2, null);
+				_communicator.RemoveSession(this);
+			}
+
+			protected virtual void Closed(PmlElement Message) { }
+		}
+		public class Session : SessionBase {
+			public event MessageReceivedEventHandler MessageReceived;
+			public delegate void MessageReceivedEventHandler(PmlElement Message);
+			public event SessionClosedEventHandler SessionClosed;
+			public delegate void SessionClosedEventHandler(PmlElement Message);
+
+			public Session(PmlCommunicator Connection) : base(Connection) { }
+
+			public new void Accept(UInt32 SID) {
+				base.Accept(SID);
+			}
+			public new void Request() {
+				Request(null);
+			}
+			public new void Request(PmlElement Message) {
+				base.Request(Message);
+			}
+
+			protected internal override void MessageIn(PmlElement Message) {
+				if (MessageReceived != null) MessageReceived(Message);
+			}
+
+			public new void SendMessage(PmlElement Message) {
+				base.SendMessage(Message);
+			}
+
+			protected override void Closed(PmlElement Message) {
+				if (SessionClosed != null) SessionClosed(Message);
+			}
+		}*/
+
+		private class PmlSubChannel : ActivePmlChannel, ISession {
+			private enum ChannelState { Requesting, Acknowledged, Closed }
+
+			private PmlCommunicator _communicator;
+			private UInt32 _id;
+			private ChannelState _state;
+
+			internal PmlSubChannel(PmlCommunicator communicator, UInt32 sid, bool accepted) {
+				_communicator = communicator;
+				_id = sid;
+				_state = accepted ? ChannelState.Acknowledged : ChannelState.Requesting;
+				if (accepted) _communicator.AddSession(this);
+			}
+
+			public override bool IsOpen { get { return _state == ChannelState.Acknowledged; } }
+
+			uint ISession.ID { get { return _id; } }
+			void ISession.CloseIn() {
+				_state = ChannelState.Closed;
+				_communicator.RemoveSession(this);
+				base.Close();
+			}
+			void ISession.MessageIn(PmlElement message) {
+				base.PushReceivedMessage(message);
+			}
+
+			public override void WriteMessage(PmlElement message) {
+				if (_state != ChannelState.Acknowledged) throw new InvalidOperationException("The subchannel is not open");
+				_communicator.WriteSessionMessage(_id, 1, message);
+			}
+			public override void Close() {
+				if (_state != ChannelState.Acknowledged) return;
+				_state = ChannelState.Closed;
+				_communicator.WriteSessionMessage(_id, 2, null);
+				_communicator.RemoveSession(this); 
+				base.Close();
+			}
+		}
+		private class PmlChannelRequestReceivedEventArgsA : PmlChannelRequestReceivedEventArgs {
+			private PmlCommunicator _communicator;
+			private PmlElement _data;
+			private bool _accepted, _rejected;
+			private UInt32 _sid;
+			internal PmlChannelRequestReceivedEventArgsA(PmlCommunicator communicator, UInt32 sid, PmlElement message) {
+				_communicator = communicator;
+				_data = message;
+				_sid = sid;
+				_accepted = _rejected = false;
+			}
+			public UInt32 AcceptSession() {
+				if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected");
+				_accepted = true;
+				return _sid;
+			}
+			public override IPmlChannel Accept() {
+				if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected");
+				_accepted = true;
+				return new PmlSubChannel(_communicator, _sid, true);
+			}
+			public override void Reject() {
+				if (_accepted) throw new InvalidOperationException("The channel has already been accepted");
+				if (_rejected) return;
+				_rejected = true;
+				_communicator.WriteSessionMessage(_sid, 2, null);
+				//_channel.RejectOut();
+			}
+			internal void RejectIfNotAccepted() {
+				if (!_accepted) Reject();
+			}
+			public override PmlElement Data {
+				get {
+					return _data;
+				}
+			}
+		}
+
+		public event EventHandler<PmlCallReceivedEventArgs> CallReceived;
+		public event EventHandler<PmlChannelRequestReceivedEventArgs> ChannelRequestReceived;
+		public event EventHandler Closed;
+
+		private Dictionary<UInt32, ISession> _sessions = new Dictionary<UInt32, ISession>();
+		private Dictionary<UInt32, CSyncRequest> _invocations = new Dictionary<UInt32, CSyncRequest>();
+		private UInt32 pNextSession;
+		private UInt32 pNextSyncRequest;
+
+		private bool _closed;
+		private IPmlChannel _channel;
+
+		public IPmlChannel Channel { get { return _channel; } }
+
+		public PmlCommunicator(IPmlChannel channel, bool autoStart) {
+			_channel = channel;
+			if (autoStart) Start();
+		}
+		public void Start() {
+			_channel.BeginReadMessage(messageReceived, null);
+		}
+		public void StartSync() {
+			while (true) {
+				try {
+					processMessage(_channel.ReadMessage());
+				} catch (InvalidOperationException ex) {
+					Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message);
+					closed();
+					_channel.Close();
+					return;
+				} catch (Exception ex) {
+					Console.WriteLine(ex.ToString());
+					closed();
+					_channel.Close();
+					return;
+				}
+			}
+		}
+		public void Close() {
+			_channel.Close();
+		}
+		public void WriteRawMessage(PmlElement Message) {
+			_WriteMessage(Message);
+		}
+
+		private void _WriteMessage(PmlElement Message) {
+			lock (_channel) {
+				if (!_channel.IsOpen) throw new InvalidOperationException("Could not write message: the channel is not open");
+				_channel.WriteMessage(Message);
+			}
+		}
+		private void closed() {
+			_closed = true;
+			lock (_sessions) {
+				foreach (ISession S in _sessions.Values) {
+					try {
+						S.CloseIn();
+					} catch (Exception ex) {
+						Console.WriteLine(ex.ToString());
+					}
+				}
+				_sessions.Clear();
+			}
+			lock (_invocations) {
+				foreach (CSyncRequest T in _invocations.Values) {
+					T.ResetEvent.Set();
+				}
+				_invocations.Clear();
+			}
+			if (Closed != null) Closed(this, new EventArgs());
+		}
+
+		private void messageReceived(IAsyncResult ar) {
+			try {
+				PmlElement Message = _channel.EndReadMessage(ar);
+				processMessage(Message);
+				_channel.BeginReadMessage(messageReceived, null);
+			} catch (InvalidOperationException ex) {
+				Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message);
+				closed();
+				_channel.Close();
+				return;
+			} catch (Exception ex) {
+				Console.WriteLine(ex.ToString());
+				closed();
+				_channel.Close();
+				return;
+			}
+		}
+		private void processMessage(PmlElement Message) {
+			if (Message is PmlString) {
+				string Cmd = Message.ToString();
+				if (Cmd.Equals("PING")) {
+					_WriteMessage(new PmlString("PONG"));
+				/*} else if (Cmd.Equals("PONG")) {
+					Ping = 0;*/
+				}
+			} else if (Message is PmlDictionary) {
+				string Cmd = Message.GetChild("CMD").ToString();
+				if (Cmd.Equals("SES")) {
+					processSessionMessage(Message);
+				} else if (Cmd.Equals("RPL")) {
+					UInt32 SID = Message.GetChild("SID").ToUInt32();
+					CSyncRequest SRequest = null;
+					lock (_invocations) {
+						if (_invocations.TryGetValue(SID, out SRequest)) {
+							_invocations.Remove(SID);
+						} else {
+							Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString());
+						}
+					}
+					if (SRequest != null) {
+						SRequest.Reply = Message.GetChild("MSG");
+						SRequest.ResetEvent.Set();
+					}
+				} else if (Cmd.Equals("REQ") || Cmd.Equals("MSG")) {
+					UCIS.ThreadPool.RunCall(processCall, Message);
+				} else {
+					Console.WriteLine("UCIS.PML.Connection.Worker Invalid command received");
+				}
+			}
+		}
+		private void processSessionMessage(PmlElement Message) {
+			UInt32 SID = Message.GetChild("SID").ToUInt32();
+			byte SCMD = Message.GetChild("SCMD").ToByte();
+			PmlElement InnerMsg = Message.GetChild("MSG");
+			ISession Session = null;
+			lock (_sessions) if (!_sessions.TryGetValue(SID, out Session)) Session = null;
+			switch (SCMD) {
+				case 0: //Request
+					if (Session != null) {
+						try {
+							Session.CloseIn();
+						} catch (Exception ex) {
+							Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Request: exception in session.CloseIn: " + ex.ToString());
+						}
+						WriteSessionMessage(SID, 2, null);
+					} else if (ChannelRequestReceived != null) {
+						try {
+							PmlChannelRequestReceivedEventArgsA ea = new PmlChannelRequestReceivedEventArgsA(this, SID, InnerMsg);
+							ChannelRequestReceived(this, ea);
+							ea.RejectIfNotAccepted();
+						} catch (Exception ex) {
+							Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in ChannelRequestReceived: " + ex.ToString());
+							WriteSessionMessage(SID, 2, null);
+						}
+					} else {
+						WriteSessionMessage(SID, 2, null);
+					}
+					break;
+				case 1: //Message
+					if (Session != null) {
+						try {
+							Session.MessageIn(InnerMsg);
+						} catch (Exception ex) {
+							Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.MessageIn: " + ex.ToString());
+							WriteSessionMessage(SID, 2, null);
+						}
+					} else {
+						WriteSessionMessage(SID, 2, null);
+					}
+					break;
+				case 2: //Close
+					if (Session != null) {
+						try {
+							if (InnerMsg != null && !(InnerMsg is PmlNull)) Session.MessageIn(InnerMsg);
+						} catch (Exception ex) {
+							Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Close: exception in session.MessageIn: " + ex.ToString());
+						} finally {
+							try {
+								Session.CloseIn();
+							} catch (Exception ex) {
+								Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.CloseIn: " + ex.ToString());
+							}
+						}
+					}
+					break;
+			}
+		}
+		private void processCall(object state) {
+			PmlDictionary Message = (PmlDictionary)state;
+			bool wantReply = Message.ContainsKey("SID");
+			UInt32 SID = 0;
+			if (wantReply) SID = Message.GetChild("SID").ToUInt32();
+			PmlElement Reply = null;
+			try {
+				if (CallReceived != null) {
+					PmlCallReceivedEventArgs ea = new PmlCallReceivedEventArgs(Message.GetChild("MSG"), wantReply, SID);
+					CallReceived(this, ea);
+					Reply = ea.Reply;
+				}
+			} catch (Exception ex) {
+				Reply = new PmlDictionary();
+				((PmlDictionary)Reply).Add("EXCEPTION", new PmlString(ex.ToString()));
+				Console.WriteLine(ex.ToString());
+			} finally {
+				if (wantReply && Channel.IsOpen) {
+					try {
+						WriteSyncMessage(SID, true, Reply);
+					} catch (Exception ex) {
+						Console.WriteLine("UCIS.Pml.PmlCommunicator.processCall: exception: " + ex.ToString());
+						closed();
+						Channel.Close();
+					}
+				}
+			}
+		}
+
+		public void Call(PmlElement message) {
+			PmlDictionary Msg = new PmlDictionary();
+			Msg.Add("CMD", new PmlString("MSG"));
+			Msg.Add("MSG", message);
+			_WriteMessage(Msg);
+		}
+		public PmlElement Invoke(PmlElement message) {
+			return Invoke(message, 60000);
+		}
+		public PmlElement Invoke(PmlElement message, int timeout) {
+			if (_closed) throw new InvalidOperationException("Sorry, we're closed.");
+			CSyncRequest SyncEvent = new CSyncRequest();
+			UInt32 SID = GetNextSessionId(false);
+			lock (_invocations) _invocations.Add(SID, SyncEvent);
+			try {
+				WriteSyncMessage(SID, false, message);
+				if (!SyncEvent.ResetEvent.WaitOne(timeout, false)) {
+					if (!_closed) lock (_invocations) _invocations.Remove(SID);
+					throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")");
+				}
+			} finally {
+				lock (_invocations) _invocations.Remove(SID);
+			}
+			return SyncEvent.Reply;
+		}
+
+		public IPmlChannel CreateChannel(PmlElement data) {
+			UInt32 sid = GetNextSessionId(true);
+			PmlSubChannel ch = new PmlSubChannel(this, sid, true);
+			WriteSessionMessage(sid, 0, data);
+			if (!ch.IsOpen) return null;
+			return ch;
+		}
+
+		private void AddSession(ISession session) {
+			if (_closed) return;
+			lock (_sessions) _sessions.Add(session.ID, session);
+		}
+		private void RemoveSession(UInt32 session) {
+			if (_closed) return;
+			lock (_sessions) _sessions.Remove(session);
+		}
+		private void RemoveSession(ISession session) {
+			RemoveSession(session.ID);
+		}
+
+		private UInt32 GetNextSessionId(bool IsSession) {
+			if (IsSession) {
+				lock (_sessions) {
+					do {
+						unchecked { pNextSession++; }
+					} while (_sessions.ContainsKey(pNextSession));
+					return pNextSession;
+				}
+			} else {
+				lock (_invocations) {
+					do {
+						unchecked { pNextSyncRequest++; }
+					} while (_invocations.ContainsKey(pNextSyncRequest));
+					return pNextSyncRequest;
+				}
+			}
+		}
+
+		protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) {
+			PmlDictionary Msg2 = new PmlDictionary();
+			Msg2.Add("CMD", new PmlString(RPL ? "RPL" : "REQ"));
+			Msg2.Add("SID", new PmlInteger(SID));
+			Msg2.Add("MSG", MSG);
+			_WriteMessage(Msg2);
+		}
+		protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) {
+			PmlDictionary Msg2 = new PmlDictionary();
+			Msg2.Add("CMD", new PmlString("SES"));
+			Msg2.Add("SID", new PmlInteger(SID));
+			Msg2.Add("SCMD", new PmlInteger(CMD));
+			if (MSG != null) Msg2.Add("MSG", MSG);
+			_WriteMessage(Msg2);
+		}
+
+
+
+		/* LegacyPmlCommunicator compatibility */
+		public PmlElement SyncRequest(PmlElement Request) {
+			return Invoke(Request);
+		}
+		public PmlElement SyncRequest(PmlElement Request, int Timeout) {
+			return Invoke(Request, Timeout);
+		}
+		public void SendMessage(PmlElement Message) {
+			Call(Message);
+		}
+	}
+}