Mercurial > hg > ucis.core
diff Util/PipeStream.cs @ 1:28dc7d535036
Small improvements, introduced PacketStream
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Mon, 07 Jan 2013 16:43:28 +0100 |
parents | 3ab940a0c7a0 |
children |
line wrap: on
line diff
--- a/Util/PipeStream.cs Tue Sep 11 16:28:53 2012 +0200 +++ b/Util/PipeStream.cs Mon Jan 07 16:43:28 2013 +0100 @@ -1,59 +1,12 @@ using System; -using System.Collections.Generic; -using System.IO; -using System.Threading; namespace UCIS.Util { - public class PipeStream : Stream { - private Queue<byte[]> queue = new Queue<byte[]>(); - private byte[] currentBuffer = null; - private int currentBufferIndex; - private AutoResetEvent resetEvent = new AutoResetEvent(false); - private Boolean closed = false; - - public PipeStream() { - ReadTimeout = Timeout.Infinite; - } - - public override int ReadTimeout { get; set; } - public override bool CanRead { get { return !closed; } } - public override bool CanWrite { get { return !closed; } } - public override bool CanSeek { get { return false; } } - public override bool CanTimeout { get { return !closed; } } - public override long Length { get { throw new NotSupportedException(); } } - public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } } + public class PipeStream : QueuedPacketStream { + public override bool CanWrite { get { return !Closed; } } public override void Flush() { } - public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } - public override void SetLength(long value) { throw new NotSupportedException(); } - public override int Read(byte[] buffer, int offset, int count) { - if (closed) throw new ObjectDisposedException("PipeStream"); - if (currentBuffer == null) { - //if (queue.Count == 0) if (!resetEvent.WaitOne(this.ReadTimeout)) throw new TimeoutException(); - if (queue.Count == 0 && (ReadTimeout == 0 || !resetEvent.WaitOne(ReadTimeout))) throw new TimeoutException(); - //while (queue.Count == 0) Thread.Sleep(100); - resetEvent.Reset(); - currentBuffer = queue.Dequeue(); - currentBufferIndex = 0; - } - if (count > currentBuffer.Length - currentBufferIndex) count = currentBuffer.Length - currentBufferIndex; - Buffer.BlockCopy(currentBuffer, currentBufferIndex, buffer, offset, count); - currentBufferIndex += count; - if (currentBufferIndex == currentBuffer.Length) currentBuffer = null; - return count; - } public override void Write(byte[] buffer, int offset, int count) { - byte[] tostore; - if (closed) throw new ObjectDisposedException("PipeStream"); - if (count == 0) return; - tostore = new byte[count]; - Buffer.BlockCopy(buffer, offset, tostore, 0, count); - queue.Enqueue(tostore); - resetEvent.Set(); - } - public override void Close() { - closed = true; - resetEvent.Set(); - base.Close(); + if (Closed) throw new ObjectDisposedException("PipeStream"); + AddReadBufferCopy(buffer, offset, count); } } }