Mercurial > hg > ucis.core
comparison Pml/PmlCommunicator2.cs @ 0:3ab940a0c7a0
Initial commit
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Tue, 11 Sep 2012 16:28:53 +0200 |
parents | |
children | 8fe322656807 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3ab940a0c7a0 |
---|---|
1 using System; | |
2 using System.Threading; | |
3 using System.Collections.Generic; | |
4 using UCIS.Pml; | |
5 | |
6 namespace UCIS.Pml { | |
7 public class PmlCommunicator { | |
/// <summary>
/// Bookkeeping for one outstanding synchronous request: the reply element
/// and the event a waiting caller blocks on.
/// </summary>
private sealed class CSyncRequest {
    /// <summary>Reply payload; remains null unless a reply is stored before the event is set.</summary>
    internal PmlElement Reply;
    /// <summary>
    /// Created unsignalled. It is set when a matching reply is processed and
    /// also when the communicator shuts down. Read-only so the handle a
    /// caller waits on can never be swapped for another instance.
    /// </summary>
    internal readonly ManualResetEvent ResetEvent = new ManualResetEvent(false);
}
/// <summary>
/// Contract implemented by objects stored in the communicator's session
/// table so that incoming session traffic can be handed to them.
/// </summary>
private interface ISession {
    /// <summary>Identifier under which the session is registered.</summary>
    uint ID { get; }

    /// <summary>Delivers a message that was received for this session.</summary>
    void MessageIn(PmlElement message);

    /// <summary>Tells the session that it has been closed from the receiving side.</summary>
    void CloseIn();
}
17 /*public abstract class SessionBase : ISession { | |
18 private bool pActive; | |
19 private PmlCommunicator _communicator; | |
20 private UInt32 _id; | |
21 | |
22 public uint SID { get { return _id; } } | |
23 public bool Active { get { return pActive; } } | |
24 public PmlCommunicator Communicator { get { return _communicator; } } | |
25 | |
26 protected SessionBase(PmlCommunicator Connection) { | |
27 _communicator = Connection; | |
28 } | |
29 | |
30 protected void Accept(UInt32 sid) { | |
31 if (pActive) throw new InvalidOperationException("Session is active"); | |
32 _id = sid; | |
33 lock (_communicator._sessions) _communicator._sessions.Add(_id, this); | |
34 pActive = true; | |
35 } | |
36 protected void Request() { | |
37 Request(null); | |
38 } | |
39 protected void Request(PmlElement Message) { | |
40 if (pActive) throw new InvalidOperationException("Session is active"); | |
41 _id = _communicator.GetNextSessionId(true); | |
42 lock (_communicator._sessions) _communicator._sessions.Add(_id, this); | |
43 _communicator.WriteSessionMessage(_id, 0, Message); | |
44 pActive = true; | |
45 } | |
46 | |
47 uint ISession.ID { get { return _id; } } | |
48 void ISession.MessageIn(PmlElement message) { this.MessageIn(message); } | |
49 void ISession.CloseIn() { | |
50 pActive = false; | |
51 _communicator.RemoveSession(this); | |
52 Closed(null); | |
53 } | |
54 | |
55 protected internal abstract void MessageIn(PmlElement Message); | |
56 | |
57 protected void SendMessage(PmlElement Message) { | |
58 if (!pActive) throw new InvalidOperationException("Session is not active"); | |
59 _communicator.WriteSessionMessage(_id, 1, Message); | |
60 } | |
61 | |
62 public void Close() { | |
63 if (!pActive) return; | |
64 pActive = false; | |
65 _communicator.WriteSessionMessage(_id, 2, null); | |
66 _communicator.RemoveSession(this); | |
67 } | |
68 | |
69 protected virtual void Closed(PmlElement Message) { } | |
70 } | |
71 public class Session : SessionBase { | |
72 public event MessageReceivedEventHandler MessageReceived; | |
73 public delegate void MessageReceivedEventHandler(PmlElement Message); | |
74 public event SessionClosedEventHandler SessionClosed; | |
75 public delegate void SessionClosedEventHandler(PmlElement Message); | |
76 | |
77 public Session(PmlCommunicator Connection) : base(Connection) { } | |
78 | |
79 public new void Accept(UInt32 SID) { | |
80 base.Accept(SID); | |
81 } | |
82 public new void Request() { | |
83 Request(null); | |
84 } | |
85 public new void Request(PmlElement Message) { | |
86 base.Request(Message); | |
87 } | |
88 | |
89 protected internal override void MessageIn(PmlElement Message) { | |
90 if (MessageReceived != null) MessageReceived(Message); | |
91 } | |
92 | |
93 public new void SendMessage(PmlElement Message) { | |
94 base.SendMessage(Message); | |
95 } | |
96 | |
97 protected override void Closed(PmlElement Message) { | |
98 if (SessionClosed != null) SessionClosed(Message); | |
99 } | |
100 }*/ | |
101 | |
/// <summary>
/// A channel multiplexed over the communicator. Outgoing messages are sent
/// as session messages carrying this channel's ID; incoming ones arrive
/// through the <see cref="ISession"/> members.
/// </summary>
private class PmlSubChannel : ActivePmlChannel, ISession {
    private enum ChannelState { Requesting, Acknowledged, Closed }

    private PmlCommunicator _communicator;
    private UInt32 _id;
    private ChannelState _state;

    /// <summary>
    /// Creates the sub-channel. When <paramref name="accepted"/> is true the
    /// channel starts out open and registers itself with the communicator;
    /// otherwise it starts in the requesting state and is not registered.
    /// </summary>
    internal PmlSubChannel(PmlCommunicator communicator, UInt32 sid, bool accepted) {
        _communicator = communicator;
        _id = sid;
        if (accepted) {
            _state = ChannelState.Acknowledged;
            _communicator.AddSession(this);
        } else {
            _state = ChannelState.Requesting;
        }
    }

    /// <summary>True only while the channel is in the acknowledged state.</summary>
    public override bool IsOpen {
        get { return ChannelState.Acknowledged == _state; }
    }

    uint ISession.ID {
        get { return _id; }
    }

    // Close initiated by the other side: nothing is written back.
    void ISession.CloseIn() {
        _state = ChannelState.Closed;
        _communicator.RemoveSession(this);
        base.Close();
    }

    void ISession.MessageIn(PmlElement message) {
        base.PushReceivedMessage(message);
    }

    /// <summary>Sends a message over this sub-channel (session command 1).</summary>
    /// <exception cref="InvalidOperationException">The channel is not in the acknowledged state.</exception>
    public override void WriteMessage(PmlElement message) {
        bool open = _state == ChannelState.Acknowledged;
        if (!open) throw new InvalidOperationException("The subchannel is not open");
        _communicator.WriteSessionMessage(_id, 1, message);
    }

    /// <summary>
    /// Closes an open channel: sends session command 2, unregisters the
    /// session and closes the base channel. Does nothing when not open.
    /// </summary>
    public override void Close() {
        if (_state == ChannelState.Acknowledged) {
            _state = ChannelState.Closed;
            _communicator.WriteSessionMessage(_id, 2, null);
            _communicator.RemoveSession(this);
            base.Close();
        }
    }
}
/// <summary>
/// Event arguments handed to ChannelRequestReceived subscribers. A request
/// can be accepted or rejected, but not both.
/// </summary>
private class PmlChannelRequestReceivedEventArgsA : PmlChannelRequestReceivedEventArgs {
    private PmlCommunicator _communicator;
    private PmlElement _data;
    private UInt32 _sid;
    // Both flags start out false (the default for bool fields).
    private bool _accepted, _rejected;

    internal PmlChannelRequestReceivedEventArgsA(PmlCommunicator communicator, UInt32 sid, PmlElement message) {
        _communicator = communicator;
        _sid = sid;
        _data = message;
    }

    // Shared guard for both accept flavours: a request may be decided only once.
    private void MarkAccepted() {
        if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected");
        _accepted = true;
    }

    /// <summary>Marks the request as accepted and returns its session ID without creating a channel object.</summary>
    public UInt32 AcceptSession() {
        MarkAccepted();
        return _sid;
    }

    /// <summary>Marks the request as accepted and returns a new, already-open sub-channel for it.</summary>
    public override IPmlChannel Accept() {
        MarkAccepted();
        return new PmlSubChannel(_communicator, _sid, true);
    }

    /// <summary>
    /// Rejects the request by sending session command 2 for its ID.
    /// Calling it again after a rejection is a no-op.
    /// </summary>
    /// <exception cref="InvalidOperationException">The request was already accepted.</exception>
    public override void Reject() {
        if (_accepted) throw new InvalidOperationException("The channel has already been accepted");
        if (!_rejected) {
            _rejected = true;
            _communicator.WriteSessionMessage(_sid, 2, null);
        }
    }

    // Called after the event handlers have run so an undecided request is turned down.
    internal void RejectIfNotAccepted() {
        if (_accepted) return;
        Reject();
    }

    /// <summary>The payload that accompanied the channel request.</summary>
    public override PmlElement Data {
        get { return _data; }
    }
}
177 | |
/// <summary>Raised for every incoming call (REQ or MSG command).</summary>
public event EventHandler<PmlCallReceivedEventArgs> CallReceived;
/// <summary>Raised when the remote side requests a new session/channel.</summary>
public event EventHandler<PmlChannelRequestReceivedEventArgs> ChannelRequestReceived;
/// <summary>Raised when the communicator shuts down.</summary>
public event EventHandler Closed;

// These two tables are also used as lock objects, so they are read-only:
// replacing the instance would silently break mutual exclusion.
private readonly Dictionary<UInt32, ISession> _sessions = new Dictionary<UInt32, ISession>();
private readonly Dictionary<UInt32, CSyncRequest> _invocations = new Dictionary<UInt32, CSyncRequest>();
// Last ID handed out for sessions and for synchronous requests respectively.
private UInt32 pNextSession;
private UInt32 pNextSyncRequest;

private bool _closed;
// Assigned once in the constructor; also used as the write lock.
private readonly IPmlChannel _channel;

/// <summary>The channel this communicator reads from and writes to.</summary>
public IPmlChannel Channel { get { return _channel; } }
191 | |
/// <summary>
/// Creates a communicator on top of <paramref name="channel"/>.
/// </summary>
/// <param name="channel">Channel used for all reads and writes; must not be null.</param>
/// <param name="autoStart">When true, <see cref="Start"/> is called immediately.</param>
/// <exception cref="ArgumentNullException"><paramref name="channel"/> is null.</exception>
public PmlCommunicator(IPmlChannel channel, bool autoStart) {
    // Fail here rather than with a NullReferenceException on first use.
    if (channel == null) throw new ArgumentNullException("channel");
    _channel = channel;
    if (autoStart) Start();
}
/// <summary>
/// Begins an asynchronous read on the channel; the result is handled by
/// <c>messageReceived</c>.
/// </summary>
public void Start() {
    this._channel.BeginReadMessage(this.messageReceived, null);
}
/// <summary>
/// Reads and processes messages on the calling thread until reading or
/// processing throws. On failure the error is written to the console, the
/// communicator is shut down, the channel is closed and the method returns.
/// </summary>
public void StartSync() {
    while (true) {
        try {
            processMessage(_channel.ReadMessage());
        } catch (Exception ex) {
            if (ex is InvalidOperationException) {
                // Only the message is logged for this exception type.
                Console.WriteLine("InvalidOperationException in PmlCommunicator.StartSync: " + ex.Message);
            } else {
                Console.WriteLine(ex.ToString());
            }
            try {
                closed();
            } finally {
                // Close the channel even if the shutdown notification throws.
                _channel.Close();
            }
            return;
        }
    }
}
/// <summary>Closes the underlying channel.</summary>
public void Close() {
    this._channel.Close();
}
/// <summary>
/// Writes <paramref name="Message"/> to the channel as-is, without wrapping
/// it in a command dictionary.
/// </summary>
public void WriteRawMessage(PmlElement Message) {
    this._WriteMessage(Message);
}
222 | |
/// <summary>
/// Writes one message to the channel while holding a lock on the channel
/// object, so writes made through this method do not overlap.
/// </summary>
/// <exception cref="InvalidOperationException">The channel reports that it is not open.</exception>
private void _WriteMessage(PmlElement Message) {
    lock (_channel) {
        if (_channel.IsOpen) {
            _channel.WriteMessage(Message);
            return;
        }
        throw new InvalidOperationException("Could not write message: the channel is not open");
    }
}
/// <summary>
/// Shuts the communicator down: marks it closed, notifies and forgets all
/// registered sessions, releases every caller waiting on a synchronous
/// request, and raises <see cref="Closed"/>.
/// </summary>
/// <remarks>
/// This method can be invoked more than once for the same shutdown (for
/// example from a failed reply write and again from the read loop). The
/// cleanup is repeated on every call, but the event is raised only on the
/// first call.
/// </remarks>
private void closed() {
    bool firstCall;
    lock (_sessions) {
        firstCall = !_closed;
        // Must be set before CloseIn runs: RemoveSession is a no-op once
        // closed, which keeps the table unmodified while it is enumerated.
        _closed = true;
        foreach (ISession S in _sessions.Values) {
            try {
                S.CloseIn();
            } catch (Exception ex) {
                Console.WriteLine(ex.ToString());
            }
        }
        _sessions.Clear();
    }
    lock (_invocations) {
        // Waiters are released without a reply having been stored.
        foreach (CSyncRequest T in _invocations.Values) {
            T.ResetEvent.Set();
        }
        _invocations.Clear();
    }
    // Copy the delegate so an unsubscribe between the check and the call
    // cannot cause a NullReferenceException.
    EventHandler handler = Closed;
    if (firstCall && handler != null) handler(this, EventArgs.Empty);
}
249 | |
/// <summary>
/// Completion callback for the asynchronous read: finishes the read,
/// processes the message and starts the next read. Any exception is written
/// to the console, after which the communicator is shut down and the
/// channel closed; no further read is started.
/// </summary>
private void messageReceived(IAsyncResult ar) {
    try {
        PmlElement Message = _channel.EndReadMessage(ar);
        processMessage(Message);
        _channel.BeginReadMessage(messageReceived, null);
    } catch (Exception ex) {
        if (ex is InvalidOperationException) {
            // Only the message is logged for this exception type.
            Console.WriteLine("InvalidOperationException in PmlCommunicator.messageReceived: " + ex.Message);
        } else {
            Console.WriteLine(ex.ToString());
        }
        try {
            closed();
        } finally {
            // Close the channel even if the shutdown notification throws.
            _channel.Close();
        }
    }
}
/// <summary>
/// Dispatches one incoming message. String messages: "PING" is answered
/// with "PONG", anything else is ignored. Dictionary messages are routed by
/// their "CMD" entry: SES to session handling, RPL to the waiting
/// synchronous request, REQ/MSG to <c>processCall</c> via the thread pool.
/// Messages of any other type are ignored.
/// </summary>
private void processMessage(PmlElement Message) {
    if (Message is PmlString) {
        string text = Message.ToString();
        if (text.Equals("PING")) _WriteMessage(new PmlString("PONG"));
        // Other strings (including "PONG") are not acted upon.
        return;
    }
    if (!(Message is PmlDictionary)) return;

    string command = Message.GetChild("CMD").ToString();
    if (command.Equals("SES")) {
        processSessionMessage(Message);
        return;
    }
    if (command.Equals("RPL")) {
        UInt32 requestId = Message.GetChild("SID").ToUInt32();
        CSyncRequest pending;
        lock (_invocations) {
            // TryGetValue leaves 'pending' null when the ID is unknown.
            bool known = _invocations.TryGetValue(requestId, out pending);
            if (known) {
                _invocations.Remove(requestId);
            } else {
                Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + requestId.ToString());
            }
        }
        if (pending != null) {
            // Store the reply before waking the waiter.
            pending.Reply = Message.GetChild("MSG");
            pending.ResetEvent.Set();
        }
        return;
    }
    if (command.Equals("REQ") || command.Equals("MSG")) {
        UCIS.ThreadPool.RunCall(processCall, Message);
        return;
    }
    Console.WriteLine("UCIS.PML.Connection.Worker Invalid command received");
}
/// <summary>
/// Handles a "SES" message. The "SCMD" entry selects the action:
/// 0 = request a new session, 1 = message for an existing session,
/// 2 = close a session. Other values are ignored.
/// </summary>
private void processSessionMessage(PmlElement Message) {
    const byte requestCommand = 0;
    const byte messageCommand = 1;
    const byte closeCommand = 2;

    UInt32 sessionId = Message.GetChild("SID").ToUInt32();
    byte sessionCommand = Message.GetChild("SCMD").ToByte();
    PmlElement payload = Message.GetChild("MSG");

    // TryGetValue leaves 'session' null when the ID is not registered.
    ISession session;
    lock (_sessions) _sessions.TryGetValue(sessionId, out session);

    switch (sessionCommand) {
        case requestCommand:
            if (session != null) {
                // The ID is already in use: drop the existing session and refuse.
                try {
                    session.CloseIn();
                } catch (Exception ex) {
                    Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Request: exception in session.CloseIn: " + ex.ToString());
                }
                WriteSessionMessage(sessionId, closeCommand, null);
            } else if (ChannelRequestReceived == null) {
                // Nobody is listening for channel requests.
                WriteSessionMessage(sessionId, closeCommand, null);
            } else {
                try {
                    PmlChannelRequestReceivedEventArgsA args = new PmlChannelRequestReceivedEventArgsA(this, sessionId, payload);
                    ChannelRequestReceived(this, args);
                    args.RejectIfNotAccepted();
                } catch (Exception ex) {
                    Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in ChannelRequestReceived: " + ex.ToString());
                    WriteSessionMessage(sessionId, closeCommand, null);
                }
            }
            break;

        case messageCommand:
            if (session == null) {
                WriteSessionMessage(sessionId, closeCommand, null);
                break;
            }
            try {
                session.MessageIn(payload);
            } catch (Exception ex) {
                Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.MessageIn: " + ex.ToString());
                WriteSessionMessage(sessionId, closeCommand, null);
            }
            break;

        case closeCommand:
            if (session == null) break;
            try {
                // A close may carry a final payload; deliver it first.
                bool hasPayload = payload != null && !(payload is PmlNull);
                if (hasPayload) session.MessageIn(payload);
            } catch (Exception ex) {
                Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Close: exception in session.MessageIn: " + ex.ToString());
            } finally {
                try {
                    session.CloseIn();
                } catch (Exception ex) {
                    Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.CloseIn: " + ex.ToString());
                }
            }
            break;
    }
}
/// <summary>
/// Handles one incoming call. Raises <see cref="CallReceived"/> and, when
/// the message carries a "SID" entry and the channel is still open, sends
/// the reply back under that ID. If a handler throws, the reply is a
/// dictionary with an "EXCEPTION" entry holding the exception text.
/// </summary>
/// <param name="state">The received message; must be a <c>PmlDictionary</c>.</param>
private void processCall(object state) {
    PmlDictionary call = (PmlDictionary)state;
    bool replyExpected = call.ContainsKey("SID");
    UInt32 requestId = 0;
    if (replyExpected) requestId = call.GetChild("SID").ToUInt32();

    PmlElement reply = null;
    try {
        if (CallReceived != null) {
            PmlCallReceivedEventArgs args = new PmlCallReceivedEventArgs(call.GetChild("MSG"), replyExpected, requestId);
            CallReceived(this, args);
            reply = args.Reply;
        }
    } catch (Exception ex) {
        PmlDictionary failure = new PmlDictionary();
        reply = failure;
        failure.Add("EXCEPTION", new PmlString(ex.ToString()));
        Console.WriteLine(ex.ToString());
    } finally {
        if (replyExpected && Channel.IsOpen) {
            try {
                WriteSyncMessage(requestId, true, reply);
            } catch (Exception ex) {
                // A failed reply write is treated as a dead connection.
                Console.WriteLine("UCIS.Pml.PmlCommunicator.processCall: exception: " + ex.ToString());
                closed();
                Channel.Close();
            }
        }
    }
}
385 | |
/// <summary>
/// Sends <paramref name="message"/> wrapped in a "MSG" command. No request
/// ID is included and no reply is awaited.
/// </summary>
public void Call(PmlElement message) {
    PmlDictionary envelope = new PmlDictionary();
    envelope.Add("CMD", new PmlString("MSG"));
    envelope.Add("MSG", message);
    _WriteMessage(envelope);
}
/// <summary>
/// Sends a request and waits for the reply, using a timeout value of 60000
/// (the value is passed on to the wait handle's <c>WaitOne</c>).
/// </summary>
public PmlElement Invoke(PmlElement message) {
    const int defaultTimeout = 60000;
    return Invoke(message, defaultTimeout);
}
/// <summary>
/// Sends <paramref name="message"/> as a "REQ" with a fresh request ID and
/// blocks until the wait handle is signalled or the timeout elapses.
/// </summary>
/// <param name="message">Payload of the request.</param>
/// <param name="timeout">Timeout handed to <c>WaitOne</c>.</param>
/// <returns>
/// The stored reply. This is null if the wait was ended by the communicator
/// shutting down rather than by a reply.
/// </returns>
/// <exception cref="InvalidOperationException">The communicator is already closed.</exception>
/// <exception cref="TimeoutException">The wait handle was not signalled in time.</exception>
public PmlElement Invoke(PmlElement message, int timeout) {
    if (_closed) throw new InvalidOperationException("Sorry, we're closed.");

    CSyncRequest pending = new CSyncRequest();
    UInt32 requestId = GetNextSessionId(false);
    lock (_invocations) _invocations.Add(requestId, pending);
    try {
        WriteSyncMessage(requestId, false, message);
        bool signalled = pending.ResetEvent.WaitOne(timeout, false);
        if (!signalled) {
            throw new TimeoutException("The SyncRequest timed out (SID=" + requestId.ToString() + ")");
        }
    } finally {
        // Always unregister, whether the call succeeded, timed out or failed to send.
        lock (_invocations) _invocations.Remove(requestId);
    }
    return pending.Reply;
}
411 | |
/// <summary>
/// Opens a new sub-channel: allocates a session ID, registers the channel
/// and sends a session request (command 0) carrying <paramref name="data"/>.
/// </summary>
/// <returns>The new channel, or null if it is no longer open after the request was sent.</returns>
public IPmlChannel CreateChannel(PmlElement data) {
    UInt32 sid = GetNextSessionId(true);
    PmlSubChannel ch = new PmlSubChannel(this, sid, true);
    try {
        WriteSessionMessage(sid, 0, data);
    } catch {
        // The request never went out: do not leave the session registered.
        RemoveSession(sid);
        throw;
    }
    if (!ch.IsOpen) return null;
    return ch;
}
419 | |
/// <summary>
/// Registers <paramref name="session"/> under its ID. Does nothing once the
/// communicator has been closed.
/// </summary>
private void AddSession(ISession session) {
    if (!_closed) {
        lock (_sessions) {
            _sessions.Add(session.ID, session);
        }
    }
}
/// <summary>
/// Unregisters the session with the given ID. Does nothing once the
/// communicator has been closed.
/// </summary>
private void RemoveSession(UInt32 session) {
    if (!_closed) {
        lock (_sessions) {
            _sessions.Remove(session);
        }
    }
}
/// <summary>Unregisters <paramref name="session"/> by its ID.</summary>
private void RemoveSession(ISession session) {
    UInt32 id = session.ID;
    RemoveSession(id);
}
431 | |
/// <summary>
/// Returns the next unused ID. Sessions and synchronous requests have
/// separate counters; each counter is advanced (wrapping on overflow) until
/// it no longer matches a key in the corresponding table.
/// </summary>
/// <param name="IsSession">True for a session ID, false for a synchronous request ID.</param>
private UInt32 GetNextSessionId(bool IsSession) {
    if (!IsSession) {
        lock (_invocations) {
            unchecked { pNextSyncRequest++; }
            while (_invocations.ContainsKey(pNextSyncRequest)) {
                unchecked { pNextSyncRequest++; }
            }
            return pNextSyncRequest;
        }
    }
    lock (_sessions) {
        unchecked { pNextSession++; }
        while (_sessions.ContainsKey(pNextSession)) {
            unchecked { pNextSession++; }
        }
        return pNextSession;
    }
}
449 | |
/// <summary>
/// Builds and sends a synchronous-call packet. The "MSG" entry is always
/// added, even when <paramref name="MSG"/> is null.
/// </summary>
/// <param name="SID">Request ID placed in the "SID" entry.</param>
/// <param name="RPL">True sends an "RPL" (reply), false a "REQ" (request).</param>
/// <param name="MSG">Payload.</param>
protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) {
    string command = RPL ? "RPL" : "REQ";
    PmlDictionary packet = new PmlDictionary();
    packet.Add("CMD", new PmlString(command));
    packet.Add("SID", new PmlInteger(SID));
    packet.Add("MSG", MSG);
    _WriteMessage(packet);
}
/// <summary>
/// Builds and sends a "SES" packet. The "MSG" entry is omitted when
/// <paramref name="MSG"/> is null.
/// </summary>
/// <param name="SID">Session ID placed in the "SID" entry.</param>
/// <param name="CMD">Session command placed in "SCMD" (the receiver treats 0 as request, 1 as message, 2 as close).</param>
/// <param name="MSG">Optional payload.</param>
protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) {
    PmlDictionary packet = new PmlDictionary();
    packet.Add("CMD", new PmlString("SES"));
    packet.Add("SID", new PmlInteger(SID));
    packet.Add("SCMD", new PmlInteger(CMD));
    if (MSG != null) {
        packet.Add("MSG", MSG);
    }
    _WriteMessage(packet);
}
465 | |
466 | |
467 | |
468 /* LegacyPmlCommunicator compatibility */ | |
/// <summary>Compatibility alias for <c>Invoke(Request)</c>.</summary>
public PmlElement SyncRequest(PmlElement Request) {
    PmlElement reply = Invoke(Request);
    return reply;
}
/// <summary>Compatibility alias for <c>Invoke(Request, Timeout)</c>.</summary>
public PmlElement SyncRequest(PmlElement Request, int Timeout) {
    PmlElement reply = Invoke(Request, Timeout);
    return reply;
}
/// <summary>Compatibility alias for <see cref="Call"/>.</summary>
public void SendMessage(PmlElement Message) {
    this.Call(Message);
}
478 } | |
479 } |