comparison Pml/PmlCommunicator.cs @ 0:3ab940a0c7a0

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 11 Sep 2012 16:28:53 +0200
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3ab940a0c7a0
1 using System;
2 using System.Threading;
3 using System.Collections.Generic;
4
5 namespace UCIS.Pml {
6 /* class PmlCommunicator : IPmlCommunicator, IDisposable {
7 private IPmlChannel _channel;
8 private Dictionary<UInt32, IPmlSubChannel> _subchannels = new Dictionary<uint,IPmlSubChannel>();
9 private Random _random = new Random();
10
11 private enum CommandCode : int {
12 CallWithoutReply = 0,
13 CallWithReply = 1,
14 Message = 2,
15 ChannelRequest = 3,
16 ChannelAcknowledge = 4,
17 ChannelClose = 5,
18 Error = 6
19 }
20
21 private interface IPmlSubChannel {
22 void CloseIn();
23 void ErrorIn(PmlElement message);
24 void MessageIn(PmlElement message);
25 }
26
27 private class ChannelRequestWaitHandler : IAsyncResult {
28 internal AsyncCallback Callback = null;
29 internal Object CallbackState = null;
30 internal ManualResetEvent Event = null;
31 internal PmlSubChannel Channel = null;
32 internal bool Completed = false;
33
34 internal ChannelRequestWaitHandler(PmlSubChannel channel) {
35 Channel = channel;
36 }
37
38 internal void Complete() {
39 Completed = true;
40 if (Event != null) Event.Set();
41 if (Callback != null) Callback.Invoke(this);
42 }
43
44 public object AsyncState { get { return CallbackState; } }
45 public WaitHandle AsyncWaitHandle { get { return null; } }
46 public bool CompletedSynchronously { get { return false; } }
47 public bool IsCompleted { get { return Completed; } }
48 }
49 private class PmlSubChannel : ActivePmlChannel, IPmlSubChannel {
50 private enum ChannelState { Requesting, Acknowledged, Closed }
51
52 private PmlCommunicator _communicator;
53 private UInt32 _id;
54 private ChannelState _state;
55
56 internal PmlSubChannel(PmlCommunicator communicator, UInt32 sid) {
57 _communicator = communicator;
58 _id = sid;
59 _state = ChannelState.Requesting;
60 }
61
62 public override bool IsOpen { get { return _state == ChannelState.Acknowledged; } }
63
64 internal void AcknowledgeIn() {
65 if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement");
66 _state = ChannelState.Acknowledged;
67 }
68 void IPmlSubChannel.CloseIn() {
69 _state = ChannelState.Closed;
70 _communicator._subchannels.Remove(_id);
71 base.Close();
72 }
73 void IPmlSubChannel.ErrorIn(PmlElement message) {
74 (this as IPmlSubChannel).CloseIn();
75 }
76 void IPmlSubChannel.MessageIn(PmlElement message) {
77 base.PushReceivedMessage(message);
78 }
79
80 internal void AcknowledgeOut() {
81 if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement");
82 _state = ChannelState.Acknowledged;
83 _communicator.sendMessage(CommandCode.ChannelAcknowledge, _id, null);
84 }
85 internal void RejectOut() {
86 if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement");
87 _state = ChannelState.Closed;
88 _communicator.sendMessage(CommandCode.ChannelClose, _id, null);
89 }
90
91 public override void SendMessage(PmlElement message) {
92 if (_state != ChannelState.Acknowledged) throw new InvalidOperationException("The subchannel is not open");
93 _communicator.sendMessage(CommandCode.Message, _id, message);
94 }
95 public override void Close() {
96 if (_state != ChannelState.Acknowledged) return;
97 _state = ChannelState.Closed;
98 _communicator.sendMessage(CommandCode.ChannelClose, _id, null);
99 _communicator._subchannels.Remove(_id);
100 base.Close();
101 }
102 }
103 private class PmlChannelRequestReceivedEventArgsA : PmlChannelRequestReceivedEventArgs {
104 private PmlCommunicator _communicator;
105 private PmlElement _data;
106 private PmlSubChannel _channel;
107 private bool _accepted;
108 private bool _rejected;
109 internal PmlChannelRequestReceivedEventArgsA(PmlCommunicator communicator, UInt32 sid, PmlElement message) {
110 _communicator = communicator;
111 _channel = new PmlSubChannel(communicator, sid);
112 _data = message;
113 }
114 public override IPmlChannel Accept() {
115 if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected");
116 _accepted = true;
117 _channel.AcknowledgeOut();
118 return _channel;
119 }
120 public override void Reject() {
121 if (_accepted) throw new InvalidOperationException("The channel has already been accepted");
122 if (_rejected) return;
123 _rejected = true;
124 _channel.RejectOut();
125 }
126 internal void RejectIfNotAccepted() {
127 if (!_accepted) Reject();
128 }
129 public override PmlElement Data {
130 get {
131 return _data;
132 }
133 }
134 }
135
136 private class PmlInvocation : IAsyncResult, IPmlSubChannel {
137 internal PmlCommunicator Communicator = null;
138 internal AsyncCallback Callback = null;
139 internal Object CallbackState = null;
140 internal bool Error = false;
141 internal bool Completed = false;
142 internal PmlElement Message = null;
143 internal ManualResetEvent Event = null;
144 internal UInt32 ID;
145
146 internal PmlInvocation(PmlCommunicator communicator, UInt32 id) {
147 Communicator = communicator;
148 ID = id;
149 }
150
151 void IPmlSubChannel.CloseIn() {
152 (this as IPmlSubChannel).ErrorIn(null);
153 }
154 void IPmlSubChannel.ErrorIn(PmlElement message) {
155 Error = true;
156 Communicator._subchannels.Remove(ID);
157 (this as IPmlSubChannel).MessageIn(message);
158 }
159 void IPmlSubChannel.MessageIn(PmlElement message) {
160 Message = message;
161 Completed = true;
162 if (Event != null) Event.Set();
163 if (Callback != null) Callback.Invoke(this);
164 }
165
166 public object AsyncState { get { return CallbackState; } }
167 public WaitHandle AsyncWaitHandle { get { return null; } }
168 public bool CompletedSynchronously { get { return false; } }
169 public bool IsCompleted { get { return Completed; } }
170 }
171
172 public event EventHandler<PmlCallReceivedEventArgs> CallReceived;
173 public event EventHandler<PmlChannelRequestReceivedEventArgs> ChannelRequestReceived;
174
175 public PmlCommunicator(IPmlChannel channel) {
176 _channel = channel;
177 _channel.Closed += channelClosed;
178 }
179
180 public void Dispose() {
181 _channel.Close();
182 _channel = null;
183 IPmlSubChannel[] A = new IPmlSubChannel[_subchannels.Count];
184 _subchannels.Values.CopyTo(A, 0);
185 foreach (IPmlSubChannel S in A) S.CloseIn();
186 _subchannels.Clear();
187 _subchannels = null;
188 _random = null;
189 }
190
191 private void channelClosed(Object sender, EventArgs e) {
192 Dispose();
193 }
194
195 public IPmlChannel Channel { get { return _channel; } }
196
197 public void Call(PmlElement message) {
198 sendMessage(0, 0, message); //Call without reply
199 }
200 public PmlElement Invoke(PmlElement message) {
201 return Invoke(message, 60000);
202 }
203 public PmlElement Invoke(PmlElement message, int timeout) {
204 UInt32 sid = getSessionID();
205 PmlInvocation inv = new PmlInvocation(this, sid);
206 inv.Event = new ManualResetEvent(false);
207 _subchannels.Add(sid, inv);
208 sendMessage(CommandCode.CallWithReply, sid, message);
209 inv.Event.WaitOne(timeout);
210 if (inv.Error) throw new Exception(message.ToString());
211 return inv.Message;
212 }
213
214 public IAsyncResult BeginInvoke(PmlElement message, AsyncCallback callback, Object state) {
215 UInt32 sid = getSessionID();
216 PmlInvocation inv = new PmlInvocation(this, sid);
217 inv.Callback = callback;
218 inv.CallbackState = state;
219 _subchannels.Add(sid, inv);
220 sendMessage(CommandCode.CallWithReply, sid, message);
221 return inv;
222 }
223 public PmlElement EndInvoke(IAsyncResult result) {
224 PmlInvocation ar = (PmlInvocation)result;
225 if (!ar.Completed) {
226 (_subchannels as IList<IPmlSubChannel>).Remove(ar);
227 throw new InvalidOperationException("The asynchronous operation has not completed");
228 } else if (ar.Error) {
229 throw new Exception(ar.Message.ToString());
230 } else {
231 return ar.Message;
232 }
233 }
234
235 public IPmlChannel CreateChannel(PmlElement data) {
236 UInt32 sid = getSessionID();
237 PmlSubChannel ch = new PmlSubChannel(this, sid);
238 ChannelRequestWaitHandler wh = new ChannelRequestWaitHandler(ch);
239 wh.Event = new ManualResetEvent(false);
240 _subchannels.Add(sid, ch);
241 sendMessage(CommandCode.ChannelRequest, sid, data);
242 wh.Event.WaitOne();
243 if (!ch.IsOpen) return null;
244 return ch;
245 }
246 public IAsyncResult BeginCreateChannel(PmlElement data, AsyncCallback callback, Object state) {
247 UInt32 sid = getSessionID();
248 PmlSubChannel ch = new PmlSubChannel(this, sid);
249 ChannelRequestWaitHandler wh = new ChannelRequestWaitHandler(ch);
250 wh.Callback = callback;
251 wh.CallbackState = state;
252 _subchannels.Add(sid, ch);
253 sendMessage(CommandCode.ChannelRequest, sid, data);
254 if (!ch.IsOpen) return null;
255 return wh;
256 }
257 public IPmlChannel EndCreateChannel(IAsyncResult result) {
258 ChannelRequestWaitHandler ar = (ChannelRequestWaitHandler)result;
259 if (!ar.Channel.IsOpen) return null;
260 return ar.Channel;
261 }
262
263 private UInt32 getSessionID() {
264 return (uint)_random.Next();
265 }
266
267 private void sendMessage(CommandCode cmd, uint sid, PmlElement message) {
268 PmlDictionary msg = new PmlDictionary();
269 msg.Add("c", (int)cmd);
270 if (cmd > 0) msg.Add("s", sid);
271 if (message != null) msg.Add("m", message);
272 _channel.SendMessage(msg);
273 }
274
275 private void invokeCallReceived(Object state) {
276 PmlCallReceivedEventArgs e = (PmlCallReceivedEventArgs)state;
277 try {
278 if (CallReceived != null) CallReceived(this, e);
279 if (e.WantReply) sendMessage(CommandCode.Message, e.SID, e.Reply);
280 } catch (Exception ex) {
281 if (e.WantReply) sendMessage(CommandCode.Error, e.SID, new PmlString(ex.ToString()));
282 }
283 }
284 private void invokeChannelRequestReceived(Object state) {
285 PmlChannelRequestReceivedEventArgsA e = (PmlChannelRequestReceivedEventArgsA)state;
286 if (ChannelRequestReceived != null) ChannelRequestReceived(this, e);
287 e.RejectIfNotAccepted();
288 }
289
290 private void messageReceived(Object sender, EventArgs e) {
291 IPmlSubChannel subChannel = null;
292 UInt32 sid = 0;
293 bool subChannelExists = false;
294 if (!(e.Message is PmlDictionary)) return;
295 PmlDictionary msg = (PmlDictionary)e.Message;
296 PmlElement cmdElement = msg.GetChild("c");
297 PmlElement sidElement = msg.GetChild("i");
298 PmlElement msgElement = msg.GetChild("m");
299 if (cmdElement == null) return;
300 if (sidElement != null) sid = sidElement.ToUInt32();
301 if (sidElement != null) subChannelExists = _subchannels.TryGetValue(sid, out subChannel);
302 if (!subChannelExists) subChannel = null;
303 switch ((CommandCode)cmdElement.ToInt32()) {
304 case CommandCode.CallWithoutReply:
305 if (CallReceived != null) ThreadPool.RunCall(invokeCallReceived, new PmlCallReceivedEventArgs(msgElement, false, 0));
306 break;
307 case CommandCode.CallWithReply:
308 if (CallReceived != null) ThreadPool.RunCall(invokeCallReceived, new PmlCallReceivedEventArgs(msgElement, true, sid));
309 else sendMessage(CommandCode.Error, sid, null);
310 break;
311 case CommandCode.Message: //Reply to call | subchannel message
312 if (subChannelExists) subChannel.MessageIn(msgElement);
313 else sendMessage(CommandCode.Error, sid, null);
314 break;
315 case CommandCode.ChannelRequest:
316 if (subChannelExists) {
317 sendMessage(CommandCode.Error, sid, null);
318 subChannel.CloseIn();
319 } else {
320 if (ChannelRequestReceived == null) sendMessage(CommandCode.ChannelClose, sid, null);
321 else ThreadPool.RunCall(invokeChannelRequestReceived, new PmlChannelRequestReceivedEventArgsA(this, sid, msgElement));
322 }
323 break;
324 case CommandCode.ChannelAcknowledge:
325 if (subChannelExists) {
326 if (subChannel is PmlSubChannel) (subChannel as PmlSubChannel).AcknowledgeIn();
327 else {
328 sendMessage(CommandCode.Error, sid, null); //Error
329 subChannel.CloseIn();
330 }
331 } else sendMessage(CommandCode.Error, sid, null); //Error
332 break;
333 case CommandCode.ChannelClose:
334 if (subChannelExists) subChannel.CloseIn();
335 break;
336 case CommandCode.Error:
337 if (subChannelExists) subChannel.ErrorIn(msgElement);
338 break;
339 }
340 }
341 }*/
342 }