comparison Pml/LegacyPmlCommunicator.cs @ 0:3ab940a0c7a0

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 11 Sep 2012 16:28:53 +0200
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3ab940a0c7a0
1 using System;
2 using System.Threading;
3 using System.Collections.Generic;
4 using UCIS.Pml;
5
6 namespace UCIS.Pml {
7 public class LegacyPmlCommunicator {
8 private class CSyncRequest {
9 internal PmlElement Reply;
10 internal ManualResetEvent ResetEvent = new ManualResetEvent(false);
11 }
12 public abstract class SessionBase {
13 private bool pActive;
14 private LegacyPmlCommunicator pConnection;
15 private UInt32 pSID;
16
17 public uint SID { get { return pSID; } }
18 public bool Active { get { return pActive; } }
19 public LegacyPmlCommunicator Communicator { get { return pConnection; } }
20
21 protected SessionBase(LegacyPmlCommunicator Connection) {
22 pConnection = Connection;
23 }
24
25 protected void Accept(UInt32 SID) {
26 if (pActive) throw new InvalidOperationException("Session is active");
27 pSID = SID;
28 lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this);
29 pActive = true;
30 }
31 protected void Request() {
32 Request(null);
33 }
34 protected void Request(PmlElement Message) {
35 if (pActive) throw new InvalidOperationException("Session is active");
36 pSID = pConnection.GetNextSessionId(true);
37 lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this);
38 pConnection.WriteSessionMessage(pSID, 0, Message);
39 pActive = true;
40 }
41
42 protected internal abstract void MessageIn(PmlElement Message);
43
44 protected void SendMessage(PmlElement Message) {
45 if (!pActive) throw new InvalidOperationException("Session is not active");
46 pConnection.WriteSessionMessage(pSID, 1, Message);
47 }
48
49 public void Close() {
50 Close(null);
51 }
52 public void Close(PmlElement Message) {
53 if (!pActive) return; // throw new InvalidOperationException("Session is not active");
54 pConnection.WriteSessionMessage(pSID, 2, Message);
55 ClosedA();
56 }
57
58 internal void ClosedA() {
59 pActive = false;
60 lock (pConnection.pSessions) pConnection.pSessions.Remove(pSID);
61 }
62
63 internal void ClosedB(PmlElement Message) {
64 pActive = false;
65 Closed(Message);
66 }
67
68 protected virtual void Closed(PmlElement Message) {
69 }
70 }
71 public class Session : SessionBase {
72 public event MessageReceivedEventHandler MessageReceived;
73 public delegate void MessageReceivedEventHandler(PmlElement Message);
74 public event SessionClosedEventHandler SessionClosed;
75 public delegate void SessionClosedEventHandler(PmlElement Message);
76
77 public Session(LegacyPmlCommunicator Connection) : base(Connection) { }
78
79 public new void Accept(UInt32 SID) {
80 base.Accept(SID);
81 }
82 public new void Request() {
83 Request(null);
84 }
85 public new void Request(PmlElement Message) {
86 base.Request(Message);
87 }
88
89 protected internal override void MessageIn(PmlElement Message) {
90 if (MessageReceived != null) MessageReceived(Message);
91 }
92
93 public new void SendMessage(PmlElement Message) {
94 base.SendMessage(Message);
95 }
96
97 protected override void Closed(PmlElement Message) {
98 if (SessionClosed != null) SessionClosed(Message);
99 }
100 }
101
102 private Dictionary<UInt32, SessionBase> pSessions = new Dictionary<UInt32, SessionBase>();
103 private UInt32 pNextSession;
104 private Dictionary<UInt32, CSyncRequest> pSyncRequests = new Dictionary<UInt32, CSyncRequest>();
105 private UInt32 pNextSyncRequest;
106
107 private IPmlChannel _channel;
108
109 public event MessageReceivedEventHandler MessageReceived;
110 public delegate void MessageReceivedEventHandler(PmlElement Message);
111 public event RequestReceivedEventHandler RequestReceived;
112 public delegate void RequestReceivedEventHandler(PmlElement Request, ref PmlElement Reply);
113 public event SessionRequestReceivedEventHandler SessionRequestReceived;
114 public delegate void SessionRequestReceivedEventHandler(PmlElement Request, uint SID);
115 public event EventHandler Closed;
116
117 public ICollection<SessionBase> Sessions { get { return (ICollection<SessionBase>)pSessions.Values; } }
118 public int SyncRequests { get { return pSyncRequests.Count; } }
119
120 public LegacyPmlCommunicator(IPmlChannel channel, bool autoStart) {
121 _channel = channel;
122 if (autoStart) Start();
123 //_channel.BeginReadMessage(messageReceived, null);
124 //_channel.MessageReceived += messageReceived;
125 //_channel.Closed += closed;
126 }
127 public void Start() {
128 _channel.BeginReadMessage(messageReceived, null);
129 }
130
131 public IPmlChannel Channel { get { return _channel; } }
132
133 public void Close() {
134 //_channel.MessageReceived -= messageReceived;
135 //_channel.Closed -= closed;
136 _channel.Close();
137 }
138
139 private void _WriteMessage(PmlElement Message) {
140 lock (_channel) {
141 if (_channel.IsOpen) {
142 _channel.WriteMessage(Message);
143 } else {
144 throw new InvalidOperationException("Could not write message: the channel is not open");
145 }
146 }
147 }
148
149 private UInt32 GetNextSessionId(bool IsSession) {
150 if (IsSession) {
151 lock (pSessions) {
152 do {
153 if (pNextSession == UInt32.MaxValue) {
154 pNextSession = 0;
155 } else {
156 pNextSession += (uint)1;
157 }
158 }
159 while (pSessions.ContainsKey(pNextSession));
160 return pNextSession;
161 }
162 } else {
163 lock (pSyncRequests) {
164 do {
165 if (pNextSyncRequest == UInt32.MaxValue) {
166 pNextSyncRequest = 0;
167 } else {
168 pNextSyncRequest += (uint)1;
169 }
170 }
171 while (pSyncRequests.ContainsKey(pNextSyncRequest));
172 return pNextSyncRequest;
173 }
174 }
175 }
176
177 protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) {
178 PmlDictionary Msg2 = new PmlDictionary();
179 Msg2.Add("CMD", new PmlString("SES"));
180 Msg2.Add("SID", new PmlInteger(SID));
181 Msg2.Add("SCMD", new PmlInteger(CMD));
182 Msg2.Add("MSG", MSG);
183 _WriteMessage(Msg2);
184 }
185
186 protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) {
187 PmlDictionary Msg2 = new PmlDictionary();
188 if (RPL) {
189 Msg2.Add("CMD", new PmlString("RPL"));
190 } else {
191 Msg2.Add("CMD", new PmlString("REQ"));
192 }
193 Msg2.Add("SID", new PmlInteger(SID));
194 Msg2.Add("MSG", MSG);
195 _WriteMessage(Msg2);
196 }
197
198 private void messageReceived(IAsyncResult ar) {
199 PmlElement Message;
200 try {
201 Message = _channel.EndReadMessage(ar);
202 _channel.BeginReadMessage(messageReceived, null);
203 } catch (InvalidOperationException ex) {
204 Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message);
205 closed();
206 _channel.Close();
207 return;
208 } catch (Exception ex) {
209 Console.WriteLine(ex.ToString());
210 closed();
211 _channel.Close();
212 return;
213 }
214 int Ping = 0;
215 if (Message == null) {
216 if (Ping > 2) {
217 _channel.Close();
218 return;
219 } else {
220 _WriteMessage(new PmlString("PING"));
221 }
222 Ping += 1;
223 } else if (Message is PmlString) {
224 string Cmd = Message.ToString();
225 if (Cmd.Equals("PING")) {
226 _WriteMessage(new PmlString("PONG"));
227 } else if (Cmd.Equals("PONG")) {
228 Ping = 0;
229 }
230 } else if (Message is PmlDictionary) {
231 string Cmd = null;
232 Cmd = Message.GetChild("CMD").ToString();
233 if (Cmd.Equals("SES")) {
234 UInt32 SID = default(UInt32);
235 byte SCMD = 0;
236 SessionBase Session = default(SessionBase);
237 PmlElement InnerMsg = default(PmlElement);
238 SID = Message.GetChild("SID").ToUInt32();
239 SCMD = Message.GetChild("SCMD").ToByte();
240 InnerMsg = Message.GetChild("MSG");
241 lock (pSessions) {
242 if (pSessions.ContainsKey(SID)) {
243 Session = pSessions[SID];
244 } else {
245 Session = null;
246 }
247 }
248 if (SCMD == 0) {
249 if (Session == null) {
250 if (SessionRequestReceived != null) {
251 try {
252 SessionRequestReceived(InnerMsg, SID);
253 } catch (Exception ex) {
254 Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->SessionRequestReceived: " + ex.ToString());
255 WriteSessionMessage(SID, 2, null);
256 }
257 }
258 } else {
259 try {
260 Session.ClosedA();
261 Session.ClosedB(null);
262 } catch (Exception ex) {
263 Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.ClosedA/B: " + ex.ToString());
264 }
265 WriteSessionMessage(SID, 2, null);
266 }
267 } else if (SCMD == 1) {
268 if (Session == null) {
269 WriteSessionMessage(SID, 2, null);
270 } else {
271 try {
272 Session.MessageIn(InnerMsg);
273 } catch (Exception ex) {
274 Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.MessageIn: " + ex.ToString());
275 WriteSessionMessage(SID, 2, null);
276 }
277 }
278 } else if (SCMD == 2) {
279 if (Session != null) {
280 try {
281 Session.ClosedA();
282 Session.ClosedB(InnerMsg);
283 } catch (Exception ex) {
284 Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.ClosedA/B: " + ex.ToString());
285 }
286 }
287 }
288 } else if (Cmd.Equals("RPL")) {
289 UInt32 SID = default(UInt32);
290 CSyncRequest SRequest = null;
291 SID = Message.GetChild("SID").ToUInt32();
292 lock (pSyncRequests) {
293 if (pSyncRequests.TryGetValue(SID, out SRequest)) {
294 pSyncRequests.Remove(SID);
295 } else {
296 Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString());
297 }
298 }
299 if (SRequest != null) {
300 SRequest.Reply = Message.GetChild("MSG");
301 SRequest.ResetEvent.Set();
302 }
303 } else if (Cmd.Equals("REQ")) {
304 UCIS.ThreadPool.RunCall(SyncRequestHandler, Message);
305 //System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(SyncRequestHandler), Message);
306 } else if (Cmd.Equals("MSG")) {
307 PmlElement InnerMsg = Message.GetChild("MSG");
308 if (MessageReceived != null) MessageReceived(InnerMsg);
309 } else {
310 throw new InvalidOperationException("Invalid operation");
311 }
312 }
313 }
314 private void closed() {
315 //_channel.MessageReceived -= messageReceived;
316 //_channel.Closed -= closed;
317 Console.WriteLine("UCIS.PML.Connection: Connection closed");
318 try {
319 SessionBase[] sessions;
320 lock (pSessions) {
321 sessions = new SessionBase[pSessions.Count];
322 pSessions.Values.CopyTo(sessions, 0);
323 }
324 foreach (SessionBase S in sessions) {
325 try {
326 S.ClosedB(null);
327 } catch (Exception ex) {
328 Console.WriteLine(ex.ToString());
329 }
330 }
331 } catch (Exception ex) {
332 Console.WriteLine(ex.ToString());
333 }
334 lock (pSessions) pSessions.Clear();
335 try {
336 CSyncRequest[] reqs;
337 lock (pSyncRequests) {
338 reqs = new CSyncRequest[pSyncRequests.Count];
339 pSyncRequests.Values.CopyTo(reqs, 0);
340 }
341 foreach (CSyncRequest T in reqs) {
342 T.ResetEvent.Set();
343 }
344 } catch (Exception ex) {
345 Console.WriteLine(ex.ToString());
346 }
347 lock (pSyncRequests) pSyncRequests.Clear();
348 if (Closed != null) Closed(this, new EventArgs());
349 }
350
351 private void SyncRequestHandler(object state) {
352 PmlDictionary Message = (PmlDictionary)state;
353 PmlElement Reply = null;
354 UInt32 SID = 0;
355 try {
356 SID = Message.GetChild("SID").ToUInt32();
357 PmlElement InnerMsg = Message.GetChild("MSG");
358 if (RequestReceived != null) {
359 RequestReceived(InnerMsg, ref Reply);
360 }
361 } catch (Exception ex) {
362 Reply = new PmlDictionary();
363 ((PmlDictionary)Reply).Add("EXCEPTION", new PmlString(ex.ToString()));
364 Console.WriteLine(ex.ToString());
365 }
366 try {
367 WriteSyncMessage(SID, true, Reply);
368 } catch (Exception ex) {
369 Console.WriteLine("Exception: " + ex.ToString());
370 }
371 }
372
373 public PmlElement SyncRequest(PmlElement Request) {
374 return SyncRequest(Request, 30000);
375 }
376 public PmlElement SyncRequest(PmlElement Request, int Timeout) {
377 CSyncRequest SyncEvent = new CSyncRequest();
378 UInt32 SID = GetNextSessionId(false);
379 lock (pSyncRequests) pSyncRequests.Add(SID, SyncEvent);
380 try {
381 WriteSyncMessage(SID, false, Request);
382 if (!SyncEvent.ResetEvent.WaitOne(Timeout, false)) {
383 lock (pSyncRequests) pSyncRequests.Remove(SID);
384 throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")");
385 }
386 } finally {
387 lock (pSyncRequests) pSyncRequests.Remove(SID);
388 }
389 return SyncEvent.Reply;
390 }
391
392 public void SendMessage(PmlElement Message) {
393 PmlDictionary Msg = new PmlDictionary();
394 Msg.Add("CMD", new PmlString("MSG"));
395 Msg.Add("MSG", Message);
396 _WriteMessage(Msg);
397 }
398
399 public void SendRawMessage(PmlElement Message) {
400 _WriteMessage(Message);
401 }
402 }
403 }