Mercurial > hg > ucis.core
changeset 104:327be9216006
Improved PML code
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Sat, 11 Oct 2014 14:05:41 +0200 |
parents | 8fe322656807 |
children | 4ba4fd48e1da |
files | Pml/Channels/ActivePmlChannel.cs Pml/Channels/PassivePmlChannel.cs Pml/PmlCommunicator2.cs Util/AsyncResultBase.cs |
diffstat | 4 files changed, 91 insertions(+), 121 deletions(-) [+] |
line wrap: on
line diff
--- a/Pml/Channels/ActivePmlChannel.cs Sat Oct 11 14:03:31 2014 +0200 +++ b/Pml/Channels/ActivePmlChannel.cs Sat Oct 11 14:05:41 2014 +0200 @@ -1,11 +1,11 @@ using System; using UCIS.Pml; +using UCIS.Util; using System.Collections.Generic; using System.Threading; namespace UCIS.Pml { public abstract class ActivePmlChannel : IPmlChannel { - private AutoResetEvent _receiveEvent = new AutoResetEvent(false); private ReadMessageAsyncResult _asyncWait = null; private Queue<PmlElement> _queue = new Queue<PmlElement>(); private bool _isOpen = true; @@ -14,41 +14,49 @@ public abstract void WriteMessage(PmlElement message); public PmlElement ReadMessage() { - if (!IsOpen) throw new InvalidOperationException("The channel is not open"); - if (_queue.Count == 0) { - _receiveEvent.WaitOne(); - if (_queue.Count == 0) throw new OperationCanceledException("The operation did not complete"); - } else if (_queue.Count == 1) { - _receiveEvent.Reset(); + lock (_queue) { + if (!IsOpen) throw new InvalidOperationException("The channel is not open"); + while (_queue.Count == 0) { + if (!IsOpen) throw new OperationCanceledException("The operation did not complete"); + Monitor.Wait(_queue); + } + return _queue.Dequeue(); } - return _queue.Dequeue(); } public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) { - ReadMessageAsyncResult ar = new ReadMessageAsyncResult(state, callback); - if (!IsOpen) throw new InvalidOperationException("The channel is not open"); - if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); - if (_queue.Count == 0) { - _asyncWait = ar; - } else { - if (_queue.Count == 1) _receiveEvent.Reset(); - ar.Complete(true, _queue.Dequeue(), true); + ReadMessageAsyncResult ar; + Boolean completed = false; + lock (_queue) { + if (!IsOpen) throw new InvalidOperationException("The channel is not open"); + if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); + ar = new ReadMessageAsyncResult(callback, state); + if (_queue.Count == 0) { + _asyncWait = ar; + } else { + ar.Message = _queue.Dequeue(); + completed = true; + } } + if (completed) ar.SetCompleted(true, null); return ar; } public PmlElement EndReadMessage(IAsyncResult asyncResult) { ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult; - if (ar.Error) new OperationCanceledException("The operation did not complete"); + ar.WaitForCompletion(); + if (ar.Error != null) throw new OperationCanceledException("The asynchronous operation failed", ar.Error); return ar.Message; } public virtual void Close() { - _isOpen = false; - ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); - if (asyncWait != null) asyncWait.Complete(false, null, false); - _receiveEvent.Set(); - _receiveEvent.Close(); + ReadMessageAsyncResult asyncWait; + lock (_queue) { + _isOpen = false; + asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); + Monitor.PulseAll(_queue); + } + if (asyncWait != null) asyncWait.SetCompleted(false, new ObjectDisposedException("ActivePmlChannel")); } public void Dispose() { @@ -56,51 +64,25 @@ } protected void PushReceivedMessage(PmlElement message) { - ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); + ReadMessageAsyncResult asyncWait; + lock (_queue) { + asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); + if (asyncWait == null) { + _queue.Enqueue(message); + Monitor.Pulse(_queue); + } + } if (asyncWait != null) { - asyncWait.Complete(true, message, false); - } else { - _queue.Enqueue(message); - _receiveEvent.Set(); + asyncWait.Message = message; + asyncWait.SetCompleted(false, null); } } - private class ReadMessageAsyncResult : IAsyncResult { - private object state; - private AsyncCallback callback; - private bool completed; - private bool synchronously; - private ManualResetEvent waitHandle; - + class ReadMessageAsyncResult : AsyncResultBase { internal PmlElement Message; - internal bool Error; - - public bool CompletedSynchronously { get { return synchronously; } } - public object AsyncState { get { return state; } } - public WaitHandle AsyncWaitHandle { - get { - if (waitHandle == null) waitHandle = new ManualResetEvent(completed); - return waitHandle; - } - } - public bool IsCompleted { get { return completed; } } - - internal ReadMessageAsyncResult(object state, AsyncCallback callback) { - this.state = state; - this.callback = callback; - this.completed = false; - this.synchronously = false; - this.Message = null; - this.Error = false; - this.waitHandle = null; - } - internal void Complete(bool success, PmlElement message, bool synchronously) { - this.Message = message; - this.Error = !success; - this.synchronously = synchronously; - this.completed = true; - if (waitHandle != null) waitHandle.Set(); - if (this.callback != null) this.callback.Invoke(this); + public ReadMessageAsyncResult(AsyncCallback callback, Object state) : base(callback, state) { } + public void SetCompleted(Boolean synchronously, Exception error) { + base.SetCompleted(synchronously, error); } } }
--- a/Pml/Channels/PassivePmlChannel.cs Sat Oct 11 14:03:31 2014 +0200 +++ b/Pml/Channels/PassivePmlChannel.cs Sat Oct 11 14:05:41 2014 +0200 @@ -1,5 +1,6 @@ using System; using UCIS.Pml; +using UCIS.Util; using System.Collections.Generic; using System.Threading; @@ -20,40 +21,33 @@ public abstract PmlElement ReadMessage(); public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) { - ReadMessageAsyncResult ar = new ReadMessageAsyncResult(); - ar.Callback = callback; - ar.State = state; + ReadMessageAsyncResult ar = new ReadMessageAsyncResult(callback, state); UCIS.ThreadPool.RunCall(AsyncReadMessage, ar); return ar; } public PmlElement EndReadMessage(IAsyncResult asyncResult) { ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult; - if (ar.Error != null) throw new Exception("The asynchronous operation could not be completed", ar.Error); + ar.WaitForCompletion(); + if (ar.Error != null) throw new Exception("The asynchronous operation failed", ar.Error); return ar.Message; } - private struct ReadMessageAsyncResult : IAsyncResult { - internal object State; + class ReadMessageAsyncResult : AsyncResultBase { internal PmlElement Message; - internal AsyncCallback Callback; - internal Exception Error; - internal bool Completed; - - public bool CompletedSynchronously { get { return false; } } - public object AsyncState { get { return State; } } - public WaitHandle AsyncWaitHandle { get { return null; } } - public bool IsCompleted { get { return Completed; } } + public ReadMessageAsyncResult(AsyncCallback callback, Object state) : base(callback, state) { } + public void SetCompleted(Boolean synchronously, Exception error, PmlElement message) { + this.Message = message; + base.SetCompleted(synchronously, error); + } } private void AsyncReadMessage(object state) { ReadMessageAsyncResult ar = (ReadMessageAsyncResult)state; try { - ar.Message = ReadMessage(); - ar.Error = null; + PmlElement message = ReadMessage(); + ar.SetCompleted(false, null, message); } catch (Exception ex) { - ar.Error = ex; + ar.SetCompleted(false, ex, null); } - ar.Completed = true; - ar.Callback.Invoke(ar); } } }
--- a/Pml/PmlCommunicator2.cs Sat Oct 11 14:03:31 2014 +0200 +++ b/Pml/PmlCommunicator2.cs Sat Oct 11 14:05:41 2014 +0200 @@ -7,7 +7,6 @@ public class PmlCommunicator { private class CSyncRequest { internal PmlElement Reply; - internal ManualResetEvent ResetEvent = new ManualResetEvent(false); } private interface ISession { void MessageIn(PmlElement message); @@ -144,9 +143,7 @@ _sessions.Clear(); } lock (_invocations) { - foreach (CSyncRequest T in _invocations.Values) { - T.ResetEvent.Set(); - } + foreach (CSyncRequest T in _invocations.Values) lock (T) Monitor.Pulse(T); _invocations.Clear(); } if (Closed != null) Closed(this, new EventArgs()); @@ -192,7 +189,7 @@ } if (SRequest != null) { SRequest.Reply = Message.GetChild("MSG"); - SRequest.ResetEvent.Set(); + lock (SRequest) Monitor.Pulse(SRequest); } } else if (Cmd.Equals("REQ") || Cmd.Equals("MSG")) { UCIS.ThreadPool.RunCall(processCall, Message); @@ -288,9 +285,7 @@ } public void Call(PmlElement message) { - PmlDictionary Msg = new PmlDictionary(); - Msg.Add("CMD", new PmlString("MSG")); - Msg.Add("MSG", message); + PmlDictionary Msg = new PmlDictionary() { { "CMD", "MSG" }, { "MSG", message } }; _WriteMessage(Msg); } public PmlElement Invoke(PmlElement message) { @@ -299,14 +294,16 @@ public PmlElement Invoke(PmlElement message, int timeout) { if (_closed) throw new InvalidOperationException("Sorry, we're closed."); CSyncRequest SyncEvent = new CSyncRequest(); - UInt32 SID = GetNextSessionId(false); - lock (_invocations) _invocations.Add(SID, SyncEvent); + UInt32 SID; + lock (_invocations) { + SID = GetNextSessionId(ref pNextSyncRequest, _invocations); + _invocations.Add(SID, SyncEvent); + } try { WriteSyncMessage(SID, false, message); - if (!SyncEvent.ResetEvent.WaitOne(timeout, false)) { - if (!_closed) lock (_invocations) _invocations.Remove(SID); - throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")"); - } + Boolean success; + lock (SyncEvent) success = Monitor.Wait(SyncEvent, timeout); + if (!success) throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")"); } finally { lock (_invocations) _invocations.Remove(SID); } @@ -314,7 +311,7 @@ } public IPmlChannel CreateChannel(PmlElement data) { - UInt32 sid = GetNextSessionId(true); + UInt32 sid = GetNextSessionId(ref pNextSession, _sessions); PmlSubChannel ch = new PmlSubChannel(this, sid, true); WriteSessionMessage(sid, 0, data); if (!ch.IsOpen) return null; @@ -333,42 +330,33 @@ RemoveSession(session.ID); } - private UInt32 GetNextSessionId(bool IsSession) { - if (IsSession) { - lock (_sessions) { - do { - unchecked { pNextSession++; } - } while (_sessions.ContainsKey(pNextSession)); - return pNextSession; - } - } else { - lock (_invocations) { - do { - unchecked { pNextSyncRequest++; } - } while (_invocations.ContainsKey(pNextSyncRequest)); - return pNextSyncRequest; - } + private static UInt32 GetNextSessionId<T>(ref UInt32 id, IDictionary<UInt32, T> dictionary) { + lock (dictionary) { + do { + id++; + } while (dictionary.ContainsKey(id)); + return id; } } protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) { - PmlDictionary Msg2 = new PmlDictionary(); - Msg2.Add("CMD", new PmlString(RPL ? "RPL" : "REQ")); - Msg2.Add("SID", new PmlInteger(SID)); - Msg2.Add("MSG", MSG); + PmlDictionary Msg2 = new PmlDictionary() { + { "CMD", RPL ? "RPL" : "REQ" }, + { "SID", SID }, + { "MSG", MSG }, + }; _WriteMessage(Msg2); } protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) { - PmlDictionary Msg2 = new PmlDictionary(); - Msg2.Add("CMD", new PmlString("SES")); - Msg2.Add("SID", new PmlInteger(SID)); - Msg2.Add("SCMD", new PmlInteger(CMD)); + PmlDictionary Msg2 = new PmlDictionary() { + { "CMD", "SES" }, + { "SID", SID }, + { "SCMD", CMD }, + }; if (MSG != null) Msg2.Add("MSG", MSG); _WriteMessage(Msg2); } - - /* LegacyPmlCommunicator compatibility */ public PmlElement SyncRequest(PmlElement Request) { return Invoke(Request);
--- a/Util/AsyncResultBase.cs Sat Oct 11 14:03:31 2014 +0200 +++ b/Util/AsyncResultBase.cs Sat Oct 11 14:05:41 2014 +0200 @@ -8,6 +8,7 @@ static Boolean ThreadInCallback = false; ManualResetEvent WaitEvent = null; AsyncCallback Callback = null; + Object MonitorWaitHandle = new Object(); public object AsyncState { get; private set; } public bool CompletedSynchronously { get; private set; } public bool IsCompleted { get; private set; } @@ -36,6 +37,7 @@ lock (this) { IsCompleted = true; if (WaitEvent != null) WaitEvent.Set(); + if (MonitorWaitHandle != null) lock (MonitorWaitHandle) Monitor.Pulse(MonitorWaitHandle); } if (Callback != null) { if (synchronously && !ThreadInCallback) { @@ -51,6 +53,10 @@ } } + public void WaitForCompletion() { + lock (this) if (!IsCompleted) lock (MonitorWaitHandle) Monitor.Wait(MonitorWaitHandle); + } + protected void ThrowError() { if (Error != null) throw Error; }