diff Util/QueuedPacketStream.cs @ 1:28dc7d535036

Small improvements, introduced PacketStream
author Ivo Smits <Ivo@UCIS.nl>
date Mon, 07 Jan 2013 16:43:28 +0100
parents
children 4b78cc5f116b
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Util/QueuedPacketStream.cs	Mon Jan 07 16:43:28 2013 +0100
@@ -0,0 +1,187 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Threading;
+using SysThreadPool = System.Threading.ThreadPool;
+
+namespace UCIS.Util {
+	public abstract class QueuedPacketStream : PacketStream {
+		Queue<Byte[]> ReceiveQueue = new Queue<byte[]>();
+		Byte[] ReceiveBuffer = null;
+		int ReceiveBufferOffset = 0;
+		int ReceiveWaiting = 0;
+		AutoResetEvent ReceiveEvent = new AutoResetEvent(false);
+		AsyncResult AsyncReceiveOperation = null;
+		protected Boolean Closed { get; private set; }
+
+		public QueuedPacketStream() {
+			ReadTimeout = Timeout.Infinite;
+			Closed = false;
+		}
+
+		protected void AddReadBufferCopy(Byte[] buffer, int offset, int count) {
+			Byte[] store;
+			store = new Byte[count];
+			Buffer.BlockCopy(buffer, offset, store, 0, count);
+			AddReadBufferNoCopy(store);
+		}
+		protected void AddReadBufferNoCopy(Byte[] store) {
+			if (Closed) return;
+			lock (ReceiveQueue) {
+				ReceiveQueue.Enqueue(store);
+				Interlocked.Add(ref ReceiveWaiting, store.Length);
+				ReceiveEvent.Set();
+				if (AsyncReceiveOperation != null && (store.Length > 0 || AsyncReceiveOperation.IsReadPacket)) {
+					AsyncReceiveOperation.SetCompleted(false);
+					AsyncReceiveOperation = null;
+				}
+			}
+		}
+		public override void Close() {
+			Closed = true;
+			base.Close();
+			ReceiveEvent.Set();
+			lock (ReceiveQueue) {
+				if (AsyncReceiveOperation != null) {
+					AsyncReceiveOperation.SetCompleted(false);
+					AsyncReceiveOperation = null;
+				}
+			}
+		}
+
+		public override bool CanSeek { get { return false; } }
+		public override bool CanTimeout { get { return true; } }
+		public override bool CanRead { get { return !Closed; } }
+		public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
+		public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
+		public override void SetLength(long value) { throw new NotSupportedException(); }
+
+		public override int ReadTimeout { get; set; }
+		public override long Length { get { return ReceiveWaiting; } }
+
+		public int WaitForPacket() {
+			while (ReceiveBuffer == null) {
+				lock (ReceiveQueue) {
+					if (ReceiveQueue.Count > 0) {
+						ReceiveBuffer = ReceiveQueue.Dequeue();
+						ReceiveBufferOffset = 0;
+						continue;
+					}
+				}
+				if (Closed) throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed");
+				if (ReadTimeout == 0 || !ReceiveEvent.WaitOne(ReadTimeout, false)) throw new TimeoutException();
+			}
+			return ReceiveBuffer.Length - ReceiveBufferOffset;
+		}
+		public override int Read(byte[] buffer, int offset, int count) {
+			int left = 0;
+			while (true) {
+				left = WaitForPacket();
+				if (left > 0) break;
+				ReceiveBuffer = null;
+			}
+			if (count > left) count = left;
+			Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, buffer, offset, count);
+			ReceiveBufferOffset += count;
+			if (ReceiveBufferOffset == ReceiveBuffer.Length) ReceiveBuffer = null;
+			Interlocked.Add(ref ReceiveWaiting, -count);
+			return count;
+		}
+		public override Byte[] ReadPacket() {
+			WaitForPacket();
+			Byte[] arr = ReceiveBuffer;
+			if (ReceiveBufferOffset > 0) {
+				arr = new Byte[ReceiveBuffer.Length - ReceiveBufferOffset];
+				Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, arr, 0, arr.Length - ReceiveBufferOffset);
+			}
+			ReceiveBuffer = null;
+			return arr;
+		}
+		public override ArraySegment<byte> ReadPacketFast() {
+			WaitForPacket();
+			ArraySegment<byte> ret = new ArraySegment<byte>(ReceiveBuffer, ReceiveBufferOffset, ReceiveBuffer.Length - ReceiveBufferOffset);
+			ReceiveBuffer = null;
+			return ret;
+		}
+
+		class AsyncResult : IAsyncResult {
+			public Object AsyncState { get; private set; }
+			public WaitHandle AsyncWaitHandle { get { return WaitHandle; } }
+			public Boolean CompletedSynchronously { get; private set; }
+			public Boolean IsCompleted { get; private set; }
+			public Boolean IsReadPacket { get; private set; }
+
+			public Byte[] Buffer = null;
+			public int BufferOffset = 0;
+			public int BufferLength = 0;
+
+			private ManualResetEvent WaitHandle = new ManualResetEvent(false);
+			private AsyncCallback Callback = null;
+			private void CallCallback(Object state) {
+				if (Callback != null) Callback(this);
+			}
+			public void SetCompleted(Boolean synchronously) {
+				CompletedSynchronously = synchronously;
+				IsCompleted = true;
+				WaitHandle.Set();
+				if (Callback != null) SysThreadPool.QueueUserWorkItem(CallCallback);
+			}
+			public AsyncResult(AsyncCallback callback, Object state) {
+				this.Callback = callback;
+				this.AsyncState = state;
+				CompletedSynchronously = false;
+				IsCompleted = false;
+				IsReadPacket = true;
+			}
+			public AsyncResult(AsyncCallback callback, Object state, Byte[] buffer, int bufferOffset, int bufferLength)
+				: this(callback, state) {
+				this.Buffer = buffer;
+				this.BufferOffset = bufferOffset;
+				this.BufferLength = bufferLength;
+				IsReadPacket = false;
+			}
+		}
+		private IAsyncResult BeginAsyncReadOperation(AsyncResult ar) {
+			lock (ReceiveQueue) {
+				if (AsyncReceiveOperation != null) throw new InvalidOperationException("Another asynchronous operation is in progress");
+				if (ReceiveBuffer != null || ReceiveQueue.Count > 0) {
+					ar.SetCompleted(true);
+				} else {
+					if (Closed) throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed");
+					AsyncReceiveOperation = ar;
+				}
+			}
+			return ar;
+		}
+		private void EndAsyncReadOperation(AsyncResult ar) {
+			lock (ReceiveQueue) {
+				if (AsyncReceiveOperation != null && ar != AsyncReceiveOperation) throw new InvalidOperationException("The given AsyncResult object does not match the current pending operation");
+				AsyncReceiveOperation = null;
+			}
+		}
+		public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) {
+			return BeginAsyncReadOperation(new AsyncResult(callback, state, buffer, offset, count));
+		}
+		public override IAsyncResult BeginReadPacket(AsyncCallback callback, object state) {
+			return BeginAsyncReadOperation(new AsyncResult(callback, state));
+		}
+		public override IAsyncResult BeginReadPacketFast(AsyncCallback callback, object state) {
+			return BeginAsyncReadOperation(new AsyncResult(callback, state));
+		}
+		public override int EndRead(IAsyncResult asyncResult) {
+			AsyncResult ar = (AsyncResult)asyncResult;
+			EndAsyncReadOperation(ar);
+			return Read(ar.Buffer, ar.BufferOffset, ar.BufferLength);
+		}
+		public override Byte[] EndReadPacket(IAsyncResult asyncResult) {
+			AsyncResult ar = (AsyncResult)asyncResult;
+			EndAsyncReadOperation(ar);
+			return ReadPacket();
+		}
+		public override ArraySegment<Byte> EndReadPacketFast(IAsyncResult asyncResult) {
+			AsyncResult ar = (AsyncResult)asyncResult;
+			EndAsyncReadOperation(ar);
+			return ReadPacketFast();
+		}
+	}
+}