Mercurial > hg > ucis.core
changeset 5:2933f7750542
Added read-buffering stream wrapper
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Tue, 08 Jan 2013 16:38:21 +0100 |
parents | 9e2e6433f57a |
children | 5ce7a138fdba |
files | UCIS.csproj Util/AsyncResultBase.cs Util/PrebufferingStream.cs |
diffstat | 3 files changed, 285 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/UCIS.csproj Tue Jan 08 14:48:43 2013 +0100 +++ b/UCIS.csproj Tue Jan 08 16:38:21 2013 +0100 @@ -81,11 +81,13 @@ <Compile Include="ThreadPool.cs" /> <Compile Include="UTF8NoPreamble.cs" /> <Compile Include="Util\ArrayUtil.cs" /> + <Compile Include="Util\AsyncResultBase.cs" /> <Compile Include="Util\AsyncStream.cs" /> <Compile Include="Util\CrossStream.cs" /> <Compile Include="Util\HoldStream.cs" /> <Compile Include="Util\PacketStream.cs" /> <Compile Include="Util\PipeStream.cs" /> + <Compile Include="Util\PrebufferingStream.cs" /> <Compile Include="Util\QueuedPacketStream.cs" /> <Compile Include="Windows\ServiceManager.cs" /> <Compile Include="Xml\PolicyFile.cs" />
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Util/AsyncResultBase.cs Tue Jan 08 16:38:21 2013 +0100 @@ -0,0 +1,45 @@ +using System; +using System.Threading; +using SysThreadPool = System.Threading.ThreadPool; + +namespace UCIS.Util { + public abstract class AsyncResultBase : IAsyncResult { + ManualResetEvent WaitEvent = null; + AsyncCallback Callback = null; + public object AsyncState { get; private set; } + public bool CompletedSynchronously { get; private set; } + public bool IsCompleted { get; private set; } + public Exception Error { get; private set; } + public WaitHandle AsyncWaitHandle { + get { + lock (this) { + if (WaitEvent == null) WaitEvent = new ManualResetEvent(IsCompleted); + return WaitEvent; + } + } + } + + public AsyncResultBase(AsyncCallback callback, Object state) { + this.Callback = callback; + this.AsyncState = state; + } + + private void CallCallback(Object state) { + if (Callback != null) Callback(this); + } + + protected void SetCompleted(Boolean synchronously, Exception error) { + this.CompletedSynchronously = synchronously; + this.Error = error; + lock (this) { + IsCompleted = true; + if (WaitEvent != null) WaitEvent.Set(); + } + if (Callback != null) SysThreadPool.QueueUserWorkItem(CallCallback); + } + + protected void ThrowError() { + if (Error != null) throw Error; + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Util/PrebufferingStream.cs Tue Jan 08 16:38:21 2013 +0100 @@ -0,0 +1,238 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.IO; +using System.Threading; + +namespace UCIS.Util { + public class PrebufferingStream : Stream { + class AsyncResult : AsyncResultBase { + public int Count { get; private set; } + public int Left { get; set; } + public AsyncResult(AsyncCallback callback, Object state) : base(callback, state) { } + public void SetCompleted(Boolean synchronously, int count, Exception error) { + this.Count = count; + base.SetCompleted(synchronously, error); + } + public int WaitForCompletion() { + WaitHandle wh = null; + lock (this) if (!IsCompleted) wh = AsyncWaitHandle; + if (wh != null) wh.WaitOne(); + ThrowError(); + return Count; + } + } + + Stream baseStream; + Byte[] prebuffer = null; + int prebufferoffset = 0; + int prebuffercount = 0; + int defaultbuffersize; + + public Stream BaseStream { get { return baseStream; } } + + public PrebufferingStream(Stream stream) : this(stream, 1024) { } + public PrebufferingStream(Stream stream, int bufferSize) { + if (stream == null) throw new ArgumentNullException("stream"); + baseStream = stream; + defaultbuffersize = bufferSize; + } + + public IAsyncResult BeginPrebuffering(AsyncCallback callback, Object state) { + return BeginPrebuffering(1, callback, state); + } + public IAsyncResult BeginPrebuffering(int count, AsyncCallback callback, Object state) { + AsyncResult ar = new AsyncResult(callback, state); + if (prebuffercount > count) { + ar.SetCompleted(true, count, null); + } else { + PrepareBuffer(count); + int off = prebufferoffset + prebuffercount; + baseStream.BeginRead(prebuffer, off, prebuffer.Length - off, asyncPrebufferReadCallback, ar); + } + return ar; + } + private void asyncPrebufferReadCallback(IAsyncResult ar) { + AsyncResult myar = (AsyncResult)ar.AsyncState; + try { + int len = baseStream.EndRead(ar); + if (len <= 0) { + myar.SetCompleted(false, prebuffercount, null); + } else { + myar.Left -= len; + prebuffercount += len; + if (myar.Left > 0) { + int off = prebufferoffset + prebuffercount; + baseStream.BeginRead(prebuffer, off, prebuffer.Length - off, asyncPrebufferReadCallback, myar); + } else { + myar.SetCompleted(false, prebuffercount, null); + } + } + } catch (Exception ex) { + myar.SetCompleted(false, prebuffercount, ex); + } + } + public int EndPrebuffering(IAsyncResult ar) { + AsyncResult myar = (AsyncResult)ar; + return myar.WaitForCompletion(); + } + public int Prebuffer() { + return Prebuffer(1); + } + public int Prebuffer(int count) { + count -= prebuffercount; + if (count <= 0) return prebuffercount; + PrepareBuffer(prebuffercount + count); + while (count > 0) { + int off = prebufferoffset + prebuffercount; + int len = baseStream.Read(prebuffer, off, prebuffer.Length - off); + if (len <= 0) return prebuffercount; + count -= len; + prebuffercount += len; + } + return prebuffercount; + } + private void PrepareBuffer(int count) { + if (prebuffercount == 0) prebufferoffset = 0; + if (prebuffer == null || (prebuffercount == 0 && prebuffer.Length > defaultbuffersize)) { + if (count < defaultbuffersize) count = defaultbuffersize; + prebuffer = new Byte[count]; + prebufferoffset = 0; + } else if (prebufferoffset + count > prebuffer.Length) { + if (count > prebuffer.Length) { + Byte[] newbuffer = new Byte[prebuffercount + count]; + Buffer.BlockCopy(prebuffer, prebufferoffset, newbuffer, 0, prebuffercount); + prebuffer = newbuffer; + } else { + Buffer.BlockCopy(prebuffer, prebufferoffset, prebuffer, 0, prebuffercount); + } + prebufferoffset = 0; + } + } + public Byte Peek() { + return Peek(0); + } + public Byte Peek(int offset) { + if (Prebuffer(offset + 1) < offset + 1) throw new EndOfStreamException(); + return prebuffer[prebufferoffset + offset]; + } + public void Peek(Byte[] buffer, int offset, int count) { + Peek(buffer, offset, 0, count); + } + public void Peek(Byte[] buffer, int bufferoffset, int peekoffset, int count) { + if (Prebuffer(peekoffset + count) < peekoffset + count) throw new EndOfStreamException(); + Buffer.BlockCopy(prebuffer, prebufferoffset + peekoffset, buffer, bufferoffset, count); + } + public int TryPeek() { + return TryPeek(0); + } + public int TryPeek(int offset) { + if (prebuffercount <= offset) return -1; + return prebuffer[prebufferoffset + offset]; + } + public int TryPeek(Byte[] buffer, int offset, int count) { + return TryPeek(buffer, offset, 0, count); + } + public int TryPeek(Byte[] buffer, int bufferoffset, int peekoffset, int count) { + if (prebuffercount < peekoffset + count) count = prebuffercount - peekoffset; + if (count < 0) count = 0; + if (count > 0) Buffer.BlockCopy(prebuffer, prebufferoffset + peekoffset, buffer, bufferoffset, count); + return count; + } + + public override int Read(byte[] buffer, int offset, int count) { + if (prebuffercount > 0 || count < 16) { + if (prebuffercount == 0) if (Prebuffer() < 1) return 0; + if (count > prebuffercount) count = prebuffercount; + Buffer.BlockCopy(prebuffer, prebufferoffset, buffer, offset, count); + prebufferoffset += count; + prebuffercount -= count; + return count; + } else { + return baseStream.Read(buffer, offset, count); + } + } + + public override int ReadByte() { + if (Prebuffer(1) < 1) return -1; + int v = prebuffer[prebufferoffset]; + prebufferoffset++; + prebuffercount--; + return v; + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { + if (prebuffercount > 0) { + if (count > prebuffercount) count = prebuffercount; + Buffer.BlockCopy(prebuffer, prebufferoffset, buffer, offset, count); + prebufferoffset += count; + prebuffercount -= count; + AsyncResult ar = new AsyncResult(callback, state); + ar.SetCompleted(true, count, null); + return ar; + } else { + return baseStream.BeginRead(buffer, offset, count, callback, state); + } + } + + public override int EndRead(IAsyncResult asyncResult) { + AsyncResult myar = asyncResult as AsyncResult; + if (myar != null) { + return myar.WaitForCompletion(); + } else { + return baseStream.EndRead(asyncResult); + } + } + + public override void Close() { + base.Close(); + baseStream.Close(); + } + + public override void Write(byte[] buffer, int offset, int count) { + baseStream.Write(buffer, offset, count); + } + + public override void WriteByte(byte value) { + baseStream.WriteByte(value); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { + return baseStream.BeginWrite(buffer, offset, count, callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) { + baseStream.EndWrite(asyncResult); + } + + public override int ReadTimeout { + get { return baseStream.ReadTimeout; } + set { baseStream.ReadTimeout = value; } + } + + public override int WriteTimeout { + get { return baseStream.WriteTimeout; } + set { baseStream.WriteTimeout = value; } + } + + public override long Length { get { return prebuffercount + baseStream.Length; } } + public override long Position { + get { return baseStream.Position - prebuffercount; } + set { throw new NotImplementedException(); } + } + + public override void SetLength(long value) { throw new NotImplementedException(); } + public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); } + + public override bool CanRead { get { return prebuffercount > 0 || baseStream.CanRead; } } + public override bool CanSeek { get { return false; } } + public override bool CanTimeout { get { return baseStream.CanTimeout; } } + public override bool CanWrite { get { return baseStream.CanWrite; } } + + public int Buffered { get { return prebuffercount; } } + + public override void Flush() { + baseStream.Flush(); + } + } +}