Componentes.Terceros.RemObj.../internal/5.0.23.613/1/RemObjects SDK for Delphi/Source/uROThreadPool.pas
david f0e35ec439 - Eliminadas las librerías para Delphi 6 (Dcu\D6) en RO y DA.
- Recompilación de RO para poner RemObjects_Core_D10 como paquete de runtime/designtime.

git-svn-id: https://192.168.0.254/svn/Componentes.Terceros.RemObjects@3 b6239004-a887-0f4b-9937-50029ccdca16
2007-09-10 10:40:17 +00:00

302 lines
7.1 KiB
ObjectPascal

unit uROThreadPool;
{----------------------------------------------------------------------------}
{ RemObjects SDK Library - Core Library }
{ }
{ compiler: Delphi 5 and up, Kylix 2 and up }
{ platform: Win32, Linux }
{ }
{ (c)opyright RemObjects Software. all rights reserved. }
{ }
{ Using this code requires a valid license of the RemObjects SDK }
{ which can be obtained at http://www.remobjects.com. }
{----------------------------------------------------------------------------}
{$I RemObjects.inc}
interface
uses
SysUtils, Classes, SyncObjs,
uROClasses
{$IFDEF WIN32}, Windows{$ENDIF}
{$IFDEF LINUX}, Libc{$ENDIF}
{$IFDEF REMOBJECTS_TRIAL}, uROTrial {$ENDIF}
;
type // syncobjs
// Forward declarations: the callback interface and the pooled thread both
// reference TROThreadPool, and the pool references TROSemaphore, before the
// full class declarations appear further down.
TROSemaphore = class;
TROThreadPool = class;
// Work item contract for the pool. Objects implementing this interface are
// passed to TROThreadPool.QueueItem and later invoked by a pooled thread.
IROThreadPoolCallback = interface
['{0F3386BB-0B6E-4070-93D5-BAA5CD287608}']
// Called from TROPooledThread.Execute.
//   Caller  - the pool that owns the executing thread.
//   aThread - the pooled thread that is running the callback.
// Exceptions raised by the implementation are caught and discarded by the
// calling thread.
procedure Callback(Caller: TROThreadPool; aThread: TThread);
end;
// Worker thread of a TROThreadPool. The constructor registers the thread in
// the owner's thread list and starts it; the destructor removes it again.
// FreeOnTerminate is set to True, so instances free themselves when Execute
// returns.
TROPooledThread = class(TThread)
private
// Pool whose semaphore and queue this thread services.
fOwner: TROThreadPool;
public
// aOwner: the pool this thread belongs to; it is dereferenced immediately.
constructor Create(aOwner: TROThreadPool);
destructor Destroy; override;
// Thread body: waits on the owner's semaphore, takes the first queued
// callback (if any) and runs it, until Terminated is set or the wait fails.
procedure Execute; override;
// Re-declared in the public section; the implementation only forwards to
// the inherited method.
procedure Synchronize(Method: TThreadMethod); // protected
// Requests termination by calling Terminate. It does not wake a thread that
// is blocked on the semaphore.
procedure Stop;
end;
// Thread pool component. Callbacks are appended to a queue and a semaphore is
// released once per queued item; worker threads are created on demand by
// QueueItem, up to MaxThreads.
TROThreadPool = class(TComponent)
private
// fMaxThreads  : upper bound for fThreadCount (constructor default 10).
// fMaxQueue    : multiplied by fMaxThreads to form the usage limit at which
//                QueueItem raises (constructor default 15).
// fPoolThreads : once fThreadCount exceeds this value, QueueItem halves its
//                usage figure before deciding whether to start another
//                thread (constructor default 5).
// fWorkingCount: number of callbacks currently executing; changed by the
//                worker threads via InterlockedIncrement/InterlockedDecrement.
// fThreadCount : number of worker threads started by QueueItem; nothing in
//                this unit decrements it.
fMaxThreads,
fMaxQueue,
fPoolThreads,
fWorkingCount,
fThreadCount: Integer;
// Released once per queued callback; worker threads wait on it.
fSemaphore: TROSemaphore;
// Pending callbacks. QueueItem appends, worker threads take index 0.
fQueue: TInterfaceList;
// Worker threads currently alive; each thread adds itself on creation and
// removes itself in its destructor.
fThreads: TThreadList;
public
constructor Create(AOwner: TComponent); override;
// Stops all worker threads and blocks until every one of them has removed
// itself from the thread list.
destructor Destroy; override;
// Returns the number of callbacks currently waiting in the queue.
function GetCurrQueueSize: Integer;
// Queues a callback for execution on a pooled thread. Raises EROException
// when queued + running items + 1 reaches MaxQueue * MaxThreads.
procedure QueueItem(Callback: IROThreadPoolCallback);
published
// The setters write the fields directly; there is no validation.
property MaxQueue: Integer read fMaxQueue write fMaxQueue;
property PoolThreads: Integer read fPoolThreads write fPoolThreads;
property MaxThreads: Integer read FMaxThreads write fMaxThreads;
end;
// Thin wrapper around the platform semaphore: a Win32 semaphore handle
// (CreateSemaphore) or a Libc semaphore initialised with sem_init on Linux.
// The initial count is 0 on both platforms.
TROSemaphore = class
private
{$IFDEF LINUX}
// Semaphore storage passed to the sem_* calls.
fHandle: TSemaphore;
// Set once sem_destroy has been called, so CloseHandle runs it only once.
fFreed: Boolean;
{$ENDIF}
{$IFDEF WIN32}
// Handle returned by CreateSemaphore; reset to 0 after it has been closed.
fHandle: THandle;
{$ENDIF}
public
constructor Create;
// Calls CloseHandle before the inherited destructor.
destructor Destroy; override;
// Releases the underlying OS semaphore; safe to call more than once.
procedure CloseHandle;
// Increments the semaphore Count times.
procedure Release(Count: Integer); overload;
// Increments the semaphore once.
procedure Release; overload;
// Waits for the semaphore. Timeout is in milliseconds. Returns True when the
// semaphore was acquired, False otherwise.
function WaitFor(Timeout: Cardinal): Boolean;
end;
implementation
{ TROPooledThread }
{ Creates the worker in suspended state, records the owning pool, registers
  the thread in the pool's thread list and only then lets it run.
  aOwner: pool that owns this thread; it is dereferenced here. }
constructor TROPooledThread.Create(aOwner: TROThreadPool);
begin
  inherited Create(True);
  // Everything Execute relies on is set up while the thread is still suspended.
  fOwner := aOwner;
  FreeOnTerminate := True;
  aOwner.fThreads.Add(Self);
  Resume;
end;
{ Flags the thread as terminated, removes it from the owner's thread list and
  runs the inherited destructor.
  Delphi invokes the destructor automatically when a constructor raises. If
  the inherited TThread constructor fails, fOwner has not been assigned yet,
  so the owner is only touched when it is actually set. Otherwise an access
  violation would replace the original exception. }
destructor TROPooledThread.Destroy;
begin
  Stop;
  if Assigned(fOwner) then
    fOwner.fThreads.Remove(Self);
  inherited Destroy;
end;
{ Thread body. Repeats until Terminated is set or the semaphore wait fails:
    1. block on the owner's semaphore,
    2. under the queue lock, take the first queued callback (if any),
    3. run it while it is counted in the owner's fWorkingCount. }
procedure TROPooledThread.Execute;
var
  lJob: IROThreadPoolCallback;
  lRaw: IUnknown;
begin
  FreeOnTerminate := True;
  while not Terminated do begin
    // A failed wait ends the thread.
    if not fOwner.fSemaphore.WaitFor(Cardinal(-1)) then Break;

    lJob := nil;
    fOwner.fQueue.Lock;
    try
      if fOwner.fQueue.Count > 0 then begin
        // Use an explicit local for the IUnknown reference so it can be
        // cleared right away; a compiler-generated temporary would keep the
        // item alive for longer.
        lRaw := fOwner.fQueue[0];
        lJob := lRaw as IROThreadPoolCallback;
        lRaw := nil;
        fOwner.fQueue.Delete(0);
      end;
    finally
      fOwner.fQueue.Unlock;
    end;

    // Woken up without work (for example during shutdown): re-check Terminated.
    if not Assigned(lJob) then Continue;

    InterlockedIncrement(fOwner.fWorkingCount);
    try
      lJob.Callback(fOwner, Self);
    except
      // Exceptions must not escape the thread; they are deliberately dropped.
    end;
    InterlockedDecrement(fOwner.fWorkingCount);
    lJob := nil;
  end;
end;
{ Requests the thread to stop by calling Terminate. A thread that is blocked
  in the semaphore wait only notices the request after the semaphore has been
  released. }
procedure TROPooledThread.Stop;
begin
Terminate;
end;
{ Forwards to the inherited Synchronize. The method is re-declared in the
  public section of the class so that code outside the thread class can
  call it. }
procedure TROPooledThread.Synchronize(Method: TThreadMethod);
begin
inherited Synchronize(Method);
end;
{ TROThreadPool }
{ Creates the pool with its default limits (MaxQueue 15, MaxThreads 10,
  PoolThreads 5) and allocates the semaphore, the callback queue and the
  thread list. No worker thread is started here. }
constructor TROThreadPool.Create(AOwner: TComponent);
begin
  inherited Create(AOwner);

  // Default limits
  fPoolThreads := 5;
  fMaxThreads := 10;
  fMaxQueue := 15;

  // Synchronisation objects and containers
  fThreads := TThreadList.Create;
  fQueue := TInterfaceList.Create;
  fSemaphore := TROSemaphore.Create;
end;
{ Shuts the pool down:
    1. asks every registered worker to stop and releases the semaphore
       (thread count + 1 times) so blocked workers wake up,
    2. polls every 50 ms until all workers have removed themselves from the
       thread list,
    3. frees the semaphore, the thread list and the queue.
  The call blocks for as long as any worker is still running a callback. }
destructor TROThreadPool.Destroy;
var
  lIndex: Integer;
  lWorkers: TList;
  lAllGone: Boolean;
begin
  lWorkers := fThreads.LockList;
  try
    for lIndex := lWorkers.Count - 1 downto 0 do
      TROPooledThread(lWorkers[lIndex]).Stop;
    fSemaphore.Release(lWorkers.Count + 1);
  finally
    fThreads.UnlockList;
  end;

  // Workers free themselves (FreeOnTerminate) and unregister in their
  // destructor, so an empty list means every thread is gone.
  repeat
    lWorkers := fThreads.LockList;
    try
      lAllGone := lWorkers.Count = 0;
    finally
      fThreads.UnlockList;
    end;
    if not lAllGone then Sleep(50);
  until lAllGone;

  fSemaphore.Free;
  fThreads.Free;
  fQueue.Free;
  inherited Destroy;
end;
{ Returns the number of callbacks currently waiting in the queue. Callbacks
  that a worker has already taken out of the queue are not included. }
function TROThreadPool.GetCurrQueueSize: Integer;
begin
result := fQueue.Count;
end;
{ Queues Callback for execution on a pooled thread.
  Under the queue lock it
    - computes the usage figure (queued + running + this item) and raises
      EROException when it reaches MaxQueue * MaxThreads,
    - starts one more worker thread when the usage figure (halved once more
      than PoolThreads threads exist) is at least the current thread count
      and MaxThreads has not been reached,
    - appends the callback and releases the semaphore once.
  Callback: work item to execute.
  Raises EROException when the pool is saturated. }
procedure TROThreadPool.QueueItem(Callback: IROThreadPoolCallback);
var
  lUsage: Integer;
begin
  fQueue.Lock;
  try
    lUsage := fQueue.Count + fWorkingCount + 1;
    if lUsage >= fMaxQueue * fMaxThreads then
      raise EROException.Create('Not enough threads available for request');

    // Beyond the base pool size, grow only half as eagerly.
    if fThreadCount > fPoolThreads then lUsage := lUsage div 2;

    if (lUsage >= fThreadCount) and (fThreadCount < fMaxThreads) then begin
      // The thread registers itself in fThreads inside its constructor.
      // Adding it again here is redundant, and could re-insert a dangling
      // pointer if the thread has already ended and freed itself.
      TROPooledThread.Create(Self);
      // Count the thread only after it was created successfully.
      fThreadCount := fThreadCount + 1;
    end;

    fQueue.Add(Callback);
    fSemaphore.Release;
  finally
    fQueue.Unlock;
  end;
end;
{ TROSemaphore }
{ Releases the operating-system semaphore. Calling it again afterwards is a
  no-op: on Win32 the handle is reset to 0, on Linux fFreed records that
  sem_destroy has already run. }
procedure TROSemaphore.CloseHandle;
begin
  {$IFDEF WIN32}
  if fHandle = 0 then Exit;
  Windows.CloseHandle(fHandle);
  fHandle := 0;
  {$ENDIF}
  {$IFDEF LINUX}
  if not fFreed then begin
    fFreed := true;
    sem_destroy(fHandle);
  end;
  {$ENDIF}
end;
{ Creates the semaphore with an initial count of 0.
  Win32: unnamed semaphore with a maximum count of MaxInt - 1.
  Linux: sem_init with the pshared argument off. }
constructor TROSemaphore.Create;
begin
  inherited Create;
  {$IFDEF WIN32}
  fHandle := CreateSemaphore(nil, 0, MaxInt - 1, nil);
  {$ENDIF}
  {$IFDEF LINUX}
  sem_init(fHandle, {$IFDEF FPC}0{$ELSE}false{$ENDIF}, 0);
  {$ENDIF}
end;
{ Releases the OS semaphore (if that has not happened yet) and then runs the
  inherited destructor. }
destructor TROSemaphore.Destroy;
begin
  CloseHandle;
  inherited Destroy;
end;
{ Increments the semaphore Count times.
  Win32: one ReleaseSemaphore call with the full count.
  Linux: one single Release per unit; nothing happens for Count <= 0. }
procedure TROSemaphore.Release(Count: Integer);
begin
  {$IFDEF WIN32}
  ReleaseSemaphore(fHandle, Count, nil);
  {$ENDIF}
  {$IFDEF LINUX}
  while Count > 0 do begin
    Release;
    Dec(Count);
  end;
  {$ENDIF}
end;
{ Increments the semaphore by one: ReleaseSemaphore with a count of 1 on
  Win32, sem_Post on Linux. The result of the OS call is not checked. }
procedure TROSemaphore.Release;
begin
{$IFDEF WIN32}
ReleaseSemaphore(fHandle, 1, nil);
{$ENDIF}
{$IFDEF LINUX}
sem_Post(fHandle);
{$ENDIF}
end;
{ Waits until the semaphore can be decremented.
  Timeout: maximum wait in milliseconds; Cardinal(-1) means "wait forever".
  Returns True when the semaphore was acquired, False on timeout or error.

  Linux notes:
    - An infinite timeout uses sem_wait. Previously it became a deadline of
      about 49.7 days, after which an idle worker thread would exit.
    - sem_timedwait expects an absolute time. The millisecond remainder is
      converted to nanoseconds (1 ms = 1,000,000 ns); previously it was
      stored unconverted, so sub-second timeouts were effectively lost. }
function TROSemaphore.WaitFor(Timeout: Cardinal): Boolean;
{$IFDEF LINUX}
var
  inttimeout: Timespec;
{$ENDIF}
begin
  {$IFDEF WIN32}
  result := WaitForSingleObject(fHandle, Timeout) = WAIT_OBJECT_0;
  {$ENDIF}
  {$IFDEF LINUX}
  if Timeout = Cardinal(-1) then
    result := sem_wait(fHandle) = 0
  else begin
    inttimeout.tv_sec := __time(nil) + (Timeout div 1000);
    // At most 999 * 1,000,000, which stays below one second's worth of ns.
    inttimeout.tv_nsec := (Timeout mod 1000) * 1000000;
    result := sem_timedwait(fHandle, {$IFDEF FPC}@{$ENDIF}inttimeout) = 0;
  end;
  {$ENDIF}
end;
end.