0
|
1 ???using System; |
|
2 using System.Threading; |
|
3 using System.Collections.Generic; |
|
4 using UCIS.Pml; |
|
5 |
|
6 namespace UCIS.Pml { |
|
7 public class PmlCommunicator { |
|
8 private class CSyncRequest { |
|
9 internal PmlElement Reply; |
|
10 } |
|
11 private interface ISession { |
|
12 void MessageIn(PmlElement message); |
|
13 void CloseIn(); |
|
14 UInt32 ID { get; } |
|
15 } |
|
16 private class PmlSubChannel : ActivePmlChannel, ISession { |
|
17 private enum ChannelState { Requesting, Acknowledged, Closed } |
|
18 |
|
19 private PmlCommunicator _communicator; |
|
20 private UInt32 _id; |
|
21 private ChannelState _state; |
|
22 |
|
23 internal PmlSubChannel(PmlCommunicator communicator, UInt32 sid, bool accepted) { |
|
24 _communicator = communicator; |
|
25 _id = sid; |
|
26 _state = accepted ? ChannelState.Acknowledged : ChannelState.Requesting; |
|
27 if (accepted) _communicator.AddSession(this); |
|
28 } |
|
29 |
|
30 public override bool IsOpen { get { return _state == ChannelState.Acknowledged; } } |
|
31 |
|
32 uint ISession.ID { get { return _id; } } |
|
33 void ISession.CloseIn() { |
|
34 _state = ChannelState.Closed; |
|
35 _communicator.RemoveSession(this); |
|
36 base.Close(); |
|
37 } |
|
38 void ISession.MessageIn(PmlElement message) { |
|
39 base.PushReceivedMessage(message); |
|
40 } |
|
41 |
|
42 public override void WriteMessage(PmlElement message) { |
|
43 if (_state != ChannelState.Acknowledged) throw new InvalidOperationException("The subchannel is not open"); |
|
44 _communicator.WriteSessionMessage(_id, 1, message); |
|
45 } |
|
46 public override void Close() { |
|
47 if (_state != ChannelState.Acknowledged) return; |
|
48 _state = ChannelState.Closed; |
|
49 _communicator.WriteSessionMessage(_id, 2, null); |
|
50 _communicator.RemoveSession(this); |
|
51 base.Close(); |
|
52 } |
|
53 } |
|
54 private class PmlChannelRequestReceivedEventArgsA : PmlChannelRequestReceivedEventArgs { |
|
55 private PmlCommunicator _communicator; |
|
56 private PmlElement _data; |
|
57 private bool _accepted, _rejected; |
|
58 private UInt32 _sid; |
|
59 internal PmlChannelRequestReceivedEventArgsA(PmlCommunicator communicator, UInt32 sid, PmlElement message) { |
|
60 _communicator = communicator; |
|
61 _data = message; |
|
62 _sid = sid; |
|
63 _accepted = _rejected = false; |
|
64 } |
|
65 public override IPmlChannel Accept() { |
|
66 if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected"); |
|
67 _accepted = true; |
|
68 return new PmlSubChannel(_communicator, _sid, true); |
|
69 } |
|
70 public override void Reject() { |
|
71 if (_accepted) throw new InvalidOperationException("The channel has already been accepted"); |
|
72 if (_rejected) return; |
|
73 _rejected = true; |
|
74 _communicator.WriteSessionMessage(_sid, 2, null); |
|
75 } |
|
76 internal void RejectIfNotAccepted() { |
|
77 if (!_accepted) Reject(); |
|
78 } |
103
|
79 public override PmlElement Data { get { return _data; } } |
0
|
80 } |
|
81 |
|
82 public event EventHandler<PmlCallReceivedEventArgs> CallReceived; |
|
83 public event EventHandler<PmlChannelRequestReceivedEventArgs> ChannelRequestReceived; |
|
84 public event EventHandler Closed; |
|
85 |
|
86 private Dictionary<UInt32, ISession> _sessions = new Dictionary<UInt32, ISession>(); |
|
87 private Dictionary<UInt32, CSyncRequest> _invocations = new Dictionary<UInt32, CSyncRequest>(); |
|
88 private UInt32 pNextSession; |
|
89 private UInt32 pNextSyncRequest; |
|
90 |
|
91 private bool _closed; |
|
92 private IPmlChannel _channel; |
|
93 |
|
94 public IPmlChannel Channel { get { return _channel; } } |
|
95 |
|
96 public PmlCommunicator(IPmlChannel channel, bool autoStart) { |
|
97 _channel = channel; |
|
98 if (autoStart) Start(); |
|
99 } |
|
100 public void Start() { |
|
101 _channel.BeginReadMessage(messageReceived, null); |
|
102 } |
|
103 public void StartSync() { |
|
104 while (true) { |
|
105 try { |
|
106 processMessage(_channel.ReadMessage()); |
|
107 } catch (InvalidOperationException ex) { |
|
108 Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message); |
|
109 closed(); |
|
110 _channel.Close(); |
|
111 return; |
|
112 } catch (Exception ex) { |
|
113 Console.WriteLine(ex.ToString()); |
|
114 closed(); |
|
115 _channel.Close(); |
|
116 return; |
|
117 } |
|
118 } |
|
119 } |
|
120 public void Close() { |
|
121 _channel.Close(); |
|
122 } |
|
123 public void WriteRawMessage(PmlElement Message) { |
|
124 _WriteMessage(Message); |
|
125 } |
|
126 |
|
127 private void _WriteMessage(PmlElement Message) { |
|
128 lock (_channel) { |
|
129 if (!_channel.IsOpen) throw new InvalidOperationException("Could not write message: the channel is not open"); |
|
130 _channel.WriteMessage(Message); |
|
131 } |
|
132 } |
|
133 private void closed() { |
|
134 _closed = true; |
|
135 lock (_sessions) { |
|
136 foreach (ISession S in _sessions.Values) { |
|
137 try { |
|
138 S.CloseIn(); |
|
139 } catch (Exception ex) { |
|
140 Console.WriteLine(ex.ToString()); |
|
141 } |
|
142 } |
|
143 _sessions.Clear(); |
|
144 } |
|
145 lock (_invocations) { |
104
|
146 foreach (CSyncRequest T in _invocations.Values) lock (T) Monitor.Pulse(T); |
0
|
147 _invocations.Clear(); |
|
148 } |
|
149 if (Closed != null) Closed(this, new EventArgs()); |
|
150 } |
|
151 |
|
152 private void messageReceived(IAsyncResult ar) { |
|
153 try { |
|
154 PmlElement Message = _channel.EndReadMessage(ar); |
|
155 processMessage(Message); |
|
156 _channel.BeginReadMessage(messageReceived, null); |
|
157 } catch (InvalidOperationException ex) { |
|
158 Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message); |
|
159 closed(); |
|
160 _channel.Close(); |
|
161 return; |
|
162 } catch (Exception ex) { |
|
163 Console.WriteLine(ex.ToString()); |
|
164 closed(); |
|
165 _channel.Close(); |
|
166 return; |
|
167 } |
|
168 } |
|
169 private void processMessage(PmlElement Message) { |
|
170 if (Message is PmlString) { |
|
171 string Cmd = Message.ToString(); |
|
172 if (Cmd.Equals("PING")) { |
103
|
173 _WriteMessage("PONG"); |
|
174 } else if (Cmd.Equals("PONG")) { |
0
|
175 } |
|
176 } else if (Message is PmlDictionary) { |
|
177 string Cmd = Message.GetChild("CMD").ToString(); |
|
178 if (Cmd.Equals("SES")) { |
|
179 processSessionMessage(Message); |
|
180 } else if (Cmd.Equals("RPL")) { |
|
181 UInt32 SID = Message.GetChild("SID").ToUInt32(); |
|
182 CSyncRequest SRequest = null; |
|
183 lock (_invocations) { |
|
184 if (_invocations.TryGetValue(SID, out SRequest)) { |
|
185 _invocations.Remove(SID); |
|
186 } else { |
|
187 Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString()); |
|
188 } |
|
189 } |
|
190 if (SRequest != null) { |
|
191 SRequest.Reply = Message.GetChild("MSG"); |
104
|
192 lock (SRequest) Monitor.Pulse(SRequest); |
0
|
193 } |
|
194 } else if (Cmd.Equals("REQ") || Cmd.Equals("MSG")) { |
|
195 UCIS.ThreadPool.RunCall(processCall, Message); |
|
196 } else { |
|
197 Console.WriteLine("UCIS.PML.Connection.Worker Invalid command received"); |
|
198 } |
|
199 } |
|
200 } |
|
201 private void processSessionMessage(PmlElement Message) { |
|
202 UInt32 SID = Message.GetChild("SID").ToUInt32(); |
|
203 byte SCMD = Message.GetChild("SCMD").ToByte(); |
|
204 PmlElement InnerMsg = Message.GetChild("MSG"); |
|
205 ISession Session = null; |
|
206 lock (_sessions) if (!_sessions.TryGetValue(SID, out Session)) Session = null; |
|
207 switch (SCMD) { |
|
208 case 0: //Request |
|
209 if (Session != null) { |
|
210 try { |
|
211 Session.CloseIn(); |
|
212 } catch (Exception ex) { |
|
213 Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Request: exception in session.CloseIn: " + ex.ToString()); |
|
214 } |
|
215 WriteSessionMessage(SID, 2, null); |
|
216 } else if (ChannelRequestReceived != null) { |
|
217 try { |
|
218 PmlChannelRequestReceivedEventArgsA ea = new PmlChannelRequestReceivedEventArgsA(this, SID, InnerMsg); |
|
219 ChannelRequestReceived(this, ea); |
|
220 ea.RejectIfNotAccepted(); |
|
221 } catch (Exception ex) { |
|
222 Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in ChannelRequestReceived: " + ex.ToString()); |
|
223 WriteSessionMessage(SID, 2, null); |
|
224 } |
|
225 } else { |
|
226 WriteSessionMessage(SID, 2, null); |
|
227 } |
|
228 break; |
|
229 case 1: //Message |
|
230 if (Session != null) { |
|
231 try { |
|
232 Session.MessageIn(InnerMsg); |
|
233 } catch (Exception ex) { |
|
234 Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.MessageIn: " + ex.ToString()); |
|
235 WriteSessionMessage(SID, 2, null); |
|
236 } |
|
237 } else { |
|
238 WriteSessionMessage(SID, 2, null); |
|
239 } |
|
240 break; |
|
241 case 2: //Close |
|
242 if (Session != null) { |
|
243 try { |
|
244 if (InnerMsg != null && !(InnerMsg is PmlNull)) Session.MessageIn(InnerMsg); |
|
245 } catch (Exception ex) { |
|
246 Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage-Close: exception in session.MessageIn: " + ex.ToString()); |
|
247 } finally { |
|
248 try { |
|
249 Session.CloseIn(); |
|
250 } catch (Exception ex) { |
|
251 Console.WriteLine("UCIS.Pml.PmlCommunicator.processSessionMessage: exception in session.CloseIn: " + ex.ToString()); |
|
252 } |
|
253 } |
|
254 } |
|
255 break; |
|
256 } |
|
257 } |
|
258 private void processCall(object state) { |
|
259 PmlDictionary Message = (PmlDictionary)state; |
|
260 bool wantReply = Message.ContainsKey("SID"); |
|
261 UInt32 SID = 0; |
|
262 if (wantReply) SID = Message.GetChild("SID").ToUInt32(); |
|
263 PmlElement Reply = null; |
|
264 try { |
|
265 if (CallReceived != null) { |
|
266 PmlCallReceivedEventArgs ea = new PmlCallReceivedEventArgs(Message.GetChild("MSG"), wantReply, SID); |
|
267 CallReceived(this, ea); |
|
268 Reply = ea.Reply; |
|
269 } |
|
270 } catch (Exception ex) { |
|
271 Reply = new PmlDictionary(); |
|
272 ((PmlDictionary)Reply).Add("EXCEPTION", new PmlString(ex.ToString())); |
|
273 Console.WriteLine(ex.ToString()); |
|
274 } finally { |
|
275 if (wantReply && Channel.IsOpen) { |
|
276 try { |
|
277 WriteSyncMessage(SID, true, Reply); |
|
278 } catch (Exception ex) { |
|
279 Console.WriteLine("UCIS.Pml.PmlCommunicator.processCall: exception: " + ex.ToString()); |
|
280 closed(); |
|
281 Channel.Close(); |
|
282 } |
|
283 } |
|
284 } |
|
285 } |
|
286 |
|
287 public void Call(PmlElement message) { |
104
|
288 PmlDictionary Msg = new PmlDictionary() { { "CMD", "MSG" }, { "MSG", message } }; |
0
|
289 _WriteMessage(Msg); |
|
290 } |
|
291 public PmlElement Invoke(PmlElement message) { |
|
292 return Invoke(message, 60000); |
|
293 } |
|
294 public PmlElement Invoke(PmlElement message, int timeout) { |
|
295 if (_closed) throw new InvalidOperationException("Sorry, we're closed."); |
|
296 CSyncRequest SyncEvent = new CSyncRequest(); |
104
|
297 UInt32 SID; |
|
298 lock (_invocations) { |
|
299 SID = GetNextSessionId(ref pNextSyncRequest, _invocations); |
|
300 _invocations.Add(SID, SyncEvent); |
|
301 } |
0
|
302 try { |
|
303 WriteSyncMessage(SID, false, message); |
104
|
304 Boolean success; |
|
305 lock (SyncEvent) success = Monitor.Wait(SyncEvent, timeout); |
|
306 if (!success) throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")"); |
0
|
307 } finally { |
|
308 lock (_invocations) _invocations.Remove(SID); |
|
309 } |
|
310 return SyncEvent.Reply; |
|
311 } |
|
312 |
|
313 public IPmlChannel CreateChannel(PmlElement data) { |
104
|
314 UInt32 sid = GetNextSessionId(ref pNextSession, _sessions); |
0
|
315 PmlSubChannel ch = new PmlSubChannel(this, sid, true); |
|
316 WriteSessionMessage(sid, 0, data); |
|
317 if (!ch.IsOpen) return null; |
|
318 return ch; |
|
319 } |
|
320 |
|
321 private void AddSession(ISession session) { |
|
322 if (_closed) return; |
|
323 lock (_sessions) _sessions.Add(session.ID, session); |
|
324 } |
|
325 private void RemoveSession(UInt32 session) { |
|
326 if (_closed) return; |
|
327 lock (_sessions) _sessions.Remove(session); |
|
328 } |
|
329 private void RemoveSession(ISession session) { |
|
330 RemoveSession(session.ID); |
|
331 } |
|
332 |
104
|
333 private static UInt32 GetNextSessionId<T>(ref UInt32 id, IDictionary<UInt32, T> dictionary) { |
|
334 lock (dictionary) { |
|
335 do { |
|
336 id++; |
|
337 } while (dictionary.ContainsKey(id)); |
|
338 return id; |
0
|
339 } |
|
340 } |
|
341 |
|
342 protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) { |
104
|
343 PmlDictionary Msg2 = new PmlDictionary() { |
|
344 { "CMD", RPL ? "RPL" : "REQ" }, |
|
345 { "SID", SID }, |
|
346 { "MSG", MSG }, |
|
347 }; |
0
|
348 _WriteMessage(Msg2); |
|
349 } |
|
350 protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) { |
104
|
351 PmlDictionary Msg2 = new PmlDictionary() { |
|
352 { "CMD", "SES" }, |
|
353 { "SID", SID }, |
|
354 { "SCMD", CMD }, |
|
355 }; |
0
|
356 if (MSG != null) Msg2.Add("MSG", MSG); |
|
357 _WriteMessage(Msg2); |
|
358 } |
|
359 |
|
360 /* LegacyPmlCommunicator compatibility */ |
|
361 public PmlElement SyncRequest(PmlElement Request) { |
|
362 return Invoke(Request); |
|
363 } |
|
364 public PmlElement SyncRequest(PmlElement Request, int Timeout) { |
|
365 return Invoke(Request, Timeout); |
|
366 } |
|
367 public void SendMessage(PmlElement Message) { |
|
368 Call(Message); |
|
369 } |
|
370 } |
|
371 } |