Skip to content

Commit

Permalink
Added new addin for remove-missing-rows plus sorted addins so they ar…
Browse files Browse the repository at this point in the history
…e grouped by source and destination. added logic in destinationwriter to handle new addin for removing missing rows.
  • Loading branch information
MatthiasSort committed Dec 23, 2024
1 parent 8d3db16 commit 294893a
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 63 deletions.
4 changes: 2 additions & 2 deletions src/Dynamicweb.DataIntegration.Providers.SqlProvider.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Version>10.8.0</Version>
<Version>10.9.0</Version>
<AssemblyVersion>1.0.0.0</AssemblyVersion>
<Title>SQL Provider</Title>
<Description>SQL Provider</Description>
Expand All @@ -23,7 +23,7 @@
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dynamicweb.DataIntegration" Version="10.8.0" />
<PackageReference Include="Dynamicweb.DataIntegration" Version="10.9.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
</Project>
11 changes: 11 additions & 0 deletions src/SQLDestinationWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public class SqlDestinationWriter : BaseSqlWriter, IDestinationWriter, IDisposab
{
public SqlCommand SqlCommand;

internal string GetTempTableName
{
get => tempTablePrefix;
}

public new Mapping Mapping { get; }

/// <summary>
Expand Down Expand Up @@ -234,6 +239,12 @@ public SqlDestinationWriter(Mapping mapping, SqlCommand mockSqlCommand, bool rem
}
}

internal int DeleteExcessFromMainTable(SqlTransaction transaction, Dictionary<string, Mapping> mappings)
{
SqlCommand.Transaction = transaction;
return DeleteRowsFromMainTable(false, mappings, "", SqlCommand);
}

/// <summary>
/// Deletes rows not present in the import source
/// </summary>
Expand Down
198 changes: 137 additions & 61 deletions src/SQLProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Globalization;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Xml;
Expand All @@ -20,99 +21,141 @@ namespace Dynamicweb.DataIntegration.Providers.SqlProvider;
public class SqlProvider : BaseSqlProvider, ISource, IDestination
{
private Schema Schema;
[AddInParameter("Source server"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Source")]

#region Source AddIns
[AddInParameter("Source server")]
[AddInParameterEditor(typeof(TextParameterEditor), "")]
[AddInParameterGroup("Source")]
public string SourceServer
{
get { return Server; }
set { Server = value; }
}
[AddInParameter("Destination server"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Destination")]
public string DestinationServer
{
get { return Server; }
set { Server = value; }
}
[AddInParameter("Use integrated security to connect to source server"), AddInParameterEditor(typeof(YesNoParameterEditor), ""), AddInParameterGroup("Source")]

[AddInParameter("Use integrated security to connect to source server")]
[AddInParameterEditor(typeof(YesNoParameterEditor), "")]
[AddInParameterGroup("Source")]
public bool SourceServerSSPI
{
get;
set;
}
[AddInParameter("Use integrated security to connect to destination server"), AddInParameterEditor(typeof(YesNoParameterEditor), ""), AddInParameterGroup("Destination")]
public bool DestinationServerSSPI
{
get;
set;
}
[AddInParameter("Sql source server username"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Source")]

[AddInParameter("Sql source server username")]
[AddInParameterEditor(typeof(TextParameterEditor), "")]
[AddInParameterGroup("Source")]
public string SourceUsername
{
get { return Username; }
set { Username = value; }
}
[AddInParameter("Sql destination server username"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Destination")]
public string DestinationUsername
{
get { return Username; }
set { Username = value; }
}
[AddInParameter("Sql source server password"), AddInParameterEditor(typeof(TextParameterEditor), "password=true"), AddInParameterGroup("Source")]

[AddInParameter("Sql source server password")]
[AddInParameterEditor(typeof(TextParameterEditor), "password=true")]
[AddInParameterGroup("Source")]
public string SourcePassword
{
get { return Password; }
set { Password = value; }
}
[AddInParameter("Sql destination server password"), AddInParameterEditor(typeof(TextParameterEditor), "password=true"), AddInParameterGroup("Destination")]
public string DestinationPassword
{
get { return Password; }
set { Password = value; }
}
[AddInParameter("Sql source database"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Source")]

[AddInParameter("Sql source database")]
[AddInParameterEditor(typeof(TextParameterEditor), "")]
[AddInParameterGroup("Source")]
public string SourceDatabase
{
get { return Catalog; }
set { Catalog = value; }
}
[AddInParameter("Sql destination database"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Destination")]
public string DestinationDatabase
{
get { return Catalog; }
set { Catalog = value; }
}
[AddInParameter("Sql source connection string"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Source")]

[AddInParameter("Sql source connection string")]
[AddInParameterEditor(typeof(TextParameterEditor), "")]
[AddInParameterGroup("Source")]
public string SourceConnectionString
{
get { return ManualConnectionString; }
set { ManualConnectionString = value; }
}
[AddInParameter("Sql destination connection string"), AddInParameterEditor(typeof(TextParameterEditor), ""), AddInParameterGroup("Destination")]
public string DestinationConnectionString
#endregion

#region Destination AddIns
[AddInParameter("Destination server")]
[AddInParameterEditor(typeof(TextParameterEditor), "")]
[AddInParameterGroup("Destination")]
public string DestinationServer
{
get { return ManualConnectionString; }
set { ManualConnectionString = value; }
get { return Server; }
set { Server = value; }
}

[AddInParameter("Remove missing rows after import"), AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Removes rows from the destination and relation tables. This option takes precedence"), AddInParameterGroup("Destination")]
public bool RemoveMissingAfterImport
[AddInParameter("Use integrated security to connect to destination server")]
[AddInParameterEditor(typeof(YesNoParameterEditor), "")]
[AddInParameterGroup("Destination")]
public bool DestinationServerSSPI { get; set; }

[AddInParameter("Sql destination server username")]
[AddInParameterEditor(typeof(TextParameterEditor), "")]
[AddInParameterGroup("Destination")]
public string DestinationUsername
{
get;
set;
get { return Username; }
set { Username = value; }
}

[AddInParameter("Remove missing rows after import in the destination tables only"), AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Deletes rows not present in the import source - excluding related tabled"), AddInParameterGroup("Destination"), AddInParameterOrder(35)]
public bool RemoveMissingAfterImportDestinationTablesOnly
[AddInParameter("Sql destination server password")]
[AddInParameterEditor(typeof(TextParameterEditor), "password=true")]
[AddInParameterGroup("Destination")]
public string DestinationPassword
{
get;
set;
get { return Password; }
set { Password = value; }
}

[AddInParameter("Discard duplicates"), AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=When ON, duplicate rows are skipped"), AddInParameterGroup("Destination")]
[AddInParameter("Sql destination database")]
[AddInParameterEditor(typeof(TextParameterEditor), "")]
[AddInParameterGroup("Destination")]
public string DestinationDatabase
{
get { return Catalog; }
set { Catalog = value; }
}

[AddInParameter("Sql destination connection string")]
[AddInParameterEditor(typeof(TextParameterEditor), "")]
[AddInParameterGroup("Destination")]
public string DestinationConnectionString
{
get { return ManualConnectionString; }
set { ManualConnectionString = value; }
}

[AddInParameter("Remove missing rows after import")]
[AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Deletes rows from each destination table individually, based on whether they are present in the corresponding source table. This setting looks at each table separately and removes rows missing from the source for that specific table. This option takes precedence")]
[AddInParameterGroup("Destination")]
public bool RemoveMissingAfterImport { get; set; }

[AddInParameter("Remove missing rows across all tables after import")]
[AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Deletes rows from all destination tables and relation tables by considering the entire dataset in the import source. This setting evaluates all tables collectively and removes rows missing across the whole activity.")]
[AddInParameterGroup("Destination")]
public bool RemoveMissingRows { get; set; }

[AddInParameter("Remove missing rows after import in the destination tables only")]
[AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Deletes rows not present in the import source - excluding related tabled")]
[AddInParameterGroup("Destination")]
[AddInParameterOrder(35)]
public bool RemoveMissingAfterImportDestinationTablesOnly { get; set; }

[AddInParameter("Discard duplicates")]
[AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=When ON, duplicate rows are skipped")]
[AddInParameterGroup("Destination")]
public virtual bool DiscardDuplicates { get; set; }

[AddInParameter("Persist successful rows and skip failing rows"), AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Checking this box allows the activity to do partial imports by skipping problematic records and keeping the succesful ones"), AddInParameterGroup("Destination"), AddInParameterOrder(100)]
[AddInParameter("Persist successful rows and skip failing rows")]
[AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=Checking this box allows the activity to do partial imports by skipping problematic records and keeping the succesful ones")]
[AddInParameterGroup("Destination")]
[AddInParameterOrder(100)]
public virtual bool SkipFailingRows { get; set; }

#endregion

private string _sqlConnectionString;
protected string SqlConnectionString
Expand Down Expand Up @@ -256,6 +299,12 @@ public SqlProvider(XmlNode xmlNode)
case "Schema":
Schema = new Schema(node);
break;
case "RemoveMissingRows":
if (node.HasChildNodes)
{
RemoveMissingRows = node.FirstChild.Value == "True";
}
break;
case "RemoveMissingAfterImport":
if (node.HasChildNodes)
{
Expand Down Expand Up @@ -337,7 +386,8 @@ public override void UpdateSourceSettings(ISource source)
DestinationServerSSPI = newProvider.DestinationServerSSPI;
Catalog = newProvider.Catalog;
DiscardDuplicates = newProvider.DiscardDuplicates;
SkipFailingRows = newProvider.SkipFailingRows;
SkipFailingRows = newProvider.SkipFailingRows;
RemoveMissingRows = newProvider.RemoveMissingRows;
}

public override void UpdateDestinationSettings(IDestination destination)
Expand Down Expand Up @@ -372,13 +422,14 @@ public override string Serialize()
root.Add(CreateParameterNode(GetType(), "Sql destination connection string", DestinationConnectionString));
root.Add(CreateParameterNode(GetType(), "Remove missing rows after import in the destination tables only", RemoveMissingAfterImportDestinationTablesOnly.ToString()));
root.Add(CreateParameterNode(GetType(), "Persist successful rows and skip failing rows", SkipFailingRows.ToString()));

root.Add(CreateParameterNode(GetType(), "Remove missing rows across all tables after import", RemoveMissingRows.ToString()));
string ret = document.ToString();
return ret;
}

public new virtual void SaveAsXml(XmlTextWriter xmlTextWriter)
{
xmlTextWriter.WriteElementString("RemoveMissingRows", RemoveMissingRows.ToString(CultureInfo.CurrentCulture));
xmlTextWriter.WriteElementString("RemoveMissingAfterImport", RemoveMissingAfterImport.ToString());
xmlTextWriter.WriteElementString("RemoveMissingAfterImportDestinationTablesOnly", RemoveMissingAfterImportDestinationTablesOnly.ToString());
xmlTextWriter.WriteElementString("SqlConnectionString", SqlConnectionString);
Expand Down Expand Up @@ -508,17 +559,42 @@ public override bool RunJob(Job job)
Logger.Log(string.Format("No rows were imported to the table: {0}.", writer.Mapping.DestinationTable.Name));
}
}
foreach (SqlDestinationWriter writer in Enumerable.Reverse(writers))
if (RemoveMissingRows)
{
if (writer.RowsToWriteCount > 0)
var distinctWriters = Enumerable.Reverse(writers).DistinctBy(obj => obj.Mapping.DestinationTable);
if (distinctWriters != null)
{
System.Diagnostics.Debug.WriteLine(DateTime.Now + ": Removing excess data from table: " + writer.Mapping.DestinationTable.Name);
long rowsAffected = writer.DeleteRowsNotInSourceFromMainTable("");
System.Diagnostics.Debug.WriteLine(DateTime.Now + ": excess data Removed from table: " + writer.Mapping.DestinationTable.Name);
if (rowsAffected > 0)
foreach (var distinctWriter in distinctWriters)
{
Logger.Log($"The number of deleted rows: {rowsAffected} for the destination {writer.Mapping.DestinationTable.Name} table mapping");
TotalRowsAffected += rowsAffected;
if (distinctWriter == null || distinctWriter.Mapping == null)
continue;

var sameWriters = writers.Where(obj => obj.Mapping != null && obj.Mapping.DestinationTable != null && obj.Mapping.DestinationTable.Name.Equals(distinctWriter.Mapping.DestinationTable?.Name ?? "", StringComparison.OrdinalIgnoreCase)).ToList();
if (sameWriters.Count == 0)
continue;

Dictionary<string, Mapping> mappings = sameWriters.ToDictionary(obj => $"{obj.GetTempTableName}", obj => obj.Mapping);
if (mappings == null || mappings.Count == 0)
continue;

TotalRowsAffected += sameWriters[0].DeleteExcessFromMainTable(Transaction, mappings);
}
}
}
else
{
foreach (SqlDestinationWriter writer in Enumerable.Reverse(writers))
{
if (writer.RowsToWriteCount > 0)
{
System.Diagnostics.Debug.WriteLine(DateTime.Now + ": Removing excess data from table: " + writer.Mapping.DestinationTable.Name);
long rowsAffected = writer.DeleteRowsNotInSourceFromMainTable("");
System.Diagnostics.Debug.WriteLine(DateTime.Now + ": excess data Removed from table: " + writer.Mapping.DestinationTable.Name);
if (rowsAffected > 0)
{
Logger.Log($"The number of deleted rows: {rowsAffected} for the destination {writer.Mapping.DestinationTable.Name} table mapping");
TotalRowsAffected += rowsAffected;
}
}
}
}
Expand Down

0 comments on commit 294893a

Please sign in to comment.