comparison Pml/PmlCommunicator2.cs @ 0:3ab940a0c7a0

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 11 Sep 2012 16:28:53 +0200
parents
children 8fe322656807
comparison
equal deleted inserted replaced
-1:000000000000 0:3ab940a0c7a0
1 using System;
2 using System.Threading;
3 using System.Collections.Generic;
4 using UCIS.Pml;
5
6 namespace UCIS.Pml {
7 public class PmlCommunicator {
8 private class CSyncRequest {
9 internal PmlElement Reply;
10 internal ManualResetEvent ResetEvent = new ManualResetEvent(false);
11 }
12 private interface ISession {
13 void MessageIn(PmlElement message);
14 void CloseIn();
15 UInt32 ID { get; }
16 }
17 /*public abstract class SessionBase : ISession {
18 private bool pActive;
19 private PmlCommunicator _communicator;
20 private UInt32 _id;
21
22 public uint SID { get { return _id; } }
23 public bool Active { get { return pActive; } }
24 public PmlCommunicator Communicator { get { return _communicator; } }
25
26 protected SessionBase(PmlCommunicator Connection) {
27 _communicator = Connection;
28 }
29
30 protected void Accept(UInt32 sid) {
31 if (pActive) throw new InvalidOperationException("Session is active");
32 _id = sid;
33 lock (_communicator._sessions) _communicator._sessions.Add(_id, this);
34 pActive = true;
35 }
36 protected void Request() {
37 Request(null);
38 }
39 protected void Request(PmlElement Message) {
40 if (pActive) throw new InvalidOperationException("Session is active");
41 _id = _communicator.GetNextSessionId(true);
42 lock (_communicator._sessions) _communicator._sessions.Add(_id, this);
43 _communicator.WriteSessionMessage(_id, 0, Message);
44 pActive = true;
45 }
46
47 uint ISession.ID { get { return _id; } }
48 void ISession.MessageIn(PmlElement message) { this.MessageIn(message); }
49 void ISession.CloseIn() {
50 pActive = false;
51 _communicator.RemoveSession(this);
52 Closed(null);
53 }
54
55 protected internal abstract void MessageIn(PmlElement Message);
56
57 protected void SendMessage(PmlElement Message) {
58 if (!pActive) throw new InvalidOperationException("Session is not active");
59 _communicator.WriteSessionMessage(_id, 1, Message);
60 }
61
62 public void Close() {
63 if (!pActive) return;
64 pActive = false;
65 _communicator.WriteSessionMessage(_id, 2, null);
66 _communicator.RemoveSession(this);
67 }
68
69 protected virtual void Closed(PmlElement Message) { }
70 }
71 public class Session : SessionBase {
72 public event MessageReceivedEventHandler MessageReceived;
73 public delegate void MessageReceivedEventHandler(PmlElement Message);
74 public event SessionClosedEventHandler SessionClosed;
75 public delegate void SessionClosedEventHandler(PmlElement Message);
76
77 public Session(PmlCommunicator Connection) : base(Connection) { }
78
79 public new void Accept(UInt32 SID) {
80 base.Accept(SID);
81 }
82 public new void Request() {
83 Request(null);
84 }
85 public new void Request(PmlElement Message) {
86 base.Request(Message);
87 }
88
89 protected internal override void MessageIn(PmlElement Message) {
90 if (MessageReceived != null) MessageReceived(Message);
91 }
92
93 public new void SendMessage(PmlElement Message) {
94 base.SendMessage(Message);
95 }
96
97 protected override void Closed(PmlElement Message) {
98 if (SessionClosed != null) SessionClosed(Message);
99 }
100 }*/
101
102 private class PmlSubChannel : ActivePmlChannel, ISession {
103 private enum ChannelState { Requesting, Acknowledged, Closed }
104
105 private PmlCommunicator _communicator;
106 private UInt32 _id;
107 private ChannelState _state;
108
109 internal PmlSubChannel(PmlCommunicator communicator, UInt32 sid, bool accepted) {
110 _communicator = communicator;
111 _id = sid;
112 _state = accepted ? ChannelState.Acknowledged : ChannelState.Requesting;
113 if (accepted) _communicator.AddSession(this);
114 }
115
116 public override bool IsOpen { get { return _state == ChannelState.Acknowledged; } }
117
118 uint ISession.ID { get { return _id; } }
119 void ISession.CloseIn() {
120 _state = ChannelState.Closed;
121 _communicator.RemoveSession(this);
122 base.Close();
123 }
124 void ISession.MessageIn(PmlElement message) {
125 base.PushReceivedMessage(message);
126 }
127
128 public override void WriteMessage(PmlElement message) {
129 if (_state != ChannelState.Acknowledged) throw new InvalidOperationException("The subchannel is not open");
130 _communicator.WriteSessionMessage(_id, 1, message);
131 }
132 public override void Close() {
133 if (_state != ChannelState.Acknowledged) return;
134 _state = ChannelState.Closed;
135 _communicator.WriteSessionMessage(_id, 2, null);
136 _communicator.RemoveSession(this);
137 base.Close();
138 }
139 }
140 private class PmlChannelRequestReceivedEventArgsA : PmlChannelRequestReceivedEventArgs {
141 private PmlCommunicator _communicator;
142 private PmlElement _data;
143 private bool _accepted, _rejected;
144 private UInt32 _sid;
145 internal PmlChannelRequestReceivedEventArgsA(PmlCommunicator communicator, UInt32 sid, PmlElement message) {
146 _communicator = communicator;
147 _data = message;
148 _sid = sid;
149 _accepted = _rejected = false;
150 }
151 public UInt32 AcceptSession() {
152 if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected");
153 _accepted = true;
154 return _sid;
155 }
156 public override IPmlChannel Accept() {
157 if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected");
158 _accepted = true;
159 return new PmlSubChannel(_communicator, _sid, true);
160 }
161 public override void Reject() {
162 if (_accepted) throw new InvalidOperationException("The channel has already been accepted");
163 if (_rejected) return;
164 _rejected = true;
165 _communicator.WriteSessionMessage(_sid, 2, null);
166 //_channel.RejectOut();
167 }
168 internal void RejectIfNotAccepted() {
169 if (!_accepted) Reject();
170 }
171 public override PmlElement Data {
172 get {
173 return _data;
174 }
175 }
176 }
177
178 public event EventHandler<PmlCallReceivedEventArgs> CallReceived;
179 public event EventHandler<PmlChannelRequestReceivedEventArgs> ChannelRequestReceived;
180 public event EventHandler Closed;
181
182 private Dictionary<UInt32, ISession> _sessions = new Dictionary<UInt32, ISession>();
183 private Dictionary<UInt32, CSyncRequest> _invocations = new Dictionary<UInt32, CSyncRequest>();
184 private UInt32 pNextSession;
185 private UInt32 pNextSyncRequest;
186
187 private bool _closed;
188 private IPmlChannel _channel;
189
190 public IPmlChannel Channel { get { return _channel; } }
191
192 public PmlCommunicator(IPmlChannel channel, bool autoStart) {
193 _channel = channel;
194 if (autoStart) Start();
195 }
196 public void Start() {
197 _channel.BeginReadMessage(messageReceived, null);
198 }
199 public void StartSync() {
200 while (true) {
201 try {
202 processMessage(_channel.ReadMessage());
203 } catch (InvalidOperationException ex) {
204 Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message);
205 closed();
206 _channel.Close();
207 return;
208 } catch (Exception ex) {
209 Console.WriteLine(ex.ToString());
210 closed();
211 _channel.Close();
212 return;
213 }
214 }
215 }
216 public void Close() {
217 _channel.Close();
218 }
219 public void WriteRawMessage(PmlElement Message) {
220 _WriteMessage(Message);
221 }
222
223 private void _WriteMessage(PmlElement Message) {
224 lock (_channel) {
225 if (!_channel.IsOpen) throw new InvalidOperationException("Could not write message: the channel is not open");
226 _channel.WriteMessage(Message);
227 }
228 }
229 private void closed() {
230 _closed = true;
231 lock (_sessions) {
232 foreach (ISession S in _sessions.Values) {
233 try {
234 S.CloseIn();
235 } catch (Exception ex) {
236 Console.WriteLine(ex.ToString());
237 }
238 }
239 _sessions.Clear();
240 }
241 lock (_invocations) {
242 foreach (CSyncRequest T in _invocations.Values) {
243 T.ResetEvent.Set();
244 }
245 _invocations.Clear();
246 }
247 if (Closed != null) Closed(this, new EventArgs());
248 }
249
250 private void messageReceived(IAsyncResult ar) {
251 try {
252 PmlElement Message = _channel.EndReadMessage(ar);
253 processMessage(Message);
254 _channel.BeginReadMessage(messageReceived, null);
255 } catch (InvalidOperationException ex) {
256 Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message);
257 closed();
258 _channel.Close();
259 return;
260 } catch (Exception ex) {
261 Console.WriteLine(ex.ToString());
262 closed();
263 _channel.Close();
264 return;
265 }
266 }
267 private void processMessage(PmlElement Message) {
268 if (Message is PmlString) {
269 string Cmd = Message.ToString();
270 if (Cmd.Equals("PING")) {
271 _WriteMessage(new PmlString("PONG"));
272 /*} else if (Cmd.Equals("PONG")) {
273 Ping = 0;*/
274 }
275 } else if (Message is PmlDictionary) {
276 string Cmd = Message.GetChild("CMD").ToString();
277 if (Cmd.Equals("SES")) {
278 processSessionMessage(Message);
279 } else if (Cmd.Equals("RPL")) {
280 UInt32 SID = Message.GetChild("SID").ToUInt32();
281 CSyncRequest SRequest = null;
282 lock (_invocations) {
283 if (_invocations.TryGetValue(SID, out SRequest)) {
284 _invocations.Remove(SID);
285 } else {
286 Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString());
287 }
288 }
289 if (SRequest != null) {
290 SRequest.Reply = Message.GetChild("MSG");
291 SRequest.ResetEvent.Set();
292 }
293 } else if (Cmd.Equals("REQ") || Cmd.Equals("MSG")) {
294 UCIS.ThreadPool.RunCall(processCall, Message);
295 } else {
296 Console.WriteLine("UCIS.PML.Connection.Worker Invalid command received");
297 }
298 }
299 }
300 private void processSessionMessage(PmlElement Message) {
301 UInt32 SID = Message.GetChild("SID").ToUInt32();
302 byte SCMD = Message.GetChild("SCMD").ToByte();
303 PmlElement InnerMsg = Message.GetChild("MSG");
304 ISession Session = null;
305 lock (_sessions) if (!_sessions.TryGetValue(SID, out Session)) Session = null;
306 switch (SCMD) {
307 case 0: //Request
308 if (Session != null) {
309 try {
310 Session.CloseIn();
311 } catch (Exception ex) {
312 Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Request: exception in session.CloseIn: " + ex.ToString());
313 }
314 WriteSessionMessage(SID, 2, null);
315 } else if (ChannelRequestReceived != null) {
316 try {
317 PmlChannelRequestReceivedEventArgsA ea = new PmlChannelRequestReceivedEventArgsA(this, SID, InnerMsg);
318 ChannelRequestReceived(this, ea);
319 ea.RejectIfNotAccepted();
320 } catch (Exception ex) {
321 Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in ChannelRequestReceived: " + ex.ToString());
322 WriteSessionMessage(SID, 2, null);
323 }
324 } else {
325 WriteSessionMessage(SID, 2, null);
326 }
327 break;
328 case 1: //Message
329 if (Session != null) {
330 try {
331 Session.MessageIn(InnerMsg);
332 } catch (Exception ex) {
333 Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.MessageIn: " + ex.ToString());
334 WriteSessionMessage(SID, 2, null);
335 }
336 } else {
337 WriteSessionMessage(SID, 2, null);
338 }
339 break;
340 case 2: //Close
341 if (Session != null) {
342 try {
343 if (InnerMsg != null && !(InnerMsg is PmlNull)) Session.MessageIn(InnerMsg);
344 } catch (Exception ex) {
345 Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Close: exception in session.MessageIn: " + ex.ToString());
346 } finally {
347 try {
348 Session.CloseIn();
349 } catch (Exception ex) {
350 Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.CloseIn: " + ex.ToString());
351 }
352 }
353 }
354 break;
355 }
356 }
357 private void processCall(object state) {
358 PmlDictionary Message = (PmlDictionary)state;
359 bool wantReply = Message.ContainsKey("SID");
360 UInt32 SID = 0;
361 if (wantReply) SID = Message.GetChild("SID").ToUInt32();
362 PmlElement Reply = null;
363 try {
364 if (CallReceived != null) {
365 PmlCallReceivedEventArgs ea = new PmlCallReceivedEventArgs(Message.GetChild("MSG"), wantReply, SID);
366 CallReceived(this, ea);
367 Reply = ea.Reply;
368 }
369 } catch (Exception ex) {
370 Reply = new PmlDictionary();
371 ((PmlDictionary)Reply).Add("EXCEPTION", new PmlString(ex.ToString()));
372 Console.WriteLine(ex.ToString());
373 } finally {
374 if (wantReply && Channel.IsOpen) {
375 try {
376 WriteSyncMessage(SID, true, Reply);
377 } catch (Exception ex) {
378 Console.WriteLine("UCIS.Pml.PmlCommunicator.processCall: exception: " + ex.ToString());
379 closed();
380 Channel.Close();
381 }
382 }
383 }
384 }
385
386 public void Call(PmlElement message) {
387 PmlDictionary Msg = new PmlDictionary();
388 Msg.Add("CMD", new PmlString("MSG"));
389 Msg.Add("MSG", message);
390 _WriteMessage(Msg);
391 }
392 public PmlElement Invoke(PmlElement message) {
393 return Invoke(message, 60000);
394 }
395 public PmlElement Invoke(PmlElement message, int timeout) {
396 if (_closed) throw new InvalidOperationException("Sorry, we're closed.");
397 CSyncRequest SyncEvent = new CSyncRequest();
398 UInt32 SID = GetNextSessionId(false);
399 lock (_invocations) _invocations.Add(SID, SyncEvent);
400 try {
401 WriteSyncMessage(SID, false, message);
402 if (!SyncEvent.ResetEvent.WaitOne(timeout, false)) {
403 if (!_closed) lock (_invocations) _invocations.Remove(SID);
404 throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")");
405 }
406 } finally {
407 lock (_invocations) _invocations.Remove(SID);
408 }
409 return SyncEvent.Reply;
410 }
411
412 public IPmlChannel CreateChannel(PmlElement data) {
413 UInt32 sid = GetNextSessionId(true);
414 PmlSubChannel ch = new PmlSubChannel(this, sid, true);
415 WriteSessionMessage(sid, 0, data);
416 if (!ch.IsOpen) return null;
417 return ch;
418 }
419
420 private void AddSession(ISession session) {
421 if (_closed) return;
422 lock (_sessions) _sessions.Add(session.ID, session);
423 }
424 private void RemoveSession(UInt32 session) {
425 if (_closed) return;
426 lock (_sessions) _sessions.Remove(session);
427 }
428 private void RemoveSession(ISession session) {
429 RemoveSession(session.ID);
430 }
431
432 private UInt32 GetNextSessionId(bool IsSession) {
433 if (IsSession) {
434 lock (_sessions) {
435 do {
436 unchecked { pNextSession++; }
437 } while (_sessions.ContainsKey(pNextSession));
438 return pNextSession;
439 }
440 } else {
441 lock (_invocations) {
442 do {
443 unchecked { pNextSyncRequest++; }
444 } while (_invocations.ContainsKey(pNextSyncRequest));
445 return pNextSyncRequest;
446 }
447 }
448 }
449
450 protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) {
451 PmlDictionary Msg2 = new PmlDictionary();
452 Msg2.Add("CMD", new PmlString(RPL ? "RPL" : "REQ"));
453 Msg2.Add("SID", new PmlInteger(SID));
454 Msg2.Add("MSG", MSG);
455 _WriteMessage(Msg2);
456 }
457 protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) {
458 PmlDictionary Msg2 = new PmlDictionary();
459 Msg2.Add("CMD", new PmlString("SES"));
460 Msg2.Add("SID", new PmlInteger(SID));
461 Msg2.Add("SCMD", new PmlInteger(CMD));
462 if (MSG != null) Msg2.Add("MSG", MSG);
463 _WriteMessage(Msg2);
464 }
465
466
467
468 /* LegacyPmlCommunicator compatibility */
469 public PmlElement SyncRequest(PmlElement Request) {
470 return Invoke(Request);
471 }
472 public PmlElement SyncRequest(PmlElement Request, int Timeout) {
473 return Invoke(Request, Timeout);
474 }
475 public void SendMessage(PmlElement Message) {
476 Call(Message);
477 }
478 }
479 }