diff ThreadPool.cs @ 0:3ab940a0c7a0

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Tue, 11 Sep 2012 16:28:53 +0200
parents
children 4b78cc5f116b
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ThreadPool.cs	Tue Sep 11 16:28:53 2012 +0200
@@ -0,0 +1,281 @@
+using System;
+using System.Threading;
+using System.Collections.Generic;
+
+namespace UCIS {
+	public class ThreadPool {
+		private static ThreadPool pManager = null;
+
+		public static ThreadPool DefaultPool {
+			get {
+				if (pManager == null) pManager = new ThreadPool();
+				return pManager;
+			}
+		}
+
+		//Starts a long-term background task
+		public static WorkItem RunTask(WaitCallback Callback, object State) {
+			return DefaultPool.QueueWorkItem(Callback, State);
+		}
+		//Starts a short-term background task
+		public static WorkItem RunCall(WaitCallback Callback, object State) {
+			return DefaultPool.QueueWorkItem(Callback, State);
+		}
+
+
+		public class WorkItem {
+			public WaitCallback Callback { get; internal set; }
+			public object State { get; internal set; }
+			public ThreadInfo Thread { get; internal set; }
+		}
+		public class ThreadInfo {
+			public Thread Thread { get; internal set; }
+			internal AutoResetEvent WaitHandle = new AutoResetEvent(false);
+			public WorkItem WorkItem { get; internal set; }
+			public bool Busy { get; internal set; }
+			public bool Abort { get; internal set; }
+			public DateTime LastActive { get; internal set; }
+		}
+
+		public class ExceptionEventArgs : EventArgs {
+			public ExceptionEventArgs(WorkItem Item, Exception Exception, bool ThrowError) {
+				this.Item = Item;
+				this.Exception = Exception;
+				this.ThrowError = ThrowError;
+			}
+			public WorkItem Item;
+			public Exception Exception;
+			public bool ThrowError;
+		}
+
+		private List<ThreadInfo> pThreads = new List<ThreadInfo>();
+		private List<ThreadInfo> pBusyThreads = new List<ThreadInfo>();
+		private Queue<ThreadInfo> pIdleThreads = new Queue<ThreadInfo>();
+		private int pThreadsMax;
+		private int pThreadsMinIdle;
+		private int pThreadsMaxIdle;
+
+		public event OnExceptionEventHandler OnException;
+		public delegate void OnExceptionEventHandler(ThreadPool sender, ExceptionEventArgs e);
+
+		public System.Collections.ObjectModel.ReadOnlyCollection<ThreadInfo> Threads {
+			get {
+				return new System.Collections.ObjectModel.ReadOnlyCollection<ThreadInfo>(pThreads);
+			}
+		}
+
+		public ThreadPool() : this(250, 1, 5) { }
+
+		public ThreadPool(int MaxThreads, int MinIdle, int MaxIdle) {
+			int I = 0;
+			if (MaxThreads < 0) {
+				throw new ArgumentOutOfRangeException("ThreadsMaxIdle", "ThreadsMaxIdle must greater than 0");
+			} else if (MaxThreads < MaxIdle) {
+				throw new ArgumentOutOfRangeException("ThreadsMax", "ThreadsMax must be greater than or equal to ThreadsMaxIdle");
+			} else if (MaxIdle < 0) {
+				throw new ArgumentOutOfRangeException("ThreadsMaxIdle", "ThreadsMaxIdle must greater than or equal to 0");
+			} else if (MinIdle < 0) {
+				throw new ArgumentOutOfRangeException("ThreadsMinIdle", "ThreadsMinIdle must greater than or equal to 0");
+			} else if (MinIdle > MaxIdle) {
+				throw new ArgumentOutOfRangeException("ThreadsMaxIdle", "ThreadsMaxIdle must be greater than or equal to ThreadsMinIdle");
+			}
+			pThreadsMax = MaxThreads;
+			pThreadsMinIdle = MinIdle;
+			pThreadsMaxIdle = MaxIdle;
+			for (I = 1; I <= pThreadsMinIdle; I++) {
+				StartThread(false);
+			}
+		}
+
+		public int ThreadsIdle {
+			get { return pIdleThreads.Count; }
+		}
+		public int ThreadsBusy {
+			get { return pBusyThreads.Count; }
+		}
+		public int ThreadsAlive {
+			get { return pThreads.Count; }
+		}
+		public int ThreadsMinIdle {
+			get { return pThreadsMinIdle; }
+			set {
+				if (value > pThreadsMaxIdle) {
+					throw new ArgumentOutOfRangeException("ThreadsMinIdle", "ThreadsMinIdle must be smaller than ThreadsMaxIdle");
+				} else if (value < 0) {
+					throw new ArgumentOutOfRangeException("ThreadsMinIdle", "ThreadsMinIdle must greater than or equal to 0");
+				} else {
+					int I = 0;
+					int C = 0;
+					C = pIdleThreads.Count;
+					if (value > C) {
+						for (I = C; I <= value - 1; I++) {
+							StartThread(false);
+						}
+					}
+					pThreadsMinIdle = value;
+				}
+			}
+		}
+		public int ThreadsMaxIdle {
+			get { return pThreadsMaxIdle; }
+			set {
+				if (pThreadsMinIdle > value) {
+					throw new ArgumentOutOfRangeException("ThreadsMaxIdle", "ThreadsMaxIdle must be greater than or equal to ThreadsMinIdle");
+				} else if (value < 0) {
+					throw new ArgumentOutOfRangeException("ThreadsMaxIdle", "ThreadsMaxIdle must greater than or equal to 0");
+				} else {
+					int I = 0;
+					int C = 0;
+					ThreadInfo T = default(ThreadInfo);
+					lock (pIdleThreads) {
+						C = pIdleThreads.Count;
+						if (value < C) {
+							for (I = value; I <= C - 1; I++) {
+								T = pIdleThreads.Dequeue();
+								T.Abort = true;
+								T.WaitHandle.Set();
+							}
+						}
+					}
+					pThreadsMaxIdle = value;
+				}
+			}
+		}
+		public int ThreadsMax {
+			get { return pThreadsMax; }
+			set {
+				if (pThreadsMaxIdle > value) {
+					throw new ArgumentOutOfRangeException("ThreadsMax", "ThreadsMax must be greater than or equal to ThreadsMaxIdle");
+				} else if (value <= 0) {
+					throw new ArgumentOutOfRangeException("ThreadsMax", "ThreadsMax must greater than 0");
+				} else {
+					pThreadsMax = value;
+				}
+			}
+		}
+
+		public WorkItem QueueWorkItem(WaitCallback Callback, object State) {
+			WorkItem WorkItem = new WorkItem();
+			ThreadInfo Thread = null;
+			WorkItem.Callback = Callback;
+			WorkItem.State = State;
+			lock (pIdleThreads) {
+				while (pIdleThreads.Count > 0) {
+					Thread = pIdleThreads.Dequeue();
+					if (!Thread.Abort) {
+						break;
+					} else {
+						Thread = null;
+					}
+				}
+			}
+			if (Thread == null)  {
+				if (pThreads.Count < pThreadsMax) {
+					Thread = StartThread(true);
+				} else {
+					throw new ThreadStateException("Thread limit exceeded");
+				}
+			}
+			Thread.LastActive = DateTime.Now;
+			WorkItem.Thread = Thread;
+			Thread.WorkItem = WorkItem;
+			Thread.WaitHandle.Set();
+			return WorkItem;
+		}
+
+		private ThreadInfo StartThread(bool Reserved) {
+			ThreadInfo Thread = new ThreadInfo();
+			Thread.Thread = new Thread(pWorker);
+			lock (pThreads) {
+				pThreads.Add(Thread);
+				if (!Reserved) pIdleThreads.Enqueue(Thread);
+			}
+			Thread.LastActive = DateTime.Now;
+			Thread.Thread.Start(Thread);
+			return Thread;
+		}
+
+		public void AbortAllThreads() {
+			lock (pIdleThreads) {
+				while (pIdleThreads.Count > 0) {
+					ThreadInfo Thread = pIdleThreads.Dequeue();
+					Thread.Abort = true;
+					Thread.WaitHandle.Set();
+				}
+			}
+			foreach (ThreadInfo Thread in pThreads.ToArray()) {
+				if (Thread != null && !Thread.Abort) {
+					Thread.Thread.Abort();
+					Thread.Abort = true;
+					Thread.WaitHandle.Set();
+				}
+			}
+			pIdleThreads.Clear();
+		}
+
+		//ToDo: add timer to kill old threads periodically
+		public void KillOldThreads() {
+			ThreadInfo Thread;
+			lock (pIdleThreads) {
+				if (pIdleThreads.Count == 0) return;
+				Thread = pIdleThreads.Dequeue();
+			}
+			if (DateTime.Now.Subtract(Thread.LastActive).TotalMinutes > 1) {
+				Thread.Abort = true;
+				Thread.WaitHandle.Set();
+			} else {
+				lock (pIdleThreads) pIdleThreads.Enqueue(Thread);
+			}
+		}
+
+		private void pWorker(object state) {
+			ThreadInfo Thread = (ThreadInfo)state;
+			if (Thread == null) throw new ArgumentNullException("state");
+			try {
+				while (true) {
+					if (Thread.WaitHandle == null) throw new ArgumentNullException("WaitHandle");
+					if (!Thread.WaitHandle.WaitOne(1000, false)) {
+						if (pBusyThreads.Count == 0) {
+							return;
+						} else {
+							continue;
+						}
+					}					
+					if (Thread.Abort) break;
+
+					Thread.Busy = true;
+					lock (pBusyThreads) pBusyThreads.Add(Thread);
+					try {
+						if (Thread.WorkItem == null) throw new ArgumentNullException("WorkItem");
+						if (Thread.WorkItem.Callback == null) throw new ArgumentNullException("WorkItem.Callback");
+						Thread.WorkItem.Callback.Invoke(Thread.WorkItem.State);
+					} catch (ThreadAbortException ex) {
+						ExceptionEventArgs e = new ExceptionEventArgs(Thread.WorkItem, ex, false);
+						if (OnException != null) OnException(this, e);
+						if (e.ThrowError) Console.WriteLine("ThreadAbortException in ThreadPool thread: " + e.Exception.ToString());
+						return;
+					} catch (Exception ex) {
+						ExceptionEventArgs e = new ExceptionEventArgs(Thread.WorkItem, ex, true);
+						if (OnException != null) OnException(this, e);
+						if (e.ThrowError) {
+							Console.WriteLine("Exception in ThreadPool thread: " + e.Exception.ToString());
+							throw new Exception("Unhandled exception in work item", e.Exception);
+						}
+					} finally {
+						lock (pBusyThreads) pBusyThreads.Remove(Thread);
+					}
+					Thread.WorkItem.Thread = null;
+					Thread.WorkItem = null;
+					Thread.Busy = false;
+					lock (pIdleThreads) {
+						if (pIdleThreads.Count >= pThreadsMaxIdle) break;
+						pIdleThreads.Enqueue(Thread);
+					}
+				}
+			} finally {
+				Thread.Abort = true;
+				lock (pThreads) pThreads.Remove(Thread);
+			}
+		}
+	}
+}