From 294893a409e6f304bf65303f5e570971d392b1c9 Mon Sep 17 00:00:00 2001 From: Matthias Sebastian Sort Date: Mon, 23 Dec 2024 11:40:12 +0100 Subject: [PATCH] Added new addin for remove-missing-rows plus sorted addins so they are grouped by source and destination. added logic in destinationwriter to handle new addin for removing missing rows. --- ...taIntegration.Providers.SqlProvider.csproj | 4 +- src/SQLDestinationWriter.cs | 11 + src/SQLProvider.cs | 198 ++++++++++++------ 3 files changed, 150 insertions(+), 63 deletions(-) diff --git a/src/Dynamicweb.DataIntegration.Providers.SqlProvider.csproj b/src/Dynamicweb.DataIntegration.Providers.SqlProvider.csproj index 471260d..0566edd 100644 --- a/src/Dynamicweb.DataIntegration.Providers.SqlProvider.csproj +++ b/src/Dynamicweb.DataIntegration.Providers.SqlProvider.csproj @@ -1,6 +1,6 @@  - 10.8.0 + 10.9.0 1.0.0.0 SQL Provider SQL Provider @@ -23,7 +23,7 @@ snupkg - + diff --git a/src/SQLDestinationWriter.cs b/src/SQLDestinationWriter.cs index 05e095d..e212e40 100644 --- a/src/SQLDestinationWriter.cs +++ b/src/SQLDestinationWriter.cs @@ -17,6 +17,11 @@ public class SqlDestinationWriter : BaseSqlWriter, IDestinationWriter, IDisposab { public SqlCommand SqlCommand; + internal string GetTempTableName + { + get => tempTablePrefix; + } + public new Mapping Mapping { get; } /// @@ -234,6 +239,12 @@ public SqlDestinationWriter(Mapping mapping, SqlCommand mockSqlCommand, bool rem } } + internal int DeleteExcessFromMainTable(SqlTransaction transaction, Dictionary mappings) + { + SqlCommand.Transaction = transaction; + return DeleteRowsFromMainTable(false, mappings, "", SqlCommand); + } + /// /// Deletes rows not present in the import source /// diff --git a/src/SQLProvider.cs b/src/SQLProvider.cs index 12a3d53..40632bd 100644 --- a/src/SQLProvider.cs +++ b/src/SQLProvider.cs @@ -8,6 +8,7 @@ using System.Collections.Generic; using System.Data; using System.Data.SqlClient; +using System.Globalization; using System.Linq; using System.Runtime.CompilerServices; using System.Xml; @@ -20,99 +21,141 @@ namespace Dynamicweb.DataIntegration.Providers.SqlProvider; public class SqlProvider : BaseSqlProvider, ISource, IDestination { private Schema Schema; - [AddInParameter("Source server"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Source")] + + #region Source AddIns + [AddInParameter("Source server")] + [AddInParameterEditor(typeof(TextParameterEditor), "")] + [AddInParameterGroup("Source")] public string SourceServer { get { return Server; } set { Server = value; } } - [AddInParameter("Destination server"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Destination")] - public string DestinationServer - { - get { return Server; } - set { Server = value; } - } - [AddInParameter("Use integrated security to connect to source server"), AddInParameterEditor(typeof(YesNoParameterEditor), ""), AddInParameterGroup("Source")] + + [AddInParameter("Use integrated security to connect to source server")] + [AddInParameterEditor(typeof(YesNoParameterEditor), "")] + [AddInParameterGroup("Source")] public bool SourceServerSSPI { get; set; } - [AddInParameter("Use integrated security to connect to destination server"), AddInParameterEditor(typeof(YesNoParameterEditor), ""), AddInParameterGroup("Destination")] - public bool DestinationServerSSPI - { - get; - set; - } - [AddInParameter("Sql source server username"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Source")] + + [AddInParameter("Sql source server username")] + [AddInParameterEditor(typeof(TextParameterEditor), "")] + [AddInParameterGroup("Source")] public string SourceUsername { get { return Username; } set { Username = value; } } - [AddInParameter("Sql destination server username"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Destination")] - public string DestinationUsername - { - get { return Username; } - set { Username = value; } - } - [AddInParameter("Sql source server password"), AddInParameterEditor(typeof(TextParameterEditor), "password=true"), AddInParameterGroup("Source")] + + [AddInParameter("Sql source server password")] + [AddInParameterEditor(typeof(TextParameterEditor), "password=true")] + [AddInParameterGroup("Source")] public string SourcePassword { get { return Password; } set { Password = value; } } - [AddInParameter("Sql destination server password"), AddInParameterEditor(typeof(TextParameterEditor), "password=true"), AddInParameterGroup("Destination")] - public string DestinationPassword - { - get { return Password; } - set { Password = value; } - } - [AddInParameter("Sql source database"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Source")] + + [AddInParameter("Sql source database")] + [AddInParameterEditor(typeof(TextParameterEditor), "")] + [AddInParameterGroup("Source")] public string SourceDatabase { get { return Catalog; } set { Catalog = value; } } - [AddInParameter("Sql destination database"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Destination")] - public string DestinationDatabase - { - get { return Catalog; } - set { Catalog = value; } - } - [AddInParameter("Sql source connection string"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Source")] + + [AddInParameter("Sql source connection string")] + [AddInParameterEditor(typeof(TextParameterEditor), "")] + [AddInParameterGroup("Source")] public string SourceConnectionString { get { return ManualConnectionString; } set { ManualConnectionString = value; } } - [AddInParameter("Sql destination connection string"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Destination")] - public string DestinationConnectionString + #endregion + + #region Destination AddIns + [AddInParameter("Destination server")] + [AddInParameterEditor(typeof(TextParameterEditor), "")] + [AddInParameterGroup("Destination")] + public string DestinationServer { - get { return ManualConnectionString; } - set { ManualConnectionString = value; } + get { return Server; } + set { Server = value; } } - [AddInParameter("Remove missing rows after import"), AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Removes rows from the destination and relation tables. This option takes precedence"), AddInParameterGroup("Destination")] - public bool RemoveMissingAfterImport + [AddInParameter("Use integrated security to connect to destination server")] + [AddInParameterEditor(typeof(YesNoParameterEditor), "")] + [AddInParameterGroup("Destination")] + public bool DestinationServerSSPI { get; set; } + + [AddInParameter("Sql destination server username")] + [AddInParameterEditor(typeof(TextParameterEditor), "")] + [AddInParameterGroup("Destination")] + public string DestinationUsername { - get; - set; + get { return Username; } + set { Username = value; } } - [AddInParameter("Remove missing rows after import in the destination tables only"), AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Deletes rows not present in the import source - excluding related tabled"), AddInParameterGroup("Destination"), AddInParameterOrder(35)] - public bool RemoveMissingAfterImportDestinationTablesOnly + [AddInParameter("Sql destination server password")] + [AddInParameterEditor(typeof(TextParameterEditor), "password=true")] + [AddInParameterGroup("Destination")] + public string DestinationPassword { - get; - set; + get { return Password; } + set { Password = value; } } - [AddInParameter("Discard duplicates"), AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=When ON, duplicate rows are skipped"), AddInParameterGroup("Destination")] + [AddInParameter("Sql destination database")] + [AddInParameterEditor(typeof(TextParameterEditor), "")] + [AddInParameterGroup("Destination")] + public string DestinationDatabase + { + get { return Catalog; } + set { Catalog = value; } + } + + [AddInParameter("Sql destination connection string")] + [AddInParameterEditor(typeof(TextParameterEditor), "")] + [AddInParameterGroup("Destination")] + public string DestinationConnectionString + { + get { return ManualConnectionString; } + set { ManualConnectionString = value; } + } + + [AddInParameter("Remove missing rows after import")] + [AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Deletes rows from each destination table individually, based on whether they are present in the corresponding source table. This setting looks at each table separately and removes rows missing from the source for that specific table. This option takes precedence")] + [AddInParameterGroup("Destination")] + public bool RemoveMissingAfterImport { get; set; } + + [AddInParameter("Remove missing rows across all tables after import")] + [AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Deletes rows from all destination tables and relation tables by considering the entire dataset in the import source. This setting evaluates all tables collectively and removes rows missing across the whole activity.")] + [AddInParameterGroup("Destination")] + public bool RemoveMissingRows { get; set; } + + [AddInParameter("Remove missing rows after import in the destination tables only")] + [AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Deletes rows not present in the import source - excluding related tabled")] + [AddInParameterGroup("Destination")] + [AddInParameterOrder(35)] + public bool RemoveMissingAfterImportDestinationTablesOnly { get; set; } + + [AddInParameter("Discard duplicates")] + [AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=When ON, duplicate rows are skipped")] + [AddInParameterGroup("Destination")] public virtual bool DiscardDuplicates { get; set; } - [AddInParameter("Persist successful rows and skip failing rows"), AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Checking this box allows the activity to do partial imports by skipping problematic records and keeping the succesful ones"), AddInParameterGroup("Destination"), AddInParameterOrder(100)] + [AddInParameter("Persist successful rows and skip failing rows")] + [AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Checking this box allows the activity to do partial imports by skipping problematic records and keeping the succesful ones")] + [AddInParameterGroup("Destination")] + [AddInParameterOrder(100)] public virtual bool SkipFailingRows { get; set; } - + #endregion private string _sqlConnectionString; protected string SqlConnectionString @@ -256,6 +299,12 @@ public SqlProvider(XmlNode xmlNode) case "Schema": Schema = new Schema(node); break; + case "RemoveMissingRows": + if (node.HasChildNodes) + { + RemoveMissingRows = node.FirstChild.Value == "True"; + } + break; case "RemoveMissingAfterImport": if (node.HasChildNodes) { @@ -337,7 +386,8 @@ public override void UpdateSourceSettings(ISource source) DestinationServerSSPI = newProvider.DestinationServerSSPI; Catalog = newProvider.Catalog; DiscardDuplicates = newProvider.DiscardDuplicates; - SkipFailingRows = newProvider.SkipFailingRows; + SkipFailingRows = newProvider.SkipFailingRows; + RemoveMissingRows = newProvider.RemoveMissingRows; } public override void UpdateDestinationSettings(IDestination destination) @@ -372,13 +422,14 @@ public override string Serialize() root.Add(CreateParameterNode(GetType(), "Sql destination connection string", DestinationConnectionString)); root.Add(CreateParameterNode(GetType(), "Remove missing rows after import in the destination tables only", RemoveMissingAfterImportDestinationTablesOnly.ToString())); root.Add(CreateParameterNode(GetType(), "Persist successful rows and skip failing rows", SkipFailingRows.ToString())); - + root.Add(CreateParameterNode(GetType(), "Remove missing rows across all tables after import", RemoveMissingRows.ToString())); string ret = document.ToString(); return ret; } public new virtual void SaveAsXml(XmlTextWriter xmlTextWriter) { + xmlTextWriter.WriteElementString("RemoveMissingRows", RemoveMissingRows.ToString(CultureInfo.CurrentCulture)); xmlTextWriter.WriteElementString("RemoveMissingAfterImport", RemoveMissingAfterImport.ToString()); xmlTextWriter.WriteElementString("RemoveMissingAfterImportDestinationTablesOnly", RemoveMissingAfterImportDestinationTablesOnly.ToString()); xmlTextWriter.WriteElementString("SqlConnectionString", SqlConnectionString); @@ -508,17 +559,42 @@ public override bool RunJob(Job job) Logger.Log(string.Format("No rows were imported to the table: {0}.", writer.Mapping.DestinationTable.Name)); } } - foreach (SqlDestinationWriter writer in Enumerable.Reverse(writers)) + if (RemoveMissingRows) { - if (writer.RowsToWriteCount > 0) + var distinctWriters = Enumerable.Reverse(writers).DistinctBy(obj => obj.Mapping.DestinationTable); + if (distinctWriters != null) { - System.Diagnostics.Debug.WriteLine(DateTime.Now + ": Removing excess data from table: " + writer.Mapping.DestinationTable.Name); - long rowsAffected = writer.DeleteRowsNotInSourceFromMainTable(""); - System.Diagnostics.Debug.WriteLine(DateTime.Now + ": excess data Removed from table: " + writer.Mapping.DestinationTable.Name); - if (rowsAffected > 0) + foreach (var distinctWriter in distinctWriters) { - Logger.Log($"The number of deleted rows: {rowsAffected} for the destination {writer.Mapping.DestinationTable.Name} table mapping"); - TotalRowsAffected += rowsAffected; + if (distinctWriter == null || distinctWriter.Mapping == null) + continue; + + var sameWriters = writers.Where(obj => obj.Mapping != null && obj.Mapping.DestinationTable != null && obj.Mapping.DestinationTable.Name.Equals(distinctWriter.Mapping.DestinationTable?.Name ?? "", StringComparison.OrdinalIgnoreCase)).ToList(); + if (sameWriters.Count == 0) + continue; + + Dictionary mappings = sameWriters.ToDictionary(obj => $"{obj.GetTempTableName}", obj => obj.Mapping); + if (mappings == null || mappings.Count == 0) + continue; + + TotalRowsAffected += sameWriters[0].DeleteExcessFromMainTable(Transaction, mappings); + } + } + } + else + { + foreach (SqlDestinationWriter writer in Enumerable.Reverse(writers)) + { + if (writer.RowsToWriteCount > 0) + { + System.Diagnostics.Debug.WriteLine(DateTime.Now + ": Removing excess data from table: " + writer.Mapping.DestinationTable.Name); + long rowsAffected = writer.DeleteRowsNotInSourceFromMainTable(""); + System.Diagnostics.Debug.WriteLine(DateTime.Now + ": excess data Removed from table: " + writer.Mapping.DestinationTable.Name); + if (rowsAffected > 0) + { + Logger.Log($"The number of deleted rows: {rowsAffected} for the destination {writer.Mapping.DestinationTable.Name} table mapping"); + TotalRowsAffected += rowsAffected; + } } } }