Mercurial > hg > ucis.core
changeset 66:b7bc27c6734e
Added WorkQueue class
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Tue, 15 Oct 2013 16:22:54 +0200 |
parents | abe0d55a2201 |
children | 2d16447eff12 |
files | UCIS.Core.csproj Util/WorkQueue.cs |
diffstat | 2 files changed, 98 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/UCIS.Core.csproj Tue Oct 15 16:19:45 2013 +0200 +++ b/UCIS.Core.csproj Tue Oct 15 16:22:54 2013 +0200 @@ -175,6 +175,7 @@ <Compile Include="Util\PrebufferingStream.cs" /> <Compile Include="Util\QueuedPacketStream.cs" /> <Compile Include="Util\TapeArchive.cs" /> + <Compile Include="Util\WorkQueue.cs" /> <Compile Include="VNCServer\IFramebuffer.cs" /> <Compile Include="VNCServer\VNCServer.cs" /> <Compile Include="Windows\ServiceManager.cs" />
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Util/WorkQueue.cs Tue Oct 15 16:22:54 2013 +0200 @@ -0,0 +1,97 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Windows.Forms; +using SysThreadPool = System.Threading.ThreadPool; + +namespace UCIS.Util { + public class WorkQueue : WorkQueue<MethodInvoker> { + public WorkQueue() : base(Handler) { } + private static void Handler(MethodInvoker item) { item(); } + } + public class WorkQueue<TWork> : IDisposable { + Queue<TWork> queue = new Queue<TWork>(); + Action<TWork> callback = null; + int maxIdleWorkers = 0; + int maxWorkers = 1; + int idleWorkers = 0; + int workers = 0; + + public Boolean UseFrameworkThreadpool { get; set; } + + public WorkQueue(Action<TWork> callback) { + this.callback = callback; + UseFrameworkThreadpool = true; + } + public void Dispose() { + maxWorkers = 0; + lock (queue) Monitor.PulseAll(queue); + } + + public int MaxIdleWorkers { + get { return maxIdleWorkers; } + set { + maxIdleWorkers = value; + lock (queue) Monitor.PulseAll(queue); + } + } + public int MaxWorkers { + get { return maxWorkers; } + set { + maxWorkers = value; + lock (queue) Monitor.PulseAll(queue); + } + } + public int TotalWorkers { get { return workers; } } + public int IdleWorkers { get { return idleWorkers; } } + + public void Enqueue(TWork item) { + lock (queue) { + queue.Enqueue(item); + Monitor.Pulse(queue); + if (workers < maxWorkers && idleWorkers == 0) StartWorker(); + } + } + public void Clear() { lock (queue) queue.Clear(); } + public int Count { get { lock (queue) return queue.Count; } } + + private void StartWorker() { + lock (queue) { + if (workers >= maxWorkers) return; + if (UseFrameworkThreadpool) { + SysThreadPool.QueueUserWorkItem(Worker); + } else { + (new Thread(Worker)).Start(); + } + workers++; + } + } + private void RaiseEvent(Action<WorkQueue<TWork>> callback) { + if (callback != null) callback(this); + } + private void Worker(Object state) { + while (true) { + TWork item; + lock (queue) { + if (workers >= maxWorkers) { + workers--; + break; + } + if (queue.Count == 0) { + if (idleWorkers >= maxIdleWorkers) { + workers--; + queue.TrimExcess(); + break; + } + idleWorkers++; + Monitor.Wait(queue); + idleWorkers--; + if (queue.Count == 0) continue; + } + item = queue.Dequeue(); + } + callback(item); + } + } + } +}