Mercurial > hg > ucis.core
comparison Pml/PmlConnection.cs @ 0:3ab940a0c7a0
Initial commit
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Tue, 11 Sep 2012 16:28:53 +0200 |
parents | |
children | 57b4c2f895d1 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3ab940a0c7a0 |
---|---|
1 using System; | |
2 using System.Collections.Generic; | |
3 using System.Text; | |
4 using UCIS.Net; | |
5 using System.Net.Sockets; | |
6 using System.Threading; | |
7 using System.IO; | |
8 | |
9 namespace UCIS.Pml { | |
10 /*public class PmlConnection : LegacyPmlCommunicator { | |
11 public PmlConnection(Socket Socket) : this(new TCPPmlChannel(Socket)) { } | |
12 public PmlConnection(TCPStream Stream) : this(new TCPPmlChannel(Stream)) { } | |
13 public PmlConnection(Stream Stream) : this(new PmlBinaryRW(Stream)) {} | |
14 public PmlConnection(IPmlRW RW) : this(new PmlChannel(RW)) { } | |
15 public PmlConnection(IPmlChannel CH) : base(CH) { } | |
16 }*/ | |
17 public class PmlConnection { | |
// Bookkeeping for one outstanding synchronous request.
private class CSyncRequest {
	// Signalled by the worker after Reply has been stored, and also when the
	// worker shuts down (in which case Reply is still null).
	internal ManualResetEvent ResetEvent = new ManualResetEvent(false);
	// Payload of the matching "RPL" message; null until one arrives.
	internal PmlElement Reply;
}
/// <summary>
/// Base class for a session multiplexed over a <see cref="PmlConnection"/>.
/// A session is identified by a numeric id and exchanges "SES" messages whose
/// sub-command is 0 (request/open), 1 (message) or 2 (close).
/// </summary>
public abstract class SessionBase {
	private bool pActive;
	private PmlConnection pConnection;
	private UInt32 pSID;

	/// <summary>Creates an inactive session bound to <paramref name="Connection"/>.</summary>
	protected SessionBase(PmlConnection Connection) {
		pConnection = Connection;
	}

	/// <summary>
	/// Activates this session under an id chosen by the remote side.
	/// </summary>
	/// <exception cref="InvalidOperationException">The session is already active.</exception>
	protected void Accept(UInt32 SID) {
		if (pActive) throw new InvalidOperationException("Session is active");
		pSID = SID;
		// Register and mark active under the same lock the worker uses for lookups,
		// so the worker can never observe a registered-but-inactive session.
		// If Add throws (duplicate id) pActive stays false.
		lock (pConnection.pSessions) {
			pConnection.pSessions.Add(pSID, this);
			pActive = true;
		}
	}

	/// <summary>Opens a new session without an initial payload.</summary>
	protected void Request() {
		Request(null);
	}

	/// <summary>
	/// Opens a new session: allocates a local id, registers the session and
	/// sends sub-command 0 carrying <paramref name="Message"/>.
	/// </summary>
	/// <exception cref="InvalidOperationException">The session is already active.</exception>
	protected void Request(PmlElement Message) {
		if (pActive) throw new InvalidOperationException("Session is active");
		pSID = pConnection.GetNextSessionId(true);
		// Mark active before the request is written: the peer may answer (or close)
		// before WriteSessionMessage returns, and a later "pActive = true" would
		// wrongly resurrect a session the worker has already closed.
		lock (pConnection.pSessions) {
			pConnection.pSessions.Add(pSID, this);
			pActive = true;
		}
		try {
			pConnection.WriteSessionMessage(pSID, 0, Message);
		} catch {
			// The request never went out; undo the registration so the id is not leaked.
			ClosedA();
			throw;
		}
	}

	/// <summary>Called by the connection worker for every sub-command 1 message.</summary>
	protected internal abstract void MessageIn(PmlElement Message);

	/// <summary>Sends a message (sub-command 1) on this session.</summary>
	/// <exception cref="InvalidOperationException">The session is not active.</exception>
	protected void SendMessage(PmlElement Message) {
		if (!pActive) throw new InvalidOperationException("Session is not active");
		pConnection.WriteSessionMessage(pSID, 1, Message);
	}

	/// <summary>Closes the session without a payload.</summary>
	public void Close() {
		Close(null);
	}

	/// <summary>
	/// Sends a close (sub-command 2) carrying <paramref name="Message"/> and
	/// unregisters the session. Does not invoke <see cref="Closed"/>.
	/// </summary>
	/// <exception cref="InvalidOperationException">The session is not active.</exception>
	public void Close(PmlElement Message) {
		if (!pActive) throw new InvalidOperationException("Session is not active");
		try {
			pConnection.WriteSessionMessage(pSID, 2, Message);
		} finally {
			// Unregister even when the write fails, otherwise a broken connection
			// would leave the session active and registered forever.
			ClosedA();
		}
	}

	// Marks the session inactive and removes it from the connection's session table.
	internal void ClosedA() {
		pActive = false;
		lock (pConnection.pSessions) pConnection.pSessions.Remove(pSID);
	}

	// Marks the session inactive and notifies the subclass via Closed.
	internal void ClosedB(PmlElement Message) {
		pActive = false;
		Closed(Message);
	}

	/// <summary>
	/// Hook invoked when the session is closed by the worker (remote close,
	/// duplicate request, or connection shutdown). Default does nothing.
	/// </summary>
	protected virtual void Closed(PmlElement Message) {
	}
}
/// <summary>
/// Event-based session: exposes the protected operations of
/// <see cref="SessionBase"/> publicly and raises events for incoming
/// messages and for closure.
/// </summary>
public class Session : SessionBase {
	public delegate void MessageReceivedEventHandler(PmlElement Message);
	public event MessageReceivedEventHandler MessageReceived;
	public delegate void SessionClosedEventHandler(PmlElement Message);
	public event SessionClosedEventHandler SessionClosed;

	public Session(PmlConnection Connection) : base(Connection) { }

	/// <summary>Activates the session under the remote-chosen id <paramref name="SID"/>.</summary>
	public new void Accept(UInt32 SID) {
		base.Accept(SID);
	}
	/// <summary>Opens a new session without an initial payload.</summary>
	public new void Request() {
		Request(null);
	}
	/// <summary>Opens a new session, sending <paramref name="Message"/> as the request payload.</summary>
	public new void Request(PmlElement Message) {
		base.Request(Message);
	}

	/// <summary>Raises <see cref="MessageReceived"/> for an incoming session message.</summary>
	protected internal override void MessageIn(PmlElement Message) {
		// Copy the delegate first: a subscriber removed on another thread between
		// the null check and the call would otherwise cause a NullReferenceException.
		MessageReceivedEventHandler Handler = MessageReceived;
		if (Handler != null) Handler(Message);
	}

	/// <summary>Sends a message on this session.</summary>
	public new void SendMessage(PmlElement Message) {
		base.SendMessage(Message);
	}

	/// <summary>Raises <see cref="SessionClosed"/> when the session is closed by the worker.</summary>
	protected override void Closed(PmlElement Message) {
		// Same local-copy pattern as MessageIn.
		SessionClosedEventHandler Handler = SessionClosed;
		if (Handler != null) Handler(Message);
	}
}
107 | |
// Registered sessions keyed by session id. The dictionary object itself is the
// lock that guards both the table and pNextSession.
private Dictionary<UInt32, SessionBase> pSessions = new Dictionary<UInt32, SessionBase>();
// Most recent id returned by GetNextSessionId(true).
private UInt32 pNextSession;
// Outstanding synchronous requests keyed by request id; also the lock guarding
// the table and pNextSyncRequest.
private Dictionary<UInt32, CSyncRequest> pSyncRequests = new Dictionary<UInt32, CSyncRequest>();
// Most recent id returned by GetNextSessionId(false).
private UInt32 pNextSyncRequest;

// Underlying stream; only set by the Stream constructor and only used by Close().
private Stream pStream;

// Raised by Worker for every "MSG" command.
public delegate void MessageReceivedEventHandler(PmlElement Message);
public event MessageReceivedEventHandler MessageReceived;
// Raised on a thread-pool thread for every "REQ" command; the handler fills in Reply.
public delegate void RequestReceivedEventHandler(PmlElement Request, ref PmlElement Reply);
public event RequestReceivedEventHandler RequestReceived;
// Raised by Worker when the peer opens a session with an id that is not registered locally.
public delegate void SessionRequestReceivedEventHandler(PmlElement Request, uint SID);
public event SessionRequestReceivedEventHandler SessionRequestReceived;

private IPmlWriter _writer;
private IPmlReader _reader;
124 | |
// Wraps the socket in a TCPStream and delegates to another constructor overload.
public PmlConnection(Socket Socket)
	: this(new TCPStream(Socket)) {
}

// Reads and writes binary PML on the stream; the stream is remembered so Close() can close it.
public PmlConnection(Stream Stream)
	: this(new PmlBinaryRW(Stream)) {
	this.pStream = Stream;
}

// Uses one object as both reader and writer.
public PmlConnection(IPmlRW RMRW)
	: this(RMRW, RMRW) {
}

// Uses independent reader and writer objects.
public PmlConnection(IPmlReader Reader, IPmlWriter Writer) {
	this._writer = Writer;
	this._reader = Reader;
}
134 | |
// Closes the underlying stream, if this connection was constructed from one.
// Connections built directly from reader/writer objects have nothing to close here.
public void Close() {
	if (pStream == null) {
		return;
	}
	pStream.Close();
}
138 | |
// The reader incoming messages are pulled from.
public IPmlReader Reader {
	get {
		return this._reader;
	}
}

// The writer outgoing messages are pushed to (also used as the write lock).
public IPmlWriter Writer {
	get {
		return this._writer;
	}
}
// Reads the next message from the reader.
// Warning: Can't lock reader because it can be the same as the Writer (possible deadlock)
private PmlElement _ReadMessage() {
	return _reader.ReadMessage();
}
// Writes one message; writes are serialised by locking on the writer object.
private void _WriteMessage(PmlElement Message) {
	lock (_writer) {
		_writer.WriteMessage(Message);
	}
}
152 | |
// Returns the next unused id: a session id when IsSession is true, otherwise a
// synchronous-request id. The id is NOT reserved; the caller adds it to the
// corresponding table afterwards.
private UInt32 GetNextSessionId(bool IsSession) {
	if (!IsSession) {
		lock (pSyncRequests) {
			return AdvanceToFreeId(pSyncRequests, ref pNextSyncRequest);
		}
	}
	lock (pSessions) {
		return AdvanceToFreeId(pSessions, ref pNextSession);
	}
}

// Steps Counter forward (wrapping from UInt32.MaxValue to 0) at least once and
// until it is not a key of Table, then returns it. Caller must hold the table's lock.
private static UInt32 AdvanceToFreeId<TValue>(Dictionary<UInt32, TValue> Table, ref UInt32 Counter) {
	do {
		Counter = (Counter == UInt32.MaxValue) ? 0u : Counter + 1u;
	} while (Table.ContainsKey(Counter));
	return Counter;
}
180 | |
// Sends a session envelope: CMD="SES", the session id, the sub-command
// (0 = request, 1 = message, 2 = close) and the payload.
protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) {
	PmlDictionary Envelope = new PmlDictionary();
	PmlString Kind = new PmlString("SES");
	Envelope.Add("CMD", Kind);
	Envelope.Add("SID", new PmlInteger(SID));
	Envelope.Add("SCMD", new PmlInteger(CMD));
	Envelope.Add("MSG", MSG);
	_WriteMessage(Envelope);
}
189 | |
// Sends a synchronous request (CMD="REQ") or, when RPL is true, its reply
// (CMD="RPL"), tagged with the request id and carrying the payload.
protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) {
	string Kind = RPL ? "RPL" : "REQ";
	PmlDictionary Envelope = new PmlDictionary();
	Envelope.Add("CMD", new PmlString(Kind));
	Envelope.Add("SID", new PmlInteger(SID));
	Envelope.Add("MSG", MSG);
	_WriteMessage(Envelope);
}
201 | |
/// <summary>
/// Receive loop. Reads messages until the stream ends, the connection is reset,
/// the peer stops answering pings, or an unexpected error occurs, and dispatches
/// each message: "PING"/"PONG" keep-alives, "SES" session traffic, "RPL" replies
/// to <see cref="SyncRequest(PmlElement)"/>, "REQ" requests (handled on the
/// thread pool) and "MSG" plain messages. On exit all sessions are notified as
/// closed and all pending synchronous requests are released.
/// </summary>
public void Worker() {
	try {
		// Number of consecutive read timeouts/failures since the last PONG.
		int Ping = 0;
		while (true) {
			PmlElement Message;
			try {
				Message = _ReadMessage();
				if (Message == null) Console.WriteLine("UCIS.PML.Connection: Message is just null?");
			} catch (EndOfStreamException) {
				Console.WriteLine("UCIS.PML.Connection: End of stream");
				return;
			} catch (SocketException ex) {
				if (ex.ErrorCode == (int)SocketError.TimedOut) {
					Console.WriteLine("UCIS.PML.Connection: SocketException/TimedOut");
					Message = null;
				} else if (ex.ErrorCode == (int)SocketError.ConnectionReset) {
					Console.WriteLine("UCIS.PML.Connection: Connection reset by peer");
					return;
				} else {
					throw new Exception("Exception while reading message", ex);
				}
			} catch (IOException ex) {
				Console.WriteLine("UCIS.PML.Connection: IOException: " + ex.Message);
				Message = null;
			} catch (TimeoutException) {
				Message = null;
			}

			if (Message == null) {
				// Nothing was read: probe the peer, give up after the third unanswered probe.
				if (Ping > 2) {
					Console.WriteLine("UCIS.PML.Connection: Connection timed out");
					break;
				}
				_WriteMessage(new PmlString("PING"));
				Ping += 1;
			} else if (Message is PmlString) {
				string Cmd = Message.ToString();
				if (Cmd.Equals("PING")) {
					_WriteMessage(new PmlString("PONG"));
				} else if (Cmd.Equals("PONG")) {
					Ping = 0;
				}
			} else if (Message is PmlDictionary) {
				string Cmd = Message.GetChild("CMD").ToString();
				if (Cmd.Equals("SES")) {
					DispatchSessionMessage(Message);
				} else if (Cmd.Equals("RPL")) {
					CompleteSyncRequest(Message);
				} else if (Cmd.Equals("REQ")) {
					// Requests run on the thread pool so a slow handler cannot stall the receive loop.
					ThreadPool.QueueUserWorkItem(new WaitCallback(SyncRequestHandler), Message);
				} else if (Cmd.Equals("MSG")) {
					PmlElement InnerMsg = Message.GetChild("MSG");
					// Local copy guards against unsubscription between check and call.
					MessageReceivedEventHandler Handler = MessageReceived;
					if (Handler != null) Handler(InnerMsg);
				} else {
					throw new InvalidOperationException("Invalid operation");
				}
			}
		}
	} catch (ThreadAbortException) {
		// Do not log aborts; "throw;" (not "throw ex;") keeps the original stack trace.
		throw;
	} catch (Exception ex) {
		Console.WriteLine(ex.ToString());
	} finally {
		Console.WriteLine("UCIS.PML.Connection: Connection closed");
		ReleasePendingOnShutdown();
	}
}

// Handles one "SES" envelope: routes it to the registered session, raises
// SessionRequestReceived for a new id, or answers with a close (sub-command 2).
private void DispatchSessionMessage(PmlElement Message) {
	UInt32 SID = Message.GetChild("SID").ToUInt32();
	byte SCMD = Message.GetChild("SCMD").ToByte();
	PmlElement InnerMsg = Message.GetChild("MSG");
	SessionBase Target;
	lock (pSessions) {
		// Single lookup; Target is null when the id is unknown.
		pSessions.TryGetValue(SID, out Target);
	}
	if (SCMD == 0) {
		if (Target == null) {
			SessionRequestReceivedEventHandler Handler = SessionRequestReceived;
			if (Handler != null) Handler(InnerMsg, SID);
		} else {
			// The peer re-used an id that is still open here: drop our session and tell the peer.
			Target.ClosedA();
			Target.ClosedB(null);
			WriteSessionMessage(SID, 2, null);
		}
	} else if (SCMD == 1) {
		if (Target == null) {
			// Message for a session we do not know: tell the peer it is closed.
			WriteSessionMessage(SID, 2, null);
		} else {
			Target.MessageIn(InnerMsg);
		}
	} else if (SCMD == 2) {
		if (Target != null) {
			Target.ClosedA();
			Target.ClosedB(InnerMsg);
		}
	}
}

// Handles one "RPL" envelope: hands the payload to the waiting SyncRequest call, if any.
private void CompleteSyncRequest(PmlElement Message) {
	UInt32 SID = Message.GetChild("SID").ToUInt32();
	CSyncRequest SRequest;
	lock (pSyncRequests) {
		if (pSyncRequests.TryGetValue(SID, out SRequest)) {
			pSyncRequests.Remove(SID);
		} else {
			Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString());
		}
	}
	if (SRequest != null) {
		// Store the reply before signalling so the waiter always sees it.
		SRequest.Reply = Message.GetChild("MSG");
		SRequest.ResetEvent.Set();
	}
}

// Shutdown cleanup: notifies every registered session that it is closed and
// wakes every thread blocked in SyncRequest. Never throws.
private void ReleasePendingOnShutdown() {
	try {
		// Snapshot under the lock: other threads may still add or remove entries,
		// and modifying a dictionary during enumeration would abort the cleanup.
		List<SessionBase> OpenSessions;
		lock (pSessions) {
			OpenSessions = new List<SessionBase>(pSessions.Values);
			pSessions.Clear();
		}
		// Callbacks run outside the lock so user code cannot deadlock against it.
		foreach (SessionBase S in OpenSessions) {
			try {
				S.ClosedB(null);
			} catch (Exception ex) {
				Console.WriteLine(ex.ToString());
			}
		}
		List<CSyncRequest> Waiting;
		lock (pSyncRequests) {
			Waiting = new List<CSyncRequest>(pSyncRequests.Values);
			pSyncRequests.Clear();
		}
		foreach (CSyncRequest T in Waiting) {
			T.ResetEvent.Set();
		}
	} catch (Exception ex) {
		Console.WriteLine(ex.ToString());
	}
}
332 | |
// Thread-pool callback for one incoming "REQ" envelope (passed as state).
// Raises RequestReceived and sends whatever the handler put in Reply back as
// an "RPL" with the same id. If the handler throws, the reply is a dictionary
// with a single "EXCEPTION" entry holding the exception text.
private void SyncRequestHandler(object state) {
	try {
		PmlDictionary Message = (PmlDictionary)state;
		UInt32 SID = Message.GetChild("SID").ToUInt32();
		PmlElement InnerMsg = Message.GetChild("MSG");
		PmlElement Reply = null;
		try {
			// Local copy guards against unsubscription between check and call.
			RequestReceivedEventHandler Handler = RequestReceived;
			if (Handler != null) {
				Handler(InnerMsg, ref Reply);
			}
		} catch (Exception ex) {
			PmlDictionary Error = new PmlDictionary();
			Error.Add("EXCEPTION", new PmlString(ex.ToString()));
			Reply = Error;
			Console.WriteLine(ex.ToString());
		}
		WriteSyncMessage(SID, true, Reply);
	} catch (Exception ex) {
		// This runs on a thread-pool thread: an exception escaping from here
		// (malformed envelope, or a write on a connection that has gone away)
		// would be unhandled and can terminate the process. Log it instead.
		Console.WriteLine(ex.ToString());
	}
}
352 | |
/// <summary>Sends a request and waits up to 30 seconds for the reply.</summary>
public PmlElement SyncRequest(PmlElement Request) {
	return SyncRequest(Request, 30000);
}

/// <summary>
/// Sends <paramref name="Request"/> as a "REQ" message and blocks until the
/// worker receives the matching "RPL" or <paramref name="Timeout"/>
/// milliseconds elapse.
/// </summary>
/// <returns>
/// The reply payload. This is null when the waiter was released by the worker
/// shutting down rather than by a reply.
/// </returns>
/// <exception cref="TimeoutException">No reply arrived within the timeout.</exception>
public PmlElement SyncRequest(PmlElement Request, int Timeout) {
	CSyncRequest SyncEvent = new CSyncRequest();
	UInt32 SID = GetNextSessionId(false);
	lock (pSyncRequests) pSyncRequests.Add(SID, SyncEvent);
	try {
		WriteSyncMessage(SID, false, Request);
		if (!SyncEvent.ResetEvent.WaitOne(Timeout, false)) {
			Console.WriteLine("UCIS.PML.Connection.SyncRequest Timeout: " + SID.ToString());
			throw new TimeoutException();
		}
		return SyncEvent.Reply;
	} finally {
		// Always drop our entry: previously it leaked when the write threw or when
		// the waiter was released by connection shutdown. Only remove it if it is
		// still ours, so an entry registered by someone else under the same id is
		// never touched.
		lock (pSyncRequests) {
			CSyncRequest Current;
			if (pSyncRequests.TryGetValue(SID, out Current) && object.ReferenceEquals(Current, SyncEvent)) {
				pSyncRequests.Remove(SID);
			}
		}
	}
}
369 | |
// Sends a plain message: wraps the payload in an envelope with CMD="MSG".
public void SendMessage(PmlElement Message) {
	PmlDictionary Envelope = new PmlDictionary();
	PmlString Kind = new PmlString("MSG");
	Envelope.Add("CMD", Kind);
	Envelope.Add("MSG", Message);
	_WriteMessage(Envelope);
}
376 | |
// Reads one raw message directly from the reader, bypassing Worker's dispatching.
public PmlElement ReadMessage() {
	PmlElement Received = _ReadMessage();
	return Received;
}

// Writes a message as-is (no envelope), under the same write lock as all other sends.
public void SendRawMessage(PmlElement Message) {
	this._WriteMessage(Message);
}
383 } | |
384 } |