0
|
1 ???using System; |
|
2 using System.Threading; |
|
3 using System.Collections.Generic; |
|
4 using UCIS.Pml; |
|
5 |
|
6 namespace UCIS.Pml { |
|
7 public class PmlCommunicator { |
|
/// <summary>
/// Bookkeeping for one outstanding synchronous request: the event the calling
/// thread waits on and the slot the reply is stored in before the event is set.
/// </summary>
private class CSyncRequest {
    // Created unsignalled; it is set when a matching reply arrives or when the
    // communicator shuts down.
    internal ManualResetEvent ResetEvent = new ManualResetEvent(false);

    // Remains null unless a reply for this request has been stored.
    internal PmlElement Reply;
}
|
/// <summary>
/// Receiving side of a session that is registered in the communicator's
/// session table.
/// </summary>
private interface ISession {
    /// <summary>Identifier under which the session is registered.</summary>
    uint ID { get; }

    /// <summary>Delivers a message that was addressed to this session.</summary>
    void MessageIn(PmlElement message);

    /// <summary>
    /// Closes the session locally; the communicator calls this when the remote
    /// side closes the session and when the communicator itself shuts down.
    /// </summary>
    void CloseIn();
}
|
/// <summary>
/// A channel that is multiplexed over the communicator's underlying channel.
/// Outgoing traffic is wrapped in session messages; incoming traffic is handed
/// to it by the communicator through <see cref="ISession"/>.
/// </summary>
private class PmlSubChannel : ActivePmlChannel, ISession {
    private enum ChannelState { Requesting, Acknowledged, Closed }

    private readonly PmlCommunicator _communicator;
    private readonly UInt32 _id;
    private ChannelState _state;

    /// <summary>
    /// Creates the sub-channel. When <paramref name="accepted"/> is true the
    /// channel starts out open and registers itself with the communicator;
    /// otherwise it starts in the Requesting state and is not registered.
    /// </summary>
    internal PmlSubChannel(PmlCommunicator communicator, UInt32 sid, bool accepted) {
        _communicator = communicator;
        _id = sid;
        _state = accepted ? ChannelState.Acknowledged : ChannelState.Requesting;
        if (accepted) _communicator.AddSession(this);
    }

    /// <summary>True only while the channel is in the Acknowledged state.</summary>
    public override bool IsOpen { get { return _state == ChannelState.Acknowledged; } }

    uint ISession.ID { get { return _id; } }

    // Close initiated by the communicator: no close notification is written.
    void ISession.CloseIn() {
        _state = ChannelState.Closed;
        _communicator.RemoveSession(this);
        base.Close();
    }

    void ISession.MessageIn(PmlElement message) {
        base.PushReceivedMessage(message);
    }

    /// <summary>Sends <paramref name="message"/> as a session message (sub-command 1).</summary>
    /// <exception cref="InvalidOperationException">The sub-channel is not open.</exception>
    public override void WriteMessage(PmlElement message) {
        if (_state != ChannelState.Acknowledged) throw new InvalidOperationException("The subchannel is not open");
        _communicator.WriteSessionMessage(_id, 1, message);
    }

    /// <summary>
    /// Closes an open sub-channel: writes a close notification (sub-command 2),
    /// unregisters the session and closes the base channel. Does nothing when
    /// the channel is not open.
    /// </summary>
    public override void Close() {
        if (_state != ChannelState.Acknowledged) return;
        _state = ChannelState.Closed;
        try {
            _communicator.WriteSessionMessage(_id, 2, null);
        } finally {
            // The state is already Closed, so a second Close() would return
            // early. The local cleanup therefore has to run even when writing
            // the notification throws, otherwise the session stays registered
            // and the base channel is never closed.
            _communicator.RemoveSession(this);
            base.Close();
        }
    }
}
|
/// <summary>
/// Event arguments for an incoming channel request. A request can be accepted
/// once or rejected; the two outcomes are mutually exclusive.
/// </summary>
private class PmlChannelRequestReceivedEventArgsA : PmlChannelRequestReceivedEventArgs {
    private PmlCommunicator _communicator;
    private UInt32 _sid;
    private PmlElement _data;
    private bool _accepted, _rejected;

    internal PmlChannelRequestReceivedEventArgsA(PmlCommunicator communicator, UInt32 sid, PmlElement message) {
        _accepted = false;
        _rejected = false;
        _sid = sid;
        _data = message;
        _communicator = communicator;
    }

    /// <summary>The payload that was passed to the constructor.</summary>
    public override PmlElement Data { get { return _data; } }

    /// <summary>Accepts the request and returns an open sub-channel for it.</summary>
    /// <exception cref="InvalidOperationException">The request was already accepted or rejected.</exception>
    public override IPmlChannel Accept() {
        bool alreadyDecided = _accepted || _rejected;
        if (alreadyDecided) throw new InvalidOperationException("The channel has already been accepted or rejected");
        _accepted = true;
        IPmlChannel channel = new PmlSubChannel(_communicator, _sid, true);
        return channel;
    }

    /// <summary>
    /// Rejects the request by writing a close message (sub-command 2) for its
    /// session ID. Rejecting a second time is a no-op.
    /// </summary>
    /// <exception cref="InvalidOperationException">The request was already accepted.</exception>
    public override void Reject() {
        if (_accepted) throw new InvalidOperationException("The channel has already been accepted");
        if (!_rejected) {
            _rejected = true;
            _communicator.WriteSessionMessage(_sid, 2, null);
        }
    }

    // Called after the event handlers have run, so that a request nobody
    // accepted is rejected.
    internal void RejectIfNotAccepted() {
        if (_accepted) return;
        Reject();
    }
}
|
82 |
|
// Raised for incoming REQ/MSG commands.
public event EventHandler<PmlCallReceivedEventArgs> CallReceived;
// Raised for an incoming session request whose ID is not registered yet.
public event EventHandler<PmlChannelRequestReceivedEventArgs> ChannelRequestReceived;
// Raised at the end of the internal shutdown routine.
public event EventHandler Closed;

// Registered sessions, keyed by session ID. Also used as the lock object that
// guards the table and the session ID counter.
private Dictionary<uint, ISession> _sessions = new Dictionary<uint, ISession>();
// Pending synchronous requests, keyed by request ID. Also used as the lock
// object that guards the table and the request ID counter.
private Dictionary<uint, CSyncRequest> _invocations = new Dictionary<uint, CSyncRequest>();
// Most recently issued session ID / request ID.
private uint pNextSession;
private uint pNextSyncRequest;

// Set once the shutdown routine has started.
private bool _closed;
// The channel all traffic is read from and written to.
private IPmlChannel _channel;

/// <summary>The underlying channel passed to the constructor.</summary>
public IPmlChannel Channel { get { return _channel; } }
|
96 |
|
/// <summary>
/// Creates a communicator on top of <paramref name="channel"/>.
/// </summary>
/// <param name="channel">Channel used for all reads and writes.</param>
/// <param name="autoStart">When true, <see cref="Start"/> is called before the constructor returns.</param>
public PmlCommunicator(IPmlChannel channel, bool autoStart) {
    _channel = channel;
    if (!autoStart) return;
    Start();
}
|
/// <summary>
/// Begins an asynchronous read on the underlying channel; the completion
/// callback is <c>messageReceived</c>, which issues the next read itself.
/// </summary>
public void Start()
{
    _channel.BeginReadMessage(messageReceived, null);
}
|
/// <summary>
/// Reads and processes messages on the calling thread until reading or
/// processing throws. The exception is written to the console, the
/// communicator is shut down, the channel is closed and the method returns.
/// </summary>
public void StartSync() {
    try {
        // Only an exception ends this loop.
        for (;;) {
            processMessage(_channel.ReadMessage());
        }
    } catch (Exception ex) {
        if (ex is InvalidOperationException) {
            // NOTE(review): this label names LegacyPmlCommunicator.messageReceived
            // although the code lives in StartSync; the text is left unchanged.
            Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message);
        } else {
            Console.WriteLine(ex.ToString());
        }
        closed();
        _channel.Close();
    }
}
|
/// <summary>
/// Closes the underlying channel. This method does not run the internal
/// shutdown routine itself.
/// </summary>
public void Close()
{
    _channel.Close();
}
|
/// <summary>
/// Writes <paramref name="Message"/> to the underlying channel as-is, without
/// wrapping it in a command dictionary.
/// </summary>
/// <exception cref="InvalidOperationException">The channel is not open.</exception>
public void WriteRawMessage(PmlElement Message)
{
    _WriteMessage(Message);
}
|
127 |
|
/// <summary>
/// Writes one message to the underlying channel while holding a lock on the
/// channel object, so the open check and the write are not interleaved with
/// another writer going through this method.
/// </summary>
/// <exception cref="InvalidOperationException">The channel is not open.</exception>
private void _WriteMessage(PmlElement Message) {
    lock (_channel) {
        bool writable = _channel.IsOpen;
        if (!writable) {
            throw new InvalidOperationException("Could not write message: the channel is not open");
        }
        _channel.WriteMessage(Message);
    }
}
|
/// <summary>
/// Shutdown routine: marks the communicator closed, closes and forgets every
/// registered session, releases every thread waiting on a synchronous request
/// (their reply stays null) and finally raises <see cref="Closed"/>.
/// </summary>
private void closed() {
    // Must be set before the sessions are enumerated: RemoveSession returns
    // early once _closed is true, so the CloseIn callbacks below do not modify
    // the dictionary while it is being iterated.
    _closed = true;
    lock (_sessions) {
        foreach (ISession S in _sessions.Values) {
            try {
                S.CloseIn();
            } catch (Exception ex) {
                // One failing session must not prevent the others from closing.
                Console.WriteLine(ex.ToString());
            }
        }
        _sessions.Clear();
    }
    lock (_invocations) {
        foreach (CSyncRequest T in _invocations.Values) {
            T.ResetEvent.Set();
        }
        _invocations.Clear();
    }
    // Copy the delegate first: reading the event twice (null check, then
    // invoke) can throw NullReferenceException when the last handler
    // unsubscribes in between.
    EventHandler handler = Closed;
    if (handler != null) handler(this, EventArgs.Empty);
}
|
154 |
|
/// <summary>
/// Completion callback for the asynchronous read: finishes the read, processes
/// the message and starts the next read. Any exception is written to the
/// console, after which the communicator is shut down and the channel closed;
/// no further read is started in that case.
/// </summary>
private void messageReceived(IAsyncResult ar) {
    try {
        processMessage(_channel.EndReadMessage(ar));
        _channel.BeginReadMessage(messageReceived, null);
    } catch (Exception ex) {
        // InvalidOperationException gets a short one-line report; everything
        // else is dumped in full.
        string report = (ex is InvalidOperationException)
            ? "InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message
            : ex.ToString();
        Console.WriteLine(report);
        closed();
        _channel.Close();
    }
}
|
/// <summary>
/// Dispatches one incoming message. String messages are keep-alives ("PING" is
/// answered with "PONG", anything else is ignored). Dictionary messages are
/// routed by their "CMD" entry: "SES" to the session handler, "REQ"/"MSG" to
/// the thread pool as a call, "RPL" to the waiting synchronous request.
/// Messages of any other type are ignored.
/// </summary>
private void processMessage(PmlElement Message) {
    if (Message is PmlString) {
        // "PONG" and unknown strings need no reaction.
        if (Message.ToString().Equals("PING")) {
            _WriteMessage("PONG");
        }
        return;
    }
    if (!(Message is PmlDictionary)) return;

    string command = Message.GetChild("CMD").ToString();
    if (command.Equals("SES")) {
        processSessionMessage(Message);
        return;
    }
    if (command.Equals("REQ") || command.Equals("MSG")) {
        // Calls run on the thread pool rather than on the reading thread.
        UCIS.ThreadPool.RunCall(processCall, Message);
        return;
    }
    if (!command.Equals("RPL")) {
        Console.WriteLine("UCIS.PML.Connection.Worker Invalid command received");
        return;
    }

    // Reply: take the pending request out of the table under the lock, then
    // complete it outside the lock.
    UInt32 requestId = Message.GetChild("SID").ToUInt32();
    CSyncRequest pending = null;
    lock (_invocations) {
        if (_invocations.TryGetValue(requestId, out pending)) {
            _invocations.Remove(requestId);
        } else {
            Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + requestId.ToString());
        }
    }
    if (pending == null) return;
    // Store the reply before signalling so the waiter sees it.
    pending.Reply = Message.GetChild("MSG");
    pending.ResetEvent.Set();
}
|
/// <summary>
/// Handles a "SES" message. The "SCMD" entry selects the action:
/// 0 = channel request, 1 = message for an existing session, 2 = close.
/// Other values are ignored. Wherever a session cannot be served, a close
/// message (sub-command 2) is written back for that session ID.
/// </summary>
private void processSessionMessage(PmlElement Message) {
    UInt32 sessionId = Message.GetChild("SID").ToUInt32();
    byte subCommand = Message.GetChild("SCMD").ToByte();
    PmlElement payload = Message.GetChild("MSG");

    ISession target;
    lock (_sessions) {
        if (!_sessions.TryGetValue(sessionId, out target)) target = null;
    }

    if (subCommand == 0) {
        // Request
        if (target != null) {
            // The ID is already in use: close the existing session and refuse.
            try {
                target.CloseIn();
            } catch (Exception ex) {
                Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Request: exception in session.CloseIn: " + ex.ToString());
            }
            WriteSessionMessage(sessionId, 2, null);
        } else if (ChannelRequestReceived == null) {
            // Nobody is listening for channel requests.
            WriteSessionMessage(sessionId, 2, null);
        } else {
            try {
                PmlChannelRequestReceivedEventArgsA request = new PmlChannelRequestReceivedEventArgsA(this, sessionId, payload);
                ChannelRequestReceived(this, request);
                request.RejectIfNotAccepted();
            } catch (Exception ex) {
                Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in ChannelRequestReceived: " + ex.ToString());
                WriteSessionMessage(sessionId, 2, null);
            }
        }
    } else if (subCommand == 1) {
        // Message
        if (target == null) {
            WriteSessionMessage(sessionId, 2, null);
        } else {
            try {
                target.MessageIn(payload);
            } catch (Exception ex) {
                Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.MessageIn: " + ex.ToString());
                WriteSessionMessage(sessionId, 2, null);
            }
        }
    } else if (subCommand == 2) {
        // Close
        if (target != null) {
            try {
                // A close may carry one last message; deliver it first.
                bool hasPayload = payload != null && !(payload is PmlNull);
                if (hasPayload) target.MessageIn(payload);
            } catch (Exception ex) {
                Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Close: exception in session.MessageIn: " + ex.ToString());
            } finally {
                try {
                    target.CloseIn();
                } catch (Exception ex) {
                    Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.CloseIn: " + ex.ToString());
                }
            }
        }
    }
}
|
/// <summary>
/// Thread-pool entry point for an incoming call. Raises
/// <see cref="CallReceived"/> and, when the message carries a "SID" entry and
/// the channel is still open, writes back a reply. If a handler throws, the
/// reply is a dictionary with an "EXCEPTION" entry holding the exception text.
/// </summary>
/// <param name="state">The received message; must be a PmlDictionary.</param>
private void processCall(object state) {
    PmlDictionary call = (PmlDictionary)state;
    // A caller that wants an answer includes a request ID.
    bool wantReply = call.ContainsKey("SID");
    UInt32 requestId = 0;
    if (wantReply) {
        requestId = call.GetChild("SID").ToUInt32();
    }

    PmlElement answer = null;
    try {
        if (CallReceived != null) {
            PmlCallReceivedEventArgs args = new PmlCallReceivedEventArgs(call.GetChild("MSG"), wantReply, requestId);
            CallReceived(this, args);
            answer = args.Reply;
        }
    } catch (Exception ex) {
        PmlDictionary failure = new PmlDictionary();
        answer = failure;
        failure.Add("EXCEPTION", new PmlString(ex.ToString()));
        Console.WriteLine(ex.ToString());
    } finally {
        if (wantReply && Channel.IsOpen) {
            try {
                WriteSyncMessage(requestId, true, answer);
            } catch (Exception ex) {
                // The reply could not be written: shut the communicator down.
                Console.WriteLine("UCIS.Pml.PmlCommunicator.processCall: exception: " + ex.ToString());
                closed();
                Channel.Close();
            }
        }
    }
}
|
289 |
|
/// <summary>
/// Sends <paramref name="message"/> as a one-way call: a dictionary with
/// "CMD" = "MSG" and the payload under "MSG". No request ID is included, so no
/// reply is awaited.
/// </summary>
/// <exception cref="InvalidOperationException">The channel is not open.</exception>
public void Call(PmlElement message)
{
    PmlDictionary envelope = new PmlDictionary();
    envelope.Add("CMD", new PmlString("MSG"));
    envelope.Add("MSG", message);
    _WriteMessage(envelope);
}
|
/// <summary>
/// Sends <paramref name="message"/> as a request and waits up to 60000 ms for
/// the reply.
/// </summary>
public PmlElement Invoke(PmlElement message) {
    const int defaultTimeout = 60000;
    return Invoke(message, defaultTimeout);
}

/// <summary>
/// Sends <paramref name="message"/> as a request and blocks until the matching
/// reply arrives, the communicator shuts down, or the timeout elapses.
/// </summary>
/// <param name="message">Payload of the request.</param>
/// <param name="timeout">Value passed to WaitOne as the wait timeout.</param>
/// <returns>
/// The stored reply. This is null when the wait was released without a reply
/// having been stored, which is what the shutdown routine does.
/// </returns>
/// <exception cref="InvalidOperationException">The communicator is already closed, or the channel is not open.</exception>
/// <exception cref="TimeoutException">No reply arrived within the timeout.</exception>
public PmlElement Invoke(PmlElement message, int timeout) {
    if (_closed) throw new InvalidOperationException("Sorry, we're closed.");

    CSyncRequest pending = new CSyncRequest();
    UInt32 requestId = GetNextSessionId(false);
    lock (_invocations) {
        _invocations.Add(requestId, pending);
    }
    try {
        WriteSyncMessage(requestId, false, message);
        bool answered = pending.ResetEvent.WaitOne(timeout, false);
        if (!answered) {
            if (!_closed) {
                lock (_invocations) {
                    _invocations.Remove(requestId);
                }
            }
            throw new TimeoutException("The SyncRequest timed out (SID=" + requestId.ToString() + ")");
        }
    } finally {
        // Make sure the entry is gone whichever way the request ended.
        lock (_invocations) {
            _invocations.Remove(requestId);
        }
    }
    return pending.Reply;
}
|
315 |
|
/// <summary>
/// Opens a new sub-channel: allocates a session ID, registers the sub-channel
/// and writes a session request (sub-command 0) carrying
/// <paramref name="data"/>.
/// </summary>
/// <param name="data">Payload sent along with the channel request.</param>
/// <returns>The new channel, or null if it is no longer open once the request has been written.</returns>
/// <exception cref="InvalidOperationException">The underlying channel is not open.</exception>
public IPmlChannel CreateChannel(PmlElement data) {
    UInt32 sid = GetNextSessionId(true);
    // The constructor registers the sub-channel in the session table.
    PmlSubChannel ch = new PmlSubChannel(this, sid, true);
    try {
        WriteSessionMessage(sid, 0, data);
    } catch {
        // The request never went out. Close the sub-channel locally (no close
        // notification is written) so it does not stay registered, then let
        // the caller see the original failure.
        ((ISession)ch).CloseIn();
        throw;
    }
    if (!ch.IsOpen) return null;
    return ch;
}
|
323 |
|
/// <summary>
/// Registers <paramref name="session"/> under its ID. Does nothing once the
/// communicator has been marked closed.
/// </summary>
private void AddSession(ISession session) {
    if (!_closed) {
        lock (_sessions) {
            _sessions.Add(session.ID, session);
        }
    }
}
|
/// <summary>
/// Removes the session registered under ID <paramref name="session"/>. Does
/// nothing once the communicator has been marked closed; the shutdown routine
/// clears the whole table itself.
/// </summary>
private void RemoveSession(UInt32 session) {
    if (!_closed) {
        lock (_sessions) {
            _sessions.Remove(session);
        }
    }
}

/// <summary>Removes <paramref name="session"/> from the table by its ID.</summary>
private void RemoveSession(ISession session) {
    UInt32 id = session.ID;
    RemoveSession(id);
}
|
335 |
|
/// <summary>
/// Returns the next unused ID. Sessions and synchronous requests each have
/// their own counter and their own table; the counter is advanced (wrapping on
/// overflow) until it holds a value that is not a key of the matching table.
/// </summary>
/// <param name="IsSession">True for a session ID, false for a synchronous request ID.</param>
private UInt32 GetNextSessionId(bool IsSession) {
    if (!IsSession) {
        lock (_invocations) {
            unchecked { pNextSyncRequest++; }
            while (_invocations.ContainsKey(pNextSyncRequest)) {
                unchecked { pNextSyncRequest++; }
            }
            return pNextSyncRequest;
        }
    }
    lock (_sessions) {
        unchecked { pNextSession++; }
        while (_sessions.ContainsKey(pNextSession)) {
            unchecked { pNextSession++; }
        }
        return pNextSession;
    }
}
|
353 |
|
/// <summary>
/// Writes a request or reply dictionary with the entries "CMD", "SID" and
/// "MSG".
/// </summary>
/// <param name="SID">Request ID that ties a reply to its request.</param>
/// <param name="RPL">True to write a reply ("RPL"), false to write a request ("REQ").</param>
/// <param name="MSG">Payload; added under "MSG" as given.</param>
protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG)
{
    string command = RPL ? "RPL" : "REQ";
    PmlDictionary envelope = new PmlDictionary();
    envelope.Add("CMD", new PmlString(command));
    envelope.Add("SID", new PmlInteger(SID));
    envelope.Add("MSG", MSG);
    _WriteMessage(envelope);
}
|
/// <summary>
/// Writes a session dictionary with "CMD" = "SES", the session ID under "SID"
/// and the sub-command under "SCMD". The "MSG" entry is only added when
/// <paramref name="MSG"/> is not null.
/// </summary>
/// <param name="SID">ID of the session the message belongs to.</param>
/// <param name="CMD">Sub-command; the receiving side treats 0 as request, 1 as message and 2 as close.</param>
/// <param name="MSG">Optional payload.</param>
protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG)
{
    PmlDictionary envelope = new PmlDictionary();
    envelope.Add("CMD", new PmlString("SES"));
    envelope.Add("SID", new PmlInteger(SID));
    envelope.Add("SCMD", new PmlInteger(CMD));
    if (MSG != null) {
        envelope.Add("MSG", MSG);
    }
    _WriteMessage(envelope);
}
|
369 |
|
370 |
|
371 |
|
/* LegacyPmlCommunicator compatibility */

/// <summary>Compatibility alias that forwards to <see cref="Invoke(PmlElement)"/>.</summary>
public PmlElement SyncRequest(PmlElement Request)
{
    PmlElement reply = Invoke(Request);
    return reply;
}

/// <summary>Compatibility alias that forwards to <see cref="Invoke(PmlElement, int)"/>.</summary>
public PmlElement SyncRequest(PmlElement Request, int Timeout)
{
    PmlElement reply = Invoke(Request, Timeout);
    return reply;
}
|
/// <summary>Compatibility alias that forwards to <see cref="Call"/>.</summary>
public void SendMessage(PmlElement Message)
{
    Call(Message);
}
|
382 } |
|
383 } |