# HG changeset patch # User Ivo Smits # Date 1415381859 -3600 # Node ID df53bdd49507cbe30db7a3fa0b41ea515ece90e3 # Parent 0fc3f42a855597ff52bd3a9ad89e0c4201880fab# Parent 5e717aac4c1d7c8fe288e23970092812e9d410b1 Merge diff -r 5e717aac4c1d -r df53bdd49507 Net/ConnectionList.cs --- a/Net/ConnectionList.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Net/ConnectionList.cs Fri Nov 07 18:37:39 2014 +0100 @@ -67,7 +67,7 @@ } } public INetworkConnection this[int index] { - get { return _list[index]; } + get { lock (_list) return _list[index]; } set { throw new NotSupportedException(); } } public int IndexOf(INetworkConnection item) { diff -r 5e717aac4c1d -r df53bdd49507 Net/HTTP.cs --- a/Net/HTTP.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Net/HTTP.cs Fri Nov 07 18:37:39 2014 +0100 @@ -99,7 +99,7 @@ public Socket Socket { get; private set; } public Boolean SuppressStandardHeaders { get; set; } - public TCPStream TCPStream { get; private set; } + public TCPStream TCPStream { get { return Reader.BaseStream as TCPStream; } } private StreamWriter Writer; private PrebufferingStream Reader; @@ -411,14 +411,10 @@ if (socket.ProtocolType == ProtocolType.Tcp) socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); if (stream == null) stream = new NetworkStream(socket, true); } - Init(stream); - } - - private void Init(Stream Stream) { - Writer = new StreamWriter(Stream, Encoding.ASCII); + Writer = new StreamWriter(stream, Encoding.ASCII); Writer.NewLine = "\r\n"; Writer.AutoFlush = true; - Reader = new PrebufferingStream(Stream); + Reader = new PrebufferingStream(stream); Reader.BeginPrebuffering(PrebufferCallback, null); } @@ -470,14 +466,22 @@ RequestPath = Uri.UnescapeDataString(request[0]); RequestQuery = request.Length > 1 ? request[1] : null; RequestHeaders = new List(); + String headerName = null, headerValue = null; while (true) { line = ReadLine(); if (line == null) goto SendError400AndClose; if (line.Length == 0) break; - request = line.Split(new String[] { ": " }, 2, StringSplitOptions.None); - if (request.Length != 2) goto SendError400AndClose; - RequestHeaders.Add(new HTTPHeader(request[0], request[1])); + if (line[0] == ' ' || line[0] == '\t') { + headerValue += line; + } else { + if (headerName != null) RequestHeaders.Add(new HTTPHeader(headerName, (headerValue ?? String.Empty).TrimStart())); + request = line.Split(new Char[] { ':' }, 2, StringSplitOptions.None); + if (request.Length != 2) goto SendError400AndClose; + headerName = request[0]; + headerValue = request[1]; + } } + if (headerName != null) RequestHeaders.Add(new HTTPHeader(headerName, (headerValue ?? String.Empty).TrimStart())); IHTTPContentProvider content = Server.ContentProvider; if (content == null) goto SendError500AndClose; State = HTTPConnectionState.ProcessingRequest; @@ -732,8 +736,14 @@ } private void SendErrorAndClose(int code) { - SendErrorResponse(code); - Close(); + try { + SendErrorResponse(code); + } catch (IOException) { + } catch (SocketException) { + } catch (ObjectDisposedException) { + } finally { + Close(); + } } private void Close() { if (State == HTTPConnectionState.Closed) return; diff -r 5e717aac4c1d -r df53bdd49507 Net/TCPServer.cs --- a/Net/TCPServer.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Net/TCPServer.cs Fri Nov 07 18:37:39 2014 +0100 @@ -30,10 +30,6 @@ private NetworkConnectionList _Clients = new NetworkConnectionList(); private ModuleCollection _Modules = new ModuleCollection(); private IModule _CatchAllModule = null; - private Int32 _ThrottleCounter; - private Int32 _ThrottleBurst = 10; - private Int32 _ThrottleRate = 0; - private Timer _ThrottleTimer = null; public TCPServer() { _ThreadPool = UCIS.ThreadPool.DefaultPool; @@ -56,28 +52,6 @@ } } - public Int32 ThrottleRate { - get { return _ThrottleRate; } - set { - if (value < 0) throw new ArgumentOutOfRangeException("value"); - _ThrottleRate = value; - if (_Listener == null) return; - if (value == 0 && _ThrottleTimer != null) { - _ThrottleTimer.Dispose(); - _ThrottleTimer = null; - } else if (value > 0 && _ThrottleTimer == null) { - _ThrottleTimer = new Timer(ThrottleCallback, null, 1000, 1000); - } - } - } - public Int32 ThrottleBurst { - get { return _ThrottleBurst; } - set { - if (value < 1) throw new ArgumentOutOfRangeException("value"); - _ThrottleBurst = value; - } - } - public EndPoint LocalEndPoint { get { return _Listener.LocalEndPoint; } } public void Listen(int Port) { @@ -85,22 +59,13 @@ } public void Listen(AddressFamily af, int Port) { Stop(); - _Listener = new Socket(af, SocketType.Stream, ProtocolType.Tcp); - - _Listener.Blocking = false; _Listener.Bind(new IPEndPoint(af == AddressFamily.InterNetworkV6 ? IPAddress.IPv6Any : IPAddress.Any, Port)); _Listener.Listen(25); - _ThrottleCounter = _ThrottleBurst; _Listener.BeginAccept(AcceptCallback, null); - - if (_ThrottleRate > 0) { - _ThrottleTimer = new Timer(ThrottleCallback, null, 1000, 1000); - } } public void Stop() { - if (_ThrottleTimer != null) _ThrottleTimer.Dispose(); if (_Listener != null && _Listener.IsBound) _Listener.Close(); _Listener = null; } @@ -118,10 +83,9 @@ } } - private void AcceptCallback(System.IAsyncResult ar) { - Client Client = null; + private void AcceptCallback(IAsyncResult ar) { + if (_Listener == null) return; Socket Socket = null; - if (_Listener == null) return; try { Socket = _Listener.EndAccept(ar); } catch (ObjectDisposedException) { @@ -133,7 +97,7 @@ } if (Socket != null) { try { - Client = new Client(Socket, this); + Client Client = new Client(Socket, this); _Clients.Add(Client); if (ClientAccepted != null) ClientAccepted(this, new ClientAcceptedEventArgs(Client)); Client.Start(_ThreadPool); @@ -141,20 +105,7 @@ Console.WriteLine(ex.ToString()); } } - if (_ThrottleCounter > 0 || _ThrottleRate == 0) { - _ThrottleCounter--; - _Listener.BeginAccept(AcceptCallback, null); - } - } - - private void ThrottleCallback(Object state) { - if (_Listener == null) return; - if (_ThrottleRate == 0) return; - if (_ThrottleCounter >= _ThrottleBurst) return; - if (_ThrottleCounter <= 0) { - _Listener.BeginAccept(AcceptCallback, null); - } - _ThrottleRate += _ThrottleRate; + _Listener.BeginAccept(AcceptCallback, null); } public interface IModule { @@ -164,7 +115,6 @@ private class Client : TCPStream { private TCPServer _Server; - private UCIS.ThreadPool.WorkItem _WorkItem; private IModule _Module; private byte _MagicNumber; @@ -176,7 +126,6 @@ } private void _Stream_Closed(object sender, EventArgs e) { - _WorkItem = null; _Module = null; _Server = null; base.Closed -= _Stream_Closed; @@ -184,24 +133,20 @@ internal Client(Socket Socket, TCPServer Server) : base(Socket) { _Server = Server; - Socket.Blocking = true; base.Closed += _Stream_Closed; this.Tag = Server; } internal void Start(UCIS.ThreadPool Pool) { - _WorkItem = Pool.QueueWorkItem(WorkerProc, null); + Pool.QueueWorkItem(WorkerProc, null); } private void WorkerProc(object state) { bool CloseSocket = true; try { try { - base.Blocking = true; //base.NoDelay = true; base.ReadTimeout = 5000; - base.WriteBufferSize = 1024 * 10; - base.ReadBufferSize = 1024 * 10; //Console.WriteLine("TCPServer: Accepted connection from " + base.Socket.RemoteEndPoint.ToString()); _MagicNumber = (byte)base.PeekByte(); } catch (TimeoutException ex) { diff -r 5e717aac4c1d -r df53bdd49507 Net/TCPStream.cs --- a/Net/TCPStream.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Net/TCPStream.cs Fri Nov 07 18:37:39 2014 +0100 @@ -4,7 +4,6 @@ using System.Net.Sockets; using System.Threading; using UCIS.Util; -using SysThreadPool = System.Threading.ThreadPool; namespace UCIS.Net { public class TCPStream : Stream, INetworkConnection { @@ -19,7 +18,6 @@ private ulong _BytesWritten; private ulong _BytesRead; private DateTime _StartTime; - private bool _Blocking; public event EventHandler Closed; @@ -27,7 +25,6 @@ _Socket = Socket; _HasPeekByte = false; _StartTime = DateTime.Now; - _Blocking = _Socket.Blocking; } public Socket Socket { @@ -39,11 +36,8 @@ } public bool Blocking { - get { return _Blocking; } - set { - Socket.Blocking = value; - _Blocking = value; - } + get { return _Socket.Blocking; } + set { Socket.Blocking = value; } } public bool NoDelay { @@ -91,7 +85,6 @@ } catch (SocketException ex) { switch (ex.SocketErrorCode) { case SocketError.WouldBlock: - _Socket.Blocking = _Blocking; throw new TimeoutException("The receive operation would block", ex); case SocketError.TimedOut: throw new TimeoutException("The receive operation timed out", ex); @@ -212,7 +205,6 @@ } catch (SocketException ex) { switch (ex.SocketErrorCode) { case SocketError.WouldBlock: - _Socket.Blocking = _Blocking; throw new TimeoutException("The send operation would block", ex); case SocketError.TimedOut: throw new TimeoutException("The send operation timed out", ex); diff -r 5e717aac4c1d -r df53bdd49507 Net/WebSocketPacketStream.cs --- a/Net/WebSocketPacketStream.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Net/WebSocketPacketStream.cs Fri Nov 07 18:37:39 2014 +0100 @@ -140,116 +140,124 @@ if (packet.Length > count) leftOver = ArrayUtil.Slice(packet, count); return count; } - public override byte[] ReadPacket() { + private int ReadRawMessage(out Byte[] payloadret) { if (leftOver != null) throw new InvalidOperationException("There is remaining data from a partial read"); negotiationEvent.WaitOne(); if (closed) throw new ObjectDisposedException("WebSocketPacketStream"); - try { - if (wsProtocol == 13) { - Byte[] multipartbuffer = null; - int multipartopcode = -1; - while (true) { - int flags = baseStream.ReadByte(); - if (flags == -1) throw new EndOfStreamException(); - UInt64 pllen = (byte)baseStream.ReadByte(); - Boolean masked = (pllen & 128) != 0; - pllen &= 127; - if (pllen == 126) { - pllen = (uint)baseStream.ReadByte() << 8; - pllen |= (uint)baseStream.ReadByte(); - } else if (pllen == 127) { - pllen = (ulong)baseStream.ReadByte() << 56; - pllen |= (ulong)baseStream.ReadByte() << 48; - pllen |= (ulong)baseStream.ReadByte() << 40; - pllen |= (ulong)baseStream.ReadByte() << 32; - pllen |= (uint)baseStream.ReadByte() << 24; - pllen |= (uint)baseStream.ReadByte() << 16; - pllen |= (uint)baseStream.ReadByte() << 8; - pllen |= (uint)baseStream.ReadByte(); + if (wsProtocol == 13) { + Byte[] multipartbuffer = null; + int multipartopcode = -1; + while (true) { + int flags = baseStream.ReadByte(); + if (flags == -1) throw new EndOfStreamException(); + UInt64 pllen = (byte)baseStream.ReadByte(); + Boolean masked = (pllen & 128) != 0; + pllen &= 127; + if (pllen == 126) { + pllen = (uint)baseStream.ReadByte() << 8; + pllen |= (uint)baseStream.ReadByte(); + } else if (pllen == 127) { + pllen = (ulong)baseStream.ReadByte() << 56; + pllen |= (ulong)baseStream.ReadByte() << 48; + pllen |= (ulong)baseStream.ReadByte() << 40; + pllen |= (ulong)baseStream.ReadByte() << 32; + pllen |= (uint)baseStream.ReadByte() << 24; + pllen |= (uint)baseStream.ReadByte() << 16; + pllen |= (uint)baseStream.ReadByte() << 8; + pllen |= (uint)baseStream.ReadByte(); + } + Byte[] mask = new Byte[4]; + if (masked) ReadAllBytes(mask, 0, mask.Length); + //Console.WriteLine("Read flags={0} masked={1} mask={2} len={3}", flags, masked, mask, pllen); + Byte[] payload = new Byte[pllen]; // + (4 - (pllen % 4))]; + ReadAllBytes(payload, 0, (int)pllen); + if (masked) for (int i = 0; i < (int)pllen; i++) payload[i] ^= mask[i % 4]; + int opcode = flags & 0x0f; + Boolean fin = (flags & 0x80) != 0; + if (opcode == 0) { + //Console.WriteLine("WebSocket received continuation frame type {0}!", multipartopcode); + Array.Resize(ref multipartbuffer, multipartbuffer.Length + payload.Length); + payload.CopyTo(multipartbuffer, multipartbuffer.Length - payload.Length); + opcode = -1; + if (fin) { + payload = multipartbuffer; + opcode = multipartopcode; + multipartbuffer = null; } - Byte[] mask = new Byte[4]; - if (masked) ReadAllBytes(mask, 0, mask.Length); - //Console.WriteLine("Read flags={0} masked={1} mask={2} len={3}", flags, masked, mask, pllen); - Byte[] payload = new Byte[pllen]; // + (4 - (pllen % 4))]; - ReadAllBytes(payload, 0, (int)pllen); - if (masked) for (int i = 0; i < (int)pllen; i++) payload[i] ^= mask[i % 4]; - int opcode = flags & 0x0f; - Boolean fin = (flags & 0x80) != 0; - if (opcode == 0) { - //Console.WriteLine("WebSocket received continuation frame type {0}!", multipartopcode); - Array.Resize(ref multipartbuffer, multipartbuffer.Length + payload.Length); - payload.CopyTo(multipartbuffer, multipartbuffer.Length - payload.Length); - opcode = -1; - if (fin) { - payload = multipartbuffer; - opcode = multipartopcode; - multipartbuffer = null; - } - } else if (!fin) { - //Console.WriteLine("WebSocket received non-fin frame type {0}!", opcode); - multipartbuffer = payload; - multipartopcode = opcode; - opcode = -1; - } - if (opcode == -1) { - } else if (opcode == 0) { - throw new NotSupportedException("WebSocket opcode 0 is not supported"); - } else if (opcode == 1) { - String text = Encoding.UTF8.GetString(payload); //, 0, pllen); - return Convert.FromBase64String(text); - } else if (opcode == 2) { - return payload; // ArrayUtil.Slice(payload, 0, pllen); - } else if (opcode == 8) { - return null; - } else if (opcode == 9) { - //Console.WriteLine("WebSocket PING"); - WriteFrame(10, payload, 0, (int)pllen); - } else if (opcode == 10) { //PONG - } else { - //Console.WriteLine("WebSocket UNKNOWN OPCODE {0}", opcode); - } + } else if (!fin) { + //Console.WriteLine("WebSocket received non-fin frame type {0}!", opcode); + multipartbuffer = payload; + multipartopcode = opcode; + opcode = -1; } - } else if (wsProtocol == 100) { - int frameType = baseStream.ReadByte(); - if (frameType == -1) throw new EndOfStreamException(); - if ((frameType & 0x80) != 0) { - int length = 0; + if (opcode == -1) { + } else if (opcode == 0) { + throw new NotSupportedException("WebSocket opcode 0 is not supported"); + } else if (opcode == 1) { + payloadret = payload; + return 1; //text frame + } else if (opcode == 2) { + payloadret = payload; + return 2; //binary frame + } else if (opcode == 8) { + payloadret = null; + return 0; //end of stream + } else if (opcode == 9) { + //Console.WriteLine("WebSocket PING"); + WriteProtocol13Frame(10, payload, 0, (int)pllen); + } else if (opcode == 10) { //PONG + } else { + //Console.WriteLine("WebSocket UNKNOWN OPCODE {0}", opcode); + } + } + } else if (wsProtocol == 100) { + int frameType = baseStream.ReadByte(); + if (frameType == -1) throw new EndOfStreamException(); + if ((frameType & 0x80) != 0) { + int length = 0; + while (true) { + int b = baseStream.ReadByte(); + if (b == -1) throw new EndOfStreamException(); + length = (length << 7) | (b & 0x7f); + if ((b & 0x80) == 0) break; + } + Byte[] buffer = new Byte[length]; + ReadAllBytes(buffer, 0, length); + if (frameType == 0xff && length == 0) { + payloadret = null; + return 0; + } else { + throw new InvalidOperationException(); + } + } else { + using (MemoryStream ms = new MemoryStream()) { while (true) { int b = baseStream.ReadByte(); if (b == -1) throw new EndOfStreamException(); - length = (length << 7) | (b & 0x7f); - if ((b & 0x80) == 0) break; + if (b == 0xff) break; + ms.WriteByte((Byte)b); } - Byte[] buffer = new Byte[length]; - ReadAllBytes(buffer, 0, length); - if (frameType == 0xff && length == 0) { - return null; + if (frameType == 0x00) { + ms.Seek(0, SeekOrigin.Begin); + payloadret = ms.ToArray(); + return 1; //text frame } else { throw new InvalidOperationException(); } - } else { - using (MemoryStream ms = new MemoryStream()) { - while (true) { - int b = baseStream.ReadByte(); - if (b == -1) throw new EndOfStreamException(); - if (b == 0xff) break; - ms.WriteByte((Byte)b); - } - if (frameType == 0x00) { - ms.Seek(0, SeekOrigin.Begin); - StreamReader reader = new StreamReader(ms, Encoding.UTF8, false); - return Convert.FromBase64String(reader.ReadToEnd()); - } else { - throw new InvalidOperationException(); - } - } } - } else { - throw new InvalidOperationException(); } - } catch (Exception ex) { - Console.WriteLine(ex); - throw; + } else { + throw new InvalidOperationException(); + } + } + public override byte[] ReadPacket() { + Byte[] payload; + int opcode = ReadRawMessage(out payload); + switch (opcode) { + case 0: return null; + case 1: return Convert.FromBase64String(Encoding.UTF8.GetString(payload)); + case 2: return payload; + default: throw new InvalidOperationException("Internal error: unexpected frame type"); } } private delegate Byte[] ReadPacketDelegate(); @@ -263,15 +271,19 @@ } public override void Write(byte[] buffer, int offset, int count) { negotiationEvent.WaitOne(); - if (closed) throw new ObjectDisposedException("WebSocketPacketStream"); if (!binaryProtocol) { String encoded = Convert.ToBase64String(buffer, offset, count, Base64FormattingOptions.None); buffer = Encoding.ASCII.GetBytes(encoded); offset = 0; count = buffer.Length; } + WriteRawMessage(buffer, offset, count, binaryProtocol); + } + private void WriteRawMessage(Byte[] buffer, int offset, int count, Boolean binary) { + negotiationEvent.WaitOne(); + if (closed) throw new ObjectDisposedException("WebSocketPacketStream"); if (wsProtocol == 13) { - WriteFrame(binaryProtocol ? (Byte)0x2 : (Byte)0x1, buffer, offset, count); + WriteProtocol13Frame(binary ? (Byte)0x2 : (Byte)0x1, buffer, offset, count); } else if (wsProtocol == 100) { Byte[] bytes = new Byte[2 + count]; bytes[0] = 0x00; @@ -282,7 +294,7 @@ throw new InvalidOperationException(); } } - private void WriteFrame(Byte opcode, Byte[] buffer, int offset, int count) { + private void WriteProtocol13Frame(Byte opcode, Byte[] buffer, int offset, int count) { int pllen = count; int hlen = 2; if (pllen > 0xffff) hlen += 8; @@ -309,5 +321,29 @@ Buffer.BlockCopy(buffer, offset, wbuf, hlen, count); baseStream.Write(wbuf, 0, wbuf.Length); } + + public String ReadTextMessage() { + Byte[] payload; + int opcode = ReadRawMessage(out payload); + switch (opcode) { + case 0: return null; + case 1: + case 2: return Encoding.UTF8.GetString(payload); + default: throw new InvalidOperationException("Internal error: unexpected frame type"); + } + } + private delegate String ReadTextMessageDelegate(); + ReadTextMessageDelegate readTextMessageDelegate; + public IAsyncResult BeginReadTextMessage(AsyncCallback callback, object state) { + if (readTextMessageDelegate == null) readTextMessageDelegate = ReadTextMessage; + return readTextMessageDelegate.BeginInvoke(callback, state); + } + public String EndReadTextMessage(IAsyncResult asyncResult) { + return readTextMessageDelegate.EndInvoke(asyncResult); + } + public void WriteTextMessage(String message) { + Byte[] packet = Encoding.UTF8.GetBytes(message); + WriteRawMessage(packet, 0, packet.Length, false); + } } } \ No newline at end of file diff -r 5e717aac4c1d -r df53bdd49507 Pml/Channels/ActivePmlChannel.cs --- a/Pml/Channels/ActivePmlChannel.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Pml/Channels/ActivePmlChannel.cs Fri Nov 07 18:37:39 2014 +0100 @@ -1,11 +1,11 @@ using System; using UCIS.Pml; +using UCIS.Util; using System.Collections.Generic; using System.Threading; namespace UCIS.Pml { public abstract class ActivePmlChannel : IPmlChannel { - private AutoResetEvent _receiveEvent = new AutoResetEvent(false); private ReadMessageAsyncResult _asyncWait = null; private Queue _queue = new Queue(); private bool _isOpen = true; @@ -14,41 +14,49 @@ public abstract void WriteMessage(PmlElement message); public PmlElement ReadMessage() { - if (!IsOpen) throw new InvalidOperationException("The channel is not open"); - if (_queue.Count == 0) { - _receiveEvent.WaitOne(); - if (_queue.Count == 0) throw new OperationCanceledException("The operation did not complete"); - } else if (_queue.Count == 1) { - _receiveEvent.Reset(); + lock (_queue) { + if (!IsOpen) throw new InvalidOperationException("The channel is not open"); + while (_queue.Count == 0) { + if (!IsOpen) throw new OperationCanceledException("The operation did not complete"); + Monitor.Wait(_queue); + } + return _queue.Dequeue(); } - return _queue.Dequeue(); } public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) { - ReadMessageAsyncResult ar = new ReadMessageAsyncResult(state, callback); - if (!IsOpen) throw new InvalidOperationException("The channel is not open"); - if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); - if (_queue.Count == 0) { - _asyncWait = ar; - } else { - if (_queue.Count == 1) _receiveEvent.Reset(); - ar.Complete(true, _queue.Dequeue(), true); + ReadMessageAsyncResult ar; + Boolean completed = false; + lock (_queue) { + if (!IsOpen) throw new InvalidOperationException("The channel is not open"); + if (_asyncWait != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); + ar = new ReadMessageAsyncResult(callback, state); + if (_queue.Count == 0) { + _asyncWait = ar; + } else { + ar.Message = _queue.Dequeue(); + completed = true; + } } + if (completed) ar.SetCompleted(true, null); return ar; } public PmlElement EndReadMessage(IAsyncResult asyncResult) { ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult; - if (ar.Error) new OperationCanceledException("The operation did not complete"); + ar.WaitForCompletion(); + if (ar.Error != null) throw new OperationCanceledException("The asynchronous operation failed", ar.Error); return ar.Message; } public virtual void Close() { - _isOpen = false; - ReadMessageAsyncResult asyncWait = Interlocked.Exchange(ref _asyncWait, null); - if (asyncWait != null) asyncWait.Complete(false, null, false); - _receiveEvent.Set(); - _receiveEvent.Close(); + ReadMessageAsyncResult asyncWait; + lock (_queue) { + _isOpen = false; + asyncWait = Interlocked.Exchange(ref _asyncWait, null); + Monitor.PulseAll(_queue); + } + if (asyncWait != null) asyncWait.SetCompleted(false, new ObjectDisposedException("ActivePmlChannel")); } public void Dispose() { @@ -56,51 +64,25 @@ } protected void PushReceivedMessage(PmlElement message) { - ReadMessageAsyncResult asyncWait = Interlocked.Exchange(ref _asyncWait, null); + ReadMessageAsyncResult asyncWait; + lock (_queue) { + asyncWait = Interlocked.Exchange(ref _asyncWait, null); + if (asyncWait == null) { + _queue.Enqueue(message); + Monitor.Pulse(_queue); + } + } if (asyncWait != null) { - asyncWait.Complete(true, message, false); - } else { - _queue.Enqueue(message); - _receiveEvent.Set(); + asyncWait.Message = message; + asyncWait.SetCompleted(false, null); } } - private class ReadMessageAsyncResult : IAsyncResult { - private object state; - private AsyncCallback callback; - private bool completed; - private bool synchronously; - private ManualResetEvent waitHandle; - + class ReadMessageAsyncResult : AsyncResultBase { internal PmlElement Message; - internal bool Error; - - public bool CompletedSynchronously { get { return synchronously; } } - public object AsyncState { get { return state; } } - public WaitHandle AsyncWaitHandle { - get { - if (waitHandle == null) waitHandle = new ManualResetEvent(completed); - return waitHandle; - } - } - public bool IsCompleted { get { return completed; } } - - internal ReadMessageAsyncResult(object state, AsyncCallback callback) { - this.state = state; - this.callback = callback; - this.completed = false; - this.synchronously = false; - this.Message = null; - this.Error = false; - this.waitHandle = null; - } - internal void Complete(bool success, PmlElement message, bool synchronously) { - this.Message = message; - this.Error = !success; - this.synchronously = synchronously; - this.completed = true; - if (waitHandle != null) waitHandle.Set(); - if (this.callback != null) this.callback.Invoke(this); + public ReadMessageAsyncResult(AsyncCallback callback, Object state) : base(callback, state) { } + public void SetCompleted(Boolean synchronously, Exception error) { + base.SetCompleted(synchronously, error); } } } diff -r 5e717aac4c1d -r df53bdd49507 Pml/Channels/PassivePmlChannel.cs --- a/Pml/Channels/PassivePmlChannel.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Pml/Channels/PassivePmlChannel.cs Fri Nov 07 18:37:39 2014 +0100 @@ -1,5 +1,6 @@ using System; using UCIS.Pml; +using UCIS.Util; using System.Collections.Generic; using System.Threading; @@ -20,40 +21,33 @@ public abstract PmlElement ReadMessage(); public IAsyncResult BeginReadMessage(AsyncCallback callback, object state) { - ReadMessageAsyncResult ar = new ReadMessageAsyncResult(); - ar.Callback = callback; - ar.State = state; + ReadMessageAsyncResult ar = new ReadMessageAsyncResult(callback, state); UCIS.ThreadPool.RunCall(AsyncReadMessage, ar); return ar; } public PmlElement EndReadMessage(IAsyncResult asyncResult) { ReadMessageAsyncResult ar = (ReadMessageAsyncResult)asyncResult; - if (ar.Error != null) throw new Exception("The asynchronous operation could not be completed", ar.Error); + ar.WaitForCompletion(); + if (ar.Error != null) throw new Exception("The asynchronous operation failed", ar.Error); return ar.Message; } - private struct ReadMessageAsyncResult : IAsyncResult { - internal object State; + class ReadMessageAsyncResult : AsyncResultBase { internal PmlElement Message; - internal AsyncCallback Callback; - internal Exception Error; - internal bool Completed; - - public bool CompletedSynchronously { get { return false; } } - public object AsyncState { get { return State; } } - public WaitHandle AsyncWaitHandle { get { return null; } } - public bool IsCompleted { get { return Completed; } } + public ReadMessageAsyncResult(AsyncCallback callback, Object state) : base(callback, state) { } + public void SetCompleted(Boolean synchronously, Exception error, PmlElement message) { + this.Message = message; + base.SetCompleted(synchronously, error); + } } private void AsyncReadMessage(object state) { ReadMessageAsyncResult ar = (ReadMessageAsyncResult)state; try { - ar.Message = ReadMessage(); - ar.Error = null; + PmlElement message = ReadMessage(); + ar.SetCompleted(false, null, message); } catch (Exception ex) { - ar.Error = ex; + ar.SetCompleted(false, ex, null); } - ar.Completed = true; - ar.Callback.Invoke(ar); } } } diff -r 5e717aac4c1d -r df53bdd49507 Pml/Channels/TCPPmlChannel.cs --- a/Pml/Channels/TCPPmlChannel.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Pml/Channels/TCPPmlChannel.cs Fri Nov 07 18:37:39 2014 +0100 @@ -15,7 +15,6 @@ _socket = socket; _rw = new PmlBinaryRW(_socket); _open = true; - //ThreadPool.RunTask(worker, null); } public override void WriteMessage(PmlElement message) { @@ -33,17 +32,5 @@ public override PmlElement ReadMessage() { return _rw.ReadMessage(); } - - /*private void worker(Object state) { - try { - while (_open) { - base.PushReceivedMessage(_rw.ReadMessage()); - } - } catch (Exception ex) { - Console.WriteLine(ex.ToString()); - } finally { - Close(); - } - }*/ } } diff -r 5e717aac4c1d -r df53bdd49507 Pml/Elements/Collection.cs --- a/Pml/Elements/Collection.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Pml/Elements/Collection.cs Fri Nov 07 18:37:39 2014 +0100 @@ -4,74 +4,74 @@ namespace UCIS.Pml { public class PmlCollection : PmlElement, ICollection { - private List pItems = new List(); + private List pItems = new List(); - public PmlCollection() { } - public PmlCollection(params PmlElement[] Elements) { - pItems.AddRange(Elements); - } - public PmlCollection(IEnumerable Elements) { - pItems.AddRange(Elements); - } - public PmlCollection(params String[] Elements) { - foreach (String s in Elements) pItems.Add(s); - } + public PmlCollection() { } + public PmlCollection(params PmlElement[] Elements) { + pItems.AddRange(Elements); + } + public PmlCollection(IEnumerable Elements) { + pItems.AddRange(Elements); + } + public PmlCollection(params String[] Elements) { + foreach (String s in Elements) pItems.Add(s); + } - public PmlElement Add(PmlElement Element) { - pItems.Add(Element); - return Element; - } - public void Remove(PmlElement Element) { - pItems.Remove(Element); - } - public void RemoveAt(int Index) { - pItems.RemoveAt(Index); - } - public void Clear() { - pItems.Clear(); - } - public bool Contains(PmlElement item) { - return pItems.Contains(item); - } - public void CopyTo(PmlElement[] array, int arrayIndex) { - pItems.CopyTo(array, arrayIndex); - } - public int Count { get { return pItems.Count; } } - public bool IsReadOnly { get { return false; } } - public IEnumerator GetEnumerator() { return pItems.GetEnumerator(); } - IEnumerator IEnumerable.GetEnumerator() { return pItems.GetEnumerator(); } - bool ICollection.Remove(PmlElement item) { return pItems.Remove(item); } - void ICollection.Add(PmlElement item) { Add(item); } + public PmlElement Add(PmlElement Element) { + pItems.Add(Element); + return Element; + } + public void Remove(PmlElement Element) { + pItems.Remove(Element); + } + public void RemoveAt(int Index) { + pItems.RemoveAt(Index); + } + public void Clear() { + pItems.Clear(); + } + public bool Contains(PmlElement item) { + return pItems.Contains(item); + } + public void CopyTo(PmlElement[] array, int arrayIndex) { + pItems.CopyTo(array, arrayIndex); + } + public int Count { get { return pItems.Count; } } + public bool IsReadOnly { get { return false; } } + public IEnumerator GetEnumerator() { return pItems.GetEnumerator(); } + IEnumerator IEnumerable.GetEnumerator() { return pItems.GetEnumerator(); } + bool ICollection.Remove(PmlElement item) { return pItems.Remove(item); } + void ICollection.Add(PmlElement item) { Add(item); } - public override PmlType Type { get { return PmlType.Collection; } } + public override PmlType Type { get { return PmlType.Collection; } } - public override object ToObject() { return pItems; } - public override string ToString() { return null; } - public override bool ToBoolean() { return pItems.Count > 0; } - public override byte ToByte() { return (Byte)pItems.Count; } - public override decimal ToDecimal() { return pItems.Count; } - public override double ToDouble() { return pItems.Count; } - public override short ToInt16() { return (Int16)pItems.Count; } - public override int ToInt32() { return pItems.Count; } - public override long ToInt64() { return pItems.Count; } - public override sbyte ToSByte() { return (SByte)pItems.Count; } - public override float ToSingle() { return pItems.Count; } - public override ushort ToUInt16() { return (UInt16)pItems.Count; } - public override uint ToUInt32() { return (UInt32)pItems.Count; } - public override ulong ToUInt64() { return (UInt64)pItems.Count; } - public override char ToChar() { return '\0'; } - public override byte[] ToByteArray() { return null; } + public override object ToObject() { return pItems; } + public override string ToString() { return null; } + public override bool ToBoolean() { return pItems.Count > 0; } + public override byte ToByte() { return (Byte)pItems.Count; } + public override decimal ToDecimal() { return pItems.Count; } + public override double ToDouble() { return pItems.Count; } + public override short ToInt16() { return (Int16)pItems.Count; } + public override int ToInt32() { return pItems.Count; } + public override long ToInt64() { return pItems.Count; } + public override sbyte ToSByte() { return (SByte)pItems.Count; } + public override float ToSingle() { return pItems.Count; } + public override ushort ToUInt16() { return (UInt16)pItems.Count; } + public override uint ToUInt32() { return (UInt32)pItems.Count; } + public override ulong ToUInt64() { return (UInt64)pItems.Count; } + public override char ToChar() { return '\0'; } + public override byte[] ToByteArray() { return null; } - //public override PmlElement GetChild(string name) { return GetChild(name); } - public override PmlElement GetChild(int index) { return pItems[index]; } - public override IEnumerable GetChildren() { return pItems; } - public override IEnumerable> GetNamedChildren() { - KeyValuePair[] kvps = new KeyValuePair[pItems.Count]; - for (int i = 0; i < kvps.Length; i++) kvps[i] = new KeyValuePair(null, pItems[i]); - return kvps; - } - public override int GetChildCount() { return pItems.Count; } - public override void AddChild(string name, PmlElement value) { Add(value); } - public override void AddChild(PmlElement value) { Add(value); } + //public override PmlElement GetChild(string name) { return GetChild(name); } + public override PmlElement GetChild(int index) { return pItems[index]; } + public override IEnumerable GetChildren() { return pItems; } + public override IEnumerable> GetNamedChildren() { + KeyValuePair[] kvps = new KeyValuePair[pItems.Count]; + for (int i = 0; i < kvps.Length; i++) kvps[i] = new KeyValuePair(null, pItems[i]); + return kvps; + } + public override int GetChildCount() { return pItems.Count; } + public override void AddChild(string name, PmlElement value) { Add(value); } + public override void AddChild(PmlElement value) { Add(value); } } } diff -r 5e717aac4c1d -r df53bdd49507 Pml/LegacyPmlCommunicator.cs --- a/Pml/LegacyPmlCommunicator.cs Fri Nov 07 18:33:34 2014 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,403 +0,0 @@ -using System; -using System.Threading; -using System.Collections.Generic; -using UCIS.Pml; - -namespace UCIS.Pml { - public class LegacyPmlCommunicator { - private class CSyncRequest { - internal PmlElement Reply; - internal ManualResetEvent ResetEvent = new ManualResetEvent(false); - } - public abstract class SessionBase { - private bool pActive; - private LegacyPmlCommunicator pConnection; - private UInt32 pSID; - - public uint SID { get { return pSID; } } - public bool Active { get { return pActive; } } - public LegacyPmlCommunicator Communicator { get { return pConnection; } } - - protected SessionBase(LegacyPmlCommunicator Connection) { - pConnection = Connection; - } - - protected void Accept(UInt32 SID) { - if (pActive) throw new InvalidOperationException("Session is active"); - pSID = SID; - lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this); - pActive = true; - } - protected void Request() { - Request(null); - } - protected void Request(PmlElement Message) { - if (pActive) throw new InvalidOperationException("Session is active"); - pSID = pConnection.GetNextSessionId(true); - lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this); - pConnection.WriteSessionMessage(pSID, 0, Message); - pActive = true; - } - - protected internal abstract void MessageIn(PmlElement Message); - - protected void SendMessage(PmlElement Message) { - if (!pActive) throw new InvalidOperationException("Session is not active"); - pConnection.WriteSessionMessage(pSID, 1, Message); - } - - public void Close() { - Close(null); - } - public void Close(PmlElement Message) { - if (!pActive) return; // throw new InvalidOperationException("Session is not active"); - pConnection.WriteSessionMessage(pSID, 2, Message); - ClosedA(); - } - - internal void ClosedA() { - pActive = false; - lock (pConnection.pSessions) pConnection.pSessions.Remove(pSID); - } - - internal void ClosedB(PmlElement Message) { - pActive = false; - Closed(Message); - } - - protected virtual void Closed(PmlElement Message) { - } - } - public class Session : SessionBase { - public event MessageReceivedEventHandler MessageReceived; - public delegate void MessageReceivedEventHandler(PmlElement Message); - public event SessionClosedEventHandler SessionClosed; - public delegate void SessionClosedEventHandler(PmlElement Message); - - public Session(LegacyPmlCommunicator Connection) : base(Connection) { } - - public new void Accept(UInt32 SID) { - base.Accept(SID); - } - public new void Request() { - Request(null); - } - public new void Request(PmlElement Message) { - base.Request(Message); - } - - protected internal override void MessageIn(PmlElement Message) { - if (MessageReceived != null) MessageReceived(Message); - } - - public new void SendMessage(PmlElement Message) { - base.SendMessage(Message); - } - - protected override void Closed(PmlElement Message) { - if (SessionClosed != null) SessionClosed(Message); - } - } - - private Dictionary pSessions = new Dictionary(); - private UInt32 pNextSession; - private Dictionary pSyncRequests = new Dictionary(); - private UInt32 pNextSyncRequest; - - private IPmlChannel _channel; - - public event MessageReceivedEventHandler MessageReceived; - public delegate void MessageReceivedEventHandler(PmlElement Message); - public event RequestReceivedEventHandler RequestReceived; - public delegate void RequestReceivedEventHandler(PmlElement Request, ref PmlElement Reply); - public event SessionRequestReceivedEventHandler SessionRequestReceived; - public delegate void SessionRequestReceivedEventHandler(PmlElement Request, uint SID); - public event EventHandler Closed; - - public ICollection Sessions { get { return (ICollection)pSessions.Values; } } - public int SyncRequests { get { return pSyncRequests.Count; } } - - public LegacyPmlCommunicator(IPmlChannel channel, bool autoStart) { - _channel = channel; - if (autoStart) Start(); - //_channel.BeginReadMessage(messageReceived, null); - //_channel.MessageReceived += messageReceived; - //_channel.Closed += closed; - } - public void Start() { - _channel.BeginReadMessage(messageReceived, null); - } - - public IPmlChannel Channel { get { return _channel; } } - - public void Close() { - //_channel.MessageReceived -= messageReceived; - //_channel.Closed -= closed; - _channel.Close(); - } - - private void _WriteMessage(PmlElement Message) { - lock (_channel) { - if (_channel.IsOpen) { - _channel.WriteMessage(Message); - } else { - throw new InvalidOperationException("Could not write message: the channel is not open"); - } - } - } - - private UInt32 GetNextSessionId(bool IsSession) { - if (IsSession) { - lock (pSessions) { - do { - if (pNextSession == UInt32.MaxValue) { - pNextSession = 0; - } else { - pNextSession += (uint)1; - } - } - while (pSessions.ContainsKey(pNextSession)); - return pNextSession; - } - } else { - lock (pSyncRequests) { - do { - if (pNextSyncRequest == UInt32.MaxValue) { - pNextSyncRequest = 0; - } else { - pNextSyncRequest += (uint)1; - } - } - while (pSyncRequests.ContainsKey(pNextSyncRequest)); - return pNextSyncRequest; - } - } - } - - protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) { - PmlDictionary Msg2 = new PmlDictionary(); - Msg2.Add("CMD", new PmlString("SES")); - Msg2.Add("SID", new PmlInteger(SID)); - Msg2.Add("SCMD", new PmlInteger(CMD)); - Msg2.Add("MSG", MSG); - _WriteMessage(Msg2); - } - - protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) { - PmlDictionary Msg2 = new PmlDictionary(); - if (RPL) { - Msg2.Add("CMD", new PmlString("RPL")); - } else { - Msg2.Add("CMD", new PmlString("REQ")); - } - Msg2.Add("SID", new PmlInteger(SID)); - Msg2.Add("MSG", MSG); - _WriteMessage(Msg2); - } - - private void messageReceived(IAsyncResult ar) { - PmlElement Message; - try { - Message = _channel.EndReadMessage(ar); - _channel.BeginReadMessage(messageReceived, null); - } catch (InvalidOperationException ex) { - Console.WriteLine("InvalidOperationException in LegacyPmlCommunicator.messageReceived: " + ex.Message); - closed(); - _channel.Close(); - return; - } catch (Exception ex) { - Console.WriteLine(ex.ToString()); - closed(); - _channel.Close(); - return; - } - int Ping = 0; - if (Message == null) { - if (Ping > 2) { - _channel.Close(); - return; - } else { - _WriteMessage(new PmlString("PING")); - } - Ping += 1; - } else if (Message is PmlString) { - string Cmd = Message.ToString(); - if (Cmd.Equals("PING")) { - _WriteMessage(new PmlString("PONG")); - } else if (Cmd.Equals("PONG")) { - Ping = 0; - } - } else if (Message is PmlDictionary) { - string Cmd = null; - Cmd = Message.GetChild("CMD").ToString(); - if (Cmd.Equals("SES")) { - UInt32 SID = default(UInt32); - byte SCMD = 0; - SessionBase Session = default(SessionBase); - PmlElement InnerMsg = default(PmlElement); - SID = Message.GetChild("SID").ToUInt32(); - SCMD = Message.GetChild("SCMD").ToByte(); - InnerMsg = Message.GetChild("MSG"); - lock (pSessions) { - if (pSessions.ContainsKey(SID)) { - Session = pSessions[SID]; - } else { - Session = null; - } - } - if (SCMD == 0) { - if (Session == null) { - if (SessionRequestReceived != null) { - try { - SessionRequestReceived(InnerMsg, SID); - } catch (Exception ex) { - Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->SessionRequestReceived: " + ex.ToString()); - WriteSessionMessage(SID, 2, null); - } - } - } else { - try { - Session.ClosedA(); - Session.ClosedB(null); - } catch (Exception ex) { - Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.ClosedA/B: " + ex.ToString()); - } - WriteSessionMessage(SID, 2, null); - } - } else if (SCMD == 1) { - if (Session == null) { - WriteSessionMessage(SID, 2, null); - } else { - try { - Session.MessageIn(InnerMsg); - } catch (Exception ex) { - Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.MessageIn: " + ex.ToString()); - WriteSessionMessage(SID, 2, null); - } - } - } else if (SCMD == 2) { - if (Session != null) { - try { - Session.ClosedA(); - Session.ClosedB(InnerMsg); - } catch (Exception ex) { - Console.WriteLine("Exception in LegacyPmlCommnuicator.messageReceived->Session.ClosedA/B: " + ex.ToString()); - } - } - } - } else if (Cmd.Equals("RPL")) { - UInt32 SID = default(UInt32); - CSyncRequest SRequest = null; - SID = Message.GetChild("SID").ToUInt32(); - lock (pSyncRequests) { - if (pSyncRequests.TryGetValue(SID, out SRequest)) { - pSyncRequests.Remove(SID); - } else { - Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString()); - } - } - if (SRequest != null) { - SRequest.Reply = Message.GetChild("MSG"); - SRequest.ResetEvent.Set(); - } - } else if (Cmd.Equals("REQ")) { - UCIS.ThreadPool.RunCall(SyncRequestHandler, Message); - //System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(SyncRequestHandler), Message); - } else if (Cmd.Equals("MSG")) { - PmlElement InnerMsg = Message.GetChild("MSG"); - if (MessageReceived != null) MessageReceived(InnerMsg); - } else { - throw new InvalidOperationException("Invalid operation"); - } - } - } - private void closed() { - //_channel.MessageReceived -= messageReceived; - //_channel.Closed -= closed; - Console.WriteLine("UCIS.PML.Connection: Connection closed"); - try { - SessionBase[] sessions; - lock (pSessions) { - sessions = new SessionBase[pSessions.Count]; - pSessions.Values.CopyTo(sessions, 0); - } - foreach (SessionBase S in sessions) { - try { - S.ClosedB(null); - } catch (Exception ex) { - Console.WriteLine(ex.ToString()); - } - } - } catch (Exception ex) { - Console.WriteLine(ex.ToString()); - } - lock (pSessions) pSessions.Clear(); - try { - CSyncRequest[] reqs; - lock (pSyncRequests) { - reqs = new CSyncRequest[pSyncRequests.Count]; - pSyncRequests.Values.CopyTo(reqs, 0); - } - foreach (CSyncRequest T in reqs) { - T.ResetEvent.Set(); - } - } catch (Exception ex) { - Console.WriteLine(ex.ToString()); - } - lock (pSyncRequests) pSyncRequests.Clear(); - if (Closed != null) Closed(this, new EventArgs()); - } - - private void SyncRequestHandler(object state) { - PmlDictionary Message = (PmlDictionary)state; - PmlElement Reply = null; - UInt32 SID = 0; - try { - SID = Message.GetChild("SID").ToUInt32(); - PmlElement InnerMsg = Message.GetChild("MSG"); - if (RequestReceived != null) { - RequestReceived(InnerMsg, ref Reply); - } - } catch (Exception ex) { - Reply = new PmlDictionary(); - ((PmlDictionary)Reply).Add("EXCEPTION", new PmlString(ex.ToString())); - Console.WriteLine(ex.ToString()); - } - try { - WriteSyncMessage(SID, true, Reply); - } catch (Exception ex) { - Console.WriteLine("Exception: " + ex.ToString()); - } - } - - public PmlElement SyncRequest(PmlElement Request) { - return SyncRequest(Request, 30000); - } - public PmlElement SyncRequest(PmlElement Request, int Timeout) { - CSyncRequest SyncEvent = new CSyncRequest(); - UInt32 SID = GetNextSessionId(false); - lock (pSyncRequests) pSyncRequests.Add(SID, SyncEvent); - try { - WriteSyncMessage(SID, false, Request); - if (!SyncEvent.ResetEvent.WaitOne(Timeout, false)) { - lock (pSyncRequests) pSyncRequests.Remove(SID); - throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")"); - } - } finally { - lock (pSyncRequests) pSyncRequests.Remove(SID); - } - return SyncEvent.Reply; - } - - public void SendMessage(PmlElement Message) { - PmlDictionary Msg = new PmlDictionary(); - Msg.Add("CMD", new PmlString("MSG")); - Msg.Add("MSG", Message); - _WriteMessage(Msg); - } - - public void SendRawMessage(PmlElement Message) { - _WriteMessage(Message); - } - } -} diff -r 5e717aac4c1d -r df53bdd49507 Pml/PmlCommunicator.cs --- a/Pml/PmlCommunicator.cs Fri Nov 07 18:33:34 2014 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,342 +0,0 @@ -using System; -using System.Threading; -using System.Collections.Generic; - -namespace UCIS.Pml { -/* class PmlCommunicator : IPmlCommunicator, IDisposable { - private IPmlChannel _channel; - private Dictionary _subchannels = new Dictionary(); - private Random _random = new Random(); - - private enum CommandCode : int { - CallWithoutReply = 0, - CallWithReply = 1, - Message = 2, - ChannelRequest = 3, - ChannelAcknowledge = 4, - ChannelClose = 5, - Error = 6 - } - - private interface IPmlSubChannel { - void CloseIn(); - void ErrorIn(PmlElement message); - void MessageIn(PmlElement message); - } - - private class ChannelRequestWaitHandler : IAsyncResult { - internal AsyncCallback Callback = null; - internal Object CallbackState = null; - internal ManualResetEvent Event = null; - internal PmlSubChannel Channel = null; - internal bool Completed = false; - - internal ChannelRequestWaitHandler(PmlSubChannel channel) { - Channel = channel; - } - - internal void Complete() { - Completed = true; - if (Event != null) Event.Set(); - if (Callback != null) Callback.Invoke(this); - } - - public object AsyncState { get { return CallbackState; } } - public WaitHandle AsyncWaitHandle { get { return null; } } - public bool CompletedSynchronously { get { return false; } } - public bool IsCompleted { get { return Completed; } } - } - private class PmlSubChannel : ActivePmlChannel, IPmlSubChannel { - private enum ChannelState { Requesting, Acknowledged, Closed } - - private PmlCommunicator _communicator; - private UInt32 _id; - private ChannelState _state; - - internal PmlSubChannel(PmlCommunicator communicator, UInt32 sid) { - _communicator = communicator; - _id = sid; - _state = ChannelState.Requesting; - } - - public override bool IsOpen { get { return _state == ChannelState.Acknowledged; } } - - internal void AcknowledgeIn() { - if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement"); - _state = ChannelState.Acknowledged; - } - void IPmlSubChannel.CloseIn() { - _state = ChannelState.Closed; - _communicator._subchannels.Remove(_id); - base.Close(); - } - void IPmlSubChannel.ErrorIn(PmlElement message) { - (this as IPmlSubChannel).CloseIn(); - } - void IPmlSubChannel.MessageIn(PmlElement message) { - base.PushReceivedMessage(message); - } - - internal void AcknowledgeOut() { - if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement"); - _state = ChannelState.Acknowledged; - _communicator.sendMessage(CommandCode.ChannelAcknowledge, _id, null); - } - internal void RejectOut() { - if (_state != 0) throw new InvalidOperationException("The subchannel is not awaiting an acknowledgement"); - _state = ChannelState.Closed; - _communicator.sendMessage(CommandCode.ChannelClose, _id, null); - } - - public override void SendMessage(PmlElement message) { - if (_state != ChannelState.Acknowledged) throw new InvalidOperationException("The subchannel is not open"); - _communicator.sendMessage(CommandCode.Message, _id, message); - } - public override void Close() { - if (_state != ChannelState.Acknowledged) return; - _state = ChannelState.Closed; - _communicator.sendMessage(CommandCode.ChannelClose, _id, null); - _communicator._subchannels.Remove(_id); - base.Close(); - } - } - private class PmlChannelRequestReceivedEventArgsA : PmlChannelRequestReceivedEventArgs { - private PmlCommunicator _communicator; - private PmlElement _data; - private PmlSubChannel _channel; - private bool _accepted; - private bool _rejected; - internal PmlChannelRequestReceivedEventArgsA(PmlCommunicator communicator, UInt32 sid, PmlElement message) { - _communicator = communicator; - _channel = new PmlSubChannel(communicator, sid); - _data = message; - } - public override IPmlChannel Accept() { - if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected"); - _accepted = true; - _channel.AcknowledgeOut(); - return _channel; - } - public override void Reject() { - if (_accepted) throw new InvalidOperationException("The channel has already been accepted"); - if (_rejected) return; - _rejected = true; - _channel.RejectOut(); - } - internal void RejectIfNotAccepted() { - if (!_accepted) Reject(); - } - public override PmlElement Data { - get { - return _data; - } - } - } - - private class PmlInvocation : IAsyncResult, IPmlSubChannel { - internal PmlCommunicator Communicator = null; - internal AsyncCallback Callback = null; - internal Object CallbackState = null; - internal bool Error = false; - internal bool Completed = false; - internal PmlElement Message = null; - internal ManualResetEvent Event = null; - internal UInt32 ID; - - internal PmlInvocation(PmlCommunicator communicator, UInt32 id) { - Communicator = communicator; - ID = id; - } - - void IPmlSubChannel.CloseIn() { - (this as IPmlSubChannel).ErrorIn(null); - } - void IPmlSubChannel.ErrorIn(PmlElement message) { - Error = true; - Communicator._subchannels.Remove(ID); - (this as IPmlSubChannel).MessageIn(message); - } - void IPmlSubChannel.MessageIn(PmlElement message) { - Message = message; - Completed = true; - if (Event != null) Event.Set(); - if (Callback != null) Callback.Invoke(this); - } - - public object AsyncState { get { return CallbackState; } } - public WaitHandle AsyncWaitHandle { get { return null; } } - public bool CompletedSynchronously { get { return false; } } - public bool IsCompleted { get { return Completed; } } - } - - public event EventHandler CallReceived; - public event EventHandler ChannelRequestReceived; - - public PmlCommunicator(IPmlChannel channel) { - _channel = channel; - _channel.Closed += channelClosed; - } - - public void Dispose() { - _channel.Close(); - _channel = null; - IPmlSubChannel[] A = new IPmlSubChannel[_subchannels.Count]; - _subchannels.Values.CopyTo(A, 0); - foreach (IPmlSubChannel S in A) S.CloseIn(); - _subchannels.Clear(); - _subchannels = null; - _random = null; - } - - private void channelClosed(Object sender, EventArgs e) { - Dispose(); - } - - public IPmlChannel Channel { get { return _channel; } } - - public void Call(PmlElement message) { - sendMessage(0, 0, message); //Call without reply - } - public PmlElement Invoke(PmlElement message) { - return Invoke(message, 60000); - } - public PmlElement Invoke(PmlElement message, int timeout) { - UInt32 sid = getSessionID(); - PmlInvocation inv = new PmlInvocation(this, sid); - inv.Event = new ManualResetEvent(false); - _subchannels.Add(sid, inv); - sendMessage(CommandCode.CallWithReply, sid, message); - inv.Event.WaitOne(timeout); - if (inv.Error) throw new Exception(message.ToString()); - return inv.Message; - } - - public IAsyncResult BeginInvoke(PmlElement message, AsyncCallback callback, Object state) { - UInt32 sid = getSessionID(); - PmlInvocation inv = new PmlInvocation(this, sid); - inv.Callback = callback; - inv.CallbackState = state; - _subchannels.Add(sid, inv); - sendMessage(CommandCode.CallWithReply, sid, message); - return inv; - } - public PmlElement EndInvoke(IAsyncResult result) { - PmlInvocation ar = (PmlInvocation)result; - if (!ar.Completed) { - (_subchannels as IList).Remove(ar); - throw new InvalidOperationException("The asynchronous operation has not completed"); - } else if (ar.Error) { - throw new Exception(ar.Message.ToString()); - } else { - return ar.Message; - } - } - - public IPmlChannel CreateChannel(PmlElement data) { - UInt32 sid = getSessionID(); - PmlSubChannel ch = new PmlSubChannel(this, sid); - ChannelRequestWaitHandler wh = new ChannelRequestWaitHandler(ch); - wh.Event = new ManualResetEvent(false); - _subchannels.Add(sid, ch); - sendMessage(CommandCode.ChannelRequest, sid, data); - wh.Event.WaitOne(); - if (!ch.IsOpen) return null; - return ch; - } - public IAsyncResult BeginCreateChannel(PmlElement data, AsyncCallback callback, Object state) { - UInt32 sid = getSessionID(); - PmlSubChannel ch = new PmlSubChannel(this, sid); - ChannelRequestWaitHandler wh = new ChannelRequestWaitHandler(ch); - wh.Callback = callback; - wh.CallbackState = state; - _subchannels.Add(sid, ch); - sendMessage(CommandCode.ChannelRequest, sid, data); - if (!ch.IsOpen) return null; - return wh; - } - public IPmlChannel EndCreateChannel(IAsyncResult result) { - ChannelRequestWaitHandler ar = (ChannelRequestWaitHandler)result; - if (!ar.Channel.IsOpen) return null; - return ar.Channel; - } - - private UInt32 getSessionID() { - return (uint)_random.Next(); - } - - private void sendMessage(CommandCode cmd, uint sid, PmlElement message) { - PmlDictionary msg = new PmlDictionary(); - msg.Add("c", (int)cmd); - if (cmd > 0) msg.Add("s", sid); - if (message != null) msg.Add("m", message); - _channel.SendMessage(msg); - } - - private void invokeCallReceived(Object state) { - PmlCallReceivedEventArgs e = (PmlCallReceivedEventArgs)state; - try { - if (CallReceived != null) CallReceived(this, e); - if (e.WantReply) sendMessage(CommandCode.Message, e.SID, e.Reply); - } catch (Exception ex) { - if (e.WantReply) sendMessage(CommandCode.Error, e.SID, new PmlString(ex.ToString())); - } - } - private void invokeChannelRequestReceived(Object state) { - PmlChannelRequestReceivedEventArgsA e = (PmlChannelRequestReceivedEventArgsA)state; - if (ChannelRequestReceived != null) ChannelRequestReceived(this, e); - e.RejectIfNotAccepted(); - } - - private void messageReceived(Object sender, EventArgs e) { - IPmlSubChannel subChannel = null; - UInt32 sid = 0; - bool subChannelExists = false; - if (!(e.Message is PmlDictionary)) return; - PmlDictionary msg = (PmlDictionary)e.Message; - PmlElement cmdElement = msg.GetChild("c"); - PmlElement sidElement = msg.GetChild("i"); - PmlElement msgElement = msg.GetChild("m"); - if (cmdElement == null) return; - if (sidElement != null) sid = sidElement.ToUInt32(); - if (sidElement != null) subChannelExists = _subchannels.TryGetValue(sid, out subChannel); - if (!subChannelExists) subChannel = null; - switch ((CommandCode)cmdElement.ToInt32()) { - case CommandCode.CallWithoutReply: - if (CallReceived != null) ThreadPool.RunCall(invokeCallReceived, new PmlCallReceivedEventArgs(msgElement, false, 0)); - break; - case CommandCode.CallWithReply: - if (CallReceived != null) ThreadPool.RunCall(invokeCallReceived, new PmlCallReceivedEventArgs(msgElement, true, sid)); - else sendMessage(CommandCode.Error, sid, null); - break; - case CommandCode.Message: //Reply to call | subchannel message - if (subChannelExists) subChannel.MessageIn(msgElement); - else sendMessage(CommandCode.Error, sid, null); - break; - case CommandCode.ChannelRequest: - if (subChannelExists) { - sendMessage(CommandCode.Error, sid, null); - subChannel.CloseIn(); - } else { - if (ChannelRequestReceived == null) sendMessage(CommandCode.ChannelClose, sid, null); - else ThreadPool.RunCall(invokeChannelRequestReceived, new PmlChannelRequestReceivedEventArgsA(this, sid, msgElement)); - } - break; - case CommandCode.ChannelAcknowledge: - if (subChannelExists) { - if (subChannel is PmlSubChannel) (subChannel as PmlSubChannel).AcknowledgeIn(); - else { - sendMessage(CommandCode.Error, sid, null); //Error - subChannel.CloseIn(); - } - } else sendMessage(CommandCode.Error, sid, null); //Error - break; - case CommandCode.ChannelClose: - if (subChannelExists) subChannel.CloseIn(); - break; - case CommandCode.Error: - if (subChannelExists) subChannel.ErrorIn(msgElement); - break; - } - } - }*/ -} diff -r 5e717aac4c1d -r df53bdd49507 Pml/PmlCommunicator2.cs --- a/Pml/PmlCommunicator2.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Pml/PmlCommunicator2.cs Fri Nov 07 18:37:39 2014 +0100 @@ -7,98 +7,12 @@ public class PmlCommunicator { private class CSyncRequest { internal PmlElement Reply; - internal ManualResetEvent ResetEvent = new ManualResetEvent(false); } private interface ISession { void MessageIn(PmlElement message); void CloseIn(); UInt32 ID { get; } } - /*public abstract class SessionBase : ISession { - private bool pActive; - private PmlCommunicator _communicator; - private UInt32 _id; - - public uint SID { get { return _id; } } - public bool Active { get { return pActive; } } - public PmlCommunicator Communicator { get { return _communicator; } } - - protected SessionBase(PmlCommunicator Connection) { - _communicator = Connection; - } - - protected void Accept(UInt32 sid) { - if (pActive) throw new InvalidOperationException("Session is active"); - _id = sid; - lock (_communicator._sessions) _communicator._sessions.Add(_id, this); - pActive = true; - } - protected void Request() { - Request(null); - } - protected void Request(PmlElement Message) { - if (pActive) throw new InvalidOperationException("Session is active"); - _id = _communicator.GetNextSessionId(true); - lock (_communicator._sessions) _communicator._sessions.Add(_id, this); - _communicator.WriteSessionMessage(_id, 0, Message); - pActive = true; - } - - uint ISession.ID { get { return _id; } } - void ISession.MessageIn(PmlElement message) { this.MessageIn(message); } - void ISession.CloseIn() { - pActive = false; - _communicator.RemoveSession(this); - Closed(null); - } - - protected internal abstract void MessageIn(PmlElement Message); - - protected void SendMessage(PmlElement Message) { - if (!pActive) throw new InvalidOperationException("Session is not active"); - _communicator.WriteSessionMessage(_id, 1, Message); - } - - public void Close() { - if (!pActive) return; - pActive = false; - _communicator.WriteSessionMessage(_id, 2, null); - _communicator.RemoveSession(this); - } - - protected virtual void Closed(PmlElement Message) { } - } - public class Session : SessionBase { - public event MessageReceivedEventHandler MessageReceived; - public delegate void MessageReceivedEventHandler(PmlElement Message); - public event SessionClosedEventHandler SessionClosed; - public delegate void SessionClosedEventHandler(PmlElement Message); - - public Session(PmlCommunicator Connection) : base(Connection) { } - - public new void Accept(UInt32 SID) { - base.Accept(SID); - } - public new void Request() { - Request(null); - } - public new void Request(PmlElement Message) { - base.Request(Message); - } - - protected internal override void MessageIn(PmlElement Message) { - if (MessageReceived != null) MessageReceived(Message); - } - - public new void SendMessage(PmlElement Message) { - base.SendMessage(Message); - } - - protected override void Closed(PmlElement Message) { - if (SessionClosed != null) SessionClosed(Message); - } - }*/ - private class PmlSubChannel : ActivePmlChannel, ISession { private enum ChannelState { Requesting, Acknowledged, Closed } @@ -148,11 +62,6 @@ _sid = sid; _accepted = _rejected = false; } - public UInt32 AcceptSession() { - if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected"); - _accepted = true; - return _sid; - } public override IPmlChannel Accept() { if (_accepted || _rejected) throw new InvalidOperationException("The channel has already been accepted or rejected"); _accepted = true; @@ -163,16 +72,11 @@ if (_rejected) return; _rejected = true; _communicator.WriteSessionMessage(_sid, 2, null); - //_channel.RejectOut(); } internal void RejectIfNotAccepted() { if (!_accepted) Reject(); } - public override PmlElement Data { - get { - return _data; - } - } + public override PmlElement Data { get { return _data; } } } public event EventHandler CallReceived; @@ -239,9 +143,7 @@ _sessions.Clear(); } lock (_invocations) { - foreach (CSyncRequest T in _invocations.Values) { - T.ResetEvent.Set(); - } + foreach (CSyncRequest T in _invocations.Values) lock (T) Monitor.Pulse(T); _invocations.Clear(); } if (Closed != null) Closed(this, new EventArgs()); @@ -268,9 +170,8 @@ if (Message is PmlString) { string Cmd = Message.ToString(); if (Cmd.Equals("PING")) { - _WriteMessage(new PmlString("PONG")); - /*} else if (Cmd.Equals("PONG")) { - Ping = 0;*/ + _WriteMessage("PONG"); + } else if (Cmd.Equals("PONG")) { } } else if (Message is PmlDictionary) { string Cmd = Message.GetChild("CMD").ToString(); @@ -288,7 +189,7 @@ } if (SRequest != null) { SRequest.Reply = Message.GetChild("MSG"); - SRequest.ResetEvent.Set(); + lock (SRequest) Monitor.Pulse(SRequest); } } else if (Cmd.Equals("REQ") || Cmd.Equals("MSG")) { UCIS.ThreadPool.RunCall(processCall, Message); @@ -384,9 +285,7 @@ } public void Call(PmlElement message) { - PmlDictionary Msg = new PmlDictionary(); - Msg.Add("CMD", new PmlString("MSG")); - Msg.Add("MSG", message); + PmlDictionary Msg = new PmlDictionary() { { "CMD", "MSG" }, { "MSG", message } }; _WriteMessage(Msg); } public PmlElement Invoke(PmlElement message) { @@ -395,14 +294,16 @@ public PmlElement Invoke(PmlElement message, int timeout) { if (_closed) throw new InvalidOperationException("Sorry, we're closed."); CSyncRequest SyncEvent = new CSyncRequest(); - UInt32 SID = GetNextSessionId(false); - lock (_invocations) _invocations.Add(SID, SyncEvent); + UInt32 SID; + lock (_invocations) { + SID = GetNextSessionId(ref pNextSyncRequest, _invocations); + _invocations.Add(SID, SyncEvent); + } try { WriteSyncMessage(SID, false, message); - if (!SyncEvent.ResetEvent.WaitOne(timeout, false)) { - if (!_closed) lock (_invocations) _invocations.Remove(SID); - throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")"); - } + Boolean success; + lock (SyncEvent) success = Monitor.Wait(SyncEvent, timeout); + if (!success) throw new TimeoutException("The SyncRequest timed out (SID=" + SID.ToString() + ")"); } finally { lock (_invocations) _invocations.Remove(SID); } @@ -410,7 +311,7 @@ } public IPmlChannel CreateChannel(PmlElement data) { - UInt32 sid = GetNextSessionId(true); + UInt32 sid = GetNextSessionId(ref pNextSession, _sessions); PmlSubChannel ch = new PmlSubChannel(this, sid, true); WriteSessionMessage(sid, 0, data); if (!ch.IsOpen) return null; @@ -429,42 +330,33 @@ RemoveSession(session.ID); } - private UInt32 GetNextSessionId(bool IsSession) { - if (IsSession) { - lock (_sessions) { - do { - unchecked { pNextSession++; } - } while (_sessions.ContainsKey(pNextSession)); - return pNextSession; - } - } else { - lock (_invocations) { - do { - unchecked { pNextSyncRequest++; } - } while (_invocations.ContainsKey(pNextSyncRequest)); - return pNextSyncRequest; - } + private static UInt32 GetNextSessionId(ref UInt32 id, IDictionary dictionary) { + lock (dictionary) { + do { + id++; + } while (dictionary.ContainsKey(id)); + return id; } } protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) { - PmlDictionary Msg2 = new PmlDictionary(); - Msg2.Add("CMD", new PmlString(RPL ? "RPL" : "REQ")); - Msg2.Add("SID", new PmlInteger(SID)); - Msg2.Add("MSG", MSG); + PmlDictionary Msg2 = new PmlDictionary() { + { "CMD", RPL ? "RPL" : "REQ" }, + { "SID", SID }, + { "MSG", MSG }, + }; _WriteMessage(Msg2); } protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) { - PmlDictionary Msg2 = new PmlDictionary(); - Msg2.Add("CMD", new PmlString("SES")); - Msg2.Add("SID", new PmlInteger(SID)); - Msg2.Add("SCMD", new PmlInteger(CMD)); + PmlDictionary Msg2 = new PmlDictionary() { + { "CMD", "SES" }, + { "SID", SID }, + { "SCMD", CMD }, + }; if (MSG != null) Msg2.Add("MSG", MSG); _WriteMessage(Msg2); } - - /* LegacyPmlCommunicator compatibility */ public PmlElement SyncRequest(PmlElement Request) { return Invoke(Request); diff -r 5e717aac4c1d -r df53bdd49507 Pml/PmlConnection.cs --- a/Pml/PmlConnection.cs Fri Nov 07 18:33:34 2014 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,384 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using UCIS.Net; -using System.Net.Sockets; -using System.Threading; -using System.IO; - -namespace UCIS.Pml { - /*public class PmlConnection : LegacyPmlCommunicator { - public PmlConnection(Socket Socket) : this(new TCPPmlChannel(Socket)) { } - public PmlConnection(TCPStream Stream) : this(new TCPPmlChannel(Stream)) { } - public PmlConnection(Stream Stream) : this(new PmlBinaryRW(Stream)) {} - public PmlConnection(IPmlRW RW) : this(new PmlChannel(RW)) { } - public PmlConnection(IPmlChannel CH) : base(CH) { } - }*/ - public class PmlConnection { - private class CSyncRequest { - internal PmlElement Reply; - internal ManualResetEvent ResetEvent = new ManualResetEvent(false); - } - public abstract class SessionBase { - private bool pActive; - private PmlConnection pConnection; - private UInt32 pSID; - - protected SessionBase(PmlConnection Connection) { - pConnection = Connection; - } - - protected void Accept(UInt32 SID) { - if (pActive) throw new InvalidOperationException("Session is active"); - pSID = SID; - lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this); - pActive = true; - } - protected void Request() { - Request(null); - } - protected void Request(PmlElement Message) { - if (pActive) throw new InvalidOperationException("Session is active"); - pSID = pConnection.GetNextSessionId(true); - lock (pConnection.pSessions) pConnection.pSessions.Add(pSID, this); - pConnection.WriteSessionMessage(pSID, 0, Message); - pActive = true; - } - - protected internal abstract void MessageIn(PmlElement Message); - - protected void SendMessage(PmlElement Message) { - if (!pActive) throw new InvalidOperationException("Session is not active"); - pConnection.WriteSessionMessage(pSID, 1, Message); - } - - public void Close() { - Close(null); - } - public void Close(PmlElement Message) { - if (!pActive) throw new InvalidOperationException("Session is not active"); - pConnection.WriteSessionMessage(pSID, 2, Message); - ClosedA(); - } - - internal void ClosedA() { - pActive = false; - lock (pConnection.pSessions) pConnection.pSessions.Remove(pSID); - } - - internal void ClosedB(PmlElement Message) { - pActive = false; - Closed(Message); - } - - protected virtual void Closed(PmlElement Message) { - } - } - public class Session : SessionBase { - public event MessageReceivedEventHandler MessageReceived; - public delegate void MessageReceivedEventHandler(PmlElement Message); - public event SessionClosedEventHandler SessionClosed; - public delegate void SessionClosedEventHandler(PmlElement Message); - - public Session(PmlConnection Connection) : base(Connection) { } - - public new void Accept(UInt32 SID) { - base.Accept(SID); - } - public new void Request() { - Request(null); - } - public new void Request(PmlElement Message) { - base.Request(Message); - } - - protected internal override void MessageIn(PmlElement Message) { - if (MessageReceived != null) MessageReceived(Message); - } - - public new void SendMessage(PmlElement Message) { - base.SendMessage(Message); - } - - protected override void Closed(PmlElement Message) { - if (SessionClosed != null) SessionClosed(Message); - } - } - - private Dictionary pSessions = new Dictionary(); - private UInt32 pNextSession; - private Dictionary pSyncRequests = new Dictionary(); - private UInt32 pNextSyncRequest; - - private Stream pStream; - - public event MessageReceivedEventHandler MessageReceived; - public delegate void MessageReceivedEventHandler(PmlElement Message); - public event RequestReceivedEventHandler RequestReceived; - public delegate void RequestReceivedEventHandler(PmlElement Request, ref PmlElement Reply); - public event SessionRequestReceivedEventHandler SessionRequestReceived; - public delegate void SessionRequestReceivedEventHandler(PmlElement Request, uint SID); - - private IPmlWriter _writer; - private IPmlReader _reader; - - public PmlConnection(Socket Socket) : this(new TCPStream(Socket)) { } - public PmlConnection(Stream Stream) : this(new PmlBinaryRW(Stream)) { - pStream = Stream; - } - public PmlConnection(IPmlRW RMRW) : this(RMRW, RMRW) { } - public PmlConnection(IPmlReader Reader, IPmlWriter Writer) { - _reader = Reader; - _writer = Writer; - } - - public void Close() { - if (pStream != null) pStream.Close(); - } - - public IPmlReader Reader { - get { return _reader; } - } - public IPmlWriter Writer { - get { return _writer; } - } - private PmlElement _ReadMessage() { - PmlElement Message = _reader.ReadMessage(); - return Message; //Warning: Can't lock reader because it can be the same as the Writer (possible deadlock) - } - private void _WriteMessage(PmlElement Message) { - lock (_writer) _writer.WriteMessage(Message); - } - - private UInt32 GetNextSessionId(bool IsSession) { - if (IsSession) { - lock (pSessions) { - do { - if (pNextSession == UInt32.MaxValue) { - pNextSession = 0; - } else { - pNextSession += (uint)1; - } - } - while (pSessions.ContainsKey(pNextSession)); - return pNextSession; - } - } else { - lock (pSyncRequests) { - do { - if (pNextSyncRequest == UInt32.MaxValue) { - pNextSyncRequest = 0; - } else { - pNextSyncRequest += (uint)1; - } - } - while (pSyncRequests.ContainsKey(pNextSyncRequest)); - return pNextSyncRequest; - } - } - } - - protected void WriteSessionMessage(UInt32 SID, byte CMD, PmlElement MSG) { - PmlDictionary Msg2 = new PmlDictionary(); - Msg2.Add("CMD", new PmlString("SES")); - Msg2.Add("SID", new PmlInteger(SID)); - Msg2.Add("SCMD", new PmlInteger(CMD)); - Msg2.Add("MSG", MSG); - _WriteMessage(Msg2); - } - - protected void WriteSyncMessage(UInt32 SID, bool RPL, PmlElement MSG) { - PmlDictionary Msg2 = new PmlDictionary(); - if (RPL) { - Msg2.Add("CMD", new PmlString("RPL")); - } else { - Msg2.Add("CMD", new PmlString("REQ")); - } - Msg2.Add("SID", new PmlInteger(SID)); - Msg2.Add("MSG", MSG); - _WriteMessage(Msg2); - } - - public void Worker() { - try { - PmlElement Message = null; - int Ping = 0; - while (true) { - try { - Message = _ReadMessage(); - if (Message == null) Console.WriteLine("UCIS.PML.Connection: Message is just null?"); - } catch (EndOfStreamException) { - Console.WriteLine("UCIS.PML.Connection: End of stream"); - return; - } catch (SocketException ex) { - if (ex.ErrorCode == (int)SocketError.TimedOut) { - Console.WriteLine("UCIS.PML.Connection: SocketException/TimedOut"); - Message = null; - } else if (ex.ErrorCode == (int)SocketError.ConnectionReset) { - Console.WriteLine("UCIS.PML.Connection: Connection reset by peer"); - return; - } else { - throw new Exception("Exception while reading message", ex); - } - } catch (IOException ex) { - Console.WriteLine("UCIS.PML.Connection: IOException: " + ex.Message); - Message = null; - } catch (TimeoutException) { - Message = null; - } - if (Message == null) { - if (Ping > 2) { - Console.WriteLine("UCIS.PML.Connection: Connection timed out"); - break; - } else { - _WriteMessage(new PmlString("PING")); - } - Ping += 1; - } else if (Message is PmlString) { - string Cmd = Message.ToString(); - if (Cmd.Equals("PING")) { - _WriteMessage(new PmlString("PONG")); - } else if (Cmd.Equals("PONG")) { - Ping = 0; - } - } else if (Message is PmlDictionary) { - string Cmd = null; - Cmd = Message.GetChild("CMD").ToString(); - if (Cmd.Equals("SES")) { - UInt32 SID = default(UInt32); - byte SCMD = 0; - SessionBase Session = default(SessionBase); - PmlElement InnerMsg = default(PmlElement); - SID = Message.GetChild("SID").ToUInt32(); - SCMD = Message.GetChild("SCMD").ToByte(); - InnerMsg = Message.GetChild("MSG"); - lock (pSessions) { - if (pSessions.ContainsKey(SID)) { - Session = pSessions[SID]; - } else { - Session = null; - } - } - if (SCMD == 0) { - if (Session == null) { - if (SessionRequestReceived != null) { - SessionRequestReceived(InnerMsg, SID); - } - } else { - Session.ClosedA(); - Session.ClosedB(null); - WriteSessionMessage(SID, 2, null); - } - } else if (SCMD == 1) { - if (Session == null) { - WriteSessionMessage(SID, 2, null); - } else { - Session.MessageIn(InnerMsg); - } - } else if (SCMD == 2) { - if (Session != null) { - Session.ClosedA(); - Session.ClosedB(InnerMsg); - } - } - } else if (Cmd.Equals("RPL")) { - UInt32 SID = default(UInt32); - CSyncRequest SRequest = null; - SID = Message.GetChild("SID").ToUInt32(); - lock (pSyncRequests) { - if (pSyncRequests.TryGetValue(SID, out SRequest)) { - pSyncRequests.Remove(SID); - } else { - Console.WriteLine("UCIS.PML.Connection.Worker Invalid request ID in reply: " + SID.ToString()); - } - } - if (SRequest != null) { - SRequest.Reply = Message.GetChild("MSG"); - SRequest.ResetEvent.Set(); - } - } else if (Cmd.Equals("REQ")) { - System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(SyncRequestHandler), Message); - } else if (Cmd.Equals("MSG")) { - PmlElement InnerMsg = Message.GetChild("MSG"); - if (MessageReceived != null) MessageReceived(InnerMsg); - } else { - throw new InvalidOperationException("Invalid operation"); - } - } - } - } catch (System.Threading.ThreadAbortException ex) { - throw; - } catch (Exception ex) { - Console.WriteLine(ex.ToString()); - } finally { - Console.WriteLine("UCIS.PML.Connection: Connection closed"); - try { - foreach (SessionBase S in pSessions.Values) { - try { - S.ClosedB(null); - } catch (Exception ex) { - Console.WriteLine(ex.ToString()); - } - } - pSessions.Clear(); - foreach (CSyncRequest T in pSyncRequests.Values) { - T.ResetEvent.Set(); - } - } catch (Exception ex) { - Console.WriteLine(ex.ToString()); - } - } - } - - private void SyncRequestHandler(object state) { - PmlDictionary Message = (PmlDictionary)state; - UInt32 SID = default(UInt32); - PmlElement InnerMsg = default(PmlElement); - PmlElement Reply = default(PmlElement); - Reply = null; - SID = Message.GetChild("SID").ToUInt32(); - InnerMsg = Message.GetChild("MSG"); - try { - if (RequestReceived != null) { - RequestReceived(InnerMsg, ref Reply); - } - } catch (Exception ex) { - Reply = new PmlDictionary(); - ((PmlDictionary)Reply).Add("EXCEPTION", new PmlString(ex.ToString())); - Console.WriteLine(ex.ToString()); - } - WriteSyncMessage(SID, true, Reply); - } - - public PmlElement SyncRequest(PmlElement Request) { - return SyncRequest(Request, 30000); - } - public PmlElement SyncRequest(PmlElement Request, int Timeout) { - UInt32 SID = default(UInt32); - CSyncRequest SyncEvent = new CSyncRequest(); - SID = GetNextSessionId(false); - lock (pSyncRequests) pSyncRequests.Add(SID, SyncEvent); - WriteSyncMessage(SID, false, Request); - if (!SyncEvent.ResetEvent.WaitOne(Timeout, false)) { - Console.WriteLine("UCIS.PML.Connection.SyncRequest Timeout: " + SID.ToString()); - lock (pSyncRequests) pSyncRequests.Remove(SID); - throw new TimeoutException(); - } - return SyncEvent.Reply; - } - - public void SendMessage(PmlElement Message) { - PmlDictionary Msg = new PmlDictionary(); - Msg.Add("CMD", new PmlString("MSG")); - Msg.Add("MSG", Message); - _WriteMessage(Msg); - } - - public PmlElement ReadMessage() { - return _ReadMessage(); - } - public void SendRawMessage(PmlElement Message) { - _WriteMessage(Message); - } - } -} diff -r 5e717aac4c1d -r df53bdd49507 ThreadPool.cs --- a/ThreadPool.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/ThreadPool.cs Fri Nov 07 18:37:39 2014 +0100 @@ -112,7 +112,7 @@ if (pThreadsMinIdle > value) throw new ArgumentOutOfRangeException("ThreadsMaxIdle", "ThreadsMaxIdle must be greater than or equal to ThreadsMinIdle"); if (value < 0) throw new ArgumentOutOfRangeException("ThreadsMaxIdle", "ThreadsMaxIdle must greater than or equal to 0"); lock (pIdleThreads) { - while (value > pIdleThreads.Count) { + while (value < pIdleThreads.Count) { ThreadInfo T = pIdleThreads.Dequeue(); T.Abort = true; T.WaitHandle.Set(); diff -r 5e717aac4c1d -r df53bdd49507 UCIS.Core.csproj --- a/UCIS.Core.csproj Fri Nov 07 18:33:34 2014 +0100 +++ b/UCIS.Core.csproj Fri Nov 07 18:37:39 2014 +0100 @@ -117,15 +117,12 @@ - - - diff -r 5e717aac4c1d -r df53bdd49507 USBLib/Communication/LibUsb1/libusb1.cs --- a/USBLib/Communication/LibUsb1/libusb1.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/USBLib/Communication/LibUsb1/libusb1.cs Fri Nov 07 18:37:39 2014 +0100 @@ -29,7 +29,7 @@ } unsafe static class libusb1 { const CallingConvention LIBUSB1_CC = CallingConvention.Winapi; - const String LIBUSB1_DLL = "libusb-1.0.dll"; + const String LIBUSB1_DLL = "libusb-1.0"; [StructLayout(LayoutKind.Sequential, Pack = 1)] public struct libusb_device_descriptor { diff -r 5e717aac4c1d -r df53bdd49507 Util/AsyncResultBase.cs --- a/Util/AsyncResultBase.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Util/AsyncResultBase.cs Fri Nov 07 18:37:39 2014 +0100 @@ -8,6 +8,7 @@ static Boolean ThreadInCallback = false; ManualResetEvent WaitEvent = null; AsyncCallback Callback = null; + Object MonitorWaitHandle = new Object(); public object AsyncState { get; private set; } public bool CompletedSynchronously { get; private set; } public bool IsCompleted { get; private set; } @@ -36,6 +37,7 @@ lock (this) { IsCompleted = true; if (WaitEvent != null) WaitEvent.Set(); + if (MonitorWaitHandle != null) lock (MonitorWaitHandle) Monitor.Pulse(MonitorWaitHandle); } if (Callback != null) { if (synchronously && !ThreadInCallback) { @@ -51,6 +53,10 @@ } } + public void WaitForCompletion() { + lock (this) if (!IsCompleted) lock (MonitorWaitHandle) Monitor.Wait(MonitorWaitHandle); + } + protected void ThrowError() { if (Error != null) throw Error; } diff -r 5e717aac4c1d -r df53bdd49507 Util/QueuedPacketStream.cs --- a/Util/QueuedPacketStream.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Util/QueuedPacketStream.cs Fri Nov 07 18:37:39 2014 +0100 @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.IO; using System.Threading; -using SysThreadPool = System.Threading.ThreadPool; namespace UCIS.Util { public abstract class QueuedPacketStream : PacketStream { diff -r 5e717aac4c1d -r df53bdd49507 Xml/Socket.cs --- a/Xml/Socket.cs Fri Nov 07 18:33:34 2014 +0100 +++ b/Xml/Socket.cs Fri Nov 07 18:37:39 2014 +0100 @@ -58,18 +58,16 @@ return Buffer; } public virtual XmlDocument ReadDocument() { - MemoryStream Buffer = ReadRawDocument(); - try { - XmlDocument Doc = new XmlDocument(); - XmlReader XMLReader = XmlReader.Create(Buffer, pXMLReaderSettings); - Doc.Load(XMLReader); - XMLReader.Close(); - return Doc; - } catch (Exception ex) { - Buffer.Seek(0, SeekOrigin.Begin); - throw new IOException("Could not parse XML document: \"" + Encoding.UTF8.GetString(Buffer.ToArray()) + "\"", ex); - } + MemoryStream Buffer = ReadRawDocument(); + XmlDocument Doc = new XmlDocument(); + try { + using (XmlReader XMLReader = XmlReader.Create(Buffer, pXMLReaderSettings)) Doc.Load(XMLReader); + } catch (Exception ex) { + Buffer.Seek(0, SeekOrigin.Begin); + throw new IOException("Could not parse XML document: \"" + Encoding.UTF8.GetString(Buffer.ToArray()) + "\"", ex); } + return Doc; + } protected virtual void CreateWriter() { pWriter = XmlWriter.Create(pStream, WriterSettings);