comparison Util/PipeStream.cs @ 0:3ab940a0c7a0

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 11 Sep 2012 16:28:53 +0200
parents
children 28dc7d535036
comparison
equal deleted inserted replaced
-1:000000000000 0:3ab940a0c7a0
1 using System;
2 using System.Collections.Generic;
3 using System.IO;
4 using System.Threading;
5
namespace UCIS.Util {
	/// <summary>
	/// In-memory pipe stream. Every non-empty <see cref="Write"/> call is copied and queued as one chunk;
	/// <see cref="Read"/> hands the chunks back in FIFO order, blocking (subject to
	/// <see cref="ReadTimeout"/>) while no chunk is available.
	/// </summary>
	/// <remarks>
	/// The chunk queue is guarded by a lock, so writers may run concurrently with the reader.
	/// The partially consumed chunk is not guarded, so only one thread may call
	/// <see cref="Read"/> at a time.
	/// </remarks>
	public class PipeStream : Stream {
		// Pending chunks; also used as the lock object guarding the queue and the closed transition.
		private readonly Queue<byte[]> queue = new Queue<byte[]>();
		// Chunk currently being consumed by Read, or null when the next chunk must be dequeued.
		private byte[] currentBuffer = null;
		// Offset of the first unread byte in currentBuffer.
		private int currentBufferIndex;
		// Signalled after every enqueue and on Close to wake a reader blocked in Read.
		private readonly AutoResetEvent resetEvent = new AutoResetEvent(false);
		// Volatile because the Can* properties read it without taking the lock.
		private volatile bool closed = false;

		/// <summary>Creates an empty pipe whose reads block indefinitely until data arrives.</summary>
		public PipeStream() {
			ReadTimeout = Timeout.Infinite;
		}

		/// <summary>
		/// Maximum time in milliseconds that <see cref="Read"/> waits for data.
		/// <see cref="Timeout.Infinite"/> waits forever; 0 never waits.
		/// </summary>
		public override int ReadTimeout { get; set; }
		public override bool CanRead { get { return !closed; } }
		public override bool CanWrite { get { return !closed; } }
		public override bool CanSeek { get { return false; } }
		public override bool CanTimeout { get { return !closed; } }
		public override long Length { get { throw new NotSupportedException(); } }
		public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
		/// <summary>Does nothing; written data is available to the reader as soon as Write returns.</summary>
		public override void Flush() { }
		public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
		public override void SetLength(long value) { throw new NotSupportedException(); }

		/// <summary>
		/// Copies up to <paramref name="count"/> bytes from the current chunk into
		/// <paramref name="buffer"/>. A single call never crosses a chunk boundary, so it may
		/// return fewer bytes than requested even when more chunks are queued.
		/// </summary>
		/// <param name="buffer">Destination array.</param>
		/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing bytes.</param>
		/// <param name="count">Maximum number of bytes to copy.</param>
		/// <returns>The number of bytes copied.</returns>
		/// <exception cref="ObjectDisposedException">The stream is closed, or was closed while waiting for data.</exception>
		/// <exception cref="TimeoutException">No chunk became available within <see cref="ReadTimeout"/>.</exception>
		public override int Read(byte[] buffer, int offset, int count) {
			if (closed) throw new ObjectDisposedException("PipeStream");
			if (currentBuffer == null) {
				currentBuffer = DequeueChunk();
				currentBufferIndex = 0;
			}
			int available = currentBuffer.Length - currentBufferIndex;
			if (count > available) count = available;
			Buffer.BlockCopy(currentBuffer, currentBufferIndex, buffer, offset, count);
			currentBufferIndex += count;
			if (currentBufferIndex == currentBuffer.Length) currentBuffer = null;
			return count;
		}

		// Removes and returns the next queued chunk, waiting up to ReadTimeout for one to arrive.
		private byte[] DequeueChunk() {
			int timeout = ReadTimeout;
			int started = Environment.TickCount;
			while (true) {
				lock (queue) {
					if (closed) throw new ObjectDisposedException("PipeStream");
					if (queue.Count > 0) return queue.Dequeue();
				}
				int remaining = timeout;
				if (timeout >= 0) {
					// Charge time already spent so that repeated wake-ups cannot stretch the timeout.
					remaining = timeout - unchecked(Environment.TickCount - started);
					if (remaining <= 0) throw new TimeoutException();
				}
				// A wake-up only means "something changed" (the signal may be stale or may come
				// from Close), so the loop re-checks the queue instead of dequeuing blindly.
				if (!resetEvent.WaitOne(remaining)) throw new TimeoutException();
			}
		}

		/// <summary>
		/// Queues a private copy of <paramref name="count"/> bytes of <paramref name="buffer"/>,
		/// starting at <paramref name="offset"/>, as one chunk and wakes a waiting reader.
		/// A zero-length write is ignored.
		/// </summary>
		/// <exception cref="ObjectDisposedException">The stream is closed.</exception>
		public override void Write(byte[] buffer, int offset, int count) {
			if (closed) throw new ObjectDisposedException("PipeStream");
			if (count == 0) return;
			byte[] tostore = new byte[count];
			Buffer.BlockCopy(buffer, offset, tostore, 0, count);
			lock (queue) {
				// Re-check under the lock so nothing is enqueued after Close has completed.
				if (closed) throw new ObjectDisposedException("PipeStream");
				queue.Enqueue(tostore);
			}
			resetEvent.Set();
		}

		/// <summary>
		/// Marks the stream closed and wakes a reader blocked in <see cref="Read"/>, which then
		/// fails with <see cref="ObjectDisposedException"/>.
		/// </summary>
		public override void Close() {
			lock (queue) {
				closed = true;
			}
			// The event is deliberately not disposed here: a reader may still be waiting on it.
			resetEvent.Set();
			base.Close();
		}
	}
}