# HG changeset patch # User Ivo Smits # Date 1357573408 -3600 # Node ID 28dc7d535036bea53268c96d4456ebe53c1cdd50 # Parent 3ab940a0c7a01d245e9e58322ff699fd30c03ee7 Small improvements, introduced PacketStream diff -r 3ab940a0c7a0 -r 28dc7d535036 Pml/Elements/Collection.cs --- a/Pml/Elements/Collection.cs Tue Sep 11 16:28:53 2012 +0200 +++ b/Pml/Elements/Collection.cs Mon Jan 07 16:43:28 2013 +0100 @@ -6,13 +6,16 @@ public class PmlCollection : PmlElement, ICollection { private List pItems = new List(); - public PmlCollection() { } + public PmlCollection() { } public PmlCollection(params PmlElement[] Elements) { pItems.AddRange(Elements); } public PmlCollection(IEnumerable Elements) { pItems.AddRange(Elements); } + public PmlCollection(params String[] Elements) { + foreach (String s in Elements) pItems.Add(s); + } public PmlElement Add(PmlElement Element) { pItems.Add(Element); diff -r 3ab940a0c7a0 -r 28dc7d535036 Pml/RW/PmlBinaryRW.cs --- a/Pml/RW/PmlBinaryRW.cs Tue Sep 11 16:28:53 2012 +0200 +++ b/Pml/RW/PmlBinaryRW.cs Mon Jan 07 16:43:28 2013 +0100 @@ -34,29 +34,18 @@ } public class PmlBinaryWriter : IPmlWriter { - //private BinaryWriter pWriter; private Stream pStream; private Encoding pEncoding; - /*public PmlBinaryWriter(BinaryWriter Writer) { - pWriter = Writer; - }*/ public PmlBinaryWriter(Stream Stream) { - //pWriter = new BinaryWriter(Stream); pStream = Stream; pEncoding = Encoding.UTF8; } public PmlBinaryWriter(Stream Stream, Encoding Encoding) { - //pWriter = new BinaryWriter(Stream, Encoding); pStream = Stream; pEncoding = Encoding; } - /*public BinaryWriter BaseWriter { - get { return pWriter; } - set { pWriter = value; } - }*/ - public void WriteMessage(PmlElement Message) { MemoryStream stream = new MemoryStream(); BinaryWriter writer = new BinaryWriter(stream, pEncoding); @@ -67,6 +56,13 @@ } } + public static Byte[] EncodeMessage(PmlElement message) { + MemoryStream stream = new MemoryStream(); + BinaryWriter writer = new BinaryWriter(stream, Encoding.UTF8); + WriteMessageTo(message, writer); + return stream.ToArray(); + } + public static void WriteMessageTo(PmlElement Message, BinaryWriter Writer) { lock (Writer) { Writer.Write((byte)255); @@ -77,13 +73,10 @@ } private static void WriteElementTo(PmlElement Element, BinaryWriter Writer) { - //byte[] Buffer = null; if (Element == null) { - //Writer.Write((byte)PmlElementType.Null); Writer.Write((byte)0); return; } - //Writer.Write((byte)Element.Type); switch (Element.Type) { case PmlType.Null: Writer.Write((byte)0); @@ -119,7 +112,7 @@ Writer.Write((byte)11); string Str = Element.ToString(); if (Str == null) { - Writer.Write(""); + Writer.Write(String.Empty); } else { Writer.Write(Str); } @@ -166,6 +159,14 @@ return ReadMessageFrom(pReader); } + public static PmlElement DecodeMessage(Byte[] message) { + using (MemoryStream ms = new MemoryStream(message)) { + using (BinaryReader reader = new BinaryReader(ms, Encoding.UTF8)) { + return ReadMessageFrom(reader); + } + } + } + public static PmlElement ReadMessageFrom(BinaryReader Reader) { PmlElement Element = null; lock (Reader) { @@ -213,7 +214,6 @@ if (B == 0) return new PmlInteger(Reader.ReadUInt64()); else if (B == 1) return new PmlInteger(Reader.ReadInt64()); else return null; - } default: throw new Exception("Unknown PML type code " + EType.ToString()); diff -r 3ab940a0c7a0 -r 28dc7d535036 UCIS.csproj --- a/UCIS.csproj Tue Sep 11 16:28:53 2012 +0200 +++ b/UCIS.csproj Mon Jan 07 16:43:28 2013 +0100 @@ -12,6 +12,8 @@ UCIS v2.0 512 + true + UCIS.snk true @@ -64,6 +66,7 @@ + @@ -81,7 +84,9 @@ + + diff -r 3ab940a0c7a0 -r 28dc7d535036 UTF8NoPreamble.cs --- a/UTF8NoPreamble.cs Tue Sep 11 16:28:53 2012 +0200 +++ b/UTF8NoPreamble.cs Mon Jan 07 16:43:28 2013 +0100 @@ -1,7 +1,5 @@ namespace UCIS { public class UTF8NoPreamble : System.Text.UTF8Encoding { - public override byte[] GetPreamble() { - return new byte[] { }; - } + public UTF8NoPreamble() : base(false) { } } } diff -r 3ab940a0c7a0 -r 28dc7d535036 Util/ArrayUtil.cs --- a/Util/ArrayUtil.cs Tue Sep 11 16:28:53 2012 +0200 +++ b/Util/ArrayUtil.cs Mon Jan 07 16:43:28 2013 +0100 @@ -84,5 +84,23 @@ for (int i = 0; i < a.Length; i++) if (a[i] != b[i]) return false; return true; } + public static int GetHashCode(T[] array) { + int h = 0; + foreach (T v in array) h ^= v.GetHashCode(); + return h; + } + public static void Add(ref T[] array, T item) { + if (array == null) { + array = new T[] { item }; + } else { + int index = array.Length; + Array.Resize(ref array, index + 1); + array[index] = item; + } + } + public static void AddUnique(ref T[] array, T item) { + if (Array.IndexOf(array, item) != -1) return; + Add(ref array, item); + } } } diff -r 3ab940a0c7a0 -r 28dc7d535036 Util/CrossStream.cs --- a/Util/CrossStream.cs Tue Sep 11 16:28:53 2012 +0200 +++ b/Util/CrossStream.cs Mon Jan 07 16:43:28 2013 +0100 @@ -1,65 +1,44 @@ using System; -using System.Collections.Generic; -using System.IO; -using System.Threading; namespace UCIS.Util { - public class CrossStream : Stream { - private Queue queue = new Queue(); - private byte[] currentBuffer = null; - private int currentBufferIndex; - private CrossStream otherPart; - private AutoResetEvent resetEvent = new AutoResetEvent(false); + public class CrossStream : QueuedPacketStream { + protected CrossStream otherPart; + private Boolean otherClosed = false; + + public CrossStream OtherSide { get { return otherPart; } } public static CrossStream CreatePair(out CrossStream stream1, out CrossStream stream2) { - stream1 = new CrossStream(); - stream2 = new CrossStream(stream1); - stream1.otherPart = stream2; - return stream1; + return stream1 = CreatePair(out stream2); } public static CrossStream CreatePair(out CrossStream stream2) { - CrossStream stream1 = new CrossStream(); - stream2 = new CrossStream(stream1); - stream1.otherPart = stream2; - return stream1; + stream2 = new CrossStream(); + return stream2.otherPart; } - private CrossStream() { } - public CrossStream(CrossStream other) { - this.otherPart = other; + public CrossStream() { + otherPart = new CrossStream(this); + } + protected CrossStream(CrossStream other) { + otherPart = other; } public override bool CanRead { get { return true; } } public override bool CanWrite { get { return true; } } - public override bool CanSeek { get { return false; } } - public override bool CanTimeout { get { return true; } } - public override long Length { get { throw new NotSupportedException(); } } - public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } } public override void Flush() { } - public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } - public override void SetLength(long value) { throw new NotSupportedException(); } - public override int Read(byte[] buffer, int offset, int count) { - if (currentBuffer == null) { - //if (queue.Count == 0) if (!resetEvent.WaitOne(this.ReadTimeout)) throw new TimeoutException(); - if (queue.Count == 0 && !resetEvent.WaitOne()) throw new TimeoutException(); - //while (queue.Count == 0) Thread.Sleep(100); - resetEvent.Reset(); - currentBuffer = queue.Dequeue(); - currentBufferIndex = 0; - } - if (count > currentBuffer.Length - currentBufferIndex) count = currentBuffer.Length - currentBufferIndex; - Buffer.BlockCopy(currentBuffer, currentBufferIndex, buffer, offset, count); - currentBufferIndex += count; - if (currentBufferIndex == currentBuffer.Length) currentBuffer = null; - return count; + + public override void Write(byte[] buffer, int offset, int count) { + if (otherClosed) throw new ObjectDisposedException("CrossStream", "The stream has been closed"); + otherPart.AddReadBufferCopy(buffer, offset, count); } - public override void Write(byte[] buffer, int offset, int count) { - byte[] tostore; - if (count == 0) return; - tostore = new byte[count]; - Buffer.BlockCopy(buffer, offset, tostore, 0, count); - otherPart.queue.Enqueue(tostore); - otherPart.resetEvent.Set(); + + public override void Close() { + CloseBase(); + otherPart.CloseBase(); + } + private void CloseBase() { + otherClosed = true; + otherPart = null; + base.Close(); } } } diff -r 3ab940a0c7a0 -r 28dc7d535036 Util/PacketStream.cs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Util/PacketStream.cs Mon Jan 07 16:43:28 2013 +0100 @@ -0,0 +1,17 @@ +using System; +using System.IO; + +namespace UCIS.Util { + public abstract class PacketStream : Stream { + public abstract Byte[] ReadPacket(); + public abstract IAsyncResult BeginReadPacket(AsyncCallback callback, object state); + public abstract Byte[] EndReadPacket(IAsyncResult asyncResult); + + public virtual ArraySegment ReadPacketFast() { return new ArraySegment(ReadPacket()); } + public virtual IAsyncResult BeginReadPacketFast(AsyncCallback callback, object state) { return BeginReadPacket(callback, state); } + public virtual ArraySegment EndReadPacketFast(IAsyncResult asyncResult) { return new ArraySegment(EndReadPacket(asyncResult)); } + public virtual void WritePacketFast(Byte[] packet, int unusedBefore, int unusedAfter) { Write(packet, unusedBefore, packet.Length - unusedBefore - unusedAfter); } + public virtual int WriteFastBytesBefore { get { return 0; } } + public virtual int WriteFastBytesAfter { get { return 0; } } + } +} diff -r 3ab940a0c7a0 -r 28dc7d535036 Util/PipeStream.cs --- a/Util/PipeStream.cs Tue Sep 11 16:28:53 2012 +0200 +++ b/Util/PipeStream.cs Mon Jan 07 16:43:28 2013 +0100 @@ -1,59 +1,12 @@ using System; -using System.Collections.Generic; -using System.IO; -using System.Threading; namespace UCIS.Util { - public class PipeStream : Stream { - private Queue queue = new Queue(); - private byte[] currentBuffer = null; - private int currentBufferIndex; - private AutoResetEvent resetEvent = new AutoResetEvent(false); - private Boolean closed = false; - - public PipeStream() { - ReadTimeout = Timeout.Infinite; - } - - public override int ReadTimeout { get; set; } - public override bool CanRead { get { return !closed; } } - public override bool CanWrite { get { return !closed; } } - public override bool CanSeek { get { return false; } } - public override bool CanTimeout { get { return !closed; } } - public override long Length { get { throw new NotSupportedException(); } } - public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } } + public class PipeStream : QueuedPacketStream { + public override bool CanWrite { get { return !Closed; } } public override void Flush() { } - public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } - public override void SetLength(long value) { throw new NotSupportedException(); } - public override int Read(byte[] buffer, int offset, int count) { - if (closed) throw new ObjectDisposedException("PipeStream"); - if (currentBuffer == null) { - //if (queue.Count == 0) if (!resetEvent.WaitOne(this.ReadTimeout)) throw new TimeoutException(); - if (queue.Count == 0 && (ReadTimeout == 0 || !resetEvent.WaitOne(ReadTimeout))) throw new TimeoutException(); - //while (queue.Count == 0) Thread.Sleep(100); - resetEvent.Reset(); - currentBuffer = queue.Dequeue(); - currentBufferIndex = 0; - } - if (count > currentBuffer.Length - currentBufferIndex) count = currentBuffer.Length - currentBufferIndex; - Buffer.BlockCopy(currentBuffer, currentBufferIndex, buffer, offset, count); - currentBufferIndex += count; - if (currentBufferIndex == currentBuffer.Length) currentBuffer = null; - return count; - } public override void Write(byte[] buffer, int offset, int count) { - byte[] tostore; - if (closed) throw new ObjectDisposedException("PipeStream"); - if (count == 0) return; - tostore = new byte[count]; - Buffer.BlockCopy(buffer, offset, tostore, 0, count); - queue.Enqueue(tostore); - resetEvent.Set(); - } - public override void Close() { - closed = true; - resetEvent.Set(); - base.Close(); + if (Closed) throw new ObjectDisposedException("PipeStream"); + AddReadBufferCopy(buffer, offset, count); } } } diff -r 3ab940a0c7a0 -r 28dc7d535036 Util/QueuedPacketStream.cs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Util/QueuedPacketStream.cs Mon Jan 07 16:43:28 2013 +0100 @@ -0,0 +1,187 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; +using SysThreadPool = System.Threading.ThreadPool; + +namespace UCIS.Util { + public abstract class QueuedPacketStream : PacketStream { + Queue ReceiveQueue = new Queue(); + Byte[] ReceiveBuffer = null; + int ReceiveBufferOffset = 0; + int ReceiveWaiting = 0; + AutoResetEvent ReceiveEvent = new AutoResetEvent(false); + AsyncResult AsyncReceiveOperation = null; + protected Boolean Closed { get; private set; } + + public QueuedPacketStream() { + ReadTimeout = Timeout.Infinite; + Closed = false; + } + + protected void AddReadBufferCopy(Byte[] buffer, int offset, int count) { + Byte[] store; + store = new Byte[count]; + Buffer.BlockCopy(buffer, offset, store, 0, count); + AddReadBufferNoCopy(store); + } + protected void AddReadBufferNoCopy(Byte[] store) { + if (Closed) return; + lock (ReceiveQueue) { + ReceiveQueue.Enqueue(store); + Interlocked.Add(ref ReceiveWaiting, store.Length); + ReceiveEvent.Set(); + if (AsyncReceiveOperation != null && (store.Length > 0 || AsyncReceiveOperation.IsReadPacket)) { + AsyncReceiveOperation.SetCompleted(false); + AsyncReceiveOperation = null; + } + } + } + public override void Close() { + Closed = true; + base.Close(); + ReceiveEvent.Set(); + lock (ReceiveQueue) { + if (AsyncReceiveOperation != null) { + AsyncReceiveOperation.SetCompleted(false); + AsyncReceiveOperation = null; + } + } + } + + public override bool CanSeek { get { return false; } } + public override bool CanTimeout { get { return true; } } + public override bool CanRead { get { return !Closed; } } + public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } } + public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } + public override void SetLength(long value) { throw new NotSupportedException(); } + + public override int ReadTimeout { get; set; } + public override long Length { get { return ReceiveWaiting; } } + + public int WaitForPacket() { + while (ReceiveBuffer == null) { + lock (ReceiveQueue) { + if (ReceiveQueue.Count > 0) { + ReceiveBuffer = ReceiveQueue.Dequeue(); + ReceiveBufferOffset = 0; + continue; + } + } + if (Closed) throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed"); + if (ReadTimeout == 0 || !ReceiveEvent.WaitOne(ReadTimeout, false)) throw new TimeoutException(); + } + return ReceiveBuffer.Length - ReceiveBufferOffset; + } + public override int Read(byte[] buffer, int offset, int count) { + int left = 0; + while (true) { + left = WaitForPacket(); + if (left > 0) break; + ReceiveBuffer = null; + } + if (count > left) count = left; + Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, buffer, offset, count); + ReceiveBufferOffset += count; + if (ReceiveBufferOffset == ReceiveBuffer.Length) ReceiveBuffer = null; + Interlocked.Add(ref ReceiveWaiting, -count); + return count; + } + public override Byte[] ReadPacket() { + WaitForPacket(); + Byte[] arr = ReceiveBuffer; + if (ReceiveBufferOffset > 0) { + arr = new Byte[ReceiveBuffer.Length - ReceiveBufferOffset]; + Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, arr, 0, arr.Length - ReceiveBufferOffset); + } + ReceiveBuffer = null; + return arr; + } + public override ArraySegment ReadPacketFast() { + WaitForPacket(); + ArraySegment ret = new ArraySegment(ReceiveBuffer, ReceiveBufferOffset, ReceiveBuffer.Length - ReceiveBufferOffset); + ReceiveBuffer = null; + return ret; + } + + class AsyncResult : IAsyncResult { + public Object AsyncState { get; private set; } + public WaitHandle AsyncWaitHandle { get { return WaitHandle; } } + public Boolean CompletedSynchronously { get; private set; } + public Boolean IsCompleted { get; private set; } + public Boolean IsReadPacket { get; private set; } + + public Byte[] Buffer = null; + public int BufferOffset = 0; + public int BufferLength = 0; + + private ManualResetEvent WaitHandle = new ManualResetEvent(false); + private AsyncCallback Callback = null; + private void CallCallback(Object state) { + if (Callback != null) Callback(this); + } + public void SetCompleted(Boolean synchronously) { + CompletedSynchronously = synchronously; + IsCompleted = true; + WaitHandle.Set(); + if (Callback != null) SysThreadPool.QueueUserWorkItem(CallCallback); + } + public AsyncResult(AsyncCallback callback, Object state) { + this.Callback = callback; + this.AsyncState = state; + CompletedSynchronously = false; + IsCompleted = false; + IsReadPacket = true; + } + public AsyncResult(AsyncCallback callback, Object state, Byte[] buffer, int bufferOffset, int bufferLength) + : this(callback, state) { + this.Buffer = buffer; + this.BufferOffset = bufferOffset; + this.BufferLength = bufferLength; + IsReadPacket = false; + } + } + private IAsyncResult BeginAsyncReadOperation(AsyncResult ar) { + lock (ReceiveQueue) { + if (AsyncReceiveOperation != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); + if (ReceiveBuffer != null || ReceiveQueue.Count > 0) { + ar.SetCompleted(true); + } else { + if (Closed) throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed"); + AsyncReceiveOperation = ar; + } + } + return ar; + } + private void EndAsyncReadOperation(AsyncResult ar) { + lock (ReceiveQueue) { + if (AsyncReceiveOperation != null && ar != AsyncReceiveOperation) throw new InvalidOperationException("The given AsyncResult object does not match the current pending operation"); + AsyncReceiveOperation = null; + } + } + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { + return BeginAsyncReadOperation(new AsyncResult(callback, state, buffer, offset, count)); + } + public override IAsyncResult BeginReadPacket(AsyncCallback callback, object state) { + return BeginAsyncReadOperation(new AsyncResult(callback, state)); + } + public override IAsyncResult BeginReadPacketFast(AsyncCallback callback, object state) { + return BeginAsyncReadOperation(new AsyncResult(callback, state)); + } + public override int EndRead(IAsyncResult asyncResult) { + AsyncResult ar = (AsyncResult)asyncResult; + EndAsyncReadOperation(ar); + return Read(ar.Buffer, ar.BufferOffset, ar.BufferLength); + } + public override Byte[] EndReadPacket(IAsyncResult asyncResult) { + AsyncResult ar = (AsyncResult)asyncResult; + EndAsyncReadOperation(ar); + return ReadPacket(); + } + public override ArraySegment EndReadPacketFast(IAsyncResult asyncResult) { + AsyncResult ar = (AsyncResult)asyncResult; + EndAsyncReadOperation(ar); + return ReadPacketFast(); + } + } +}