Mercurial > hg > ucis.core
view Pml/LegacyPmlCommunicator.cs @ 17:5d9a7186c9f7
ProtocolBuffers: fix handling of 64 bit varint
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Wed, 10 Apr 2013 01:33:32 +0200 |
parents | 3ab940a0c7a0 |
children |
line wrap: on
line source
???using System; using System.Threading; using System.Collections.Generic; using UCIS.Pml; namespace UCIS.Pml { public class LegacyPmlCommunicator { private class CSyncRequest { internal PmlElement Reply; internal ManualResetEvent ResetEvent = new ManualResetEvent(false); } public abstract class SessionBase { private bool pActive; private LegacyPmlCommunicator pConnection; private UInt32 pSID; public uint SID { get { return pSID; } } public bool Active { get { return pActive; } } public LegacyPmlCommunicator Communicator { get { return pConnection; } } protected SessionBase(LegacyPmlCommunicator Connection) { pConnection = Connection; } protected void Accept(UInt32 SID) { if (pActive) throw new InvalidOperationException("Session is active"); pSID = SID; lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this); pActive = true; } protected void Request() { Request(null); } protected void Request(PmlElement Message) { if (pActive) throw new InvalidOperationException("Session is active"); pSID = pConnection.GetNextSessionId(true); lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this); pConnection.WriteSessionMessage(pSID, 0, Message); pActive = true; } protected internal abstract void MessageIn(PmlElement Message); protected void SendMessage(PmlElement Message) { if (!pActive) throw new InvalidOperationException("Session is not active"); pConnection.WriteSessionMessage(pSID, 1, Message); } public void Close() { Close(null); } public void Close(PmlElement Message) { if (!pActive) return; // throw new InvalidOperationException("Session is not active"); pConnection.WriteSessionMessage(pSID, 2, Message); ClosedA(); } internal void ClosedA() { pActive = false; lock (pConnection.pSessions) pConnection.pSessions.Remove(pSID); } internal void ClosedB(PmlElement Message) { pActive = false; Closed(Message); } protected virtual void Closed(PmlElement Message) { } } public class Session : SessionBase { public event MessageReceivedEventHandler MessageReceived; public delegate void MessageReceivedEventHandler(PmlElement Message); public event SessionClosedEventHandler SessionClosed; public delegate void SessionClosedEventHandler(PmlElement Message); public Session(LegacyPmlCommunicator Connection) : base(Connection) { } public new void Accept(UInt32 SID) { base.Accept(SID); } public new void Request() { Request(null); } public new void Request(PmlElement Message) { base.Request(Message); } protected internal override void MessageIn(PmlElement Message) { if (MessageReceived != null) MessageReceived(Message); } public new void SendMessage(PmlElement Message) { base.SendMessage(Message); } protected override void Closed(PmlElement Message) { if (SessionClosed != null) SessionClosed(Message); } } private Dictionary<UInt32, SessionBase> pSessions = new Dictionary<UInt32, SessionBase>(); private UInt32 pNextSession; private Dictionary<UInt32, CSyncRequest> pSyncRequests = new Dictionary<UInt32, CSyncRequest>(); private UInt32 pNextSyncRequest; private IPmlChannel _channel; public event MessageReceivedEventHandler MessageReceived; public delegate void MessageReceivedEventHandler(PmlElement Message); public event RequestReceivedEventHandler RequestReceived; public delegate void RequestReceivedEventHandler(PmlElement Request, ref PmlElement Reply); public event SessionRequestReceivedEventHandler SessionRequestReceived; public delegate void SessionRequestReceivedEventHandler(PmlElement Request, uint SID); public event EventHandler Closed; public ICollection<SessionBase> Sessions { get { return (ICollection<SessionBase>)pSessions.Values; } } public int SyncRequests { get { return pSyncRequests.Count; } } public LegacyPmlCommunicator(IPmlChannel channel, bool autoStart) { _channel = channel; if (autoStart) Start(); //_channel.BeginReadMessage(messageReceived, null); //_channel.MessageReceived += messageReceived; //_channel.Closed += closed; } public void Start() { _channel.BeginReadMessage(messageReceived, null); } public IPmlChannel Channel { get { return _channel; } } public void Close() { //_channel.MessageReceived -= messageReceived; //_channel.Closed -= closed; _channel.Close(); } private void _WriteMessage(PmlElement Message) { lock (_channel) { if (_channel.IsOpen) { _channel.WriteMessage(Message); } else { throw new InvalidOperationException("Could not write message: the channel is not open"); } } } private UInt32 GetNextSessionId(bool IsSession) { if (IsSession) { lock (pSessions) { do { if (pNextSession == UInt32.MaxValue) { pNextSession = 0; } else { pNextSession += (uint)1; } } while (pSessions.ContainsKey(pNextSession)); return pNextSession; } } else { lock (pSyncRequests) { do { if (pNextSyncRequest == UInt32.MaxValue) { pNextSyncRequest = 0; } else { pNextSyncRequest += (uint)1; } } while (pSyncRequests.ContainsKey(pNextSyncRequest)); return pNextSyncRequest; } } } protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) { PmlDictionary Msg2 = new PmlDictionary(); Msg2.Add("CMD", new PmlString("SES")); Msg2.Add("SID", new PmlInteger(SID)); Msg2.Add("SCMD", new PmlInteger(CMD)); Msg2.Add("MSG", MSG); _WriteMessage(Msg2); } protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) { PmlDictionary Msg2 = new PmlDictionary(); if (RPL) { Msg2.Add("CMD", new PmlString("RPL")); } else { Msg2.Add("CMD", new PmlString("REQ")); } Msg2.Add("SID", new PmlInteger(SID)); Msg2.Add("MSG", MSG); _WriteMessage(Msg2); } private void messageReceived(IAsyncResult ar) { PmlElement Message; try { Message = _channel.EndReadMessage(ar); _channel.BeginReadMessage(messageReceived, null); } catch (InvalidOperationException ex) { Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message); closed(); _channel.Close(); return; } catch (Exception ex) { Console.WriteLine(ex.ToString()); closed(); _channel.Close(); return; } int Ping = 0; if (Message == null) { if (Ping > 2) { _channel.Close(); return; } else { _WriteMessage(new PmlString("PING")); } Ping += 1; } else if (Message is PmlString) { string Cmd = Message.ToString(); if (Cmd.Equals("PING")) { _WriteMessage(new PmlString("PONG")); } else if (Cmd.Equals("PONG")) { Ping = 0; } } else if (Message is PmlDictionary) { string Cmd = null; Cmd = Message.GetChild("CMD").ToString(); if (Cmd.Equals("SES")) { UInt32 SID = default(UInt32); byte SCMD = 0; SessionBase Session = default(SessionBase); PmlElement InnerMsg = default(PmlElement); SID = Message.GetChild("SID").ToUInt32(); SCMD = Message.GetChild("SCMD").ToByte(); InnerMsg = Message.GetChild("MSG"); lock (pSessions) { if (pSessions.ContainsKey(SID)) { Session = pSessions[SID]; } else { Session = null; } } if (SCMD == 0) { if (Session == null) { if (SessionRequestReceived != null) { try { SessionRequestReceived(InnerMsg, SID); } catch (Exception ex) { Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->SessionRequestReceived: " + ex.ToString()); WriteSessionMessage(SID, 2, null); } } } else { try { Session.ClosedA(); Session.ClosedB(null); } catch (Exception ex) { Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.ClosedA/B: " + ex.ToString()); } WriteSessionMessage(SID, 2, null); } } else if (SCMD == 1) { if (Session == null) { WriteSessionMessage(SID, 2, null); } else { try { Session.MessageIn(InnerMsg); } catch (Exception ex) { Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.MessageIn: " + ex.ToString()); WriteSessionMessage(SID, 2, null); } } } else if (SCMD == 2) { if (Session != null) { try { Session.ClosedA(); Session.ClosedB(InnerMsg); } catch (Exception ex) { Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.ClosedA/B: " + ex.ToString()); } } } } else if (Cmd.Equals("RPL")) { UInt32 SID = default(UInt32); CSyncRequest SRequest = null; SID = Message.GetChild("SID").ToUInt32(); lock (pSyncRequests) { if (pSyncRequests.TryGetValue(SID, out SRequest)) { pSyncRequests.Remove(SID); } else { Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString()); } } if (SRequest != null) { SRequest.Reply = Message.GetChild("MSG"); SRequest.ResetEvent.Set(); } } else if (Cmd.Equals("REQ")) { UCIS.ThreadPool.RunCall(SyncRequestHandler, Message); //System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(SyncRequestHandler), Message); } else if (Cmd.Equals("MSG")) { PmlElement InnerMsg = Message.GetChild("MSG"); if (MessageReceived != null) MessageReceived(InnerMsg); } else { throw new InvalidOperationException("Invalid operation"); } } } private void closed() { //_channel.MessageReceived -= messageReceived; //_channel.Closed -= closed; Console.WriteLine("UCIS.PML.Connection: Connection closed"); try { SessionBase[] sessions; lock (pSessions) { sessions = new SessionBase[pSessions.Count]; pSessions.Values.CopyTo(sessions, 0); } foreach (SessionBase S in sessions) { try { S.ClosedB(null); } catch (Exception ex) { Console.WriteLine(ex.ToString()); } } } catch (Exception ex) { Console.WriteLine(ex.ToString()); } lock (pSessions) pSessions.Clear(); try { CSyncRequest[] reqs; lock (pSyncRequests) { reqs = new CSyncRequest[pSyncRequests.Count]; pSyncRequests.Values.CopyTo(reqs, 0); } foreach (CSyncRequest T in reqs) { T.ResetEvent.Set(); } } catch (Exception ex) { Console.WriteLine(ex.ToString()); } lock (pSyncRequests) pSyncRequests.Clear(); if (Closed != null) Closed(this, new EventArgs()); } private void SyncRequestHandler(object state) { PmlDictionary Message = (PmlDictionary)state; PmlElement Reply = null; UInt32 SID = 0; try { SID = Message.GetChild("SID").ToUInt32(); PmlElement InnerMsg = Message.GetChild("MSG"); if (RequestReceived != null) { RequestReceived(InnerMsg, ref Reply); } } catch (Exception ex) { Reply = new PmlDictionary(); ((PmlDictionary)Reply).Add("EXCEPTION", new PmlString(ex.ToString())); Console.WriteLine(ex.ToString()); } try { WriteSyncMessage(SID, true, Reply); } catch (Exception ex) { Console.WriteLine("Exception: " + ex.ToString()); } } public PmlElement SyncRequest(PmlElement Request) { return SyncRequest(Request, 30000); } public PmlElement SyncRequest(PmlElement Request, int Timeout) { CSyncRequest SyncEvent = new CSyncRequest(); UInt32 SID = GetNextSessionId(false); lock (pSyncRequests) pSyncRequests.Add(SID, SyncEvent); try { WriteSyncMessage(SID, false, Request); if (!SyncEvent.ResetEvent.WaitOne(Timeout, false)) { lock (pSyncRequests) pSyncRequests.Remove(SID); throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")"); } } finally { lock (pSyncRequests) pSyncRequests.Remove(SID); } return SyncEvent.Reply; } public void SendMessage(PmlElement Message) { PmlDictionary Msg = new PmlDictionary(); Msg.Add("CMD", new PmlString("MSG")); Msg.Add("MSG", Message); _WriteMessage(Msg); } public void SendRawMessage(PmlElement Message) { _WriteMessage(Message); } } }