comparison Util/PrebufferingStream.cs @ 8:9525fb2d14ec

Small fix and new functions in PrebufferingStream
author Ivo Smits <Ivo@UCIS.nl>
date Wed, 16 Jan 2013 23:27:06 +0100
parents 2933f7750542
children 2e3f57f326b3
comparison
equal deleted inserted replaced
7:4b78cc5f116b 8:9525fb2d14ec
5 using System.Threading; 5 using System.Threading;
6 6
7 namespace UCIS.Util { 7 namespace UCIS.Util {
8 public class PrebufferingStream : Stream { 8 public class PrebufferingStream : Stream {
9 class AsyncResult : AsyncResultBase { 9 class AsyncResult : AsyncResultBase {
10 public int Count { get; private set; } 10 public Byte[] Buffer { get; set; }
11 public int Offset { get; set; }
11 public int Left { get; set; } 12 public int Left { get; set; }
13 public int Count { get; set; }
12 public AsyncResult(AsyncCallback callback, Object state) : base(callback, state) { } 14 public AsyncResult(AsyncCallback callback, Object state) : base(callback, state) { }
13 public void SetCompleted(Boolean synchronously, int count, Exception error) { 15 public void SetCompleted(Boolean synchronously, int count, Exception error) {
14 this.Count = count; 16 this.Count = count;
17 base.SetCompleted(synchronously, error);
18 }
19 public void SetCompleted(Boolean synchronously, Exception error) {
15 base.SetCompleted(synchronously, error); 20 base.SetCompleted(synchronously, error);
16 } 21 }
17 public int WaitForCompletion() { 22 public int WaitForCompletion() {
18 WaitHandle wh = null; 23 WaitHandle wh = null;
19 lock (this) if (!IsCompleted) wh = AsyncWaitHandle; 24 lock (this) if (!IsCompleted) wh = AsyncWaitHandle;
/// <summary>
/// Begins an asynchronous prebuffering operation that asks for at least one byte.
/// </summary>
/// <param name="callback">Callback handed to the underlying operation.</param>
/// <param name="state">Caller-defined state object handed to the underlying operation.</param>
/// <returns>The result object produced by the three-argument overload.</returns>
public IAsyncResult BeginPrebuffering(AsyncCallback callback, Object state) {
	// Delegates to the counted overload with a minimum of a single byte.
	const int minimumBytes = 1;
	return BeginPrebuffering(minimumBytes, callback, state);
}
/// <summary>
/// Begins an asynchronous operation that makes at least <paramref name="count"/> bytes
/// available in the prebuffer.
/// </summary>
/// <param name="count">Number of bytes that should be buffered.</param>
/// <param name="callback">Callback stored in the returned result object.</param>
/// <param name="state">Caller-defined state stored in the returned result object.</param>
/// <returns>The result object tracking this operation.</returns>
public IAsyncResult BeginPrebuffering(int count, AsyncCallback callback, Object state) {
	AsyncResult result = new AsyncResult(callback, state);

	// Enough data is buffered already: complete immediately and report
	// how many bytes are currently buffered.
	if (prebuffercount >= count) {
		result.SetCompleted(true, prebuffercount, null);
		return result;
	}

	PrepareBuffer(count);
	// Bytes still missing before the request is satisfied.
	result.Left = count - prebuffercount;
	// New data is appended directly after the bytes that are already buffered;
	// the read may use all remaining space in the prebuffer.
	int writeOffset = prebufferoffset + prebuffercount;
	int freeSpace = prebuffer.Length - writeOffset;
	baseStream.BeginRead(prebuffer, writeOffset, freeSpace, asyncPrebufferReadCallback, result);
	return result;
}
151 } else { 157 } else {
152 return baseStream.Read(buffer, offset, count); 158 return baseStream.Read(buffer, offset, count);
153 } 159 }
154 } 160 }
155 161
/// <summary>
/// Reads exactly <paramref name="count"/> bytes into <paramref name="buffer"/>,
/// calling <c>Read</c> repeatedly until the request is filled.
/// </summary>
/// <param name="buffer">Destination array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which the first byte is stored.</param>
/// <param name="count">Number of bytes to read; nothing is read when this is zero or negative.</param>
/// <exception cref="EndOfStreamException">
/// <c>Read</c> returned zero or less before all requested bytes were obtained.
/// </exception>
public void ReadAll(Byte[] buffer, int offset, int count) {
	// 'done' counts the bytes stored so far; each pass asks for the remainder.
	for (int done = 0; done < count; ) {
		int got = Read(buffer, offset + done, count - done);
		if (got <= 0) throw new EndOfStreamException();
		done += got;
	}
}
170
/// <summary>
/// Allocates a new array of <paramref name="count"/> bytes and fills it completely
/// using the buffer-based <c>ReadAll</c> overload.
/// </summary>
/// <param name="count">Number of bytes to read, which is also the length of the returned array.</param>
/// <returns>The newly allocated, completely filled array.</returns>
public Byte[] ReadAll(int count) {
	Byte[] result = new Byte[count];
	ReadAll(result, 0, result.Length);
	return result;
}
176
156 public override int ReadByte() { 177 public override int ReadByte() {
157 if (Prebuffer(1) < 1) return -1; 178 if (Prebuffer(1) < 1) return -1;
158 int v = prebuffer[prebufferoffset]; 179 int v = prebuffer[prebufferoffset];
159 prebufferoffset++; 180 prebufferoffset++;
160 prebuffercount--; 181 prebuffercount--;
182 } else { 203 } else {
183 return baseStream.EndRead(asyncResult); 204 return baseStream.EndRead(asyncResult);
184 } 205 }
185 } 206 }
186 207
/// <summary>
/// Begins an asynchronous read that completes once exactly <paramref name="count"/>
/// bytes have been stored in <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">Destination array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which the first byte is stored.</param>
/// <param name="count">Total number of bytes to read.</param>
/// <param name="callback">Callback stored in the returned result object.</param>
/// <param name="state">Caller-defined state stored in the returned result object.</param>
/// <returns>The result object tracking this operation; pass it to <c>EndReadAll</c>.</returns>
public IAsyncResult BeginReadAll(byte[] buffer, int offset, int count, AsyncCallback callback, object state) {
	AsyncResult ar = new AsyncResult(callback, state);
	ar.Buffer = buffer;
	// Honor the caller-supplied start position; starting at 0 would overwrite
	// the beginning of the buffer regardless of the requested offset.
	ar.Offset = offset;
	ar.Left = count;
	ar.Count = 0;
	// Serve as much as possible from data that is already prebuffered.
	if (prebuffercount > 0) {
		int read = Math.Min(ar.Left, prebuffercount);
		Buffer.BlockCopy(prebuffer, prebufferoffset, ar.Buffer, ar.Offset, read);
		prebufferoffset += read;
		prebuffercount -= read;
		ar.Offset += read;
		ar.Left -= read;
		ar.Count += read;
	}
	if (ar.Left > 0) {
		// The remainder is read from the base stream straight into the caller's buffer.
		baseStream.BeginRead(ar.Buffer, ar.Offset, ar.Left, asyncReadAllReadCallback, ar);
	} else {
		// Everything came from the prebuffer, so the operation finished synchronously.
		ar.SetCompleted(true, count, null);
	}
	return ar;
}
230
/// <summary>
/// Completion callback for the base stream reads started on behalf of <c>BeginReadAll</c>.
/// Accounts for the bytes just received and either starts another read for the
/// remainder or marks the tracking result as completed.
/// </summary>
/// <param name="ar">
/// Result of the base stream read; its <c>AsyncState</c> is the <c>AsyncResult</c>
/// that tracks the overall read-all operation.
/// </param>
private void asyncReadAllReadCallback(IAsyncResult ar) {
	AsyncResult myar = (AsyncResult)ar.AsyncState;
	try {
		int len = baseStream.EndRead(ar);
		// A read of zero or fewer bytes before the request is filled is treated as end of stream.
		if (len <= 0) throw new EndOfStreamException();
		myar.Offset += len;
		myar.Left -= len;
		myar.Count += len;
		if (myar.Left > 0) {
			// Pass the tracking object (not the base stream's IAsyncResult) as state,
			// so the next invocation of this callback can cast AsyncState back to AsyncResult.
			baseStream.BeginRead(myar.Buffer, myar.Offset, myar.Left, asyncReadAllReadCallback, myar);
		} else {
			myar.SetCompleted(false, myar.Count, null);
		}
	} catch (Exception ex) {
		// Any failure is recorded on the tracking result instead of escaping the callback.
		myar.SetCompleted(false, ex);
	}
}
249
/// <summary>
/// Waits for an operation started with <c>BeginReadAll</c> to finish and returns
/// the value produced by the result object's <c>WaitForCompletion</c>
/// (presumably the number of bytes read — confirm against <c>WaitForCompletion</c>).
/// </summary>
/// <param name="asyncResult">The object returned by <c>BeginReadAll</c>.</param>
/// <returns>The value returned by <c>WaitForCompletion</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
/// <exception cref="ArgumentException">
/// <paramref name="asyncResult"/> is not a result object created by this class.
/// </exception>
public int EndReadAll(IAsyncResult asyncResult) {
	if (asyncResult == null) throw new ArgumentNullException("asyncResult");
	AsyncResult myar = asyncResult as AsyncResult;
	// Reject foreign result objects explicitly rather than failing with a NullReferenceException.
	if (myar == null) throw new ArgumentException("The IAsyncResult was not returned by BeginReadAll.", "asyncResult");
	return myar.WaitForCompletion();
}
254
/// <summary>
/// Closes this stream and then the wrapped base stream.
/// </summary>
public override void Close() {
	// Run the base class close logic first...
	base.Close();
	// ...then close the underlying stream as well.
	baseStream.Close();
}
191 259