Mercurial > hg > ucis.core
diff Pml/PmlConnection.cs @ 0:3ab940a0c7a0
Initial commit
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Tue, 11 Sep 2012 16:28:53 +0200 |
parents | |
children | 57b4c2f895d1 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Pml/PmlConnection.cs Tue Sep 11 16:28:53 2012 +0200 @@ -0,0 +1,384 @@ +using System; +using System.Collections.Generic; +using System.Text; +using UCIS.Net; +using System.Net.Sockets; +using System.Threading; +using System.IO; + +namespace UCIS.Pml { + /*public class PmlConnection : LegacyPmlCommunicator { + public PmlConnection(Socket Socket) : this(new TCPPmlChannel(Socket)) { } + public PmlConnection(TCPStream Stream) : this(new TCPPmlChannel(Stream)) { } + public PmlConnection(Stream Stream) : this(new PmlBinaryRW(Stream)) {} + public PmlConnection(IPmlRW RW) : this(new PmlChannel(RW)) { } + public PmlConnection(IPmlChannel CH) : base(CH) { } + }*/ + public class PmlConnection { + private class CSyncRequest { + internal PmlElement Reply; + internal ManualResetEvent ResetEvent = new ManualResetEvent(false); + } + public abstract class SessionBase { + private bool pActive; + private PmlConnection pConnection; + private UInt32 pSID; + + protected SessionBase(PmlConnection Connection) { + pConnection = Connection; + } + + protected void Accept(UInt32 SID) { + if (pActive) throw new InvalidOperationException("Session is active"); + pSID = SID; + lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this); + pActive = true; + } + protected void Request() { + Request(null); + } + protected void Request(PmlElement Message) { + if (pActive) throw new InvalidOperationException("Session is active"); + pSID = pConnection.GetNextSessionId(true); + lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this); + pConnection.WriteSessionMessage(pSID, 0, Message); + pActive = true; + } + + protected internal abstract void MessageIn(PmlElement Message); + + protected void SendMessage(PmlElement Message) { + if (!pActive) throw new InvalidOperationException("Session is not active"); + pConnection.WriteSessionMessage(pSID, 1, Message); + } + + public void Close() { + Close(null); + } + public void Close(PmlElement Message) { + if (!pActive) throw new InvalidOperationException("Session is not active"); + pConnection.WriteSessionMessage(pSID, 2, Message); + ClosedA(); + } + + internal void ClosedA() { + pActive = false; + lock (pConnection.pSessions) pConnection.pSessions.Remove(pSID); + } + + internal void ClosedB(PmlElement Message) { + pActive = false; + Closed(Message); + } + + protected virtual void Closed(PmlElement Message) { + } + } + public class Session : SessionBase { + public event MessageReceivedEventHandler MessageReceived; + public delegate void MessageReceivedEventHandler(PmlElement Message); + public event SessionClosedEventHandler SessionClosed; + public delegate void SessionClosedEventHandler(PmlElement Message); + + public Session(PmlConnection Connection) : base(Connection) { } + + public new void Accept(UInt32 SID) { + base.Accept(SID); + } + public new void Request() { + Request(null); + } + public new void Request(PmlElement Message) { + base.Request(Message); + } + + protected internal override void MessageIn(PmlElement Message) { + if (MessageReceived != null) MessageReceived(Message); + } + + public new void SendMessage(PmlElement Message) { + base.SendMessage(Message); + } + + protected override void Closed(PmlElement Message) { + if (SessionClosed != null) SessionClosed(Message); + } + } + + private Dictionary<UInt32, SessionBase> pSessions = new Dictionary<UInt32, SessionBase>(); + private UInt32 pNextSession; + private Dictionary<UInt32, CSyncRequest> pSyncRequests = new Dictionary<UInt32, CSyncRequest>(); + private UInt32 pNextSyncRequest; + + private Stream pStream; + + public event MessageReceivedEventHandler MessageReceived; + public delegate void MessageReceivedEventHandler(PmlElement Message); + public event RequestReceivedEventHandler RequestReceived; + public delegate void RequestReceivedEventHandler(PmlElement Request, ref PmlElement Reply); + public event SessionRequestReceivedEventHandler SessionRequestReceived; + public delegate void SessionRequestReceivedEventHandler(PmlElement Request, uint SID); + + private IPmlWriter _writer; + private IPmlReader _reader; + + public PmlConnection(Socket Socket) : this(new TCPStream(Socket)) { } + public PmlConnection(Stream Stream) : this(new PmlBinaryRW(Stream)) { + pStream = Stream; + } + public PmlConnection(IPmlRW RMRW) : this(RMRW, RMRW) { } + public PmlConnection(IPmlReader Reader, IPmlWriter Writer) { + _reader = Reader; + _writer = Writer; + } + + public void Close() { + if (pStream != null) pStream.Close(); + } + + public IPmlReader Reader { + get { return _reader; } + } + public IPmlWriter Writer { + get { return _writer; } + } + private PmlElement _ReadMessage() { + PmlElement Message = _reader.ReadMessage(); + return Message; //Warning: Can't lock reader because it can be the same as the Writer (possible deadlock) + } + private void _WriteMessage(PmlElement Message) { + lock (_writer) _writer.WriteMessage(Message); + } + + private UInt32 GetNextSessionId(bool IsSession) { + if (IsSession) { + lock (pSessions) { + do { + if (pNextSession == UInt32.MaxValue) { + pNextSession = 0; + } else { + pNextSession += (uint)1; + } + } + while (pSessions.ContainsKey(pNextSession)); + return pNextSession; + } + } else { + lock (pSyncRequests) { + do { + if (pNextSyncRequest == UInt32.MaxValue) { + pNextSyncRequest = 0; + } else { + pNextSyncRequest += (uint)1; + } + } + while (pSyncRequests.ContainsKey(pNextSyncRequest)); + return pNextSyncRequest; + } + } + } + + protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) { + PmlDictionary Msg2 = new PmlDictionary(); + Msg2.Add("CMD", new PmlString("SES")); + Msg2.Add("SID", new PmlInteger(SID)); + Msg2.Add("SCMD", new PmlInteger(CMD)); + Msg2.Add("MSG", MSG); + _WriteMessage(Msg2); + } + + protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) { + PmlDictionary Msg2 = new PmlDictionary(); + if (RPL) { + Msg2.Add("CMD", new PmlString("RPL")); + } else { + Msg2.Add("CMD", new PmlString("REQ")); + } + Msg2.Add("SID", new PmlInteger(SID)); + Msg2.Add("MSG", MSG); + _WriteMessage(Msg2); + } + + public void Worker() { + try { + PmlElement Message = null; + int Ping = 0; + while (true) { + try { + Message = _ReadMessage(); + if (Message == null) Console.WriteLine("UCIS.PML.Connection: Message is just null?"); + } catch (EndOfStreamException) { + Console.WriteLine("UCIS.PML.Connection: End of stream"); + return; + } catch (SocketException ex) { + if (ex.ErrorCode == (int)SocketError.TimedOut) { + Console.WriteLine("UCIS.PML.Connection: SocketException/TimedOut"); + Message = null; + } else if (ex.ErrorCode == (int)SocketError.ConnectionReset) { + Console.WriteLine("UCIS.PML.Connection: Connection reset by peer"); + return; + } else { + throw new Exception("Exception while reading message", ex); + } + } catch (IOException ex) { + Console.WriteLine("UCIS.PML.Connection: IOException: " + ex.Message); + Message = null; + } catch (TimeoutException) { + Message = null; + } + if (Message == null) { + if (Ping > 2) { + Console.WriteLine("UCIS.PML.Connection: Connection timed out"); + break; + } else { + _WriteMessage(new PmlString("PING")); + } + Ping += 1; + } else if (Message is PmlString) { + string Cmd = Message.ToString(); + if (Cmd.Equals("PING")) { + _WriteMessage(new PmlString("PONG")); + } else if (Cmd.Equals("PONG")) { + Ping = 0; + } + } else if (Message is PmlDictionary) { + string Cmd = null; + Cmd = Message.GetChild("CMD").ToString(); + if (Cmd.Equals("SES")) { + UInt32 SID = default(UInt32); + byte SCMD = 0; + SessionBase Session = default(SessionBase); + PmlElement InnerMsg = default(PmlElement); + SID = Message.GetChild("SID").ToUInt32(); + SCMD = Message.GetChild("SCMD").ToByte(); + InnerMsg = Message.GetChild("MSG"); + lock (pSessions) { + if (pSessions.ContainsKey(SID)) { + Session = pSessions[SID]; + } else { + Session = null; + } + } + if (SCMD == 0) { + if (Session == null) { + if (SessionRequestReceived != null) { + SessionRequestReceived(InnerMsg, SID); + } + } else { + Session.ClosedA(); + Session.ClosedB(null); + WriteSessionMessage(SID, 2, null); + } + } else if (SCMD == 1) { + if (Session == null) { + WriteSessionMessage(SID, 2, null); + } else { + Session.MessageIn(InnerMsg); + } + } else if (SCMD == 2) { + if (Session != null) { + Session.ClosedA(); + Session.ClosedB(InnerMsg); + } + } + } else if (Cmd.Equals("RPL")) { + UInt32 SID = default(UInt32); + CSyncRequest SRequest = null; + SID = Message.GetChild("SID").ToUInt32(); + lock (pSyncRequests) { + if (pSyncRequests.TryGetValue(SID, out SRequest)) { + pSyncRequests.Remove(SID); + } else { + Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString()); + } + } + if (SRequest != null) { + SRequest.Reply = Message.GetChild("MSG"); + SRequest.ResetEvent.Set(); + } + } else if (Cmd.Equals("REQ")) { + System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(SyncRequestHandler), Message); + } else if (Cmd.Equals("MSG")) { + PmlElement InnerMsg = Message.GetChild("MSG"); + if (MessageReceived != null) MessageReceived(InnerMsg); + } else { + throw new InvalidOperationException("Invalid operation"); + } + } + } + } catch (System.Threading.ThreadAbortException ex) { + throw ex; + } catch (Exception ex) { + Console.WriteLine(ex.ToString()); + } finally { + Console.WriteLine("UCIS.PML.Connection: Connection closed"); + try { + foreach (SessionBase S in pSessions.Values) { + try { + S.ClosedB(null); + } catch (Exception ex) { + Console.WriteLine(ex.ToString()); + } + } + pSessions.Clear(); + foreach (CSyncRequest T in pSyncRequests.Values) { + T.ResetEvent.Set(); + } + } catch (Exception ex) { + Console.WriteLine(ex.ToString()); + } + } + } + + private void SyncRequestHandler(object state) { + PmlDictionary Message = (PmlDictionary)state; + UInt32 SID = default(UInt32); + PmlElement InnerMsg = default(PmlElement); + PmlElement Reply = default(PmlElement); + Reply = null; + SID = Message.GetChild("SID").ToUInt32(); + InnerMsg = Message.GetChild("MSG"); + try { + if (RequestReceived != null) { + RequestReceived(InnerMsg, ref Reply); + } + } catch (Exception ex) { + Reply = new PmlDictionary(); + ((PmlDictionary)Reply).Add("EXCEPTION", new PmlString(ex.ToString())); + Console.WriteLine(ex.ToString()); + } + WriteSyncMessage(SID, true, Reply); + } + + public PmlElement SyncRequest(PmlElement Request) { + return SyncRequest(Request, 30000); + } + public PmlElement SyncRequest(PmlElement Request, int Timeout) { + UInt32 SID = default(UInt32); + CSyncRequest SyncEvent = new CSyncRequest(); + SID = GetNextSessionId(false); + lock (pSyncRequests) pSyncRequests.Add(SID, SyncEvent); + WriteSyncMessage(SID, false, Request); + if (!SyncEvent.ResetEvent.WaitOne(Timeout, false)) { + Console.WriteLine("UCIS.PML.Connection.SyncRequest Timeout: " + SID.ToString()); + lock (pSyncRequests) pSyncRequests.Remove(SID); + throw new TimeoutException(); + } + return SyncEvent.Reply; + } + + public void SendMessage(PmlElement Message) { + PmlDictionary Msg = new PmlDictionary(); + Msg.Add("CMD", new PmlString("MSG")); + Msg.Add("MSG", Message); + _WriteMessage(Msg); + } + + public PmlElement ReadMessage() { + return _ReadMessage(); + } + public void SendRawMessage(PmlElement Message) { + _WriteMessage(Message); + } + } +}