comparison Pml/Channels/ActivePmlChannel.cs @ 0:3ab940a0c7a0

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 11 Sep 2012 16:28:53 +0200
parents
children 327be9216006
comparison
equal deleted inserted replaced
-1:000000000000 0:3ab940a0c7a0
1 using System;
2 using UCIS.Pml;
3 using System.Collections.Generic;
4 using System.Threading;
5
6 namespace UCIS.Pml {
/// <summary>
/// Base class for PML channels whose messages are pushed in by the
/// implementation (via <see cref="PushReceivedMessage"/>) and pulled out by
/// consumers through the blocking <see cref="ReadMessage"/> or the
/// Begin/End asynchronous read pair.
/// </summary>
/// <remarks>
/// All access to the message queue and to the pending asynchronous read is
/// serialized on a single private lock. User callbacks are always invoked
/// after the lock has been released, so a callback may safely call back
/// into the channel.
/// </remarks>
public abstract class ActivePmlChannel : IPmlChannel {
	// Guards _queue and _asyncWait; blocking readers wait on it with Monitor.Wait.
	private readonly object _sync = new object();
	// Messages received while no asynchronous read was pending.
	private readonly Queue<PmlElement> _queue = new Queue<PmlElement>();
	// The single outstanding asynchronous read, or null when there is none.
	private ReadMessageAsyncResult _asyncWait = null;
	// Volatile because IsOpen reads it without taking the lock.
	private volatile bool _isOpen = true;

	/// <summary>Gets whether the channel has not been closed yet.</summary>
	public virtual bool IsOpen { get { return _isOpen; } }

	/// <summary>Sends a message over the channel.</summary>
	/// <param name="message">The message to send.</param>
	public abstract void WriteMessage(PmlElement message);

	/// <summary>
	/// Returns the next queued message, blocking until one is pushed or the
	/// channel is closed.
	/// </summary>
	/// <returns>The oldest queued message.</returns>
	/// <exception cref="InvalidOperationException">The channel is not open when the call is made.</exception>
	/// <exception cref="OperationCanceledException">The channel was closed while waiting for a message.</exception>
	public PmlElement ReadMessage() {
		// IsOpen is virtual, so it is evaluated before taking the lock.
		if (!IsOpen) throw new InvalidOperationException("The channel is not open");
		lock (_sync) {
			// Re-check the queue after every wake-up: another reader may have
			// taken the message that caused the pulse.
			while (_queue.Count == 0) {
				if (!_isOpen) throw new OperationCanceledException("The operation did not complete");
				Monitor.Wait(_sync);
			}
			return _queue.Dequeue();
		}
	}

	/// <summary>
	/// Starts an asynchronous read. If a message is already queued the
	/// operation completes synchronously; otherwise it completes when the
	/// next message is pushed or the channel is closed.
	/// </summary>
	/// <param name="callback">Optional callback invoked when the operation completes.</param>
	/// <param name="state">Caller-defined object exposed through <see cref="IAsyncResult.AsyncState"/>.</param>
	/// <returns>A handle to pass to <see cref="EndReadMessage"/>.</returns>
	/// <exception cref="InvalidOperationException">
	/// The channel is not open, or another asynchronous read is still pending.
	/// </exception>
	public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) {
		ReadMessageAsyncResult ar = new ReadMessageAsyncResult(state, callback);
		if (!IsOpen) throw new InvalidOperationException("The channel is not open");
		PmlElement message;
		lock (_sync) {
			if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress");
			if (_queue.Count == 0) {
				// Close() may have run between the IsOpen check and the lock;
				// registering now would leave the operation pending forever.
				if (!_isOpen) throw new InvalidOperationException("The channel is not open");
				_asyncWait = ar;
				return ar;
			}
			message = _queue.Dequeue();
		}
		// Complete outside the lock so the callback cannot deadlock on it.
		ar.Complete(true, message, true);
		return ar;
	}

	/// <summary>
	/// Finishes an asynchronous read started with <see cref="BeginReadMessage"/>,
	/// waiting for it to complete if necessary.
	/// </summary>
	/// <param name="asyncResult">The value returned by <see cref="BeginReadMessage"/>.</param>
	/// <returns>The message that was read.</returns>
	/// <exception cref="OperationCanceledException">The channel was closed before a message arrived.</exception>
	public PmlElement EndReadMessage(IAsyncResult asyncResult) {
		ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult;
		// Block until the operation has finished instead of returning a
		// not-yet-assigned message.
		if (!ar.IsCompleted) ar.AsyncWaitHandle.WaitOne();
		if (ar.Error) throw new OperationCanceledException("The operation did not complete");
		return ar.Message;
	}

	/// <summary>
	/// Closes the channel: fails the pending asynchronous read, if any, and
	/// wakes every thread blocked in <see cref="ReadMessage"/>. Calling it
	/// more than once is harmless.
	/// </summary>
	public virtual void Close() {
		ReadMessageAsyncResult asyncWait;
		lock (_sync) {
			_isOpen = false;
			asyncWait = _asyncWait;
			_asyncWait = null;
			Monitor.PulseAll(_sync);
		}
		if (asyncWait != null) asyncWait.Complete(false, null, false);
	}

	/// <summary>Closes the channel by calling <see cref="Close"/>.</summary>
	public void Dispose() {
		Close();
	}

	/// <summary>
	/// Delivers a received message: hands it to the pending asynchronous
	/// read if there is one, otherwise queues it and wakes one blocked reader.
	/// </summary>
	/// <param name="message">The message that was received.</param>
	protected void PushReceivedMessage(PmlElement message) {
		ReadMessageAsyncResult asyncWait;
		lock (_sync) {
			asyncWait = _asyncWait;
			_asyncWait = null;
			if (asyncWait == null) {
				_queue.Enqueue(message);
				Monitor.Pulse(_sync);
			}
		}
		// Invoke the completion (and its callback) outside the lock.
		if (asyncWait != null) asyncWait.Complete(true, message, false);
	}

	/// <summary>
	/// <see cref="IAsyncResult"/> implementation for a single read operation.
	/// </summary>
	private class ReadMessageAsyncResult : IAsyncResult {
		// Guards the lazy creation of waitHandle against a concurrent Complete.
		private readonly object sync = new object();
		private object state;
		private AsyncCallback callback;
		// Volatile because IsCompleted reads it without taking the lock.
		private volatile bool completed;
		private bool synchronously;
		private ManualResetEvent waitHandle;

		internal PmlElement Message;
		internal bool Error;

		public bool CompletedSynchronously { get { return synchronously; } }
		public object AsyncState { get { return state; } }
		public WaitHandle AsyncWaitHandle {
			get {
				lock (sync) {
					// Created on demand; starts signaled if already completed.
					if (waitHandle == null) waitHandle = new ManualResetEvent(completed);
					return waitHandle;
				}
			}
		}
		public bool IsCompleted { get { return completed; } }

		internal ReadMessageAsyncResult(object state, AsyncCallback callback) {
			this.state = state;
			this.callback = callback;
			this.completed = false;
			this.synchronously = false;
			this.Message = null;
			this.Error = false;
			this.waitHandle = null;
		}

		/// <summary>
		/// Records the outcome, signals the wait handle if one was created,
		/// and then invokes the callback.
		/// </summary>
		/// <param name="success">False when the read was cancelled.</param>
		/// <param name="message">The message read, or null on failure.</param>
		/// <param name="synchronously">Whether completion happened inside the Begin call.</param>
		internal void Complete(bool success, PmlElement message, bool synchronously) {
			lock (sync) {
				this.Message = message;
				this.Error = !success;
				this.synchronously = synchronously;
				this.completed = true;
				if (waitHandle != null) waitHandle.Set();
			}
			if (this.callback != null) this.callback.Invoke(this);
		}
	}
}
107 }