comparison Remoting/RemotingManager.cs @ 13:92e09198e7e7

Added Remoting code
author Ivo Smits <Ivo@UCIS.nl>
date Wed, 27 Feb 2013 18:47:15 +0100
parents
children fc3eb8e49ea6
comparison
equal deleted inserted replaced
12:ba8f94212c6e 13:92e09198e7e7
1 using System;
2 using System.Collections.Generic;
3 using System.IO;
4 using System.Reflection;
5 using System.Reflection.Emit;
6 using System.Runtime.Remoting;
7 using System.Runtime.Remoting.Messaging;
8 using System.Runtime.Remoting.Proxies;
9 using System.Runtime.Serialization;
10 using System.Threading;
11 using UCIS.Util;
12 using SysThreadPool = System.Threading.ThreadPool;
13
14 //[assembly: InternalsVisibleToAttribute("UCIS.Remoting.Proxies")]
15
16 namespace UCIS.Remoting {
17 public class RemotingManager {
18 Dictionary<UInt32, PendingRemoteCall> pendingCalls = new Dictionary<uint, PendingRemoteCall>();
19 Dictionary<Thread, UInt32> waitingCallThreads = new Dictionary<Thread, UInt32>();
20 Boolean Closed = false;
21
22 IDictionary<String, Object> incomingCallContext = new Dictionary<String, Object>();
23 [ThreadStatic]
24 static IDictionary<String, Object> currentCallContext;
25
26 public event Action<String> OnDebugLog;
27 public event Action<Exception> OnErrorLog;
28
29 private void DebugLog(String text, params Object[] args) {
30 if (OnDebugLog != null) OnDebugLog(String.Format(text, args));
31 }
32 private void ErrorLog(Exception ex) {
33 if (OnErrorLog != null) OnErrorLog(ex);
34 }
35
36 public Object LocalRoot { get; set; }
37
38 public IDictionary<String, Object> CallContext { get { return incomingCallContext; } }
39 public static IDictionary<String, Object> CurrentCallContext { get { return currentCallContext; } }
40
41 public RemotingManager(PacketStream stream) : this(stream, null) { }
42 public RemotingManager(PacketStream stream, Object localRoot) {
43 this.stream = stream;
44 this.LocalRoot = localRoot;
45 stream.BeginReadPacketFast(ReceiveCallback, null);
46 }
47
48 #region I/O, multiplexing, encoding
49 PacketStream stream;
50 public PacketStream Stream { get { return stream; } }
51 int streamIndex = 0;
52 Dictionary<int, StreamChannel> streamChannels = new Dictionary<int, StreamChannel>();
53
54 class StreamChannel : QueuedPacketStream {
55 public RemotingManager Manager { get; private set; }
56 public int StreamID { get; private set; }
57 public StreamChannel OtherSide { get; set; }
58
59 public StreamChannel(RemotingManager conn, int sid) {
60 this.Manager = conn;
61 this.StreamID = sid;
62 }
63
64 internal void AddBuffer(Byte[] buffer, int offset, int count) {
65 if (Closed) return;
66 base.AddReadBufferCopy(buffer, offset, count);
67 }
68
69 public override bool CanRead { get { return !Closed; } }
70 public override bool CanWrite { get { return !Closed; } }
71
72 public override void Flush() { }
73
74 public override void Write(byte[] buffer, int offset, int count) {
75 if (Closed) throw new ObjectDisposedException("BaseStream", "The connection has been closed");
76 Manager.WriteStreamChannelPacket(StreamID ^ 0x4000, buffer, offset, count);
77 }
78
79 public override void Close() {
80 StreamChannel other = OtherSide;
81 MultiplexorClosed();
82 if (other != null) other.CloseInternal();
83 lock (Manager.streamChannels) Manager.streamChannels.Remove(StreamID);
84 }
85 public void CloseInternal() {
86 MultiplexorClosed();
87 lock (Manager.streamChannels) Manager.streamChannels.Remove(StreamID);
88 }
89 internal void MultiplexorClosed() {
90 base.Close();
91 OtherSide = null;
92 }
93 }
94
95 private void ReceiveCallback(IAsyncResult ar) {
96 if (ar.CompletedSynchronously) {
97 SysThreadPool.QueueUserWorkItem(ReceiveCallbackA, ar);
98 } else {
99 ReceiveCallbackB(ar);
100 }
101 }
102 private void ReceiveCallbackA(Object state) {
103 ReceiveCallbackB((IAsyncResult)state);
104 }
105 private void ReceiveCallbackB(IAsyncResult ar) {
106 try {
107 ArraySegment<Byte> packet = stream.EndReadPacketFast(ar);
108 Byte[] array = packet.Array;
109 if (packet.Count < 2) throw new ArgumentOutOfRangeException("packet.Count", "Packet is too small");
110 int offset = packet.Offset;
111 int sid = (array[offset + 0] << 8) | (array[offset + 1] << 0);
112 if ((sid & 0x8000) != 0) {
113 StreamChannel substr;
114 if (streamChannels.TryGetValue(sid, out substr)) substr.AddBuffer(array, offset + 2, packet.Count - 2);
115 }
116 stream.BeginReadPacketFast(ReceiveCallback, null);
117 if (sid == 0) {
118 Object obj;
119 using (MemoryStream ms = new MemoryStream(packet.Array, packet.Offset + 2, packet.Count - 2, false)) obj = Deserialize(ms);
120 ReceiveObject(obj);
121 }
122 } catch (Exception ex) {
123 Closed = true;
124 stream.Close();
125 lock (ObjectReferencesByID) {
126 ObjectReferencesByID.Clear();
127 RemoteObjectReferences.Clear();
128 }
129 lock (pendingCalls) {
130 foreach (PendingRemoteCall call in pendingCalls.Values) call.SetError(new InvalidOperationException("The connection has been closed"));
131 pendingCalls.Clear();
132 }
133 lock (streamChannels) {
134 foreach (StreamChannel s in streamChannels.Values) s.MultiplexorClosed();
135 streamChannels.Clear();
136 }
137 ErrorLog(ex);
138 }
139 }
140 private void SendObject(Object obj) {
141 if (Closed) throw new ObjectDisposedException("RemotingManager", "The connection has been closed");
142 using (MemoryStream ms = new MemoryStream()) {
143 ms.WriteByte(0);
144 ms.WriteByte(0);
145 Serialize(ms, obj);
146 lock (stream) ms.WriteTo(stream);
147 }
148 }
149
150 private void WriteStreamChannelPacket(int sid, Byte[] buffer, int offset, int count) {
151 Byte[] store = new Byte[count + 2];
152 store[0] = (Byte)(sid >> 8);
153 store[1] = (Byte)(sid >> 0);
154 Buffer.BlockCopy(buffer, offset, store, 2, count);
155 lock (stream) stream.Write(store, 0, store.Length);
156 }
157
158 public PacketStream GetStreamPair(out PacketStream remote) {
159 StreamChannel stream;
160 int sid;
161 lock (streamChannels) {
162 if (Closed) throw new ObjectDisposedException("BaseStream", "Reading from the base stream failed");
163 while (true) {
164 sid = Interlocked.Increment(ref streamIndex);
165 if ((sid & 0xc000) != 0) streamIndex = sid = 0;
166 sid |= 0x8000;
167 if (!streamChannels.ContainsKey(sid)) break;
168 }
169 stream = new StreamChannel(this, sid);
170 streamChannels.Add(sid, stream);
171 }
172 stream.OtherSide = (StreamChannel)SyncCall(new CreateStreamRequest() { StreamObject = stream, StreamID = sid | 0x4000 });
173 remote = stream.OtherSide;
174 return stream;
175 }
176 #endregion
177
178 #region Incoming call processing
179 private void ReceiveObject(Object obj) {
180 if (obj is ReferenceReleaseRequest) {
181 ReferenceReleaseRequest req = (ReferenceReleaseRequest)obj;
182 lock (ObjectReferencesByID) {
183 RemoteObjectReference objref = (RemoteObjectReference)ObjectReferencesByID[req.ObjectID];
184 if (objref.Release(req.ReferenceCount) == 0) {
185 ObjectReferencesByID.Remove(objref.ID);
186 RemoteObjectReferences.Remove(objref);
187 DebugLog("Release remoted object {0} with reference count {1}; {2} objects referenced", objref.ID, req.ReferenceCount, RemoteObjectReferences.Count);
188 }
189 }
190 } else if (obj is SynchronousCall) {
191 SynchronousCall sc = (SynchronousCall)obj;
192 if (sc.IsResult) {
193 PendingRemoteCall call;
194 lock (pendingCalls) {
195 call = pendingCalls[sc.CallID];
196 pendingCalls.Remove(call.ID);
197 }
198 call.SetResult(sc.Data);
199 } else if (sc.HasPreviousCallID) {
200 PendingRemoteCall call;
201 lock (pendingCalls) call = pendingCalls[sc.PreviousCallID];
202 if (!call.SetNextCall(sc)) {
203 ProcessRemoteCallRequest(sc);
204 }
205 } else {
206 ProcessRemoteCallRequest(sc);
207 }
208 } else {
209 throw new InvalidDataException("Unexpected object type");
210 }
211 }
212
213 private void ProcessRemoteCallRequest(SynchronousCall call) {
214 UInt32 prevcallid;
215 Boolean hasprevcallid;
216 Thread currentThread = Thread.CurrentThread;
217 lock (waitingCallThreads) {
218 hasprevcallid = waitingCallThreads.TryGetValue(currentThread, out prevcallid);
219 waitingCallThreads[currentThread] = call.CallID;
220 }
221 IDictionary<String, Object> prevCallContext = currentCallContext;
222 currentCallContext = incomingCallContext;
223 try {
224 call.Data = ProcessRemoteCallRequestA(call.Data);
225 } finally {
226 currentCallContext = prevCallContext;
227 lock (waitingCallThreads) {
228 if (hasprevcallid) waitingCallThreads[currentThread] = prevcallid;
229 else waitingCallThreads.Remove(currentThread);
230 }
231 }
232 call.IsResult = true;
233 SendObject(call);
234 }
235
236 private Object FixReturnType(Object value, Type expectedType) {
237 if (value == null) return value;
238 Type valueType = value.GetType();
239 if (valueType != expectedType) {
240 if (valueType.IsArray) {
241 Array retarray = value as Array;
242 if (retarray != null) {
243 if (!valueType.GetElementType().IsPublic && retarray.Rank == 1 &&
244 (
245 expectedType.IsArray ||
246 (expectedType.IsGenericType && (expectedType.GetGenericTypeDefinition() == typeof(IEnumerable<>) || expectedType.GetGenericTypeDefinition() == typeof(ICollection<>) || expectedType.GetGenericTypeDefinition() == typeof(IList<>)))
247 )
248 ) {
249 Type btype = expectedType.IsArray ? expectedType.GetElementType() : expectedType.GetGenericArguments()[0];
250 Array r = Array.CreateInstance(btype, retarray.Length);
251 retarray.CopyTo(r, 0);
252 value = r;
253 }
254 }
255 }
256 }
257 return value;
258 }
259 private Object ProcessRemoteMethodCallRequestA(Object ret) {
260 if (ret is DelegateCallRequest) {
261 DelegateCallRequest call = (DelegateCallRequest)ret;
262 Object target = call.Delegate;
263 DebugLog("Remote delegate call on {0}", target);
264 if (ReferenceEquals(target, null)) throw new NullReferenceException("target");
265 if (!(target is Delegate)) throw new InvalidCastException("target");
266 Object[] args = call.Arguments;
267 return ((Delegate)target).DynamicInvoke(args);
268 } else if (ret is MethodCallRequest) {
269 MethodCallRequest call = (MethodCallRequest)ret;
270 Object target = call.Object;
271 if (ReferenceEquals(target, null)) throw new NullReferenceException("target");
272 Type intf = call.Type ?? target.GetType();
273 DebugLog("Remote call {0}.{1} on {2}", intf.FullName, call.MethodName, target);
274 if (!intf.IsInstanceOfType(target)) throw new InvalidCastException("target");
275 MethodInfo meth;
276 if (call.MethodSignature != null) {
277 meth = intf.GetMethod(call.MethodName, BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, null, call.MethodSignature, null);
278 } else {
279 meth = intf.GetMethod(call.MethodName, BindingFlags.Instance | BindingFlags.Public);
280 }
281 if (meth == null) throw new NullReferenceException("method");
282 Object[] args = call.Arguments;
283 Object retval;
284 if (meth.Name == "GetType" && meth == typeof(Object).GetMethod("GetType", Type.EmptyTypes)) {
285 if (target.GetType().IsPublic) retval = target.GetType();
286 else retval = intf;
287 } else {
288 retval = meth.Invoke(target, args);
289 }
290 //Todo: causes lots of redundant data transfers, do something about it!
291 /*Object[] rargs = new Object[0];
292 for (int i = 0; i < args.Length; i++) {
293 if (args[i] == null) continue;
294 if (args[i].GetType().IsArray) {
295 Array.Resize(ref rargs, i + 1);
296 rargs[i] = args[i];
297 }
298 }
299 resp.ReturnArguments = rargs;*/
300 return FixReturnType(retval, meth.ReturnType);
301 } else if (ret is PropertyAccessRequest) {
302 PropertyAccessRequest call = (PropertyAccessRequest)ret;
303 Object target = call.Object;
304 if (ReferenceEquals(target, null)) throw new NullReferenceException("target");
305 Type intf = call.Type ?? target.GetType();
306 DebugLog("Remote property access {0}.{1} on {2}", intf.FullName, call.PropertyName, target);
307 if (!intf.IsInstanceOfType(target)) throw new InvalidCastException("target");
308 PropertyInfo meth = intf.GetProperty(call.PropertyName, BindingFlags.Instance | BindingFlags.Public);
309 if (meth == null) throw new NullReferenceException("property");
310 if (call.SetValue) {
311 meth.SetValue(target, call.Value, null);
312 return null;
313 } else {
314 return FixReturnType(meth.GetValue(target, null), meth.PropertyType);
315 }
316 } else {
317 throw new InvalidDataException("Unexpected object type");
318 }
319 }
320 private Object ProcessRemoteCallRequestA(Object ret) {
321 if (ret is DelegateCallRequest || ret is MethodCallRequest || ret is PropertyAccessRequest) {
322 MethodCallResponse resp = new MethodCallResponse();
323 try {
324 resp.ReturnValue = ProcessRemoteMethodCallRequestA(ret);
325 } catch (Exception ex) {
326 resp.Exception = ex;
327 ErrorLog(ex);
328 }
329 return resp;
330 } else if (ret is GetRootRequest) {
331 DebugLog("Remote root request");
332 return LocalRoot;
333 } else if (ret is ObjectCanCastToRequest) {
334 ObjectCanCastToRequest ctr = (ObjectCanCastToRequest)ret;
335 Type intf = ctr.Type;
336 Object target = ctr.Object;
337 DebugLog("Remote type check for {0} on {1}", intf.Name, target);
338 return intf != null && intf.IsInstanceOfType(target);
339 } else if (ret is CreateStreamRequest) {
340 CreateStreamRequest csr = (CreateStreamRequest)ret;
341 StreamChannel ss = new StreamChannel(this, csr.StreamID);
342 ss.OtherSide = csr.StreamObject;
343 lock (streamChannels) streamChannels.Add(csr.StreamID, ss);
344 return ss;
345 } else {
346 throw new InvalidDataException("Unexpected object type");
347 }
348 }
349 #endregion
350
351 #region Outgoing calls
352 UInt32 callID = 0;
353 private UInt32 AllocatePendingCallID(PendingRemoteCall call) {
354 UInt32 cid;
355 lock (pendingCalls) {
356 while (true) {
357 cid = ++callID;
358 if (!pendingCalls.ContainsKey(cid)) break;
359 Monitor.Wait(pendingCalls);
360 }
361 call.ID = cid;
362 pendingCalls.Add(cid, call);
363 }
364 return cid;
365 }
366 private Object SyncCall(Object req) {
367 UInt32 previousCall;
368 Boolean hasPreviousCall;
369 lock (waitingCallThreads) hasPreviousCall = waitingCallThreads.TryGetValue(Thread.CurrentThread, out previousCall);
370 PendingRemoteCall pending = new PendingRemoteCall() { Completed = false, WaitHandle = new ManualResetEvent(false), CallData = req };
371 AllocatePendingCallID(pending);
372 SynchronousCall call = new SynchronousCall() { IsResult = false, CallID = pending.ID, Data = req, HasPreviousCallID = false };
373 if (hasPreviousCall) {
374 call.HasPreviousCallID = true;
375 call.PreviousCallID = previousCall;
376 }
377 SendObject(call);
378 while (true) {
379 pending.WaitHandle.WaitOne();
380 if (pending.Completed) break;
381 pending.WaitHandle.Reset();
382 if (pending.NextCall == null) throw new InvalidOperationException("Operation did not complete");
383 SynchronousCall sc = pending.NextCall.Value;
384 pending.NextCall = null;
385 ProcessRemoteCallRequest(sc);
386 }
387 pending.WaitHandle.Close();
388 if (pending.Error != null) throw pending.Error;
389 return pending.Response;
390 }
391 private void AsyncCall(Object req) {
392 UInt32 cid = AllocatePendingCallID(new PendingRemoteCall() { Completed = false, CallData = req });
393 SendObject(new SynchronousCall() { IsResult = false, CallID = cid, Data = req, HasPreviousCallID = false });
394 }
395
396 public Object RemoteRoot {
397 get {
398 return SyncCall(new GetRootRequest());
399 }
400 }
401
402 public static void AsyncCall(Delegate f, params Object[] args) {
403 IProxyBase proxy;
404 if ((proxy = GetRealProxyForObject(f)) != null) {
405 proxy.Manager.AsyncCall(new DelegateCallRequest() { Delegate = proxy, Arguments = args });
406 } else if ((proxy = GetRealProxyForObject(f.Target)) != null) {
407 MethodCallRequest call = new MethodCallRequest() { Object = proxy, Type = f.Method.DeclaringType, MethodName = f.Method.Name, Arguments = args };
408 call.MethodSignature = ConvertMethodParameterTypeArray(f.Method);
409 proxy.Manager.AsyncCall(call);
410 } else {
411 throw new InvalidOperationException("Delegate is not a proxy for a remote object");
412 }
413 }
414
415 private static Type[] ConvertMethodParameterTypeArray(MethodInfo method) {
416 ParameterInfo[] parameters = method.GetParameters();
417 Type[] types = new Type[parameters.Length];
418 for (int i = 0; i < types.Length; i++) types[i] = parameters[i].ParameterType;
419 return types;
420 }
421 private void ProxyCallCheckReturnType(Object value, Type type) {
422 if (type == typeof(void)) return;
423 if (type.IsInstanceOfType(value)) return;
424 if (value == null && !type.IsValueType) return;
425 throw new InvalidCastException("Type returned by remote procedure does not match expected type.");
426 }
427 private void ProxyCallFixReturnArguments(Object[] args, Object[] rargs) {
428 if (rargs == null) return;
429 for (int i = 0; i < rargs.Length; i++) {
430 if (rargs[i] == null) continue;
431 if (args[i] != null && args[i].GetType().IsArray) {
432 ((Array)rargs[i]).CopyTo((Array)args[i], 0);
433 }
434 }
435 }
436 private Object ProxyMakeCallDelegate(DelegateProxy obj, Object[] args) {
437 DelegateCallRequest call = new DelegateCallRequest() { Delegate = obj, Arguments = args };
438 MethodCallResponse resp = (MethodCallResponse)SyncCall(call);
439 if (resp.Exception != null) throw new Exception("Remote exception", resp.Exception);
440 ProxyCallFixReturnArguments(args, resp.ReturnArguments);
441 ProxyCallCheckReturnType(resp.ReturnValue, obj.MethodSignature.ReturnType);
442 return resp.ReturnValue;
443 }
444 private Object ProxyMakeCall(IProxyBase obj, MethodInfo method, Object[] args) {
445 /*if (args.Length == 1 && method.ReturnType == typeof(void) && args[0] != null && args[0] is Delegate) {
446 foreach (EventInfo ei in type.GetEvents()) {
447 if (ei.EventHandlerType.IsInstanceOfType(args[0])) {
448 if (method == ei.GetAddMethod()) {
449 Console.WriteLine("ADD EVENT");
450 } else if (method == ei.GetRemoveMethod()) {
451 Console.WriteLine("REMOVE EVENT");
452 }
453 }
454 }
455 }*/
456 MethodCallRequest call = new MethodCallRequest() { Object = obj, Type = method.DeclaringType, MethodName = method.Name, Arguments = args };
457 call.MethodSignature = ConvertMethodParameterTypeArray(method);
458 MethodCallResponse resp = (MethodCallResponse)SyncCall(call);
459 if (resp.Exception != null) throw new Exception("Remote exception", resp.Exception);
460 ProxyCallFixReturnArguments(args, resp.ReturnArguments);
461 ProxyCallCheckReturnType(resp.ReturnValue, method.ReturnType);
462 return resp.ReturnValue;
463 }
464 private Boolean ProxyCanCastTo(IProxyBase obj, Type type) {
465 return (Boolean)SyncCall(new ObjectCanCastToRequest() { Object = obj, Type = type });
466 }
467 private void ProxyReleaseObject(ProxyObjectReference objref) {
468 if (objref == null) return;
469 ProxyReleaseObject(objref, objref.RefCnt);
470 }
471 private void ProxyReleaseObject(ProxyObjectReference objref, int refcnt) {
472 lock (ObjectReferencesByID) {
473 int newrefcnt = objref.Release(refcnt);
474 IObjectReferenceBase regobjref;
475 if (newrefcnt <= 0 && ObjectReferencesByID.TryGetValue(objref.ID, out regobjref) && objref == regobjref) ObjectReferencesByID.Remove(objref.ID);
476 }
477 try {
478 if (Closed) return;
479 SendObject(new ReferenceReleaseRequest() { ObjectID = objref.RemoteID, ReferenceCount = refcnt });
480 } catch (Exception) { } //Assume the exception happened because the connection is closed. Otherwise memory leak...
481 }
482
483 class PendingRemoteCall {
484 public UInt32 ID = 0;
485 public ManualResetEvent WaitHandle = null;
486 public Object Response = null;
487 public Exception Error = null;
488 public Boolean Completed = false;
489 public SynchronousCall? NextCall = null;
490 public Object CallData = null;
491
492 public void SetError(Exception error) {
493 this.Error = error;
494 Completed = true;
495 if (WaitHandle != null) WaitHandle.Set();
496 }
497 public void SetResult(Object result) {
498 this.Response = result;
499 Completed = true;
500 if (WaitHandle != null) WaitHandle.Set();
501 }
502 public Boolean SetNextCall(SynchronousCall nextcall) {
503 if (WaitHandle == null) return false;
504 this.NextCall = nextcall;
505 WaitHandle.Set();
506 return true;
507 }
508 }
509 #endregion
510
511 #region Object serialization support
512 private void Serialize(Stream stream, Object obj) {
513 BinaryWriter writer = new BinaryWriter(stream);
514 Serialize(writer, obj);
515 writer.Flush();
516 }
517 private void Serialize(BinaryWriter writer, Object obj) {
518 if (ReferenceEquals(obj, null)) {
519 writer.Write((Byte)0);
520 } else if (GetRealProxyForObject(obj) != null) {
521 GetObjectData(obj, writer);
522 } else {
523 Type type = obj.GetType();
524 if (type == typeof(Boolean)) {
525 writer.Write((Byte)1);
526 writer.Write((Boolean)obj);
527 } else if (type == typeof(Byte)) {
528 writer.Write((Byte)2);
529 writer.Write((Byte)obj);
530 } else if (type == typeof(Char)) {
531 writer.Write((Byte)3);
532 writer.Write((Int16)obj);
533 } else if (type == typeof(Decimal)) {
534 writer.Write((Byte)4);
535 writer.Write((Decimal)obj);
536 } else if (type == typeof(Double)) {
537 writer.Write((Byte)5);
538 writer.Write((Double)obj);
539 } else if (type == typeof(Int16)) {
540 writer.Write((Byte)6);
541 writer.Write((Int16)obj);
542 } else if (type == typeof(Int32)) {
543 writer.Write((Byte)7);
544 writer.Write((Int32)obj);
545 } else if (type == typeof(Int64)) {
546 writer.Write((Byte)8);
547 writer.Write((Int64)obj);
548 } else if (type == typeof(SByte)) {
549 writer.Write((Byte)9);
550 writer.Write((SByte)obj);
551 } else if (type == typeof(Single)) {
552 writer.Write((Byte)10);
553 writer.Write((Single)obj);
554 } else if (type == typeof(String)) {
555 writer.Write((Byte)11);
556 writer.Write((String)obj);
557 } else if (type == typeof(UInt16)) {
558 writer.Write((Byte)12);
559 writer.Write((UInt16)obj);
560 } else if (type == typeof(UInt32)) {
561 writer.Write((Byte)13);
562 writer.Write((UInt32)obj);
563 } else if (type == typeof(UInt64)) {
564 writer.Write((Byte)14);
565 writer.Write((UInt64)obj);
566 } else if (type.IsSubclassOf(typeof(Type))) {
567 writer.Write((Byte)165);
568 SerializeType(writer, (Type)obj);
569 } else if (type.IsArray) {
570 writer.Write((Byte)164);
571 Array arr = (Array)obj;
572 writer.Write((int)arr.Length);
573 SerializeType(writer, type.GetElementType());
574 for (int i = 0; i < arr.Length; i++) {
575 Serialize(writer, arr.GetValue(i));
576 }
577 } else if (type.IsPrimitive) {
578 throw new NotSupportedException();
579 } else if (typeof(Delegate).IsAssignableFrom(type) || type.IsMarshalByRef) {
580 GetObjectData(obj, writer);
581 } else if (obj is ISerializable) {
582 SerializationInfo si = new SerializationInfo(type, new FormatterConverter());
583 ((ISerializable)obj).GetObjectData(si, new StreamingContext(StreamingContextStates.All));
584 writer.Write((Byte)166);
585 SerializeType(writer, Type.GetType(si.FullTypeName + "," + si.AssemblyName));
586 writer.Write((int)si.MemberCount);
587 foreach (SerializationEntry se in si) {
588 writer.Write(se.Name);
589 Serialize(writer, se.Value);
590 }
591 } else if (type.IsSerializable) {
592 MemberInfo[] members = FormatterServices.GetSerializableMembers(type);
593 Object[] values = FormatterServices.GetObjectData(obj, members);
594 writer.Write((Byte)128);
595 SerializeType(writer, type);
596 for (int i = 0; i < members.Length; i++) {
597 MemberInfo mi = members[i];
598 writer.Write((Byte)255);
599 writer.Write(mi.Name);
600 Serialize(writer, values[i]);
601 }
602 writer.Write((Byte)0);
603 } else {
604 GetObjectData(obj, writer);
605 }
606 }
607 }
608 private void SerializeType(BinaryWriter writer, Type t) {
609 writer.Write(t.FullName);
610 writer.Write(t.Assembly.FullName);
611 }
612 private Object Deserialize(Stream stream) {
613 BinaryReader reader = new BinaryReader(stream);
614 return Deserialize(reader);
615 }
616 private Object Deserialize(BinaryReader reader) {
617 Byte t = reader.ReadByte();
618 if (t == 0) return null;
619 if (t == 1) return reader.ReadBoolean();
620 if (t == 2) return reader.ReadByte();
621 if (t == 3) return (Char)reader.ReadInt16();
622 if (t == 4) return reader.ReadDecimal();
623 if (t == 5) return reader.ReadDouble();
624 if (t == 6) return reader.ReadInt16();
625 if (t == 7) return reader.ReadInt32();
626 if (t == 8) return reader.ReadInt64();
627 if (t == 9) return reader.ReadSByte();
628 if (t == 10) return reader.ReadSingle();
629 if (t == 11) return reader.ReadString();
630 if (t == 12) return reader.ReadUInt16();
631 if (t == 13) return reader.ReadUInt32();
632 if (t == 14) return reader.ReadUInt64();
633 if (t == 128) {
634 Type type = DeserializeType(reader);
635 Object inst = FormatterServices.GetUninitializedObject(type);
636 List<MemberInfo> members = new List<MemberInfo>();
637 List<Object> values = new List<object>();
638 while (true) {
639 t = reader.ReadByte();
640 if (t == 0) {
641 break;
642 } else if (t == 255) {
643 String mname = reader.ReadString();
644 Object value = Deserialize(reader);
645 MemberInfo[] mms = type.GetMember(mname, BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
646 if (mms.Length != 1) throw new InvalidOperationException();
647 members.Add(mms[0]);
648 values.Add(value);
649 } else throw new InvalidDataException();
650 }
651 FormatterServices.PopulateObjectMembers(inst, members.ToArray(), values.ToArray());
652 return inst;
653 }
654 if (t == 129 || t == 130 || t == 131) {
655 return SetObjectData(t, reader);
656 }
657 if (t == 164) {
658 int len = reader.ReadInt32();
659 Type type = DeserializeType(reader); Array arr = Array.CreateInstance(type, len);
660 for (int i = 0; i < len; i++) arr.SetValue(Deserialize(reader), i);
661 return arr;
662 }
663 if (t == 165) {
664 return DeserializeType(reader);
665 }
666 if (t == 166) {
667 Type type = DeserializeType(reader);
668 SerializationInfo si = new SerializationInfo(type, new FormatterConverter());
669 int cnt = reader.ReadInt32();
670 for (int i = 0; i < cnt; i++) {
671 String name = reader.ReadString();
672 Object sub = Deserialize(reader);
673 si.AddValue(name, sub);
674 }
675 StreamingContext sc = new StreamingContext(StreamingContextStates.All);
676 Object obj = Activator.CreateInstance(type, BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance, null, new Object[] { si, sc }, null);
677 IObjectReference objref = obj as IObjectReference;
678 if (objref != null) obj = objref.GetRealObject(sc);
679 return obj;
680 }
681 throw new NotSupportedException();
682 }
683 private Type DeserializeType(BinaryReader reader) {
684 String name = reader.ReadString();
685 String aname = reader.ReadString();
686 Type t = Type.GetType(name + ", " + aname, false);
687 if (t != null) return t;
688 foreach (Assembly assembly in AppDomain.CurrentDomain.GetAssemblies()) {
689 t = assembly.GetType(name, false);
690 if (t != null) return t;
691 }
692 return t;
693 }
694
695 Dictionary<UInt32, IObjectReferenceBase> ObjectReferencesByID = new Dictionary<uint, IObjectReferenceBase>();
696 List<RemoteObjectReference> RemoteObjectReferences = new List<RemoteObjectReference>();
697 uint remoteReferenceIndex = 0;
698 void GetObjectData(object obj, BinaryWriter w) {
699 IObjectReferenceBase rref = null;
700 IProxyBase proxy = GetRealProxyForObject(obj);
701 if (proxy != null && proxy.Manager == this) {
702 rref = proxy.ObjectReference;
703 w.Write((Byte)131);
704 w.Write((UInt32)rref.RemoteID);
705 } else {
706 Boolean isDelegate = obj is Delegate;
707 lock (ObjectReferencesByID) {
708 foreach (IObjectReferenceBase objref in RemoteObjectReferences) {
709 Object other = objref.Target;
710 if (ReferenceEquals(other, obj) || (isDelegate && Delegate.Equals(other, obj))) {
711 rref = objref;
712 break;
713 }
714 }
715 if (rref == null) {
716 UInt32 objid;
717 lock (ObjectReferencesByID) {
718 while (true) {
719 remoteReferenceIndex++;
720 objid = remoteReferenceIndex;
721 if ((objid & 0x80000000) != 0) {
722 remoteReferenceIndex = 0;
723 continue;
724 }
725 if (!ObjectReferencesByID.ContainsKey(objid)) break;
726 Monitor.Wait(ObjectReferencesByID);
727 }
728 }
729 rref = new RemoteObjectReference(objid, obj);
730 ObjectReferencesByID.Add(objid, rref);
731 RemoteObjectReferences.Add((RemoteObjectReference)rref);
732 }
733 rref.AddRef();
734 }
735 if (isDelegate) {
736 w.Write((Byte)130);
737 w.Write((UInt32)rref.RemoteID);
738 SerializeType(w, obj.GetType());
739 } else {
740 w.Write((Byte)129);
741 w.Write((UInt32)rref.RemoteID);
742 }
743 }
744 }
745 object SetObjectData(Byte t, BinaryReader r) {
746 UInt32 objid = r.ReadUInt32();
747 Type deltype = null;
748 if (t == 130) deltype = DeserializeType(r);
749 lock (ObjectReferencesByID) {
750 Object target = null;
751 IObjectReferenceBase rref;
752 if (ObjectReferencesByID.TryGetValue(objid, out rref)) target = rref.Target;
753 if (t == 131) {
754 if (target == null) throw new InvalidOperationException("Object not found");
755 } else {
756 if (target == null) {
757 if ((objid & 0x80000000) == 0) throw new InvalidOperationException("The Object ID is invalid");
758 IProxyBase proxy;
759 if (t == 130) {
760 proxy = CreateDelegateProxy(objid, deltype);
761 } else {
762 String[] iftypes = null; //(String[])info.GetValue("InterfaceTypes", typeof(String[]));
763 proxy = CreateObjectProxy(objid, iftypes);
764 }
765 rref = proxy.ObjectReference;
766 ObjectReferencesByID[objid] = rref;
767 target = proxy.GetTransparentProxy();
768 }
769 rref.AddRef();
770 }
771 return target;
772 }
773 }
774 #endregion
775
776 #region RPC messages
777 [Serializable]
778 struct SynchronousCall {
779 public UInt32 CallID;
780 public Boolean IsResult;
781 public Object Data;
782 public Boolean HasPreviousCallID;
783 public UInt32 PreviousCallID;
784 }
785
786 [Serializable]
787 struct MethodCallRequest {
788 public Object Object;
789 public Type Type;
790 public String MethodName;
791 public Type[] MethodSignature;
792 public Object[] Arguments;
793 }
794 [Serializable]
795 struct MethodCallResponse {
796 public Exception Exception;
797 public Object ReturnValue;
798 public Object[] ReturnArguments;
799 }
800 [Serializable]
801 struct PropertyAccessRequest {
802 public Object Object;
803 public Type Type;
804 public String PropertyName;
805 public Boolean SetValue;
806 public Object Value;
807 }
808 [Serializable]
809 struct ReferenceReleaseRequest {
810 public UInt32 ObjectID;
811 public Int32 ReferenceCount;
812 }
813 [Serializable]
814 struct GetRootRequest {
815 }
816 [Serializable]
817 struct ObjectCanCastToRequest {
818 public Object Object;
819 public Type Type;
820 }
821 [Serializable]
822 struct CreateStreamRequest {
823 public StreamChannel StreamObject;
824 public int StreamID;
825 }
826 [Serializable]
827 struct DelegateCallRequest {
828 public Object Delegate;
829 public Object[] Arguments;
830 }
831 #endregion
832
833 #region Proxy magic
834 static IProxyBase GetRealProxyForObject(Object obj) {
835 if (RemotingServices.IsTransparentProxy(obj)) return RemotingServices.GetRealProxy(obj) as FWProxy;
836 IProxyBase obj_IProxyBase = obj as IProxyBase;
837 if (obj_IProxyBase != null) return obj_IProxyBase;
838 Delegate obj_Delegate = obj as Delegate;
839 if (obj_Delegate != null) {
840 DelegateProxy pb = obj_Delegate.Target as DelegateProxy;
841 if (pb != null) return pb;
842 }
843 return null;
844 }
845 public static RemotingManager GetManagerForObjectProxy(Object obj) {
846 IProxyBase prox = GetRealProxyForObject(obj);
847 if (prox == null) return null;
848 return prox.Manager;
849 }
850
851 interface IObjectReferenceBase {
852 UInt32 ID { get; }
853 Object Target { get; }
854 int AddRef();
855 int RefCnt { get; }
856 Boolean IsLocal { get; }
857 UInt32 RemoteID { get; }
858 }
859
860 class RemoteObjectReference : IObjectReferenceBase {
861 int refcnt = 0;
862 public UInt32 ID { get; private set; }
863 public Object Target { get; private set; }
864 public int RefCnt { get { return refcnt; } }
865 public int AddRef() { return Interlocked.Increment(ref refcnt); }
866 public int Release(int count) { return Interlocked.Add(ref refcnt, -count); }
867 public Boolean IsLocal { get { return true; } }
868 public UInt32 RemoteID { get { return ID | 0x80000000; } }
869 public RemoteObjectReference(UInt32 id, Object obj) {
870 if ((id & 0x80000000) != 0) throw new InvalidOperationException("The Object ID is invalid");
871 this.ID = id;
872 this.Target = obj;
873 }
874 }
875
876 interface IProxyBase {
877 RemotingManager Manager { get; }
878 UInt32 ID { get; }
879 ProxyObjectReference ObjectReference { get; }
880 Object GetTransparentProxy();
881 }
882 class ProxyObjectReference : IObjectReferenceBase {
883 int refcnt = 0;
884 WeakReference targetref;
885 public UInt32 ID { get; private set; }
886 public RemotingManager Manager { get; private set; }
887 public Object Target {
888 get {
889 IProxyBase proxy = this.Proxy;
890 if (proxy == null) return null;
891 return proxy.GetTransparentProxy();
892 }
893 }
894 public IProxyBase Proxy {
895 get {
896 if (targetref == null) return null;
897 return (IProxyBase)targetref.Target;
898 }
899 }
900 public int RefCnt { get { return refcnt; } }
901 public int AddRef() {
902 int newcnt = Interlocked.Increment(ref refcnt);
903 if (newcnt > 10000 && Interlocked.CompareExchange(ref refcnt, 1, newcnt) == newcnt) {
904 Manager.ProxyReleaseObject(this, newcnt - 1);
905 newcnt = 1;
906 }
907 return newcnt;
908 }
909 public int Release(int count) { return Interlocked.Add(ref refcnt, -count); }
910 public Boolean IsLocal { get { return false; } }
911 public UInt32 RemoteID { get { return ID & 0x7FFFFFFF; } }
912 public ProxyObjectReference(IProxyBase proxy) {
913 this.Manager = proxy.Manager;
914 this.ID = proxy.ID;
915 if ((ID & 0x80000000) == 0) throw new InvalidOperationException("The Object ID is invalid");
916 targetref = new WeakReference(proxy);
917 }
918 }
919 class DelegateProxy : IProxyBase {
920 public ProxyObjectReference ObjectReference { get; private set; }
921 public RemotingManager Manager { get; private set; }
922 public UInt32 ID { get; private set; }
923 public Delegate Target { get; private set; }
924 public MethodInfo MethodSignature { get; private set; }
925 public Object GetTransparentProxy() { return Target; }
926
927 public DelegateProxy(RemotingManager manager, UInt32 objid, MethodInfo methodinfo, Type delegateType, DynamicMethod methodBuilder) {
928 this.Manager = manager;
929 this.ID = objid;
930 this.MethodSignature = methodinfo;
931 Delegate mi = methodBuilder.CreateDelegate(delegateType, this);
932 ObjectReference = new ProxyObjectReference(this);
933 }
934 ~DelegateProxy() {
935 Manager.ProxyReleaseObject(ObjectReference);
936 }
937 private Object DoCall(Object[] args) {
938 return Manager.ProxyMakeCallDelegate(this, args);
939 }
940 }
941 class FWProxy : RealProxy, IRemotingTypeInfo, IProxyBase {
942 public RemotingManager Manager { get; private set; }
943 public UInt32 ID { get; private set; }
944 public ProxyObjectReference ObjectReference { get; private set; }
945
946 public FWProxy(RemotingManager manager, UInt32 objid) : base(typeof(MarshalByRefObject)) {
947 this.Manager = manager;
948 this.ID = objid;
949 this.ObjectReference = new ProxyObjectReference(this);
950 }
951
952 ~FWProxy() {
953 Manager.ProxyReleaseObject(ObjectReference);
954 }
955
956 public string TypeName { get; set; }
957
958 public override IMessage Invoke(IMessage msg) {
959 IMethodCallMessage methodCallMessage = msg as IMethodCallMessage;
960 if (methodCallMessage != null) {
961 Object r = Manager.ProxyMakeCall(this, (MethodInfo)methodCallMessage.MethodBase, methodCallMessage.Args);
962 return new ReturnMessage(r, null, 0, null, methodCallMessage);
963 }
964 throw new NotImplementedException();
965 }
966
967 public bool CanCastTo(Type fromType, object o) {
968 if (fromType == typeof(ISerializable)) return false;
969 return Manager.ProxyCanCastTo(this, fromType);
970 }
971 }
972 class ProxyBase : IProxyBase {
973 public RemotingManager Manager { get; private set; }
974 public UInt32 ID { get; private set; }
975 public ProxyObjectReference ObjectReference { get; private set; }
976 public Object GetTransparentProxy() { return this; }
977
978 public void Init(RemotingManager manager, UInt32 objid) {
979 this.Manager = manager;
980 this.ID = objid;
981 this.ObjectReference = new ProxyObjectReference(this);
982 }
983
984 protected Object DoCall(RuntimeMethodHandle methodh, Object[] args) {
985 MethodInfo meth = (MethodInfo)MethodInfo.GetMethodFromHandle(methodh);
986 return Manager.ProxyMakeCall(this, meth, args);
987 }
988
989 ~ProxyBase() {
990 Manager.ProxyReleaseObject(ObjectReference);
991 }
992 }
993
994 private IProxyBase CreateDelegateProxy(UInt32 objid, Type deltype) {
995 MethodInfo newMethod = deltype.GetMethod("Invoke");
996 ParameterInfo[] parameters = newMethod.GetParameters();
997 Type[] mparams = ArrayUtil.Merge(new Type[1] { typeof(DelegateProxy) }, Array.ConvertAll(parameters, delegate(ParameterInfo pi) { return pi.ParameterType; }));
998 DynamicMethod methodBuilder = new DynamicMethod(String.Empty, newMethod.ReturnType, mparams, typeof(DelegateProxy));
999 ILGenerator ilGenerator = methodBuilder.GetILGenerator();
1000 GenerateProxyMethodCode(ilGenerator, parameters, newMethod.ReturnType, typeof(DelegateProxy), "DoCall", null);
1001 return new DelegateProxy(this, objid, newMethod, deltype, methodBuilder);
1002 }
1003
1004 private IProxyBase CreateObjectProxy(UInt32 objid, String[] typeNames) {
1005 DebugLog("Create proxy for remote object {0}", objid);
1006 IProxyBase proxy;
1007 if (true) {
1008 proxy = new FWProxy(this, objid);
1009 } else {
1010 Type[] types = new Type[typeNames.Length];
1011 int j = 0;
1012 for (int i = 0; i < typeNames.Length; i++) {
1013 Type t = Type.GetType(typeNames[i], false);
1014 if (t == null || !t.IsInterface) continue;
1015 types[j] = t;
1016 j++;
1017 }
1018 Array.Resize(ref types, j);
1019 Type proxyType = GetProxyType(types);
1020 proxy = (ProxyBase)Activator.CreateInstance(proxyType);
1021 ((ProxyBase)proxy).Init(this, objid);
1022 }
1023 return proxy;
1024 }
1025
1026 private static ModuleBuilder moduleBuilder = null;
1027 private static Dictionary<String, Type> proxyCache = null;
1028 private static Type GetProxyType(Type[] interfaceTypes) {
1029 Type proxyType;
1030 String key = String.Join("&", Array.ConvertAll(interfaceTypes, delegate(Type t) { return t.Name; }));
1031 lock (typeof(RemotingManager)) if (proxyCache == null) proxyCache = new Dictionary<String, Type>();
1032 lock (proxyCache) {
1033 if (!proxyCache.TryGetValue(key, out proxyType)) {
1034 proxyType = GenerateProxyType(key, interfaceTypes);
1035 proxyCache.Add(key, proxyType);
1036 }
1037 }
1038 return proxyType;
1039 }
1040 private static Type GenerateProxyType(String name, Type[] interfaceTypes) {
1041 lock (typeof(RemotingManager)) {
1042 if (moduleBuilder == null) {
1043 AssemblyBuilder assembly = AppDomain.CurrentDomain.DefineDynamicAssembly(new AssemblyName("UCIS.Remoting.Proxies"), AssemblyBuilderAccess.Run);
1044 moduleBuilder = assembly.DefineDynamicModule("UCIS.Remoting.Proxies", false);
1045 }
1046 }
1047 TypeBuilder typeBuilder = moduleBuilder.DefineType(
1048 name.Length == 0 ? "UndefinedProxy" : name, //mono does not like types with no name!
1049 TypeAttributes.NotPublic | TypeAttributes.Sealed,
1050 typeof(ProxyBase),
1051 interfaceTypes);
1052 foreach (Type interfaceType in interfaceTypes) {
1053 foreach (MethodInfo method in interfaceType.GetMethods()) {
1054 GenerateProxyMethod(typeBuilder, method);
1055 }
1056 }
1057 return typeBuilder.CreateType();
1058 }
1059 private static void GenerateProxyMethod(TypeBuilder typeBuilder, MethodInfo newMethod) {
1060 if (newMethod.IsGenericMethod) newMethod = newMethod.GetGenericMethodDefinition();
1061 ParameterInfo[] parameters = newMethod.GetParameters();
1062 Type[] parameterTypes = Array.ConvertAll(parameters, delegate(ParameterInfo parameter) { return parameter.ParameterType; });
1063
1064 MethodBuilder methodBuilder = typeBuilder.DefineMethod(
1065 "Impl_" + newMethod.DeclaringType.Name + "_" + newMethod.Name,
1066 MethodAttributes.Public | MethodAttributes.Virtual,
1067 newMethod.ReturnType,
1068 parameterTypes);
1069 typeBuilder.DefineMethodOverride(methodBuilder, newMethod);
1070
1071 if (newMethod.IsGenericMethod) {
1072 methodBuilder.DefineGenericParameters(Array.ConvertAll(newMethod.GetGenericArguments(), delegate(Type type) { return type.Name; }));
1073 }
1074
1075 ILGenerator ilGenerator = methodBuilder.GetILGenerator();
1076 GenerateProxyMethodCode(ilGenerator, parameters, newMethod.ReturnType, typeof(ProxyBase), "DoCall", newMethod);
1077 }
1078 private static void GenerateProxyMethodCode(ILGenerator ilGenerator, ParameterInfo[] parameters, Type returnType, Type baseType, String baseMethod, MethodInfo methodRef) {
1079 LocalBuilder localBuilder = ilGenerator.DeclareLocal(typeof(Object[]));
1080 ilGenerator.Emit(OpCodes.Ldc_I4, parameters.Length);
1081 ilGenerator.Emit(OpCodes.Newarr, typeof(Object));
1082 ilGenerator.Emit(OpCodes.Stloc, localBuilder);
1083 for (int i = 0; i < parameters.Length; i++) {
1084 if (parameters[i].ParameterType.IsByRef) continue;
1085 ilGenerator.Emit(OpCodes.Ldloc, localBuilder);
1086 ilGenerator.Emit(OpCodes.Ldc_I4, i);
1087 ilGenerator.Emit(OpCodes.Ldarg, i + 1);
1088 if (parameters[i].ParameterType.IsValueType) ilGenerator.Emit(OpCodes.Box, parameters[i].ParameterType);
1089 ilGenerator.Emit(OpCodes.Stelem_Ref);
1090 }
1091 ilGenerator.Emit(OpCodes.Ldarg_0);
1092 if (methodRef != null) ilGenerator.Emit(OpCodes.Ldtoken, methodRef);
1093 ilGenerator.Emit(OpCodes.Ldloc, localBuilder);
1094 ilGenerator.Emit(OpCodes.Call, baseType.GetMethod(baseMethod, BindingFlags.InvokeMethod | BindingFlags.Instance | BindingFlags.NonPublic));
1095 if (returnType == typeof(void)) {
1096 ilGenerator.Emit(OpCodes.Pop);
1097 } else if (returnType.IsValueType) {
1098 ilGenerator.Emit(OpCodes.Unbox_Any, returnType);
1099 }
1100 ilGenerator.Emit(OpCodes.Ret);
1101 }
1102 #endregion
1103 }
1104 }