unit uDAJSONDataStreamer; {$I DataAbstract.inc} interface uses SysUtils, Classes, uROTypes, uROXMLIntf, uDAInterfaces, uDADelta, uDADataStreamer,uROJSONParser; const // Data stream node names pn_Datasets = 'datasets'; pn_Dataset_Name = 'name'; pn_Dataset_Schema = 'schema'; pn_Dataset_Schema_Fields = 'fields'; pn_Dataset_Schema_Params = 'params'; pn_Dataset_Data = 'data'; pn_Dataset_Data_Rows = 'rows'; pn_Dataset_Data_Fields = 'fields'; pn_Dataset_Data_Fields_name = 'name'; pn_Deltas = 'deltas'; pn_Delta_Name = 'name'; pn_Delta_LoggedFields = 'loggedfields'; pn_Delta_LoggedFields_Name = 'name'; pn_Delta_LoggedFields_Type = 'type'; pn_Delta_KeyFields = 'keyfields'; pn_Delta_Data = 'data'; pn_Delta_Data_RecID = 'recid'; pn_Delta_Data_Changetype = 'changetype'; pn_Delta_Data_Status = 'status'; pn_Delta_Data_Message = 'message'; pn_Delta_Data_Old = 'old'; pn_Delta_Data_New = 'new'; type TDASmallFieldInfo = record Name: String; end; TDADataForAppendBin2 = class(TDADataForAppend) public FieldsInfo: array of TDASmallFieldInfo; end; TDAJSONDataStreamer = class(TDADataStreamer) private FParser: TROJSONValue; FDatasets, FDeltas : TROJSONArray; function GetDatasetObject(const ADatasetName: JSON_String; const ACreate: Boolean):TROJSONObject; function GetDeltaObject(const ADeltaName: JSON_String; const ACreate: Boolean):TROJSONObject; function GetDatasetObjectByPropName(const ADatasetName, AObjectName: JSON_String; const ACreate: Boolean):TROJSONObject; function GetDataArraysByPropName(const ADatasetName, AObjectName: JSON_String; const ACreate: Boolean):TROJSONArray; procedure ClearParser; procedure WriteSchema(Const ASchemaObject: TROJSONObject; const Fields: TDAFieldCollection; const Params: TDAParamCollection; aFieldsIndex: array of integer); overload; procedure ReadSchema(Const ASchemaObject: TROJSONObject; const Destination: IDADataset); procedure WriteField(const AFieldObject: TROJSONObject; const AField: TDAField); procedure WriteParam(const AParamObject: TROJSONObject; const AParam: TDAParam); procedure ReadParam(const AParamObject: TROJSONObject; const AParam: TDAParam); procedure ReadField(const AFieldObject: TROJSONObject; const AField: TDAField); protected procedure InternalDoWriteDataset(const Source: IDADataset; var k: integer; const Maxrecords: integer; ARealFields: array of integer;aDataIndex: Integer; info: array of TDASmallFieldInfo); procedure InternalDoReadDataset(const ARowsArray: TROJSONArray; const Destination: IDAEditableDataset; ARealFields: array of integer;AppendMode: Boolean); function DoCreateStream: TStream; override; procedure DoInitialize(Mode: TDAAdapterInitialization); override; procedure DoFinalize; override; function DoWriteDataset(const Source: IDADataset; Options: TDAWriteOptions; MaxRows: integer; ADynFieldNames: array of string): integer; override; function DoBeginWriteDataset( const Source: IDADataset; const Schema: TDADataset; Options: TDAWriteOptions; MaxRows: integer; ADynFieldNames: array of string): TDADataForAppend; override; function DoWriteDatasetData(const Source: IDADataset; var aDataForAppend: TDADataForAppend; aDataIndex: Integer = -1): Integer; override; function DoEndWriteDataset(aDataForAppend: TDADataForAppend): Integer; override; procedure DoReadDataset(const DatasetName: string; const Destination: IDADataset; ApplySchema: boolean; AppendMode: Boolean); override; procedure DoReadDelta(const DeltaName: string; const Destination: IDADelta); override; procedure DoWriteDelta(const Source: IDADelta); override; public function GetTargetDataType: TRODataType; override; published end; implementation uses Variants, uROClasses, uDAClasses,uDAEngine,uROBinaryHelpers, uROJSONMessage, uROCompression; const TAlignmentStrings: array[Low(TAlignment)..High(TAlignment)] of Json_String = ('taLeftJustify', 'taRightJustify', 'taCenter'); TDABlobTypeStrings: array[Low(TDABlobType)..High(TDABlobType)] of Json_String = ('dabtUnknown', 'dabtBlob', 'dabtMemo', 'dabtOraBlob', 'dabtOraClob', 'dabtGraphic', 'dabtTypedBinary', 'dabtTimestamp'); TDADataTypeStrings: array[Low(TDADataType)..High(TDADataType)] of Json_String = ('datUnknown', 'datString', 'datDateTime', 'datFloat', 'datCurrency', 'datAutoInc', 'datInteger', 'datLargeInt', 'datBoolean', 'datMemo', 'datBlob', 'datWideString', 'datWideMemo', 'datLargeAutoInc', 'datByte', 'datShortInt', 'datWord', 'datSmallInt', 'datCardinal', 'datLargeUInt', 'datGuid', 'datXml', 'datDecimal', 'datSingleFloat'); TDAParamTypeStrings:array[Low(TDAParamType)..High(TDAParamType)] of Json_String = ('daptUnknown', 'daptInput', 'daptOutput', 'daptInputOutput', 'daptResult'); TDAChangeTypeStrings: array[Low(TDAChangeType)..High(TDAChangeType)] of Json_String = ('insert', 'update', 'delete'); TDAChangeStatusStrings: array[Low(TDAChangeStatus)..High(TDAChangeStatus)] of Json_String = ('pending', 'resolved', 'failed'); function TDAChangeStatusStringsToTDAChangeStatus(aValue: JSON_String): TDAChangeStatus; begin for Result := Low(TDAChangeStatus) to High(TDAChangeStatus) do if TDAChangeStatusStrings[Result] = aValue then Exit; raise Exception.Create('Unknown TDAChangeStatus value: '''+aValue+''''); end; function TDAChangeTypeStringsToTDAChangeType(aValue: JSON_String): TDAChangeType; begin for Result := Low(TDAChangeType) to High(TDAChangeType) do if TDAChangeTypeStrings[Result] = aValue then Exit; raise Exception.Create('Unknown TDAChangeType value: '''+aValue+''''); end; function TAlignmentStringsToTAlignment(aValue: Json_String): TAlignment; begin for Result := Low(TAlignment) to High(TAlignment) do if TAlignmentStrings[Result] = aValue then Exit; raise Exception.Create('Unknown TAlignment value: '''+aValue+''''); end; function TDABlobTypeStringsToTDABlobType(aValue: Json_String): TDABlobType; begin for Result := Low(TDABlobType) to High(TDABlobType) do if TDABlobTypeStrings[Result] = aValue then Exit; raise Exception.Create('Unknown TDABlobType value: '''+aValue+''''); end; function TDADataTypeStringsToTDADataType(aValue: Json_String): TDADataType; begin for Result := Low(TDADataType) to High(TDADataType) do if TDADataTypeStrings[Result] = aValue then Exit; raise Exception.Create('Unknown TDADataType value: '''+aValue+''''); end; function TDAParamTypeStringsToTDAParamType(aValue: Json_String): TDAParamType; begin for Result := Low(TDAParamType) to High(TDAParamType) do if TDAParamTypeStrings[Result] = aValue then Exit; raise Exception.Create('Unknown TDAParamType value: '''+aValue+''''); end; function EncodeBlob(val: Variant): variant; var arrLen: integer; arr: PByteArray; msIn, msOut: TROBinaryMemoryStream; begin if VarIsArray(val) then begin arrLen := VarArrayHighBound(val, 1) - VarArrayLowBound(val, 1) + 1; msIn := TROBinaryMemoryStream.Create; msOut := TROBinaryMemoryStream.Create; arr := VarArrayLock(val); try msIn.Write(arr^, arrLen); msIn.Position := 0; EncodeStream(msIn, msOut); Result := msOut.ToString; finally msIn.Free; msOut.Free; VarArrayUnlock(val); end; end else if VarIsNull(val) then begin Result := 'null'; end else begin Result := val; end; end; function DecodeBlob(val: AnsiString): Variant; var msIn, msOut: TROBinaryMemoryStream; arr: PByteArray; begin Result := Null; if ROAnsiCompare(val, 'null', false) <> 0 then begin msIn := TROBinaryMemoryStream.Create(val); msOut := TROBinaryMemoryStream.Create; arr := nil; try msIn.Position := 0; DecodeStream(msIn, msOut); Result := VarArrayCreate([0, msOut.Size - 1], varByte); arr := VarArrayLock(Result); msOut.Position := 0; msOut.Read(arr^, msOut.Size); finally if Assigned(arr) then VarArrayUnlock(Result); msIn.Free; msOut.Free; end; end; end; function EncodeDecimal(val: Variant): string; begin Result := VarAsType(val, varString); ReplaceChar(Result, [',', '.'], '.'); end; { TDAJSONDataStreamer } procedure TDAJSONDataStreamer.ClearParser; begin FDatasets := nil; FDeltas := nil; if FParser <> nil then FreeAndNil(FParser); end; function TDAJSONDataStreamer.GetDatasetObjectByPropName(const ADatasetName, AObjectName: JSON_String; const ACreate: Boolean):TROJSONObject; var lProp: TROJSONProperty; lDataset: TROJSONObject; begin Result := nil; lDataset := GetDatasetObject(ADatasetName,ACreate); lProp := lDataset.FindItem(AObjectName); if (lProp = nil) then begin if ACreate then lProp := lDataset.AddObjectProperty(AObjectName) else Exit; end; Result := lProp.ASObject; end; function TDAJSONDataStreamer.DoBeginWriteDataset(const Source: IDADataset; const Schema: TDADataset; Options: TDAWriteOptions; MaxRows: integer; ADynFieldNames: array of string): TDADataForAppend; var k, i, Realfldcnt: integer; fld: TDAField; wrtschema: boolean; info: array of TDASmallFieldInfo; RealFields: array of integer; lfields: array of integer; lDataForAppend : TDADataForAppendBin2; lSchemaFields: TDAFieldCollection; lSchemaParams: TDAParamCollection; lLogicalName: String; lRealSchemaArray: TROJSONArray; lobj: TROJSONObject; begin lDataForAppend := TDADataForAppendBin2.Create(); result := lDataForAppend; if Assigned(Schema) then begin lDataForAppend.TableSchema := Schema; if Schema is TDAUnionDataTable then begin fld := Schema.FindField(def_SourceTableFieldName); if not Assigned(fld) then begin fld := Schema.Fields.Add(); fld.Name := def_SourceTableFieldName; fld.DataType := datInteger; fld.InPrimaryKey := true; fld.ServerAutoRefresh := true; end; end; lSchemaFields := Schema.Fields; lSchemaParams := Schema.Params; lLogicalName := Schema.Name; end else begin if Assigned(Source) then begin lSchemaFields := Source.Fields; lSchemaParams := Source.Params; lLogicalName := Source.LogicalName; end else begin raise EDAException.Create('Schema or source should be assigned.'); end; end; if Length(ADynFieldNames) > 0 then begin SetLength(lfields, Length(ADynFieldNames)); For i:=0 to High(ADynFieldNames) do begin fld := lSchemaFields.FindField(ADynFieldNames[i]); if fld <> nil then lfields[i]:= i else lfields[i]:= -1; // TODO: shoudln't this raise an exception "field not found", or the like? end; end else begin SetLength(lfields, lSchemaFields.Count); For i:=0 to lSchemaFields.Count-1 do lfields[i]:=i; end; // This information will be used later to complete the stream (see DoInitialize) AddingDataset(lLogicalName); // Writes a boolean flag that indicates if the schema is being written wrtschema := (woSchema in Options) or (Length(ADynFieldNames)>0); if wrtschema then WriteSchema(GetDatasetObjectByPropName(lLogicalName,pn_Dataset_Schema,True), lSchemaFields, lSchemaParams, lfields); // Writes the row count if not (woRows in Options) then Exit; // write datatypes+offsets SetLength(info, lSchemaFields.Count); SetLength(RealFields, lSchemaFields.Count); Realfldcnt := 0; lRealSchemaArray := GetDataArraysByPropName(Source.LogicalName,pn_Dataset_Data_Fields,True); for i := 0 to High(lfields) do begin if lSchemaFields[lfields[i]].Calculated or lSchemaFields[lfields[i]].Lookup then Continue; RealFields[Realfldcnt] := lfields[i]; info[Realfldcnt].Name := lSchemaFields[lfields[i]].Name; lobj:=lRealSchemaArray.AddObject.AsObject; lObj.AddStringProperty(pn_Dataset_Data_Fields_name,info[Realfldcnt].Name); inc(Realfldcnt); end; SetLength(info, Realfldcnt); SetLength(RealFields, Realfldcnt); // prepare DataForAppend structure... SetLength(lDataForAppend.RealFields, Realfldcnt); SetLength(lDataForAppend.FieldsInfo, Realfldcnt); for i := Low(RealFields) to High(RealFields) do begin lDataForAppend.RealFields[i] := RealFields[i]; lDataForAppend.FieldsInfo[i].Name := info[i].Name; end; lDataForAppend.MaxRowCount := MaxRows; k := 0; lDataForAppend.RecordCount := k; result := lDataForAppend; end; function TDAJSONDataStreamer.DoCreateStream: TStream; begin // outdated, for backward capability result := nil; end; function TDAJSONDataStreamer.DoEndWriteDataset( aDataForAppend: TDADataForAppend): Integer; begin Result:= aDataForAppend.RecordCount; aDataForAppend.Free; end; procedure TDAJSONDataStreamer.DoFinalize; begin if (AdapterInitialization in AdapterWriteModes) then begin if (FDatasets <> nil) and (FDatasets.Count = 0) then FParser.AsObject.FindItem(pn_Datasets).Free; if (FDeltas <> nil) and (FDeltas.Count = 0) then FParser.AsObject.FindItem(pn_Deltas).Free; FParser.SaveToStream(Data); end; ClearParser; end; procedure TDAJSONDataStreamer.DoInitialize(Mode: TDAAdapterInitialization); var i : integer; lobj: TROJSONObject; lprop: TROJSONProperty; begin ClearParser; if (Mode in AdapterReadModes) then begin FParser := JSON_ParseStream(Data); if (FParser.ValueType = jdtObject) and (FParser.AsObject <> nil) then begin lprop := FParser.AsObject.FindItem(pn_Datasets); if lProp <> nil then begin FDatasets := lProp.ASArray; if (FDatasets <> nil) then for i := 0 to (FDatasets.Count-1) do begin lobj:=FDatasets.Items[i].AsObject; if (lobj <> nil) then begin lprop:=lobj.FindItem(pn_Dataset_Name); AddingDataset(lprop.AsString); end; end; end; lProp := FParser.AsObject.FindItem(pn_Deltas); if lProp <> nil then begin FDeltas := lProp.ASArray; if (FDeltas <> nil) then for i := 0 to (FDeltas.Count-1) do begin lobj:=FDeltas.Items[i].AsObject; if (lobj <> nil) then begin lprop:=lobj.FindItem(pn_Delta_Name); AddingDelta(lprop.AsString); end; end; end; end; end else if (Mode in AdapterWriteModes) then begin FParser := TROJSONValue.Create(nil); FParser.AsObject:= TROJSONObject.Create; FDatasets := FParser.AsObject.AddArrayProperty(pn_Datasets).ASArray; FDeltas := FParser.AsObject.AddArrayProperty(pn_Deltas).ASArray; end; end; procedure TDAJSONDataStreamer.DoReadDataset(const DatasetName: string; const Destination: IDADataset; ApplySchema: boolean; AppendMode: Boolean); var editable: IDAEditableDataset; i, k: integer; readonlyfields: array of boolean; // Realfldcount: integer; info: array of TDASmallFieldInfo; RealFields: array of integer; lErrorMessage: String; lErrorMesCnt: integer; lFldList: TStringList; lObject: TROJSONObject; lArray: TROJSONArray; begin if Destination.Active and ApplySchema then raise Exception.Create('Cannot apply a schema if the destination is active'); lErrorMessage := ''; lErrorMesCnt := 0; editable := Destination as IDAEditableDataset; Destination.DisableControls; try editable.DisableEventHandlers; try if ApplySchema then begin // Checks to see if the schema is present lObject:= GetDatasetObjectByPropName(DatasetName,pn_Dataset_Schema, False); if (lObject <> nil) and ApplySchema then ReadSchema(lObject, Destination); Exit; end else begin lObject:= GetDatasetObjectByPropName(DatasetName,pn_Dataset_Data,False); if (lObject = nil) then Exit; // Only schema is present! if not Destination.Active then Destination.Open; with editable do try // if table is empty then process in standard mode AppendMode := AppendMode and not (EOF and GetBOF); // Temporarily sets all fields as writable Destination.DisableConstraints; lArray := lObject.ItemByName(pn_Dataset_Data_Fields).ASArray; Realfldcount := lArray.Count; SetLength(info, Realfldcount); SetLength(RealFields, Realfldcount); //Data.Read(pointer(info)^, Sizeof(TDASmallFieldInfo) * Realfldcount); lFldList:=TStringList.Create; try lFldList.Sorted:=False; lFldList.Duplicates:=dupIgnore; For i:= 0 to Fields.Count-1 do lFldList.AddObject(Fields[i].Name,Pointer(Fields[i].Index)); lFldList.Sorted:=True; for i := 0 to Realfldcount - 1 do begin info[i].Name := lArray[i].AsObject.GetStringValueByName(pn_Dataset_Data_Fields_name); k:=lFldList.IndexOf(info[i].Name); if k = -1 then begin inc(lErrorMesCnt); if lErrorMesCnt > 5 then begin lErrorMessage := lErrorMessage + '' + sLineBreak; break; end else begin lErrorMessage := lErrorMessage + Format('The %s field isn''t found.' + sLineBreak,[info[i].Name]) end; end else begin RealFields[i]:= Integer(lFldList.Objects[k]); end; end; finally lFldList.Free; end; if (Length(lErrorMessage) > 0) then begin lErrorMessage := 'Format of the data in the stream doesn''t match the destination table format.'+ #13#10 + #13#10 + lErrorMessage; raise EDAException.Create(lErrorMessage); end; k := 0; SetLength(readonlyfields, Fields.Count); for i := 0 to (Fields.Count - 1) do begin readonlyfields[i] := Fields[i].ReadOnly; Fields[i].ReadOnly := FALSE; if Fields[i].Calculated or Fields[i].Lookup then Continue; //RealFields[k] := i; if (k >= Realfldcount) then lErrorMessage := lErrorMessage + 'Fields count mismatch' + sLineBreak; inc(k); end; try if (k <> Realfldcount) then lErrorMessage := lErrorMessage + Format('Fields count mismatch: %d expected but %d found in the stream', [k, Realfldcount]) + sLineBreak; if (Length(lErrorMessage) > 0) then begin lErrorMessage := 'Format of the data in the stream doesn''t match the destination table format.'+ sLineBreak + sLineBreak + lErrorMessage; raise EDAException.Create(lErrorMessage); end; // Inserts the records try lArray := lObject.ItemByName(pn_Dataset_Data_Rows).ASArray; if lArray.Count > 0 then InternalDoReadDataset( lArray, editable,RealFields,AppendMode); except raise; end; finally // Restores the read-only property for i := 0 to (Fields.Count - 1) do Fields[i].ReadOnly := readonlyfields[i]; end; finally Destination.EnableConstraints; end; end; finally editable.EnableEventHandlers; end; finally Destination.EnableControls; end; end; procedure TDAJSONDataStreamer.DoReadDelta(const DeltaName: string; const Destination: IDADelta); var msg: JSON_String; recid, i, x: integer; change: TDADeltaChange; changetype: TDAChangeType; status: TDAChangeStatus; lDeltaObject: TROJSONObject; lArray: TROJSONArray; lArray2: TROJSONArray; lObject: TROJSONObject; lProgress: Boolean; lTotal: Integer; lOffset: Integer; begin lDeltaObject:=GetDeltaObject(DeltaName,False); // names and types Destination.ClearFieldNames; lArray := lDeltaObject.ItemByName(pn_Delta_LoggedFields).ASArray; for i := 0 to lArray.Count-1 do begin lObject:=lArray[i].AsObject; Destination.AddFieldName(lObject.GetStringValueByName(pn_Delta_LoggedFields_Name), TDADataTypeStringsToTDADataType(lObject.GetStringValueByName(pn_Delta_LoggedFields_Type))); // Destination.AddFieldName(lObject.GetStringValueByName(pn_Delta_LoggedFields_Name)); // Destination.LoggedFieldTypes[Destination.LoggedFieldCount - 1] := TDADataTypeStringsToTDADataType(lObject.GetStringValueByName(pn_Delta_LoggedFields_Type)); end; // Key fields Destination.ClearKeyFieldNames; lArray := lDeltaObject.ItemByName(pn_Delta_KeyFields).ASArray; for i := 0 to lArray.Count-1 do Destination.AddKeyFieldName(lArray[i].AsString); lArray:=lDeltaObject.ItemByName(pn_Delta_Data).ASArray; lProgress := Assigned(OnReadDeltaProgress); lTotal := lArray.Count; for i := 0 to lArray.Count-1 do begin lObject:= lArray[i].AsObject; recid := lObject.GetNumberValueByName(pn_Delta_Data_RecID); changetype := TDAChangeTypeStringsToTDAChangeType(lObject.GetStringValueByName(pn_Delta_Data_Changetype)); status := TDAChangeStatusStringsToTDAChangeStatus(lObject.GetStringValueByName(pn_Delta_Data_Status)); msg := lObject.GetStringValueByName(pn_Delta_Data_Message); change := Destination.Add(recid, changetype, status, msg); lArray2:=lObject.ItemByName(pn_Delta_Data_Old).ASArray; for x := 0 to (Destination.LoggedFieldCount - 1) do if lArray2[x].ValueType <> jdtNull then case Destination.LoggedFieldTypes[x] of datDateTime: change.OldValues[x] := SOAPDateTimeToDateTime(lArray2[x].AsString, lOffset); datBlob: change.OldValues[x] := DecodeBlob({$IFDEF UNICODE}WideStringToAnsiString{$ENDIF}(lArray2[x].AsString)); else change.OldValues[x] := lArray2[x].VarValue; end else change.OldValues[x] := lArray2[x].VarValue; lArray2:=lObject.ItemByName(pn_Delta_Data_New).ASArray; for x := 0 to (Destination.LoggedFieldCount - 1) do if lArray2[x].ValueType <> jdtNull then case Destination.LoggedFieldTypes[x] of datDateTime: change.NewValues[x] := SOAPDateTimeToDateTime(lArray2[x].AsString, lOffset); datBlob: change.NewValues[x] := DecodeBlob({$IFDEF UNICODE}WideStringToAnsiString{$ENDIF}(lArray2[x].AsString)); else change.NewValues[x] := lArray2[x].VarValue; end else change.NewValues[x] := lArray2[x].VarValue; if lProgress then OnReadDeltaProgress(Self, Destination, i, lTotal); end; end; function TDAJSONDataStreamer.DoWriteDataset(const Source: IDADataset; Options: TDAWriteOptions; MaxRows: integer; ADynFieldNames: array of string): integer; var lDataForAppend: TDADataForAppend; begin lDataForAppend := DoBeginWriteDataset(Source, {schema}nil, Options, MaxRows, ADynFieldNames); if woRows in Options then begin DoWriteDatasetData(Source, lDataForAppend); result := DoEndWriteDataset(lDataForAppend); end else begin lDataForAppend.Free; result := -1; end; end; function TDAJSONDataStreamer.DoWriteDatasetData(const Source: IDADataset; var aDataForAppend: TDADataForAppend; aDataIndex: Integer = -1): Integer; var max, k, i, Realfldcnt: integer; info: array of TDASmallFieldInfo; RealFields: array of integer; lDataForAppend: TDADataForAppendBin2; lMapToFieldName: String; lColumnMappings: TDAColumnMappingCollection; lColumnMapping: TDAColumnMapping; begin lDataForAppend := aDataForAppend as TDADataForAppendBin2; Realfldcnt := Length(lDataForAppend.RealFields); SetLength(info, Realfldcnt); SetLength(RealFields, Realfldcnt); for i := 0 to Realfldcnt - 1 do begin info[i].Name := lDataForAppend.FieldsInfo[i].Name; RealFields[i] := lDataForAppend.RealFields[i]; end; max := lDataForAppend.MaxRowCount; k := lDataForAppend.RecordCount; // Mapping fields of Source table to the streamed dataset if Assigned(lDataForAppend.TableSchema) and (lDataForAppend.TableSchema is TDAUnionDataTable) then begin lColumnMappings := TDAUnionSourceTable(TDAUnionDataTable(lDataForAppend.TableSchema).SourceTables.ItemByName(Source.Name)).ColumnMappings; for i := 0 to Realfldcnt - 1 do begin if info[i].Name = def_SourceTableFieldName then begin RealFields[i] := -10; continue; end; lMapToFieldName := info[i].Name; if Assigned(lColumnMappings) then begin lColumnMapping := lColumnMappings.MappingByDatasetField(info[i].Name); if Assigned(lColumnMapping) and (lColumnMapping.TableField <> '') then lMapToFieldName := lColumnMapping.TableField; end; RealFields[i] := Source.FieldByName(lMapToFieldName).Index; end; end; with Source do try DisableControls; if not Source.Active then Source.Open; try InternalDoWriteDataset(Source,k, max, RealFields, aDataIndex, info) except raise; end; lDataForAppend.RecordCount := k; finally EnableControls; result := k; end; end; procedure TDAJSONDataStreamer.DoWriteDelta(const Source: IDADelta); var i, x: integer; lDeltaObject: TROJSONObject; lArray: TROJSONArray; lArray2: TROJSONArray; lObject: TROJSONObject; lchange: TDADeltaChange; lProgress: Boolean; lTotal: integer; begin // This information will be used later to complete the stream (see DoInitialize) AddingDelta(Source.LogicalName); Source.RemoveUnchangedChanges; lDeltaObject:=GetDeltaObject(Source.LogicalName,True); lArray:=lDeltaObject.AddArrayProperty(pn_Delta_LoggedFields).ASArray; // field names and their types for i := 0 to (Source.LoggedFieldCount - 1) do begin lObject:=lArray.AddObject.AsObject; lObject.AddStringProperty(pn_Delta_LoggedFields_Name, Source.LoggedFieldNames[i]); lObject.AddStringProperty(pn_Delta_LoggedFields_Type, TDADataTypeStrings[Source.LoggedFieldTypes[i]]); end; lArray:=lDeltaObject.AddArrayProperty(pn_Delta_KeyFields).ASArray; // Key fields for i := 0 to (Source.KeyFieldCount - 1) do lArray.AddStringValue(Source.KeyFieldNames[i]); lArray := lDeltaObject.AddArrayProperty(pn_Delta_Data).ASArray; if (Source.Count = 0) then Exit; lProgress := Assigned(OnWriteDeltaProgress); lTotal := Source.Count; for i := 0 to (Source.Count - 1) do begin lObject:=lArray.AddObject.AsObject; lchange :=Source.Changes[i]; lObject.AddNumberProperty(pn_Delta_Data_RecID,lchange.RecID); lObject.AddStringProperty(pn_Delta_Data_Changetype,TDAChangeTypeStrings[lchange.ChangeType]); lObject.AddStringProperty(pn_Delta_Data_Status,TDAChangeStatusStrings[lchange.Status]); lObject.AddStringProperty(pn_Delta_Data_Message,lchange.Message); // old lArray2:=lObject.AddArrayProperty(pn_Delta_Data_Old).ASArray; for x := 0 to (Source.LoggedFieldCount - 1) do if not VarIsNull(Source.Changes[i].OldValues[x]) then case Source.LoggedFieldTypes[x] of datDateTime: lArray2.AddStringValue(DateTimeToSOAPDateTime(Source.Changes[i].OldValues[x])); datBlob: lArray2.AddStringValue(VarToWideStr(EncodeBlob(Source.Changes[i].OldValues[x]))); datDecimal: lArray2.AddStringValue(EncodeDecimal(Source.Changes[i].OldValues[x])); else lArray2.AddVariantValue(Source.Changes[i].OldValues[x]); end else lArray2.AddVariantValue(Source.Changes[i].OldValues[x]); // new lArray2:=lObject.AddArrayProperty(pn_Delta_Data_New).ASArray; for x := 0 to (Source.LoggedFieldCount - 1) do if not VarIsNull(Source.Changes[i].NewValues[x]) then case Source.LoggedFieldTypes[x] of datDateTime: lArray2.AddStringValue(DateTimeToSOAPDateTime(Source.Changes[i].NewValues[x])); datBlob: lArray2.AddStringValue(VarToWideStr(EncodeBlob(Source.Changes[i].NewValues[x]))); datDecimal: lArray2.AddStringValue(EncodeDecimal(Source.Changes[i].NewValues[x])); else lArray2.AddVariantValue(Source.Changes[i].NewValues[x]); end else lArray2.AddVariantValue(Source.Changes[i].NewValues[x]); if lProgress then OnWriteDeltaProgress(Self, Source, i+1, lTotal); end; end; procedure TDAJSONDataStreamer.ReadField(const AFieldObject: TROJSONObject; const AField: TDAField); var lProp: TROJSONProperty; begin lProp := AFieldObject.FindItem('Alignment'); if lProp <> nil then AField.Alignment := TAlignmentStringsToTAlignment(lProp.AsString); lProp := AFieldObject.FindItem('BlobType'); if lProp <> nil then AField.BlobType := TDABlobTypeStringsToTDABlobType(lProp.AsString); lProp := AFieldObject.FindItem('BusinessClassID'); if lProp <> nil then AField.BusinessClassID := lProp.AsString; lProp := AFieldObject.FindItem('Calculated'); if lProp <> nil then AField.Calculated := lProp.AsBoolean; lProp := AFieldObject.FindItem('CustomAttributes'); if lProp <> nil then AField.CustomAttributes.Text := lProp.AsString; lProp := AFieldObject.FindItem('DataType'); if lProp <> nil then AField.DataType := TDADataTypeStringsToTDADataType(lProp.AsString); lProp := AFieldObject.FindItem('DecimalPrecision'); if lProp <> nil then AField.DecimalPrecision := lProp.AsNumber; lProp := AFieldObject.FindItem('DecimalScale'); if lProp <> nil then AField.DecimalScale := lProp.AsNumber; lProp := AFieldObject.FindItem('DefaultValue'); if lProp <> nil then AField.DefaultValue := lProp.AsString; lProp := AFieldObject.FindItem('Description'); if lProp <> nil then AField.Description := lProp.AsString; lProp := AFieldObject.FindItem('DictionaryEntry'); if lProp <> nil then AField.DictionaryEntry := lProp.AsString; lProp := AFieldObject.FindItem('DisplayFormat'); if lProp <> nil then AField.DisplayFormat := lProp.AsString; lProp := AFieldObject.FindItem('DisplayLabel'); if lProp <> nil then AField.DisplayLabel := lProp.AsString; lProp := AFieldObject.FindItem('DisplayWidth'); if lProp <> nil then AField.DisplayWidth := lProp.AsNumber; lProp := AFieldObject.FindItem('EditFormat'); if lProp <> nil then AField.EditFormat := lProp.AsString; lProp := AFieldObject.FindItem('EditMask'); if lProp <> nil then AField.EditMask := lProp.AsString; lProp := AFieldObject.FindItem('Expression'); if lProp <> nil then AField.Expression := lProp.AsString; lProp := AFieldObject.FindItem('GeneratorName'); if lProp <> nil then AField.GeneratorName := lProp.AsString; lProp := AFieldObject.FindItem('InPrimaryKey'); if lProp <> nil then AField.InPrimaryKey := lProp.AsBoolean; lProp := AFieldObject.FindItem('KeyFields'); if lProp <> nil then AField.KeyFields := lProp.AsString; lProp := AFieldObject.FindItem('LogChanges'); if lProp <> nil then AField.LogChanges := lProp.AsBoolean; lProp := AFieldObject.FindItem('Lookup'); if lProp <> nil then AField.Lookup := lProp.AsBoolean; lProp := AFieldObject.FindItem('LookupCache'); if lProp <> nil then AField.LookupCache := lProp.AsBoolean; lProp := AFieldObject.FindItem('LookupKeyFields'); if lProp <> nil then AField.LookupKeyFields := lProp.AsString; lProp := AFieldObject.FindItem('LookupResultField'); if lProp <> nil then AField.LookupResultField := lProp.AsString; // lProp := AFieldObject.FindItem('LookupSource'); // if lProp <> nil then AField.LookupSource := nil; lProp := AFieldObject.FindItem('Name'); if lProp <> nil then AField.Name := lProp.AsString; lProp := AFieldObject.FindItem('ReadOnly'); if lProp <> nil then AField.ReadOnly := lProp.AsBoolean; lProp := AFieldObject.FindItem('RegExpression'); if lProp <> nil then AField.RegExpression := lProp.AsString; lProp := AFieldObject.FindItem('Required'); if lProp <> nil then AField.Required := lProp.AsBoolean; lProp := AFieldObject.FindItem('ServerAutoRefresh'); if lProp <> nil then AField.ServerAutoRefresh := lProp.AsBoolean; lProp := AFieldObject.FindItem('ServerCalculated'); if lProp <> nil then AField.ServerCalculated := lProp.AsBoolean; lProp := AFieldObject.FindItem('Size'); if lProp <> nil then AField.Size := lProp.AsNumber; lProp := AFieldObject.FindItem('Visible'); if lProp <> nil then AField.Visible := lProp.AsBoolean; end; procedure TDAJSONDataStreamer.ReadParam(const AParamObject: TROJSONObject; const AParam: TDAParam); var lProp: TROJSONProperty; begin lProp := AParamObject.FindItem('AsWideString'); if lProp <> nil then AParam.AsWideString := lProp.AsString; lProp := AParamObject.FindItem('BlobType'); if lProp <> nil then AParam.BlobType := TDABlobTypeStringsToTDABlobType(lProp.AsString); lProp := AParamObject.FindItem('DataType'); if lProp <> nil then AParam.DataType := TDADataTypeStringsToTDADataType(lProp.AsString); lProp := AParamObject.FindItem('DecimalPrecision'); if lProp <> nil then AParam.DecimalPrecision := lProp.AsNumber; lProp := AParamObject.FindItem('DecimalScale'); if lProp <> nil then AParam.DecimalScale := lProp.AsNumber; lProp := AParamObject.FindItem('Description'); if lProp <> nil then AParam.Description := lProp.AsString; lProp := AParamObject.FindItem('GeneratorName'); if lProp <> nil then AParam.GeneratorName := lProp.AsString; lProp := AParamObject.FindItem('Name'); if lProp <> nil then AParam.Name := lProp.AsString; lProp := AParamObject.FindItem('ParamType'); if lProp <> nil then AParam.ParamType := TDAParamTypeStringsToTDAParamType(lProp.AsString); lProp := AParamObject.FindItem('Size'); if lProp <> nil then AParam.Size := lProp.AsNumber; lProp := AParamObject.FindItem('Value'); if lProp <> nil then begin case lProp.ValueType of jdtString: AParam.Value := lProp.AsString; jdtNumber: AParam.Value := lProp.AsNumber; jdtBoolean: AParam.Value := lProp.AsBoolean; else AParam.Value := Null; end; end; end; procedure TDAJSONDataStreamer.ReadSchema( const ASchemaObject: TROJSONObject; const Destination: IDADataset); var i: integer; lFields, lParams : TROJSONArray; begin Destination.Fields.Clear; lFields:= ASchemaObject.GetArrayItemByName(pn_Dataset_Schema_Fields,False); if lFields <> nil then for i := 0 to lFields.Count-1 do ReadField(lFields.Items[i].AsObject,Destination.Fields.Add); Destination.Params.Clear; lParams:= ASchemaObject.GetArrayItemByName(pn_Dataset_Schema_Params,False); if lParams <> nil then for i := 0 to lParams.Count - 1 do ReadParam(lParams.Items[i].AsObject, Destination.Params.Add); end; procedure TDAJSONDataStreamer.WriteField(const AFieldObject: TROJSONObject; const AField: TDAField); begin //same as in bin2 AFieldObject.AddStringProperty('Alignment', TAlignmentStrings[AField.Alignment]); AFieldObject.AddStringProperty('BlobType', TDABlobTypeStrings[AField.BlobType]); AFieldObject.AddStringProperty('BusinessClassID', AField.BusinessClassID); AFieldObject.AddBooleanProperty('Calculated', AField.Calculated); AFieldObject.AddStringProperty('CustomAttributes', AField.CustomAttributes.Text); AFieldObject.AddStringProperty('DataType', TDADataTypeStrings[AField.DataType]); AFieldObject.AddNumberProperty('DecimalPrecision', AField.DecimalPrecision); AFieldObject.AddNumberProperty('DecimalScale', AField.DecimalScale); AFieldObject.AddStringProperty('DefaultValue', AField.DefaultValue); AFieldObject.AddStringProperty('Description', AField.Description); AFieldObject.AddStringProperty('DictionaryEntry', AField.DictionaryEntry); AFieldObject.AddStringProperty('DisplayFormat', AField.DisplayFormat); AFieldObject.AddStringProperty('DisplayLabel', AField.DisplayLabel); AFieldObject.AddNumberProperty('DisplayWidth', AField.DisplayWidth); AFieldObject.AddStringProperty('EditFormat', AField.EditFormat); AFieldObject.AddStringProperty('EditMask', AField.EditMask); AFieldObject.AddStringProperty('Expression', AField.Expression); AFieldObject.AddStringProperty('GeneratorName', AField.GeneratorName); AFieldObject.AddBooleanProperty('InPrimaryKey', AField.InPrimaryKey); AFieldObject.AddStringProperty('KeyFields', AField.KeyFields); AFieldObject.AddBooleanProperty('LogChanges',AField.LogChanges); AFieldObject.AddBooleanProperty('Lookup', AField.Lookup); AFieldObject.AddBooleanProperty('LookupCache',AField.LookupCache); AFieldObject.AddStringProperty('LookupKeyFields', AField.LookupKeyFields); AFieldObject.AddStringProperty('LookupResultField', AField.LookupResultField); AFieldObject.AddStringProperty('LookupSource',''); AFieldObject.AddStringProperty('Name',AField.Name); AFieldObject.AddBooleanProperty('ReadOnly', AField.ReadOnly); AFieldObject.AddStringProperty('RegExpression', AField.RegExpression); AFieldObject.AddBooleanProperty('Required', AField.Required); AFieldObject.AddBooleanProperty('ServerAutoRefresh', AField.ServerAutoRefresh); AFieldObject.AddBooleanProperty('ServerCalculated', AField.ServerCalculated); AFieldObject.AddNumberProperty('Size', AField.Size); AFieldObject.AddBooleanProperty('Visible', AField.Visible); end; procedure TDAJSONDataStreamer.WriteParam(const AParamObject: TROJSONObject; const AParam: TDAParam); begin //same as in bin2 AParamObject.AddStringProperty('AsWideString', AParam.AsWideString); AParamObject.AddStringProperty('BlobType', TDABlobTypeStrings[AParam.BlobType]); AParamObject.AddStringProperty('DataType', TDADataTypeStrings[AParam.DataType]); AParamObject.AddNumberProperty('DecimalPrecision', AParam.DecimalPrecision); AParamObject.AddNumberProperty('DecimalScale', AParam.DecimalScale); AParamObject.AddStringProperty('Description', AParam.Description); AParamObject.AddStringProperty('GeneratorName', AParam.GeneratorName); AParamObject.AddStringProperty('Name',AParam.Name); AParamObject.AddStringProperty('ParamType', TDAParamTypeStrings[AParam.ParamType]); AParamObject.AddNumberProperty('Size', AParam.Size); if VarIsStr(AParam.Value) then AParamObject.AddStringProperty('Value', AParam.Value) else if TVarData(AParam.Value).VType = varBoolean then AParamObject.AddBooleanProperty('Value', AParam.Value) else if VarIsNumeric(AParam.Value) then AParamObject.AddNumberProperty('Value', AParam.Value) else AParamObject.AddNullProperty('Value'); end; procedure TDAJSONDataStreamer.WriteSchema(const ASchemaObject: TROJSONObject; const Fields: TDAFieldCollection; const Params: TDAParamCollection; aFieldsIndex: array of integer); var i: integer; lFields, lParams : TROJSONArray; begin if Length(aFieldsIndex) >0 then begin lFields := ASchemaObject.AddArrayProperty(pn_Dataset_Schema_Fields).ASArray; for i := 0 to High(aFieldsIndex) do WriteField(lFields.AddObject.AsObject, Fields[aFieldsIndex[i]]); end; if Params.Count >0 then begin lParams := ASchemaObject.AddArrayProperty(pn_Dataset_Schema_Params).ASArray; for i := 0 to Params.Count - 1 do WriteParam(lParams.AddObject.AsObject,Params[i]); end; end; function TDAJSONDataStreamer.GetDatasetObject(const ADatasetName: JSON_String; const ACreate: Boolean): TROJSONObject; begin Result := FDatasets.FindObjectByStringValue(pn_Dataset_Name, ADatasetName); if (Result = nil) then begin if ACreate then begin Result := FDatasets.AddObject.AsObject; Result.AddStringProperty(pn_Dataset_Name,ADatasetName); end else begin raise EDAException.Create('Cannot find dataset '+ADatasetName); end; end; end; procedure TDAJSONDataStreamer.InternalDoReadDataset( const ARowsArray: TROJSONArray; const Destination: IDAEditableDataset; ARealFields: array of integer;AppendMode: Boolean); var lPK: string; //for append mode lPKValues: array of variant; //for append mode lOnePK:Boolean; //for append mode function Dataset_Locate(): boolean; begin if lOnePK then Result:= Destination.Locate(lPK,lPKValues[0],[]) else Result:= Destination.Locate(lPK,lPKValues,[]); end; var j: integer; ev: boolean; val: Variant; flds: TDAFieldArray; // flds in stream i: integer; Realfldcount: integer; lRow : TROJSONArray; lProgress: Boolean; lTotal: integer; lFldValues: array of Variant; // flds values for append mode lPKFlds: TDAFieldArray; // PK flds (Append mode) lOffset: integer; begin Realfldcount := Length(ARealFields); setLength(flds, Realfldcount); For i:= 0 to Realfldcount-1 do flds[i] := Destination.Fields[ARealFields[i]]; ev := Assigned(OnReadFieldValue); lProgress := Assigned(OnReadDatasetProgress); lTotal := ARowsArray.Count; lPK := Dataset_GetPK(flds, lPKFlds); lOnePK := Length(lPKFlds) = 1; AppendMode := AppendMode and (Length(lPKFlds)>0); if AppendMode then begin SetLength(lFldValues, Length(flds)); SetLength(lPKValues, Length(lPKFlds)); end; for j:= 0 to ARowsArray.Count -1 do begin lRow := ARowsArray[j].ASArray; Destination.Append; for i := 0 to Realfldcount - 1 do begin val := lRow[i].VarValue; if ev then OnReadFieldValue(flds[i], val); if not VarIsNull(val) then case flds[i].DataType of datDecimal: flds[i].AsDecimal := VariantToBCD(val); datDateTime: flds[i].AsDateTime := SOAPDateTimeToDateTime(val, lOffset); datBlob: case VarType(val) of varString, varOleStr, varUString: flds[i].Value := DecodeBlob({$IFDEF UNICODE}VarToAnsiStr{$ENDIF}(val)); else flds[i].Value := val; end; else flds[i].Value := val; end; end; if AppendMode then begin // store values and avoid firing OnReadFieldValue event for I := 0 to Length(Flds) - 1 do lFldValues[i]:= Flds[i].Value; // store PK values for locate for I := 0 to Length(lPKFlds) - 1 do lPKValues[i]:= lPKFlds[i].Value; Destination.Cancel; if Dataset_Locate then Destination.Edit else Destination.Append; // assign new values for I := 0 to Length(Flds) - 1 do Flds[i].Value := lFldValues[i]; end; try Destination.Post; except // Introduced to restore the dsBrowse state of the datatable // in case of errors Destination.Cancel; raise; end; if lProgress then OnReadDatasetProgress(Self, Destination, j, lTotal); end; end; procedure TDAJSONDataStreamer.InternalDoWriteDataset( const Source: IDADataset; var k: integer; const Maxrecords: integer; ARealFields: array of integer; aDataIndex: Integer; info: array of TDASmallFieldInfo); var ev1, ev2,ev3: boolean; BitMask: Ansistring; BitMaskSize: integer; i : integer; Realfldcnt: integer; flds: array of TDAField; val: Variant; lRowsArray: TROJSONArray; lCurrentRow: TROJSONArray; lProgress: Boolean; begin lRowsArray := GetDataArraysByPropName(Source.LogicalName,pn_Dataset_Data_Rows,True); Realfldcnt:= Length(ARealFields); BitMaskSize := (Realfldcnt + 7) div 8; SetLength(BitMask, BitMaskSize); SetLength(flds,Realfldcnt); for i := 0 to Realfldcnt-1 do begin if ARealFields[i] = -10 then flds[i]:=nil else flds[i]:=Source.Fields[ARealFields[i]]; end; ev1 := Assigned(OnBeforeFieldValueSerialization); ev2 := Assigned(OnWriteFieldValue); ev3 := Assigned(OnWriteFieldValueEx); lProgress := Assigned(onWriteDatasetProgress); // with events while (k <> Maxrecords) and not Source.EOF do begin lCurrentRow := lRowsArray.AddArrayValue.ASArray; for i := 0 to (Realfldcnt - 1) do begin //ARealFields[i] = -10 then this is @SourceTable field if ARealFields[i] = -10 then begin //We shouldn't fire events since this is special internal field val := aDataIndex; end else begin val := Source.FieldValues[ARealFields[i]]; if ev1 then OnBeforeFieldValueSerialization(flds[i], val); if ev2 then OnWriteFieldValue(flds[i], val); if ev3 then OnWriteFieldValueEx(Source, flds[i], val); end; if VarIsNull(Val) then lCurrentRow.AddNullValue else case flds[i].DataType of datDateTime: lCurrentRow.Add.AsString := DateTimeToSOAPDateTime(val); // DateTime in SOAP format datDecimal: lCurrentRow.Add.AsString := EncodeDecimal(val); datBlob: lCurrentRow.Add.VarValue := EncodeBlob(val); // BLOB as Base64-encoded string else lCurrentRow.Add.VarValue := val; end; end; Inc(k); if lProgress then OnWriteDatasetProgress(Self, Source, k, Maxrecords); Source.Next; if Source.EOF then Break; end; end; function TDAJSONDataStreamer.GetDataArraysByPropName(const ADatasetName, AObjectName: JSON_String; const ACreate: Boolean): TROJSONArray; var lProp: TROJSONProperty; lobj: TROJSONObject; begin Result := nil; lobj := GetDatasetObjectByPropName(ADatasetName,pn_Dataset_Data,ACreate); lProp := lobj.FindItem(AObjectName); if (lProp = nil) then begin if ACreate then lProp := lobj.AddArrayProperty(AObjectName) else Exit; end; Result := lProp.ASArray; end; function TDAJSONDataStreamer.GetDeltaObject( const ADeltaName: JSON_String; const ACreate: Boolean): TROJSONObject; begin Result := FDeltas.FindObjectByStringValue(pn_Delta_Name, ADeltaName); if (Result = nil) then begin if ACreate then begin Result := FDeltas.AddObject.AsObject; Result.AddStringProperty(pn_Delta_Name,ADeltaName); end else begin raise EDAException.Create('Cannot find delta '+ADeltaName); end; end; end; function TDAJSONDataStreamer.GetTargetDataType: TRODataType; begin Result := rtXML; end; end.