unit uROSynapseSuperTCPChannel; {----------------------------------------------------------------------------} { RemObjects SDK Library - Indy Library } { } { compiler: Delphi 5 and up, Kylix 2 and up } { platform: Win32, Linux } { } { (c)opyright RemObjects Software. all rights reserved. } { } { Using this code requires a valid license of the RemObjects SDK } { which can be obtained at http://www.remobjects.com. } {----------------------------------------------------------------------------} {$I RemObjects.inc} interface uses SysUtils, Classes, uROClasses, uROClient, uROClientIntf, SyncObjs, uROAsync, {$IFDEF RemObjects_UseEncryption} uROEncryption, {$ENDIF}{$IFDEF SynapseSuperTCPDEBUG}synsock,{$ENDIF} blcksock, {$IFDEF MSWINDOWS}Windows, {$ENDIF}uROSynapseSCHelpers, uROThreadPool; type TROSynapseCustomSuperTCPChannel = class; TROSynapseSCClientWorker = class; TROSynapseWaitingRequest = class private fEvent: TROEvent; fFreeEvent: Boolean; fId: Integer; fResultData: TStream; fResultErrorCode: Integer; public property Id: Integer read fId; property Event: TROEvent read fEvent; property Resultdata: TStream read fResultData write fResultData; property ResultErrorCode: Integer read fResultErrorCode write fResultErrorCode; constructor Create(Id: Integer); overload; constructor Create(Id: Integer; Ev: TROEvent); overload; destructor Destroy; override; end; TROSynapsePooledEvent = class(TInterfacedObject, IROThreadPoolCallback) private fData: TStream; fOwner: TROSynapseCustomSuperTCPChannel; protected procedure Callback(Caller: TROThreadPool; Thread: TThread); public constructor Create(aData: TStream; aOwner: TROSynapseCustomSuperTCPChannel); destructor Destroy; override; end; TROSCState = (scNotConnected, scConnecting, scConnected, scReconnecting); TROSCAutoReconnectBehavior = (sbBeforeServerLocators, sbAfterServerLocators, sbSwitchServerLocatorAfterFirstFailure); TROSynapseCustomSuperTCPChannel = class(TROTransportChannel, IROTransport, IROTCPTransport, IROActiveEventChannel, IROMultiThreadAwareChannel, IROAsyncTransportChannel, IROActiveAsyncTransportChannel, IROTransportChannelEx,IROTCPTransportProperties) private fRequestTimeout: Integer; fState: TROSCState; fHost: string; fPort: Integer; fConnection: TTCPBlockSocket; fActive: Boolean; fOwnsEventThreadPool: Boolean; fEventThreadPool: TROThreadPool; fClient: TROSynapseSCClientWorker; fWorkerThread: TThread; fWaitingRequests: TThreadList; fAutoReconnect: Boolean; fEventReceiver: IROEventReceiver; fConnectEvent: TROEvent; fReconnectEvent: TROEvent; fOnDisconnected: TNotifyEvent; fOnConnected: TNotifyEvent; fReconnectDelay: Integer; fStoreActive: Boolean; fPingSecs: Integer; fAckWaitTimeout: Integer; fIdleTimeoutMinutes: Integer; fSynchronizeEvents: Boolean; fConnectionWaitTimeout: Integer; fAutoReconnectBehavior: TROSCAutoReconnectBehavior; {$IFDEF SynapseSuperTCPDEBUG} dmsg_count: integer; dLogStream:TMemoryStream; dWriteMode: Boolean; procedure SynapseSuperTCP_HookMonitor(Sender: TObject; Writing: Boolean;const Buffer: TMemory; Len: Integer); {$ENDIF} procedure SetHost(const Value: string); procedure SetPort(const Value: Integer); procedure SetActive(const Value: Boolean); procedure SetEventThreadPool(const Value: TROThreadPool); function GetClientID: TGUID; procedure SetClientID(const Value: TGUID); function GetMaxPackageSize: Longint; procedure SetMaxPackageSize(val: longint); function GetConnected: Boolean; function GetSkipAck: Boolean; procedure SetSkipAck(const Value: Boolean); procedure SetPingSecs(const Value: Integer); function InDestroyingState: Boolean; function GetHost: string; function GetPort: Integer; procedure ClearWaitingRequests; protected function InitServerLocator:Integer; override; procedure ChangeServerLocator(var faultstartlocatoridx: integer; var aRetry: Boolean; const aException: Exception); override; procedure Loaded; override; function GetClientAddress: String; function GetTransportObject: TObject; override; procedure HasData(Id: Integer; aData: TStream); procedure BeforeDispatch(aMessage: IROMessage); override; procedure IntDispatch(aRequest, aResponse : TStream); override; procedure RegisterEventReceiver(aReceiver: IROEventReceiver); procedure UnregisterEventReceiver(aReceiver: IROEventReceiver); procedure Notification(AComponent: TComponent; Operation: TOperation); override; function InvokeRequest(aRequest: TStream; aGetResponse: Boolean = True; aEvent: TROEvent = nil): String; overload; function InvokeRequest(aRequest:TStream; iGetResponse:boolean=true):string; overload; function CheckStatus(const iMessageID: String): Boolean; procedure RetrieveResponse(const iMessageID: String; aResponse: TStream); public constructor Create(aOwner: TComponent); override; destructor Destroy; override; procedure WaitForActive(const Timeout: Integer); procedure Assign(aSource : TPersistent); override; property Host: string read GetHost write SetHost; property Port: Integer read GetPort write SetPort default 8095; property PingSecs: Integer read fPingSecs write SetPingSecs default 60; property StoreActive: Boolean read fStoreActive write fStoreActive default True; property Active: Boolean read fActive write SetActive stored fStoreActive default False; property Client: TTCPBlockSocket read fConnection; property RequestTimeout: Integer read fRequestTimeout write fRequestTimeout default 60000; property EventThreadPool: TROThreadPool read fEventThreadPool write SetEventThreadPool; property AutoReconnect: Boolean read fAutoReconnect write fAutoReconnect default False; property ReconnectDelay: Integer read fReconnectDelay write fReconnectDelay default 500; property ClientID: TGUID read GetClientID write SetClientID; property OnDisconnected: TNotifyEvent read fOnDisconnected write fOnDisconnected; property MaxPackageSize: Integer read GetMaxPackageSize write SetMaxPackageSize default 10*1024*1024; property OnConnected: TNotifyEvent read fOnConnected write fOnConnected; property AckWaitTimeout: Integer read fAckWaitTimeout write fAckWaitTimeout default 10000; property Connected: Boolean read GetConnected; property IdleTimeoutMinutes: Integer read fIdleTimeoutMinutes write fIdleTimeoutMinutes default 0; // minutes property SynchronizeEvents: Boolean read fSynchronizeEvents write fSynchronizeEvents default False; property ConnectionWaitTimeout: Integer read fConnectionWaitTimeout write fConnectionWaitTimeout default 10000; property SkipAck: Boolean read GetSkipAck write SetSkipAck default false; property AutoReconnectBehavior: TROSCAutoReconnectBehavior read fAutoReconnectBehavior write fAutoReconnectBehavior default sbSwitchServerLocatorAfterFirstFailure; end; TROSynapseSuperTCPChannel = class(TROSynapseCustomSuperTCPChannel) published property Host; property Port; property Active; property RequestTimeout; property AutoReconnect; property ReconnectDelay; property OnDisconnected; property OnConnected; property StoreActive; property MaxPackageSize; property AckWaitTimeout; property SynchronizeEvents; property ConnectionWaitTimeout; property AutoReconnectBehavior; property SkipAck; published // from TROTransportChannel property DispatchOptions; property OnAfterProbingServer; property OnAfterProbingServers; property OnBeforeProbingServer; property OnBeforeProbingServers; property OnLoginNeeded; //property OnProgress; property OnReceiveStream; property OnSendStream; property OnServerLocatorAssignment; property ProbeFrequency; property ProbeServers; property ServerLocators; property SynchronizedProbing; end; TROSynapseSCClientWorker = class(TROSynSuperChannelWorker) private fOwner: TROSynapseCustomSuperTCPChannel; fSkipReconnect: Boolean; protected procedure Connected; override; procedure Disconnected(var RestartLoop: Boolean); override; procedure IncomingData(Id: Integer; aData: TStream); override; procedure Idle; override; procedure SetAckDetails(Id: Integer; Oke: Boolean; ErrorNo: Integer); override; public constructor Create(aOwner: TROSynapseCustomSuperTCPChannel; aConnection: TTCPBlockSocket); property Owner: TROSynapseCustomSuperTCPChannel read fOwner; end; implementation type TROSynapseClientThread = class(TROInitializedThread) private fChannel: TROSynapseSCClientWorker; fConnecting: Boolean; protected procedure Execute; override; public constructor Create(aChannel: TROSynapseSCClientWorker); property Connecting: Boolean read fConnecting; end; { TROSynapseCustomSuperTCPChannel } procedure TROSynapseCustomSuperTCPChannel.Assign(aSource: TPersistent); var lSource: TROSynapseCustomSuperTCPChannel; begin inherited; if aSource is TROSynapseCustomSuperTCPChannel then begin lSource := TROSynapseCustomSuperTCPChannel(aSource); Active := False; AckWaitTimeout := lSource.AckWaitTimeout; AutoReconnect := lSource.AutoReconnect; AutoReconnectBehavior := lSource.AutoReconnectBehavior; ConnectionWaitTimeout := lSource.ConnectionWaitTimeout; MaxPackageSize := lSource.MaxPackageSize; OnConnected := lSource.OnConnected; OnDisconnected := lSource.OnDisconnected; ReconnectDelay := lSource.ReconnectDelay; RequestTimeout := lSource.RequestTimeout; SkipAck := lSource.SkipAck; StoreActive := lSource.StoreActive; SynchronizeEvents := lSource.SynchronizeEvents; Active := lSource.Active; end; end; procedure TROSynapseCustomSuperTCPChannel.BeforeDispatch(aMessage: IROMessage); begin try inherited; if not fActive then begin SetActive(true); end; fReconnectEvent.SetEvent; if fConnectEvent.WaitFor(fConnectionWaitTimeout) <> wrSignaled then raise EROException.Create('No connection available'); if aMessage <> nil then aMessage.ClientID := fClient.ClientId; except fState:=scNotConnected; raise; end; end; procedure TROSynapseCustomSuperTCPChannel.ChangeServerLocator( var faultstartlocatoridx: integer; var aRetry: Boolean; const aException: Exception); begin // none end; function TROSynapseCustomSuperTCPChannel.CheckStatus( const iMessageID: String): Boolean; var i, id: Integer; lList: TList; lReq: TROSynapseWaitingRequest; begin Result := False; id := StrToInt(iMessageID); lList := fWaitingRequests.LockList; try for i := lList.Count -1 downto 0 do begin lReq := TROSynapseWaitingRequest(lList[i]); if Lreq.Id = id then begin Result := (lReq.Resultdata <> nil) or (lReq.ResultErrorCode <> -1); exit; end; end; finally fWaitingRequests.UnlockList; end; end; procedure TROSynapseCustomSuperTCPChannel.ClearWaitingRequests; var List: TList; i: Integer; begin List:= fWaitingRequests.LockList; try for I := List.Count - 1 downto 0 do TROSynapseWaitingRequest(List[i]).Free; List.Clear; finally fWaitingRequests.UnlockList; end; end; constructor TROSynapseCustomSuperTCPChannel.Create(aOwner: TComponent); begin inherited Create(aOwner); fConnectEvent := TROEvent.create(nil, true, false, ''); fReconnectEvent := TROEvent.Create(nil, true, false, ''); fPingSecs := 60; fWaitingRequests := TThreadList.Create; fHost := 'localhost'; fPort := 8095; fConnection := TTCPBlockSocket.Create; {$IFDEF SynapseSuperTCPDEBUG} dmsg_count:=0; dLogStream:=TMemoryStream.Create; dWriteMode := False; fConnection.OnMonitor:=SynapseSuperTCP_HookMonitor; {$ENDIF} fClient := TROSynapseSCClientWorker.Create(Self, fConnection); fRequestTimeout := 60000; fAckWaitTimeout := 10000; fReconnectDelay := 500; fConnectionWaitTimeout := 10000; fStoreActive := True; ThreadSafe := True; fAutoReconnectBehavior := sbSwitchServerLocatorAfterFirstFailure; end; destructor TROSynapseCustomSuperTCPChannel.Destroy; var fw: TROSynapseClientThread; begin if (fWorkerThread <> nil) then begin fActive := False; fw := TROSynapseClientThread(fWorkerThread); fWorkerThread := nil; fClient.Disconnect; fw.Terminate; fReconnectEvent.SetEvent; fw.WaitFor; FreeAndNil(fw); end; fClient.Free; fConnection.Free; ClearWaitingRequests; fWaitingRequests.Free; fConnectEvent.Free; if fOwnsEventThreadPool then begin fEventThreadPool.Free; end; UnregisterEventReceiver(nil); fReconnectEvent.Free; {$IFDEF SynapseSuperTCPDEBUG} SynapseSuperTCP_HookMonitor(Self,not dWriteMode,nil,0); dLogStream.Free; {$ENDIF} inherited Destroy; end; function TROSynapseCustomSuperTCPChannel.GetClientAddress: String; begin Result := fConnection.GetRemoteSinIP; end; function TROSynapseCustomSuperTCPChannel.GetClientID: TGUID; begin Result := fClient.ClientID; end; function TROSynapseCustomSuperTCPChannel.GetConnected: Boolean; begin result := fClient.IsConnected; end; function TROSynapseCustomSuperTCPChannel.GetHost: string; begin Result := fHost; end; function TROSynapseCustomSuperTCPChannel.GetMaxPackageSize: Longint; begin result := fClient.MaxPackageSize; end; function TROSynapseCustomSuperTCPChannel.GetPort: Integer; begin Result := fPort; end; function TROSynapseCustomSuperTCPChannel.GetSkipAck: Boolean; begin result := fClient.SkipAck; end; function TROSynapseCustomSuperTCPChannel.GetTransportObject: TObject; begin Result := self; end; procedure TROSynapseCustomSuperTCPChannel.HasData(Id: Integer; aData: TStream); var lList: TList; i: Integer; lReq: TROSynapseWaitingRequest; lEvent: TROSynapsePooledEvent; lEventIntf: IROThreadPoolCallback; begin if Id < 0 then // got an event begin if fEventThreadPool = nil then raise EROException.Create('No thread pool assigned'); lEvent := TROSynapsePooledEvent.Create(aData, Self); lEventIntf := lEvent; try fEventThreadPool.QueueItem(lEventIntf); except lEvent.fData := nil; raise; end; end else begin lList := fWaitingRequests.LockList; try lReq := nil; for i := lList.Count -1 downto 0 do begin if TROSynapseWaitingRequest(lList[i]).Id = Id then begin lReq := TROSynapseWaitingRequest(lList[i]); break; end; end; if lReq = nil then begin // Don't want a response aData.Free; exit; end; lReq.Resultdata := aData; lReq.Event.SetEvent; finally fWaitingRequests.UnlockList; end; end; end; function TROSynapseCustomSuperTCPChannel.InDestroyingState: Boolean; begin Result:= csDestroying in ComponentState; end; function TROSynapseCustomSuperTCPChannel.InitServerLocator: Integer; begin Result:=-1; end; procedure TROSynapseCustomSuperTCPChannel.IntDispatch(aRequest, aResponse: TStream); var lId: Integer; lReq: TROSynapseWaitingRequest; begin // if (not fActive) then raise EROException.Create('Not connected'); lId := fClient.GenerateId; lReq := TROSynapseWaitingRequest.Create(lId); try fWaitingRequests.Add(lReq); try TROSynapseSCClientWorker.WaitForAck(fClient.SendPackage(aRequest, lId), fAckWaitTimeout); lReq.Event.WaitFor(fRequestTimeout); finally fWaitingRequests.Remove(lReq); end; if lReq.Resultdata = nil then begin case lReq.fResultErrorCode of ScCmdNoAck_MsgTooLarge: raise EROException.Create('Message from server too large'); ScCmdNoAck_QueueFull: raise EROException.Create('Server queue full'); else raise EROSynapseTimeout.Create('Timeout waiting for response'); end; end; lReq.Resultdata.Seek(0, soFromBeginning); aResponse.CopyFrom(lReq.Resultdata, lReq.Resultdata.Size); finally lReq.Free; end; end; function TROSynapseCustomSuperTCPChannel.InvokeRequest(aRequest: TStream; aGetResponse: Boolean; aEvent: TROEvent): String; var lId: Integer; lReq: TROSynapseWaitingRequest; begin lId := fClient.GenerateId; if aGetResponse then begin if aEvent = nil then lReq := TROSynapseWaitingRequest.Create(lId) else lReq := TROSynapseWaitingRequest.Create(lId, aEvent); fWaitingRequests.Add(lReq); end; {$IFDEF RemObjects_UseEncryption} if Encryption.EncryptionMethod <> tetNone then DoEncryption2(aRequest); {$ENDIF} fClient.SendPackage(aRequest, lId); Result := IntToStr(lId); aRequest.Free; // needs to be freed here end; function TROSynapseCustomSuperTCPChannel.InvokeRequest(aRequest: TStream; iGetResponse: boolean): string; begin Result := InvokeRequest(aRequest, iGetResponse, nil); end; procedure TROSynapseCustomSuperTCPChannel.Loaded; begin inherited; if fActive then begin fActive := false; SetActive(true); end; end; procedure TROSynapseCustomSuperTCPChannel.Notification(AComponent: TComponent; Operation: TOperation); var dummy: IROEventReceiver; begin if (Operation = opRemove) then begin if (AComponent = fEventThreadPool) then fEventThreadPool := nil; if Supports(aComponent, IROEventReceiver, dummy) and (dummy = fEventReceiver) then UnregisterEventReceiver(fEventReceiver); end; inherited; end; procedure TROSynapseCustomSuperTCPChannel.RegisterEventReceiver( aReceiver: IROEventReceiver); begin if fEventReceiver <> aReceiver then begin if assigned(fEventReceiver) then begin if fEventReceiver.GetObject is TROComponent then TROComponent(fEventReceiver.GetObject).RORemoveFreeNotification(Self) else TComponent(fEventReceiver.GetObject).RemoveFreeNotification(Self); end; fEventReceiver := aReceiver; if assigned(fEventReceiver) then begin if fEventReceiver.GetObject is TROComponent then TROComponent(fEventReceiver.GetObject).ROFreeNotification(Self) else TComponent(fEventReceiver.GetObject).FreeNotification(Self); end; end; end; procedure TROSynapseCustomSuperTCPChannel.RetrieveResponse( const iMessageID: String; aResponse: TStream); var i, id: Integer; lList: TList; lReq: TROSynapseWaitingRequest; begin id := STrToInt(iMessageID); lList := fWaitingRequests.LockList; try lReq := nil; for i := lList.Count -1 downto 0 do begin lReq := TROSynapseWaitingRequest(lList[i]); if Lreq.Id = id then break else lReq := nil; end; finally fWaitingRequests.UnlockList; end; if lReq = nil then raise EROSynapseTimeout.Create('Unknown response id '+iMessageID); try if lReq.fEvent <> nil then lReq.fEvent.WaitFor(fRequestTimeout); if lReq.fResultData = nil then begin case lReq.ResultErrorCode of ScCmdNoAck_MsgTooLarge: raise EROException.Create('Message from server too large'); ScCmdNoAck_QueueFull: raise EROException.Create('Server queue full'); else raise EROSynapseTimeout.Create('Timeout waiting for response'); end; end; lREq.Resultdata.Seek(0, soFromBeginning); aResponse.CopyFrom(lReq.Resultdata, lReq.Resultdata.Size); aResponse.Seek(0, soFromBeginning); {$IFDEF RemObjects_UseEncryption} if Encryption.EncryptionMethod <> tetNone then DoDecryption2(aResponse); {$ENDIF} finally fWaitingRequests.Remove(lReq); lReq.Free; end; end; procedure TROSynapseCustomSuperTCPChannel.SetActive(const Value: Boolean); var fw: TThread; begin if (fActive = value) then begin if not fActive or (fState <> scNotConnected) then Exit; end; fActive := value; if ComponentState * [csLoading] = [] then begin if (fWorkerThread <> nil) then begin fw := fWorkerThread; fWorkerThread := nil; fClient.Disconnect; TROSynapseClientThread(fw).Terminate; fReconnectEvent.SetEvent; fw.WaitFor; FreeAndNil(fw); fState := scNotConnected; end; if fEventThreadPool = nil then begin EventThreadPool := TROThreadPool.Create(nil); fOwnsEventThreadPool := true; end; if fActive then begin fReconnectEvent.SetEvent; fConnectEvent.ResetEvent; fState := scConnecting; fWorkerThread := TROSynapseClientThread.Create(fClient); end; end; end; procedure TROSynapseCustomSuperTCPChannel.SetClientID(const Value: TGUID); begin fClient.ClientID := Value; end; procedure TROSynapseCustomSuperTCPChannel.SetEventThreadPool( const Value: TROThreadPool); begin if fOwnsEventThreadPool then begin fEventThreadPool.Free; fOwnsEventThreadPool := false; end; fEventThreadPool := Value; end; procedure TROSynapseCustomSuperTCPChannel.SetHost(const Value: string); begin if fActive and (csLoading in ComponentState) then raise EROException.Create('Client is active'); fHost := value; end; procedure TROSynapseCustomSuperTCPChannel.SetMaxPackageSize(val: Integer); begin fClient.MaxPackageSize := Val; end; procedure TROSynapseCustomSuperTCPChannel.SetPingSecs(const Value: Integer); begin if (Value < 10) or (Value > 60) then raise EROException.Create('PingSecs has to be between 10 and 60'); fPingSecs := Value; end; procedure TROSynapseCustomSuperTCPChannel.SetPort(const Value: Integer); begin if fActive and (csLoading in ComponentState) then raise EROException.Create('Client is active'); fPort := Value; end; procedure TROSynapseCustomSuperTCPChannel.SetSkipAck(const Value: Boolean); begin fClient.SkipAck := Value; end; {$IFDEF SynapseSuperTCPDEBUG} procedure TROSynapseCustomSuperTCPChannel.SynapseSuperTCP_HookMonitor( Sender: TObject; Writing: Boolean; const Buffer: TMemory; Len: Integer); const rw : array[Boolean] of string = ('Read', 'Write'); begin if (dWriteMode <> Writing) or nor Writing then begin if Len <> 0 then begin inc(dmsg_count); dLogStream.SaveToFile(IntToHex(cardinal(Self),8)+'_'+IntToStr(dmsg_count)+'='+rw[dWriteMode]); dLogStream.Size := 0; end; dWriteMode := Writing; end; if len <> 0 then dLogStream.Write(Buffer^,Len); end; {$ENDIF SynapseSuperTCPDEBUG} procedure TROSynapseCustomSuperTCPChannel.UnregisterEventReceiver( aReceiver: IROEventReceiver); begin if assigned(fEventReceiver) then begin if fEventReceiver.GetObject is TROComponent then TROComponent(fEventReceiver.GetObject).RORemoveFreeNotification(Self) else TComponent(fEventReceiver.GetObject).RemoveFreeNotification(Self); end; {$IFDEF FPC} if aReceiver <> nil then fEventReceiver := nil else //for preventing warnings in FPC {$ENDIF} fEventReceiver := nil; end; type TROSBEventTrigger = class(TInterfacedObject, IROThreadPoolCallback) private fEvent: TNotifyEvent; fSender: TObject; procedure Execute; public constructor Create(aSender: TObject; aEvent: TNotifyEvent); procedure Callback(Caller: TROThreadPool; aThread: TThread); end; procedure TROSynapseCustomSuperTCPChannel.WaitForActive(const Timeout: Integer); begin SetActive(true); if fConnectEvent.WaitFor(Timeout) <> wrSignaled then raise EROException.Create('No connection available'); end; { TROSynapseSCClientWorker } procedure TROSynapseSCClientWorker.Connected; begin inherited; fOwner.fState := scConnected; fOwner.fConnectEvent.SetEvent; if assigned(fOwner.fOnConnected) then begin if fOwner.fSynchronizeEvents then fOwner.fEventThreadPool.QueueItem(TROSBEventTrigger.Create(fOwner, fOwner.fOnconnected)) else fOwner.fOnconnected(fOwner); end; end; constructor TROSynapseSCClientWorker.Create(aOwner: TROSynapseCustomSuperTCPChannel; aConnection: TTCPBlockSocket); begin fOwner := aOwner; inherited Create(aConnection); PingFrequency := fOwner.fPingSecs; PingTimeout := PingFrequency * 15 div 10; end; procedure TROSynapseSCClientWorker.Disconnected(var RestartLoop: Boolean); // restartloop is false by default function CanUseServerLocators: boolean; var i: integer; begin Result := False; if (doFaultTolerant in fOwner.DispatchOptions) then begin For i:=0 to fOwner.ServerLocators.Count -1 do if (fOwner.ServerLocators.Items[i].Enabled) then begin Result:=True; Break; end; end; end; var lIndex: integer; lRet: boolean; lAutoReconnectFailed: boolean; begin inherited; try fOwner.fConnection.CloseSocket; except end; if assigned(fOwner.fOnDisconnected) then begin if fOwner.fSynchronizeEvents then fOwner.fEventThreadPool.QueueItem(TROSBEventTrigger.Create(fOwner, fOwner.fOnDisconnected)) else fOwner.fOnDisconnected(fOwner); end; if fOwner.fActive and (fOwner.fAutoReconnect or CanUseServerLocators) and not fOwner.InDestroyingState and not fSkipReconnect then begin fOwner.fState := scReconnecting end else begin fOwner.fState := scNotConnected; fOwner.fActive := false; fOwner.fConnectEvent.ResetEvent; exit; end; fOwner.fReconnectEvent.ResetEvent; fOwner.fConnectEvent.ResetEvent; lIndex := -1; lRet := True; lAutoReconnectFailed:=False; while lRet and fOwner.fActive and (fOwner.fAutoReconnect or CanUseServerLocators) and not fOwner.InDestroyingState and not fSkipReconnect do begin fOwner.fReconnectEvent.WaitFor(fOwner.fReconnectDelay); // don't want to create a tight loop try fOwner.fReconnectEvent.ResetEvent; fOwner.fConnection.Connect(fOwner.fHost, inttostr(fOwner.fPort)); if fOwner.fConnection.LastError <> 0 then raise ESynapseError.Create(fOwner.fConnection.LastErrorDesc); RestartLoop := True; Break; except // we failed, retry in a few on E: Exception do begin lRet:= False; if (fOwner.AutoReconnectBehavior = sbAfterServerLocators) or (lAutoReconnectFailed and (fOwner.AutoReconnectBehavior = sbSwitchServerLocatorAfterFirstFailure)) then begin fOwner.intChangeServerLocator(lIndex,lRet,e); end; if not lRet and Owner.AutoReconnect then begin lRet := true; lAutoReconnectFailed := True; end; if not lRet and ((fOwner.AutoReconnectBehavior = sbBeforeServerLocators) or (not lAutoReconnectFailed and (fOwner.AutoReconnectBehavior = sbSwitchServerLocatorAfterFirstFailure)))then begin fOwner.intChangeServerLocator(lIndex,lRet,e); end; end; end; end; if RestartLoop then fOwner.fState := scConnecting else begin fowner.fState := scNotConnected; fOwner.fActive := false; fOwner.fConnectEvent.ResetEvent; end; end; procedure TROSynapseSCClientWorker.Idle; begin IF fOwner.IdleTimeoutMinutes > 0 then begin if Now > LastData + ((1.0 / 24 / 60) * fOwner.IdleTimeoutMinutes) then begin Disconnect; fSkipReconnect := true; end; end; end; procedure TROSynapseSCClientWorker.IncomingData(Id: Integer; aData: TStream); begin fOwner.HasData(id, aData); end; procedure TROSynapseSCClientWorker.SetAckDetails(Id: Integer; Oke: Boolean; ErrorNo: Integer); var lList: TList; i: Integer; lReq: TROSynapseWaitingRequest; begin if Oke then begin inherited; exit; end; lList := FOwner.fWaitingRequests.LockList; try lReq := nil; for i := lList.Count -1 downto 0 do begin if TROSynapseWaitingRequest(lList[i]).Id = Id then begin lReq := TROSynapseWaitingRequest(lList[i]); break; end; end; if lReq = nil then inherited else begin lReq.fResultErrorCode := ErrorNo; lReq.Event.SetEvent; end; finally fOwner.fWaitingRequests.UnlockList; end; end; { TROSynapseClientThread } constructor TROSynapseClientThread.Create(aChannel: TROSynapseSCClientWorker); begin {$IFNDEF FPC} inherited Create(False); {$ELSE} inherited Create(True); {$ENDIF} fChannel := aChannel; FreeOnTerminate := false; {$IFDEF FPC} Resume; {$ENDIF} end; procedure TROSynapseClientThread.Execute; var lRet: Boolean; fIndex: integer; lAutoReconnectFailed: boolean; begin fChannel.fOwner.fReconnectEvent.ResetEvent; if (fChannel.Owner.AutoReconnectBehavior = sbBeforeServerLocators) then fIndex := fChannel.Owner.intInitServerLocator else fIndex := -2; lRet := true; fConnecting := True; lAutoReconnectFailed := False; while lRet do begin lRet := false; try fChannel.Connection.Connect(fChannel.fOwner.fHost, inttostr(fChannel.fOwner.fPort)); if fChannel.Connection.LastError <> 0 then raise ESynapseError.Create(fChannel.Connection.LastErrorDesc); lAutoReconnectFailed := False; except on e: Exception do begin // if autoreconnect is failed in 1st time, try to use ServerLocator if (fChannel.Owner.AutoReconnectBehavior = sbAfterServerLocators) or (lAutoReconnectFailed and (fChannel.Owner.AutoReconnectBehavior = sbSwitchServerLocatorAfterFirstFailure)) then begin if fIndex = -2 then begin fIndex := fChannel.Owner.intInitServerLocator; lRet := True; end else fChannel.fOwner.intChangeServerLocator(fIndex,lRet,e); end; if not lRet and fChannel.Owner.AutoReconnect and not fChannel.fOwner.InDestroyingState then begin lRet := true; lAutoReconnectFailed := True; fChannel.fOwner.fReconnectEvent.WaitFor(fChannel.fOwner.fReconnectDelay); fChannel.fOwner.fReconnectEvent.ResetEvent; end; if not lRet and ((fChannel.Owner.AutoReconnectBehavior = sbBeforeServerLocators) or (not lAutoReconnectFailed and (fChannel.Owner.AutoReconnectBehavior = sbSwitchServerLocatorAfterFirstFailure)))then begin if fIndex = -2 then begin fIndex := fChannel.Owner.intInitServerLocator; lRet := True; end else fChannel.fOwner.intChangeServerLocator(fIndex,lRet,e); end; // if assigned(fChannel.Owner.OnException) then fChannel.Owner.OnException(fChannel.Owner, e, lRet); if (not lRet) or (not fChannel.Owner.Active) then begin //terminate execution if channel was deactivated fConnecting := False; exit; end; end; end; end; fChannel.DoExecute; end; { TROSynapseWaitingRequest } constructor TROSynapseWaitingRequest.Create(Id: Integer); begin inherited Create; fId := Id; fEvent := TROEvent.Create(nil, true, false, ''); fFreeEvent := True; fResultErrorCode := -1; end; constructor TROSynapseWaitingRequest.Create(Id: Integer; Ev: TROEvent); begin inherited Create; fId := Id; fEvent := ev; fResultErrorCode := -1; end; destructor TROSynapseWaitingRequest.Destroy; begin fResultData.Free; if fFreeEvent then fEvent.Free; inherited Destroy; end; { TROSynapsePooledEvent } procedure TROSynapsePooledEvent.Callback(Caller: TROThreadPool; Thread: TThread); begin if fOwner.fEventReceiver <> nil then begin fOwner.DecodeEventStream(fData); {$IFDEF FPC} if caller <> nil then fOwner.fEventReceiver.Dispatch(fData, Thread) else // for preventing warning in FPC {$ENDIF} fOwner.fEventReceiver.Dispatch(fData, Thread); end; end; constructor TROSynapsePooledEvent.Create(aData: TStream; aOwner: TROSynapseCustomSuperTCPChannel); begin inherited Create; fOwner := aOwner; fData := aData; end; destructor TROSynapsePooledEvent.Destroy; begin fData.Free; inherited Destroy; end; { TROSBEventTrigger } procedure TROSBEventTrigger.Callback(Caller: TROThreadPool; aThread: TThread); begin try {$IFDEF FPC} if Caller <> nil then TROSynapseClientThread(aThread).Synchronize(Execute) else // for preventing warning in FPC {$ENDIF} TROSynapseClientThread(aThread).Synchronize(Execute); except // exceptions cannot pass back to the pool end; end; constructor TROSBEventTrigger.Create(aSender: TObject; aEvent: TNotifyEvent); begin inherited Create; fSender := aSender; fEvent := aEvent; end; procedure TROSBEventTrigger.Execute; begin if assigned(fEvent) then fEvent(fSender); end; initialization RegisterTransportChannelClass(TROSynapseSuperTCPChannel); finalization UnRegisterTransportChannelClass(TROSynapseSuperTCPChannel); end.