diff Util/PipeStream.cs @ 0:3ab940a0c7a0

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 11 Sep 2012 16:28:53 +0200
parents
children 28dc7d535036
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Util/PipeStream.cs	Tue Sep 11 16:28:53 2012 +0200
@@ -0,0 +1,59 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Threading;
+
+namespace UCIS.Util {
+	public class PipeStream : Stream {
+		private Queue<byte[]> queue = new Queue<byte[]>();
+		private byte[] currentBuffer = null;
+		private int currentBufferIndex;
+		private AutoResetEvent resetEvent = new AutoResetEvent(false);
+		private Boolean closed = false;
+
+		public PipeStream() {
+			ReadTimeout = Timeout.Infinite;
+		}
+
+		public override int ReadTimeout { get; set; }
+		public override bool CanRead { get { return !closed; } }
+		public override bool CanWrite { get { return !closed; } }
+		public override bool CanSeek { get { return false; } }
+		public override bool CanTimeout { get { return !closed; } }
+		public override long Length { get { throw new NotSupportedException(); } }
+		public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
+		public override void Flush() { }
+		public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
+		public override void SetLength(long value) { throw new NotSupportedException(); }
+		public override int Read(byte[] buffer, int offset, int count) {
+			if (closed) throw new ObjectDisposedException("PipeStream");
+			if (currentBuffer == null) {
+				//if (queue.Count == 0) if (!resetEvent.WaitOne(this.ReadTimeout)) throw new TimeoutException();
+				if (queue.Count == 0 && (ReadTimeout == 0 || !resetEvent.WaitOne(ReadTimeout))) throw new TimeoutException();
+				//while (queue.Count == 0) Thread.Sleep(100);
+				resetEvent.Reset();
+				currentBuffer = queue.Dequeue();
+				currentBufferIndex = 0;
+			}
+			if (count > currentBuffer.Length - currentBufferIndex) count = currentBuffer.Length - currentBufferIndex;
+			Buffer.BlockCopy(currentBuffer, currentBufferIndex, buffer, offset, count);
+			currentBufferIndex += count;
+			if (currentBufferIndex == currentBuffer.Length) currentBuffer = null;
+			return count;
+		}
+		public override void Write(byte[] buffer, int offset, int count) {
+			byte[] tostore;
+			if (closed) throw new ObjectDisposedException("PipeStream");
+			if (count == 0) return;
+			tostore = new byte[count];
+			Buffer.BlockCopy(buffer, offset, tostore, 0, count);
+			queue.Enqueue(tostore);
+			resetEvent.Set();
+		}
+		public override void Close() {
+			closed = true;
+			resetEvent.Set();
+			base.Close();
+		}
+	}
+}