comparison Util/QueuedPacketStream.cs @ 1:28dc7d535036

Small improvements, introduced PacketStream
author Ivo Smits <Ivo@UCIS.nl>
date Mon, 07 Jan 2013 16:43:28 +0100
parents
children 4b78cc5f116b
comparison
equal deleted inserted replaced
0:3ab940a0c7a0 1:28dc7d535036
1 using System;
2 using System.Collections.Generic;
3 using System.IO;
4 using System.Threading;
5 using SysThreadPool = System.Threading.ThreadPool;
6
7 namespace UCIS.Util {
8 public abstract class QueuedPacketStream : PacketStream {
9 Queue<Byte[]> ReceiveQueue = new Queue<byte[]>();
10 Byte[] ReceiveBuffer = null;
11 int ReceiveBufferOffset = 0;
12 int ReceiveWaiting = 0;
13 AutoResetEvent ReceiveEvent = new AutoResetEvent(false);
14 AsyncResult AsyncReceiveOperation = null;
15 protected Boolean Closed { get; private set; }
16
17 public QueuedPacketStream() {
18 ReadTimeout = Timeout.Infinite;
19 Closed = false;
20 }
21
22 protected void AddReadBufferCopy(Byte[] buffer, int offset, int count) {
23 Byte[] store;
24 store = new Byte[count];
25 Buffer.BlockCopy(buffer, offset, store, 0, count);
26 AddReadBufferNoCopy(store);
27 }
28 protected void AddReadBufferNoCopy(Byte[] store) {
29 if (Closed) return;
30 lock (ReceiveQueue) {
31 ReceiveQueue.Enqueue(store);
32 Interlocked.Add(ref ReceiveWaiting, store.Length);
33 ReceiveEvent.Set();
34 if (AsyncReceiveOperation != null && (store.Length > 0 || AsyncReceiveOperation.IsReadPacket)) {
35 AsyncReceiveOperation.SetCompleted(false);
36 AsyncReceiveOperation = null;
37 }
38 }
39 }
40 public override void Close() {
41 Closed = true;
42 base.Close();
43 ReceiveEvent.Set();
44 lock (ReceiveQueue) {
45 if (AsyncReceiveOperation != null) {
46 AsyncReceiveOperation.SetCompleted(false);
47 AsyncReceiveOperation = null;
48 }
49 }
50 }
51
52 public override bool CanSeek { get { return false; } }
53 public override bool CanTimeout { get { return true; } }
54 public override bool CanRead { get { return !Closed; } }
55 public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
56 public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
57 public override void SetLength(long value) { throw new NotSupportedException(); }
58
59 public override int ReadTimeout { get; set; }
60 public override long Length { get { return ReceiveWaiting; } }
61
62 public int WaitForPacket() {
63 while (ReceiveBuffer == null) {
64 lock (ReceiveQueue) {
65 if (ReceiveQueue.Count > 0) {
66 ReceiveBuffer = ReceiveQueue.Dequeue();
67 ReceiveBufferOffset = 0;
68 continue;
69 }
70 }
71 if (Closed) throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed");
72 if (ReadTimeout == 0 || !ReceiveEvent.WaitOne(ReadTimeout, false)) throw new TimeoutException();
73 }
74 return ReceiveBuffer.Length - ReceiveBufferOffset;
75 }
76 public override int Read(byte[] buffer, int offset, int count) {
77 int left = 0;
78 while (true) {
79 left = WaitForPacket();
80 if (left > 0) break;
81 ReceiveBuffer = null;
82 }
83 if (count > left) count = left;
84 Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, buffer, offset, count);
85 ReceiveBufferOffset += count;
86 if (ReceiveBufferOffset == ReceiveBuffer.Length) ReceiveBuffer = null;
87 Interlocked.Add(ref ReceiveWaiting, -count);
88 return count;
89 }
90 public override Byte[] ReadPacket() {
91 WaitForPacket();
92 Byte[] arr = ReceiveBuffer;
93 if (ReceiveBufferOffset > 0) {
94 arr = new Byte[ReceiveBuffer.Length - ReceiveBufferOffset];
95 Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, arr, 0, arr.Length - ReceiveBufferOffset);
96 }
97 ReceiveBuffer = null;
98 return arr;
99 }
100 public override ArraySegment<byte> ReadPacketFast() {
101 WaitForPacket();
102 ArraySegment<byte> ret = new ArraySegment<byte>(ReceiveBuffer, ReceiveBufferOffset, ReceiveBuffer.Length - ReceiveBufferOffset);
103 ReceiveBuffer = null;
104 return ret;
105 }
106
107 class AsyncResult : IAsyncResult {
108 public Object AsyncState { get; private set; }
109 public WaitHandle AsyncWaitHandle { get { return WaitHandle; } }
110 public Boolean CompletedSynchronously { get; private set; }
111 public Boolean IsCompleted { get; private set; }
112 public Boolean IsReadPacket { get; private set; }
113
114 public Byte[] Buffer = null;
115 public int BufferOffset = 0;
116 public int BufferLength = 0;
117
118 private ManualResetEvent WaitHandle = new ManualResetEvent(false);
119 private AsyncCallback Callback = null;
120 private void CallCallback(Object state) {
121 if (Callback != null) Callback(this);
122 }
123 public void SetCompleted(Boolean synchronously) {
124 CompletedSynchronously = synchronously;
125 IsCompleted = true;
126 WaitHandle.Set();
127 if (Callback != null) SysThreadPool.QueueUserWorkItem(CallCallback);
128 }
129 public AsyncResult(AsyncCallback callback, Object state) {
130 this.Callback = callback;
131 this.AsyncState = state;
132 CompletedSynchronously = false;
133 IsCompleted = false;
134 IsReadPacket = true;
135 }
136 public AsyncResult(AsyncCallback callback, Object state, Byte[] buffer, int bufferOffset, int bufferLength)
137 : this(callback, state) {
138 this.Buffer = buffer;
139 this.BufferOffset = bufferOffset;
140 this.BufferLength = bufferLength;
141 IsReadPacket = false;
142 }
143 }
144 private IAsyncResult BeginAsyncReadOperation(AsyncResult ar) {
145 lock (ReceiveQueue) {
146 if (AsyncReceiveOperation != null) throw new InvalidOperationException("Another asynchronous operation is in progress");
147 if (ReceiveBuffer != null || ReceiveQueue.Count > 0) {
148 ar.SetCompleted(true);
149 } else {
150 if (Closed) throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed");
151 AsyncReceiveOperation = ar;
152 }
153 }
154 return ar;
155 }
156 private void EndAsyncReadOperation(AsyncResult ar) {
157 lock (ReceiveQueue) {
158 if (AsyncReceiveOperation != null && ar != AsyncReceiveOperation) throw new InvalidOperationException("The given AsyncResult object does not match the current pending operation");
159 AsyncReceiveOperation = null;
160 }
161 }
162 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) {
163 return BeginAsyncReadOperation(new AsyncResult(callback, state, buffer, offset, count));
164 }
165 public override IAsyncResult BeginReadPacket(AsyncCallback callback, object state) {
166 return BeginAsyncReadOperation(new AsyncResult(callback, state));
167 }
168 public override IAsyncResult BeginReadPacketFast(AsyncCallback callback, object state) {
169 return BeginAsyncReadOperation(new AsyncResult(callback, state));
170 }
171 public override int EndRead(IAsyncResult asyncResult) {
172 AsyncResult ar = (AsyncResult)asyncResult;
173 EndAsyncReadOperation(ar);
174 return Read(ar.Buffer, ar.BufferOffset, ar.BufferLength);
175 }
176 public override Byte[] EndReadPacket(IAsyncResult asyncResult) {
177 AsyncResult ar = (AsyncResult)asyncResult;
178 EndAsyncReadOperation(ar);
179 return ReadPacket();
180 }
181 public override ArraySegment<Byte> EndReadPacketFast(IAsyncResult asyncResult) {
182 AsyncResult ar = (AsyncResult)asyncResult;
183 EndAsyncReadOperation(ar);
184 return ReadPacketFast();
185 }
186 }
187 }