Mercurial > hg > ucis.core
diff Pml/LegacyPmlCommunicator.cs @ 0:3ab940a0c7a0
Initial commit
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Tue, 11 Sep 2012 16:28:53 +0200 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Pml/LegacyPmlCommunicator.cs Tue Sep 11 16:28:53 2012 +0200 @@ -0,0 +1,403 @@ +using System; +using System.Threading; +using System.Collections.Generic; +using UCIS.Pml; + +namespace UCIS.Pml { + public class LegacyPmlCommunicator { + private class CSyncRequest { + internal PmlElement Reply; + internal ManualResetEvent ResetEvent = new ManualResetEvent(false); + } + public abstract class SessionBase { + private bool pActive; + private LegacyPmlCommunicator pConnection; + private UInt32 pSID; + + public uint SID { get { return pSID; } } + public bool Active { get { return pActive; } } + public LegacyPmlCommunicator Communicator { get { return pConnection; } } + + protected SessionBase(LegacyPmlCommunicator Connection) { + pConnection = Connection; + } + + protected void Accept(UInt32 SID) { + if (pActive) throw new InvalidOperationException("Session is active"); + pSID = SID; + lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this); + pActive = true; + } + protected void Request() { + Request(null); + } + protected void Request(PmlElement Message) { + if (pActive) throw new InvalidOperationException("Session is active"); + pSID = pConnection.GetNextSessionId(true); + lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this); + pConnection.WriteSessionMessage(pSID, 0, Message); + pActive = true; + } + + protected internal abstract void MessageIn(PmlElement Message); + + protected void SendMessage(PmlElement Message) { + if (!pActive) throw new InvalidOperationException("Session is not active"); + pConnection.WriteSessionMessage(pSID, 1, Message); + } + + public void Close() { + Close(null); + } + public void Close(PmlElement Message) { + if (!pActive) return; // throw new InvalidOperationException("Session is not active"); + pConnection.WriteSessionMessage(pSID, 2, Message); + ClosedA(); + } + + internal void ClosedA() { + pActive = false; + lock (pConnection.pSessions) pConnection.pSessions.Remove(pSID); + } + + internal void ClosedB(PmlElement Message) { + pActive = false; + Closed(Message); + } + + protected virtual void Closed(PmlElement Message) { + } + } + public class Session : SessionBase { + public event MessageReceivedEventHandler MessageReceived; + public delegate void MessageReceivedEventHandler(PmlElement Message); + public event SessionClosedEventHandler SessionClosed; + public delegate void SessionClosedEventHandler(PmlElement Message); + + public Session(LegacyPmlCommunicator Connection) : base(Connection) { } + + public new void Accept(UInt32 SID) { + base.Accept(SID); + } + public new void Request() { + Request(null); + } + public new void Request(PmlElement Message) { + base.Request(Message); + } + + protected internal override void MessageIn(PmlElement Message) { + if (MessageReceived != null) MessageReceived(Message); + } + + public new void SendMessage(PmlElement Message) { + base.SendMessage(Message); + } + + protected override void Closed(PmlElement Message) { + if (SessionClosed != null) SessionClosed(Message); + } + } + + private Dictionary<UInt32, SessionBase> pSessions = new Dictionary<UInt32, SessionBase>(); + private UInt32 pNextSession; + private Dictionary<UInt32, CSyncRequest> pSyncRequests = new Dictionary<UInt32, CSyncRequest>(); + private UInt32 pNextSyncRequest; + + private IPmlChannel _channel; + + public event MessageReceivedEventHandler MessageReceived; + public delegate void MessageReceivedEventHandler(PmlElement Message); + public event RequestReceivedEventHandler RequestReceived; + public delegate void RequestReceivedEventHandler(PmlElement Request, ref PmlElement Reply); + public event SessionRequestReceivedEventHandler SessionRequestReceived; + public delegate void SessionRequestReceivedEventHandler(PmlElement Request, uint SID); + public event EventHandler Closed; + + public ICollection<SessionBase> Sessions { get { return (ICollection<SessionBase>)pSessions.Values; } } + public int SyncRequests { get { return pSyncRequests.Count; } } + + public LegacyPmlCommunicator(IPmlChannel channel, bool autoStart) { + _channel = channel; + if (autoStart) Start(); + //_channel.BeginReadMessage(messageReceived, null); + //_channel.MessageReceived += messageReceived; + //_channel.Closed += closed; + } + public void Start() { + _channel.BeginReadMessage(messageReceived, null); + } + + public IPmlChannel Channel { get { return _channel; } } + + public void Close() { + //_channel.MessageReceived -= messageReceived; + //_channel.Closed -= closed; + _channel.Close(); + } + + private void _WriteMessage(PmlElement Message) { + lock (_channel) { + if (_channel.IsOpen) { + _channel.WriteMessage(Message); + } else { + throw new InvalidOperationException("Could not write message: the channel is not open"); + } + } + } + + private UInt32 GetNextSessionId(bool IsSession) { + if (IsSession) { + lock (pSessions) { + do { + if (pNextSession == UInt32.MaxValue) { + pNextSession = 0; + } else { + pNextSession += (uint)1; + } + } + while (pSessions.ContainsKey(pNextSession)); + return pNextSession; + } + } else { + lock (pSyncRequests) { + do { + if (pNextSyncRequest == UInt32.MaxValue) { + pNextSyncRequest = 0; + } else { + pNextSyncRequest += (uint)1; + } + } + while (pSyncRequests.ContainsKey(pNextSyncRequest)); + return pNextSyncRequest; + } + } + } + + protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) { + PmlDictionary Msg2 = new PmlDictionary(); + Msg2.Add("CMD", new PmlString("SES")); + Msg2.Add("SID", new PmlInteger(SID)); + Msg2.Add("SCMD", new PmlInteger(CMD)); + Msg2.Add("MSG", MSG); + _WriteMessage(Msg2); + } + + protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) { + PmlDictionary Msg2 = new PmlDictionary(); + if (RPL) { + Msg2.Add("CMD", new PmlString("RPL")); + } else { + Msg2.Add("CMD", new PmlString("REQ")); + } + Msg2.Add("SID", new PmlInteger(SID)); + Msg2.Add("MSG", MSG); + _WriteMessage(Msg2); + } + + private void messageReceived(IAsyncResult ar) { + PmlElement Message; + try { + Message = _channel.EndReadMessage(ar); + _channel.BeginReadMessage(messageReceived, null); + } catch (InvalidOperationException ex) { + Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message); + closed(); + _channel.Close(); + return; + } catch (Exception ex) { + Console.WriteLine(ex.ToString()); + closed(); + _channel.Close(); + return; + } + int Ping = 0; + if (Message == null) { + if (Ping > 2) { + _channel.Close(); + return; + } else { + _WriteMessage(new PmlString("PING")); + } + Ping += 1; + } else if (Message is PmlString) { + string Cmd = Message.ToString(); + if (Cmd.Equals("PING")) { + _WriteMessage(new PmlString("PONG")); + } else if (Cmd.Equals("PONG")) { + Ping = 0; + } + } else if (Message is PmlDictionary) { + string Cmd = null; + Cmd = Message.GetChild("CMD").ToString(); + if (Cmd.Equals("SES")) { + UInt32 SID = default(UInt32); + byte SCMD = 0; + SessionBase Session = default(SessionBase); + PmlElement InnerMsg = default(PmlElement); + SID = Message.GetChild("SID").ToUInt32(); + SCMD = Message.GetChild("SCMD").ToByte(); + InnerMsg = Message.GetChild("MSG"); + lock (pSessions) { + if (pSessions.ContainsKey(SID)) { + Session = pSessions[SID]; + } else { + Session = null; + } + } + if (SCMD == 0) { + if (Session == null) { + if (SessionRequestReceived != null) { + try { + SessionRequestReceived(InnerMsg, SID); + } catch (Exception ex) { + Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->SessionRequestReceived: " + ex.ToString()); + WriteSessionMessage(SID, 2, null); + } + } + } else { + try { + Session.ClosedA(); + Session.ClosedB(null); + } catch (Exception ex) { + Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.ClosedA/B: " + ex.ToString()); + } + WriteSessionMessage(SID, 2, null); + } + } else if (SCMD == 1) { + if (Session == null) { + WriteSessionMessage(SID, 2, null); + } else { + try { + Session.MessageIn(InnerMsg); + } catch (Exception ex) { + Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.MessageIn: " + ex.ToString()); + WriteSessionMessage(SID, 2, null); + } + } + } else if (SCMD == 2) { + if (Session != null) { + try { + Session.ClosedA(); + Session.ClosedB(InnerMsg); + } catch (Exception ex) { + Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.ClosedA/B: " + ex.ToString()); + } + } + } + } else if (Cmd.Equals("RPL")) { + UInt32 SID = default(UInt32); + CSyncRequest SRequest = null; + SID = Message.GetChild("SID").ToUInt32(); + lock (pSyncRequests) { + if (pSyncRequests.TryGetValue(SID, out SRequest)) { + pSyncRequests.Remove(SID); + } else { + Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString()); + } + } + if (SRequest != null) { + SRequest.Reply = Message.GetChild("MSG"); + SRequest.ResetEvent.Set(); + } + } else if (Cmd.Equals("REQ")) { + UCIS.ThreadPool.RunCall(SyncRequestHandler, Message); + //System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(SyncRequestHandler), Message); + } else if (Cmd.Equals("MSG")) { + PmlElement InnerMsg = Message.GetChild("MSG"); + if (MessageReceived != null) MessageReceived(InnerMsg); + } else { + throw new InvalidOperationException("Invalid operation"); + } + } + } + private void closed() { + //_channel.MessageReceived -= messageReceived; + //_channel.Closed -= closed; + Console.WriteLine("UCIS.PML.Connection: Connection closed"); + try { + SessionBase[] sessions; + lock (pSessions) { + sessions = new SessionBase[pSessions.Count]; + pSessions.Values.CopyTo(sessions, 0); + } + foreach (SessionBase S in sessions) { + try { + S.ClosedB(null); + } catch (Exception ex) { + Console.WriteLine(ex.ToString()); + } + } + } catch (Exception ex) { + Console.WriteLine(ex.ToString()); + } + lock (pSessions) pSessions.Clear(); + try { + CSyncRequest[] reqs; + lock (pSyncRequests) { + reqs = new CSyncRequest[pSyncRequests.Count]; + pSyncRequests.Values.CopyTo(reqs, 0); + } + foreach (CSyncRequest T in reqs) { + T.ResetEvent.Set(); + } + } catch (Exception ex) { + Console.WriteLine(ex.ToString()); + } + lock (pSyncRequests) pSyncRequests.Clear(); + if (Closed != null) Closed(this, new EventArgs()); + } + + private void SyncRequestHandler(object state) { + PmlDictionary Message = (PmlDictionary)state; + PmlElement Reply = null; + UInt32 SID = 0; + try { + SID = Message.GetChild("SID").ToUInt32(); + PmlElement InnerMsg = Message.GetChild("MSG"); + if (RequestReceived != null) { + RequestReceived(InnerMsg, ref Reply); + } + } catch (Exception ex) { + Reply = new PmlDictionary(); + ((PmlDictionary)Reply).Add("EXCEPTION", new PmlString(ex.ToString())); + Console.WriteLine(ex.ToString()); + } + try { + WriteSyncMessage(SID, true, Reply); + } catch (Exception ex) { + Console.WriteLine("Exception: " + ex.ToString()); + } + } + + public PmlElement SyncRequest(PmlElement Request) { + return SyncRequest(Request, 30000); + } + public PmlElement SyncRequest(PmlElement Request, int Timeout) { + CSyncRequest SyncEvent = new CSyncRequest(); + UInt32 SID = GetNextSessionId(false); + lock (pSyncRequests) pSyncRequests.Add(SID, SyncEvent); + try { + WriteSyncMessage(SID, false, Request); + if (!SyncEvent.ResetEvent.WaitOne(Timeout, false)) { + lock (pSyncRequests) pSyncRequests.Remove(SID); + throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")"); + } + } finally { + lock (pSyncRequests) pSyncRequests.Remove(SID); + } + return SyncEvent.Reply; + } + + public void SendMessage(PmlElement Message) { + PmlDictionary Msg = new PmlDictionary(); + Msg.Add("CMD", new PmlString("MSG")); + Msg.Add("MSG", Message); + _WriteMessage(Msg); + } + + public void SendRawMessage(PmlElement Message) { + _WriteMessage(Message); + } + } +}