view Pml/PmlCommunicator2.cs @ 17:5d9a7186c9f7

ProtocolBuffers: fix handling of 64 bit varint
author Ivo Smits <Ivo@UCIS.nl>
date Wed, 10 Apr 2013 01:33:32 +0200
parents 3ab940a0c7a0
children 8fe322656807
line wrap: on
line source

using System;
using System.Threading;
using System.Collections.Generic;
using UCIS.Pml;

namespace UCIS.Pml {
	public class PmlCommunicator {
		// Bookkeeping for one outstanding synchronous request: the reply slot plus the
		// event that wakes the thread blocked in Invoke.
		private class CSyncRequest {
			// Assigned by processMessage when a matching "RPL" arrives; remains null if the
			// wait is released by closed() instead.
			internal PmlElement Reply;
			// Created unsignalled; set when a reply arrives or when the communicator closes.
			internal ManualResetEvent ResetEvent = new ManualResetEvent(false);
		}
		// Callbacks the communicator uses to hand inbound session traffic to a registered session.
		private interface ISession {
			// Receives the "MSG" child of an inbound session message (see processSessionMessage).
			void MessageIn(PmlElement message);
			// Invoked when the session is closed from the inbound side: a remote close, a
			// conflicting request for the same id, or closed() tearing everything down.
			void CloseIn();
			// Key under which the session is stored in _sessions.
			UInt32 ID { get; }
		}
		/*public abstract class SessionBase : ISession {
			private bool pActive;
			private PmlCommunicator _communicator;
			private UInt32 _id;

			public uint SID { get { return _id; } }
			public bool Active { get { return pActive; } }
			public PmlCommunicator Communicator { get { return _communicator; } }

			protected SessionBase(PmlCommunicator Connection) {
				_communicator = Connection;
			}

			protected void Accept(UInt32 sid) {
				if (pActive) throw new InvalidOperationException("Session is active");
				_id = sid;
				lock (_communicator._sessions) _communicator._sessions.Add(_id, this);
				pActive = true;
			}
			protected void Request() {
				Request(null);
			}
			protected void Request(PmlElement Message) {
				if (pActive) throw new InvalidOperationException("Session is active");
				_id = _communicator.GetNextSessionId(true);
				lock (_communicator._sessions) _communicator._sessions.Add(_id, this);
				_communicator.WriteSessionMessage(_id, 0, Message);
				pActive = true;
			}

			uint ISession.ID { get { return _id; } }
			void ISession.MessageIn(PmlElement message) { this.MessageIn(message); }
			void ISession.CloseIn() {
				pActive = false;
				_communicator.RemoveSession(this);
				Closed(null);
			}

			protected internal abstract void MessageIn(PmlElement Message);

			protected void SendMessage(PmlElement Message) {
				if (!pActive) throw new InvalidOperationException("Session is not active");
				_communicator.WriteSessionMessage(_id, 1, Message);
			}

			public void Close() {
				if (!pActive) return;
				pActive = false;
				_communicator.WriteSessionMessage(_id, 2, null);
				_communicator.RemoveSession(this);
			}

			protected virtual void Closed(PmlElement Message) { }
		}
		public class Session : SessionBase {
			public event MessageReceivedEventHandler MessageReceived;
			public delegate void MessageReceivedEventHandler(PmlElement Message);
			public event SessionClosedEventHandler SessionClosed;
			public delegate void SessionClosedEventHandler(PmlElement Message);

			public Session(PmlCommunicator Connection) : base(Connection) { }

			public new void Accept(UInt32 SID) {
				base.Accept(SID);
			}
			public new void Request() {
				Request(null);
			}
			public new void Request(PmlElement Message) {
				base.Request(Message);
			}

			protected internal override void MessageIn(PmlElement Message) {
				if (MessageReceived != null) MessageReceived(Message);
			}

			public new void SendMessage(PmlElement Message) {
				base.SendMessage(Message);
			}

			protected override void Closed(PmlElement Message) {
				if (SessionClosed != null) SessionClosed(Message);
			}
		}*/

		// A logical channel multiplexed over the communicator's underlying channel.
		// Outbound traffic is wrapped in "SES" messages carrying this channel's session id;
		// inbound traffic is delivered by the communicator through the ISession callbacks.
		private class PmlSubChannel : ActivePmlChannel, ISession {
			private enum ChannelState { Requesting, Acknowledged, Closed }

			private PmlCommunicator _communicator;
			private UInt32 _id;
			private ChannelState _state;

			// Creates the sub-channel for session id sid. When accepted is true the channel
			// starts out open and is registered with the communicator; otherwise it starts in
			// the Requesting state and is not registered.
			internal PmlSubChannel(PmlCommunicator communicator, UInt32 sid, bool accepted) {
				_communicator = communicator;
				_id = sid;
				_state = accepted ? ChannelState.Acknowledged : ChannelState.Requesting;
				if (accepted) _communicator.AddSession(this);
			}

			// Only an acknowledged channel counts as open.
			public override bool IsOpen { get { return _state == ChannelState.Acknowledged; } }

			uint ISession.ID { get { return _id; } }

			// Inbound close: no close message is sent back, the session is just unregistered
			// and the base channel closed.
			void ISession.CloseIn() {
				_state = ChannelState.Closed;
				_communicator.RemoveSession(this);
				base.Close();
			}
			void ISession.MessageIn(PmlElement message) {
				base.PushReceivedMessage(message);
			}

			// Sends message to the peer as a session message (SCMD 1).
			// Throws InvalidOperationException when the channel is not open.
			public override void WriteMessage(PmlElement message) {
				if (_state != ChannelState.Acknowledged) throw new InvalidOperationException("The subchannel is not open");
				_communicator.WriteSessionMessage(_id, 1, message);
			}

			// Local close: notifies the peer (SCMD 2), unregisters the session and closes the
			// base channel. Calling it on a channel that is not open is a no-op.
			public override void Close() {
				if (_state != ChannelState.Acknowledged) return;
				_state = ChannelState.Closed;
				try {
					_communicator.WriteSessionMessage(_id, 2, null);
				} finally {
					// Always release local resources, even when the close notification could
					// not be written (e.g. the underlying channel is already gone). Without
					// this the session would stay registered and the base channel would never
					// be closed, because _state is already Closed and a retry returns early.
					_communicator.RemoveSession(this);
					base.Close();
				}
			}
		}
		// Event arguments handed to ChannelRequestReceived subscribers for one inbound
		// session request. A subscriber either accepts (obtaining a channel or the raw
		// session id) or rejects; the decision can be taken only once.
		private class PmlChannelRequestReceivedEventArgsA : PmlChannelRequestReceivedEventArgs {
			private PmlCommunicator _communicator;
			private PmlElement _data;
			private bool _accepted, _rejected;
			private UInt32 _sid;

			internal PmlChannelRequestReceivedEventArgsA(PmlCommunicator communicator, UInt32 sid, PmlElement message) {
				_sid = sid;
				_data = message;
				_communicator = communicator;
				_accepted = false;
				_rejected = false;
			}

			// Records the accept decision, refusing if a decision was already taken.
			private void MarkAccepted() {
				bool alreadyDecided = _accepted || _rejected;
				if (alreadyDecided) throw new InvalidOperationException("The channel has already been accepted or rejected");
				_accepted = true;
			}

			// Accepts the request and returns the session id; no sub-channel is created here.
			public UInt32 AcceptSession() {
				MarkAccepted();
				return _sid;
			}

			// Accepts the request and returns a sub-channel bound to the session id.
			public override IPmlChannel Accept() {
				MarkAccepted();
				return new PmlSubChannel(_communicator, _sid, true);
			}

			// Rejects the request by sending a close (SCMD 2) for the session id. Rejecting
			// twice is harmless; rejecting after accepting throws.
			public override void Reject() {
				if (_accepted) throw new InvalidOperationException("The channel has already been accepted");
				if (!_rejected) {
					_rejected = true;
					_communicator.WriteSessionMessage(_sid, 2, null);
				}
			}

			// Called by the communicator after the event handlers have run, so that an
			// unanswered request is rejected.
			internal void RejectIfNotAccepted() {
				if (_accepted) return;
				Reject();
			}

			// Payload that accompanied the request.
			public override PmlElement Data {
				get { return _data; }
			}
		}

		// Raised by processCall for every inbound "REQ" or "MSG" command.
		public event EventHandler<PmlCallReceivedEventArgs> CallReceived;
		// Raised by processSessionMessage when the peer requests a session id that is not registered here.
		public event EventHandler<PmlChannelRequestReceivedEventArgs> ChannelRequestReceived;
		// Raised by closed() after sessions and pending invocations have been released.
		public event EventHandler Closed;

		// Registered sessions keyed by session id; also used as the lock object guarding itself and pNextSession.
		private Dictionary<UInt32, ISession> _sessions = new Dictionary<UInt32, ISession>();
		// Pending Invoke calls keyed by request id; also used as the lock object guarding itself and pNextSyncRequest.
		private Dictionary<UInt32, CSyncRequest> _invocations = new Dictionary<UInt32, CSyncRequest>();
		// Last session id handed out by GetNextSessionId(true).
		private UInt32 pNextSession;
		// Last request id handed out by GetNextSessionId(false).
		private UInt32 pNextSyncRequest;

		// Set once by closed() and never reset in this file.
		private bool _closed;
		// Underlying transport; _WriteMessage also locks on this object to serialize writes.
		private IPmlChannel _channel;

		// The underlying transport channel passed to the constructor.
		public IPmlChannel Channel { get { return _channel; } }

		// Wraps channel in a communicator. With autoStart set, asynchronous receiving
		// begins immediately; otherwise the caller invokes Start or StartSync itself.
		public PmlCommunicator(IPmlChannel channel, bool autoStart) {
			_channel = channel;
			if (!autoStart) return;
			Start();
		}
		// Starts asynchronous receiving by issuing the first BeginReadMessage on the
		// channel; messageReceived re-issues the read after each processed message.
		public void Start() {
			_channel.BeginReadMessage(messageReceived, null);
		}
		// Blocking receive loop: reads and processes messages on the calling thread until
		// reading or processing throws, then releases all sessions and pending invocations,
		// closes the channel and returns.
		public void StartSync() {
			for (;;) {
				try {
					PmlElement incoming = _channel.ReadMessage();
					processMessage(incoming);
					continue;
				} catch (InvalidOperationException invalidOp) {
					// Only the message is logged for this exception type.
					Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + invalidOp.Message);
				} catch (Exception failure) {
					Console.WriteLine(failure.ToString());
				}
				// Reached only after a failure was logged above.
				closed();
				_channel.Close();
				return;
			}
		}
		// Closes the underlying channel. This method does not call closed() itself.
		// NOTE(review): session/invocation cleanup presumably happens when the pending read
		// subsequently fails in messageReceived/StartSync — confirm against the channel implementation.
		public void Close() {
			_channel.Close();
		}
		// Writes Message to the underlying channel as-is, without wrapping it in a CMD envelope.
		// Throws InvalidOperationException (from _WriteMessage) when the channel is not open.
		public void WriteRawMessage(PmlElement Message) {
			_WriteMessage(Message);
		}

		// Single choke point for outbound traffic. Writes are serialized by locking on the
		// channel object; an attempt to write to a channel that is not open is refused with
		// an InvalidOperationException.
		private void _WriteMessage(PmlElement Message) {
			lock (_channel) {
				if (_channel.IsOpen) {
					_channel.WriteMessage(Message);
					return;
				}
				throw new InvalidOperationException("Could not write message: the channel is not open");
			}
		}
		// Tears down communicator state after the underlying channel failed or closed:
		// every registered session is told it was closed, every thread blocked in Invoke is
		// released (its reply stays null), and the Closed event is raised.
		//
		// This method can be reached more than once (for example from processCall and then
		// again from the read loop when the channel read fails). The cleanup is repeated on
		// every call, but the Closed event is raised only on the first one so subscribers
		// are not notified twice.
		private void closed() {
			bool firstClose;
			lock (_sessions) {
				firstClose = !_closed;
				// Must be set before CloseIn runs: sessions call RemoveSession from CloseIn,
				// which is a no-op once _closed is set, so the dictionary is not modified
				// while it is being enumerated.
				_closed = true;
				foreach (ISession S in _sessions.Values) {
					try {
						S.CloseIn();
					} catch (Exception ex) {
						// One misbehaving session must not prevent the others from closing.
						Console.WriteLine(ex.ToString());
					}
				}
				_sessions.Clear();
			}
			lock (_invocations) {
				foreach (CSyncRequest T in _invocations.Values) {
					T.ResetEvent.Set();
				}
				_invocations.Clear();
			}
			if (!firstClose) return;
			// Snapshot the delegate so an unsubscribe on another thread between the null
			// check and the call cannot cause a NullReferenceException.
			EventHandler handler = Closed;
			if (handler != null) handler(this, EventArgs.Empty);
		}

		// Completion callback for BeginReadMessage. Processes the message that was read and
		// queues the next read; on any failure it logs, releases sessions and pending
		// invocations, and closes the channel without re-arming the read.
		private void messageReceived(IAsyncResult ar) {
			try {
				PmlElement incoming = _channel.EndReadMessage(ar);
				processMessage(incoming);
				_channel.BeginReadMessage(messageReceived, null);
				return;
			} catch (InvalidOperationException invalidOp) {
				// Only the message is logged for this exception type.
				Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + invalidOp.Message);
			} catch (Exception failure) {
				Console.WriteLine(failure.ToString());
			}
			// Reached only after a failure was logged above.
			closed();
			_channel.Close();
		}
		// Dispatches one inbound message.
		//   - A bare string "PING" is answered with "PONG"; any other string is ignored.
		//   - A dictionary is routed on its "CMD" child: "SES" to session handling, "RPL" to
		//     the waiting Invoke call, "REQ"/"MSG" to processCall via UCIS.ThreadPool.
		//   - Anything else is silently dropped.
		private void processMessage(PmlElement Message) {
			if (Message is PmlString) {
				if (Message.ToString().Equals("PING")) {
					_WriteMessage(new PmlString("PONG"));
				}
				return;
			}
			if (!(Message is PmlDictionary)) return;

			string command = Message.GetChild("CMD").ToString();

			if (command.Equals("SES")) {
				processSessionMessage(Message);
				return;
			}

			if (command.Equals("RPL")) {
				UInt32 requestId = Message.GetChild("SID").ToUInt32();
				CSyncRequest pending = null;
				lock (_invocations) {
					bool known = _invocations.TryGetValue(requestId, out pending);
					if (known) {
						// Claim the entry so the reply is delivered at most once.
						_invocations.Remove(requestId);
					} else {
						Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + requestId.ToString());
					}
				}
				if (pending != null) {
					// Publish the reply before waking the waiter.
					pending.Reply = Message.GetChild("MSG");
					pending.ResetEvent.Set();
				}
				return;
			}

			bool isCall = command.Equals("REQ") || command.Equals("MSG");
			if (isCall) {
				UCIS.ThreadPool.RunCall(processCall, Message);
			} else {
				Console.WriteLine("UCIS.PML.Connection.Worker Invalid command received");
			}
		}
		// Handles one inbound "SES" message. The "SCMD" child selects the action:
		//   0 = request a new session, 1 = message for a session, 2 = close a session.
		// Other SCMD values are ignored. Whenever the message cannot be honoured, a close
		// (SCMD 2) for the same session id is sent back to the peer.
		private void processSessionMessage(PmlElement Message) {
			UInt32 sessionId = Message.GetChild("SID").ToUInt32();
			byte action = Message.GetChild("SCMD").ToByte();
			PmlElement payload = Message.GetChild("MSG");

			ISession target = null;
			lock (_sessions) {
				if (!_sessions.TryGetValue(sessionId, out target)) target = null;
			}

			if (action == 0) {
				// Request for a new session.
				if (target != null) {
					// The id is already in use locally: drop the existing session and refuse.
					try {
						target.CloseIn();
					} catch (Exception ex) {
						Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Request: exception in session.CloseIn: " + ex.ToString());
					}
					WriteSessionMessage(sessionId, 2, null);
				} else if (ChannelRequestReceived != null) {
					try {
						PmlChannelRequestReceivedEventArgsA request = new PmlChannelRequestReceivedEventArgsA(this, sessionId, payload);
						ChannelRequestReceived(this, request);
						// No subscriber accepted: treat as a rejection.
						request.RejectIfNotAccepted();
					} catch (Exception ex) {
						Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in ChannelRequestReceived: " + ex.ToString());
						WriteSessionMessage(sessionId, 2, null);
					}
				} else {
					// Nobody is listening for channel requests.
					WriteSessionMessage(sessionId, 2, null);
				}
				return;
			}

			if (action == 1) {
				// Message for an existing session.
				if (target == null) {
					WriteSessionMessage(sessionId, 2, null);
					return;
				}
				try {
					target.MessageIn(payload);
				} catch (Exception ex) {
					Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.MessageIn: " + ex.ToString());
					WriteSessionMessage(sessionId, 2, null);
				}
				return;
			}

			if (action == 2) {
				// Close; an unknown session id is ignored.
				if (target == null) return;
				try {
					// A close may carry a final payload, which is delivered first.
					bool hasPayload = payload != null && !(payload is PmlNull);
					if (hasPayload) target.MessageIn(payload);
				} catch (Exception ex) {
					Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Close: exception in session.MessageIn: " + ex.ToString());
				} finally {
					try {
						target.CloseIn();
					} catch (Exception ex) {
						Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.CloseIn: " + ex.ToString());
					}
				}
			}
		}
		// Work item for an inbound "REQ"/"MSG" command; state is the PmlDictionary that was
		// received. Raises CallReceived and, when the message carried a "SID" and the
		// channel is still open, sends the handler's reply back as an "RPL". An exception
		// thrown by a handler is reported to the peer in an "EXCEPTION" entry.
		private void processCall(object state) {
			PmlDictionary call = (PmlDictionary)state;
			// The presence of a "SID" is what marks the call as expecting a reply.
			bool wantReply = call.ContainsKey("SID");
			UInt32 callId = 0;
			if (wantReply) {
				callId = call.GetChild("SID").ToUInt32();
			}

			PmlElement reply = null;
			try {
				if (CallReceived != null) {
					PmlCallReceivedEventArgs args = new PmlCallReceivedEventArgs(call.GetChild("MSG"), wantReply, callId);
					CallReceived(this, args);
					reply = args.Reply;
				}
			} catch (Exception ex) {
				PmlDictionary error = new PmlDictionary();
				reply = error;
				error.Add("EXCEPTION", new PmlString(ex.ToString()));
				Console.WriteLine(ex.ToString());
			} finally {
				if (wantReply) {
					if (Channel.IsOpen) {
						try {
							WriteSyncMessage(callId, true, reply);
						} catch (Exception ex) {
							// Failing to deliver a reply is treated as a dead connection.
							Console.WriteLine("UCIS.Pml.PmlCommunicator.processCall: exception: " + ex.ToString());
							closed();
							Channel.Close();
						}
					}
				}
			}
		}

		// Sends message to the peer as a one-way call: a "MSG" command without a "SID",
		// so no reply is requested.
		public void Call(PmlElement message) {
			PmlDictionary envelope = new PmlDictionary();
			PmlString command = new PmlString("MSG");
			envelope.Add("CMD", command);
			envelope.Add("MSG", message);
			_WriteMessage(envelope);
		}
		// Sends message as a request and blocks for the reply, using a timeout of 60000
		// (passed on to WaitHandle.WaitOne, i.e. milliseconds).
		public PmlElement Invoke(PmlElement message) {
			return Invoke(message, 60000);
		}
		// Sends message as a "REQ" and blocks the calling thread until the matching "RPL"
		// arrives, the communicator closes, or timeout (milliseconds, as passed to
		// WaitHandle.WaitOne) elapses.
		//
		// Returns the "MSG" child of the reply. If the wait is released because the
		// communicator closed, no reply was stored and null is returned.
		// Throws InvalidOperationException if the communicator is already closed, and
		// TimeoutException if no reply arrived in time.
		public PmlElement Invoke(PmlElement message, int timeout) {
			if (_closed) throw new InvalidOperationException("Sorry, we're closed.");
			CSyncRequest SyncEvent = new CSyncRequest();
			UInt32 SID = GetNextSessionId(false);
			lock (_invocations) {
				// Re-check under the lock that closed() takes to drain this table. closed()
				// sets _closed before draining, so a request registered after the drain would
				// otherwise never be released and would block for the full timeout.
				if (_closed) throw new InvalidOperationException("Sorry, we're closed.");
				_invocations.Add(SID, SyncEvent);
			}
			try {
				WriteSyncMessage(SID, false, message);
				if (!SyncEvent.ResetEvent.WaitOne(timeout, false)) {
					throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")");
				}
			} finally {
				// Single cleanup point for success, timeout and write failure alike;
				// removing an id that is no longer present is harmless.
				lock (_invocations) _invocations.Remove(SID);
			}
			return SyncEvent.Reply;
		}

		// Opens a new sub-channel to the peer: allocates a session id, registers the
		// channel locally and sends a session request (SCMD 0) carrying data.
		//
		// Returns the new channel, or null if it is no longer open once the request has
		// been written. Exceptions from writing the request propagate to the caller.
		public IPmlChannel CreateChannel(PmlElement data) {
			UInt32 sid = GetNextSessionId(true);
			// The channel is registered before the request goes out so that traffic for
			// this id can be routed as soon as the peer answers.
			PmlSubChannel ch = new PmlSubChannel(this, sid, true);
			try {
				WriteSessionMessage(sid, 0, data);
			} catch {
				// The request never reached the peer; do not leave the unusable channel
				// registered under this id.
				RemoveSession(sid);
				throw;
			}
			if (!ch.IsOpen) return null;
			return ch;
		}

		// Registers session under its ID. Ignored once the communicator has closed.
		private void AddSession(ISession session) {
			if (!_closed) {
				lock (_sessions) {
					_sessions.Add(session.ID, session);
				}
			}
		}
		// Unregisters the session stored under the given id. Ignored once the communicator
		// has closed; closed() clears the whole table itself and relies on this being a
		// no-op while it enumerates the sessions.
		private void RemoveSession(UInt32 session) {
			if (!_closed) {
				lock (_sessions) {
					_sessions.Remove(session);
				}
			}
		}
		// Convenience overload: unregisters the session by its ID.
		private void RemoveSession(ISession session) {
			RemoveSession(session.ID);
		}

		// Hands out the next unused id. Session ids (IsSession == true) and request ids
		// (IsSession == false) come from separate counters, each guarded by the lock of
		// the table it is checked against. The counter wraps around on overflow and skips
		// ids that are currently present in the table.
		private UInt32 GetNextSessionId(bool IsSession) {
			if (!IsSession) {
				lock (_invocations) {
					unchecked { pNextSyncRequest++; }
					while (_invocations.ContainsKey(pNextSyncRequest)) {
						unchecked { pNextSyncRequest++; }
					}
					return pNextSyncRequest;
				}
			}
			lock (_sessions) {
				unchecked { pNextSession++; }
				while (_sessions.ContainsKey(pNextSession)) {
					unchecked { pNextSession++; }
				}
				return pNextSession;
			}
		}

		// Builds and sends a request/reply envelope: "CMD" is "RPL" when RPL is true and
		// "REQ" otherwise, "SID" is the request id and "MSG" the payload (added even when null).
		protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) {
			string verb;
			if (RPL) {
				verb = "RPL";
			} else {
				verb = "REQ";
			}
			PmlDictionary envelope = new PmlDictionary();
			envelope.Add("CMD", new PmlString(verb));
			envelope.Add("SID", new PmlInteger(SID));
			envelope.Add("MSG", MSG);
			_WriteMessage(envelope);
		}
		// Builds and sends a session envelope: "CMD" = "SES", "SID" = session id,
		// "SCMD" = session command (0 request, 1 message, 2 close as used in this file).
		// The "MSG" entry is only included when a payload is supplied.
		protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) {
			PmlDictionary envelope = new PmlDictionary();
			envelope.Add("CMD", new PmlString("SES"));
			envelope.Add("SID", new PmlInteger(SID));
			envelope.Add("SCMD", new PmlInteger(CMD));
			bool hasPayload = MSG != null;
			if (hasPayload) {
				envelope.Add("MSG", MSG);
			}
			_WriteMessage(envelope);
		}



		/* LegacyPmlCommunicator compatibility */
		// Legacy name for Invoke(PmlElement); forwards unchanged.
		public PmlElement SyncRequest(PmlElement Request) {
			return Invoke(Request);
		}
		// Legacy name for Invoke(PmlElement, int); forwards unchanged.
		public PmlElement SyncRequest(PmlElement Request, int Timeout) {
			return Invoke(Request, Timeout);
		}
		// Legacy name for Call(PmlElement); forwards unchanged.
		public void SendMessage(PmlElement Message) {
			Call(Message);
		}
	}
}