changeset 1:28dc7d535036

Small improvements, introduced PacketStream
author Ivo Smits <Ivo@UCIS.nl>
date Mon, 07 Jan 2013 16:43:28 +0100
parents 3ab940a0c7a0
children d0117dc37c34
files Pml/Elements/Collection.cs Pml/RW/PmlBinaryRW.cs UCIS.csproj UTF8NoPreamble.cs Util/ArrayUtil.cs Util/CrossStream.cs Util/PacketStream.cs Util/PipeStream.cs Util/QueuedPacketStream.cs
diffstat 9 files changed, 278 insertions(+), 118 deletions(-) [+]
line wrap: on
line diff
--- a/Pml/Elements/Collection.cs	Tue Sep 11 16:28:53 2012 +0200
+++ b/Pml/Elements/Collection.cs	Mon Jan 07 16:43:28 2013 +0100
@@ -6,13 +6,16 @@
 	public class PmlCollection : PmlElement, ICollection<PmlElement> {
 			private List<PmlElement> pItems = new List<PmlElement>();
 
-			public PmlCollection() { 			}
+			public PmlCollection() { }
 			public PmlCollection(params PmlElement[] Elements) {
 				pItems.AddRange(Elements);
 			}
 			public PmlCollection(IEnumerable<PmlElement> Elements) {
 				pItems.AddRange(Elements);
 			}
+			public PmlCollection(params String[] Elements) {
+				foreach (String s in Elements) pItems.Add(s);
+			}
 
 			public PmlElement Add(PmlElement Element) {
 				pItems.Add(Element);
--- a/Pml/RW/PmlBinaryRW.cs	Tue Sep 11 16:28:53 2012 +0200
+++ b/Pml/RW/PmlBinaryRW.cs	Mon Jan 07 16:43:28 2013 +0100
@@ -34,29 +34,18 @@
 	}
 
 	public class PmlBinaryWriter : IPmlWriter {
-		//private BinaryWriter pWriter;
 		private Stream pStream;
 		private Encoding pEncoding;
 
-		/*public PmlBinaryWriter(BinaryWriter Writer) {
-			pWriter = Writer;
-		}*/
 		public PmlBinaryWriter(Stream Stream) {
-			//pWriter = new BinaryWriter(Stream);
 			pStream = Stream;
 			pEncoding = Encoding.UTF8;
 		}
 		public PmlBinaryWriter(Stream Stream, Encoding Encoding) {
-			//pWriter = new BinaryWriter(Stream, Encoding);
 			pStream = Stream;
 			pEncoding = Encoding;
 		}
 
-		/*public BinaryWriter BaseWriter {
-			get { return pWriter; }
-			set { pWriter = value; }
-		}*/
-
 		public void WriteMessage(PmlElement Message) {
 			MemoryStream stream = new MemoryStream();
 			BinaryWriter writer = new BinaryWriter(stream, pEncoding);
@@ -67,6 +56,13 @@
 			}
 		}
 
+		public static Byte[] EncodeMessage(PmlElement message) {
+			MemoryStream stream = new MemoryStream();
+			BinaryWriter writer = new BinaryWriter(stream, Encoding.UTF8);
+			WriteMessageTo(message, writer);
+			return stream.ToArray();
+		}
+
 		public static void WriteMessageTo(PmlElement Message, BinaryWriter Writer) {
 			lock (Writer) {
 				Writer.Write((byte)255);
@@ -77,13 +73,10 @@
 		}
 
 		private static void WriteElementTo(PmlElement Element, BinaryWriter Writer) {
-			//byte[] Buffer = null;
 			if (Element == null) {
-				//Writer.Write((byte)PmlElementType.Null);
 				Writer.Write((byte)0);
 				return;
 			}
-			//Writer.Write((byte)Element.Type);
 			switch (Element.Type) {
 				case PmlType.Null:
 					Writer.Write((byte)0);
@@ -119,7 +112,7 @@
 					Writer.Write((byte)11);
 					string Str = Element.ToString();
 					if (Str == null) {
-						Writer.Write("");
+						Writer.Write(String.Empty);
 					} else {
 						Writer.Write(Str);
 					}
@@ -166,6 +159,14 @@
 			return ReadMessageFrom(pReader);
 		}
 
+		public static PmlElement DecodeMessage(Byte[] message) {
+			using (MemoryStream ms = new MemoryStream(message)) {
+				using (BinaryReader reader = new BinaryReader(ms, Encoding.UTF8)) {
+					return ReadMessageFrom(reader);
+				}
+			}
+		}
+
 		public static PmlElement ReadMessageFrom(BinaryReader Reader) {
 			PmlElement Element = null;
 			lock (Reader) {
@@ -213,7 +214,6 @@
 						if (B == 0) return new PmlInteger(Reader.ReadUInt64());
 						else if (B == 1) return new PmlInteger(Reader.ReadInt64());
 						else return null;
-
 					}
 				default:
 					throw new Exception("Unknown PML type code " + EType.ToString());
--- a/UCIS.csproj	Tue Sep 11 16:28:53 2012 +0200
+++ b/UCIS.csproj	Mon Jan 07 16:43:28 2013 +0100
@@ -12,6 +12,8 @@
     <AssemblyName>UCIS</AssemblyName>
     <TargetFrameworkVersion>v2.0</TargetFrameworkVersion>
     <FileAlignment>512</FileAlignment>
+    <SignAssembly>true</SignAssembly>
+    <AssemblyOriginatorKeyFile>UCIS.snk</AssemblyOriginatorKeyFile>
   </PropertyGroup>
   <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
     <DebugSymbols>true</DebugSymbols>
@@ -64,6 +66,7 @@
     <None Include="Pml\LegacyPmlCommunicator.cs" />
     <Compile Include="Pml\PmlBuilder.cs" />
     <None Include="Pml\PmlCommunicator.cs" />
+    <None Include="UCIS.snk" />
     <Compile Include="Pml\PmlCommunicator2.cs" />
     <Compile Include="Pml\RW\PmlAmfRW.cs" />
     <Compile Include="Pml\RW\PmlPHPRW.cs" />
@@ -81,7 +84,9 @@
     <Compile Include="Util\AsyncStream.cs" />
     <Compile Include="Util\CrossStream.cs" />
     <Compile Include="Util\HoldStream.cs" />
+    <Compile Include="Util\PacketStream.cs" />
     <Compile Include="Util\PipeStream.cs" />
+    <Compile Include="Util\QueuedPacketStream.cs" />
     <Compile Include="Windows\ServiceManager.cs" />
     <Compile Include="Xml\PolicyFile.cs" />
     <Compile Include="Xml\Server.cs" />
--- a/UTF8NoPreamble.cs	Tue Sep 11 16:28:53 2012 +0200
+++ b/UTF8NoPreamble.cs	Mon Jan 07 16:43:28 2013 +0100
@@ -1,7 +1,5 @@
 namespace UCIS {
 	public class UTF8NoPreamble : System.Text.UTF8Encoding {
-		public override byte[] GetPreamble() {
-			return new byte[] { };
-		}
+		public UTF8NoPreamble() : base(false) { }
 	}
 }
--- a/Util/ArrayUtil.cs	Tue Sep 11 16:28:53 2012 +0200
+++ b/Util/ArrayUtil.cs	Mon Jan 07 16:43:28 2013 +0100
@@ -84,5 +84,23 @@
 			for (int i = 0; i < a.Length; i++) if (a[i] != b[i]) return false;
 			return true;
 		}
+		public static int GetHashCode<T>(T[] array) {
+			int h = 0;
+			foreach (T v in array) h ^= v.GetHashCode();
+			return h;
+		}
+		public static void Add<T>(ref T[] array, T item) {
+			if (array == null) {
+				array = new T[] { item };
+			} else {
+				int index = array.Length;
+				Array.Resize(ref array, index + 1);
+				array[index] = item;
+			}
+		}
+		public static void AddUnique<T>(ref T[] array, T item) {
+			if (Array.IndexOf(array, item) != -1) return;
+			Add(ref array, item);
+		}
 	}
 }
--- a/Util/CrossStream.cs	Tue Sep 11 16:28:53 2012 +0200
+++ b/Util/CrossStream.cs	Mon Jan 07 16:43:28 2013 +0100
@@ -1,65 +1,44 @@
 using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Threading;
 
 namespace UCIS.Util {
-	public class CrossStream : Stream {
-		private Queue<byte[]> queue = new Queue<byte[]>();
-		private byte[] currentBuffer = null;
-		private int currentBufferIndex;
-		private CrossStream otherPart;
-		private AutoResetEvent resetEvent = new AutoResetEvent(false);
+	public class CrossStream : QueuedPacketStream {
+		protected CrossStream otherPart;
+		private Boolean otherClosed = false;
+
+		public CrossStream OtherSide { get { return otherPart; } }
 
 		public static CrossStream CreatePair(out CrossStream stream1, out CrossStream stream2) {
-			stream1 = new CrossStream();
-			stream2 = new CrossStream(stream1);
-			stream1.otherPart = stream2;
-			return stream1;
+			return stream1 = CreatePair(out stream2);
 		}
 		public static CrossStream CreatePair(out CrossStream stream2) {
-			CrossStream stream1 = new CrossStream();
-			stream2 = new CrossStream(stream1);
-			stream1.otherPart = stream2;
-			return stream1;
+			stream2 = new CrossStream();
+			return stream2.otherPart;
 		}
 
-		private CrossStream() { }
-		public CrossStream(CrossStream other) {
-			this.otherPart = other;
+		public CrossStream() {
+			otherPart = new CrossStream(this);
+		}
+		protected CrossStream(CrossStream other) {
+			otherPart = other;
 		}
 
 		public override bool CanRead { get { return true; } }
 		public override bool CanWrite { get { return true; } }
-		public override bool CanSeek { get { return false; } }
-		public override bool CanTimeout { get { return true; } }
-		public override long Length { get { throw new NotSupportedException(); } }
-		public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
 		public override void Flush() { }
-		public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
-		public override void SetLength(long value) { throw new NotSupportedException(); }
-		public override int Read(byte[] buffer, int offset, int count) {
-			if (currentBuffer == null) {
-				//if (queue.Count == 0) if (!resetEvent.WaitOne(this.ReadTimeout)) throw new TimeoutException();
-				if (queue.Count == 0 && !resetEvent.WaitOne()) throw new TimeoutException();
-				//while (queue.Count == 0) Thread.Sleep(100);
-				resetEvent.Reset();
-				currentBuffer = queue.Dequeue();
-				currentBufferIndex = 0;
-			}
-			if (count > currentBuffer.Length - currentBufferIndex) count = currentBuffer.Length - currentBufferIndex;
-			Buffer.BlockCopy(currentBuffer, currentBufferIndex, buffer, offset, count);
-			currentBufferIndex += count;
-			if (currentBufferIndex == currentBuffer.Length) currentBuffer = null;
-			return count;
+
+		public override void Write(byte[] buffer, int offset, int count) {
+			if (otherClosed) throw new ObjectDisposedException("CrossStream", "The stream has been closed");
+			otherPart.AddReadBufferCopy(buffer, offset, count);
 		}
-		public override void Write(byte[] buffer, int offset, int count) {
-			byte[] tostore;
-			if (count == 0) return;
-			tostore = new byte[count];
-			Buffer.BlockCopy(buffer, offset, tostore, 0, count);
-			otherPart.queue.Enqueue(tostore);
-			otherPart.resetEvent.Set();
+
+		public override void Close() {
+			CloseBase();
+			otherPart.CloseBase();
+		}
+		private void CloseBase() {
+			otherClosed = true;
+			otherPart = null;
+			base.Close();
 		}
 	}
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Util/PacketStream.cs	Mon Jan 07 16:43:28 2013 +0100
@@ -0,0 +1,17 @@
+using System;
+using System.IO;
+
+namespace UCIS.Util {
+	public abstract class PacketStream : Stream {
+		public abstract Byte[] ReadPacket();
+		public abstract IAsyncResult BeginReadPacket(AsyncCallback callback, object state);
+		public abstract Byte[] EndReadPacket(IAsyncResult asyncResult);
+
+		public virtual ArraySegment<Byte> ReadPacketFast() { return new ArraySegment<byte>(ReadPacket()); }
+		public virtual IAsyncResult BeginReadPacketFast(AsyncCallback callback, object state) { return BeginReadPacket(callback, state); }
+		public virtual ArraySegment<Byte> EndReadPacketFast(IAsyncResult asyncResult) { return new ArraySegment<byte>(EndReadPacket(asyncResult)); }
+		public virtual void WritePacketFast(Byte[] packet, int unusedBefore, int unusedAfter) { Write(packet, unusedBefore, packet.Length - unusedBefore - unusedAfter); }
+		public virtual int WriteFastBytesBefore { get { return 0; } }
+		public virtual int WriteFastBytesAfter { get { return 0; } }
+	}
+}
--- a/Util/PipeStream.cs	Tue Sep 11 16:28:53 2012 +0200
+++ b/Util/PipeStream.cs	Mon Jan 07 16:43:28 2013 +0100
@@ -1,59 +1,12 @@
 using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Threading;
 
 namespace UCIS.Util {
-	public class PipeStream : Stream {
-		private Queue<byte[]> queue = new Queue<byte[]>();
-		private byte[] currentBuffer = null;
-		private int currentBufferIndex;
-		private AutoResetEvent resetEvent = new AutoResetEvent(false);
-		private Boolean closed = false;
-
-		public PipeStream() {
-			ReadTimeout = Timeout.Infinite;
-		}
-
-		public override int ReadTimeout { get; set; }
-		public override bool CanRead { get { return !closed; } }
-		public override bool CanWrite { get { return !closed; } }
-		public override bool CanSeek { get { return false; } }
-		public override bool CanTimeout { get { return !closed; } }
-		public override long Length { get { throw new NotSupportedException(); } }
-		public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
+	public class PipeStream : QueuedPacketStream {
+		public override bool CanWrite { get { return !Closed; } }
 		public override void Flush() { }
-		public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
-		public override void SetLength(long value) { throw new NotSupportedException(); }
-		public override int Read(byte[] buffer, int offset, int count) {
-			if (closed) throw new ObjectDisposedException("PipeStream");
-			if (currentBuffer == null) {
-				//if (queue.Count == 0) if (!resetEvent.WaitOne(this.ReadTimeout)) throw new TimeoutException();
-				if (queue.Count == 0 && (ReadTimeout == 0 || !resetEvent.WaitOne(ReadTimeout))) throw new TimeoutException();
-				//while (queue.Count == 0) Thread.Sleep(100);
-				resetEvent.Reset();
-				currentBuffer = queue.Dequeue();
-				currentBufferIndex = 0;
-			}
-			if (count > currentBuffer.Length - currentBufferIndex) count = currentBuffer.Length - currentBufferIndex;
-			Buffer.BlockCopy(currentBuffer, currentBufferIndex, buffer, offset, count);
-			currentBufferIndex += count;
-			if (currentBufferIndex == currentBuffer.Length) currentBuffer = null;
-			return count;
-		}
 		public override void Write(byte[] buffer, int offset, int count) {
-			byte[] tostore;
-			if (closed) throw new ObjectDisposedException("PipeStream");
-			if (count == 0) return;
-			tostore = new byte[count];
-			Buffer.BlockCopy(buffer, offset, tostore, 0, count);
-			queue.Enqueue(tostore);
-			resetEvent.Set();
-		}
-		public override void Close() {
-			closed = true;
-			resetEvent.Set();
-			base.Close();
+			if (Closed) throw new ObjectDisposedException("PipeStream");
+			AddReadBufferCopy(buffer, offset, count);
 		}
 	}
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Util/QueuedPacketStream.cs	Mon Jan 07 16:43:28 2013 +0100
@@ -0,0 +1,187 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Threading;
+using SysThreadPool = System.Threading.ThreadPool;
+
+namespace UCIS.Util {
+	public abstract class QueuedPacketStream : PacketStream {
+		Queue<Byte[]> ReceiveQueue = new Queue<byte[]>();
+		Byte[] ReceiveBuffer = null;
+		int ReceiveBufferOffset = 0;
+		int ReceiveWaiting = 0;
+		AutoResetEvent ReceiveEvent = new AutoResetEvent(false);
+		AsyncResult AsyncReceiveOperation = null;
+		protected Boolean Closed { get; private set; }
+
+		public QueuedPacketStream() {
+			ReadTimeout = Timeout.Infinite;
+			Closed = false;
+		}
+
+		protected void AddReadBufferCopy(Byte[] buffer, int offset, int count) {
+			Byte[] store;
+			store = new Byte[count];
+			Buffer.BlockCopy(buffer, offset, store, 0, count);
+			AddReadBufferNoCopy(store);
+		}
+		protected void AddReadBufferNoCopy(Byte[] store) {
+			if (Closed) return;
+			lock (ReceiveQueue) {
+				ReceiveQueue.Enqueue(store);
+				Interlocked.Add(ref ReceiveWaiting, store.Length);
+				ReceiveEvent.Set();
+				if (AsyncReceiveOperation != null && (store.Length > 0 || AsyncReceiveOperation.IsReadPacket)) {
+					AsyncReceiveOperation.SetCompleted(false);
+					AsyncReceiveOperation = null;
+				}
+			}
+		}
+		public override void Close() {
+			Closed = true;
+			base.Close();
+			ReceiveEvent.Set();
+			lock (ReceiveQueue) {
+				if (AsyncReceiveOperation != null) {
+					AsyncReceiveOperation.SetCompleted(false);
+					AsyncReceiveOperation = null;
+				}
+			}
+		}
+
+		public override bool CanSeek { get { return false; } }
+		public override bool CanTimeout { get { return true; } }
+		public override bool CanRead { get { return !Closed; } }
+		public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
+		public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
+		public override void SetLength(long value) { throw new NotSupportedException(); }
+
+		public override int ReadTimeout { get; set; }
+		public override long Length { get { return ReceiveWaiting; } }
+
+		public int WaitForPacket() {
+			while (ReceiveBuffer == null) {
+				lock (ReceiveQueue) {
+					if (ReceiveQueue.Count > 0) {
+						ReceiveBuffer = ReceiveQueue.Dequeue();
+						ReceiveBufferOffset = 0;
+						continue;
+					}
+				}
+				if (Closed) throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed");
+				if (ReadTimeout == 0 || !ReceiveEvent.WaitOne(ReadTimeout, false)) throw new TimeoutException();
+			}
+			return ReceiveBuffer.Length - ReceiveBufferOffset;
+		}
+		public override int Read(byte[] buffer, int offset, int count) {
+			int left = 0;
+			while (true) {
+				left = WaitForPacket();
+				if (left > 0) break;
+				ReceiveBuffer = null;
+			}
+			if (count > left) count = left;
+			Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, buffer, offset, count);
+			ReceiveBufferOffset += count;
+			if (ReceiveBufferOffset == ReceiveBuffer.Length) ReceiveBuffer = null;
+			Interlocked.Add(ref ReceiveWaiting, -count);
+			return count;
+		}
+		public override Byte[] ReadPacket() {
+			WaitForPacket();
+			Byte[] arr = ReceiveBuffer;
+			if (ReceiveBufferOffset > 0) {
+				arr = new Byte[ReceiveBuffer.Length - ReceiveBufferOffset];
+				Buffer.BlockCopy(ReceiveBuffer, ReceiveBufferOffset, arr, 0, arr.Length - ReceiveBufferOffset);
+			}
+			ReceiveBuffer = null;
+			return arr;
+		}
+		public override ArraySegment<byte> ReadPacketFast() {
+			WaitForPacket();
+			ArraySegment<byte> ret = new ArraySegment<byte>(ReceiveBuffer, ReceiveBufferOffset, ReceiveBuffer.Length - ReceiveBufferOffset);
+			ReceiveBuffer = null;
+			return ret;
+		}
+
+		class AsyncResult : IAsyncResult {
+			public Object AsyncState { get; private set; }
+			public WaitHandle AsyncWaitHandle { get { return WaitHandle; } }
+			public Boolean CompletedSynchronously { get; private set; }
+			public Boolean IsCompleted { get; private set; }
+			public Boolean IsReadPacket { get; private set; }
+
+			public Byte[] Buffer = null;
+			public int BufferOffset = 0;
+			public int BufferLength = 0;
+
+			private ManualResetEvent WaitHandle = new ManualResetEvent(false);
+			private AsyncCallback Callback = null;
+			private void CallCallback(Object state) {
+				if (Callback != null) Callback(this);
+			}
+			public void SetCompleted(Boolean synchronously) {
+				CompletedSynchronously = synchronously;
+				IsCompleted = true;
+				WaitHandle.Set();
+				if (Callback != null) SysThreadPool.QueueUserWorkItem(CallCallback);
+			}
+			public AsyncResult(AsyncCallback callback, Object state) {
+				this.Callback = callback;
+				this.AsyncState = state;
+				CompletedSynchronously = false;
+				IsCompleted = false;
+				IsReadPacket = true;
+			}
+			public AsyncResult(AsyncCallback callback, Object state, Byte[] buffer, int bufferOffset, int bufferLength)
+				: this(callback, state) {
+				this.Buffer = buffer;
+				this.BufferOffset = bufferOffset;
+				this.BufferLength = bufferLength;
+				IsReadPacket = false;
+			}
+		}
+		private IAsyncResult BeginAsyncReadOperation(AsyncResult ar) {
+			lock (ReceiveQueue) {
+				if (AsyncReceiveOperation != null) throw new InvalidOperationException("Another asynchronous operation is in progress");
+				if (ReceiveBuffer != null || ReceiveQueue.Count > 0) {
+					ar.SetCompleted(true);
+				} else {
+					if (Closed) throw new ObjectDisposedException("QueuedPacketStream", "The connection has been closed");
+					AsyncReceiveOperation = ar;
+				}
+			}
+			return ar;
+		}
+		private void EndAsyncReadOperation(AsyncResult ar) {
+			lock (ReceiveQueue) {
+				if (AsyncReceiveOperation != null && ar != AsyncReceiveOperation) throw new InvalidOperationException("The given AsyncResult object does not match the current pending operation");
+				AsyncReceiveOperation = null;
+			}
+		}
+		public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) {
+			return BeginAsyncReadOperation(new AsyncResult(callback, state, buffer, offset, count));
+		}
+		public override IAsyncResult BeginReadPacket(AsyncCallback callback, object state) {
+			return BeginAsyncReadOperation(new AsyncResult(callback, state));
+		}
+		public override IAsyncResult BeginReadPacketFast(AsyncCallback callback, object state) {
+			return BeginAsyncReadOperation(new AsyncResult(callback, state));
+		}
+		public override int EndRead(IAsyncResult asyncResult) {
+			AsyncResult ar = (AsyncResult)asyncResult;
+			EndAsyncReadOperation(ar);
+			return Read(ar.Buffer, ar.BufferOffset, ar.BufferLength);
+		}
+		public override Byte[] EndReadPacket(IAsyncResult asyncResult) {
+			AsyncResult ar = (AsyncResult)asyncResult;
+			EndAsyncReadOperation(ar);
+			return ReadPacket();
+		}
+		public override ArraySegment<Byte> EndReadPacketFast(IAsyncResult asyncResult) {
+			AsyncResult ar = (AsyncResult)asyncResult;
+			EndAsyncReadOperation(ar);
+			return ReadPacketFast();
+		}
+	}
+}