unit uROThreadPool;

{----------------------------------------------------------------------------}
{ RemObjects SDK Library - Core Library }
{ }
{ compiler: Delphi 5 and up, Kylix 2 and up }
{ platform: Win32, Linux }
{ }
{ (c)opyright RemObjects Software. all rights reserved. }
{ }
{ Using this code requires a valid license of the RemObjects SDK }
{ which can be obtained at http://www.remobjects.com. }
{----------------------------------------------------------------------------}

{$I RemObjects.inc}

interface

uses
  SysUtils, Classes, SyncObjs, uROClasses
  {$IFDEF WIN32}, Windows{$ENDIF}
  {$IFDEF LINUX}, Libc{$ENDIF}
  {$IFDEF REMOBJECTS_TRIAL}, uROTrial {$ENDIF}
  ;

type
  // syncobjs
  TROSemaphore = class;
  TROThreadPool = class;

  // Unit of work handed to the pool. A worker thread invokes Callback once,
  // passing the owning pool and the thread that is executing the item.
  IROThreadPoolCallback = interface
    ['{0F3386BB-0B6E-4070-93D5-BAA5CD287608}']
    procedure Callback(Caller: TROThreadPool; aThread: TThread);
  end;

  // Worker thread owned by a TROThreadPool. It registers itself in the
  // pool's thread list on creation, removes itself on destruction, and
  // frees itself when Execute returns (FreeOnTerminate).
  TROPooledThread = class(TThread)
  private
    fOwner: TROThreadPool;
  public
    constructor Create(aOwner: TROThreadPool);
    destructor Destroy; override;
    procedure Execute; override;
    // Declared in the public section; the implementation only forwards to
    // the inherited Synchronize.
    procedure Synchronize(Method: TThreadMethod);
    // Requests termination of the worker loop (calls Terminate).
    procedure Stop;
  end;

  // Component that queues IROThreadPoolCallback items and dispatches them
  // to TROPooledThread workers, which are woken through a counting
  // semaphore.
  TROThreadPool = class(TComponent)
  private
    fMaxThreads,              // upper bound on worker threads
    fMaxQueue,                // multiplied by fMaxThreads to cap the load
    fPoolThreads,             // thread-count threshold used by QueueItem
    fWorkingCount,            // callbacks currently being executed
    fThreadCount: Integer;    // worker threads created so far
    fSemaphore: TROSemaphore; // released once per queued item
    fQueue: TInterfaceList;   // pending callbacks, consumed from index 0
    fThreads: TThreadList;    // live TROPooledThread instances
  public
    constructor Create(AOwner: TComponent); override;
    destructor Destroy; override;
    function GetCurrQueueSize: Integer;
    procedure QueueItem(Callback: IROThreadPoolCallback);
  published
    property MaxQueue: Integer read fMaxQueue write fMaxQueue;
    property PoolThreads: Integer read fPoolThreads write fPoolThreads;
    property MaxThreads: Integer read FMaxThreads write fMaxThreads;
  end;

  // Thin wrapper over the platform semaphore: a Win32 semaphore handle or
  // a Libc TSemaphore, selected at compile time.
  TROSemaphore = class
  private
    {$IFDEF LINUX}
    fHandle: TSemaphore;
    fFreed: Boolean;
    {$ENDIF}
    {$IFDEF WIN32}
    fHandle: THandle;
    {$ENDIF}
  public
    constructor Create;
    destructor Destroy; override;
    procedure CloseHandle;
    procedure Release(Count: Integer); overload;
    procedure Release; overload;
    function WaitFor(Timeout: Cardinal): Boolean;
  end;

implementation

{ TROPooledThread }

// Creates the thread suspended, wires it to its pool, registers it in the
// pool's thread list and only then lets it run.
constructor TROPooledThread.Create(aOwner: TROThreadPool);
begin
  inherited Create(true);
  fOwner := aOwner;
  FreeOnTerminate := True;
  aOwner.fThreads.Add(Self);
  Resume;
end;

// Flags the thread as terminated and unregisters it from the pool before
// the inherited destructor runs.
destructor TROPooledThread.Destroy;
begin
  Terminate; // same effect as Stop, which only calls Terminate
  fOwner.fThreads.Remove(Self);
  inherited Destroy;
end;

// Worker loop: wait on the pool semaphore, take the oldest queued callback
// (if any) while holding the queue lock, then run it outside the lock.
// The loop ends when the thread is terminated or the wait fails.
procedure TROPooledThread.Execute;
var
  lJob: IROThreadPoolCallback;
  lRaw: IUnknown;
begin
  FreeOnTerminate := True;
  while not Terminated do begin
    // All-bits-set timeout value; a failed wait ends the thread.
    if not fOwner.fSemaphore.WaitFor(High(Cardinal)) then
      Break;

    lJob := nil;
    fOwner.fQueue.Lock;
    try
      if fOwner.fQueue.Count > 0 then begin
        // An explicit temporary is used on purpose: a compiler-generated
        // one would keep the interface reference alive too long.
        lRaw := fOwner.fQueue[0];
        lJob := lRaw as IROThreadPoolCallback;
        lRaw := nil;
        fOwner.fQueue.Delete(0);
      end;
    finally
      fOwner.fQueue.Unlock;
    end;

    // Woken without work (for example during pool shutdown): loop again so
    // the Terminated flag is re-checked.
    if lJob = nil then
      Continue;

    InterlockedIncrement(fOwner.fWorkingCount);
    try
      lJob.Callback(fOwner, Self);
    except
      // exceptions raised by a callback must not escape the worker thread
    end;
    InterlockedDecrement(fOwner.fWorkingCount);
    lJob := nil;
  end;
end;

// Asks the worker loop to finish; takes effect the next time the loop
// checks Terminated.
procedure TROPooledThread.Stop;
begin
  Terminate;
end;

// Forwards to the inherited Synchronize.
procedure TROPooledThread.Synchronize(Method: TThreadMethod);
begin
  inherited Synchronize(Method);
end;

{ TROThreadPool }

// Sets the default limits and creates the semaphore, the callback queue
// and the thread list. No worker thread is started here.
constructor TROThreadPool.Create(AOwner: TComponent);
begin
  inherited Create(aOwner);
  fPoolThreads := 5;
  fMaxThreads := 10;
  fMaxQueue := 15;
  fThreads := TThreadList.Create;
  fQueue := TInterfaceList.Create;
  fSemaphore := TROSemaphore.Create;
end;

// Stops every worker, wakes them all through the semaphore, polls until
// each one has removed itself from the thread list, then frees the
// pool's resources.
destructor TROThreadPool.Destroy;
var
  lIndex, lRemaining: Integer;
  lWorkers: TList;
begin
  lWorkers := fThreads.LockList;
  try
    for lIndex := 0 to lWorkers.Count - 1 do
      TROPooledThread(lWorkers[lIndex]).Stop;
    // One release per worker plus one spare so none stays blocked.
    fSemaphore.Release(lWorkers.Count + 1);
  finally
    fThreads.UnlockList;
  end;

  // Workers unregister themselves in their destructor; poll every 50 ms
  // until the list is empty.
  repeat
    lWorkers := fThreads.LockList;
    try
      lRemaining := lWorkers.Count;
    finally
      fThreads.UnlockList;
    end;
    if lRemaining > 0 then
      Sleep(50);
  until lRemaining = 0;

  fSemaphore.Free;
  fThreads.Free;
  fQueue.Free;
  inherited Destroy;
end;

// Returns the number of callbacks currently held in the queue. Items that
// a worker has already removed for execution are not included.
function TROThreadPool.GetCurrQueueSize: Integer;
begin
  result := fQueue.Count;
end;

// Appends Callback to the queue and releases the semaphore once so that a
// worker picks it up. A new worker thread is started first when the
// current load warrants it and fMaxThreads has not been reached.
//
// Raises EROException when queued + running + this item reaches
// fMaxQueue * fMaxThreads.
procedure TROThreadPool.QueueItem(Callback: IROThreadPoolCallback);
var
  lUsage: Integer;
begin
  fQueue.Lock;
  try
    lUsage := fQueue.Count + fWorkingCount + 1;
    if lUsage >= fMaxQueue * fMaxThreads then
      raise EROException.Create('Not enough threads available for request');

    // NOTE(review): once more than fPoolThreads workers exist, the load
    // figure is halved before it is compared with the thread count, which
    // makes further growth less eager. The original comment here was
    // truncated ("we only need"), so the exact intent should be confirmed.
    if fThreadCount > fPoolThreads then
      lUsage := lUsage div 2;

    if (lUsage >= fThreadCount) and (fThreadCount < fMaxThreads) then begin
      // The TROPooledThread constructor registers the new thread in
      // fThreads itself and starts it immediately with FreeOnTerminate
      // set, so the returned reference is deliberately not stored or
      // added a second time.
      TROPooledThread.Create(self);
      // Counted only after creation succeeded, so a failed creation does
      // not leave the counter overstated.
      fThreadCount := fThreadCount + 1;
    end;

    fQueue.Add(Callback);
    fSemaphore.Release;
  finally
    fQueue.Unlock;
  end;
end;

{ TROSemaphore }

// Destroys the underlying OS semaphore. Safe to call more than once: the
// Win32 handle is zeroed after closing, and on Linux fFreed guards
// against a second sem_destroy.
procedure TROSemaphore.CloseHandle;
begin
  {$IFDEF WIN32}
  if fHandle <> 0 then begin
    Windows.CloseHandle(fHandle);
    fHandle := 0;
  end;
  {$ENDIF}
  {$IFDEF LINUX}
  if fFreed then
    exit;
  fFreed := true;
  sem_destroy(fHandle);
  {$ENDIF}
end;

// Creates the OS semaphore with an initial count of zero.
constructor TROSemaphore.Create;
begin
  inherited Create;
  {$IFDEF LINUX}
  sem_init(fHandle, {$IFDEF FPC}0{$ELSE}false{$ENDIF}, 0);
  {$ENDIF}
  {$IFDEF WIN32}
  fHandle := CreateSemaphore(nil, 0, MaxInt - 1, nil);
  {$ENDIF}
end;

// Releases the OS semaphore before the object goes away.
destructor TROSemaphore.Destroy;
begin
  CloseHandle;
  inherited;
end;

// Increases the semaphore count by Count. On Linux this is done by
// posting Count times, because sem_post raises the count by one per call.
procedure TROSemaphore.Release(Count: Integer);
begin
  {$IFDEF WIN32}
  ReleaseSemaphore(fHandle, Count, nil);
  {$ENDIF}
  {$IFDEF LINUX}
  while Count > 0 do begin
    Count := Count - 1;
    Release;
  end;
  {$ENDIF}
end;

// Increases the semaphore count by one, waking a single waiter if any.
procedure TROSemaphore.Release;
begin
  {$IFDEF WIN32}
  ReleaseSemaphore(fHandle, 1, nil);
  {$ENDIF}
  {$IFDEF LINUX}
  sem_post(fHandle);
  {$ENDIF}
end;

// Waits until the semaphore can be decremented or Timeout milliseconds
// have elapsed. Returns True when the semaphore was acquired and False on
// timeout or error. High(Cardinal), the value of Win32 INFINITE, means
// "wait without a deadline" on both platforms.
function TROSemaphore.WaitFor(Timeout: Cardinal): Boolean;
{$IFDEF LINUX}
var
  inttimeout: Timespec;
{$ENDIF}
begin
  {$IFDEF WIN32}
  result := WaitForSingleObject(fHandle, Timeout) = WAIT_OBJECT_0;
  {$ENDIF}
  {$IFDEF LINUX}
  if Timeout = High(Cardinal) then begin
    // An "infinite" timeout must not be turned into a finite absolute
    // deadline; block until the semaphore is posted.
    result := sem_wait(fHandle) = 0;
  end
  else begin
    // sem_timedwait expects an absolute deadline: whole seconds go into
    // tv_sec, the remaining milliseconds are converted to nanoseconds.
    inttimeout.tv_sec := __time(nil) + (Timeout div 1000);
    inttimeout.tv_nsec := (Timeout mod 1000) * 1000000;
    result := sem_timedwait(fHandle, {$IFDEF FPC}@{$ENDIF}inttimeout) = 0;
  end;
  {$ENDIF}
end;

end.