////////////////////////////////////////////////// // SQL Server Data Access Components // Copyright © 1998-2007 Core Lab. All right reserved. // MSServiceBroker ////////////////////////////////////////////////// {$IFNDEF CLR} {$I Sdac.inc} unit MSServiceBroker; {$ENDIF} interface uses SysUtils, Classes, DB,{$IFDEF VER6P} Variants,{$ENDIF}{$IFNDEF CLR} CLRClasses,{$ENDIF} CRThread, MSAccess, OLEDBAccess; const SEventNotificationType = 'http://schemas.microsoft.com/SQL/Notifications/EventNotification'; SQueryNotificationType = 'http://schemas.microsoft.com/SQL/Notifications/QueryNotification'; SFailedRemoteServiceBindingType = 'http://schemas.microsoft.com/SQL/ServiceBroker/BrokerConfigurationNotice/FailedRemoteServiceBinding'; SFailedRouteType = 'http://schemas.microsoft.com/SQL/ServiceBroker/BrokerConfigurationNotice/FailedRoute'; SMissingRemoteServiceBindingType = 'http://schemas.microsoft.com/SQL/ServiceBroker/BrokerConfigurationNotice/MissingRemoteServiceBinding'; SMissingRouteType = 'http://schemas.microsoft.com/SQL/ServiceBroker/BrokerConfigurationNotice/MissingRoute'; SDialogTimerType = 'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer'; SEndDialogType = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'; SErrorType = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'; SDescriptionType = 'http://schemas.microsoft.com/SQL/ServiceBroker/ServiceDiagnostic/Description'; SQueryType = 'http://schemas.microsoft.com/SQL/ServiceBroker/ServiceDiagnostic/Query'; SStatusType = 'http://schemas.microsoft.com/SQL/ServiceBroker/ServiceDiagnostic/Status'; SEchoType = 'http://schemas.microsoft.com/SQL/ServiceBroker/ServiceEcho/Echo'; type // TMSMessageStatus = (msReady, msReceived, msIncomplete, msRetained); // Status of the message. For messages returned by the RECEIVE command, the status is always 0. Messages in the queue may contain one of the following values: 0=Ready, 1=Received message, 2=Not yet complete, 3=Retained sent message TMSMessageValidation = (mvEmpty, mvNone, mvXML); TMSServiceBroker = class; TMSConversation = class; TMSMessage = class protected FBody: TBytes; FConversation: TMSConversation; FParams: TStringList; FParamValues: TStringList; // FStatus: TMSMessageStatus; // Status of the message. For messages returned by the RECEIVE command, the status is always 0. Messages in the queue may contain one of the following values: 0=Ready, 1=Received message, 2=Not yet complete, 3=Retained sent message FQueuingOrder: Int64; // Message order number within the queue. FConversationGroupId: TGuid; // Identifier for the conversation group that this message belongs to. FConversationHandle: TGuid; // Handle for the conversation that this message is part of. FMessageSequenceNumber: Int64;// Sequence number of the message within the conversation. // FServiceName: WideString; // Name of the service that the conversation is to. // FServiceId: integer; // SQL Server object identifier of the service that the conversation is to. FServiceContractName: WideString; // Name of the contract that the conversation follows. // FServiceContractId: integer; // SQL Server object identifier of the contract that the conversation follows. FMessageType: WideString; // {DEFAULT, http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog} Name of the message type that describes the message. FMessageTypeId: integer; // SQL Server object identifier of the message type that describes the message. FValidation: TMSMessageValidation; // Validation used for the message. E=Empty N=None X=XML FIsEmpty: boolean; FMessageId: TGuid; // Unique identifier for the message. function GetAsString: string; function GetAsWideString: WideString; procedure Fill(Source: TMSQuery); function GetParams: TStringList; function GetParamValues: TStringList; property Params: TStringList read GetParams; property ParamValues: TStringList read GetParamValues; public destructor Destroy; override; property AsString: string read GetAsString; property AsWideString: WideString read GetAsWideString; property AsBytes: TBytes read FBody; property Conversation: TMSConversation read FConversation; // property Status: TMSMessageStatus read FStatus; // Status of the message. For messages returned by the RECEIVE command, the status is always 0. Messages in the queue may contain one of the following values: 0=Ready, 1=Received message, 2=Not yet complete, 3=Retained sent message property QueuingOrder: Int64 read FQueuingOrder; // Message order number within the queue. // property ConversationGroupId: TGuid read FConversationGroupId; // Identifier for the conversation group that this message belongs to. // property ConversationHandle: TGuid read FConversationHandle; // Handle for the conversation that this message is part of. property MessageSequenceNumber: Int64 read FMessageSequenceNumber;// Sequence number of the message within the conversation. // property ServiceName: WideString read FServiceName; // Name of the service that the conversation is to. // property ServiceId: integer read FServiceId; // SQL Server object identifier of the service that the conversation is to. // property ServiceContractName: WideString read FServiceContractName; // Name of the contract that the conversation follows. // property ServiceContractId: integer read FServiceContractId; // SQL Server object identifier of the contract that the conversation follows. property MessageType: WideString read FMessageType; // {DEFAULT, http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog} Name of the message type that describes the message. property MessageTypeId: integer read FMessageTypeId; // SQL Server object identifier of the message type that describes the message. property Validation: TMSMessageValidation read FValidation; // Validation used for the message. E=Empty N=None X=XML property IsEmpty: boolean read FIsEmpty; property MessageId: TGuid read FMessageId; // Unique identifier for the message. end; TMSMessageEvent = procedure(Sender: TObject) of object; TMSConversationBeginEvent = procedure(Sender: TObject; Conversation: TMSConversation) of object; TMSConversationEndEvent = procedure(Sender: TObject; Conversation: TMSConversation; ErrorMessage: string; ErrorCode: integer) of object; TMSConversation = class protected FHandle: TGuid; FGroupId: TGuid; FFarService: string; FIsInitiator: boolean; FServiceBroker: TMSServiceBroker; FContractName: WideString; // FContractId: integer; function GetFarService: string; function GetGroupId: TGuid; // function GetContractId: integer; function GetContractName: WideString; procedure SetGroupId(const Value: TGuid); procedure InternalSend(const MessageBody: TBytes; const MessageType: WideString = ''; const IsEmpty: boolean = False); public constructor Create(ServiceBroker: TMSServiceBroker; const Handle: TGuid; const IsInitiator: boolean; const FarService: string = ''); procedure EndConversation(const Cleanup: boolean = False); procedure EndConversationWithError(const ErrorMessage: string; const ErrorCode: integer; const Cleanup: boolean = False); procedure Send(const MessageBody: TBytes; const MessageType: WideString = ''); overload; procedure Send(const MessageBody: string; const MessageType: WideString = ''); overload; procedure SendEmpty(const MessageType: WideString = ''); {$IFNDEF CLR} {$IFDEF VER6P} procedure Send(const MessageBody: WideString; const MessageType: WideString = ''); overload; {$ENDIF} {$ENDIF} procedure BeginTimer(const Timeout: integer); function GetTransmissionStatus: string; property Handle: TGuid read FHandle; property GroupId: TGuid read GetGroupId write SetGroupId; property ServiceBroker: TMSServiceBroker read FServiceBroker; property FarService: string read GetFarService; property IsInitiator: boolean read FIsInitiator; property ContractName: WideString read GetContractName; // Name of the contract that the conversation follows. // property ContractId: integer read GetContractId; // SQL Server object identifier of the contract that the conversation follows. end; TMSServiceBroker = class(TComponent) protected FQuery: TMSQuery; FReceiveSQL: string; FConversations: TList; FMsgs: TThreadList; FReceivePrev: boolean; FDesignCreate: boolean; FService, FQueue: string; FFetchRows: integer; FWaitTimeout: integer; FOnBeginConversation: TMSConversationBeginEvent; FOnEndConversation: TMSConversationEndEvent; // Async mode FOnMessage: TMSMessageEvent; FAsyncNotification: boolean; FStreamedAsyncNotification: boolean; FListener: TOLEDBThreadWrapper; FStopProcessing: boolean; function GetConversation(Index: Integer): TMSConversation; function GetConversationCount: integer; function GetConversationIndexByHandle(Handle: TGuid): integer; procedure EndConversations; function GetCurrentMessage: TMSMessage; procedure GetServiceBrockerObjNames(List: TStrings; SQL: string); function GetQueue: string; function GetConnection: TMSConnection; procedure SetConnection(const Value: TMSConnection); procedure SetService(Value: string); procedure SetFetchRows(Value: integer); procedure SetWaitTimeout(Value: integer); procedure SetOnMessage(Value: TMSMessageEvent); procedure ConnectChange(Sender: TObject; Connecting: Boolean); procedure Loaded; override; procedure AssignTo(Dest: TPersistent); override; procedure CheckInactive; procedure BeginConnection; procedure EndConnection; class function GenerateReceiveSQL(const Queue: string; Conversation: TMSConversation; const FetchRows: integer; const TimeoutMSec: integer): string; class function ReceiveFromServer(Query: TMSQuery; Msgs: TThreadList): integer; // Received messages count function BeginDialogInternal( const TargetService: string; const TargetDatabase: string; const Contract: string; const LifeTime: integer; const UseEncryption: boolean; RelatedConversation: TMSConversation; const GroupId: TGuid ): TMSConversation; // Async mode procedure DoThreadMessage(Sender: TObject; Event: TObject); procedure DoException(Sender: TObject; E: Exception; var Fail: boolean); procedure Start; procedure Stop; procedure SetAsyncNotification(const Value: boolean); public constructor Create(AOwner: TComponent); override; destructor Destroy; override; function BeginDialog( const TargetService: string; const TargetDatabase: string = ''; const UseEncryption: boolean = True; RelatedConversation: TMSConversation = nil; const LifeTime: integer = 0; const Contract: string = '' ): TMSConversation; overload; function BeginDialog( const TargetService: string; const TargetDatabase: string; const UseEncryption: boolean; const GroupId: TGuid; const LifeTime: integer = 0; const Contract: string = '' ): TMSConversation; overload; property Conversations[Index: Integer]: TMSConversation read GetConversation; default; property ConversationCount: integer read GetConversationCount; property CurrentMessage: TMSMessage read GetCurrentMessage; function Receive(Conversation: TMSConversation = nil): boolean; procedure GetQueueNames(List: TStrings); procedure GetServiceNames(List: TStrings); procedure GetContractNames(List: TStrings); procedure GetMessageTypeNames(List: TStrings); procedure CreateServerObjects; procedure DropServerObjects; property Queue: string read GetQueue; published property Connection: TMSConnection read GetConnection write SetConnection; property Service: string read FService write SetService; property AsyncNotification: boolean read FAsyncNotification write SetAsyncNotification default False; property FetchRows: integer read FFetchRows write SetFetchRows default 0; property WaitTimeout: integer read FWaitTimeout write SetWaitTimeout default - 1; property OnMessage: TMSMessageEvent read FOnMessage write SetOnMessage; property OnBeginConversation: TMSConversationBeginEvent read FOnBeginConversation write FOnBeginConversation; property OnEndConversation: TMSConversationEndEvent read FOnEndConversation write FOnEndConversation; end; implementation uses {$IFDEF CLR} System.Runtime.InteropServices, System.Text, System.XML, System.IO, System.Reflection, {$ELSE} CRXml, {$ENDIF} Windows, ComObj, ActiveX, DAConsts, DBAccess, MSConsts, MemData, OLEDBC, MemUtils; function GuidToUniqueidentifier(Guid: TGuid): string; var Str: string; begin Str := GuidToString(Guid); Result := Copy(Str, 2, Length(Str) - 2); end; { TListenThread } type TListenThread = class(TCRThread) private FConnection: TMSConnection; FQuery: TMSQuery; FMsgs: TThreadList; protected procedure InternalExecute; override; public constructor Create(Owner: TCRThreadWrapper); override; destructor Destroy; override; end; constructor TListenThread.Create(Owner: TCRThreadWrapper); begin inherited Create(Owner); FConnection := TMSConnection.Create(nil); FQuery := TMSQuery.Create(nil); FQuery.Connection := FConnection; end; destructor TListenThread.Destroy; begin Terminate; WaitFor; FQuery.Free; FConnection.Free; inherited; end; procedure TListenThread.InternalExecute; var MsgCount: integer; begin OleCheck(CoInitializeEx(nil, COINIT_APARTMENTTHREADED)); try FQuery.Connection.Connect; while not Terminated do begin MsgCount := TMSServiceBroker.ReceiveFromServer(FQuery, FMsgs); PostEvent(TObject(MsgCount)); end; finally CoUninitialize; end; end; { TMSMessage } destructor TMSMessage.Destroy; begin FParams.Free; FParamValues.Free; inherited; end; function TMSMessage.GetAsString: string; begin Result := Encoding.Default.GetString(FBody); end; function TMSMessage.GetAsWideString: WideString; begin Result := Encoding.Unicode.{$IFDEF CLR}GetString{$ELSE}GetWideString{$ENDIF}(FBody); end; procedure TMSMessage.Fill(Source: TMSQuery); var Blob: MemData.TBlob; Handle: IntPtr; s: string; begin (*case Source.FieldByName('status').AsInteger of 0: FStatus := msReady; 1: FStatus := msReceived; 2: FStatus := msIncomplete; 3: FStatus := msRetained; else Assert(False); end;*) FQueuingOrder := (Source.FieldByName('queuing_order') as TLargeIntField).AsLargeInt; FConversationGroupId := (Source.FieldByName('conversation_group_id') as TGuidField).AsGuid; FConversationHandle := (Source.FieldByName('conversation_handle') as TGuidField).AsGuid; FMessageSequenceNumber := (Source.FieldByName('message_sequence_number') as TLargeIntField).AsLargeInt; // FServiceName := (Source.FieldByName('service_name') as TWideStringField).Value; // FServiceId := Source.FieldByName('service_id').AsInteger; FServiceContractName := (Source.FieldByName('service_contract_name') as TWideStringField).Value; // FServiceContractId := Source.FieldByName('service_contract_id').AsInteger; FMessageType := (Source.FieldByName('message_type_name') as TWideStringField).Value; FMessageTypeId := Source.FieldByName('message_type_id').AsInteger; s := UpperCase(Source.FieldByName('validation').AsString); Assert(Length(s) = 1, 'Length(' + s + ') <> 1'); case s[1] of 'E': // Empty FValidation := mvEmpty; 'N': // None FValidation := mvNone; 'X': // XML FValidation := mvXML; else Assert(False); end; FIsEmpty := Source.FieldByName('message_body').IsNull; FMessageId := (Source.FieldByName('message_id') as TGuidField).AsGuid; Blob := Source.GetBlob('message_body'); SetLength(FBody, Blob.Size); Handle := AllocGCHandle(FBody, True); try Blob.Read(0, 0, GetAddrOfPinnedObject(Handle)); finally FreeGCHandle(Handle); end; end; function TMSMessage.GetParams: TStringList; var TextReader: StringReader; Reader: XMLTextReader; i: integer; PrevNodeType: XmlNodeType; begin if (FParams = nil) and (Validation = mvXML) then begin FParams := TStringList.Create; FParamValues := TStringList.Create; TextReader := nil; Reader := nil; try TextReader := StringReader.Create(Encoding.Unicode.{$IFDEF CLR}GetString{$ELSE}GetWideString{$ENDIF}(FBody)); Reader := XMLTextReader.Create(TextReader); PrevNodeType := Reader.NodeType; while Reader.Read do begin if Reader.NodeType = ntElement then begin FParams.Add(Reader.Name); FParamValues.Add(''); end; if (Reader.NodeType = ntText) and (PrevNodeType = ntElement) then begin Assert(FParams.Count = FParamValues.Count); FParamValues[FParamValues.Count - 1] := Reader.Value; end; for i := 0 to Reader.AttributeCount - 1 do begin Reader.MoveToAttribute(i); FParams.Add(Reader.Name); FParamValues.Add(Reader.Value); end; PrevNodeType := Reader.NodeType; end; finally Reader.Free; TextReader.Free; end; end; Result := FParams; Assert(Result <> nil); end; function TMSMessage.GetParamValues: TStringList; begin GetParams; // fill FParams and FParamValues Result := FParamValues; Assert(Result <> nil); end; { TMSConversation } constructor TMSConversation.Create(ServiceBroker: TMSServiceBroker; const Handle: TGuid; const IsInitiator: boolean; const FarService: string = ''); begin inherited Create; Assert(ServiceBroker <> nil); FServiceBroker := ServiceBroker; FHandle := Handle; FFarService := FarService; FIsInitiator := IsInitiator; FServiceBroker.FConversations.Add(Self); if Assigned(FServiceBroker.FOnBeginConversation) then FServiceBroker.FOnBeginConversation(FServiceBroker, Self); end; procedure TMSConversation.EndConversation(const Cleanup: boolean = False); begin EndConversationWithError('', 0, Cleanup); end; procedure TMSConversation.EndConversationWithError(const ErrorMessage: string; const ErrorCode: integer; const Cleanup: boolean = False); var WithError: string; WithCleanUp: string; begin Assert(FServiceBroker <> nil); Assert(FServiceBroker.Connection <> nil); try if Assigned(FServiceBroker.FOnEndConversation) then FServiceBroker.FOnEndConversation(FServiceBroker, Self, ErrorMessage, ErrorCode); FServiceBroker.FConversations.Remove(Self); if (ErrorMessage <> '') and (ErrorCode <> 0) then WithError := ' WITH ERROR = :ErrorCode DESCRIPTION = :ErrorMessage'; if Cleanup then WithCleanUp := ' WITH CLEANUP'; FServiceBroker.FQuery.SQL.Text := 'END CONVERSATION :Handle ' + WithError + WithCleanUp; FServiceBroker.FQuery.ParamByName('Handle').DataType := ftGuid; FServiceBroker.FQuery.ParamByName('Handle').AsString := GuidToString(Handle); if FServiceBroker.FQuery.Params.Count > 1 then begin FServiceBroker.FQuery.ParamByName('ErrorCode').AsInteger := ErrorCode; FServiceBroker.FQuery.ParamByName('ErrorMessage').AsString := ErrorMessage; end; FServiceBroker.FQuery.Execute; finally FServiceBroker.EndConnection; Free; end; end; {$IFNDEF CLR} {$IFDEF VER6P} procedure TMSConversation.Send(const MessageBody: WideString; const MessageType: WideString = ''); var Buf: TBytes; begin Buf := Encoding.Default.{$IFNDEF VER5}GetBytes{$ELSE}GetBytesWide{$ENDIF}(MessageBody); Send(Buf, MessageType); end; {$ENDIF} {$ENDIF} function TMSConversation.GetTransmissionStatus: string; begin FServiceBroker.FQuery.SQL.Text := 'SELECT GET_TRANSMISSION_STATUS(CONVERT(uniqueidentifier, ''' + GuidToUniqueidentifier(Handle) + '''))'; FServiceBroker.FQuery.Execute; Result := FServiceBroker.FQuery.Fields[0].AsString; end; procedure TMSConversation.BeginTimer(const Timeout: integer); begin FServiceBroker.FQuery.SQL.Text := 'BEGIN CONVERSATION TIMER (CONVERT(uniqueidentifier, ''' + GuidToUniqueidentifier(Handle) + ''')) TIMEOUT = ' + IntToStr(Timeout); FServiceBroker.FQuery.Execute; end; function TMSConversation.GetFarService: string; begin if FFarService = '' then begin FServiceBroker.FQuery.SQL.Text := 'SELECT far_service FROM sys.conversation_endpoints WHERE conversation_handle = CONVERT(uniqueidentifier, ''' + GuidToUniqueidentifier(Handle) + ''')'; FServiceBroker.FQuery.Execute; FFarService := FServiceBroker.FQuery.Fields[0].AsString; end; Result := FFarService; end; function TMSConversation.GetGroupId: TGuid; begin if CompareGuid(FGroupId, DB_NULLGUID) then begin Assert(FServiceBroker <> nil); Assert(FServiceBroker.FQuery <> nil); // sys.conversation_endpoints FServiceBroker.FQuery.SQL.Text := 'SELECT e.conversation_group_id, e.service_contract_id, c.name ' + 'FROM sys.conversation_endpoints e, sys.service_contracts c ' + 'WHERE (e.conversation_handle = CONVERT(uniqueidentifier, ''' + GuidToUniqueidentifier(Handle) + ''')) AND (e.service_contract_id = c.service_contract_id)'; FServiceBroker.FQuery.Execute; FGroupId := StringToGUID(FServiceBroker.FQuery.Fields[0].AsString); // FContractId := FServiceBroker.FQuery.Fields[1].AsInteger; FContractName := FServiceBroker.FQuery.Fields[2].AsString; end; Result := FGroupId; end; (*function TMSConversation.GetContractId: integer; begin if CompareGuid(FGroupId, DB_NULLGUID) then GetGroupId; // Fill FGroupId, FContractId, FContractName Result := FContractId; end;*) function TMSConversation.GetContractName: WideString; begin if CompareGuid(FGroupId, DB_NULLGUID) then GetGroupId; // Fill FGroupId, FContractId, FContractName Result := FContractName; end; procedure TMSConversation.SetGroupId(const Value: TGuid); begin Assert(FServiceBroker <> nil); Assert(FServiceBroker.Connection <> nil); FServiceBroker.FQuery.SQL.Text := 'MOVE CONVERSATION CONVERT(uniqueidentifier, ''' + GuidToUniqueidentifier(Handle) + ''')'; FServiceBroker.FQuery.SQL.Add('TO CONVERT(uniqueidentifier, ''' + GuidToUniqueidentifier(Value) + ''')'); FServiceBroker.FQuery.Execute; FGroupId := Value; end; procedure TMSConversation.SendEmpty(const MessageType: WideString = ''); begin InternalSend(nil, MessageType, True); end; procedure TMSConversation.Send(const MessageBody: string; const MessageType: WideString = ''); var Buf: TBytes; begin Buf := Encoding.Default.GetBytes(MessageBody); InternalSend(Buf, MessageType); end; procedure TMSConversation.Send(const MessageBody: TBytes; const MessageType: WideString = ''); begin InternalSend(MessageBody, MessageType); end; procedure TMSConversation.InternalSend(const MessageBody: TBytes; const MessageType: WideString = ''; const IsEmpty: boolean = False); var ParamIndex: integer; begin Assert(FServiceBroker <> nil); Assert(FServiceBroker.Connection <> nil); ParamIndex := 0; FServiceBroker.FQuery.SQL.BeginUpdate; try FServiceBroker.FQuery.SQL.Text := 'SEND ON CONVERSATION CONVERT(uniqueidentifier, ''' + GuidToUniqueidentifier(Handle) + ''')'; if MessageType <> '' then FServiceBroker.FQuery.SQL.Add('MESSAGE TYPE :MessageType' + LineSeparator); if not IsEmpty then FServiceBroker.FQuery.SQL.Add('(:MessageBody);'); finally FServiceBroker.FQuery.SQL.EndUpdate; end; if MessageType <> '' then begin FServiceBroker.FQuery.Params[ParamIndex].DataType := ftWideString; FServiceBroker.FQuery.Params[ParamIndex].Value := MessageType; FServiceBroker.FQuery.Params[ParamIndex].ParamType := ptInput; Inc(ParamIndex); end; if not IsEmpty then begin FServiceBroker.FQuery.Params[ParamIndex].DataType := ftBytes; FServiceBroker.FQuery.Params[ParamIndex].Value := MessageBody; FServiceBroker.FQuery.Params[ParamIndex].ParamType := ptInput; end; FServiceBroker.FQuery.Execute; end; { TMSServiceBroker } constructor TMSServiceBroker.Create(AOwner: TComponent); begin inherited Create(AOwner); FWaitTimeout := -1; FConversations := TList.Create; FQuery := TMSQuery.Create(nil); FQuery.ReadOnly := True; FMsgs := TThreadList.Create; FDesignCreate := csDesigning in ComponentState; end; destructor TMSServiceBroker.Destroy; var i: integer; MsgList: TList; begin Stop; EndConversations; Connection := nil; // UnregisterClient if FMsgs <> nil then begin MsgList := FMsgs.LockList; try for i := 0 to MsgList.Count - 1 do TObject(MsgList[i]).Free; finally FMsgs.UnlockList; FMsgs.Free; end; end; FQuery.Free; FConversations.Free; inherited; end; procedure TMSServiceBroker.Loaded; begin inherited; FDesignCreate := False; try if FStreamedAsyncNotification then AsyncNotification := True; except if csDesigning in ComponentState then ApplicationHandleException(Self) else raise; end; end; procedure TMSServiceBroker.AssignTo(Dest: TPersistent); begin if Dest is TMSServiceBroker then begin TMSServiceBroker(Dest).Connection := Connection; TMSServiceBroker(Dest).Service := Service; TMSServiceBroker(Dest).OnMessage := OnMessage; end else inherited; end; procedure TMSServiceBroker.BeginConnection; begin if Connection = nil then raise Exception.Create(SConnectionNotDefined); TDBAccessUtils.InternalConnect(Connection); end; procedure TMSServiceBroker.EndConnection; begin TDBAccessUtils.InternalDisconnect(Connection); end; procedure TMSServiceBroker.EndConversations; var i: integer; begin for i := ConversationCount - 1 downto 0 do Conversations[i].Free; FConversations.Clear; end; procedure TMSServiceBroker.CheckInactive; begin if AsyncNotification then if ([csUpdating, csDesigning] * ComponentState) <> [] then Stop else DatabaseError(SServiceBrokerAsync, Self); end; procedure TMSServiceBroker.ConnectChange(Sender: TObject; Connecting: Boolean); begin if not Connecting then begin Stop; EndConversations; end; end; function TMSServiceBroker.GetConversation(Index: Integer): TMSConversation; begin Result := TMSConversation(FConversations[Index]); end; function TMSServiceBroker.GetConversationCount: integer; begin Result := FConversations.Count; end; function TMSServiceBroker.GetConversationIndexByHandle(Handle: TGuid): integer; var i: integer; begin Result := -1; for i := 0 to FConversations.Count - 1 do if CompareGuid(Conversations[i].Handle, Handle) then begin Result := i; Exit; end; end; class function TMSServiceBroker.GenerateReceiveSQL(const Queue: string; Conversation: TMSConversation; const FetchRows: integer; const TimeoutMSec: integer): string; begin if FetchRows = 0 then Result := 'RECEIVE ' else Result := 'RECEIVE TOP(' + IntToStr(FetchRows) + ') '; Result := Result + 'message_id, message_body, status, queuing_order, conversation_group_id, conversation_handle, message_sequence_number, service_contract_name, service_contract_id, message_type_name, message_type_id, validation FROM ' + BracketIfNeed(Queue); if Conversation <> nil then Result := Result + ' WHERE conversation_handle = ' + 'CONVERT(uniqueidentifier, ''''' + GuidToUniqueidentifier(Conversation.Handle) + ''''')'; if TimeoutMSec <> - 1 then Result := 'WAITFOR (' + Result + '), TIMEOUT ' + IntToStr(TimeoutMSec); end; function TMSServiceBroker.BeginDialogInternal( const TargetService: string; const TargetDatabase: string; const Contract: string; const LifeTime: integer; const UseEncryption: boolean; RelatedConversation: TMSConversation; const GroupId: TGuid ): TMSConversation; var Param: TMSParam; begin if FService = '' then raise Exception.Create(SServiceNotDefined); BeginConnection; try FQuery.SQL.Clear; FQuery.SQL.Append('BEGIN DIALOG :ch FROM SERVICE :fs TO SERVICE :ts'); Param := FQuery.ParamByName('ch'); Param.DataType := ftGuid; Param.ParamType := ptOutput; Param := FQuery.ParamByName('fs'); Param.AsString := FService; Param := FQuery.ParamByName('ts'); Param.AsString := TargetService; if TargetDatabase <> '' then begin FQuery.SQL.Append(', :bi'); Param := FQuery.ParamByName('bi'); Param.AsWideString := TargetDatabase; end; if Contract <> '' then begin FQuery.SQL.Append(' ON CONTRACT :cn'); Param := FQuery.ParamByName('cn'); Param.AsString := Contract; end; if UseEncryption then FQuery.SQL.Append(' WITH ENCRYPTION = ON ') else FQuery.SQL.Append(' WITH ENCRYPTION = OFF '); if RelatedConversation <> nil then begin FQuery.SQL.Append(', RELATED_CONVERSATION = :rch '); Param := FQuery.ParamByName('rch'); Param.AsString := GUIDToString(RelatedConversation.Handle); end else if not CompareGuid(GroupId, DB_NULLGUID) then begin FQuery.SQL.Append(', RELATED_CONVERSATION_GROUP = :rcg '); Param := FQuery.ParamByName('rcg'); Param.AsString := GUIDToString(GroupId); end; if LifeTime <> 0 then FQuery.SQL.Append(', LIFETIME = ' + IntToStr(LifeTime) + ' '); FQuery.Execute; Param := FQuery.ParamByName('ch'); Result := TMSConversation.Create(Self, StringToGUID(Param.AsString), True, TargetService); except EndConnection; raise; end; end; function TMSServiceBroker.BeginDialog( const TargetService: string; const TargetDatabase: string = ''; const UseEncryption: boolean = True; RelatedConversation: TMSConversation = nil; const LifeTime: integer = 0; const Contract: string = ''): TMSConversation; begin Result := BeginDialogInternal(TargetService, TargetDatabase, Contract, LifeTime, UseEncryption, RelatedConversation, DB_NULLGUID); end; function TMSServiceBroker.BeginDialog( const TargetService: string; const TargetDatabase: string; const UseEncryption: boolean; const GroupId: TGuid; const LifeTime: integer = 0; const Contract: string = ''): TMSConversation; begin Result := BeginDialogInternal(TargetService, TargetDatabase, Contract, LifeTime, UseEncryption, nil, GroupId); end; function TMSServiceBroker.GetCurrentMessage: TMSMessage; var MsgList: TList; begin MsgList := FMsgs.LockList; try if MsgList.Count > 0 then Result := TMSMessage(MsgList[0]) else Result := nil; finally FMsgs.UnlockList; end; end; procedure TMSServiceBroker.GetServiceBrockerObjNames(List: TStrings; SQL: string); var QueueDS: TMSQuery; NameFld: TField; begin BeginConnection; List.Clear; QueueDS := nil; try QueueDS := TMSQuery.Create(nil); QueueDS.Connection := Connection; QueueDS.SQL.Text := SQL; if TOLEDBConnection(TMSAccessUtils.FIConnection(Connection)).DBMSPrimaryVer <= 8 then Exit; QueueDS.Open; NameFld := QueueDS.Fields[0]; while not QueueDS.Eof do begin List.Add(NameFld.AsString); QueueDS.Next end; if List is TStringList then TStringList(List).Sort; finally QueueDS.Free; EndConnection; end; end; function TMSServiceBroker.GetQueue: string; begin if FQueue = '' then begin BeginConnection; try FQuery.SQL.Text := 'SELECT q.name FROM sys.service_queues q, sys.services s WHERE (s.service_queue_id = q.object_id) AND (s.name = ''' + Service + ''')'; FQuery.Execute; FQueue := FQuery.Fields[0].AsString; finally EndConnection; end; end; Result := FQueue; end; function TMSServiceBroker.GetConnection: TMSConnection; begin Assert(FQuery <> nil); Result := FQuery.Connection; end; procedure TMSServiceBroker.SetConnection(const Value: TMSConnection); begin if Value <> Connection then begin Stop; EndConversations; if Connection <> nil then TDBAccessUtils.UnRegisterClient(Connection, Self); FQuery.Connection := Value; if Value <> nil then TDBAccessUtils.RegisterClient(Value, Self, ConnectChange); end; end; procedure TMSServiceBroker.SetService(Value: string); begin if FService <> Value then begin Stop; if FConversations.Count > 0 then if ([csUpdating, csDesigning] * ComponentState) <> [] then EndConversations else DatabaseError(SDialogActive, Self); FService := Value; FQueue := ''; end; end; procedure TMSServiceBroker.SetFetchRows(Value: integer); begin FFetchRows := Value; FReceiveSQL := ''; end; procedure TMSServiceBroker.SetWaitTimeout(Value: integer); begin FWaitTimeout := Value; FReceiveSQL := ''; end; procedure TMSServiceBroker.SetOnMessage(Value: TMSMessageEvent); begin CheckInactive; FOnMessage := Value; end; class function TMSServiceBroker.ReceiveFromServer(Query: TMSQuery; Msgs: TThreadList): integer; // Received messages count var Msg: TMSMessage; MsgList: TList; begin Assert(Query <> nil); Assert(Query.Connection <> nil); Result := 0; TDBAccessUtils.InternalConnect(Query.Connection); try Query.Execute; MsgList := Msgs.LockList; try while not Query.Eof do begin Msg := TMSMessage.Create; try Msg.Fill(Query); except Msg.Free; raise; end; MsgList.Add(Msg); Inc(Result); Query.Next; end; finally Msgs.UnlockList; end; Query.Close; finally // Query will automatically disconnect // TDBAccessUtils.InternalDisconnect(Query.Connection); end; end; function TMSServiceBroker.Receive(Conversation: TMSConversation = nil): boolean; var MsgList: TList; procedure RemoveCurrentMessage; var Msg: TMSMessage; begin Msg := TMSMessage(MsgList[0]); try if (Msg.MessageType = SEndDialogType) or (Msg.MessageType = SErrorType) then begin FConversations.Remove(Msg.Conversation); Msg.Conversation.Free; end; finally Msg.Free; MsgList.Delete(0); end; end; procedure ActualizeCurrentMessage; var Msg: TMSMessage; ConversationIndex: integer; i: integer; s: string; begin Msg := CurrentMessage; ConversationIndex := GetConversationIndexByHandle(Msg.FConversationHandle); if ConversationIndex < 0 then Msg.FConversation := TMSConversation.Create(Self, Msg.FConversationHandle, False) else Msg.FConversation := Conversations[ConversationIndex]; Msg.FConversation.FGroupId := Msg.FConversationGroupId; Msg.FConversation.FContractName := Msg.FServiceContractName; // Msg.FConversation.FContractId := Msg.FServiceContractId; if Assigned(FOnEndConversation) then begin if Msg.MessageType = SEndDialogType then FOnEndConversation(Self, Msg.Conversation, '', 0) else if Msg.MessageType = SErrorType then begin i := Msg.Params.IndexOf('Description'); s := Msg.ParamValues[i]; i := Msg.Params.IndexOf('Code'); i := StrToInt(Msg.ParamValues[i]); FOnEndConversation(Self, Msg.Conversation, s, i); end; end; end; begin if (Conversation <> nil) and AsyncNotification then raise Exception.Create(SServiceBrokerAsync); MsgList := FMsgs.LockList; try if (MsgList.Count > 0) and FReceivePrev then RemoveCurrentMessage; if (MsgList.Count = 0) and not AsyncNotification then begin if (FReceiveSQL = '') or (Conversation <> nil) then FReceiveSQL := GenerateReceiveSQL(Queue, Conversation, FetchRows, WaitTimeout); FQuery.SQL.Text := FReceiveSQL; ReceiveFromServer(FQuery, FMsgs); end; Result := MsgList.Count > 0; FReceivePrev := Result; if Result then ActualizeCurrentMessage; finally FMsgs.UnlockList; end; end; procedure TMSServiceBroker.GetQueueNames(List: TStrings); begin GetServiceBrockerObjNames(List, 'SELECT name FROM sys.service_queues'); end; procedure TMSServiceBroker.GetServiceNames(List: TStrings); begin GetServiceBrockerObjNames(List, 'SELECT name FROM sys.services'); end; procedure TMSServiceBroker.GetContractNames(List: TStrings); begin GetServiceBrockerObjNames(List, 'SELECT name FROM sys.service_contracts'); end; procedure TMSServiceBroker.GetMessageTypeNames(List: TStrings); begin GetServiceBrockerObjNames(List, 'SELECT name FROM sys.service_message_types'); end; procedure TMSServiceBroker.CreateServerObjects; begin Connection.ExecSQL( 'IF NOT EXISTS (SELECT * FROM sys.service_queues WHERE name = ''' + Service + '_QUEUE'') CREATE QUEUE ' + BracketIfNeed(Service + '_QUEUE') + #$D#$A + 'IF NOT EXISTS (SELECT * FROM sys.services WHERE name = ''' + Service + ''') CREATE SERVICE ' + BracketIfNeed(Service) + ' ON QUEUE ' + BracketIfNeed(Service + '_QUEUE') + '([DEFAULT])', []); end; procedure TMSServiceBroker.DropServerObjects; begin BeginConnection; try Stop; Connection.ExecSQL( 'IF EXISTS (SELECT * FROM sys.services WHERE name = ''' + Service + ''') DROP SERVICE ' + BracketIfNeed(Service) + #$D#$A + 'IF EXISTS (SELECT * FROM sys.service_queues WHERE name = ''' + Service + '_Queue'') DROP QUEUE ' + BracketIfNeed(Service + '_Queue'), []); finally EndConnection; end; end; procedure TMSServiceBroker.DoThreadMessage(Sender: TObject; Event: TObject); var i: integer; begin if Assigned(FOnMessage) then for i := 0 to Integer(Event) - 1do FOnMessage(Sender); end; procedure TMSServiceBroker.DoException(Sender: TObject; E: Exception; var Fail: boolean); begin if (E is EOLEDBError) then begin if EOLEDBError(E).ErrorCode = DB_E_CANCELED then Fail := False else if Connection.Connected then TMSAccessUtils.DoError(Connection, E, Fail); if Fail then Stop; end; end; procedure TMSServiceBroker.Start; var lt: TListenThread; begin if not FAsyncNotification then begin BeginConnection; if FService = '' then DatabaseError(SServiceNotDefined, Self); {if not Assigned(FOnMessage) then DatabaseError(SOnMessageNotAssigned, Self);} Assert(FListener = nil); FListener := TOLEDBThreadWrapper.Create(TListenThread, True); lt := TListenThread(FListener.Thread); Assert(lt.FConnection <> nil); lt.FConnection.Assign(Connection); lt.FConnection.LoginPrompt := False; lt.FQuery.SQL.Text := GenerateReceiveSQL(Queue, nil, FetchRows, MaxInt); lt.FMsgs := FMsgs; FListener.OnPostEvent := DoThreadMessage; FListener.OnException := DoException; FListener.Resume; FAsyncNotification := True; end; end; procedure TMSServiceBroker.Stop; begin if FStopProcessing then Exit; if FAsyncNotification then begin FStopProcessing := True; try TListenThread(FListener.Thread).FQuery.BreakExec; FListener.Free; FListener := nil; FAsyncNotification := False; finally FStopProcessing := False; EndConnection; end; end; end; procedure TMSServiceBroker.SetAsyncNotification(const Value: boolean); begin if csReading in ComponentState then begin if Value then FStreamedAsyncNotification := True; end else if Value <> FAsyncNotification then if Value then Start else Stop; end; end.