Mercurial > hg > ucis.core
diff Pml/Channels/ActivePmlChannel.cs @ 104:327be9216006
Improved PML code
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Sat, 11 Oct 2014 14:05:41 +0200 |
parents | 3ab940a0c7a0 |
children |
line wrap: on
line diff
--- a/Pml/Channels/ActivePmlChannel.cs Sat Oct 11 14:03:31 2014 +0200 +++ b/Pml/Channels/ActivePmlChannel.cs Sat Oct 11 14:05:41 2014 +0200 @@ -1,11 +1,11 @@ using System; using UCIS.Pml; +using UCIS.Util; using System.Collections.Generic; using System.Threading; namespace UCIS.Pml { public abstract class ActivePmlChannel : IPmlChannel { - private AutoResetEvent _receiveEvent = new AutoResetEvent(false); private ReadMessageAsyncResult _asyncWait = null; private Queue<PmlElement> _queue = new Queue<PmlElement>(); private bool _isOpen = true; @@ -14,41 +14,49 @@ public abstract void WriteMessage(PmlElement message); public PmlElement ReadMessage() { - if (!IsOpen) throw new InvalidOperationException("The channel is not open"); - if (_queue.Count == 0) { - _receiveEvent.WaitOne(); - if (_queue.Count == 0) throw new OperationCanceledException("The operation did not complete"); - } else if (_queue.Count == 1) { - _receiveEvent.Reset(); + lock (_queue) { + if (!IsOpen) throw new InvalidOperationException("The channel is not open"); + while (_queue.Count == 0) { + if (!IsOpen) throw new OperationCanceledException("The operation did not complete"); + Monitor.Wait(_queue); + } + return _queue.Dequeue(); } - return _queue.Dequeue(); } public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) { - ReadMessageAsyncResult ar = new ReadMessageAsyncResult(state, callback); - if (!IsOpen) throw new InvalidOperationException("The channel is not open"); - if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); - if (_queue.Count == 0) { - _asyncWait = ar; - } else { - if (_queue.Count == 1) _receiveEvent.Reset(); - ar.Complete(true, _queue.Dequeue(), true); + ReadMessageAsyncResult ar; + Boolean completed = false; + lock (_queue) { + if (!IsOpen) throw new InvalidOperationException("The channel is not open"); + if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); + ar = new ReadMessageAsyncResult(callback, state); + if (_queue.Count == 0) { + _asyncWait = ar; + } else { + ar.Message = _queue.Dequeue(); + completed = true; + } } + if (completed) ar.SetCompleted(true, null); return ar; } public PmlElement EndReadMessage(IAsyncResult asyncResult) { ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult; - if (ar.Error) new OperationCanceledException("The operation did not complete"); + ar.WaitForCompletion(); + if (ar.Error != null) throw new OperationCanceledException("The asynchronous operation failed", ar.Error); return ar.Message; } public virtual void Close() { - _isOpen = false; - ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); - if (asyncWait != null) asyncWait.Complete(false, null, false); - _receiveEvent.Set(); - _receiveEvent.Close(); + ReadMessageAsyncResult asyncWait; + lock (_queue) { + _isOpen = false; + asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); + Monitor.PulseAll(_queue); + } + if (asyncWait != null) asyncWait.SetCompleted(false, new ObjectDisposedException("ActivePmlChannel")); } public void Dispose() { @@ -56,51 +64,25 @@ } protected void PushReceivedMessage(PmlElement message) { - ReadMessageAsyncResult asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); + ReadMessageAsyncResult asyncWait; + lock (_queue) { + asyncWait = Interlocked.Exchange<ReadMessageAsyncResult>(ref _asyncWait, null); + if (asyncWait == null) { + _queue.Enqueue(message); + Monitor.Pulse(_queue); + } + } if (asyncWait != null) { - asyncWait.Complete(true, message, false); - } else { - _queue.Enqueue(message); - _receiveEvent.Set(); + asyncWait.Message = message; + asyncWait.SetCompleted(false, null); } } - private class ReadMessageAsyncResult : IAsyncResult { - private object state; - private AsyncCallback callback; - private bool completed; - private bool synchronously; - private ManualResetEvent waitHandle; - + class ReadMessageAsyncResult : AsyncResultBase { internal PmlElement Message; - internal bool Error; - - public bool CompletedSynchronously { get { return synchronously; } } - public object AsyncState { get { return state; } } - public WaitHandle AsyncWaitHandle { - get { - if (waitHandle == null) waitHandle = new ManualResetEvent(completed); - return waitHandle; - } - } - public bool IsCompleted { get { return completed; } } - - internal ReadMessageAsyncResult(object state, AsyncCallback callback) { - this.state = state; - this.callback = callback; - this.completed = false; - this.synchronously = false; - this.Message = null; - this.Error = false; - this.waitHandle = null; - } - internal void Complete(bool success, PmlElement message, bool synchronously) { - this.Message = message; - this.Error = !success; - this.synchronously = synchronously; - this.completed = true; - if (waitHandle != null) waitHandle.Set(); - if (this.callback != null) this.callback.Invoke(this); + public ReadMessageAsyncResult(AsyncCallback callback, Object state) : base(callback, state) { } + public void SetCompleted(Boolean synchronously, Exception error) { + base.SetCompleted(synchronously, error); } } }