Mercurial > hg > ucis.core
diff Util/CrossStream.cs @ 0:3ab940a0c7a0
Initial commit
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Tue, 11 Sep 2012 16:28:53 +0200 |
parents | |
children | 28dc7d535036 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Util/CrossStream.cs Tue Sep 11 16:28:53 2012 +0200 @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; + +namespace UCIS.Util { + public class CrossStream : Stream { + private Queue<byte[]> queue = new Queue<byte[]>(); + private byte[] currentBuffer = null; + private int currentBufferIndex; + private CrossStream otherPart; + private AutoResetEvent resetEvent = new AutoResetEvent(false); + + public static CrossStream CreatePair(out CrossStream stream1, out CrossStream stream2) { + stream1 = new CrossStream(); + stream2 = new CrossStream(stream1); + stream1.otherPart = stream2; + return stream1; + } + public static CrossStream CreatePair(out CrossStream stream2) { + CrossStream stream1 = new CrossStream(); + stream2 = new CrossStream(stream1); + stream1.otherPart = stream2; + return stream1; + } + + private CrossStream() { } + public CrossStream(CrossStream other) { + this.otherPart = other; + } + + public override bool CanRead { get { return true; } } + public override bool CanWrite { get { return true; } } + public override bool CanSeek { get { return false; } } + public override bool CanTimeout { get { return true; } } + public override long Length { get { throw new NotSupportedException(); } } + public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } } + public override void Flush() { } + public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } + public override void SetLength(long value) { throw new NotSupportedException(); } + public override int Read(byte[] buffer, int offset, int count) { + if (currentBuffer == null) { + //if (queue.Count == 0) if (!resetEvent.WaitOne(this.ReadTimeout)) throw new TimeoutException(); + if (queue.Count == 0 && !resetEvent.WaitOne()) throw new TimeoutException(); + //while (queue.Count == 0) Thread.Sleep(100); + resetEvent.Reset(); + currentBuffer = queue.Dequeue(); + currentBufferIndex = 0; + } + if (count > currentBuffer.Length - currentBufferIndex) count = currentBuffer.Length - currentBufferIndex; + Buffer.BlockCopy(currentBuffer, currentBufferIndex, buffer, offset, count); + currentBufferIndex += count; + if (currentBufferIndex == currentBuffer.Length) currentBuffer = null; + return count; + } + public override void Write(byte[] buffer, int offset, int count) { + byte[] tostore; + if (count == 0) return; + tostore = new byte[count]; + Buffer.BlockCopy(buffer, offset, tostore, 0, count); + otherPart.queue.Enqueue(tostore); + otherPart.resetEvent.Set(); + } + } +}