unit uROZeroConfStreamWorker;

{ Length-prefixed package stream worker.

  This unit provides:
    - helpers that convert 32-bit integers and UTF-8 strings to and from raw
      byte strings (TROHubBytes),
    - EncodePackage / DecodePackage for the "name + key/value arguments"
      package layout,
    - TTCPStream, an abstract byte stream with a ReadExact helper,
    - TROStreamWorker, a thread that reads length-prefixed packages from a
      TTCPStream and forwards them to an IROStreamWorkerCallbacks instance. }

{$I RemObjects.inc}
{.$DEFINE uROZeroConfHub_DEBUG}

interface

uses
  {$IFDEF MSWINDOWS}Windows, SyncObjs,{$ENDIF}SysUtils, Classes,
  uROClasses, uROThread, uROClient;

type
  { Raw byte container; every byte is stored as one AnsiChar. }
  TROHubBytes = AnsiString;

  { Exception carrying an additional integer code. }
  EROZeroConfHubException = class(Exception)
  private
    fCode: integer;
  public
    constructor Create(const Msg: string; aCode: Integer);
    constructor CreateFmt(const Msg: string; const Args: array of const;aCode: Integer);
    property Code: integer read fCode;
  end;

  { One named argument of a package. }
  TKeyValuePairItem = record
    ArgName: Unicodestring;
    ArgValue: TROHubBytes;
  end;

  TROStreamWorker = class;
  TKeyValuePair = array of TKeyValuePairItem;

  { Receiver of the events produced by TROStreamWorker.Execute. }
  IROStreamWorkerCallbacks = interface
    procedure ControlCommand(aSender: TROStreamWorker; aName: UnicodeString; aArguments: TKeyValuePair);
    procedure Data(aSender: TROStreamWorker; aData: TROHubBytes);
    procedure Disconnected(aSender: TROStreamWorker);
  end;

  { Abstract byte stream. Descendants supply Connect/Stop/Read/Write and the
    Connected state; ReadExact is implemented here on top of Read. }
  TTCPStream = class
  protected
    function GetConnected: Boolean;virtual; abstract;
  public
    constructor Create; virtual;
    destructor Destroy; override;
    procedure Connect(AHost: string; aPort: Integer);virtual; abstract;
    procedure Stop;virtual; abstract;
    function Read(Buffer: Pointer; Count: Longint): Longint; virtual; abstract;
    function ReadExact(Buffer: Pointer; Count: Longint): Longint;
    function Write(const Buffer; Count: Longint): Longint; virtual; abstract;
    property Connected: Boolean read GetConnected;
  end;

  { Thread that owns a TTCPStream, reads packages from it and dispatches them
    to the callbacks interface. The stream passed to Create is freed by the
    worker's destructor. }
  TROStreamWorker = class(TROThread)
  private
    fStream: TTCPStream;
    fUserData: Pointer;
    fCallbacks: IROStreamWorkerCallbacks;
    fIdGen: integer;
    fRemoteMaxLength: integer;
    fMaxLength: integer;
    fStarted: Boolean;
    fCS: TRTLCriticalSection;
    procedure ControlPackage(aData: TROHubBytes);
  protected
    procedure Execute; override;
    function ReadWelcome: Boolean;
  public
    constructor Create(aCallbacks: IROStreamWorkerCallbacks; aStream: TTCPStream);
    destructor Destroy; override;
    function GenerateID(aPositive: boolean): integer;
    procedure Start;
    procedure SendControlPackage(aCommand: UnicodeString; aArguments: TKeyValuePair); overload;
    procedure SendControlPackage(aCommand: UnicodeString; aArguments: array of TKeyValuePairItem); overload;
    procedure SendData(aData: TROHubBytes);
    property UserData: Pointer read fUserData write fUserData;
    property RemoteMaxLength: integer read fRemoteMaxLength write fRemoteMaxLength default -1;
    property MaxLength: integer read fMaxLength write fMaxLength default 1024*1024;
  end;

function Int32FromBuffer(aBuf: TROHubBytes): integer; overload;
function Int32FromBuffer(aBuf: PAnsiChar): integer; overload;
function Int32ToBuffer(aValue: integer): TROHubBytes;
function UnicodeStringToBytes(AStr: UnicodeString): TROHubBytes;
function BytesToUnicodeString(aData: TROHubBytes): UnicodeString;
function EncodePackage(aCommand: Unicodestring; aWriteTotalLength: Boolean; aPrefix: TROHubBytes; aArguments: TKeyValuePair): TROHubBytes;
procedure DecodePackage(aData: TROHubBytes; aSkipBytes: integer; out aName: Unicodestring; out aArguments: TKeyValuePair);
function GetKVPair(aName: UnicodeString; aValue: TROHubBytes): TKeyValuePairItem;

implementation

{ Combines the four bytes at aBuf into a 32-bit integer. The byte order is
  selected at compile time by ENDIAN_BIG. The caller must guarantee that at
  least four bytes are readable at aBuf. }
function Int32FromBuffer(aBuf: PAnsiChar): integer;
begin
  {$IFDEF ENDIAN_BIG}
  Result := ord(aBuf[3])+
            ord(aBuf[2])*$100+
            ord(aBuf[1])*$10000+
            ord(aBuf[0])*$1000000;
  {$ELSE}
  Result := ord(aBuf[0])+
            ord(aBuf[1])*$100+
            ord(aBuf[2])*$10000+
            ord(aBuf[3])*$1000000;
  {$ENDIF}
end;

{ Byte-string overload; forwards to the PAnsiChar version. The caller must
  pass at least four bytes. }
function Int32FromBuffer(aBuf: TROHubBytes): integer;
begin
  Result := Int32FromBuffer(PAnsiChar(aBuf));
end;

{ Splits aValue into a four byte string, using the byte order selected by
  ENDIAN_BIG (the inverse of Int32FromBuffer). }
function Int32ToBuffer(aValue: integer): TROHubBytes;
begin
  SetLength(Result,4);
  {$IFDEF ENDIAN_BIG}
  Result[4] := AnsiChar((aValue and $000000ff) shr 0);
  Result[3] := AnsiChar((aValue and $0000ff00) shr 8);
  Result[2] := AnsiChar((aValue and $00FF0000) shr 16);
  Result[1] := AnsiChar((aValue and $FF000000) shr 24);
  {$ELSE}
  Result[1] := AnsiChar((aValue and $000000ff) shr 0);
  Result[2] := AnsiChar((aValue and $0000ff00) shr 8);
  Result[3] := AnsiChar((aValue and $00FF0000) shr 16);
  Result[4] := AnsiChar((aValue and $FF000000) shr 24);
  {$ENDIF}
end;

{ Returns AStr encoded as UTF-8 bytes. }
function UnicodeStringToBytes(AStr: UnicodeString): TROHubBytes;
begin
  Result := UTF8Encode(AStr);
end;

{ Returns the string obtained by decoding aData as UTF-8. }
function BytesToUnicodeString(aData: TROHubBytes): UnicodeString;
begin
  Result := UTF8ToString(aData);
end;

{ Parses a package out of aData.

  Layout read, starting aSkipBytes bytes into aData:
    int32 length + UTF-8 bytes   -> aName
    int32                        -> argument count
    per argument:
      int32 length + UTF-8 bytes -> ArgName
      int32 length + raw bytes   -> ArgValue

  Every length and the argument count are validated against the bytes that
  are actually available, because aData may come from a remote peer.

  Raises EROZeroConfHubException when aData is truncated or contains a
  negative/oversized length or argument count. }
procedure DecodePackage(aData: TROHubBytes; aSkipBytes: integer; out aName: Unicodestring; out aArguments: TKeyValuePair);
var
  lCursor: PAnsiChar; // next unread byte
  lLimit: PAnsiChar;  // one past the last byte of aData

  // Fails unless aCount more bytes can be read at lCursor.
  procedure Require(aCount: integer);
  begin
    if (aCount < 0) or (aCount > (lLimit - lCursor)) then
      // NOTE(review): no dedicated error code is visible in this unit; 0 is a
      // placeholder - confirm the code the rest of the project expects.
      raise EROZeroConfHubException.Create('Malformed package: field exceeds the available data', 0);
  end;

  function ReadInt32: integer;
  begin
    Require(4);
    Result := Int32FromBuffer(lCursor);
    inc(lCursor, 4);
  end;

  function ReadAnsiString: AnsiString;
  var
    lLength: integer;
  begin
    lLength := ReadInt32;
    Require(lLength);
    SetString(Result, lCursor, lLength);
    inc(lCursor, lLength);
  end;

  function ReadUTF8String: Unicodestring;
  begin
    Result := UTF8ToString(ReadAnsiString);
  end;

var
  lArgCount: integer;
  i: integer;
begin
  lCursor := PAnsiChar(aData);
  lLimit := lCursor + Length(aData);
  if aSkipBytes > 0 then
  begin
    Require(aSkipBytes);
    inc(lCursor, aSkipBytes);
  end;

  aName := ReadUTF8String;
  lArgCount := ReadInt32;
  // Each argument occupies at least two 4-byte length prefixes, so a count
  // larger than remaining/8 cannot be valid; reject it before allocating.
  if (lArgCount < 0) or (lArgCount > (lLimit - lCursor) div 8) then
    raise EROZeroConfHubException.Create('Malformed package: invalid argument count', 0);

  SetLength(aArguments, lArgCount);
  for i := 0 to lArgCount-1 do
  begin
    aArguments[i].ArgName := ReadUTF8String;
    aArguments[i].ArgValue := ReadAnsiString;
  end;
end;

{ Serializes a package.

  Output layout:
    [int32 total length]  only when aWriteTotalLength is True; the value is
                          the number of bytes that follow this field
    aPrefix               copied verbatim when not empty
    int32 length + UTF-8 bytes of aCommand
    int32 argument count
    per argument: int32 length + UTF-8 ArgName, int32 length + raw ArgValue }
function EncodePackage(aCommand: Unicodestring; aWriteTotalLength: Boolean; aPrefix: TROHubBytes; aArguments: TKeyValuePair): TROHubBytes;

  procedure WriteInt32(AStream: TStream; AValue: Integer);
  var
    a: TROHubBytes;
  begin
    a := Int32ToBuffer(AValue);
    AStream.Write(pointer(a)^, SizeOf(Integer));
  end;

  procedure WriteAnsiString(AStream: TStream; AStr: AnsiString);
  begin
    WriteInt32(AStream,Length(Astr));
    AStream.Write(pointer(Astr)^, Length(Astr));
  end;

  procedure WriteUTF8String(AStream: TStream; AStr: UnicodeString);
  begin
    WriteAnsiString(AStream, UTF8Encode(AStr));
  end;

var
  Str: TROBinaryMemoryStream;
  i: integer;
begin
  str := TROBinaryMemoryStream.Create;
  try
    i := 0;
    if aWriteTotalLength then
    begin
      // Reserve four bytes; the real length is patched in once it is known.
      Str.Write(i, SizeOf(integer));
    end;
    if Length(aPrefix) > 0 then
      Str.Write(pointer(aPrefix)^,Length(aPrefix));
    WriteUTF8String(str,aCommand);
    WriteInt32(str, Length(aArguments));
    if Length(aArguments) <> 0 then
    begin
      for i := 0 to Length(aArguments)-1 do
      begin
        WriteUTF8String(str,aArguments[i].ArgName);
        WriteAnsiString(str,aArguments[i].ArgValue);
      end;
    end;
    if (aWriteTotalLength) then
    begin
      // Overwrite the placeholder with the size of everything after it.
      Str.Position := 0;
      WriteInt32(str, Str.Size-4);
    end;
    // SetString(Result, PAnsiChar(Str.Memory), Str.Size);
    Result := Str.ToString;
  finally
    Str.Free;
  end;
end;

{ Builds a TKeyValuePairItem from a name and a value. }
function GetKVPair(aName: UnicodeString; aValue: TROHubBytes): TKeyValuePairItem;
begin
  result.ArgName := aName;
  result.ArgValue := aVAlue;
end;

{$IFDEF DELPHI10UP}{$REGION 'TTCPStream methods'}{$ENDIF DELPHI10UP}
{ TTCPStream }

constructor TTCPStream.Create;
begin
  inherited;
end;

{ Calls Stop before the instance is destroyed. }
destructor TTCPStream.Destroy;
begin
  Stop;
  inherited;
end;

{ Calls Read repeatedly until Count bytes have been stored in Buffer.

  Returns the number of bytes actually read. The result is smaller than Count
  when the stream is (or becomes) disconnected, or when Read returns zero or
  a negative value. A non-positive Read result always ends the loop, so an
  error return can neither reduce the running total nor move the write
  position backwards. }
function TTCPStream.ReadExact(Buffer: Pointer; Count: Longint): Longint;
var
  lRecv: Integer;
  p: PAnsiChar;
begin
  result := 0;
  p := Buffer;
  {$IFDEF uROZeroConfHub_DEBUG}
  OutputDebugString(Pchar('ReadExact - start, cnt = '+inttoStr(Count)));
  try
  {$ENDIF}
    while Result < Count do
    begin
      if not Connected then Break;
      lRecv := Read(p, Count - Result);
      if lRecv <= 0 then Break; // closed (0) or failed (< 0)
      Inc(Result, lRecv);
      Inc(p, lRecv);
    end;
  {$IFDEF uROZeroConfHub_DEBUG}
  finally
    OutputDebugString(Pchar('ReadExact - end, result = '+inttoStr(Result)));
  end;
  {$ENDIF}
end;
{$IFDEF DELPHI10UP}{$ENDREGION}{$ENDIF DELPHI10UP}

{$IFDEF DELPHI10UP}{$REGION 'TROStreamWorker methods'}{$ENDIF DELPHI10UP}

{ Creates the exception with the message Msg and stores aCode in Code. }
constructor EROZeroConfHubException.Create(const Msg: string; aCode: Integer);
begin
  fCode := aCode;
  inherited Create(Msg);
end;

{ Creates the exception with a formatted message and stores aCode in Code. }
constructor EROZeroConfHubException.CreateFmt(const Msg: string; const Args: array of const; aCode: Integer);
begin
  fCode := aCode;
  inherited CreateFmt(Msg,Args);
end;

{ TROStreamWorker }

{ Handles one control package (aData including its 4-byte prefix).

  'SetMaxLength' is consumed here: its 'Length' argument updates
  fRemoteMaxLength. Every other command is forwarded to
  fCallbacks.ControlCommand.

  A package that cannot be decoded stops the stream instead of raising out of
  the worker thread. }
procedure TROStreamWorker.ControlPackage(aData: TROHubBytes);
var
  lName: Unicodestring;
  lArgs: TKeyValuePair;
  i: integer;
begin
  try
    DecodePackage(aData, 4, lName, lArgs);
  except
    on EROZeroConfHubException do
    begin
      // Protocol violation: drop the connection, as Execute does for an
      // invalid package length.
      fStream.Stop;
      Exit;
    end;
  end;

  if lName = 'SetMaxLength' then
  begin
    for i := 0 to Length(lArgs) - 1 do
      // Int32FromBuffer reads four bytes; ignore values that are too short.
      if (lArgs[i].ArgName = 'Length') and (Length(lArgs[i].ArgValue) >= 4) then
        fRemoteMaxLength := Int32FromBuffer(lArgs[i].ArgValue);
  end
  else
  begin
    fCallbacks.ControlCommand(self, lName, lArgs);
  end;
end;

{ Creates the worker suspended; call Start to send the greeting and run it.
  aStream is stored and later freed by Destroy. }
constructor TROStreamWorker.Create(aCallbacks: IROStreamWorkerCallbacks; aStream: TTCPStream);
begin
  inherited Create(true);
  {$IFDEF FPC}
  InitCriticalSection(FCs);
  {$ELSE}
  InitializeCriticalSection(FCs);
  {$ENDIF}
  fStream := aStream;
  fCallbacks := aCallbacks;
  fRemoteMaxLength := -1;
  fMaxLength := 1024*1024;
  fStarted := false;
end;

{ Stops the stream, runs the inherited destructor, then frees the stream and
  the critical section.

  The stream object is released only after the inherited destructor has
  returned so that Execute never sees a freed or nil fStream while it is
  still running.
  NOTE(review): this relies on TROThread.Destroy waiting for the thread to
  finish, as TThread.Destroy does - confirm. }
destructor TROStreamWorker.Destroy;
begin
  {$IFDEF uROZeroConfHub_DEBUG}
  OutputDebugString(PChar('BEFORE WORKER.Destroy:_____  Owner=$'+IntToHex(cardinal(self),8)+'; TCP=$'+IntToHex(cardinal(fStream),8)));
  {$ENDIF uROZeroConfHub_DEBUG}
  if Assigned(fStream) then
    fStream.Stop; // lets a pending ReadExact in Execute return
  inherited Destroy;
  FreeAndNil(fStream);
  {$IFDEF uROZeroConfHub_DEBUG}
  OutputDebugString(PChar('AFTER WORKER.Destroy:_____  Owner=$'+IntToHex(cardinal(self),8)));
  {$ENDIF uROZeroConfHub_DEBUG}
  {$IFDEF FPC}
  DoneCriticalSection(FCs);
  {$ELSE}
  DeleteCriticalSection(FCs);
  {$ENDIF}
end;

{ Thread body.

  After a successful ReadWelcome it loops: read a 4-byte length, read that
  many bytes, then dispatch. A package longer than 8 bytes whose first four
  bytes are zero goes to ControlPackage; anything else goes to
  fCallbacks.Data. Zero-length packages are skipped.

  The loop ends when a read comes up short or when the announced length is
  negative or exceeds fMaxLength (the stream is stopped in that case).
  fCallbacks.Disconnected is called whenever the loop ends. It is not called
  when ReadWelcome fails. }
procedure TROStreamWorker.Execute;
var
  lLength: array[0..3] of AnsiChar;
  lLen: Integer;
  lBuffer: TROHubBytes;
begin
  if not ReadWelcome then exit;

  while true do
  begin
    if fStream.ReadExact(@lLength, 4) <> 4 then break;
    lLen := Int32FromBuffer(@lLength);

    if (lLen < 0) or (lLen > fMaxLength) then
    begin
      // Invalid length: close the connection and fall through to the
      // Disconnected notification below.
      fStream.Stop;
      break;
    end;

    if lLen > 0 then
    begin
      SetLength(lBuffer, lLen);
      if fStream.ReadExact(pointer(lBuffer), lLen) <> lLen then break;
      if (lLen > 8) and (PInteger(lBuffer)^ = 0) then
      begin
        ControlPackage(lBuffer);
      end
      else
      begin
        fCallbacks.Data(Self, lBuffer);
      end;
    end;
  end;

  fCallbacks.Disconnected(self);
end;

{ Returns a new non-zero id: positive when aPositive is True, negative
  otherwise. Package ID 0 is reserved for control packages, so a counter
  value of 0 is skipped. Low(Integer) is skipped as well because its absolute
  value does not fit in an Integer. }
function TROStreamWorker.GenerateID(aPositive: boolean): integer;
var
  lRaw: integer;
begin
  repeat
    lRaw := InterlockedIncrement(fIdGen);
  until (lRaw <> 0) and (lRaw <> Low(Integer));

  Result := Abs(lRaw);
  if (not aPositive) then
    Result := -Result;
end;

{ Reads the 4-byte greeting and checks that it is 'ROSW'.

  Returns False when the greeting cannot be read completely or does not match
  (the stream is stopped on a mismatch). On success a 'SetMaxLength' control
  package announcing fMaxLength is sent and True is returned. }
function TROStreamWorker.ReadWelcome: Boolean;
var
  lData: array[0..3] of AnsiChar;
begin
  if fStream.ReadExact(@lData, 4) <> 4 then
  begin
    result := false;
    exit;
  end;
  if (lData[0] <> 'R') or (lData[1] <> 'O') or (lData[2] <> 'S') or (lData[3] <> 'W') then
  begin
    fStream.Stop;
    result := false;
    exit;
  end;
  SendControlPackage('SetMaxLength', [GetKVPair('Length', Int32ToBuffer(fMaxLength))]);
  result := true;
end;

{ Encodes aCommand with aArguments as a control package (total length
  followed by a zero int32 prefix) and sends it. }
procedure TROStreamWorker.SendControlPackage(aCommand: UnicodeString; aArguments: TKeyValuePair);
begin
  SendData(EncodePackage(aCommand, True, Int32ToBuffer(0), aArguments));
end;

{$O-}
{ Open-array overload: copies aArguments into a dynamic array and sends the
  encoded control package. }
procedure TROStreamWorker.SendControlPackage(aCommand: UnicodeString; aArguments: array of TKeyValuePairItem);
var
  lPair: TKeyValuePair;
  i: Integer;
begin
  SetLength(lPair, Length(aArguments));
  for i :=0 to Length(aArguments) -1 do
  begin
    lPair[i] := aArguments[i];
  end;
  SendData(EncodePackage(aCommand, True, Int32ToBuffer(0), lPair));
end;

{ Writes aData to the stream while holding fCS, so that writes issued through
  SendData do not overlap. }
procedure TROStreamWorker.SendData(aData: TROHubBytes);
begin
  EnterCriticalSection(fcs);
  try
    fStream.Write(pointer(aData)^, Length(aData));
  finally
    LeaveCriticalSection(fcs);
  end;
end;
{$O+}

{ Sends the 'ROSW' greeting and resumes the thread. Calls after the first
  one do nothing. }
procedure TROStreamWorker.Start;
begin
  if fStarted then Exit;
  fStarted := true;
  SendData(TROHubBytes('ROSW'));
  Suspended := False;
end;
{$IFDEF DELPHI10UP}{$ENDREGION}{$ENDIF DELPHI10UP}

end.