Mercurial > hg > ucis.core
comparison Util/QueuedPacketStream.cs @ 1:28dc7d535036
Small improvements, introduced PacketStream
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Mon, 07 Jan 2013 16:43:28 +0100 |
parents | |
children | 4b78cc5f116b |
comparison
equal
deleted
inserted
replaced
0:3ab940a0c7a0 | 1:28dc7d535036 |
---|---|
1 using System; | |
2 using System.Collections.Generic; | |
3 using System.IO; | |
4 using System.Threading; | |
5 using SysThreadPool = System.Threading.ThreadPool; | |
6 | |
7 namespace UCIS.Util { | |
8 public abstract class QueuedPacketStream : PacketStream { | |
// Packets that have been added but not yet picked up by a reader. This object is also
// the lock that guards the queue and AsyncReceiveOperation.
private Queue<Byte[]> ReceiveQueue = new Queue<Byte[]>();
// Packet currently being consumed by the read methods; null when none has been dequeued.
private Byte[] ReceiveBuffer;
// Index of the first unread byte inside ReceiveBuffer.
private int ReceiveBufferOffset;
// Byte counter reported by Length: grows by the size of each enqueued packet and
// shrinks by the number of bytes handed out by Read.
private int ReceiveWaiting;
// Signalled whenever a packet is enqueued or the stream is closed, to wake a blocked reader.
private AutoResetEvent ReceiveEvent = new AutoResetEvent(false);
// The single pending asynchronous read operation, or null when there is none.
private AsyncResult AsyncReceiveOperation;
// Set to true by Close(); once set, newly added packets are dropped.
protected Boolean Closed { get; private set; }
16 | |
/// <summary>
/// Creates an open stream whose blocking reads wait without a time limit
/// until <see cref="ReadTimeout"/> is changed.
/// </summary>
public QueuedPacketStream() {
    this.ReadTimeout = Timeout.Infinite;
    this.Closed = false;
}
21 | |
/// <summary>
/// Copies <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at
/// <paramref name="offset"/>, into a new array and enqueues that array as one packet.
/// </summary>
/// <param name="buffer">Source array; it is not retained after the call.</param>
/// <param name="offset">Index in <paramref name="buffer"/> of the first byte to copy.</param>
/// <param name="count">Number of bytes to copy; also the size of the enqueued packet.</param>
protected void AddReadBufferCopy(Byte[] buffer, int offset, int count) {
    Byte[] packet = new Byte[count];
    Buffer.BlockCopy(buffer, offset, packet, 0, count);
    AddReadBufferNoCopy(packet);
}
/// <summary>
/// Enqueues <paramref name="store"/> as one packet without copying it, wakes a blocked
/// reader and completes the pending asynchronous read if this packet satisfies it.
/// Does nothing once the stream has been closed.
/// </summary>
/// <param name="store">The packet; the array itself is stored in the queue.</param>
protected void AddReadBufferNoCopy(Byte[] store) {
    if (Closed) {
        return;
    }
    lock (ReceiveQueue) {
        ReceiveQueue.Enqueue(store);
        Interlocked.Add(ref ReceiveWaiting, store.Length);
        ReceiveEvent.Set();

        AsyncResult pending = AsyncReceiveOperation;
        if (pending == null) {
            return;
        }
        // An empty packet completes a pending packet read, but not a pending byte read.
        Boolean satisfiesPending = pending.IsReadPacket || store.Length > 0;
        if (satisfiesPending) {
            pending.SetCompleted(false);
            AsyncReceiveOperation = null;
        }
    }
}
/// <summary>
/// Marks the stream as closed, closes the base stream, wakes a blocked reader and
/// completes the pending asynchronous read (if any) so its owner is notified.
/// </summary>
public override void Close() {
    Closed = true;
    base.Close();
    // Wake a reader blocked in WaitForPacket so it can observe the closed state.
    ReceiveEvent.Set();
    lock (ReceiveQueue) {
        AsyncResult pending = AsyncReceiveOperation;
        if (pending == null) {
            return;
        }
        pending.SetCompleted(false);
        AsyncReceiveOperation = null;
    }
}
51 | |
/// <summary>Always false: seeking is not supported.</summary>
public override bool CanSeek {
    get { return false; }
}

/// <summary>Always true: reads honour <see cref="ReadTimeout"/>.</summary>
public override bool CanTimeout {
    get { return true; }
}

/// <summary>True until the stream has been closed.</summary>
public override bool CanRead {
    get { return !Closed; }
}

/// <summary>Not supported; both accessors throw <see cref="NotSupportedException"/>.</summary>
public override long Position {
    get { throw new NotSupportedException(); }
    set { throw new NotSupportedException(); }
}

/// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
public override long Seek(long offset, SeekOrigin origin) {
    throw new NotSupportedException();
}

/// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
public override void SetLength(long value) {
    throw new NotSupportedException();
}

/// <summary>
/// Timeout passed to the wait in <see cref="WaitForPacket"/>; a value of 0 makes a read
/// fail immediately when no packet is queued.
/// </summary>
public override int ReadTimeout { get; set; }

/// <summary>Current value of the received-bytes counter.</summary>
public override long Length {
    get { return ReceiveWaiting; }
}
61 | |
/// <summary>
/// Ensures a current packet is available, blocking until one is queued if necessary.
/// </summary>
/// <returns>The number of unread bytes left in the current packet.</returns>
/// <exception cref="ObjectDisposedException">The queue is empty and the stream is closed.</exception>
/// <exception cref="TimeoutException">
/// No packet is queued and <see cref="ReadTimeout"/> is 0, or the wait timed out.
/// </exception>
public int WaitForPacket() {
    while (ReceiveBuffer == null) {
        Boolean dequeued = false;
        lock (ReceiveQueue) {
            if (ReceiveQueue.Count > 0) {
                ReceiveBuffer = ReceiveQueue.Dequeue();
                ReceiveBufferOffset = 0;
                dequeued = true;
            }
        }
        if (dequeued) {
            continue;
        }
        // Queued packets are still delivered after Close; only an empty queue reports closure.
        if (Closed) {
            throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed");
        }
        if (ReadTimeout == 0) {
            throw new TimeoutException();
        }
        if (!ReceiveEvent.WaitOne(ReadTimeout, false)) {
            throw new TimeoutException();
        }
    }
    return ReceiveBuffer.Length - ReceiveBufferOffset;
}
/// <summary>
/// Copies up to <paramref name="count"/> bytes from the current packet into
/// <paramref name="buffer"/>. Empty packets are skipped, and a single call never
/// crosses a packet boundary.
/// </summary>
/// <param name="buffer">Destination array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start writing.</param>
/// <param name="count">Maximum number of bytes to copy.</param>
/// <returns>The number of bytes actually copied.</returns>
public override int Read(byte[] buffer, int offset, int count) {
    int available = WaitForPacket();
    while (available <= 0) {
        // Discard the empty packet and wait for the next one.
        ReceiveBuffer = null;
        available = WaitForPacket();
    }

    int copied = Math.Min(count, available);
    Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, buffer, offset, copied);
    ReceiveBufferOffset += copied;
    if (ReceiveBufferOffset == ReceiveBuffer.Length) {
        // Packet fully consumed; the next read dequeues a fresh one.
        ReceiveBuffer = null;
    }
    Interlocked.Add(ref ReceiveWaiting, -copied);
    return copied;
}
/// <summary>
/// Returns the unread remainder of the current packet, blocking until a packet is available.
/// </summary>
/// <returns>
/// The queued array itself when nothing of the packet has been read yet; otherwise a new
/// array holding only the unread tail.
/// </returns>
public override Byte[] ReadPacket() {
    int remaining = WaitForPacket();
    Byte[] packet = ReceiveBuffer;
    if (ReceiveBufferOffset > 0) {
        // Part of the packet was already consumed by Read: copy the whole unread tail.
        // (The length must be the tail size itself; subtracting the offset a second time
        // would truncate the copy or produce a negative count.)
        packet = new Byte[remaining];
        Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, packet, 0, remaining);
    }
    ReceiveBuffer = null;
    // Keep the Length counter in step with the bytes handed out, as Read does.
    Interlocked.Add(ref ReceiveWaiting, -remaining);
    return packet;
}
/// <summary>
/// Returns the unread remainder of the current packet as a segment over the queued array,
/// without copying. Blocks until a packet is available.
/// </summary>
/// <returns>A segment starting at the first unread byte and extending to the end of the packet.</returns>
public override ArraySegment<byte> ReadPacketFast() {
    int remaining = WaitForPacket();
    ArraySegment<byte> segment = new ArraySegment<byte>(ReceiveBuffer, ReceiveBufferOffset, remaining);
    ReceiveBuffer = null;
    // Keep the Length counter in step with the bytes handed out, as Read does.
    Interlocked.Add(ref ReceiveWaiting, -remaining);
    return segment;
}
106 | |
/// <summary>
/// Bookkeeping object for one asynchronous read. It records whether the request is a
/// packet read or a byte read (with its destination buffer), and signals completion
/// through a wait handle and an optional callback.
/// </summary>
class AsyncResult : IAsyncResult {
    private ManualResetEvent WaitHandle = new ManualResetEvent(false);
    private AsyncCallback Callback;

    // Destination of a byte read; left at their defaults for packet reads.
    public Byte[] Buffer;
    public int BufferOffset;
    public int BufferLength;

    public Object AsyncState { get; private set; }
    public Boolean CompletedSynchronously { get; private set; }
    public Boolean IsCompleted { get; private set; }
    /// <summary>True for a packet read, false for a byte read into a caller buffer.</summary>
    public Boolean IsReadPacket { get; private set; }
    public WaitHandle AsyncWaitHandle {
        get { return WaitHandle; }
    }

    /// <summary>Creates the result object for a packet read.</summary>
    public AsyncResult(AsyncCallback callback, Object state) {
        Callback = callback;
        AsyncState = state;
        IsReadPacket = true;
        IsCompleted = false;
        CompletedSynchronously = false;
    }

    /// <summary>Creates the result object for a byte read into the given buffer range.</summary>
    public AsyncResult(AsyncCallback callback, Object state, Byte[] buffer, int bufferOffset, int bufferLength)
        : this(callback, state) {
        IsReadPacket = false;
        Buffer = buffer;
        BufferOffset = bufferOffset;
        BufferLength = bufferLength;
    }

    /// <summary>
    /// Marks the operation as completed, signals the wait handle and, when a callback was
    /// supplied, queues it on the thread pool.
    /// </summary>
    public void SetCompleted(Boolean synchronously) {
        CompletedSynchronously = synchronously;
        IsCompleted = true;
        WaitHandle.Set();
        if (Callback == null) {
            return;
        }
        SysThreadPool.QueueUserWorkItem(CallCallback);
    }

    // Thread-pool entry point; the state argument is unused.
    private void CallCallback(Object state) {
        if (Callback == null) {
            return;
        }
        Callback(this);
    }
}
/// <summary>
/// Registers <paramref name="ar"/> as the pending asynchronous read, or completes it at
/// once when a packet is already current or queued.
/// </summary>
/// <returns>The same <paramref name="ar"/> instance.</returns>
/// <exception cref="InvalidOperationException">Another asynchronous read is already pending.</exception>
/// <exception cref="ObjectDisposedException">No data is available and the stream is closed.</exception>
private IAsyncResult BeginAsyncReadOperation(AsyncResult ar) {
    lock (ReceiveQueue) {
        if (AsyncReceiveOperation != null) {
            throw new InvalidOperationException("Another asynchronous operation is in progress");
        }
        Boolean dataReady = ReceiveBuffer != null || ReceiveQueue.Count > 0;
        if (dataReady) {
            ar.SetCompleted(true);
            return ar;
        }
        if (Closed) {
            throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed");
        }
        AsyncReceiveOperation = ar;
        return ar;
    }
}
/// <summary>
/// Clears the pending asynchronous read after checking that <paramref name="ar"/> is the
/// one currently registered (when any is registered).
/// </summary>
/// <exception cref="InvalidOperationException">
/// A different operation is currently pending.
/// </exception>
private void EndAsyncReadOperation(AsyncResult ar) {
    lock (ReceiveQueue) {
        AsyncResult pending = AsyncReceiveOperation;
        Boolean mismatch = pending != null && ar != pending;
        if (mismatch) {
            throw new InvalidOperationException("The given AsyncResult object does not match the current pending operation");
        }
        AsyncReceiveOperation = null;
    }
}
/// <summary>
/// Starts an asynchronous byte read; the destination range is remembered and filled by
/// <see cref="EndRead"/>.
/// </summary>
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) {
    AsyncResult operation = new AsyncResult(callback, state, buffer, offset, count);
    return BeginAsyncReadOperation(operation);
}
/// <summary>
/// Starts an asynchronous packet read; finish it with <see cref="EndReadPacket"/>.
/// </summary>
public override IAsyncResult BeginReadPacket(AsyncCallback callback, object state) {
    AsyncResult operation = new AsyncResult(callback, state);
    return BeginAsyncReadOperation(operation);
}
/// <summary>
/// Starts an asynchronous packet read; finish it with <see cref="EndReadPacketFast"/>.
/// </summary>
public override IAsyncResult BeginReadPacketFast(AsyncCallback callback, object state) {
    AsyncResult operation = new AsyncResult(callback, state);
    return BeginAsyncReadOperation(operation);
}
/// <summary>
/// Finishes a read started with <see cref="BeginRead"/> by performing the actual
/// <see cref="Read"/> into the buffer range given at begin time.
/// </summary>
/// <returns>The number of bytes copied.</returns>
public override int EndRead(IAsyncResult asyncResult) {
    AsyncResult operation = (AsyncResult)asyncResult;
    EndAsyncReadOperation(operation);
    Byte[] target = operation.Buffer;
    int targetOffset = operation.BufferOffset;
    int targetLength = operation.BufferLength;
    return Read(target, targetOffset, targetLength);
}
/// <summary>
/// Finishes a read started with <see cref="BeginReadPacket"/> and returns the result of
/// <see cref="ReadPacket"/>.
/// </summary>
public override Byte[] EndReadPacket(IAsyncResult asyncResult) {
    EndAsyncReadOperation((AsyncResult)asyncResult);
    Byte[] packet = ReadPacket();
    return packet;
}
/// <summary>
/// Finishes a read started with <see cref="BeginReadPacketFast"/> and returns the result
/// of <see cref="ReadPacketFast"/>.
/// </summary>
public override ArraySegment<Byte> EndReadPacketFast(IAsyncResult asyncResult) {
    EndAsyncReadOperation((AsyncResult)asyncResult);
    ArraySegment<Byte> segment = ReadPacketFast();
    return segment;
}
186 } | |
187 } |