Mercurial > hg > ucis.core
comparison Net/TCPStream.cs @ 7:4b78cc5f116b
Fixes and improvements (some untested)
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Sun, 13 Jan 2013 18:44:17 +0100 |
parents | d0117dc37c34 |
children | 4ba4fd48e1da |
comparison
equal
deleted
inserted
replaced
6:5ce7a138fdba | 7:4b78cc5f116b |
---|---|
1 using System; | 1 using System; |
2 using System.IO; | 2 using System.IO; |
3 using System.Net; | 3 using System.Net; |
4 using System.Net.Sockets; | 4 using System.Net.Sockets; |
5 using System.Threading; | 5 using System.Threading; |
6 using UCIS.Util; | |
6 using SysThreadPool = System.Threading.ThreadPool; | 7 using SysThreadPool = System.Threading.ThreadPool; |
7 | 8 |
8 namespace UCIS.Net { | 9 namespace UCIS.Net { |
9 public class TCPStream : Stream, INetworkConnection { | 10 public class TCPStream : Stream, INetworkConnection { |
10 private static long _totalBytesRead = 0; | 11 private static long _totalBytesRead = 0; |
74 return Buffer[0]; | 75 return Buffer[0]; |
75 } | 76 } |
76 } | 77 } |
77 | 78 |
78 public override int Read(byte[] buffer, int offset, int size) { | 79 public override int Read(byte[] buffer, int offset, int size) { |
80 if (size < 1) return 0; | |
79 int Count = 0; | 81 int Count = 0; |
80 | |
81 if (size < 1) return 0; | |
82 if (_HasPeekByte) { | 82 if (_HasPeekByte) { |
83 buffer[offset] = _PeekByte; | 83 buffer[offset] = _PeekByte; |
84 _HasPeekByte = false; | 84 _HasPeekByte = false; |
85 Count = 1; | 85 Count = 1; |
86 offset += 1; | 86 offset += 1; |
87 size -= 1; | 87 size = 0; |
88 } | 88 } |
89 | |
90 try { | 89 try { |
91 if (size > 0) Count += Socket.Receive(buffer, offset, size, SocketFlags.None); | 90 if (size > 0) Count += Socket.Receive(buffer, offset, size, SocketFlags.None); |
92 } catch (SocketException ex) { | 91 } catch (SocketException ex) { |
93 switch (ex.SocketErrorCode) { | 92 switch (ex.SocketErrorCode) { |
94 case SocketError.WouldBlock: | 93 case SocketError.WouldBlock: |
112 _BytesRead += (ulong)Count; | 111 _BytesRead += (ulong)Count; |
113 Interlocked.Add(ref _totalBytesRead, (long)Count); | 112 Interlocked.Add(ref _totalBytesRead, (long)Count); |
114 return Count; | 113 return Count; |
115 } | 114 } |
116 | 115 |
117 class AsyncResult : IAsyncResult { | 116 class AsyncResult : AsyncResultBase { |
118 public Object AsyncState { get; private set; } | |
119 public WaitHandle AsyncWaitHandle { get { return WaitHandle; } } | |
120 public Boolean CompletedSynchronously { get; private set; } | |
121 public Boolean IsCompleted { get; private set; } | |
122 public int Count { get; private set; } | 117 public int Count { get; private set; } |
123 | 118 public AsyncResult(AsyncCallback callback, Object state) : base(callback, state) { } |
124 private ManualResetEvent WaitHandle = new ManualResetEvent(false); | |
125 private AsyncCallback Callback = null; | |
126 private void CallCallback(Object state) { | |
127 if (Callback != null) Callback(this); | |
128 } | |
129 public void SetCompleted(Boolean synchronously, int cnt) { | 119 public void SetCompleted(Boolean synchronously, int cnt) { |
130 CompletedSynchronously = synchronously; | |
131 Count = cnt; | 120 Count = cnt; |
132 IsCompleted = true; | 121 base.SetCompleted(synchronously, null); |
133 WaitHandle.Set(); | |
134 if (synchronously) { | |
135 CallCallback(null); | |
136 } else { | |
137 if (Callback != null) SysThreadPool.QueueUserWorkItem(CallCallback); | |
138 } | |
139 } | |
140 public AsyncResult(AsyncCallback callback, Object state) { | |
141 this.Callback = callback; | |
142 this.AsyncState = state; | |
143 CompletedSynchronously = false; | |
144 IsCompleted = false; | |
145 } | 122 } |
146 } | 123 } |
147 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { | 124 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { |
148 if (count < 0) { | 125 if (count < 0) { |
149 AsyncResult ar = new AsyncResult(callback, state); | 126 AsyncResult ar = new AsyncResult(callback, state); |
150 ar.SetCompleted(true, 0); | 127 ar.SetCompleted(true, 0); |
151 return ar; | 128 return ar; |
152 } else if (_HasPeekByte) { | 129 } else if (_HasPeekByte) { |
153 buffer[offset] = _PeekByte; | 130 buffer[offset] = _PeekByte; |
154 _HasPeekByte = false; | 131 _HasPeekByte = false; |
155 offset += 1; | |
156 count -= 1; | |
157 AsyncResult ar = new AsyncResult(callback, state); | 132 AsyncResult ar = new AsyncResult(callback, state); |
158 ar.SetCompleted(true, 1); | 133 ar.SetCompleted(true, 1); |
159 return ar; | 134 return ar; |
160 } else { | 135 } else { |
161 return Socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state); | 136 return Socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state); |
171 | 146 |
172 public int PeekByte() { | 147 public int PeekByte() { |
173 if (_HasPeekByte) { | 148 if (_HasPeekByte) { |
174 return _PeekByte; | 149 return _PeekByte; |
175 } else { | 150 } else { |
176 int Result = 0; | 151 int Result = ReadByte(); |
177 Result = ReadByte(); | |
178 if (Result >= 0 && Result <= 255) { | 152 if (Result >= 0 && Result <= 255) { |
179 _PeekByte = (byte)Result; | 153 _PeekByte = (byte)Result; |
180 _HasPeekByte = true; | 154 _HasPeekByte = true; |
181 } | 155 } |
182 return Result; | 156 return Result; |
209 //Do nothing | 183 //Do nothing |
210 //_Socket.NoDelay = true; | 184 //_Socket.NoDelay = true; |
211 } | 185 } |
212 | 186 |
213 public override long Length { | 187 public override long Length { |
214 get { | 188 get { throw new NotSupportedException(); } |
215 throw new NotSupportedException(); | 189 } |
216 } | |
217 } | |
218 | |
219 public override long Position { | 190 public override long Position { |
220 get { | 191 get { throw new NotSupportedException(); } |
221 throw new NotSupportedException(); | 192 set { throw new NotSupportedException(); } |
222 } | |
223 set { | |
224 throw new NotSupportedException(); | |
225 } | |
226 } | 193 } |
227 | 194 |
228 public override long Seek(long offset, SeekOrigin origin) { | 195 public override long Seek(long offset, SeekOrigin origin) { |
229 throw new NotSupportedException(); | 196 throw new NotSupportedException(); |
230 } | 197 } |
264 _BytesWritten += (ulong)size; | 231 _BytesWritten += (ulong)size; |
265 Interlocked.Add(ref _totalBytesWritten, (long)size); | 232 Interlocked.Add(ref _totalBytesWritten, (long)size); |
266 } | 233 } |
267 | 234 |
268 public override void Close() { | 235 public override void Close() { |
269 System.Net.Sockets.Socket s = Interlocked.Exchange(ref _Socket, null); | 236 Socket s = Interlocked.Exchange(ref _Socket, null); |
270 try { | 237 try { |
271 if (s != null) { | 238 if (s != null) { |
272 try { | 239 try { |
273 if (s.Connected) s.Shutdown(SocketShutdown.Both); | 240 if (s.Connected) s.Shutdown(SocketShutdown.Both); |
274 } catch { } | 241 } catch { } |