comparison Util/PrebufferingStream.cs @ 5:2933f7750542

Added read-buffering stream wrapper
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 08 Jan 2013 16:38:21 +0100
parents
children 9525fb2d14ec
comparison
equal deleted inserted replaced
4:9e2e6433f57a 5:2933f7750542
1 using System;
2 using System.Collections.Generic;
3 using System.Text;
4 using System.IO;
5 using System.Threading;
6
7 namespace UCIS.Util {
8 public class PrebufferingStream : Stream {
9 class AsyncResult : AsyncResultBase {
10 public int Count { get; private set; }
11 public int Left { get; set; }
12 public AsyncResult(AsyncCallback callback, Object state) : base(callback, state) { }
13 public void SetCompleted(Boolean synchronously, int count, Exception error) {
14 this.Count = count;
15 base.SetCompleted(synchronously, error);
16 }
17 public int WaitForCompletion() {
18 WaitHandle wh = null;
19 lock (this) if (!IsCompleted) wh = AsyncWaitHandle;
20 if (wh != null) wh.WaitOne();
21 ThrowError();
22 return Count;
23 }
24 }
25
26 Stream baseStream;
27 Byte[] prebuffer = null;
28 int prebufferoffset = 0;
29 int prebuffercount = 0;
30 int defaultbuffersize;
31
32 public Stream BaseStream { get { return baseStream; } }
33
34 public PrebufferingStream(Stream stream) : this(stream, 1024) { }
35 public PrebufferingStream(Stream stream, int bufferSize) {
36 if (stream == null) throw new ArgumentNullException("stream");
37 baseStream = stream;
38 defaultbuffersize = bufferSize;
39 }
40
41 public IAsyncResult BeginPrebuffering(AsyncCallback callback, Object state) {
42 return BeginPrebuffering(1, callback, state);
43 }
44 public IAsyncResult BeginPrebuffering(int count, AsyncCallback callback, Object state) {
45 AsyncResult ar = new AsyncResult(callback, state);
46 if (prebuffercount > count) {
47 ar.SetCompleted(true, count, null);
48 } else {
49 PrepareBuffer(count);
50 int off = prebufferoffset + prebuffercount;
51 baseStream.BeginRead(prebuffer, off, prebuffer.Length - off, asyncPrebufferReadCallback, ar);
52 }
53 return ar;
54 }
55 private void asyncPrebufferReadCallback(IAsyncResult ar) {
56 AsyncResult myar = (AsyncResult)ar.AsyncState;
57 try {
58 int len = baseStream.EndRead(ar);
59 if (len <= 0) {
60 myar.SetCompleted(false, prebuffercount, null);
61 } else {
62 myar.Left -= len;
63 prebuffercount += len;
64 if (myar.Left > 0) {
65 int off = prebufferoffset + prebuffercount;
66 baseStream.BeginRead(prebuffer, off, prebuffer.Length - off, asyncPrebufferReadCallback, myar);
67 } else {
68 myar.SetCompleted(false, prebuffercount, null);
69 }
70 }
71 } catch (Exception ex) {
72 myar.SetCompleted(false, prebuffercount, ex);
73 }
74 }
75 public int EndPrebuffering(IAsyncResult ar) {
76 AsyncResult myar = (AsyncResult)ar;
77 return myar.WaitForCompletion();
78 }
79 public int Prebuffer() {
80 return Prebuffer(1);
81 }
82 public int Prebuffer(int count) {
83 count -= prebuffercount;
84 if (count <= 0) return prebuffercount;
85 PrepareBuffer(prebuffercount + count);
86 while (count > 0) {
87 int off = prebufferoffset + prebuffercount;
88 int len = baseStream.Read(prebuffer, off, prebuffer.Length - off);
89 if (len <= 0) return prebuffercount;
90 count -= len;
91 prebuffercount += len;
92 }
93 return prebuffercount;
94 }
95 private void PrepareBuffer(int count) {
96 if (prebuffercount == 0) prebufferoffset = 0;
97 if (prebuffer == null || (prebuffercount == 0 && prebuffer.Length > defaultbuffersize)) {
98 if (count < defaultbuffersize) count = defaultbuffersize;
99 prebuffer = new Byte[count];
100 prebufferoffset = 0;
101 } else if (prebufferoffset + count > prebuffer.Length) {
102 if (count > prebuffer.Length) {
103 Byte[] newbuffer = new Byte[prebuffercount + count];
104 Buffer.BlockCopy(prebuffer, prebufferoffset, newbuffer, 0, prebuffercount);
105 prebuffer = newbuffer;
106 } else {
107 Buffer.BlockCopy(prebuffer, prebufferoffset, prebuffer, 0, prebuffercount);
108 }
109 prebufferoffset = 0;
110 }
111 }
112 public Byte Peek() {
113 return Peek(0);
114 }
115 public Byte Peek(int offset) {
116 if (Prebuffer(offset + 1) < offset + 1) throw new EndOfStreamException();
117 return prebuffer[prebufferoffset + offset];
118 }
119 public void Peek(Byte[] buffer, int offset, int count) {
120 Peek(buffer, offset, 0, count);
121 }
122 public void Peek(Byte[] buffer, int bufferoffset, int peekoffset, int count) {
123 if (Prebuffer(peekoffset + count) < peekoffset + count) throw new EndOfStreamException();
124 Buffer.BlockCopy(prebuffer, prebufferoffset + peekoffset, buffer, bufferoffset, count);
125 }
126 public int TryPeek() {
127 return TryPeek(0);
128 }
129 public int TryPeek(int offset) {
130 if (prebuffercount <= offset) return -1;
131 return prebuffer[prebufferoffset + offset];
132 }
133 public int TryPeek(Byte[] buffer, int offset, int count) {
134 return TryPeek(buffer, offset, 0, count);
135 }
136 public int TryPeek(Byte[] buffer, int bufferoffset, int peekoffset, int count) {
137 if (prebuffercount < peekoffset + count) count = prebuffercount - peekoffset;
138 if (count < 0) count = 0;
139 if (count > 0) Buffer.BlockCopy(prebuffer, prebufferoffset + peekoffset, buffer, bufferoffset, count);
140 return count;
141 }
142
143 public override int Read(byte[] buffer, int offset, int count) {
144 if (prebuffercount > 0 || count < 16) {
145 if (prebuffercount == 0) if (Prebuffer() < 1) return 0;
146 if (count > prebuffercount) count = prebuffercount;
147 Buffer.BlockCopy(prebuffer, prebufferoffset, buffer, offset, count);
148 prebufferoffset += count;
149 prebuffercount -= count;
150 return count;
151 } else {
152 return baseStream.Read(buffer, offset, count);
153 }
154 }
155
156 public override int ReadByte() {
157 if (Prebuffer(1) < 1) return -1;
158 int v = prebuffer[prebufferoffset];
159 prebufferoffset++;
160 prebuffercount--;
161 return v;
162 }
163
164 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) {
165 if (prebuffercount > 0) {
166 if (count > prebuffercount) count = prebuffercount;
167 Buffer.BlockCopy(prebuffer, prebufferoffset, buffer, offset, count);
168 prebufferoffset += count;
169 prebuffercount -= count;
170 AsyncResult ar = new AsyncResult(callback, state);
171 ar.SetCompleted(true, count, null);
172 return ar;
173 } else {
174 return baseStream.BeginRead(buffer, offset, count, callback, state);
175 }
176 }
177
178 public override int EndRead(IAsyncResult asyncResult) {
179 AsyncResult myar = asyncResult as AsyncResult;
180 if (myar != null) {
181 return myar.WaitForCompletion();
182 } else {
183 return baseStream.EndRead(asyncResult);
184 }
185 }
186
187 public override void Close() {
188 base.Close();
189 baseStream.Close();
190 }
191
192 public override void Write(byte[] buffer, int offset, int count) {
193 baseStream.Write(buffer, offset, count);
194 }
195
196 public override void WriteByte(byte value) {
197 baseStream.WriteByte(value);
198 }
199
200 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) {
201 return baseStream.BeginWrite(buffer, offset, count, callback, state);
202 }
203
204 public override void EndWrite(IAsyncResult asyncResult) {
205 baseStream.EndWrite(asyncResult);
206 }
207
208 public override int ReadTimeout {
209 get { return baseStream.ReadTimeout; }
210 set { baseStream.ReadTimeout = value; }
211 }
212
213 public override int WriteTimeout {
214 get { return baseStream.WriteTimeout; }
215 set { baseStream.WriteTimeout = value; }
216 }
217
218 public override long Length { get { return prebuffercount + baseStream.Length; } }
219 public override long Position {
220 get { return baseStream.Position - prebuffercount; }
221 set { throw new NotImplementedException(); }
222 }
223
224 public override void SetLength(long value) { throw new NotImplementedException(); }
225 public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); }
226
227 public override bool CanRead { get { return prebuffercount > 0 || baseStream.CanRead; } }
228 public override bool CanSeek { get { return false; } }
229 public override bool CanTimeout { get { return baseStream.CanTimeout; } }
230 public override bool CanWrite { get { return baseStream.CanWrite; } }
231
232 public int Buffered { get { return prebuffercount; } }
233
234 public override void Flush() {
235 baseStream.Flush();
236 }
237 }
238 }