unit uROSynapseSCHelpers;

{----------------------------------------------------------------------------}
{ RemObjects SDK Library - Indy Library                                      }
{                                                                            }
{ compiler: Delphi 5 and up, Kylix 2 and up                                  }
{ platform: Win32, Linux                                                     }
{                                                                            }
{ (c)opyright RemObjects Software. all rights reserved.                      }
{                                                                            }
{ Using this code requires a valid license of the RemObjects SDK             }
{ which can be obtained at http://www.remobjects.com.                        }
{----------------------------------------------------------------------------}

{$I RemObjects.inc}

interface

uses
  SysUtils, Classes, uROClasses, uROClient, uROClientIntf, synsock, blcksock
  {$IFDEF MSWINDOWS}, Windows {$ENDIF}, SyncObjs;

type
  EROSynapseTimeout = class(EROException);

  // State of an outstanding package: not answered yet, acknowledged, or rejected.
  TROSynapseAckState = (rNone, rAck, rNoAck);

  // Handle returned for every sent package; used to wait for the remote Ack/NoAck.
  IROSynapsePackageAck = interface
    ['{89ABD257-42CB-4B4B-9BA6-3E5CED10605B}']
    function GetEvent: TROEvent;
    function GetAckNo: Integer;
    function GetNoAckError: Integer;
    procedure SetNoAckError(I: Integer);
    function GetAckState: TROSynapseAckState;
    procedure SetAckState(I: TROSynapseAckState);

    // Event is nil when the package was created in skip-ack mode.
    property Event: TROEvent read GetEvent;
    property AckNo: Integer read GetAckNo;
    property NoAckError: Integer read GetNoAckError write SetNoAckError;
    property AckState: TROSynapseAckState read GetAckState write SetAckState;

    procedure RemoveFromList;
  end;

  // Runs the super channel protocol (handshake, ping/pong, packages, acks,
  // options) on top of a Synapse TTCPBlockSocket.
  TROSynSuperChannelWorker = class
  private
    fMaxPackageSize: Integer;
    fIsServer: Boolean;
    fConnection: TTCPBlockSocket;
    fWriteLock,
    fInternalWriteLock: TCriticalSection;
    fClientID: TGUID;
    fLastId: Integer;
    fConnected: Boolean;
    fLastData: TDateTime;
    // fAckWaitTimeout: Integer;
    fOtherSideMaxPackageLen: Integer;
    fWaitingAckList: TInterfaceList;
    fRemoteSupportsOptions,
    fRemoteSkipAck,
    fSkipAck: Boolean;
    fPingFrequency,
    fPingTimeout: Integer;
    procedure SetSkipAck(const Value: Boolean);
    function ReadStream(aDest: TStream; aLen: Integer): Boolean;
  protected
    property PingFrequency: Integer read fPingFrequency write fPingFrequency;
    property PingTimeout: Integer read fPingTimeout write fPingTimeout;
    property LastData: TDateTime read FLastData;
    property IsServer: Boolean read fIsServer write fIsServer;

    procedure Idle; virtual;
    procedure Connected; virtual;
    procedure SupportsOptions;
    procedure ProcessOption(Data: TStream);
    procedure Disconnected(var RestartLoop: Boolean); virtual;
    procedure IncomingData(Id: Integer; aData: TStream); virtual; abstract;
    function IntSendData(Id: Integer; aData: TStream): IROSynapsePackageAck;
    procedure SetAckDetails(Id: Integer; Oke: Boolean; ErrorNo: Integer); virtual;
    procedure SendOptions(aData: UTF8String);
    function GetDefaultResponse: string; virtual;
  public
    constructor Create(aConnection: TTCPBlockSocket);
    destructor Destroy; override;

    property ClientID: TGuid read fClientID write fClientID;
    property Connection: TTCPBlockSocket read fConnection;

    // to be called from a thread; will stay checking until disconnected
    procedure DoExecute;
    procedure Disconnect;

    // Thread safe method
    procedure SendError(Id: Integer; Error: Byte);
    procedure BeginWriteLock;
    function SendPackage(aData: TStream; Id: Integer = 0): IROSynapsePackageAck;
    procedure EndWriteLock;
    class procedure WaitForAck(pkg: IROSynapsePackageAck; Timeout: Integer = 60000);
    function GenerateId: Integer;

    // properties
    property MaxPackageSize: Integer read fMaxPackageSize write fMaxPackageSize;
    property IsConnected: Boolean read fConnected write fConnected;
    property SkipAck: Boolean read fSkipAck write SetSkipAck;
  end;

const
  ScCmdNoAck_MsgTooLarge = 0;
  ScCmdNoAck_Timeout = 1;
  scCmdNoAck_UnknownCommand = 2;
  ScCmdNoAck_QueueFull = 3;
  ScCmdNoAck_SupportsOptions = 255;

implementation

const
  ScWelcome = 'ROSC10';
  // ScCmdHello = 0;
  {
    Client:
      'ROSC10'
      [MyGuid] or [NullGuid]
      MaxPackageLength: Int32;
    Server:
      'ROSC10'
      [YourGuid] or [NewGuid]
      MaxPackageLength: Int32;
  }
  ScCmdPackage = 1;
  {
    Request or Event
    Id: Int32; (Event < 0; Request/Response > 0)
    Length: Int32;
    Data: [...]
  }
  ScCmdAck = 2;
  {
    Id: Integer;
  }
  ScCmdNoAck = 3;
  {
    Id: Integer;
    Error:
      0 = Message too large
      1 = Timeout
  }
  ScCmdPing = 4;
  {
    Should be sent every 60 seconds by the client
    RandomNumber: Integer;
  }
  ScCmdPong = 5;
  {
    Reply to ping
    OriginalRandomNumber: Integer;
  }
  ScCmdOptions = 6;
  {
    Length: Longint
    Data: array of 0..Length -1 of byte
    UTF8 encoded, separated by #13
  }

type
  // Default IROSynapsePackageAck implementation; tracks one outstanding package.
  TROSynPackageAck = class(TInterfacedObject, IROSynapsePackageAck)
  private
    fEvent: TROEvent;
    fAckNo,
    FAckNoError: Integer;
    FAckState: TROSynapseAckState;
    FOwner: TROSynSuperChannelWorker;
  public
    function GetEvent: TROEvent;
    function GetAckNo: Integer;
    function GetNoAckError: Integer;
    procedure SetNoAckError(I: Integer);
    function GetAckState: TROSynapseAckState;
    procedure SetAckState(I: TROSynapseAckState);
    constructor Create(aOwner: TROSynSuperChannelWorker; aId: Integer);
    destructor Destroy; override;
    procedure RemoveFromList;
  end;

{ TROSynSuperChannelWorker }

// Creates a worker on top of aConnection. The socket is only referenced;
// this class never frees it.
constructor TROSynSuperChannelWorker.Create(aConnection: TTCPBlockSocket);
begin
  inherited Create;
  fPingFrequency := 60;
  fPingTimeout := 90;
  fConnection := aConnection;
  fWriteLock := TCriticalSection.Create;
  fInternalWriteLock := TCriticalSection.Create;
  fMaxPackageSize := 1024 * 1024; // 1mb
  fWaitingAckList := TInterfaceList.Create;
end;

// Detaches every pending ack from this worker, then releases the list and locks.
destructor TROSynSuperChannelWorker.Destroy;
var
  i: Integer;
begin
  fWaitingAckList.Lock;
  try
    for i := fWaitingAckList.Count - 1 downto 0 do
      IROSynapsePackageAck(fWaitingAckList[i]).RemoveFromList;
  finally
    fWaitingAckList.Unlock;
  end;
  fWaitingAckList.Free;
  fInternalWriteLock.Free;
  fWriteLock.Free;
  inherited Destroy;
end;

// Closes the underlying socket.
procedure TROSynSuperChannelWorker.Disconnect;
begin
  fConnection.CloseSocket;
end;

// Called after a successful handshake. Marks the worker connected and sends a
// NoAck for id 0 with the SupportsOptions code, which the peer's read loop
// treats as an "options supported" announcement.
procedure TROSynSuperChannelWorker.Connected;
var
  b: array[0..5] of byte;
begin
  fConnected := True;
  b[0] := ScCmdNoAck;
  b[1] := 0;
  b[2] := 0;
  b[3] := 0;
  b[4] := 0;
  b[5] := ScCmdNoAck_SupportsOptions;
  fConnection.SendBuffer(@b[0], sizeof(b));
end;

// Called when the read loop ends. Descendants may set RestartLoop to True to
// make DoExecute run the handshake again.
procedure TROSynSuperChannelWorker.Disconnected(var RestartLoop: Boolean);
begin
  if RestartLoop then
    fConnected := False
  else //for preventing warnings in FPC
    fConnected := False;
end;

// Reads exactly aLen bytes from the socket into aDest (which is truncated
// first). Returns False as soon as a receive yields no data.
function TROSynSuperChannelWorker.ReadStream(aDest: TStream; aLen: Integer): Boolean;
var
  Buf: array[0 .. 4096 - 1] of byte;
  lRes: Integer;
begin
  aDest.Position := 0;
  aDest.Size := 0;
  while aLen > 0 do begin
    lRes := aLen;
    if lRes > sizeof(buf) then
      lRes := Sizeof(Buf);
    lRes := fConnection.RecvBufferEx(@Buf[0], lRes, fPingFrequency * 1000);
    aLen := aLen - lRes;
    if lRes = 0 then begin
      Result := false;
      exit;
    end;
    aDest.Write(Buf, lRes);
    fLastData := now;
  end;
  Result := true;
end;

// Main protocol loop: performs the handshake, then dispatches incoming
// commands until the connection drops. Exceptions raised inside the loop are
// swallowed and treated as a disconnect. The whole sequence repeats while
// Disconnected sets its RestartLoop argument to True.
procedure TROSynSuperChannelWorker.DoExecute;
var
  fLastPackage: DateTime;
  lServerExpectAck: Integer;
  lTmpId, lTmpLen: Integer;
  cmd: Byte;
  Bl: array[0..5] of Byte;
  lData: TMemoryStream;
  lRetry: Boolean;
begin
  lTmpId := 1;
  SetSockOpt(fConnection.Socket, 6, 1, @lTmpId, 4); // Nagle bit
  repeat
    fRemoteSkipAck := false;
    fRemoteSupportsOptions := false;
    lServerExpectAck := 0;
    try
      // --- handshake ---
      if IsServer then begin
        if fConnection.RecvBufferStr(6, fPingTimeout * 1000) <> ScWelcome then begin
          fConnection.SendString(GetDefaultResponse);
          fConnection.CloseSocket;
          exit;
        end;
        fConnection.RecvBufferEx(@fClientID, Sizeof(fClientID), fPingTimeout * 1000);
        if IsEqualGUID(fClientID, EmptyGUID) then
          fClientID := NewGuid;
        fOtherSideMaxPackageLen := fConnection.RecvInteger(fPingTimeout * 1000);
        fConnection.SendString(ScWelcome);
        fConnection.SendBuffer(@fClientID, sizeof(fClientID));
        fConnection.SendInteger(fMaxPackageSize);
      end else begin
        fConnection.SendString(ScWelcome);
        fConnection.SendBuffer(@fClientID, sizeof(fClientID));
        fConnection.SendInteger(fMaxPackageSize);
        if fConnection.RecvBufferStr(6, fPingTimeout * 1000) <> ScWelcome then begin
          fConnection.SendString(GetDefaultResponse);
          fConnection.CloseSocket;
          exit;
        end;
        fConnection.RecvBufferEx(@fClientID, Sizeof(fClientID), fPingTimeout * 1000);
        fOtherSideMaxPackageLen := fConnection.RecvInteger(fPingTimeout * 1000);
      end;
      fLastPackage := Now;
      Connected;

      // --- command loop ---
      while true do begin
        if fConnection.RecvBufferEx(@cmd, 1, fPingFrequency * 1000) = 1 then begin
          fLastPackage := Now;
          case cmd of
            ScCmdPong: begin
              if fConnection.RecvInteger(fPingTimeout * 1000) = lServerExpectAck then begin
                lServerExpectAck := 0;
              end;
            end;
            ScCmdPing: begin
              // Echo the received number back as a pong.
              Bl[0] := ScCmdPong;
              lTmpId := fConnection.RecvInteger(fPingTimeout * 1000);
              Move(ltmpId, bl[1], 4);
              fInternalWriteLock.Acquire;
              try
                fConnection.SendBuffer(@bl, 5);
              finally
                fInternalWriteLock.Release;
              end;
            end;
            ScCmdAck: begin
              lTmpId := fConnection.RecvInteger(fPingTimeout * 1000);
              SetAckDetails(lTmpId, true, 0);
            end;
            ScCmdNoAck: begin
              lTmpId := fConnection.RecvInteger(fPingTimeout * 1000);
              cmd := fConnection.RecvByte(fPingTimeout * 1000);
              if cmd = ScCmdNoAck_SupportsOptions then
                SupportsOptions
              else
                SetAckDetails(lTmpId, false, cmd);
            end;
            ScCmdOptions: begin
              lTmpLen := fconnection.RecvInteger(fPingTimeout * 1000);
              if lTmpLen > fMaxPackageSize then begin
                fInternalWriteLock.Acquire;
                try
                  fConnection.SendString(Chr(ScCmdNoAck)+#0#0#0#0+chr(ScCmdNoAck_MsgTooLarge));
                finally
                  fInternalWriteLock.Release;
                end;
                fConnection.CloseSocket;
                exit;
              end else begin
                lData := TMemoryStream.Create;
                try
                  if not ReadStream(lData, lTmpLen) then break;
                  ProcessOption(lData);
                finally
                  lData.Free;
                end;
                fLastPackage := Now; // for packages that might take longer.
              end;
            end;
            ScCmdPackage: begin
              fLastData := Now;
              lTmpId := fConnection.RecvInteger(fPingTimeout * 1000);
              lTmpLen := fConnection.RecvInteger(fPingTimeout * 1000);
              if lTmpLen < 0 then begin
                fInternalWriteLock.Acquire;
                try
                  fConnection.SendString(Chr(ScCmdNoAck)+#0#0#0#0+chr(scCmdNoAck_UnknownCommand));
                finally
                  fInternalWriteLock.Release;
                end;
                fConnection.CloseSocket;
                exit;
              end;
              if lTmpLen > fMaxPackageSize then begin
                fInternalWriteLock.Acquire;
                try
                  fConnection.SendString(Chr(ScCmdNoAck)+#0#0#0#0+chr(ScCmdNoAck_MsgTooLarge));
                finally
                  fInternalWriteLock.Release;
                end;
                fConnection.CloseSocket;
                exit;
              end else begin
                lData := TMemoryStream.Create;
                try
                  if not ReadStream(lData, lTmpLen) then begin
                    // The stream was never handed to IncomingData, so it is
                    // still ours; free it before leaving the loop.
                    lData.Free;
                    Break;
                  end;
                  lData.Seek(0, soFromBeginning);
                  Self.IncomingData(lTmpId, lData);
                except
                  lData.Free; // only free when we failed to send the data to the queue
                  fInternalWriteLock.Acquire;
                  try
                    fConnection.SendByte(ScCmdNoAck);
                    fConnection.SendInteger(lTmpId);
                    fConnection.SendByte(ScCmdNoAck_QueueFull);
                  finally
                    fInternalWriteLock.Release;
                  end;
                end;
                // NOTE(review): an Ack is also sent after a QueueFull NoAck;
                // kept as-is to preserve the existing wire behaviour.
                if not fRemoteSkipAck then begin
                  fInternalWriteLock.Acquire;
                  try
                    bl[0] := ScCmdAck;
                    Move(lTmpId, bl[1], 4);
                    fConnection.SendBuffer(@Bl, 5);
                  finally
                    fInternalWriteLock.Release;
                  end;
                end;
                fLastPackage := Now; // for packages that might take longer.
              end;
            end;
          end;
        end else begin
          if fConnection.LastError <> WSAETIMEDOUT then begin
            fConnection.CloseSocket;
            Break; // disconnected
          end;
          Idle;
          if IsServer then begin
            // Server side: drop the client when nothing arrived within PingTimeout seconds.
            if ((Now - fLastPackage) > (PingTimeout * (1.0 / (24 * 60 * 60)))) then begin
              fInternalWriteLock.Acquire;
              try
                fConnection.SendString(Chr(ScCmdNoAck)+#0#0#0#0+chr(ScCmdNoAck_Timeout));
                fConnection.CloseSocket;
              finally
                fInternalWriteLock.Release;
              end;
              exit;
            end;
          end else begin
            if (lServerExpectAck <> 0) then begin
              // A ping is outstanding: give up when no data arrived within PingTimeout.
              if (Now - fLastPackage) > (PingTimeout * (1.0 / (24 * 60 * 60))) then begin
                fInternalWriteLock.Acquire;
                try
                  fConnection.SendString(Chr(ScCmdNoAck)+#0#0#0#0+chr(ScCmdNoAck_Timeout));
                  fConnection.CloseSocket;
                finally
                  fInternalWriteLock.Release;
                end;
                exit;
              end;
            end else begin
              // Client side: send a ping after PingFrequency seconds of silence.
              if (Now - fLastPackage) > (PingFrequency * (1.0 / (24 * 60 * 60))) then begin
                lServerExpectAck := GetTickCount;
                if lServerExpectAck = 0 then
                  lServerExpectAck := 1; // 0 means "no ping outstanding"
                bl[0] := ScCmdPing;
                Move(lServerExpectAck, bl[1], 4);
                fInternalWriteLock.Acquire;
                try
                  fConnection.SendBuffer(@bl, 5);
                finally
                  fInternalWriteLock.Release;
                end;
              end;
            end;
          end;
        end;
      end;
    except
    end;
    lRetry := false;
    try
      Disconnect;
    except
    end;
    Disconnected(lRetry);
  until not lRetry;
end;

// Returns the next package id: servers count down from -1, clients count up
// from 1. Guarded by fWriteLock.
function TROSynSuperChannelWorker.GenerateId: Integer;
begin
  fWriteLock.Acquire;
  try
    if IsServer then begin
      fLastId := fLastId - 1;
      if fLastId > -1 then
        fLastId := -1;
    end else begin
      fLastId := fLastId + 1;
      if fLastId < 1 then
        fLastId := 1;
    end;
    Result := fLastId;
  finally
    fWriteLock.Release;
  end;
end;

// Hook called from the read loop whenever a receive times out; no-op here.
procedure TROSynSuperChannelWorker.Idle;
begin
end;

// Registers a pending ack for Id and writes the package (command byte, id,
// length, payload) to the socket under fInternalWriteLock.
function TROSynSuperChannelWorker.IntSendData(Id: Integer; aData: TStream): IROSynapsePackageAck;
var
  Buffer: array[0..2047] of byte;
  Len: Integer;
begin
  fLastData := Now;
  Result := TROSynPackageAck.Create(Self, Id);
  fWaitingAckList.Add(Result);
  Buffer[0] := ScCmdPackage;
  Move(Id, Buffer[1], 4);
  Len := aData.Size;
  Move(Len, Buffer[5], 4);
  aData.Seek(0, soFromBeginning);
  fInternalWriteLock.Acquire;
  try
    // First chunk shares the buffer with the 9-byte header.
    Len := aData.Read(Buffer[9], sizeof(Buffer) - 9);
    fConnection.SendBuffer(@Buffer, Len + 9);
    if Len > 0 then begin
      repeat
        Len := aData.Read(Buffer[0], Sizeof(Buffer));
        fConnection.SendBuffer(@Buffer[0], Len);
        fLastData := Now;
      until Len = 0;
    end;
  finally
    fInternalWriteLock.Release;
  end;
end;

// Parses an options payload of the form COMMAND[#13 VALUE]. Only 'SKIPACK' is
// recognised; any value other than 'OFF' enables remote skip-ack.
// An empty payload is ignored.
procedure TROSynSuperChannelWorker.ProcessOption(Data: TStream);
var
  lCmd, lData: UTF8String;
  lPos: Integer;
begin
  SetLength(lData, Data.Size);
  // Guard: lData[1] must not be indexed when the payload is empty.
  if Length(lData) > 0 then begin
    Data.Position := 0;
    Data.Read(lData[1], Length(lData));
  end;
  lPos := Pos(#13, lData);
  if lPos = 0 then begin
    lCmd := lData;
    lData := '';
  end else begin
    lCmd := copy(lData, 1, lPos - 1);
    Delete(lData, 1, lPos);
  end;
  if lCmd = 'SKIPACK' then begin
    fRemoteSkipAck := lData <> 'OFF';
  end;
end;

// Sends a NoAck for package Id carrying the given error code.
procedure TROSynSuperChannelWorker.SendError(Id: Integer; Error: Byte);
var
  Buffer: array[0..5] of Byte;
begin
  buffer[0] := ScCmdNoAck;
  Move(ID, Buffer[1], 4);
  Buffer[5] := Error;
  fInternalWriteLock.Acquire;
  try
    fConnection.SendBuffer(@Buffer, 6);
  finally
    fInternalWriteLock.Release;
  end;
end;

// Sends an options packet (command byte, 4-byte length, UTF-8 payload).
// An empty aData results in a packet with length 0 and no payload.
procedure TROSynSuperChannelWorker.SendOptions(aData: UTF8String);
var
  Buffer: array of byte;
  Len: Integer;
begin
  fWriteLock.Acquire;
  try
    fLastData := Now;
    Len := Length(aData);
    SetLength(Buffer, Len + 5);
    Buffer[0] := ScCmdOptions;
    Move(Len, Buffer[1], 4);
    // Guard: aData[1] and Buffer[5] do not exist for an empty payload.
    if Len > 0 then
      Move(aData[1], Buffer[5], Len);
    fInternalWriteLock.Acquire;
    try
      fConnection.SendBuffer(@Buffer[0], Length(Buffer));
    finally
      fInternalWriteLock.Release;
    end;
  finally
    fWriteLock.Release;
  end;
end;

// Sends aData as a package under fWriteLock. When Id is 0 a fresh id is
// generated. Returns the ack handle to pass to WaitForAck.
function TROSynSuperChannelWorker.SendPackage(aData: TStream; Id: Integer = 0): IROSynapsePackageAck;
begin
  fWriteLock.Acquire;
  try
    if Id = 0 then begin
      id := GenerateId;
    end;
    Result := IntSendData(Id, aData);
  finally
    fWriteLock.Release;
  end;
end;

// Resolves the pending ack with number Id: removes it from the waiting list,
// stores the result and wakes the waiter (if the ack has an event).
procedure TROSynSuperChannelWorker.SetAckDetails(Id: Integer; Oke: Boolean; ErrorNo: Integer);
var
  i: Integer;
  rp: IROSynapsePackageAck;
begin
  fWaitingAckList.Lock;
  try
    for i := fWaitingAckList.Count - 1 downto 0 do begin
      if IROSynapsePackageAck(fWaitingAckList[i]).AckNo = Id then begin
        rp := IROSynapsePackageAck(fWaitingAckList[i]);
        rp.RemoveFromList;
        rp.NoAckError := ErrorNo;
        if Oke then
          rp.AckState := rAck
        else
          rp.AckState := rNoAck;
        // Acks created in skip-ack mode have no event; nothing to signal then.
        if rp.Event <> nil then
          rp.Event.SetEvent;
        exit;
      end;
    end;
  finally
    fWaitingAckList.Unlock;
  end;
end;

procedure TROSynSuperChannelWorker.SetSkipAck(const Value: Boolean);
begin
  fSkipAck := Value;
end;

// Called when the peer announces option support; forwards our SKIPACK
// preference if it is enabled.
procedure TROSynSuperChannelWorker.SupportsOptions;
begin
  if fSkipAck then
    SendOptions('SKIPACK'#13'ON');
  fRemoteSupportsOptions := true;
end;

// Blocks until pkg is acknowledged or Timeout (milliseconds) elapses.
// Returns immediately for skip-ack packages (no event).
// Raises EROSynapseTimeout on timeout, eROMessageTooLarge when the peer
// rejected the package as too large, and EROException for any other NoAck.
// The package is always removed from its owner's waiting list.
class procedure TROSynSuperChannelWorker.WaitForAck(pkg: IROSynapsePackageAck; Timeout: Integer = 60000);
begin
  try
    if pkg.Event = nil then exit; // skipack
    if pkg.Event.WaitFor(Timeout) <> wrSignaled then
      raise EROSynapseTimeout.Create('Timeout');
    if pkg.AckState <> rAck then begin
      case pkg.NoAckError of
        ScCmdNoAck_MsgTooLarge: raise eROMessageTooLarge.Create('Message too large');
      else
        raise EROException.Create('Unknown error');
      end;
    end;
  finally
    pkg.RemoveFromList;
  end;
end;

// Acquires the outer write lock; pair with EndWriteLock.
procedure TROSynSuperChannelWorker.BeginWriteLock;
begin
  fWriteLock.Acquire;
end;

procedure TROSynSuperChannelWorker.EndWriteLock;
begin
  fWriteLock.Release;
end;

// Text sent to a peer whose handshake does not start with the welcome string.
function TROSynSuperChannelWorker.GetDefaultResponse: string;
begin
  Result := 'ROSC:Invalid connection string';
end;

{ TROSynPackageAck }

// Creates the ack record for package aId. No event is allocated when the
// owner has skip-ack enabled and the remote side supports options.
constructor TROSynPackageAck.Create(aOwner: TROSynSuperChannelWorker; aId: Integer);
begin
  inherited Create;
  if not (aOwner.fSkipAck and aOwner.fRemoteSupportsOptions) then
    fEvent := TROEvent.Create(nil, true, false, '');
  fAckNo := aId;
  FOwner := aOwner;
  // if FOwner <> nil then FOwner.fWaitingAckList.Add(Self as IROSynapsePackageAck);
end;

destructor TROSynPackageAck.Destroy;
begin
  fEvent.Free;
  inherited;
end;

function TROSynPackageAck.GetAckNo: Integer;
begin
  result := fAckNo;
end;

function TROSynPackageAck.GetAckState: TROSynapseAckState;
begin
  Result := FAckState;
end;

function TROSynPackageAck.GetEvent: TROEvent;
begin
  Result := fEvent;
end;

function TROSynPackageAck.GetNoAckError: Integer;
begin
  Result := FAckNoError;
end;

{$ifndef fpc}
// Atomically stores Value in Target and returns the previous pointer.
function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer;
begin
  // Result := Pointer(InterlockedExchange64(Int64(Target), Int64(Value))); // For delphi for win64; once that's out
  Result := Pointer(InterlockedExchange(Integer(Target), Integer(Value)));
end;
{$else}
// Atomically stores Value in Target and returns the previous pointer.
function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer;
begin
  result := InterlockedExchange(target, value);
end;
{$endif}

// Detaches this ack from its owner's waiting list. The owner reference is
// swapped out atomically, so only the first call performs the removal.
procedure TROSynPackageAck.RemoveFromList;
var
  lOwner: TROSynSuperChannelWorker;
begin
  lOwner := InterlockedExchangePointer(Pointer(FOwner), nil);
  if lOwner <> nil then begin
    lOwner.fWaitingAckList.Remove(self as IROSynapsePackageAck);
    // lOwner := nil;
  end;
end;

procedure TROSynPackageAck.SetAckState(I: TROSynapseAckState);
begin
  FAckState := I;
end;

procedure TROSynPackageAck.SetNoAckError(I: Integer);
begin
  FAckNoError := i;
end;

end.