using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace UCIS.Util {
	/// <summary>
	/// One endpoint of an in-memory stream pair: every chunk written to this
	/// endpoint is queued on its partner and becomes readable there, and vice versa.
	/// </summary>
	/// <remarks>
	/// The hand-off queue is guarded by a monitor, so one thread may call
	/// <see cref="Write"/> on an endpoint while another thread is blocked in
	/// <see cref="Read"/> on its partner. The read cursor of an endpoint
	/// (the partially consumed chunk) is not synchronized, so each endpoint
	/// should be read by one thread at a time.
	/// </remarks>
	public class CrossStream : Stream {
		// Chunks written by the partner that have not been consumed yet.
		// The queue object doubles as the monitor used for locking and signalling.
		private readonly Queue<byte[]> queue = new Queue<byte[]>();
		// Chunk currently being consumed by Read, or null when a new one must be dequeued.
		private byte[] currentBuffer = null;
		// Offset of the first unread byte inside currentBuffer.
		private int currentBufferIndex;
		// Endpoint that receives the data written to this instance.
		private CrossStream otherPart;
		// Timeouts in milliseconds; Timeout.Infinite (-1) means "wait forever".
		private int readTimeout = Timeout.Infinite;
		private int writeTimeout = Timeout.Infinite;

		/// <summary>Creates two endpoints that are linked to each other.</summary>
		/// <param name="stream1">Receives the first endpoint.</param>
		/// <param name="stream2">Receives the second endpoint.</param>
		/// <returns>The same instance that is stored in <paramref name="stream1"/>.</returns>
		public static CrossStream CreatePair(out CrossStream stream1, out CrossStream stream2) {
			stream1 = new CrossStream();
			stream2 = new CrossStream(stream1);
			stream1.otherPart = stream2;
			return stream1;
		}

		/// <summary>Creates two endpoints that are linked to each other.</summary>
		/// <param name="stream2">Receives the second endpoint.</param>
		/// <returns>The first endpoint.</returns>
		public static CrossStream CreatePair(out CrossStream stream2) {
			CrossStream stream1 = new CrossStream();
			stream2 = new CrossStream(stream1);
			stream1.otherPart = stream2;
			return stream1;
		}

		// Used by CreatePair, which assigns otherPart right after construction.
		private CrossStream() { }

		/// <summary>
		/// Creates an endpoint whose writes are delivered to <paramref name="other"/>.
		/// <paramref name="other"/> itself is not modified; use
		/// <see cref="CreatePair(out CrossStream)"/> to link both directions.
		/// </summary>
		/// <param name="other">Endpoint that receives data written to the new instance.</param>
		public CrossStream(CrossStream other) {
			this.otherPart = other;
		}

		public override bool CanRead { get { return true; } }
		public override bool CanWrite { get { return true; } }
		public override bool CanSeek { get { return false; } }
		public override bool CanTimeout { get { return true; } }
		public override long Length { get { throw new NotSupportedException(); } }
		public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }

		/// <summary>
		/// Maximum time in milliseconds <see cref="Read"/> waits for data before throwing
		/// <see cref="TimeoutException"/>. Defaults to <see cref="Timeout.Infinite"/>.
		/// </summary>
		/// <exception cref="ArgumentOutOfRangeException">The value is less than -1.</exception>
		public override int ReadTimeout {
			get { return readTimeout; }
			set {
				if (value < Timeout.Infinite) throw new ArgumentOutOfRangeException("value");
				readTimeout = value;
			}
		}

		/// <summary>
		/// Stored so that callers relying on <see cref="CanTimeout"/> can set it;
		/// <see cref="Write"/> never waits, so the value has no effect.
		/// </summary>
		/// <exception cref="ArgumentOutOfRangeException">The value is less than -1.</exception>
		public override int WriteTimeout {
			get { return writeTimeout; }
			set {
				if (value < Timeout.Infinite) throw new ArgumentOutOfRangeException("value");
				writeTimeout = value;
			}
		}

		/// <summary>Does nothing: written data is handed to the partner immediately.</summary>
		public override void Flush() { }
		public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
		public override void SetLength(long value) { throw new NotSupportedException(); }

		/// <summary>
		/// Copies bytes written by the partner into <paramref name="buffer"/>, blocking
		/// until at least one chunk is available. A single call never returns data from
		/// more than one written chunk, so it may return fewer bytes than requested.
		/// </summary>
		/// <param name="buffer">Destination array.</param>
		/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing.</param>
		/// <param name="count">Maximum number of bytes to copy.</param>
		/// <returns>The number of bytes copied.</returns>
		/// <exception cref="TimeoutException">No data arrived within <see cref="ReadTimeout"/>.</exception>
		public override int Read(byte[] buffer, int offset, int count) {
			// Fail before blocking instead of after a chunk has already been waited for.
			CheckArguments(buffer, offset, count);
			if (currentBuffer == null) {
				currentBuffer = DequeueChunk();
				currentBufferIndex = 0;
			}
			int available = currentBuffer.Length - currentBufferIndex;
			if (count > available) count = available;
			Buffer.BlockCopy(currentBuffer, currentBufferIndex, buffer, offset, count);
			currentBufferIndex += count;
			// Drop the chunk once fully consumed so the next call fetches a new one.
			if (currentBufferIndex == currentBuffer.Length) currentBuffer = null;
			return count;
		}

		/// <summary>
		/// Copies <paramref name="count"/> bytes from <paramref name="buffer"/> and queues
		/// them as one chunk on the partner endpoint, waking a reader blocked there.
		/// Writing zero bytes is a no-op.
		/// </summary>
		/// <param name="buffer">Source array.</param>
		/// <param name="offset">Index of the first byte to send.</param>
		/// <param name="count">Number of bytes to send.</param>
		public override void Write(byte[] buffer, int offset, int count) {
			if (count == 0) return;
			CheckArguments(buffer, offset, count);
			// Copy so later changes to the caller's array do not affect queued data.
			byte[] tostore = new byte[count];
			Buffer.BlockCopy(buffer, offset, tostore, 0, count);
			Queue<byte[]> target = otherPart.queue;
			lock (target) {
				target.Enqueue(tostore);
				Monitor.Pulse(target);
			}
		}

		// Removes the next chunk from the queue, waiting up to ReadTimeout for one to arrive.
		private byte[] DequeueChunk() {
			lock (queue) {
				int timeout = readTimeout;
				int started = Environment.TickCount;
				// Re-check the queue after every wake-up: the condition, not the signal,
				// decides whether a chunk can be dequeued.
				while (queue.Count == 0) {
					if (timeout == Timeout.Infinite) {
						Monitor.Wait(queue);
						continue;
					}
					// Unchecked subtraction stays correct when TickCount wraps around.
					int remaining = timeout - unchecked(Environment.TickCount - started);
					if (remaining <= 0 || !Monitor.Wait(queue, remaining)) {
						// A chunk may have arrived just as the wait expired.
						if (queue.Count > 0) break;
						throw new TimeoutException();
					}
				}
				return queue.Dequeue();
			}
		}

		// Rejects a null buffer and negative offset/count; range overflow is left to Buffer.BlockCopy.
		private static void CheckArguments(byte[] buffer, int offset, int count) {
			if (buffer == null) throw new ArgumentNullException("buffer");
			if (offset < 0) throw new ArgumentOutOfRangeException("offset");
			if (count < 0) throw new ArgumentOutOfRangeException("count");
		}
	}
}