Mercurial > hg > ucis.core
view Pml/PmlCommunicator.cs @ 0:3ab940a0c7a0
Initial commit
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Tue, 11 Sep 2012 16:28:53 +0200 |
parents | |
children |
line wrap: on
line source
using System;
using System.Threading;
using System.Collections.Generic;

namespace UCIS.Pml {
	// PmlCommunicator is currently DISABLED: the whole class below is wrapped in a
	// block comment, so this file contributes no types to the UCIS.Pml namespace.
	//
	// What the disabled code does, as far as the code itself shows:
	//  - It wraps a single IPmlChannel and sends PmlDictionary messages with the keys
	//    "c" (CommandCode as int), "s" (session id, only when the command code is > 0)
	//    and "m" (payload, only when not null).
	//  - Outstanding invocations and open sub-channels are tracked in _subchannels,
	//    keyed by a session id taken from Random.Next().
	//  - Incoming calls and channel requests are raised through the CallReceived and
	//    ChannelRequestReceived events via ThreadPool.RunCall.
	//
	// NOTE(review): issues visible in the disabled code; resolve before re-enabling.
	//  - sendMessage writes the session id under "s", but messageReceived reads it
	//    from "i", so a message produced by this class never matches a sub-channel.
	//  - messageReceived takes a plain EventArgs yet reads e.Message, and nothing in
	//    this file subscribes it to the channel.
	//  - ChannelRequestWaitHandler.Complete() is never called, so the WaitOne() in
	//    CreateChannel has no visible way to return, and the callback passed to
	//    BeginCreateChannel is never invoked. BeginCreateChannel also returns null
	//    unless the channel is already open right after the request was sent.
	//  - EndInvoke casts the Dictionary to IList<IPmlSubChannel>; that "as" cast
	//    yields null, so the not-completed path throws NullReferenceException
	//    instead of the intended InvalidOperationException.
	//  - Invoke reports errors with the *request* message text rather than the reply,
	//    and on timeout returns null while leaving the entry in _subchannels.
	//  - Dispose sets _channel and _subchannels to null; a second call (for example
	//    through channelClosed) would dereference null.
	//  - Accepting an incoming channel request never adds the new PmlSubChannel to
	//    _subchannels, so later messages for it cannot be routed.
	//  - ThreadPool.RunCall is not a member of System.Threading.ThreadPool;
	//    presumably a project-local helper -- TODO confirm.
	/*
	class PmlCommunicator : IPmlCommunicator, IDisposable {
		private IPmlChannel _channel;
		private Dictionary<UInt32, IPmlSubChannel> _subchannels = new Dictionary<uint,IPmlSubChannel>();
		private Random _random = new Random();

		private enum CommandCode : int {
			CallWithoutReply = 0,
			CallWithReply = 1,
			Message = 2,
			ChannelRequest = 3,
			ChannelAcknowledge = 4,
			ChannelClose = 5,
			Error = 6
		}

		private interface IPmlSubChannel {
			void CloseIn();
			void ErrorIn(PmlElement message);
			void MessageIn(PmlElement message);
		}

		private class ChannelRequestWaitHandler : IAsyncResult {
			internal AsyncCallback Callback = null;
			internal Object CallbackState = null;
			internal ManualResetEvent Event = null;
			internal PmlSubChannel Channel = null;
			internal bool Completed = false;

			internal ChannelRequestWaitHandler(PmlSubChannel channel) {
				Channel = channel;
			}

			internal void Complete() {
				Completed = true;
				if (Event != null) Event.Set();
				if (Callback != null) Callback.Invoke(this);
			}

			public object AsyncState { get { return CallbackState; } }
			public WaitHandle AsyncWaitHandle { get { return null; } }
			public bool CompletedSynchronously { get { return false; } }
			public bool IsCompleted { get { return Completed; } }
		}

		private class PmlSubChannel : ActivePmlChannel, IPmlSubChannel {
			private enum ChannelState { Requesting, Acknowledged, Closed }

			private PmlCommunicator _communicator;
			private UInt32 _id;
			private ChannelState _state;

			internal PmlSubChannel(PmlCommunicator communicator, UInt32 sid) {
				_communicator = communicator;
				_id = sid;
				_state = ChannelState.Requesting;
			}

			public override bool IsOpen { get { return _state == ChannelState.Acknowledged; } }

			internal void AcknowledgeIn() {
				if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement");
				_state = ChannelState.Acknowledged;
			}

			void IPmlSubChannel.CloseIn() {
				_state = ChannelState.Closed;
				_communicator._subchannels.Remove(_id);
				base.Close();
			}

			void IPmlSubChannel.ErrorIn(PmlElement message) {
				(this as IPmlSubChannel).CloseIn();
			}

			void IPmlSubChannel.MessageIn(PmlElement message) {
				base.PushReceivedMessage(message);
			}

			internal void AcknowledgeOut() {
				if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement");
				_state = ChannelState.Acknowledged;
				_communicator.sendMessage(CommandCode.ChannelAcknowledge, _id, null);
			}

			internal void RejectOut() {
				if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement");
				_state = ChannelState.Closed;
				_communicator.sendMessage(CommandCode.ChannelClose, _id, null);
			}

			public override void SendMessage(PmlElement message) {
				if (_state != ChannelState.Acknowledged) throw new InvalidOperationException("The subchannel is not open");
				_communicator.sendMessage(CommandCode.Message, _id, message);
			}

			public override void Close() {
				if (_state != ChannelState.Acknowledged) return;
				_state = ChannelState.Closed;
				_communicator.sendMessage(CommandCode.ChannelClose, _id, null);
				_communicator._subchannels.Remove(_id);
				base.Close();
			}
		}

		private class PmlChannelRequestReceivedEventArgsA : PmlChannelRequestReceivedEventArgs {
			private PmlCommunicator _communicator;
			private PmlElement _data;
			private PmlSubChannel _channel;
			private bool _accepted;
			private bool _rejected;

			internal PmlChannelRequestReceivedEventArgsA(PmlCommunicator communicator, UInt32 sid, PmlElement message) {
				_communicator = communicator;
				_channel = new PmlSubChannel(communicator, sid);
				_data = message;
			}

			public override IPmlChannel Accept() {
				if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected");
				_accepted = true;
				_channel.AcknowledgeOut();
				return _channel;
			}

			public override void Reject() {
				if (_accepted) throw new InvalidOperationException("The channel has already been accepted");
				if (_rejected) return;
				_rejected = true;
				_channel.RejectOut();
			}

			internal void RejectIfNotAccepted() {
				if (!_accepted) Reject();
			}

			public override PmlElement Data { get { return _data; } }
		}

		private class PmlInvocation : IAsyncResult, IPmlSubChannel {
			internal PmlCommunicator Communicator = null;
			internal AsyncCallback Callback = null;
			internal Object CallbackState = null;
			internal bool Error = false;
			internal bool Completed = false;
			internal PmlElement Message = null;
			internal ManualResetEvent Event = null;
			internal UInt32 ID;

			internal PmlInvocation(PmlCommunicator communicator, UInt32 id) {
				Communicator = communicator;
				ID = id;
			}

			void IPmlSubChannel.CloseIn() {
				(this as IPmlSubChannel).ErrorIn(null);
			}

			void IPmlSubChannel.ErrorIn(PmlElement message) {
				Error = true;
				Communicator._subchannels.Remove(ID);
				(this as IPmlSubChannel).MessageIn(message);
			}

			void IPmlSubChannel.MessageIn(PmlElement message) {
				Message = message;
				Completed = true;
				if (Event != null) Event.Set();
				if (Callback != null) Callback.Invoke(this);
			}

			public object AsyncState { get { return CallbackState; } }
			public WaitHandle AsyncWaitHandle { get { return null; } }
			public bool CompletedSynchronously { get { return false; } }
			public bool IsCompleted { get { return Completed; } }
		}

		public event EventHandler<PmlCallReceivedEventArgs> CallReceived;
		public event EventHandler<PmlChannelRequestReceivedEventArgs> ChannelRequestReceived;

		public PmlCommunicator(IPmlChannel channel) {
			_channel = channel;
			_channel.Closed += channelClosed;
		}

		public void Dispose() {
			_channel.Close();
			_channel = null;
			IPmlSubChannel[] A = new IPmlSubChannel[_subchannels.Count];
			_subchannels.Values.CopyTo(A, 0);
			foreach (IPmlSubChannel S in A) S.CloseIn();
			_subchannels.Clear();
			_subchannels = null;
			_random = null;
		}

		private void channelClosed(Object sender, EventArgs e) {
			Dispose();
		}

		public IPmlChannel Channel { get { return _channel; } }

		public void Call(PmlElement message) {
			sendMessage(0, 0, message); //Call without reply
		}

		public PmlElement Invoke(PmlElement message) {
			return Invoke(message, 60000);
		}

		public PmlElement Invoke(PmlElement message, int timeout) {
			UInt32 sid = getSessionID();
			PmlInvocation inv = new PmlInvocation(this, sid);
			inv.Event = new ManualResetEvent(false);
			_subchannels.Add(sid, inv);
			sendMessage(CommandCode.CallWithReply, sid, message);
			inv.Event.WaitOne(timeout);
			if (inv.Error) throw new Exception(message.ToString());
			return inv.Message;
		}

		public IAsyncResult BeginInvoke(PmlElement message, AsyncCallback callback, Object state) {
			UInt32 sid = getSessionID();
			PmlInvocation inv = new PmlInvocation(this, sid);
			inv.Callback = callback;
			inv.CallbackState = state;
			_subchannels.Add(sid, inv);
			sendMessage(CommandCode.CallWithReply, sid, message);
			return inv;
		}

		public PmlElement EndInvoke(IAsyncResult result) {
			PmlInvocation ar = (PmlInvocation)result;
			if (!ar.Completed) {
				(_subchannels as IList<IPmlSubChannel>).Remove(ar);
				throw new InvalidOperationException("The asynchronous operation has not completed");
			} else if (ar.Error) {
				throw new Exception(ar.Message.ToString());
			} else {
				return ar.Message;
			}
		}

		public IPmlChannel CreateChannel(PmlElement data) {
			UInt32 sid = getSessionID();
			PmlSubChannel ch = new PmlSubChannel(this, sid);
			ChannelRequestWaitHandler wh = new ChannelRequestWaitHandler(ch);
			wh.Event = new ManualResetEvent(false);
			_subchannels.Add(sid, ch);
			sendMessage(CommandCode.ChannelRequest, sid, data);
			wh.Event.WaitOne();
			if (!ch.IsOpen) return null;
			return ch;
		}

		public IAsyncResult BeginCreateChannel(PmlElement data, AsyncCallback callback, Object state) {
			UInt32 sid = getSessionID();
			PmlSubChannel ch = new PmlSubChannel(this, sid);
			ChannelRequestWaitHandler wh = new ChannelRequestWaitHandler(ch);
			wh.Callback = callback;
			wh.CallbackState = state;
			_subchannels.Add(sid, ch);
			sendMessage(CommandCode.ChannelRequest, sid, data);
			if (!ch.IsOpen) return null;
			return wh;
		}

		public IPmlChannel EndCreateChannel(IAsyncResult result) {
			ChannelRequestWaitHandler ar = (ChannelRequestWaitHandler)result;
			if (!ar.Channel.IsOpen) return null;
			return ar.Channel;
		}

		private UInt32 getSessionID() {
			return (uint)_random.Next();
		}

		private void sendMessage(CommandCode cmd, uint sid, PmlElement message) {
			PmlDictionary msg = new PmlDictionary();
			msg.Add("c", (int)cmd);
			if (cmd > 0) msg.Add("s", sid);
			if (message != null) msg.Add("m", message);
			_channel.SendMessage(msg);
		}

		private void invokeCallReceived(Object state) {
			PmlCallReceivedEventArgs e = (PmlCallReceivedEventArgs)state;
			try {
				if (CallReceived != null) CallReceived(this, e);
				if (e.WantReply) sendMessage(CommandCode.Message, e.SID, e.Reply);
			} catch (Exception ex) {
				if (e.WantReply) sendMessage(CommandCode.Error, e.SID, new PmlString(ex.ToString()));
			}
		}

		private void invokeChannelRequestReceived(Object state) {
			PmlChannelRequestReceivedEventArgsA e = (PmlChannelRequestReceivedEventArgsA)state;
			if (ChannelRequestReceived != null) ChannelRequestReceived(this, e);
			e.RejectIfNotAccepted();
		}

		private void messageReceived(Object sender, EventArgs e) {
			IPmlSubChannel subChannel = null;
			UInt32 sid = 0;
			bool subChannelExists = false;
			if (!(e.Message is PmlDictionary)) return;
			PmlDictionary msg = (PmlDictionary)e.Message;
			PmlElement cmdElement = msg.GetChild("c");
			PmlElement sidElement = msg.GetChild("i");
			PmlElement msgElement = msg.GetChild("m");
			if (cmdElement == null) return;
			if (sidElement != null) sid = sidElement.ToUInt32();
			if (sidElement != null) subChannelExists = _subchannels.TryGetValue(sid, out subChannel);
			if (!subChannelExists) subChannel = null;
			switch ((CommandCode)cmdElement.ToInt32()) {
				case CommandCode.CallWithoutReply:
					if (CallReceived != null) ThreadPool.RunCall(invokeCallReceived, new PmlCallReceivedEventArgs(msgElement, false, 0));
					break;
				case CommandCode.CallWithReply:
					if (CallReceived != null) ThreadPool.RunCall(invokeCallReceived, new PmlCallReceivedEventArgs(msgElement, true, sid));
					else sendMessage(CommandCode.Error, sid, null);
					break;
				case CommandCode.Message: //Reply to call | subchannel message
					if (subChannelExists) subChannel.MessageIn(msgElement);
					else sendMessage(CommandCode.Error, sid, null);
					break;
				case CommandCode.ChannelRequest:
					if (subChannelExists) {
						sendMessage(CommandCode.Error, sid, null);
						subChannel.CloseIn();
					} else {
						if (ChannelRequestReceived == null) sendMessage(CommandCode.ChannelClose, sid, null);
						else ThreadPool.RunCall(invokeChannelRequestReceived, new PmlChannelRequestReceivedEventArgsA(this, sid, msgElement));
					}
					break;
				case CommandCode.ChannelAcknowledge:
					if (subChannelExists) {
						if (subChannel is PmlSubChannel) (subChannel as PmlSubChannel).AcknowledgeIn();
						else {
							sendMessage(CommandCode.Error, sid, null); //Error
							subChannel.CloseIn();
						}
					} else sendMessage(CommandCode.Error, sid, null); //Error
					break;
				case CommandCode.ChannelClose:
					if (subChannelExists) subChannel.CloseIn();
					break;
				case CommandCode.Error:
					if (subChannelExists) subChannel.ErrorIn(msgElement);
					break;
			}
		}
	}
	*/
}