Mercurial > hg > ucis.core
diff Pml/PmlCommunicator2.cs @ 0:3ab940a0c7a0
Initial commit
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Tue, 11 Sep 2012 16:28:53 +0200 |
parents | |
children | 8fe322656807 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Pml/PmlCommunicator2.cs Tue Sep 11 16:28:53 2012 +0200 @@ -0,0 +1,479 @@ +using System; +using System.Threading; +using System.Collections.Generic; +using UCIS.Pml; + +namespace UCIS.Pml { + public class PmlCommunicator { + private class CSyncRequest { + internal PmlElement Reply; + internal ManualResetEvent ResetEvent = new ManualResetEvent(false); + } + private interface ISession { + void MessageIn(PmlElement message); + void CloseIn(); + UInt32 ID { get; } + } + /*public abstract class SessionBase : ISession { + private bool pActive; + private PmlCommunicator _communicator; + private UInt32 _id; + + public uint SID { get { return _id; } } + public bool Active { get { return pActive; } } + public PmlCommunicator Communicator { get { return _communicator; } } + + protected SessionBase(PmlCommunicator Connection) { + _communicator = Connection; + } + + protected void Accept(UInt32 sid) { + if (pActive) throw new InvalidOperationException("Session is active"); + _id = sid; + lock (_communicator._sessions) _communicator._sessions.Add(_id, this); + pActive = true; + } + protected void Request() { + Request(null); + } + protected void Request(PmlElement Message) { + if (pActive) throw new InvalidOperationException("Session is active"); + _id = _communicator.GetNextSessionId(true); + lock (_communicator._sessions) _communicator._sessions.Add(_id, this); + _communicator.WriteSessionMessage(_id, 0, Message); + pActive = true; + } + + uint ISession.ID { get { return _id; } } + void ISession.MessageIn(PmlElement message) { this.MessageIn(message); } + void ISession.CloseIn() { + pActive = false; + _communicator.RemoveSession(this); + Closed(null); + } + + protected internal abstract void MessageIn(PmlElement Message); + + protected void SendMessage(PmlElement Message) { + if (!pActive) throw new InvalidOperationException("Session is not active"); + _communicator.WriteSessionMessage(_id, 1, Message); + } + + public void Close() { + if (!pActive) return; + pActive = false; + _communicator.WriteSessionMessage(_id, 2, null); + _communicator.RemoveSession(this); + } + + protected virtual void Closed(PmlElement Message) { } + } + public class Session : SessionBase { + public event MessageReceivedEventHandler MessageReceived; + public delegate void MessageReceivedEventHandler(PmlElement Message); + public event SessionClosedEventHandler SessionClosed; + public delegate void SessionClosedEventHandler(PmlElement Message); + + public Session(PmlCommunicator Connection) : base(Connection) { } + + public new void Accept(UInt32 SID) { + base.Accept(SID); + } + public new void Request() { + Request(null); + } + public new void Request(PmlElement Message) { + base.Request(Message); + } + + protected internal override void MessageIn(PmlElement Message) { + if (MessageReceived != null) MessageReceived(Message); + } + + public new void SendMessage(PmlElement Message) { + base.SendMessage(Message); + } + + protected override void Closed(PmlElement Message) { + if (SessionClosed != null) SessionClosed(Message); + } + }*/ + + private class PmlSubChannel : ActivePmlChannel, ISession { + private enum ChannelState { Requesting, Acknowledged, Closed } + + private PmlCommunicator _communicator; + private UInt32 _id; + private ChannelState _state; + + internal PmlSubChannel(PmlCommunicator communicator, UInt32 sid, bool accepted) { + _communicator = communicator; + _id = sid; + _state = accepted ? ChannelState.Acknowledged : ChannelState.Requesting; + if (accepted) _communicator.AddSession(this); + } + + public override bool IsOpen { get { return _state == ChannelState.Acknowledged; } } + + uint ISession.ID { get { return _id; } } + void ISession.CloseIn() { + _state = ChannelState.Closed; + _communicator.RemoveSession(this); + base.Close(); + } + void ISession.MessageIn(PmlElement message) { + base.PushReceivedMessage(message); + } + + public override void WriteMessage(PmlElement message) { + if (_state != ChannelState.Acknowledged) throw new InvalidOperationException("The subchannel is not open"); + _communicator.WriteSessionMessage(_id, 1, message); + } + public override void Close() { + if (_state != ChannelState.Acknowledged) return; + _state = ChannelState.Closed; + _communicator.WriteSessionMessage(_id, 2, null); + _communicator.RemoveSession(this); + base.Close(); + } + } + private class PmlChannelRequestReceivedEventArgsA : PmlChannelRequestReceivedEventArgs { + private PmlCommunicator _communicator; + private PmlElement _data; + private bool _accepted, _rejected; + private UInt32 _sid; + internal PmlChannelRequestReceivedEventArgsA(PmlCommunicator communicator, UInt32 sid, PmlElement message) { + _communicator = communicator; + _data = message; + _sid = sid; + _accepted = _rejected = false; + } + public UInt32 AcceptSession() { + if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected"); + _accepted = true; + return _sid; + } + public override IPmlChannel Accept() { + if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected"); + _accepted = true; + return new PmlSubChannel(_communicator, _sid, true); + } + public override void Reject() { + if (_accepted) throw new InvalidOperationException("The channel has already been accepted"); + if (_rejected) return; + _rejected = true; + _communicator.WriteSessionMessage(_sid, 2, null); + //_channel.RejectOut(); + } + internal void RejectIfNotAccepted() { + if (!_accepted) Reject(); + } + public override PmlElement Data { + get { + return _data; + } + } + } + + public event EventHandler<PmlCallReceivedEventArgs> CallReceived; + public event EventHandler<PmlChannelRequestReceivedEventArgs> ChannelRequestReceived; + public event EventHandler Closed; + + private Dictionary<UInt32, ISession> _sessions = new Dictionary<UInt32, ISession>(); + private Dictionary<UInt32, CSyncRequest> _invocations = new Dictionary<UInt32, CSyncRequest>(); + private UInt32 pNextSession; + private UInt32 pNextSyncRequest; + + private bool _closed; + private IPmlChannel _channel; + + public IPmlChannel Channel { get { return _channel; } } + + public PmlCommunicator(IPmlChannel channel, bool autoStart) { + _channel = channel; + if (autoStart) Start(); + } + public void Start() { + _channel.BeginReadMessage(messageReceived, null); + } + public void StartSync() { + while (true) { + try { + processMessage(_channel.ReadMessage()); + } catch (InvalidOperationException ex) { + Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message); + closed(); + _channel.Close(); + return; + } catch (Exception ex) { + Console.WriteLine(ex.ToString()); + closed(); + _channel.Close(); + return; + } + } + } + public void Close() { + _channel.Close(); + } + public void WriteRawMessage(PmlElement Message) { + _WriteMessage(Message); + } + + private void _WriteMessage(PmlElement Message) { + lock (_channel) { + if (!_channel.IsOpen) throw new InvalidOperationException("Could not write message: the channel is not open"); + _channel.WriteMessage(Message); + } + } + private void closed() { + _closed = true; + lock (_sessions) { + foreach (ISession S in _sessions.Values) { + try { + S.CloseIn(); + } catch (Exception ex) { + Console.WriteLine(ex.ToString()); + } + } + _sessions.Clear(); + } + lock (_invocations) { + foreach (CSyncRequest T in _invocations.Values) { + T.ResetEvent.Set(); + } + _invocations.Clear(); + } + if (Closed != null) Closed(this, new EventArgs()); + } + + private void messageReceived(IAsyncResult ar) { + try { + PmlElement Message = _channel.EndReadMessage(ar); + processMessage(Message); + _channel.BeginReadMessage(messageReceived, null); + } catch (InvalidOperationException ex) { + Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message); + closed(); + _channel.Close(); + return; + } catch (Exception ex) { + Console.WriteLine(ex.ToString()); + closed(); + _channel.Close(); + return; + } + } + private void processMessage(PmlElement Message) { + if (Message is PmlString) { + string Cmd = Message.ToString(); + if (Cmd.Equals("PING")) { + _WriteMessage(new PmlString("PONG")); + /*} else if (Cmd.Equals("PONG")) { + Ping = 0;*/ + } + } else if (Message is PmlDictionary) { + string Cmd = Message.GetChild("CMD").ToString(); + if (Cmd.Equals("SES")) { + processSessionMessage(Message); + } else if (Cmd.Equals("RPL")) { + UInt32 SID = Message.GetChild("SID").ToUInt32(); + CSyncRequest SRequest = null; + lock (_invocations) { + if (_invocations.TryGetValue(SID, out SRequest)) { + _invocations.Remove(SID); + } else { + Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString()); + } + } + if (SRequest != null) { + SRequest.Reply = Message.GetChild("MSG"); + SRequest.ResetEvent.Set(); + } + } else if (Cmd.Equals("REQ") || Cmd.Equals("MSG")) { + UCIS.ThreadPool.RunCall(processCall, Message); + } else { + Console.WriteLine("UCIS.PML.Connection.Worker Invalid command received"); + } + } + } + private void processSessionMessage(PmlElement Message) { + UInt32 SID = Message.GetChild("SID").ToUInt32(); + byte SCMD = Message.GetChild("SCMD").ToByte(); + PmlElement InnerMsg = Message.GetChild("MSG"); + ISession Session = null; + lock (_sessions) if (!_sessions.TryGetValue(SID, out Session)) Session = null; + switch (SCMD) { + case 0: //Request + if (Session != null) { + try { + Session.CloseIn(); + } catch (Exception ex) { + Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Request: exception in session.CloseIn: " + ex.ToString()); + } + WriteSessionMessage(SID, 2, null); + } else if (ChannelRequestReceived != null) { + try { + PmlChannelRequestReceivedEventArgsA ea = new PmlChannelRequestReceivedEventArgsA(this, SID, InnerMsg); + ChannelRequestReceived(this, ea); + ea.RejectIfNotAccepted(); + } catch (Exception ex) { + Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in ChannelRequestReceived: " + ex.ToString()); + WriteSessionMessage(SID, 2, null); + } + } else { + WriteSessionMessage(SID, 2, null); + } + break; + case 1: //Message + if (Session != null) { + try { + Session.MessageIn(InnerMsg); + } catch (Exception ex) { + Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.MessageIn: " + ex.ToString()); + WriteSessionMessage(SID, 2, null); + } + } else { + WriteSessionMessage(SID, 2, null); + } + break; + case 2: //Close + if (Session != null) { + try { + if (InnerMsg != null && !(InnerMsg is PmlNull)) Session.MessageIn(InnerMsg); + } catch (Exception ex) { + Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Close: exception in session.MessageIn: " + ex.ToString()); + } finally { + try { + Session.CloseIn(); + } catch (Exception ex) { + Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.CloseIn: " + ex.ToString()); + } + } + } + break; + } + } + private void processCall(object state) { + PmlDictionary Message = (PmlDictionary)state; + bool wantReply = Message.ContainsKey("SID"); + UInt32 SID = 0; + if (wantReply) SID = Message.GetChild("SID").ToUInt32(); + PmlElement Reply = null; + try { + if (CallReceived != null) { + PmlCallReceivedEventArgs ea = new PmlCallReceivedEventArgs(Message.GetChild("MSG"), wantReply, SID); + CallReceived(this, ea); + Reply = ea.Reply; + } + } catch (Exception ex) { + Reply = new PmlDictionary(); + ((PmlDictionary)Reply).Add("EXCEPTION", new PmlString(ex.ToString())); + Console.WriteLine(ex.ToString()); + } finally { + if (wantReply && Channel.IsOpen) { + try { + WriteSyncMessage(SID, true, Reply); + } catch (Exception ex) { + Console.WriteLine("UCIS.Pml.PmlCommunicator.processCall: exception: " + ex.ToString()); + closed(); + Channel.Close(); + } + } + } + } + + public void Call(PmlElement message) { + PmlDictionary Msg = new PmlDictionary(); + Msg.Add("CMD", new PmlString("MSG")); + Msg.Add("MSG", message); + _WriteMessage(Msg); + } + public PmlElement Invoke(PmlElement message) { + return Invoke(message, 60000); + } + public PmlElement Invoke(PmlElement message, int timeout) { + if (_closed) throw new InvalidOperationException("Sorry, we're closed."); + CSyncRequest SyncEvent = new CSyncRequest(); + UInt32 SID = GetNextSessionId(false); + lock (_invocations) _invocations.Add(SID, SyncEvent); + try { + WriteSyncMessage(SID, false, message); + if (!SyncEvent.ResetEvent.WaitOne(timeout, false)) { + if (!_closed) lock (_invocations) _invocations.Remove(SID); + throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")"); + } + } finally { + lock (_invocations) _invocations.Remove(SID); + } + return SyncEvent.Reply; + } + + public IPmlChannel CreateChannel(PmlElement data) { + UInt32 sid = GetNextSessionId(true); + PmlSubChannel ch = new PmlSubChannel(this, sid, true); + WriteSessionMessage(sid, 0, data); + if (!ch.IsOpen) return null; + return ch; + } + + private void AddSession(ISession session) { + if (_closed) return; + lock (_sessions) _sessions.Add(session.ID, session); + } + private void RemoveSession(UInt32 session) { + if (_closed) return; + lock (_sessions) _sessions.Remove(session); + } + private void RemoveSession(ISession session) { + RemoveSession(session.ID); + } + + private UInt32 GetNextSessionId(bool IsSession) { + if (IsSession) { + lock (_sessions) { + do { + unchecked { pNextSession++; } + } while (_sessions.ContainsKey(pNextSession)); + return pNextSession; + } + } else { + lock (_invocations) { + do { + unchecked { pNextSyncRequest++; } + } while (_invocations.ContainsKey(pNextSyncRequest)); + return pNextSyncRequest; + } + } + } + + protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) { + PmlDictionary Msg2 = new PmlDictionary(); + Msg2.Add("CMD", new PmlString(RPL ? "RPL" : "REQ")); + Msg2.Add("SID", new PmlInteger(SID)); + Msg2.Add("MSG", MSG); + _WriteMessage(Msg2); + } + protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) { + PmlDictionary Msg2 = new PmlDictionary(); + Msg2.Add("CMD", new PmlString("SES")); + Msg2.Add("SID", new PmlInteger(SID)); + Msg2.Add("SCMD", new PmlInteger(CMD)); + if (MSG != null) Msg2.Add("MSG", MSG); + _WriteMessage(Msg2); + } + + + + /* LegacyPmlCommunicator compatibility */ + public PmlElement SyncRequest(PmlElement Request) { + return Invoke(Request); + } + public PmlElement SyncRequest(PmlElement Request, int Timeout) { + return Invoke(Request, Timeout); + } + public void SendMessage(PmlElement Message) { + Call(Message); + } + } +}