Mercurial > hg > ucis.core
comparison Util/PrebufferingStream.cs @ 5:2933f7750542
Added read-buffering stream wrapper
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Tue, 08 Jan 2013 16:38:21 +0100 |
parents | |
children | 9525fb2d14ec |
comparison
equal
deleted
inserted
replaced
4:9e2e6433f57a | 5:2933f7750542 |
---|---|
1 using System; | |
2 using System.Collections.Generic; | |
3 using System.Text; | |
4 using System.IO; | |
5 using System.Threading; | |
6 | |
7 namespace UCIS.Util { | |
8 public class PrebufferingStream : Stream { | |
/// <summary>
/// Asynchronous result object used for prebuffering operations and for reads
/// that are satisfied directly from the prebuffer. Carries a byte count.
/// </summary>
class AsyncResult : AsyncResultBase {
    /// <summary>Byte count recorded when the operation was marked completed.</summary>
    public int Count { get; private set; }

    /// <summary>
    /// Running counter that the prebuffering read callback decrements by the
    /// number of bytes received; the callback keeps reading while it is positive.
    /// </summary>
    public int Left { get; set; }

    public AsyncResult(AsyncCallback callback, Object state)
        : base(callback, state) {
    }

    /// <summary>Stores the byte count and then marks the operation as completed.</summary>
    public void SetCompleted(Boolean synchronously, int count, Exception error) {
        Count = count;
        base.SetCompleted(synchronously, error);
    }

    /// <summary>
    /// Blocks until the operation has completed, rethrows a stored error (via
    /// ThrowError) and returns the recorded byte count.
    /// </summary>
    public int WaitForCompletion() {
        WaitHandle pending = null;
        // Only fetch the wait handle when the operation is still running.
        lock (this) {
            if (!IsCompleted) {
                pending = AsyncWaitHandle;
            }
        }
        if (pending != null) {
            pending.WaitOne();
        }
        ThrowError();
        return Count;
    }
}
25 | |
// Wrapped stream; every read that misses the prebuffer and every write goes here.
Stream baseStream;
// Read-ahead storage; stays null until PrepareBuffer allocates it.
Byte[] prebuffer;
// Index of the first unread byte inside prebuffer.
int prebufferoffset;
// Number of unread bytes currently held in prebuffer.
int prebuffercount;
// Minimum size used by PrepareBuffer when it allocates a fresh prebuffer.
int defaultbuffersize;

/// <summary>Gets the wrapped stream.</summary>
public Stream BaseStream {
    get { return baseStream; }
}
33 | |
/// <summary>Wraps <paramref name="stream"/> with a default buffer size of 1024 bytes.</summary>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
public PrebufferingStream(Stream stream)
    : this(stream, 1024) {
}

/// <summary>Wraps <paramref name="stream"/> using the given default buffer size.</summary>
/// <param name="stream">Stream to read ahead from.</param>
/// <param name="bufferSize">Minimum size used when the prebuffer is allocated.</param>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
public PrebufferingStream(Stream stream, int bufferSize) {
    if (stream == null) {
        throw new ArgumentNullException("stream");
    }
    defaultbuffersize = bufferSize;
    baseStream = stream;
}
40 | |
/// <summary>Begins prebuffering until at least one byte is available.</summary>
/// <param name="callback">Callback handed to the async result object.</param>
/// <param name="state">User state handed to the async result object.</param>
/// <returns>A result object to pass to <see cref="EndPrebuffering"/>.</returns>
public IAsyncResult BeginPrebuffering(AsyncCallback callback, Object state) {
    return BeginPrebuffering(1, callback, state);
}

/// <summary>
/// Begins reading from the base stream until at least <paramref name="count"/>
/// bytes are held in the prebuffer or the base stream reports end of data.
/// </summary>
/// <param name="count">Total number of bytes wanted in the prebuffer.</param>
/// <param name="callback">Callback handed to the async result object.</param>
/// <param name="state">User state handed to the async result object.</param>
/// <returns>
/// A result object to pass to <see cref="EndPrebuffering"/>, which yields the
/// number of bytes buffered at completion (possibly more or less than requested).
/// </returns>
public IAsyncResult BeginPrebuffering(int count, AsyncCallback callback, Object state) {
    AsyncResult ar = new AsyncResult(callback, state);
    if (prebuffercount >= count) {
        // Already satisfied: complete immediately and report the buffered amount,
        // matching what Prebuffer(int) and the asynchronous path report.
        ar.SetCompleted(true, prebuffercount, null);
    } else {
        PrepareBuffer(count);
        // The read callback keeps issuing reads while Left is positive, so it must
        // be seeded with the number of bytes still missing before the first read.
        ar.Left = count - prebuffercount;
        int off = prebufferoffset + prebuffercount;
        baseStream.BeginRead(prebuffer, off, prebuffer.Length - off, asyncPrebufferReadCallback, ar);
    }
    return ar;
}
/// <summary>
/// Completion callback for base-stream reads started by BeginPrebuffering.
/// Accounts for the received bytes and either issues another read (while the
/// operation's Left counter is positive) or completes the prebuffering operation.
/// </summary>
private void asyncPrebufferReadCallback(IAsyncResult ar) {
    AsyncResult pending = (AsyncResult)ar.AsyncState;
    try {
        int received = baseStream.EndRead(ar);
        if (received > 0) {
            pending.Left -= received;
            prebuffercount += received;
            if (pending.Left > 0) {
                // Still short: continue reading into the free tail of the buffer.
                int writeAt = prebufferoffset + prebuffercount;
                baseStream.BeginRead(prebuffer, writeAt, prebuffer.Length - writeAt, asyncPrebufferReadCallback, pending);
                return;
            }
        }
        // Either nothing more was read or enough data has arrived.
        pending.SetCompleted(false, prebuffercount, null);
    } catch (Exception ex) {
        pending.SetCompleted(false, prebuffercount, ex);
    }
}
/// <summary>
/// Waits for a prebuffering operation started with BeginPrebuffering and
/// returns the byte count it recorded; rethrows any error it stored.
/// </summary>
/// <param name="ar">Result object returned by BeginPrebuffering.</param>
public int EndPrebuffering(IAsyncResult ar) {
    return ((AsyncResult)ar).WaitForCompletion();
}
/// <summary>Ensures at least one byte is buffered, reading from the base stream if needed.</summary>
/// <returns>Number of bytes buffered afterwards (0 if the base stream had no data).</returns>
public int Prebuffer() {
    return Prebuffer(1);
}

/// <summary>
/// Reads from the base stream until at least <paramref name="count"/> bytes are
/// buffered or the base stream returns no more data.
/// </summary>
/// <param name="count">Total number of bytes wanted in the prebuffer.</param>
/// <returns>Number of bytes buffered afterwards; may be below or above <paramref name="count"/>.</returns>
public int Prebuffer(int count) {
    int missing = count - prebuffercount;
    if (missing > 0) {
        PrepareBuffer(count);
        do {
            int writeAt = prebufferoffset + prebuffercount;
            int received = baseStream.Read(prebuffer, writeAt, prebuffer.Length - writeAt);
            if (received <= 0) {
                break;
            }
            missing -= received;
            prebuffercount += received;
        } while (missing > 0);
    }
    return prebuffercount;
}
/// <summary>
/// Makes sure the prebuffer can hold <paramref name="count"/> bytes starting at
/// prebufferoffset, allocating, growing or compacting it as required. Unread
/// data is preserved and moved to the start of the buffer when it is relocated.
/// </summary>
private void PrepareBuffer(int count) {
    if (prebuffercount == 0) {
        prebufferoffset = 0;
    }

    // Allocate when there is no buffer yet, or replace an empty buffer that has
    // grown beyond the default size.
    bool oversizedAndEmpty = prebuffer != null && prebuffercount == 0 && prebuffer.Length > defaultbuffersize;
    if (prebuffer == null || oversizedAndEmpty) {
        prebuffer = new Byte[Math.Max(count, defaultbuffersize)];
        prebufferoffset = 0;
        return;
    }

    if (prebufferoffset + count <= prebuffer.Length) {
        return; // enough room behind the current offset
    }

    // Not enough room: move the unread bytes to index 0, into a larger array
    // when the request exceeds the current capacity, otherwise in place.
    Byte[] target = prebuffer;
    if (count > prebuffer.Length) {
        target = new Byte[prebuffercount + count];
    }
    Buffer.BlockCopy(prebuffer, prebufferoffset, target, 0, prebuffercount);
    prebuffer = target;
    prebufferoffset = 0;
}
/// <summary>Returns the next byte without consuming it, reading ahead if needed.</summary>
/// <exception cref="EndOfStreamException">The byte could not be buffered.</exception>
public Byte Peek() {
    return Peek(0);
}

/// <summary>Returns the byte <paramref name="offset"/> positions ahead without consuming anything.</summary>
/// <exception cref="EndOfStreamException">Fewer than offset + 1 bytes could be buffered.</exception>
public Byte Peek(int offset) {
    int needed = offset + 1;
    if (Prebuffer(needed) < needed) {
        throw new EndOfStreamException();
    }
    return prebuffer[prebufferoffset + offset];
}

/// <summary>Copies the next <paramref name="count"/> bytes into <paramref name="buffer"/> without consuming them.</summary>
/// <exception cref="EndOfStreamException">Fewer than <paramref name="count"/> bytes could be buffered.</exception>
public void Peek(Byte[] buffer, int offset, int count) {
    Peek(buffer, offset, 0, count);
}

/// <summary>
/// Copies <paramref name="count"/> bytes, starting <paramref name="peekoffset"/> bytes
/// ahead, into <paramref name="buffer"/> at <paramref name="bufferoffset"/> without consuming them.
/// </summary>
/// <exception cref="EndOfStreamException">Fewer than peekoffset + count bytes could be buffered.</exception>
public void Peek(Byte[] buffer, int bufferoffset, int peekoffset, int count) {
    int needed = peekoffset + count;
    if (Prebuffer(needed) < needed) {
        throw new EndOfStreamException();
    }
    Buffer.BlockCopy(prebuffer, prebufferoffset + peekoffset, buffer, bufferoffset, count);
}
/// <summary>Returns the next already-buffered byte, or -1 when nothing is buffered. Never reads from the base stream.</summary>
public int TryPeek() {
    return TryPeek(0);
}

/// <summary>
/// Returns the already-buffered byte <paramref name="offset"/> positions ahead,
/// or -1 when that position is not buffered. Never reads from the base stream.
/// </summary>
public int TryPeek(int offset) {
    return offset < prebuffercount ? prebuffer[prebufferoffset + offset] : -1;
}

/// <summary>Copies up to <paramref name="count"/> already-buffered bytes into <paramref name="buffer"/>.</summary>
/// <returns>Number of bytes copied.</returns>
public int TryPeek(Byte[] buffer, int offset, int count) {
    return TryPeek(buffer, offset, 0, count);
}

/// <summary>
/// Copies up to <paramref name="count"/> already-buffered bytes, starting
/// <paramref name="peekoffset"/> bytes ahead, into <paramref name="buffer"/> at
/// <paramref name="bufferoffset"/>. Never reads from the base stream.
/// </summary>
/// <returns>Number of bytes copied (0 when the requested range is not buffered).</returns>
public int TryPeek(Byte[] buffer, int bufferoffset, int peekoffset, int count) {
    int copied = count;
    if (peekoffset + copied > prebuffercount) {
        // Clamp to what is actually buffered behind peekoffset.
        copied = prebuffercount - peekoffset;
    }
    copied = Math.Max(copied, 0);
    if (copied > 0) {
        Buffer.BlockCopy(prebuffer, prebufferoffset + peekoffset, buffer, bufferoffset, copied);
    }
    return copied;
}
142 | |
/// <summary>
/// Reads up to <paramref name="count"/> bytes. Buffered data is served first;
/// with an empty prebuffer, requests of 16 bytes or more go straight to the base
/// stream while smaller ones are served through the prebuffer.
/// </summary>
/// <returns>Number of bytes placed in <paramref name="buffer"/>; 0 when no data could be obtained.</returns>
public override int Read(byte[] buffer, int offset, int count) {
    if (prebuffercount <= 0 && count >= 16) {
        return baseStream.Read(buffer, offset, count);
    }
    if (prebuffercount == 0 && Prebuffer() < 1) {
        return 0;
    }
    int served = Math.Min(count, prebuffercount);
    Buffer.BlockCopy(prebuffer, prebufferoffset, buffer, offset, served);
    prebufferoffset += served;
    prebuffercount -= served;
    return served;
}
155 | |
/// <summary>Reads and consumes a single byte through the prebuffer.</summary>
/// <returns>The byte value, or -1 when no byte could be buffered.</returns>
public override int ReadByte() {
    int available = Prebuffer(1);
    if (available < 1) {
        return -1;
    }
    Byte head = prebuffer[prebufferoffset];
    prebufferoffset += 1;
    prebuffercount -= 1;
    return head;
}
163 | |
/// <summary>
/// Begins a read. When data is buffered it is copied out immediately and a
/// synchronously completed result is returned; otherwise the call is forwarded
/// to the base stream.
/// </summary>
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) {
    if (prebuffercount <= 0) {
        return baseStream.BeginRead(buffer, offset, count, callback, state);
    }
    int served = Math.Min(count, prebuffercount);
    Buffer.BlockCopy(prebuffer, prebufferoffset, buffer, offset, served);
    prebufferoffset += served;
    prebuffercount -= served;
    AsyncResult done = new AsyncResult(callback, state);
    done.SetCompleted(true, served, null);
    return done;
}
177 | |
/// <summary>
/// Completes a read started with BeginRead. Results created by this class are
/// completed locally; any other result is handed to the base stream.
/// </summary>
/// <returns>Number of bytes read.</returns>
public override int EndRead(IAsyncResult asyncResult) {
    AsyncResult own = asyncResult as AsyncResult;
    return own != null ? own.WaitForCompletion() : baseStream.EndRead(asyncResult);
}
186 | |
/// <summary>Closes this wrapper and then the wrapped stream.</summary>
public override void Close() {
    // Run the Stream base-class close logic first, then close the wrapped stream.
    base.Close();
    this.baseStream.Close();
}
191 | |
// Writing is not buffered by this class: all write operations are forwarded
// unchanged to the wrapped stream.

/// <summary>Writes <paramref name="count"/> bytes from <paramref name="buffer"/> to the wrapped stream.</summary>
public override void Write(byte[] buffer, int offset, int count) {
    this.baseStream.Write(buffer, offset, count);
}

/// <summary>Writes a single byte to the wrapped stream.</summary>
public override void WriteByte(byte value) {
    this.baseStream.WriteByte(value);
}

/// <summary>Begins an asynchronous write on the wrapped stream.</summary>
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) {
    IAsyncResult pending = this.baseStream.BeginWrite(buffer, offset, count, callback, state);
    return pending;
}

/// <summary>Completes an asynchronous write on the wrapped stream.</summary>
public override void EndWrite(IAsyncResult asyncResult) {
    this.baseStream.EndWrite(asyncResult);
}
207 | |
/// <summary>Gets or sets the read timeout of the wrapped stream.</summary>
public override int ReadTimeout {
    get {
        return this.baseStream.ReadTimeout;
    }
    set {
        this.baseStream.ReadTimeout = value;
    }
}

/// <summary>Gets or sets the write timeout of the wrapped stream.</summary>
public override int WriteTimeout {
    get {
        return this.baseStream.WriteTimeout;
    }
    set {
        this.baseStream.WriteTimeout = value;
    }
}
217 | |
/// <summary>
/// Gets the length of the stream. Prebuffered bytes have already been read from
/// the wrapped stream and are therefore part of its length, so the wrapped
/// stream's length is reported unchanged.
/// </summary>
public override long Length {
    get { return baseStream.Length; }
}

/// <summary>
/// Gets the logical read position: the wrapped stream's position minus the
/// bytes that were read ahead but not yet consumed. Setting is not implemented.
/// </summary>
/// <exception cref="NotImplementedException">The setter is invoked.</exception>
public override long Position {
    get { return baseStream.Position - prebuffercount; }
    set { throw new NotImplementedException(); }
}
223 | |
/// <summary>Not implemented; always throws.</summary>
/// <exception cref="NotImplementedException">Always.</exception>
public override void SetLength(long value) {
    throw new NotImplementedException();
}

/// <summary>Not implemented; always throws.</summary>
/// <exception cref="NotImplementedException">Always.</exception>
public override long Seek(long offset, SeekOrigin origin) {
    throw new NotImplementedException();
}
226 | |
/// <summary>True while buffered data remains or the wrapped stream is readable.</summary>
public override bool CanRead {
    get {
        if (prebuffercount > 0) {
            return true;
        }
        return baseStream.CanRead;
    }
}

/// <summary>Always false.</summary>
public override bool CanSeek {
    get { return false; }
}

/// <summary>Mirrors the wrapped stream's CanTimeout.</summary>
public override bool CanTimeout {
    get { return this.baseStream.CanTimeout; }
}

/// <summary>Mirrors the wrapped stream's CanWrite.</summary>
public override bool CanWrite {
    get { return this.baseStream.CanWrite; }
}

/// <summary>Number of bytes currently held in the prebuffer and not yet consumed.</summary>
public int Buffered {
    get { return this.prebuffercount; }
}
233 | |
/// <summary>Flushes the wrapped stream; the prebuffer itself is not affected.</summary>
public override void Flush() {
    this.baseStream.Flush();
}
237 } | |
238 } |