Mercurial > hg > ucis.core
diff Util/QueuedPacketStream.cs @ 1:28dc7d535036
Small improvements, introduced PacketStream
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Mon, 07 Jan 2013 16:43:28 +0100 |
parents | |
children | 4b78cc5f116b |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Util/QueuedPacketStream.cs Mon Jan 07 16:43:28 2013 +0100 @@ -0,0 +1,187 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; +using SysThreadPool = System.Threading.ThreadPool; + +namespace UCIS.Util { + public abstract class QueuedPacketStream : PacketStream { + Queue<Byte[]> ReceiveQueue = new Queue<byte[]>(); + Byte[] ReceiveBuffer = null; + int ReceiveBufferOffset = 0; + int ReceiveWaiting = 0; + AutoResetEvent ReceiveEvent = new AutoResetEvent(false); + AsyncResult AsyncReceiveOperation = null; + protected Boolean Closed { get; private set; } + + public QueuedPacketStream() { + ReadTimeout = Timeout.Infinite; + Closed = false; + } + + protected void AddReadBufferCopy(Byte[] buffer, int offset, int count) { + Byte[] store; + store = new Byte[count]; + Buffer.BlockCopy(buffer, offset, store, 0, count); + AddReadBufferNoCopy(store); + } + protected void AddReadBufferNoCopy(Byte[] store) { + if (Closed) return; + lock (ReceiveQueue) { + ReceiveQueue.Enqueue(store); + Interlocked.Add(ref ReceiveWaiting, store.Length); + ReceiveEvent.Set(); + if (AsyncReceiveOperation != null && (store.Length > 0 || AsyncReceiveOperation.IsReadPacket)) { + AsyncReceiveOperation.SetCompleted(false); + AsyncReceiveOperation = null; + } + } + } + public override void Close() { + Closed = true; + base.Close(); + ReceiveEvent.Set(); + lock (ReceiveQueue) { + if (AsyncReceiveOperation != null) { + AsyncReceiveOperation.SetCompleted(false); + AsyncReceiveOperation = null; + } + } + } + + public override bool CanSeek { get { return false; } } + public override bool CanTimeout { get { return true; } } + public override bool CanRead { get { return !Closed; } } + public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } } + public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } + public override void SetLength(long value) { throw new NotSupportedException(); } + + public override int ReadTimeout { get; set; } + public override long Length { get { return ReceiveWaiting; } } + + public int WaitForPacket() { + while (ReceiveBuffer == null) { + lock (ReceiveQueue) { + if (ReceiveQueue.Count > 0) { + ReceiveBuffer = ReceiveQueue.Dequeue(); + ReceiveBufferOffset = 0; + continue; + } + } + if (Closed) throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed"); + if (ReadTimeout == 0 || !ReceiveEvent.WaitOne(ReadTimeout, false)) throw new TimeoutException(); + } + return ReceiveBuffer.Length - ReceiveBufferOffset; + } + public override int Read(byte[] buffer, int offset, int count) { + int left = 0; + while (true) { + left = WaitForPacket(); + if (left > 0) break; + ReceiveBuffer = null; + } + if (count > left) count = left; + Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, buffer, offset, count); + ReceiveBufferOffset += count; + if (ReceiveBufferOffset == ReceiveBuffer.Length) ReceiveBuffer = null; + Interlocked.Add(ref ReceiveWaiting, -count); + return count; + } + public override Byte[] ReadPacket() { + WaitForPacket(); + Byte[] arr = ReceiveBuffer; + if (ReceiveBufferOffset > 0) { + arr = new Byte[ReceiveBuffer.Length - ReceiveBufferOffset]; + Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, arr, 0, arr.Length - ReceiveBufferOffset); + } + ReceiveBuffer = null; + return arr; + } + public override ArraySegment<byte> ReadPacketFast() { + WaitForPacket(); + ArraySegment<byte> ret = new ArraySegment<byte>(ReceiveBuffer, ReceiveBufferOffset, ReceiveBuffer.Length - ReceiveBufferOffset); + ReceiveBuffer = null; + return ret; + } + + class AsyncResult : IAsyncResult { + public Object AsyncState { get; private set; } + public WaitHandle AsyncWaitHandle { get { return WaitHandle; } } + public Boolean CompletedSynchronously { get; private set; } + public Boolean IsCompleted { get; private set; } + public Boolean IsReadPacket { get; private set; } + + public Byte[] Buffer = null; + public int BufferOffset = 0; + public int BufferLength = 0; + + private ManualResetEvent WaitHandle = new ManualResetEvent(false); + private AsyncCallback Callback = null; + private void CallCallback(Object state) { + if (Callback != null) Callback(this); + } + public void SetCompleted(Boolean synchronously) { + CompletedSynchronously = synchronously; + IsCompleted = true; + WaitHandle.Set(); + if (Callback != null) SysThreadPool.QueueUserWorkItem(CallCallback); + } + public AsyncResult(AsyncCallback callback, Object state) { + this.Callback = callback; + this.AsyncState = state; + CompletedSynchronously = false; + IsCompleted = false; + IsReadPacket = true; + } + public AsyncResult(AsyncCallback callback, Object state, Byte[] buffer, int bufferOffset, int bufferLength) + : this(callback, state) { + this.Buffer = buffer; + this.BufferOffset = bufferOffset; + this.BufferLength = bufferLength; + IsReadPacket = false; + } + } + private IAsyncResult BeginAsyncReadOperation(AsyncResult ar) { + lock (ReceiveQueue) { + if (AsyncReceiveOperation != null) throw new InvalidOperationException("Another asynchronous operation is in progress"); + if (ReceiveBuffer != null || ReceiveQueue.Count > 0) { + ar.SetCompleted(true); + } else { + if (Closed) throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed"); + AsyncReceiveOperation = ar; + } + } + return ar; + } + private void EndAsyncReadOperation(AsyncResult ar) { + lock (ReceiveQueue) { + if (AsyncReceiveOperation != null && ar != AsyncReceiveOperation) throw new InvalidOperationException("The given AsyncResult object does not match the current pending operation"); + AsyncReceiveOperation = null; + } + } + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { + return BeginAsyncReadOperation(new AsyncResult(callback, state, buffer, offset, count)); + } + public override IAsyncResult BeginReadPacket(AsyncCallback callback, object state) { + return BeginAsyncReadOperation(new AsyncResult(callback, state)); + } + public override IAsyncResult BeginReadPacketFast(AsyncCallback callback, object state) { + return BeginAsyncReadOperation(new AsyncResult(callback, state)); + } + public override int EndRead(IAsyncResult asyncResult) { + AsyncResult ar = (AsyncResult)asyncResult; + EndAsyncReadOperation(ar); + return Read(ar.Buffer, ar.BufferOffset, ar.BufferLength); + } + public override Byte[] EndReadPacket(IAsyncResult asyncResult) { + AsyncResult ar = (AsyncResult)asyncResult; + EndAsyncReadOperation(ar); + return ReadPacket(); + } + public override ArraySegment<Byte> EndReadPacketFast(IAsyncResult asyncResult) { + AsyncResult ar = (AsyncResult)asyncResult; + EndAsyncReadOperation(ar); + return ReadPacketFast(); + } + } +}