comparison Pml/PmlConnection.cs @ 0:3ab940a0c7a0

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 11 Sep 2012 16:28:53 +0200
parents
children 57b4c2f895d1
comparison
equal deleted inserted replaced
-1:000000000000 0:3ab940a0c7a0
1 using System;
2 using System.Collections.Generic;
3 using System.Text;
4 using UCIS.Net;
5 using System.Net.Sockets;
6 using System.Threading;
7 using System.IO;
8
9 namespace UCIS.Pml {
10 /*public class PmlConnection : LegacyPmlCommunicator {
11 public PmlConnection(Socket Socket) : this(new TCPPmlChannel(Socket)) { }
12 public PmlConnection(TCPStream Stream) : this(new TCPPmlChannel(Stream)) { }
13 public PmlConnection(Stream Stream) : this(new PmlBinaryRW(Stream)) {}
14 public PmlConnection(IPmlRW RW) : this(new PmlChannel(RW)) { }
15 public PmlConnection(IPmlChannel CH) : base(CH) { }
16 }*/
17 public class PmlConnection {
18 private class CSyncRequest {
19 internal PmlElement Reply;
20 internal ManualResetEvent ResetEvent = new ManualResetEvent(false);
21 }
22 public abstract class SessionBase {
23 private bool pActive;
24 private PmlConnection pConnection;
25 private UInt32 pSID;
26
27 protected SessionBase(PmlConnection Connection) {
28 pConnection = Connection;
29 }
30
31 protected void Accept(UInt32 SID) {
32 if (pActive) throw new InvalidOperationException("Session is active");
33 pSID = SID;
34 lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this);
35 pActive = true;
36 }
37 protected void Request() {
38 Request(null);
39 }
40 protected void Request(PmlElement Message) {
41 if (pActive) throw new InvalidOperationException("Session is active");
42 pSID = pConnection.GetNextSessionId(true);
43 lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this);
44 pConnection.WriteSessionMessage(pSID, 0, Message);
45 pActive = true;
46 }
47
48 protected internal abstract void MessageIn(PmlElement Message);
49
50 protected void SendMessage(PmlElement Message) {
51 if (!pActive) throw new InvalidOperationException("Session is not active");
52 pConnection.WriteSessionMessage(pSID, 1, Message);
53 }
54
55 public void Close() {
56 Close(null);
57 }
58 public void Close(PmlElement Message) {
59 if (!pActive) throw new InvalidOperationException("Session is not active");
60 pConnection.WriteSessionMessage(pSID, 2, Message);
61 ClosedA();
62 }
63
64 internal void ClosedA() {
65 pActive = false;
66 lock (pConnection.pSessions) pConnection.pSessions.Remove(pSID);
67 }
68
69 internal void ClosedB(PmlElement Message) {
70 pActive = false;
71 Closed(Message);
72 }
73
74 protected virtual void Closed(PmlElement Message) {
75 }
76 }
77 public class Session : SessionBase {
78 public event MessageReceivedEventHandler MessageReceived;
79 public delegate void MessageReceivedEventHandler(PmlElement Message);
80 public event SessionClosedEventHandler SessionClosed;
81 public delegate void SessionClosedEventHandler(PmlElement Message);
82
83 public Session(PmlConnection Connection) : base(Connection) { }
84
85 public new void Accept(UInt32 SID) {
86 base.Accept(SID);
87 }
88 public new void Request() {
89 Request(null);
90 }
91 public new void Request(PmlElement Message) {
92 base.Request(Message);
93 }
94
95 protected internal override void MessageIn(PmlElement Message) {
96 if (MessageReceived != null) MessageReceived(Message);
97 }
98
99 public new void SendMessage(PmlElement Message) {
100 base.SendMessage(Message);
101 }
102
103 protected override void Closed(PmlElement Message) {
104 if (SessionClosed != null) SessionClosed(Message);
105 }
106 }
107
108 private Dictionary<UInt32, SessionBase> pSessions = new Dictionary<UInt32, SessionBase>();
109 private UInt32 pNextSession;
110 private Dictionary<UInt32, CSyncRequest> pSyncRequests = new Dictionary<UInt32, CSyncRequest>();
111 private UInt32 pNextSyncRequest;
112
113 private Stream pStream;
114
115 public event MessageReceivedEventHandler MessageReceived;
116 public delegate void MessageReceivedEventHandler(PmlElement Message);
117 public event RequestReceivedEventHandler RequestReceived;
118 public delegate void RequestReceivedEventHandler(PmlElement Request, ref PmlElement Reply);
119 public event SessionRequestReceivedEventHandler SessionRequestReceived;
120 public delegate void SessionRequestReceivedEventHandler(PmlElement Request, uint SID);
121
122 private IPmlWriter _writer;
123 private IPmlReader _reader;
124
125 public PmlConnection(Socket Socket) : this(new TCPStream(Socket)) { }
126 public PmlConnection(Stream Stream) : this(new PmlBinaryRW(Stream)) {
127 pStream = Stream;
128 }
129 public PmlConnection(IPmlRW RMRW) : this(RMRW, RMRW) { }
130 public PmlConnection(IPmlReader Reader, IPmlWriter Writer) {
131 _reader = Reader;
132 _writer = Writer;
133 }
134
135 public void Close() {
136 if (pStream != null) pStream.Close();
137 }
138
139 public IPmlReader Reader {
140 get { return _reader; }
141 }
142 public IPmlWriter Writer {
143 get { return _writer; }
144 }
145 private PmlElement _ReadMessage() {
146 PmlElement Message = _reader.ReadMessage();
147 return Message; //Warning: Can't lock reader because it can be the same as the Writer (possible deadlock)
148 }
149 private void _WriteMessage(PmlElement Message) {
150 lock (_writer) _writer.WriteMessage(Message);
151 }
152
153 private UInt32 GetNextSessionId(bool IsSession) {
154 if (IsSession) {
155 lock (pSessions) {
156 do {
157 if (pNextSession == UInt32.MaxValue) {
158 pNextSession = 0;
159 } else {
160 pNextSession += (uint)1;
161 }
162 }
163 while (pSessions.ContainsKey(pNextSession));
164 return pNextSession;
165 }
166 } else {
167 lock (pSyncRequests) {
168 do {
169 if (pNextSyncRequest == UInt32.MaxValue) {
170 pNextSyncRequest = 0;
171 } else {
172 pNextSyncRequest += (uint)1;
173 }
174 }
175 while (pSyncRequests.ContainsKey(pNextSyncRequest));
176 return pNextSyncRequest;
177 }
178 }
179 }
180
181 protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) {
182 PmlDictionary Msg2 = new PmlDictionary();
183 Msg2.Add("CMD", new PmlString("SES"));
184 Msg2.Add("SID", new PmlInteger(SID));
185 Msg2.Add("SCMD", new PmlInteger(CMD));
186 Msg2.Add("MSG", MSG);
187 _WriteMessage(Msg2);
188 }
189
190 protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) {
191 PmlDictionary Msg2 = new PmlDictionary();
192 if (RPL) {
193 Msg2.Add("CMD", new PmlString("RPL"));
194 } else {
195 Msg2.Add("CMD", new PmlString("REQ"));
196 }
197 Msg2.Add("SID", new PmlInteger(SID));
198 Msg2.Add("MSG", MSG);
199 _WriteMessage(Msg2);
200 }
201
202 public void Worker() {
203 try {
204 PmlElement Message = null;
205 int Ping = 0;
206 while (true) {
207 try {
208 Message = _ReadMessage();
209 if (Message == null) Console.WriteLine("UCIS.PML.Connection: Message is just null?");
210 } catch (EndOfStreamException) {
211 Console.WriteLine("UCIS.PML.Connection: End of stream");
212 return;
213 } catch (SocketException ex) {
214 if (ex.ErrorCode == (int)SocketError.TimedOut) {
215 Console.WriteLine("UCIS.PML.Connection: SocketException/TimedOut");
216 Message = null;
217 } else if (ex.ErrorCode == (int)SocketError.ConnectionReset) {
218 Console.WriteLine("UCIS.PML.Connection: Connection reset by peer");
219 return;
220 } else {
221 throw new Exception("Exception while reading message", ex);
222 }
223 } catch (IOException ex) {
224 Console.WriteLine("UCIS.PML.Connection: IOException: " + ex.Message);
225 Message = null;
226 } catch (TimeoutException) {
227 Message = null;
228 }
229 if (Message == null) {
230 if (Ping > 2) {
231 Console.WriteLine("UCIS.PML.Connection: Connection timed out");
232 break;
233 } else {
234 _WriteMessage(new PmlString("PING"));
235 }
236 Ping += 1;
237 } else if (Message is PmlString) {
238 string Cmd = Message.ToString();
239 if (Cmd.Equals("PING")) {
240 _WriteMessage(new PmlString("PONG"));
241 } else if (Cmd.Equals("PONG")) {
242 Ping = 0;
243 }
244 } else if (Message is PmlDictionary) {
245 string Cmd = null;
246 Cmd = Message.GetChild("CMD").ToString();
247 if (Cmd.Equals("SES")) {
248 UInt32 SID = default(UInt32);
249 byte SCMD = 0;
250 SessionBase Session = default(SessionBase);
251 PmlElement InnerMsg = default(PmlElement);
252 SID = Message.GetChild("SID").ToUInt32();
253 SCMD = Message.GetChild("SCMD").ToByte();
254 InnerMsg = Message.GetChild("MSG");
255 lock (pSessions) {
256 if (pSessions.ContainsKey(SID)) {
257 Session = pSessions[SID];
258 } else {
259 Session = null;
260 }
261 }
262 if (SCMD == 0) {
263 if (Session == null) {
264 if (SessionRequestReceived != null) {
265 SessionRequestReceived(InnerMsg, SID);
266 }
267 } else {
268 Session.ClosedA();
269 Session.ClosedB(null);
270 WriteSessionMessage(SID, 2, null);
271 }
272 } else if (SCMD == 1) {
273 if (Session == null) {
274 WriteSessionMessage(SID, 2, null);
275 } else {
276 Session.MessageIn(InnerMsg);
277 }
278 } else if (SCMD == 2) {
279 if (Session != null) {
280 Session.ClosedA();
281 Session.ClosedB(InnerMsg);
282 }
283 }
284 } else if (Cmd.Equals("RPL")) {
285 UInt32 SID = default(UInt32);
286 CSyncRequest SRequest = null;
287 SID = Message.GetChild("SID").ToUInt32();
288 lock (pSyncRequests) {
289 if (pSyncRequests.TryGetValue(SID, out SRequest)) {
290 pSyncRequests.Remove(SID);
291 } else {
292 Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString());
293 }
294 }
295 if (SRequest != null) {
296 SRequest.Reply = Message.GetChild("MSG");
297 SRequest.ResetEvent.Set();
298 }
299 } else if (Cmd.Equals("REQ")) {
300 System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(SyncRequestHandler), Message);
301 } else if (Cmd.Equals("MSG")) {
302 PmlElement InnerMsg = Message.GetChild("MSG");
303 if (MessageReceived != null) MessageReceived(InnerMsg);
304 } else {
305 throw new InvalidOperationException("Invalid operation");
306 }
307 }
308 }
309 } catch (System.Threading.ThreadAbortException ex) {
310 throw ex;
311 } catch (Exception ex) {
312 Console.WriteLine(ex.ToString());
313 } finally {
314 Console.WriteLine("UCIS.PML.Connection: Connection closed");
315 try {
316 foreach (SessionBase S in pSessions.Values) {
317 try {
318 S.ClosedB(null);
319 } catch (Exception ex) {
320 Console.WriteLine(ex.ToString());
321 }
322 }
323 pSessions.Clear();
324 foreach (CSyncRequest T in pSyncRequests.Values) {
325 T.ResetEvent.Set();
326 }
327 } catch (Exception ex) {
328 Console.WriteLine(ex.ToString());
329 }
330 }
331 }
332
333 private void SyncRequestHandler(object state) {
334 PmlDictionary Message = (PmlDictionary)state;
335 UInt32 SID = default(UInt32);
336 PmlElement InnerMsg = default(PmlElement);
337 PmlElement Reply = default(PmlElement);
338 Reply = null;
339 SID = Message.GetChild("SID").ToUInt32();
340 InnerMsg = Message.GetChild("MSG");
341 try {
342 if (RequestReceived != null) {
343 RequestReceived(InnerMsg, ref Reply);
344 }
345 } catch (Exception ex) {
346 Reply = new PmlDictionary();
347 ((PmlDictionary)Reply).Add("EXCEPTION", new PmlString(ex.ToString()));
348 Console.WriteLine(ex.ToString());
349 }
350 WriteSyncMessage(SID, true, Reply);
351 }
352
353 public PmlElement SyncRequest(PmlElement Request) {
354 return SyncRequest(Request, 30000);
355 }
356 public PmlElement SyncRequest(PmlElement Request, int Timeout) {
357 UInt32 SID = default(UInt32);
358 CSyncRequest SyncEvent = new CSyncRequest();
359 SID = GetNextSessionId(false);
360 lock (pSyncRequests) pSyncRequests.Add(SID, SyncEvent);
361 WriteSyncMessage(SID, false, Request);
362 if (!SyncEvent.ResetEvent.WaitOne(Timeout, false)) {
363 Console.WriteLine("UCIS.PML.Connection.SyncRequest Timeout: " + SID.ToString());
364 lock (pSyncRequests) pSyncRequests.Remove(SID);
365 throw new TimeoutException();
366 }
367 return SyncEvent.Reply;
368 }
369
370 public void SendMessage(PmlElement Message) {
371 PmlDictionary Msg = new PmlDictionary();
372 Msg.Add("CMD", new PmlString("MSG"));
373 Msg.Add("MSG", Message);
374 _WriteMessage(Msg);
375 }
376
377 public PmlElement ReadMessage() {
378 return _ReadMessage();
379 }
380 public void SendRawMessage(PmlElement Message) {
381 _WriteMessage(Message);
382 }
383 }
384 }