diff --git a/.config/dotnet-tools.json b/.config/dotnet-tools.json
index 63bcec52..e4cb4309 100644
--- a/.config/dotnet-tools.json
+++ b/.config/dotnet-tools.json
@@ -9,4 +9,4 @@
]
}
}
-}
\ No newline at end of file
+}
diff --git a/Extractor/BrowseCallback.cs b/Extractor/BrowseCallback.cs
index 68433e69..d25432fe 100644
--- a/Extractor/BrowseCallback.cs
+++ b/Extractor/BrowseCallback.cs
@@ -27,10 +27,15 @@ public class BrowseReport
{
public string? IdPrefix { get; set; }
public int AssetsUpdated { get; set; }
+ public int RawAssetsUpdated { get; set; }
public int AssetsCreated { get; set; }
+ public int RawAssetsCreated { get; set; }
public int TimeSeriesUpdated { get; set; }
+ public int RawTimeseriesUpdated { get; set; }
public int TimeSeriesCreated { get; set; }
+ public int RawTimeseriesCreated { get; set; }
public int RelationshipsCreated { get; set; }
+ public int RawRelationshipsCreated { get; set; }
public int MinimalTimeSeriesCreated { get; set; }
public string? RawDatabase { get; set; }
public string? AssetsTable { get; set; }
diff --git a/Extractor/Config/CogniteConfig.cs b/Extractor/Config/CogniteConfig.cs
index 969cb981..0c11320c 100644
--- a/Extractor/Config/CogniteConfig.cs
+++ b/Extractor/Config/CogniteConfig.cs
@@ -17,6 +17,7 @@ You should have received a copy of the GNU General Public License
using Cognite.Extensions;
using Cognite.Extractor.Utils;
+using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.ComponentModel.DataAnnotations;
@@ -56,6 +57,7 @@ public class CognitePusherConfig : CogniteConfig, IPusherConfig
/// similarly to raw-metadata, and datapoints will be pushed. Nothing will be written to raw, and no assets will be created.
/// Events will be created, but without asset context.
///
+ [Obsolete("Deprecated!")]
public bool SkipMetadata { get; set; }
///
/// Store assets and/or timeseries data in raw. Assets will not be created at all,
@@ -65,6 +67,7 @@ public class CognitePusherConfig : CogniteConfig, IPusherConfig
/// of the source node is added to metadata if applicable.
/// Use different table names for assets and timeseries.
///
+ [Obsolete("Deprecated! Use MetadataTargetsConfig.RawMetadataTargetConfig instead.")]
public RawMetadataConfig? RawMetadata { get; set; }
///
/// Map metadata to asset/timeseries attributes. Each of "assets" and "timeseries" is a map from property DisplayName to
@@ -114,7 +117,13 @@ public double? NonFiniteReplacement
///
/// Configuration for writing to a custom OPC-UA flexible data model.
///
+ [Obsolete("Deprecated! Use MetadataTargetsConfig.FdmDestinationConfig instead.")]
public FdmDestinationConfig? FlexibleDataModels { get; set; }
+
+ ///
+ /// Configuration for the metadata targets: raw, clean, and data model destinations.
+ ///
+ public MetadataTargetsConfig? MetadataTargets { get; set; }
}
public class RawMetadataConfig
{
@@ -136,6 +145,34 @@ public class RawMetadataConfig
///
public string? RelationshipsTable { get; set; }
}
+ public class MetadataTargetsConfig
+ {
+ ///
+ /// Raw metadata targets config
+ ///
+ public RawMetadataTargetConfig? Raw { get; set; }
+ ///
+ /// Clean metadata targets config
+ ///
+ public CleanMetadataTargetConfig? Clean { get; set; }
+ ///
+ /// FDM destination config
+ ///
+ public FdmDestinationConfig? DataModels { get; set; }
+ }
+ public class RawMetadataTargetConfig
+ {
+ public string? Database { get; set; }
+ public string? AssetsTable { get; set; }
+ public string? TimeseriesTable { get; set; }
+ public string? RelationshipsTable { get; set; }
+ }
+ public class CleanMetadataTargetConfig
+ {
+ public bool Assets { get; set; }
+ public bool Timeseries { get; set; }
+ public bool Relationships { get; set; }
+ }
public class MetadataMapConfig
{
public Dictionary? Assets { get; set; }
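// Illustrative sketch (an assumption, not part of the change above): how a configuration that
// previously used the deprecated RawMetadata / FlexibleDataModels options could be expressed
// with the new MetadataTargetsConfig. The helper name is hypothetical; the config types are
// the ones introduced in CogniteConfig.cs above.
using Cognite.OpcUa.Config;

public static class MetadataTargetsMigrationSketch
{
    public static MetadataTargetsConfig FromLegacy(RawMetadataConfig? raw, FdmDestinationConfig? fdm)
    {
        return new MetadataTargetsConfig
        {
            // The old raw-metadata tables carry over one-to-one.
            Raw = raw == null ? null : new RawMetadataTargetConfig
            {
                Database = raw.Database,
                AssetsTable = raw.AssetsTable,
                TimeseriesTable = raw.TimeseriesTable,
                RelationshipsTable = raw.RelationshipsTable
            },
            // In the old model, configuring raw-metadata meant clean assets/timeseries were
            // not written; the new config makes that choice explicit per resource type.
            Clean = new CleanMetadataTargetConfig
            {
                Assets = raw == null,
                Timeseries = raw == null,
                Relationships = raw == null
            },
            // The old flexible-data-models section maps to data-models.
            DataModels = fdm
        };
    }
}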
diff --git a/Extractor/NodeSources/NodeSetSource.cs b/Extractor/NodeSources/NodeSetSource.cs
index 53baff5f..6224c6dc 100644
--- a/Extractor/NodeSources/NodeSetSource.cs
+++ b/Extractor/NodeSources/NodeSetSource.cs
@@ -349,7 +349,7 @@ private async Task InitNodes(IEnumerable nodes, CancellationToken to
await InitNodes(NodeList, token);
- var usesFdm = Config.Cognite?.FlexibleDataModels?.Enabled ?? false;
+ var usesFdm = Config.Cognite?.MetadataTargets?.DataModels?.Enabled ?? false;
if (Config.Extraction.Relationships.Enabled)
{
diff --git a/Extractor/NodeSources/UANodeSource.cs b/Extractor/NodeSources/UANodeSource.cs
index 1d10d28c..775c2c6e 100644
--- a/Extractor/NodeSources/UANodeSource.cs
+++ b/Extractor/NodeSources/UANodeSource.cs
@@ -110,7 +110,7 @@ private async Task InitNodes(IEnumerable nodes, CancellationToken to
await InitNodes(NodeList, token);
- var usesFdm = Config.Cognite?.FlexibleDataModels?.Enabled ?? false;
+ var usesFdm = Config.Cognite?.MetadataTargets?.DataModels?.Enabled ?? false;
if (Config.Extraction.Relationships.Enabled)
{
diff --git a/Extractor/Nodes/UADataType.cs b/Extractor/Nodes/UADataType.cs
index 727e39b8..121124f9 100644
--- a/Extractor/Nodes/UADataType.cs
+++ b/Extractor/Nodes/UADataType.cs
@@ -38,7 +38,7 @@ public DataTypeAttributes() : base(NodeClass.DataType)
public override IEnumerable GetAttributeSet(FullConfig config)
{
- if (config.Cognite?.FlexibleDataModels?.Enabled ?? false)
+ if (config.Cognite?.MetadataTargets?.DataModels?.Enabled ?? false)
{
yield return Attributes.DataTypeDefinition;
}
diff --git a/Extractor/Nodes/UAVariable.cs b/Extractor/Nodes/UAVariable.cs
index 2ec6d3eb..d748ad5b 100644
--- a/Extractor/Nodes/UAVariable.cs
+++ b/Extractor/Nodes/UAVariable.cs
@@ -457,22 +457,9 @@ public TimeSeriesCreate ToTimeseries(
UAExtractor extractor,
long? dataSetId,
IDictionary? nodeToAssetIds,
- Dictionary? metaMap,
- bool minimal = false)
+ Dictionary? metaMap)
{
string? externalId = GetUniqueId(client);
-
- if (minimal)
- {
- return new TimeSeriesCreate
- {
- ExternalId = externalId,
- IsString = FullAttributes.DataType.IsString,
- IsStep = FullAttributes.DataType.IsStep,
- DataSetId = dataSetId
- };
- }
-
var writePoco = new TimeSeriesCreate
{
Description = FullAttributes.Description,
@@ -502,6 +489,19 @@ public TimeSeriesCreate ToTimeseries(
return writePoco;
}
+
+ public TimeSeriesCreate ToMinimalTimeseries(IUAClientAccess client, long? dataSetId)
+ {
+ string? externalId = GetUniqueId(client);
+
+ return new TimeSeriesCreate
+ {
+ ExternalId = externalId,
+ IsString = FullAttributes.DataType.IsString,
+ IsStep = FullAttributes.DataType.IsStep,
+ DataSetId = dataSetId
+ };
+ }
#endregion
}
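// Minimal sketch of how a caller migrates from the old minimal flag on ToTimeseries to the new
// factory method above; it mirrors the MqttPusher change later in this diff. "variables",
// "Extractor" and "config" are assumed to exist in the calling pusher.
var minimalTimeseries = variables
    // Previously: variable.ToTimeseries(fullConfig, Extractor, Extractor, config.DataSetId, null, null, true)
    .Select(variable => variable.ToMinimalTimeseries(Extractor, config.DataSetId))
    .Where(ts => ts != null)
    .ToList();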
diff --git a/Extractor/Pushers/CDFPusher.cs b/Extractor/Pushers/CDFPusher.cs
index de16473e..ebfca370 100644
--- a/Extractor/Pushers/CDFPusher.cs
+++ b/Extractor/Pushers/CDFPusher.cs
@@ -22,7 +22,7 @@ You should have received a copy of the GNU General Public License
using Cognite.OpcUa.History;
using Cognite.OpcUa.Nodes;
using Cognite.OpcUa.NodeSources;
-using Cognite.OpcUa.Pushers.FDM;
+using Cognite.OpcUa.Pushers.Writers.Interfaces;
using Cognite.OpcUa.Types;
using CogniteSdk;
using Microsoft.Extensions.DependencyInjection;
@@ -33,7 +33,6 @@ You should have received a copy of the GNU General Public License
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
-using System.Net.Http;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@@ -48,6 +47,7 @@ public sealed class CDFPusher : IPusher
{
private readonly CognitePusherConfig config;
private readonly FullConfig fullConfig;
+ private readonly ICDFWriter cdfWriter;
private readonly IDictionary nodeToAssetIds = new Dictionary();
public bool DataFailing { get; set; }
@@ -60,9 +60,9 @@ public sealed class CDFPusher : IPusher
private UAExtractor extractor;
public UAExtractor Extractor { get => extractor; set {
extractor = value;
- if (fdmDestination != null)
+ if (cdfWriter.FDM != null)
{
- fdmDestination.Extractor = value;
+ cdfWriter.FDM.Extractor = value;
}
} }
public IPusherConfig BaseConfig { get; }
@@ -72,7 +72,9 @@ public sealed class CDFPusher : IPusher
private readonly CogniteDestination destination;
private readonly BrowseCallback? callback;
- private readonly FDMWriter? fdmDestination;
+ private RawMetadataTargetConfig? RawMetadataTargetConfig => fullConfig.Cognite?.MetadataTargets?.Raw;
+ private CleanMetadataTargetConfig? CleanMetadataTargetConfig => fullConfig.Cognite?.MetadataTargets?.Clean;
+
public CDFPusher(
ILogger log,
@@ -87,15 +89,11 @@ public CDFPusher(
BaseConfig = config;
this.destination = destination;
this.fullConfig = fullConfig;
+ cdfWriter = provider.GetRequiredService();
if (config.BrowseCallback != null && (config.BrowseCallback.Id.HasValue || !string.IsNullOrEmpty(config.BrowseCallback.ExternalId)))
{
callback = new BrowseCallback(destination, config.BrowseCallback, log);
}
- if (config.FlexibleDataModels != null && config.FlexibleDataModels.Enabled)
- {
- fdmDestination = new FDMWriter(provider.GetRequiredService(), destination,
- provider.GetRequiredService>());
- }
}
private static readonly Counter dataPointsCounter = Metrics
@@ -197,8 +195,6 @@ public CDFPusher(
}
}
-
-
result.ThrowOnFatal();
log.LogDebug("Successfully pushed {Real} / {Total} points to CDF", realCount, count);
@@ -288,23 +284,21 @@ public CDFPusher(
///
/// List of objects to be synchronized
/// List of variables to be synchronized
+ /// List of references to be synchronized
/// Configuration of what fields, if any, should be updated.
+ /// Cancellation token
/// True if no operation failed unexpectedly
- public async Task PushNodes(
- IEnumerable objects,
- IEnumerable variables,
- IEnumerable references,
- UpdateConfig update,
- CancellationToken token)
+ public async Task PushNodes(IEnumerable objects,
+ IEnumerable variables, IEnumerable references, UpdateConfig update, CancellationToken token)
{
var result = new PushResult();
var report = new BrowseReport
{
IdPrefix = fullConfig.Extraction.IdPrefix,
- RawDatabase = config.RawMetadata?.Database,
- AssetsTable = config.RawMetadata?.AssetsTable,
- TimeSeriesTable = config.RawMetadata?.TimeseriesTable,
- RelationshipsTable = config.RawMetadata?.RelationshipsTable
+ RawDatabase = RawMetadataTargetConfig?.Database,
+ AssetsTable = RawMetadataTargetConfig?.AssetsTable,
+ TimeSeriesTable = RawMetadataTargetConfig?.TimeseriesTable,
+ RelationshipsTable = RawMetadataTargetConfig?.RelationshipsTable
};
if (!variables.Any() && !objects.Any() && !references.Any())
@@ -317,13 +311,16 @@ public async Task PushNodes(
return result;
}
- log.LogInformation("Testing {TotalNodesToTest} nodes against CDF", variables.Count() + objects.Count());
+ log.LogInformation(
+ "Testing {TotalNodesToTest} nodes against CDF",
+ variables.Count() + objects.Count()
+ );
if (fullConfig.DryRun)
{
- if (fdmDestination != null)
+ if (cdfWriter.FDM != null)
{
- await fdmDestination.PushNodes(objects, variables, references, Extractor, token);
+ await cdfWriter.FDM.PushNodes(objects, variables, references, Extractor, token);
}
return result;
}
@@ -338,78 +335,35 @@ public async Task PushNodes(
result.Objects = false;
result.References = false;
result.Variables = false;
+ result.RawObjects = false;
+ result.RawReferences = false;
+ result.RawVariables = false;
nodeEnsuringFailures.Inc();
return result;
}
- if (fdmDestination != null)
- {
- bool pushResult = true;
- try
- {
- var tsIds = new ConcurrentDictionary(
- variables.ToDictionary(ts => ts.GetUniqueId(Extractor))!);
- await CreateTimeseries(tsIds, report, true, token);
- }
- catch (Exception ex)
- {
- log.LogError(ex, "Failed to push minimal timeseries to CDF");
- pushResult = false;
- }
-
- if (pushResult)
- {
- try
- {
- pushResult = await fdmDestination.PushNodes(objects, variables, references, Extractor, token);
- }
- catch (Exception e)
- {
- log.LogError(e, "Failed to push to flexible data models");
- pushResult = false;
- }
- }
-
- result.Objects = pushResult;
- result.Variables = pushResult;
- result.References = pushResult;
- }
- else
- {
- try
- {
- await PushAssets(objects, update.Objects, report, token);
- }
- catch (Exception e)
- {
- log.LogError(e, "Failed to ensure assets");
- result.Objects = false;
- }
+ var tasks = new List();
- try
- {
- await PushTimeseries(variables, update.Variables, report, token);
- }
- catch (Exception e)
- {
- log.LogError(e, "Failed to ensure timeseries");
- result.Variables = false;
- }
+ tasks.Add(PushAssets(objects, update.Objects, report, result, token));
+
+ tasks.Add(PushTimeseries(variables, update.Variables, report, result, token));
- try
- {
- await PushReferences(references, report, token);
- }
- catch (Exception e)
- {
- log.LogError(e, "Failed to ensure references");
- result.References = false;
- }
- }
+ tasks.Add(PushReferences(references, report, result, token));
+
+ tasks.Add(PushFdm(objects, variables, references, result, token));
+
+ await Task.WhenAll(tasks);
log.LogInformation("Finish pushing nodes to CDF");
- if (result.Objects && result.References && result.Variables)
+ if (
+ result.Objects
+ && result.References
+ && result.Variables
+ && result.RawObjects
+ && result.RawVariables
+ && result.RawReferences
+ )
{
if (callback != null)
{
@@ -423,6 +377,34 @@ public async Task PushNodes(
return result;
}
+
+ ///
+ /// Synchronize all objects, variables, and references with FDM
+ ///
+ /// List of objects to be synchronized
+ /// List of variables to synchronize
+ /// List of references to synchronize
+ /// Push result
+ /// Cancellation token
+ /// Task
+ private async Task PushFdm(IEnumerable objects,
+ IEnumerable variables, IEnumerable references, PushResult result, CancellationToken token)
+ {
+ if (cdfWriter.FDM == null) return;
+ bool pushResult = true;
+ try
+ {
+ pushResult = await cdfWriter.FDM.PushNodes(objects, variables, references, Extractor, token);
+ }
+ catch
+ {
+ pushResult = false;
+ }
+ result.Variables = pushResult;
+ result.Objects = pushResult;
+ result.References = pushResult;
+ }
+
///
/// Reset the pusher, preparing it to be restarted
///
@@ -431,6 +413,7 @@ public void Reset()
missingTimeseries.Clear();
mismatchedTimeseries.Clear();
}
+
///
/// Initialize extracted datapoint ranges on the given list of states.
///
@@ -441,9 +424,11 @@ public void Reset()
public async Task InitExtractedRanges(
IEnumerable states,
bool backfillEnabled,
- CancellationToken token)
+ CancellationToken token
+ )
{
- if (!states.Any() || !config.ReadExtractedRanges || fullConfig.DryRun) return true;
+ if (!states.Any() || !config.ReadExtractedRanges || fullConfig.DryRun)
+ return true;
var ids = new List();
foreach (var state in states)
{
@@ -452,7 +437,8 @@ public async Task InitExtractedRanges(
for (int i = 0; i < state.ArrayDimensions[0]; i++)
{
var id = Extractor.GetUniqueId(state.SourceId, i);
- if (id == null) break;
+ if (id == null)
+ break;
ids.Add(id);
}
}
@@ -466,7 +452,11 @@ public async Task InitExtractedRanges(
Dictionary ranges;
try
{
- var dict = await destination.GetExtractedRanges(ids.Select(Identity.Create).ToList(), token, backfillEnabled);
+ var dict = await destination.GetExtractedRanges(
+ ids.Select(Identity.Create).ToList(),
+ token,
+ backfillEnabled
+ );
ranges = dict.ToDictionary(kvp => kvp.Key.ExternalId, kvp => kvp.Value);
}
catch (Exception ex)
@@ -482,7 +472,8 @@ public async Task InitExtractedRanges(
for (int i = 0; i < state.ArrayDimensions[0]; i++)
{
var id = Extractor.GetUniqueId(state.SourceId, i);
- if (id == null) break;
+ if (id == null)
+ break;
if (ranges.TryGetValue(id, out var range))
{
if (range == TimeRange.Empty)
@@ -514,6 +505,7 @@ public async Task InitExtractedRanges(
return true;
}
+
///
/// Test that the extractor is capable of pushing to CDF.
/// Also fetches DataSet externalId.
@@ -522,7 +514,8 @@ public async Task InitExtractedRanges(
/// True if pushing is possible, false if not.
public async Task TestConnection(FullConfig fullConfig, CancellationToken token)
{
- if (fullConfig.DryRun) return true;
+ if (fullConfig.DryRun)
+ return true;
try
{
@@ -530,20 +523,31 @@ public async Task InitExtractedRanges(
}
catch (Exception ex)
{
- log.LogError("Failed to get CDF login status, this is likely a problem with the network or configuration. Project {Project} at {Url}: {Message}",
- config.Project, config.Host, ex.Message);
+ log.LogError(
+ "Failed to get CDF login status, this is likely a problem with the network or configuration. Project {Project} at {Url}: {Message}",
+ config.Project,
+ config.Host,
+ ex.Message
+ );
return false;
}
try
{
- await destination.CogniteClient.TimeSeries.ListAsync(new TimeSeriesQuery { Limit = 1 }, token);
+ await destination.CogniteClient.TimeSeries.ListAsync(
+ new TimeSeriesQuery { Limit = 1 },
+ token
+ );
}
catch (ResponseException ex)
{
- log.LogError("Could not access CDF Time Series - most likely due " +
- "to insufficient access rights on API key. Project {Project} at {Host}: {Message}",
- config.Project, config.Host, ex.Message);
+ log.LogError(
+ "Could not access CDF Time Series - most likely due "
+ + "to insufficient access rights on API key. Project {Project} at {Host}: {Message}",
+ config.Project,
+ config.Host,
+ ex.Message
+ );
return false;
}
@@ -551,13 +555,20 @@ public async Task InitExtractedRanges(
{
try
{
- await destination.CogniteClient.Events.ListAsync(new EventQuery { Limit = 1 }, token);
+ await destination.CogniteClient.Events.ListAsync(
+ new EventQuery { Limit = 1 },
+ token
+ );
}
catch (ResponseException ex)
{
- log.LogError("Could not access CDF Events, though event emitters are specified - most likely due " +
- "to insufficient access rights on API key. Project {Project} at {Host}: {Message}",
- config.Project, config.Host, ex.Message);
+ log.LogError(
+ "Could not access CDF Events, though event emitters are specified - most likely due "
+ + "to insufficient access rights on API key. Project {Project} at {Host}: {Message}",
+ config.Project,
+ config.Host,
+ ex.Message
+ );
return false;
}
}
@@ -573,41 +584,11 @@ public async Task InitExtractedRanges(
return true;
}
- ///
- /// Push list of references as relationships to CDF.
- ///
- /// List of references to push
- /// True if nothing failed unexpectedly
- private async Task PushReferences(IEnumerable references, BrowseReport report, CancellationToken token)
- {
- if (references == null || !references.Any()) return;
-
- var relationships = references
- .Select(reference => reference.ToRelationship(config.DataSet?.Id, Extractor))
- .DistinctBy(rel => rel.ExternalId);
-
- bool useRawRelationships = config.RawMetadata != null
- && !string.IsNullOrWhiteSpace(config.RawMetadata.Database)
- && !string.IsNullOrWhiteSpace(config.RawMetadata.RelationshipsTable);
-
- log.LogInformation("Test {Count} relationships against CDF", references.Count());
-
- if (useRawRelationships)
- {
- await PushRawReferences(relationships, report, token);
- }
- else
- {
- var counts = await Task.WhenAll(relationships.ChunkBy(1000).Select(chunk => PushReferencesChunk(chunk, token)));
- report.RelationshipsCreated += counts.Sum();
- }
-
- log.LogInformation("Sucessfully pushed relationships to CDF");
- }
public async Task ExecuteDeletes(DeletedNodes deletes, CancellationToken token)
{
- if (fullConfig.DryRun) return true;
+ if (fullConfig.DryRun)
+ return true;
var tasks = new List();
if (deletes.Objects.Any())
@@ -638,558 +619,304 @@ public async Task ExecuteDeletes(DeletedNodes deletes, CancellationToken t
#region assets
///
- /// Update list of nodes as assets in CDF Raw.
+ /// Maps objects to their unique ids, filtering out nodes sourced from CDF
///
- /// Id, node map for the assets that should be pushed.
- private async Task UpdateRawAssets(IDictionary assetMap, BrowseReport report, CancellationToken token)
+ /// List of objects to be mapped
+ /// A dictionary mapping unique ids to nodes
+ private ConcurrentDictionary MapAssets(IEnumerable objects)
{
- if (config.RawMetadata?.Database == null || config.RawMetadata?.AssetsTable == null) return;
- await UpsertRawRows(config.RawMetadata.Database, config.RawMetadata.AssetsTable, rows =>
- {
- if (rows == null)
- {
- return assetMap.Select(kvp => (
- kvp.Key,
- update: PusherUtils.CreateRawUpdate(log, Extractor.StringConverter, kvp.Value, null, ConverterType.Node)
- )).Where(elem => elem.update != null)
- .ToDictionary(pair => pair.Key, pair => pair.update!.Value);
- }
-
- var toWrite = new List<(string key, RawRow> row, BaseUANode node)>();
-
- foreach (var row in rows)
- {
- if (assetMap.TryGetValue(row.Key, out var ts))
- {
- toWrite.Add((row.Key, row, ts));
- assetMap.Remove(row.Key);
- }
- }
-
- var updates = new Dictionary();
-
- foreach (var (key, row, node) in toWrite)
- {
- var update = PusherUtils.CreateRawUpdate(log, Extractor.StringConverter, node, row, ConverterType.Node);
-
- if (update != null)
- {
- updates[key] = update.Value;
- if (row == null)
- {
- report.AssetsCreated++;
- }
- else
- {
- report.AssetsUpdated++;
- }
- }
- }
-
- return updates;
- }, null, token);
+ return new ConcurrentDictionary(
+ objects
+ .Where(node => node.Source != NodeSource.CDF)
+ .ToDictionary(obj => Extractor.GetUniqueId(obj.Id)!)
+ );
}
+
///
- /// Create list of nodes as assets in CDF Raw.
- /// This does not create rows if they already exist.
+ /// Synchronize all objects to CDF
///
- /// Id, node map for the assets that should be pushed.
- private async Task CreateRawAssets(IDictionary assetMap, BrowseReport report, CancellationToken token)
+ /// List of objects to be synchronized
+ /// Update configuration
+ /// Browse report
+ /// Push result
+ /// Cancellation token
+ /// Task
+ private async Task PushAssets(IEnumerable objects, TypeUpdateConfig update, BrowseReport report, PushResult result, CancellationToken token)
{
- if (config.RawMetadata?.Database == null || config.RawMetadata?.AssetsTable == null) return;
+ if (!objects.Any() && cdfWriter.Assets == null && cdfWriter.Raw == null) return;
- await EnsureRawRows(config.RawMetadata.Database, config.RawMetadata.AssetsTable, assetMap.Keys, ids =>
+ var assetsMap = MapAssets(objects);
+ if (cdfWriter.Assets != null)
{
- var assets = ids.Select(id => (assetMap[id], id));
- var creates = assets.Select(pair => (pair.Item1.ToJson(log, Extractor.StringConverter, ConverterType.Node), pair.id))
- .Where(pair => pair.Item1 != null)
- .ToDictionary(pair => pair.id, pair => pair.Item1!.RootElement);
- report.AssetsCreated += creates.Count;
- return creates;
- }, new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }, token);
- }
- ///
- /// Create assets in CDF Clean.
- ///
- /// Id, node map for the assets that should be pushed.
- private async Task> CreateAssets(IDictionary assetMap, BrowseReport report, CancellationToken token)
- {
- var assets = new List();
- foreach (var chunk in Chunking.ChunkByHierarchy(assetMap.Values, config.CdfChunking.Assets, node => node.Id, node => node.ParentId))
+ await PushCleanAssets(assetsMap, update, report, result, token);
+ }
+ if (cdfWriter.Raw != null && RawMetadataTargetConfig?.AssetsTable != null)
{
- var assetChunk = await destination.GetOrCreateAssetsAsync(chunk.Select(node => Extractor.GetUniqueId(node.Id)!), ids =>
- {
- var assets = ids.Select(id => assetMap[id]);
- var creates = assets
- .Select(node => node.ToCDFAsset(fullConfig, Extractor, config.DataSet?.Id, config.MetadataMapping?.Assets))
- .Where(asset => asset != null);
- report.AssetsCreated += creates.Count();
- return creates;
- }, RetryMode.None, SanitationMode.Clean, token);
-
- log.LogResult(assetChunk, RequestType.CreateAssets, true);
-
- assetChunk.ThrowOnFatal();
-
- if (assetChunk.Results == null) continue;
-
- foreach (var asset in assetChunk.Results)
- {
- nodeToAssetIds[assetMap[asset.ExternalId].Id] = asset.Id;
- }
- assets.AddRange(assetChunk.Results);
+ await PushRawAssets(assetsMap, update, report, result, token);
}
- return assets;
}
+
///
- /// Update assets in CDF Clean.
+ /// Synchronize all objects to CDF assets
///
- /// Id, node map for the assets that should be pushed.
- /// List of existing assets in CDF.
- /// Configuration for which fields should be updated.
- private async Task UpdateAssets(IDictionary assetMap, IEnumerable assets,
- TypeUpdateConfig update, BrowseReport report, CancellationToken token)
+ /// Map of objects to synchronize as CDF assets
+ /// Update configuration
+ /// Browse report
+ /// Push result
+ /// Cancellation token
+ /// Task
+ private async Task PushCleanAssets(IDictionary assetsMap, TypeUpdateConfig update, BrowseReport report, PushResult result, CancellationToken token)
{
- var updates = new List();
- var existing = assets.ToDictionary(asset => asset.ExternalId);
- foreach (var kvp in assetMap)
+ try
{
- if (existing.TryGetValue(kvp.Key, out var asset))
- {
- var assetUpdate = PusherUtils.GetAssetUpdate(fullConfig, asset, kvp.Value, Extractor, update);
-
- if (assetUpdate == null) continue;
- if (assetUpdate.ParentExternalId != null || assetUpdate.Description != null
- || assetUpdate.Name != null || assetUpdate.Metadata != null)
- {
- updates.Add(new AssetUpdateItem(asset.ExternalId) { Update = assetUpdate });
- }
- }
+ var _result = await cdfWriter.Assets!.PushNodes(Extractor, assetsMap, nodeToAssetIds, update, token);
+ report.AssetsCreated += _result.Created;
+ report.AssetsUpdated += _result.Updated;
}
- if (updates.Any())
+ catch
{
- var res = await destination.UpdateAssetsAsync(updates, RetryMode.OnError, SanitationMode.Clean, token);
-
- log.LogResult(res, RequestType.UpdateAssets, false);
-
- res.ThrowOnFatal();
-
- report.AssetsUpdated += res.Results?.Count() ?? 0;
+ result.Objects = false;
}
}
+
///
- /// Master method for pushing assets to CDF raw or clean.
+ /// Master method for pushing assets to CDF raw.
///
/// Assets to push
/// Configuration for which fields, if any, to update in CDF
- private async Task PushAssets(
- IEnumerable objects,
+ private async Task PushRawAssets(
+ ConcurrentDictionary assetsMap,
TypeUpdateConfig update,
BrowseReport report,
- CancellationToken token)
+ PushResult result,
+ CancellationToken token
+ )
{
- if (config.SkipMetadata) return;
-
- var assetIds = new ConcurrentDictionary(objects
- .Where(node => node.Source != NodeSource.CDF)
- .ToDictionary(obj => Extractor.GetUniqueId(obj.Id)!));
-
- if (!assetIds.Any()) return;
-
- var metaMap = config.MetadataMapping?.Assets;
- bool useRawAssets = config.RawMetadata != null
- && !string.IsNullOrWhiteSpace(config.RawMetadata.Database)
- && !string.IsNullOrWhiteSpace(config.RawMetadata.AssetsTable);
-
- if (useRawAssets)
+ try
{
- if (update.AnyUpdate)
- {
- await UpdateRawAssets(assetIds, report, token);
- }
- else
- {
- await CreateRawAssets(assetIds, report, token);
- }
+ var _result = await cdfWriter.Raw!.PushNodes(
+ Extractor,
+ RawMetadataTargetConfig!.Database!,
+ RawMetadataTargetConfig!.AssetsTable!,
+ assetsMap,
+ ConverterType.Node,
+ update.AnyUpdate,
+ token
+ );
+ report.RawAssetsCreated += _result.Created;
+ report.RawAssetsUpdated += _result.Updated;
}
- else
+ catch (Exception e)
{
- var assets = await CreateAssets(assetIds, report, token);
-
- if (update.AnyUpdate)
- {
- await UpdateAssets(assetIds, assets, update, report, token);
- }
+ log.LogError(e, "Failed to ensure assets");
+ result.RawObjects = false;
}
}
private async Task MarkAssetsAsDeleted(
IEnumerable externalIds,
- CancellationToken token)
+ CancellationToken token
+ )
{
- bool useRawAssets = config.RawMetadata != null
- && !string.IsNullOrWhiteSpace(config.RawMetadata.Database)
- && !string.IsNullOrWhiteSpace(config.RawMetadata.AssetsTable);
+ bool useRawAssets =
+ RawMetadataTargetConfig != null
+ && !string.IsNullOrWhiteSpace(RawMetadataTargetConfig.Database)
+ && !string.IsNullOrWhiteSpace(RawMetadataTargetConfig.AssetsTable);
if (useRawAssets)
{
- await MarkRawRowsAsDeleted(config.RawMetadata!.Database!, config.RawMetadata!.AssetsTable!, externalIds, token);
+ await MarkRawRowsAsDeleted(
+ RawMetadataTargetConfig!.Database!,
+ RawMetadataTargetConfig!.AssetsTable!,
+ externalIds,
+ token
+ );
}
else
{
- var updates = externalIds.Select(extId => new AssetUpdateItem(extId)
- {
- Update = new AssetUpdate
- {
- Metadata = new UpdateDictionary(new Dictionary
+ var updates = externalIds.Select(
+ extId =>
+ new AssetUpdateItem(extId)
{
- { fullConfig.Extraction.Deletes.DeleteMarker, "true" }
- }, Enumerable.Empty())
- }
- });
- var result = await destination.UpdateAssetsAsync(updates, RetryMode.OnError, SanitationMode.Clean, token);
+ Update = new AssetUpdate
+ {
+ Metadata = new UpdateDictionary(
+ new Dictionary
+ {
+ { fullConfig.Extraction.Deletes.DeleteMarker, "true" }
+ },
+ Enumerable.Empty()
+ )
+ }
+ }
+ );
+ var result = await destination.UpdateAssetsAsync(
+ updates,
+ RetryMode.OnError,
+ SanitationMode.Clean,
+ token
+ );
log.LogResult(result, RequestType.UpdateAssets, true);
result.ThrowOnFatal();
}
}
-
#endregion
#region timeseries
///
- /// Update list of nodes as timeseries in CDF Raw.
+ /// Maps variables to their unique ids
///
- /// Id, node map for the timeseries that should be pushed.
- private async Task UpdateRawTimeseries(
- IDictionary tsMap,
- BrowseReport report,
- CancellationToken token)
+ /// List of variables to be mapped
+ /// A dictionary mapping unique ids to variables
+ private ConcurrentDictionary MapTimeseries(IEnumerable variables)
{
- if (config.RawMetadata?.Database == null || config.RawMetadata.TimeseriesTable == null) return;
-
- await UpsertRawRows(config.RawMetadata.Database, config.RawMetadata.TimeseriesTable, rows =>
- {
- if (rows == null)
- {
- return tsMap.Select(kvp => (
- kvp.Key,
- update: PusherUtils.CreateRawUpdate(log, Extractor.StringConverter, kvp.Value, null, ConverterType.Variable)
- )).Where(elem => elem.update != null)
- .ToDictionary(pair => pair.Key, pair => pair.update!.Value);
- }
-
- var toWrite = new List<(string key, RawRow> row, UAVariable node)>();
-
- foreach (var row in rows)
- {
- if (tsMap.TryGetValue(row.Key, out var ts))
- {
- toWrite.Add((row.Key, row, ts));
- tsMap.Remove(row.Key);
- }
- }
-
- var updates = new Dictionary();
-
- foreach (var (key, row, node) in toWrite)
- {
- var update = PusherUtils.CreateRawUpdate(log, Extractor.StringConverter, node, row, ConverterType.Variable);
-
- if (update != null)
- {
- updates[key] = update.Value;
- if (row == null)
- {
- report.TimeSeriesCreated++;
- }
- else
- {
- report.TimeSeriesUpdated++;
- }
- }
- }
-
- return updates;
- }, null, token);
+ return new ConcurrentDictionary(
+ variables.ToDictionary(ts => ts.GetUniqueId(Extractor)!)
+ );
}
+
///
- /// Create list of nodes as timeseries in CDF Raw.
- /// This does not create rows if they already exist.
+ /// Synchronize all variables to CDF
///
- /// Id, node map for the timeseries that should be pushed.
- private async Task CreateRawTimeseries(
- IDictionary tsMap,
- BrowseReport report,
- CancellationToken token)
+ /// List of variables to be synchronized
+ /// Update configuration
+ /// Browse report
+ /// Push result
+ /// Cancellation token
+ /// Task
+ private async Task PushTimeseries(IEnumerable variables, TypeUpdateConfig update, BrowseReport report, PushResult result, CancellationToken token)
{
- if (config.RawMetadata?.Database == null || config.RawMetadata.TimeseriesTable == null) return;
+ if (!variables.Any() && cdfWriter.Timeseries == null && cdfWriter.Raw == null) return;
- await EnsureRawRows(config.RawMetadata.Database, config.RawMetadata.TimeseriesTable, tsMap.Keys, ids =>
+ var timeseriesMap = MapTimeseries(variables);
+ await PushCleanTimeseries(timeseriesMap, update, report, result, token);
+ if (cdfWriter.Raw != null && (RawMetadataTargetConfig?.TimeseriesTable != null))
{
- var timeseries = ids.Select(id => (tsMap[id], id));
- var creates = timeseries.Select(pair => (pair.Item1.ToJson(log, Extractor.StringConverter, ConverterType.Variable), pair.id))
- .Where(pair => pair.Item1 != null)
- .ToDictionary(pair => pair.id, pair => pair.Item1!.RootElement);
-
- report.TimeSeriesCreated += creates.Count;
- return creates;
- }, new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }, token);
+ await PushRawTimeseries(timeseriesMap, update, report, result, token);
+ }
}
+
///
- /// Create timeseries in CDF Clean, optionally creates only minimal timeseries with no metadata or context.
+ /// Synchronize all variables to CDF timeseries
///
- /// Id, node map for the timeseries that should be pushed.
- /// True to create timeseries with no metadata.
- private async Task> CreateTimeseries(
- IDictionary tsMap,
- BrowseReport report,
- bool createMinimalTimeseries,
- CancellationToken token)
+ /// Map of variables to synchronize as CDF timeseries
+ /// Update configuration
+ /// Browse report
+ /// Push result
+ /// Cancellation token
+ /// Task
+ private async Task PushCleanTimeseries(IDictionary timeseriesMap, TypeUpdateConfig update, BrowseReport report, PushResult result, CancellationToken token)
{
- var timeseries = await destination.GetOrCreateTimeSeriesAsync(tsMap.Keys, ids =>
+ try
{
- var tss = ids.Select(id => tsMap[id]);
- var creates = tss.Select(ts => ts.ToTimeseries(
- fullConfig,
- Extractor,
- Extractor,
- config.DataSet?.Id,
- nodeToAssetIds,
- config.MetadataMapping?.Timeseries,
- createMinimalTimeseries))
- .Where(ts => ts != null);
- if (createMinimalTimeseries)
+ var _result = await cdfWriter.Timeseries!.PushVariables(Extractor, timeseriesMap, nodeToAssetIds, mismatchedTimeseries, update, token);
+ var createMinimal = !(CleanMetadataTargetConfig?.Timeseries ?? false);
+ if (createMinimal)
{
- report.MinimalTimeSeriesCreated += creates.Count();
+ report.MinimalTimeSeriesCreated += _result.Created;
}
else
{
- report.TimeSeriesCreated += creates.Count();
- }
- return creates;
- }, RetryMode.None, SanitationMode.Clean, token);
-
- log.LogResult(timeseries, RequestType.CreateTimeSeries, true);
-
- timeseries.ThrowOnFatal();
-
- if (timeseries.Results == null) return Array.Empty();
-
- var foundBadTimeseries = new List();
- foreach (var ts in timeseries.Results)
- {
- var loc = tsMap[ts.ExternalId];
- if (nodeToAssetIds.TryGetValue(loc.ParentId, out var parentId))
- {
- nodeToAssetIds[loc.Id] = parentId;
- }
- if (ts.IsString != loc.FullAttributes.DataType.IsString)
- {
- mismatchedTimeseries.Add(ts.ExternalId);
- foundBadTimeseries.Add(ts.ExternalId);
- }
- }
- if (foundBadTimeseries.Any())
- {
- log.LogDebug("Found mismatched timeseries when ensuring: {TimeSeries}", string.Join(", ", foundBadTimeseries));
- }
-
- return timeseries.Results;
- }
- ///
- /// Update timeseries in CDF Clean.
- ///
- /// Id, node map for the timeseries that should be pushed.
- /// List of existing timeseries in CDF.
- /// Configuration for which fields should be updated.
- private async Task UpdateTimeseries(
- IDictionary tsMap,
- IEnumerable timeseries,
- TypeUpdateConfig update,
- BrowseReport report,
- CancellationToken token)
- {
- var updates = new List();
- var existing = timeseries.ToDictionary(asset => asset.ExternalId);
- foreach (var kvp in tsMap)
- {
- if (existing.TryGetValue(kvp.Key, out var ts))
- {
- var tsUpdate = PusherUtils.GetTSUpdate(fullConfig, Extractor, ts, kvp.Value, update, nodeToAssetIds);
- if (tsUpdate == null) continue;
- if (tsUpdate.AssetId != null || tsUpdate.Description != null
- || tsUpdate.Name != null || tsUpdate.Metadata != null)
- {
- updates.Add(new TimeSeriesUpdateItem(ts.ExternalId) { Update = tsUpdate });
- }
+ report.TimeSeriesCreated += _result.Created;
}
+ report.TimeSeriesUpdated += _result.Updated;
}
-
- if (updates.Any())
+ catch
{
- var res = await destination.UpdateTimeSeriesAsync(updates, RetryMode.OnError, SanitationMode.Clean, token);
-
- log.LogResult(res, RequestType.UpdateTimeSeries, false);
- res.ThrowOnFatal();
-
- report.TimeSeriesUpdated += res.Results?.Count() ?? 0;
+ result.Variables = false;
}
}
- ///
- /// Master method for pushing timeseries to CDF raw or clean.
+ ///
+ /// Synchronize all variables to CDF raw
///
- /// Timeseries to push
- /// Configuration for which fields, if any, to update in CDF
- private async Task PushTimeseries(
- IEnumerable tsList,
- TypeUpdateConfig update,
- BrowseReport report,
- CancellationToken token)
+ /// Map of variables to synchronize to CDF Raw
+ /// Update configuration
+ /// Browse report
+ /// Push result
+ /// Cancellation token
+ /// Task
+ private async Task PushRawTimeseries(ConcurrentDictionary tsIds, TypeUpdateConfig update, BrowseReport report, PushResult result, CancellationToken token)
{
- var tsIds = new ConcurrentDictionary(
- tsList.ToDictionary(ts => ts.GetUniqueId(Extractor)!));
- bool useRawTimeseries = config.RawMetadata != null
- && !string.IsNullOrWhiteSpace(config.RawMetadata.Database)
- && !string.IsNullOrWhiteSpace(config.RawMetadata.TimeseriesTable);
-
- bool simpleTimeseries = useRawTimeseries || config.SkipMetadata;
-
- var timeseries = await CreateTimeseries(tsIds, report, simpleTimeseries, token);
-
- var toPushMeta = tsIds.Where(kvp => kvp.Value.Source != NodeSource.CDF)
- .ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
-
- if (config.SkipMetadata || !toPushMeta.Any()) return;
-
- if (useRawTimeseries)
+ try
{
- if (update.AnyUpdate)
- {
- await UpdateRawTimeseries(toPushMeta, report, token);
- }
- else
- {
- await CreateRawTimeseries(toPushMeta, report, token);
- }
+ var toPushMeta = tsIds
+ .Where(kvp => kvp.Value.Source != NodeSource.CDF)
+ .ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
+
+ var _result = await cdfWriter.Raw!.PushNodes(
+ Extractor,
+ RawMetadataTargetConfig!.Database!,
+ RawMetadataTargetConfig!.TimeseriesTable!,
+ toPushMeta,
+ ConverterType.Variable,
+ update.AnyUpdate,
+ token
+ );
+ report.RawTimeseriesCreated += _result.Created;
+ report.RawTimeseriesUpdated += _result.Updated;
}
- else if (update.AnyUpdate)
+ catch (Exception e)
{
- await UpdateTimeseries(toPushMeta, timeseries, update, report, token);
+ log.LogError(e, "Failed to ensure timeseries");
+ result.RawVariables = false;
}
}
- private async Task MarkTimeSeriesAsDeleted(IEnumerable externalIds, CancellationToken token)
+ private async Task MarkTimeSeriesAsDeleted(
+ IEnumerable externalIds,
+ CancellationToken token
+ )
{
- bool useRawTss = config.RawMetadata != null
- && !string.IsNullOrWhiteSpace(config.RawMetadata.Database)
- && !string.IsNullOrWhiteSpace(config.RawMetadata.TimeseriesTable);
+ bool useRawTss =
+ RawMetadataTargetConfig != null
+ && !string.IsNullOrWhiteSpace(RawMetadataTargetConfig.Database)
+ && !string.IsNullOrWhiteSpace(RawMetadataTargetConfig.TimeseriesTable);
if (useRawTss)
{
- await MarkRawRowsAsDeleted(config.RawMetadata!.Database!, config.RawMetadata!.TimeseriesTable!, externalIds, token);
+ await MarkRawRowsAsDeleted(
+ RawMetadataTargetConfig!.Database!,
+ RawMetadataTargetConfig!.TimeseriesTable!,
+ externalIds,
+ token
+ );
}
- var updates = externalIds.Select(extId => new TimeSeriesUpdateItem(extId)
- {
- Update = new TimeSeriesUpdate
- {
- Metadata = new UpdateDictionary(new Dictionary
+ var updates = externalIds.Select(
+ extId =>
+ new TimeSeriesUpdateItem(extId)
{
- { fullConfig.Extraction.Deletes.DeleteMarker, "true" }
- }, Enumerable.Empty())
- }
- });
- var result = await destination.UpdateTimeSeriesAsync(updates, RetryMode.OnError, SanitationMode.Clean, token);
+ Update = new TimeSeriesUpdate
+ {
+ Metadata = new UpdateDictionary(
+ new Dictionary
+ {
+ { fullConfig.Extraction.Deletes.DeleteMarker, "true" }
+ },
+ Enumerable.Empty()
+ )
+ }
+ }
+ );
+ var result = await destination.UpdateTimeSeriesAsync(
+ updates,
+ RetryMode.OnError,
+ SanitationMode.Clean,
+ token
+ );
log.LogResult(result, RequestType.UpdateAssets, true);
result.ThrowOnFatal();
}
#endregion
#region raw-utils
- ///
- /// Ensure that raw rows given by exist in the table given by
- /// and .
- /// Keys that do not exist are built into DTOs by .
- ///
- /// Type of DTO to build
- /// Name of database in CDF Raw
- /// Name of table in CDF Raw
- /// Keys of rows to ensure
- /// Method to build DTOs for keys that were not found.
- /// used for serialization.
- private async Task EnsureRawRows(
- string dbName,
- string tableName,
- IEnumerable keys,
- Func, IDictionary> dtoBuilder,
- JsonSerializerOptions options,
- CancellationToken token)
- {
- var rows = await GetRawRows(dbName, tableName, new[] { "," }, token);
- var existing = rows.Select(row => row.Key);
-
- var toCreate = keys.Except(existing);
- if (!toCreate.Any()) return;
- log.LogInformation("Creating {Count} raw rows in CDF", toCreate.Count());
-
- var createDtos = dtoBuilder(toCreate);
-
- await destination.InsertRawRowsAsync(dbName, tableName, createDtos, options, token);
- }
- ///
- /// Insert or update raw rows given by in table
- /// given by and .
- /// The dtoBuilder is called with chunks of 10000 rows, and finally with null to indicate that there are no more rows.
- ///
- /// Type of DTO to build
- /// Name of database in CDF Raw
- /// Name of table in CDF Raw
- /// Method to build DTOs, called with existing rows.
- /// used for serialization.
- private async Task UpsertRawRows(
- string dbName,
- string tableName,
- Func>>?, IDictionary> dtoBuilder,
- JsonSerializerOptions? options,
- CancellationToken token)
- {
- int count = 0;
- async Task CallAndCreate(IEnumerable>>? rows)
- {
- var toUpsert = dtoBuilder(rows);
- count += toUpsert.Count;
- await destination.InsertRawRowsAsync(dbName, tableName, toUpsert, options, token);
- }
-
- string? cursor = null;
- do
- {
- try
- {
- var result = await destination.CogniteClient.Raw.ListRowsAsync>(dbName, tableName,
- new RawRowQuery { Cursor = cursor, Limit = 10_000 }, null, token);
- cursor = result.NextCursor;
-
- await CallAndCreate(result.Items);
- }
- catch (ResponseException ex) when (ex.Code == 404)
- {
- log.LogWarning("Table or database not found: {Message}", ex.Message);
- break;
- }
- } while (cursor != null);
-
- await CallAndCreate(null);
-
- log.LogInformation("Updated or created {Count} rows in CDF Raw", count);
- }
-
public async Task>>> GetRawRows(
string dbName,
string tableName,
IEnumerable? columns,
- CancellationToken token)
+ CancellationToken token
+ )
{
string? cursor = null;
var rows = new List>>();
@@ -1197,8 +924,20 @@ public async Task>>> GetRawRo
{
try
{
- var result = await destination.CogniteClient.Raw.ListRowsAsync>(dbName, tableName,
- new RawRowQuery { Cursor = cursor, Limit = 10_000, Columns = columns }, null, token);
+ var result = await destination.CogniteClient.Raw.ListRowsAsync<
+ Dictionary
+ >(
+ dbName,
+ tableName,
+ new RawRowQuery
+ {
+ Cursor = cursor,
+ Limit = 10_000,
+ Columns = columns
+ },
+ null,
+ token
+ );
rows.AddRange(result.Items);
cursor = result.NextCursor;
}
@@ -1211,7 +950,12 @@ public async Task>>> GetRawRo
return rows;
}
- private async Task MarkRawRowsAsDeleted(string dbName, string tableName, IEnumerable keys, CancellationToken token)
+ private async Task MarkRawRowsAsDeleted(
+ string dbName,
+ string tableName,
+ IEnumerable keys,
+ CancellationToken token
+ )
{
var keySet = new HashSet(keys);
var rows = await GetRawRows(dbName, tableName, keys, token);
@@ -1221,89 +965,118 @@ private async Task MarkRawRowsAsDeleted(string dbName, string tableName, IEnumer
{
row.Columns[fullConfig.Extraction.Deletes.DeleteMarker] = trueElem;
}
- await destination.InsertRawRowsAsync(dbName, tableName, toMark.ToDictionary(e => e.Key, e => e.Columns), token);
+ await destination.InsertRawRowsAsync(
+ dbName,
+ tableName,
+ toMark.ToDictionary(e => e.Key, e => e.Columns),
+ token
+ );
}
-
#endregion
#region references
///
- /// Create the given list of relationships in CDF, handles duplicates.
+ /// Synchronize all references to CDF
///
- /// Relationships to create
- private async Task PushReferencesChunk(IEnumerable relationships, CancellationToken token)
+ /// List of references to be synchronized
+ /// Browse report
+ /// Push result
+ /// Cancellation token
+ /// Task
+ private async Task PushReferences(IEnumerable references, BrowseReport report, PushResult result, CancellationToken token)
{
- if (!relationships.Any()) return 0;
- try
+ if (!references.Any() && cdfWriter.Relationships == null && cdfWriter.Raw == null) return;
+
+ var relationships = references
+ .Select(reference => reference.ToRelationship(config.DataSet?.Id, Extractor))
+ .DistinctBy(rel => rel.ExternalId);
+
+ if (cdfWriter.Relationships != null)
{
- await destination.CogniteClient.Relationships.CreateAsync(relationships, token);
- return relationships.Count();
+ await PushCleanReferences(relationships, report, result, token);
}
- catch (ResponseException ex)
- {
- if (ex.Duplicated.Any())
- {
- var existing = new HashSet();
- foreach (var dict in ex.Duplicated)
- {
- if (dict.TryGetValue("externalId", out var value))
- {
- if (value is MultiValue.String strValue)
- {
- existing.Add(strValue.Value);
- }
- }
- }
- if (!existing.Any()) throw;
- relationships = relationships.Where(rel => !existing.Contains(rel.ExternalId)).ToList();
- return await PushReferencesChunk(relationships, token);
- }
- else
- {
- throw;
- }
+ if (cdfWriter.Raw != null && RawMetadataTargetConfig?.RelationshipsTable != null)
+ {
+ await PushRawReferences(relationships, report, result, token);
}
}
+
///
- /// Create the given list of relationships in CDF Raw, skips rows that already exist.
+ /// Synchronize all references to CDF relationships
///
- /// Relationships to create.
- private async Task PushRawReferences(IEnumerable relationships, BrowseReport report, CancellationToken token)
+ /// Relationships to synchronize to CDF
+ /// Browse report
+ /// Push result
+ /// Cancellation token
+ /// Task
+ private async Task PushCleanReferences(IEnumerable relationships, BrowseReport report, PushResult result, CancellationToken token)
{
- if (config.RawMetadata?.Database == null || config.RawMetadata.RelationshipsTable == null) return;
+ try
+ {
+ var _result = await cdfWriter.Relationships!.PushReferences(relationships, token);
+ report.RelationshipsCreated += _result.Created;
+ }
+ catch (Exception e)
+ {
+ log.LogError(e, "Failed to ensure relationships");
+ result.References = false;
+ }
+ }
- await EnsureRawRows(
- config.RawMetadata.Database,
- config.RawMetadata.RelationshipsTable,
- relationships.Select(rel => rel.ExternalId),
- ids =>
- {
- var idSet = ids.ToHashSet();
- var creates = relationships.Where(rel => idSet.Contains(rel.ExternalId)).ToDictionary(rel => rel.ExternalId);
- report.RelationshipsCreated += creates.Count;
- return creates;
- },
- new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase },
- token);
+ ///
+ /// Synchronize all references to CDF Raw
+ ///
+ /// Relationships to synchronize to CDF Raw
+ /// Browse report
+ /// Push result
+ /// Cancellation token
+ /// Task
+ private async Task PushRawReferences(IEnumerable relationships, BrowseReport report, PushResult result, CancellationToken token)
+ {
+ try
+ {
+ var _result = await cdfWriter.Raw!.PushReferences(RawMetadataTargetConfig!.Database!, RawMetadataTargetConfig!.RelationshipsTable!, relationships, token);
+ report.RawRelationshipsCreated += _result.Created;
+ }
+ catch (Exception e)
+ {
+ log.LogError(e, "Failed to ensure raw relationships");
+ result.RawReferences = false;
+ }
}
- private async Task MarkReferencesAsDeleted(IEnumerable externalIds, CancellationToken token)
+ private async Task MarkReferencesAsDeleted(
+ IEnumerable externalIds,
+ CancellationToken token
+ )
{
- bool useRawRelationships = config.RawMetadata != null
- && !string.IsNullOrWhiteSpace(config.RawMetadata.Database)
- && !string.IsNullOrWhiteSpace(config.RawMetadata.RelationshipsTable);
+ bool useRawRelationships =
+ RawMetadataTargetConfig != null
+ && !string.IsNullOrWhiteSpace(RawMetadataTargetConfig.Database)
+ && !string.IsNullOrWhiteSpace(RawMetadataTargetConfig.RelationshipsTable);
if (useRawRelationships)
{
- await MarkRawRowsAsDeleted(config.RawMetadata!.Database!, config.RawMetadata!.RelationshipsTable!, externalIds, token);
+ await MarkRawRowsAsDeleted(
+ RawMetadataTargetConfig!.Database!,
+ RawMetadataTargetConfig!.RelationshipsTable!,
+ externalIds,
+ token
+ );
}
else if (config.DeleteRelationships)
{
- var tasks = externalIds.ChunkBy(1000).Select(chunk => destination.CogniteClient.Relationships.DeleteAsync(chunk, true, token));
+ var tasks = externalIds
+ .ChunkBy(1000)
+ .Select(
+ chunk =>
+ destination.CogniteClient.Relationships.DeleteAsync(chunk, true, token)
+ );
await Task.WhenAll(tasks);
}
-
}
#endregion
@@ -1321,9 +1094,14 @@ private async Task EnsureConfigInit(CancellationToken token)
}
catch (ResponseException ex)
{
- log.LogError("Could not fetch data set by external id. It may not exist, or the user may lack" +
- " sufficient access rights. Project {Project} at {Host}, id {Id}: {Message}",
- config.Project, config.Host, config.DataSet.ExternalId, ex.Message);
+ log.LogError(
+ "Could not fetch data set by external id. It may not exist, or the user may lack"
+ + " sufficient access rights. Project {Project} at {Host}, id {Id}: {Message}",
+ config.Project,
+ config.Host,
+ config.DataSet.ExternalId,
+ ex.Message
+ );
throw;
}
}
@@ -1332,3 +1110,4 @@ private async Task EnsureConfigInit(CancellationToken token)
public void Dispose() { }
}
}
+
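// Condensed sketch of the new orchestration in CDFPusher.PushNodes above: the clean, raw and
// data-model pushes now run as independent tasks, each flipping its own flag in PushResult on
// failure, and the browse callback only fires when every flag is still true.
var tasks = new List<Task>
{
    PushAssets(objects, update.Objects, report, result, token),
    PushTimeseries(variables, update.Variables, report, result, token),
    PushReferences(references, report, result, token),
    PushFdm(objects, variables, references, result, token),
};
await Task.WhenAll(tasks);

bool allSucceeded = result.Objects && result.Variables && result.References
    && result.RawObjects && result.RawVariables && result.RawReferences;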
diff --git a/Extractor/Pushers/FDM/DMSValueConverter.cs b/Extractor/Pushers/FDM/DMSValueConverter.cs
index f0b237e2..e56ed44f 100644
--- a/Extractor/Pushers/FDM/DMSValueConverter.cs
+++ b/Extractor/Pushers/FDM/DMSValueConverter.cs
@@ -3,11 +3,8 @@
using Opc.Ua;
using System;
using System.Collections;
-using System.Collections.Generic;
using System.Linq;
-using System.Text;
using System.Text.Json;
-using System.Threading.Tasks;
namespace Cognite.OpcUa.Pushers.FDM
{
diff --git a/Extractor/Pushers/FDM/FDMWriter.cs b/Extractor/Pushers/FDM/FDMWriter.cs
index c33fda15..9836b2f3 100644
--- a/Extractor/Pushers/FDM/FDMWriter.cs
+++ b/Extractor/Pushers/FDM/FDMWriter.cs
@@ -34,7 +34,7 @@ You should have received a copy of the GNU General Public License
namespace Cognite.OpcUa.Pushers.FDM
{
- internal class FDMWriter
+ public class FDMWriter
{
private CogniteDestination destination;
private FullConfig config;
@@ -46,7 +46,7 @@ public FDMWriter(FullConfig config, CogniteDestination destination, ILogger instances, int chunkSize, CancellationToken token)
@@ -91,7 +91,7 @@ private async Task Initialize(FDMTypeBatch types, CancellationToken token)
var options = new JsonSerializerOptions(Oryx.Cognite.Common.jsonOptions) { WriteIndented = true };
var viewsToInsert = types.Views.Values.ToList();
- if (config.Cognite!.FlexibleDataModels!.SkipSimpleTypes)
+ if (config.Cognite!.MetadataTargets!.DataModels!.SkipSimpleTypes)
{
viewsToInsert = viewsToInsert.Where(v => v.Properties.Any() || types.ViewIsReferenced.GetValueOrDefault(v.ExternalId)).ToList();
}
@@ -108,7 +108,7 @@ private async Task Initialize(FDMTypeBatch types, CancellationToken token)
if (config.DryRun) return;
// Check if the data model exists
- if (config.Cognite!.FlexibleDataModels!.SkipTypesOnEqualCount)
+ if (config.Cognite!.MetadataTargets!.DataModels!.SkipTypesOnEqualCount)
{
try
{
@@ -237,7 +237,7 @@ public async Task PushNodes(
log.LogInformation("Mapped out {Nodes} nodes and {Edges} edges to write to PG3", nodes.Count, finalReferences.Count);
// Run the node filter unless we are writing everything.
- if (config.Cognite!.FlexibleDataModels!.ExcludeNonReferenced)
+ if (config.Cognite!.MetadataTargets!.DataModels!.ExcludeNonReferenced)
{
var trimmer = new NodeTrimmer(nodeHierarchy, config, log);
nodeHierarchy = trimmer.Filter();
diff --git a/Extractor/Pushers/FDM/NodeTypeCollector.cs b/Extractor/Pushers/FDM/NodeTypeCollector.cs
index 2937bb5a..338ced6a 100644
--- a/Extractor/Pushers/FDM/NodeTypeCollector.cs
+++ b/Extractor/Pushers/FDM/NodeTypeCollector.cs
@@ -1,6 +1,5 @@
using System.Collections.Generic;
using System.Linq;
-using Cognite.OpcUa.Config;
using Cognite.OpcUa.Nodes;
using Cognite.OpcUa.Pushers.FDM.Types;
using Cognite.OpcUa.Types;
@@ -14,12 +13,10 @@ public class NodeTypeCollector
private readonly ILogger log;
public Dictionary Types { get; }
private readonly Dictionary properties;
- private readonly FullConfig config;
private readonly HashSet visitedIds = new();
- public NodeTypeCollector(ILogger log, FullConfig config)
+ public NodeTypeCollector(ILogger log)
{
this.log = log;
- this.config = config;
Types = new();
properties = new();
}
diff --git a/Extractor/Pushers/FDM/TypeHierarchyBuilder.cs b/Extractor/Pushers/FDM/TypeHierarchyBuilder.cs
index 5a1813e7..17399d24 100644
--- a/Extractor/Pushers/FDM/TypeHierarchyBuilder.cs
+++ b/Extractor/Pushers/FDM/TypeHierarchyBuilder.cs
@@ -1,14 +1,7 @@
-using System;
using System.Collections.Generic;
using System.Linq;
-using System.Net;
-using System.Text.RegularExpressions;
-using System.Threading;
-using System.Threading.Tasks;
using Cognite.OpcUa.Config;
using Cognite.OpcUa.Pushers.FDM.Types;
-using Cognite.OpcUa.TypeCollectors;
-using Cognite.OpcUa.Types;
using CogniteSdk.Beta.DataModels;
using Microsoft.Extensions.Logging;
using Opc.Ua;
@@ -230,9 +223,9 @@ public TypeHierarchyBuilder(ILogger log, DMSValueConverter converter, FullConfig
{
this.log = log;
this.config = config;
- nodeTypes = new NodeTypeCollector(log, config);
- space = config.Cognite!.FlexibleDataModels!.Space!;
- fdmConfig = config.Cognite.FlexibleDataModels!;
+ nodeTypes = new NodeTypeCollector(log);
+ space = config.Cognite!.MetadataTargets!.DataModels!.Space!;
+ fdmConfig = config.Cognite!.MetadataTargets!.DataModels!;
this.converter = converter;
}
diff --git a/Extractor/Pushers/IPusher.cs b/Extractor/Pushers/IPusher.cs
index a9e5c9c6..6c4250f0 100644
--- a/Extractor/Pushers/IPusher.cs
+++ b/Extractor/Pushers/IPusher.cs
@@ -30,8 +30,11 @@ namespace Cognite.OpcUa
public class PushResult
{
public bool Objects { get; set; } = true;
+ public bool RawObjects { get; set; } = true;
public bool Variables { get; set; } = true;
+ public bool RawVariables { get; set; } = true;
public bool References { get; set; } = true;
+ public bool RawReferences { get; set; } = true;
}
public interface IPusher : IDisposable
diff --git a/Extractor/Pushers/MqttPusher.cs b/Extractor/Pushers/MqttPusher.cs
index fdc2cd5c..eeace687 100644
--- a/Extractor/Pushers/MqttPusher.cs
+++ b/Extractor/Pushers/MqttPusher.cs
@@ -697,7 +697,7 @@ private async Task PushTimeseries(IEnumerable variables, TypeU
{
var minimalTimeseries = variables
.Where(variable => !update.AnyUpdate || !variable.Changed)
- .Select(variable => variable.ToTimeseries(fullConfig, Extractor, Extractor, config.DataSetId, null, null, true))
+ .Select(variable => variable.ToMinimalTimeseries(Extractor, config.DataSetId))
.Where(variable => variable != null)
.ToList();
diff --git a/Extractor/Pushers/Writers/AssetsWriter.cs b/Extractor/Pushers/Writers/AssetsWriter.cs
new file mode 100644
index 00000000..a86aa5eb
--- /dev/null
+++ b/Extractor/Pushers/Writers/AssetsWriter.cs
@@ -0,0 +1,161 @@
+/* Cognite Extractor for OPC-UA
+Copyright (C) 2021 Cognite AS
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
+
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.Extensions;
+using Cognite.Extractor.Common;
+using Cognite.Extractor.Utils;
+using Cognite.OpcUa.Config;
+using Cognite.OpcUa.Nodes;
+using Cognite.OpcUa.Pushers.Writers.Dtos;
+using Cognite.OpcUa.Pushers.Writers.Interfaces;
+using CogniteSdk;
+using Microsoft.Extensions.Logging;
+using Opc.Ua;
+
+namespace Cognite.OpcUa.Pushers.Writers
+{
+ public class AssetsWriter : IAssetsWriter
+ {
+ private readonly ILogger log;
+ private readonly FullConfig config;
+ private readonly CogniteDestination destination;
+
+ public AssetsWriter(ILogger logger, CogniteDestination destination, FullConfig config)
+ {
+ this.log = logger;
+ this.config = config;
+ this.destination = destination;
+ }
+
+ ///
+ /// Synchronizes BaseUANodes to CDF assets
+ ///
+ /// UAExtractor instance
+ /// Map of unique ids to nodes to push as assets
+ /// Mapping from node ids to asset ids
+ /// Type update configuration
+ /// Cancellation token
+ /// Operation result
+ public async Task PushNodes(UAExtractor extractor, IDictionary nodes,
+ IDictionary nodeToAssetIds, TypeUpdateConfig update, CancellationToken token)
+ {
+ var result = new Result { Created = 0, Updated = 0 };
+ var assets = await CreateAssets(extractor, nodes, nodeToAssetIds, result, token);
+
+ if (update.AnyUpdate)
+ {
+ await UpdateAssets(extractor, nodes, assets, update, result, token);
+ }
+ return result;
+ }
+
+ /// <summary>
+ /// Create CDF assets from BaseUANodes
+ /// </summary>
+ /// <param name="extractor">UAExtractor instance</param>
+ /// <param name="assetMap">Map from external ids to the nodes to create assets for</param>
+ /// <param name="nodeToAssetIds">Map from node ids to asset ids</param>
+ /// <param name="result">Operation result</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>List of created or retrieved assets</returns>
+ private async Task<IEnumerable<Asset>> CreateAssets(UAExtractor extractor,
+ IDictionary<string, BaseUANode> assetMap, IDictionary<NodeId, long> nodeToAssetIds, Result result, CancellationToken token)
+ {
+ var assets = new List<Asset>();
+ var maxSize = config.Cognite?.CdfChunking.Assets ?? 1000;
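+ // Chunk by hierarchy so that parent assets are pushed before their children, keeping each request within the configured chunk size.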
+ foreach (var chunk in Chunking.ChunkByHierarchy(assetMap.Values, maxSize, node => node.Id, node => node.ParentId))
+ {
+ var assetChunk = await destination.GetOrCreateAssetsAsync(chunk.Select(node => extractor.GetUniqueId(node.Id)!), ids =>
+ {
+ var assets = ids.Select(id => assetMap[id]);
+ var creates = assets
+ .Select(node => node.ToCDFAsset(
+ config,
+ extractor,
+ config.Cognite?.DataSet?.Id,
+ config.Cognite?.MetadataMapping?.Assets))
+ .Where(asset => asset != null);
+ result.Created += creates.Count();
+ return creates;
+ }, RetryMode.None, SanitationMode.Clean, token);
+
+ log.LogResult(assetChunk, RequestType.CreateAssets, true);
+
+ assetChunk.ThrowOnFatal();
+
+ if (assetChunk.Results == null) continue;
+
+ foreach (var asset in assetChunk.Results)
+ {
+ nodeToAssetIds[assetMap[asset.ExternalId].Id] = asset.Id;
+ }
+ assets.AddRange(assetChunk.Results);
+ }
+ return assets;
+ }
+
+ /// <summary>
+ /// Update CDF assets from BaseUANodes
+ /// </summary>
+ /// <param name="extractor">UAExtractor instance</param>
+ /// <param name="assetMap">Map from external ids to the nodes to update</param>
+ /// <param name="assets">List of existing assets in CDF</param>
+ /// <param name="update">Type update configuration</param>
+ /// <param name="result">Operation result</param>
+ /// <param name="token">Cancellation token</param>
+ private async Task UpdateAssets(UAExtractor extractor, IDictionary<string, BaseUANode> assetMap,
+ IEnumerable<Asset> assets, TypeUpdateConfig update, Result result, CancellationToken token)
+ {
+ var updates = new List<AssetUpdateItem>();
+ var existing = assets.ToDictionary(asset => asset.ExternalId);
+ foreach (var kvp in assetMap)
+ {
+ if (existing.TryGetValue(kvp.Key, out var asset))
+ {
+ var assetUpdate = PusherUtils.GetAssetUpdate(config, asset, kvp.Value, extractor, update);
+
+ if (assetUpdate == null)
+ continue;
+ if (
+ assetUpdate.ParentExternalId != null
+ || assetUpdate.Description != null
+ || assetUpdate.Name != null
+ || assetUpdate.Metadata != null
+ )
+ {
+ updates.Add(new AssetUpdateItem(asset.ExternalId) { Update = assetUpdate });
+ }
+ }
+ }
+ if (updates.Any())
+ {
+ var res = await destination.UpdateAssetsAsync(updates, RetryMode.OnError, SanitationMode.Clean, token);
+
+ log.LogResult(res, RequestType.UpdateAssets, false);
+
+ res.ThrowOnFatal();
+
+ result.Updated += res.Results?.Count() ?? 0;
+ }
+ }
+ }
+}
diff --git a/Extractor/Pushers/Writers/BaseTimeseriesWriter.cs b/Extractor/Pushers/Writers/BaseTimeseriesWriter.cs
new file mode 100644
index 00000000..530f315b
--- /dev/null
+++ b/Extractor/Pushers/Writers/BaseTimeseriesWriter.cs
@@ -0,0 +1,128 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.Extensions;
+using Cognite.Extractor.Utils;
+using Cognite.OpcUa.Config;
+using Cognite.OpcUa.Nodes;
+using Cognite.OpcUa.NodeSources;
+using Cognite.OpcUa.Pushers.Writers.Dtos;
+using Cognite.OpcUa.Pushers.Writers.Interfaces;
+using CogniteSdk;
+using Microsoft.Extensions.Logging;
+using Opc.Ua;
+
+namespace Cognite.OpcUa.Pushers.Writers
+{
+ public abstract class BaseTimeseriesWriter : ITimeseriesWriter
+ {
+ protected readonly ILogger logger;
+ protected readonly FullConfig config;
+ protected readonly CogniteDestination destination;
+
+ public BaseTimeseriesWriter(ILogger logger, CogniteDestination destination, FullConfig config)
+ {
+ this.logger = logger;
+ this.config = config;
+ this.destination = destination;
+ }
+
+ /// <summary>
+ /// Synchronizes all UAVariables to CDF timeseries
+ /// </summary>
+ /// <param name="extractor">UAExtractor instance</param>
+ /// <param name="timeseriesMap">Map from external ids to the variables to push</param>
+ /// <param name="nodeToAssetIds">Map from node ids to asset ids</param>
+ /// <param name="mismatchedTimeseries">Set of external ids of timeseries with mismatched data types</param>
+ /// <param name="update">Type update configuration</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>Operation result</returns>
+ public virtual async Task<Result> PushVariables(UAExtractor extractor, IDictionary<string, UAVariable> timeseriesMap,
+ IDictionary<NodeId, long> nodeToAssetIds, HashSet<string> mismatchedTimeseries, TypeUpdateConfig update, CancellationToken token)
+ {
+ var result = new Result { Created = 0, Updated = 0 };
+ var timeseries = await CreateTimeseries(
+ extractor,
+ timeseriesMap,
+ nodeToAssetIds,
+ mismatchedTimeseries,
+ result,
+ token
+ );
+
+ var toPushMeta = timeseriesMap
+ .Where(kvp => kvp.Value.Source != NodeSource.CDF)
+ .ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
+
+ if (update.AnyUpdate && toPushMeta.Any())
+ {
+ await UpdateTimeseries(extractor, toPushMeta, timeseries, nodeToAssetIds, update, result, token);
+ }
+ return result;
+ }
+
+ /// <summary>
+ /// Create CDF timeseries from UAVariables
+ /// </summary>
+ /// <param name="extractor">UAExtractor instance</param>
+ /// <param name="tsMap">Map from external ids to the variables to create timeseries for</param>
+ /// <param name="nodeToAssetIds">Map from node ids to asset ids</param>
+ /// <param name="mismatchedTimeseries">Set of external ids of timeseries with mismatched data types</param>
+ /// <param name="result">Operation result</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>List of created or retrieved timeseries</returns>
+ private async Task<IEnumerable<TimeSeries>> CreateTimeseries(UAExtractor extractor, IDictionary<string, UAVariable> tsMap,
+ IDictionary<NodeId, long> nodeToAssetIds, HashSet<string> mismatchedTimeseries, Result result, CancellationToken token)
+ {
+ var timeseries = await destination.GetOrCreateTimeSeriesAsync(
+ tsMap.Keys,
+ ids => BuildTimeseries(tsMap, ids, extractor, nodeToAssetIds, result),
+ RetryMode.None,
+ SanitationMode.Clean,
+ token
+ );
+
+ logger.LogResult(timeseries, RequestType.CreateTimeSeries, true);
+
+ timeseries.ThrowOnFatal();
+
+ if (timeseries.Results == null)
+ return Array.Empty<TimeSeries>();
+
+ var foundBadTimeseries = new List<string>();
+ foreach (var ts in timeseries.Results)
+ {
+ var loc = tsMap[ts.ExternalId];
+ if (nodeToAssetIds.TryGetValue(loc.ParentId, out var parentId))
+ {
+ nodeToAssetIds[loc.Id] = parentId;
+ }
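+ // Track timeseries whose string/numeric type in CDF does not match the source variable, so their datapoints can be handled separately.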
+ if (ts.IsString != loc.FullAttributes.DataType.IsString)
+ {
+ mismatchedTimeseries.Add(ts.ExternalId);
+ foundBadTimeseries.Add(ts.ExternalId);
+ }
+ }
+ if (foundBadTimeseries.Any())
+ {
+ logger.LogDebug(
+ "Found mismatched timeseries when ensuring: {TimeSeries}",
+ string.Join(", ", foundBadTimeseries)
+ );
+ }
+
+ return timeseries.Results;
+ }
+
+ protected abstract IEnumerable<TimeSeriesCreate> BuildTimeseries(IDictionary<string, UAVariable> tsMap,
+ IEnumerable<string> ids, UAExtractor extractor, IDictionary<NodeId, long> nodeToAssetIds, Result result);
+
+ protected abstract Task UpdateTimeseries(UAExtractor extractor, IDictionary<string, UAVariable> tsMap,
+ IEnumerable<TimeSeries> timeseries, IDictionary<NodeId, long> nodeToAssetIds, TypeUpdateConfig update, Result result, CancellationToken token);
+ }
+}
diff --git a/Extractor/Pushers/Writers/CDFWriter.cs b/Extractor/Pushers/Writers/CDFWriter.cs
new file mode 100644
index 00000000..88afa617
--- /dev/null
+++ b/Extractor/Pushers/Writers/CDFWriter.cs
@@ -0,0 +1,29 @@
+using Cognite.OpcUa.Pushers.FDM;
+using Cognite.OpcUa.Pushers.Writers.Interfaces;
+
+namespace Cognite.OpcUa.Pushers.Writers
+{
+ public class CDFWriter : ICDFWriter
+ {
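+ // All writers except the timeseries writer are optional, and are only non-null when the corresponding metadata target is enabled in configuration.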
+ public IRawWriter? Raw { get; }
+ public ITimeseriesWriter Timeseries { get; }
+ public IAssetsWriter? Assets { get; }
+ public IRelationshipsWriter? Relationships { get; }
+ public FDMWriter? FDM { get; }
+
+ public CDFWriter(
+ ITimeseriesWriter timeseriesWriter,
+ IRawWriter? rawWriter,
+ IAssetsWriter? assetsWriter,
+ IRelationshipsWriter? relationshipsWriter,
+ FDMWriter? fdmWriter
+ )
+ {
+ Raw = rawWriter;
+ Timeseries = timeseriesWriter;
+ Assets = assetsWriter;
+ Relationships = relationshipsWriter;
+ FDM = fdmWriter;
+ }
+ }
+}
diff --git a/Extractor/Pushers/Writers/Dtos/Result.cs b/Extractor/Pushers/Writers/Dtos/Result.cs
new file mode 100644
index 00000000..51ad8a14
--- /dev/null
+++ b/Extractor/Pushers/Writers/Dtos/Result.cs
@@ -0,0 +1,8 @@
+namespace Cognite.OpcUa.Pushers.Writers.Dtos
+{
+ public class Result
+ {
+ public int Created { get; set; }
+ public int Updated { get; set; }
+ }
+}
diff --git a/Extractor/Pushers/Writers/Interfaces/IAssetsWriter.cs b/Extractor/Pushers/Writers/Interfaces/IAssetsWriter.cs
new file mode 100644
index 00000000..009b52c3
--- /dev/null
+++ b/Extractor/Pushers/Writers/Interfaces/IAssetsWriter.cs
@@ -0,0 +1,21 @@
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.OpcUa.Config;
+using Cognite.OpcUa.Nodes;
+using Cognite.OpcUa.Pushers.Writers.Dtos;
+using Opc.Ua;
+
+namespace Cognite.OpcUa.Pushers.Writers.Interfaces
+{
+ public interface IAssetsWriter
+ {
+ Task<Result> PushNodes(
+ UAExtractor extractor,
+ IDictionary<string, BaseUANode> assetMap,
+ IDictionary<NodeId, long> nodeToAssetIds,
+ TypeUpdateConfig config,
+ CancellationToken token
+ );
+ }
+}
diff --git a/Extractor/Pushers/Writers/Interfaces/ICDFWriter.cs b/Extractor/Pushers/Writers/Interfaces/ICDFWriter.cs
new file mode 100644
index 00000000..f0d5ed64
--- /dev/null
+++ b/Extractor/Pushers/Writers/Interfaces/ICDFWriter.cs
@@ -0,0 +1,13 @@
+using Cognite.OpcUa.Pushers.FDM;
+
+namespace Cognite.OpcUa.Pushers.Writers.Interfaces
+{
+ public interface ICDFWriter
+ {
+ IRawWriter? Raw { get; }
+ ITimeseriesWriter Timeseries { get; }
+ IAssetsWriter? Assets { get; }
+ IRelationshipsWriter? Relationships { get; }
+ FDMWriter? FDM { get; }
+ }
+}
diff --git a/Extractor/Pushers/Writers/Interfaces/IRawWriter.cs b/Extractor/Pushers/Writers/Interfaces/IRawWriter.cs
new file mode 100644
index 00000000..07c7bf70
--- /dev/null
+++ b/Extractor/Pushers/Writers/Interfaces/IRawWriter.cs
@@ -0,0 +1,90 @@
+/* Cognite Extractor for OPC-UA
+Copyright (C) 2021 Cognite AS
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
+
+using System.Collections.Generic;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.OpcUa.Nodes;
+using Cognite.OpcUa.Pushers.Writers.Dtos;
+using Cognite.OpcUa.Types;
+using CogniteSdk;
+
+namespace Cognite.OpcUa.Pushers.Writers.Interfaces
+{
+ public interface IRawWriter
+ {
+ static JsonSerializerOptions options =>
+ new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
+
+ /// <summary>
+ /// Get all rows from a table in CDF Raw
+ /// </summary>
+ /// <param name="dbName">Name of metadata database in CDF</param>
+ /// <param name="tableName">Name of metadata table in CDF</param>
+ /// <param name="columns">Columns to fetch, or null for all</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>All rows in the table, as dictionaries of JsonElement</returns>
+ Task<IEnumerable<RawRow<Dictionary<string, JsonElement>>>> GetRows(
+ string dbName,
+ string tableName,
+ IEnumerable<string>? columns,
+ CancellationToken token
+ );
+
+ /// <summary>
+ /// Synchronizes all BaseUANodes to CDF Raw
+ /// </summary>
+ /// <param name="extractor">UAExtractor instance</param>
+ /// <param name="database">Name of metadata database in CDF</param>
+ /// <param name="table">Name of metadata table in CDF</param>
+ /// <param name="rows">Map from row keys to the nodes to push</param>
+ /// <param name="converter">Converter type</param>
+ /// <param name="shouldUpdate">True if existing rows should be updated</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>Operation result</returns>
+ Task<Result> PushNodes<T>(
+ UAExtractor extractor,
+ string database,
+ string table,
+ IDictionary<string, T> rows,
+ ConverterType converter,
+ bool shouldUpdate,
+ CancellationToken token
+ )
+ where T : BaseUANode;
+
+ /// <summary>
+ /// Synchronizes all references to CDF Raw
+ /// </summary>
+ /// <param name="database">Name of metadata database in CDF</param>
+ /// <param name="table">Name of metadata table in CDF</param>
+ /// <param name="relationships">List of sanitized references</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>Operation result</returns>
+ Task<Result> PushReferences(
+ string database,
+ string table,
+ IEnumerable<RelationshipCreate> relationships,
+ CancellationToken token
+ );
+ }
+}
diff --git a/Extractor/Pushers/Writers/Interfaces/IRelationshipsWriter.cs b/Extractor/Pushers/Writers/Interfaces/IRelationshipsWriter.cs
new file mode 100644
index 00000000..044b6d65
--- /dev/null
+++ b/Extractor/Pushers/Writers/Interfaces/IRelationshipsWriter.cs
@@ -0,0 +1,36 @@
+/* Cognite Extractor for OPC-UA
+Copyright (C) 2021 Cognite AS
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.OpcUa.Pushers.Writers.Dtos;
+using CogniteSdk;
+
+namespace Cognite.OpcUa.Pushers.Writers.Interfaces
+{
+ public interface IRelationshipsWriter
+ {
+ /// <summary>
+ /// Push all references to CDF relationships
+ /// </summary>
+ /// <param name="relationships">List of sanitized references</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>A result reporting items created/updated</returns>
+ Task<Result> PushReferences(IEnumerable<RelationshipCreate> relationships, CancellationToken token);
+ }
+}
diff --git a/Extractor/Pushers/Writers/Interfaces/ITimeseriesWriter.cs b/Extractor/Pushers/Writers/Interfaces/ITimeseriesWriter.cs
new file mode 100644
index 00000000..5514ba53
--- /dev/null
+++ b/Extractor/Pushers/Writers/Interfaces/ITimeseriesWriter.cs
@@ -0,0 +1,49 @@
+/* Cognite Extractor for OPC-UA
+Copyright (C) 2021 Cognite AS
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.OpcUa.Config;
+using Cognite.OpcUa.Nodes;
+using Cognite.OpcUa.Pushers.Writers.Dtos;
+using Opc.Ua;
+
+namespace Cognite.OpcUa.Pushers.Writers.Interfaces
+{
+ public interface ITimeseriesWriter
+ {
+ /// <summary>
+ /// Synchronizes all UAVariables to CDF timeseries
+ /// </summary>
+ /// <param name="extractor">UAExtractor instance</param>
+ /// <param name="timeseriesMap">Map from external ids to the variables to push</param>
+ /// <param name="nodeToAssetIds">Map from node ids to asset ids</param>
+ /// <param name="mismatchedTimeseries">Set of external ids of timeseries with mismatched data types</param>
+ /// <param name="update">Type update configuration</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>Operation result</returns>
+ Task<Result> PushVariables(
+ UAExtractor extractor,
+ IDictionary<string, UAVariable> timeseriesMap,
+ IDictionary<NodeId, long> nodeToAssetIds,
+ HashSet<string> mismatchedTimeseries,
+ TypeUpdateConfig update,
+ CancellationToken token
+ );
+ }
+}
diff --git a/Extractor/Pushers/Writers/MinimalTimeseriesWriter.cs b/Extractor/Pushers/Writers/MinimalTimeseriesWriter.cs
new file mode 100644
index 00000000..3b983b51
--- /dev/null
+++ b/Extractor/Pushers/Writers/MinimalTimeseriesWriter.cs
@@ -0,0 +1,57 @@
+/* Cognite Extractor for OPC-UA
+Copyright (C) 2021 Cognite AS
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
+
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.Extractor.Utils;
+using Cognite.OpcUa.Config;
+using Cognite.OpcUa.Nodes;
+using Cognite.OpcUa.Pushers.Writers.Dtos;
+using CogniteSdk;
+using Microsoft.Extensions.Logging;
+using Opc.Ua;
+
+namespace Cognite.OpcUa.Pushers.Writers
+{
+ public class MinimalTimeseriesWriter : BaseTimeseriesWriter
+ {
+ public MinimalTimeseriesWriter(
+ ILogger<MinimalTimeseriesWriter> logger,
+ CogniteDestination destination,
+ FullConfig config
+ )
+ : base(logger, destination, config) { }
+
+ protected override IEnumerable<TimeSeriesCreate> BuildTimeseries(IDictionary<string, UAVariable> tsMap,
+ IEnumerable<string> ids, UAExtractor extractor, IDictionary<NodeId, long> nodeToAssetIds, Result result)
+ {
+ var tss = ids.Select(id => tsMap[id]);
+ var creates = tss.Select(ts => ts.ToMinimalTimeseries(extractor, config.Cognite?.DataSet?.Id))
+ .Where(ts => ts != null);
+ result.Created += creates.Count();
+ return creates;
+ }
+
+ protected override Task UpdateTimeseries(UAExtractor extractor, IDictionary<string, UAVariable> tsMap,
+ IEnumerable<TimeSeries> timeseries, IDictionary<NodeId, long> nodeToAssetIds, TypeUpdateConfig update, Result result, CancellationToken token)
+ {
+ return Task.CompletedTask;
+ }
+ }
+}
diff --git a/Extractor/Pushers/Writers/RawWriter.cs b/Extractor/Pushers/Writers/RawWriter.cs
new file mode 100644
index 00000000..28d7d769
--- /dev/null
+++ b/Extractor/Pushers/Writers/RawWriter.cs
@@ -0,0 +1,327 @@
+/* Cognite Extractor for OPC-UA
+Copyright (C) 2021 Cognite AS
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.Extractor.Utils;
+using Cognite.OpcUa.Config;
+using Cognite.OpcUa.Nodes;
+using Cognite.OpcUa.Pushers.Writers.Dtos;
+using Cognite.OpcUa.Pushers.Writers.Interfaces;
+using Cognite.OpcUa.Types;
+using CogniteSdk;
+using Microsoft.Extensions.Logging;
+
+namespace Cognite.OpcUa.Pushers.Writers
+{
+ public class RawWriter : IRawWriter
+ {
+ private readonly ILogger<RawWriter> log;
+ private FullConfig config { get; }
+ private CogniteDestination destination { get; }
+
+ public RawWriter(ILogger<RawWriter> log, CogniteDestination destination, FullConfig config)
+ {
+ this.log = log;
+ this.config = config;
+ this.destination = destination;
+ }
+
+ /// <summary>
+ /// Get all rows from a table in CDF Raw
+ /// </summary>
+ /// <param name="dbName">Name of metadata database in CDF</param>
+ /// <param name="tableName">Name of metadata table in CDF</param>
+ /// <param name="columns">Columns to fetch, or null for all</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>All rows in the table, as dictionaries of JsonElement</returns>
+ public async Task<IEnumerable<RawRow<Dictionary<string, JsonElement>>>> GetRows(
+ string dbName,
+ string tableName,
+ IEnumerable<string>? columns,
+ CancellationToken token
+ )
+ {
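+ // Read the table page by page, following the returned cursor until all rows have been fetched.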
+ string? cursor = null;
+ var rows = new List<RawRow<Dictionary<string, JsonElement>>>();
+ do
+ {
+ try
+ {
+ var result = await destination.CogniteClient.Raw.ListRowsAsync<
+ Dictionary<string, JsonElement>
+ >(
+ dbName,
+ tableName,
+ new RawRowQuery { Cursor = cursor, Limit = 10_000, Columns = columns },
+ null,
+ token
+ );
+ rows.AddRange(result.Items);
+ cursor = result.NextCursor;
+ }
+ catch (ResponseException ex) when (ex.Code == 404)
+ {
+ log.LogWarning("Table or database not found: {Message}", ex.Message);
+ break;
+ }
+ } while (cursor != null);
+ return rows;
+ }
+
+ /// <summary>
+ /// Synchronizes all BaseUANodes to CDF Raw
+ /// </summary>
+ /// <param name="extractor">UAExtractor instance</param>
+ /// <param name="database">Name of metadata database in CDF</param>
+ /// <param name="table">Name of metadata table in CDF</param>
+ /// <param name="rows">Map from row keys to the nodes to push</param>
+ /// <param name="converter">Converter type</param>
+ /// <param name="shouldUpdate">True if existing rows should be updated</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>Operation result</returns>
+ public async Task<Result> PushNodes<T>(UAExtractor extractor, string database, string table,
+ IDictionary<string, T> rows, ConverterType converter, bool shouldUpdate, CancellationToken token) where T : BaseUANode
+ {
+ var result = new Result { Created = 0, Updated = 0 };
+
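+ // When updates are enabled, existing rows are merged and rewritten; otherwise only missing rows are created.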
+ if (shouldUpdate)
+ {
+ await UpdateRows(extractor, database, table, rows, converter, result, token);
+ }
+ else
+ {
+ await CreateRows(extractor, database, table, rows, converter, result, token);
+ }
+ return result;
+ }
+
+ /// <summary>
+ /// Update rows in CDF Raw from BaseUANodes
+ /// </summary>
+ /// <param name="extractor">UAExtractor instance</param>
+ /// <param name="database">Name of metadata database in CDF</param>
+ /// <param name="table">Name of metadata table in CDF</param>
+ /// <param name="dataSet">Map from row keys to the nodes to write</param>
+ /// <param name="converter">Converter type</param>
+ /// <param name="result">Operation result</param>
+ /// <param name="token">Cancellation token</param>
+ private async Task UpdateRows<T>(UAExtractor extractor, string database, string table,
+ IDictionary<string, T> dataSet, ConverterType converter, Result result, CancellationToken token) where T : BaseUANode
+ {
+ await UpsertRows(
+ database,
+ table,
+ rows =>
+ {
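+ // The builder is called with null on the final pass, meaning every remaining node should be written as a new row.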
+ if (rows == null)
+ {
+ return dataSet
+ .Select(
+ kvp =>
+ (kvp.Key, update: PusherUtils.CreateRawUpdate(log, extractor.StringConverter, kvp.Value, null, converter))
+ )
+ .Where(elem => elem.update != null)
+ .ToDictionary(pair => pair.Key, pair => pair.update!.Value);
+ }
+
+ var toWrite =
+ new List<(string key, RawRow<Dictionary<string, JsonElement>> row, T node)>();
+
+ foreach (var row in rows)
+ {
+ if (dataSet.TryGetValue(row.Key, out var node))
+ {
+ toWrite.Add((row.Key, row, node));
+ dataSet.Remove(row.Key);
+ }
+ }
+
+ var updates = new Dictionary<string, JsonElement>();
+
+ foreach (var (key, row, node) in toWrite)
+ {
+ var update = PusherUtils.CreateRawUpdate(log, extractor.StringConverter, node, row, converter);
+
+ if (update != null)
+ {
+ updates[key] = update.Value;
+ if (row == null)
+ {
+ result.Created++;
+ }
+ else
+ {
+ result.Updated++;
+ }
+ }
+ }
+
+ return updates;
+ },
+ null,
+ token
+ );
+ }
+
+ /// <summary>
+ /// Create rows in CDF Raw from BaseUANodes
+ /// </summary>
+ /// <param name="extractor">UAExtractor instance</param>
+ /// <param name="database">Name of metadata database in CDF</param>
+ /// <param name="table">Name of metadata table in CDF</param>
+ /// <param name="dataMap">Map from row keys to the nodes to write</param>
+ /// <param name="converter">Converter type</param>
+ /// <param name="result">Operation result</param>
+ /// <param name="token">Cancellation token</param>
+ private async Task CreateRows<T>(UAExtractor extractor, string database, string table,
+ IDictionary<string, T> dataMap, ConverterType converter, Result result, CancellationToken token) where T : BaseUANode
+ {
+ await EnsureRows(
+ database,
+ table,
+ dataMap.Keys,
+ ids =>
+ {
+ var rows = ids.Select(id => (dataMap[id], id));
+ var creates = rows
+ .Select(pair => (pair.Item1.ToJson(log, extractor.StringConverter, converter), pair.id))
+ .Where(pair => pair.Item1 != null)
+ .ToDictionary(pair => pair.id, pair => pair.Item1!.RootElement);
+ result.Created += creates.Count;
+ return creates;
+ },
+ new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase },
+ token
+ );
+ }
+
+ /// <summary>
+ /// Upsert rows in CDF Raw
+ /// </summary>
+ /// <param name="dbName">Name of metadata database in CDF</param>
+ /// <param name="tableName">Name of metadata table in CDF</param>
+ /// <param name="dtoBuilder">Callback building the rows to write, given the existing rows or null</param>
+ /// <param name="options">Json serialization options</param>
+ /// <param name="token">Cancellation token</param>
+ private async Task UpsertRows(
+ string dbName,
+ string tableName,
+ Func<
+ IEnumerable<RawRow<Dictionary<string, JsonElement>>>?,
+ IDictionary<string, JsonElement>
+ > dtoBuilder,
+ JsonSerializerOptions? options,
+ CancellationToken token
+ )
+ {
+ int count = 0;
+ async Task CallAndCreate(IEnumerable<RawRow<Dictionary<string, JsonElement>>>? rows)
+ {
+ var toUpsert = dtoBuilder(rows);
+ count += toUpsert.Count;
+ await destination.InsertRawRowsAsync(dbName, tableName, toUpsert, options, token);
+ }
+
+ string? cursor = null;
+ do
+ {
+ try
+ {
+ var result = await destination.CogniteClient.Raw.ListRowsAsync<
+ Dictionary<string, JsonElement>
+ >(
+ dbName,
+ tableName,
+ new RawRowQuery { Cursor = cursor, Limit = 10_000 },
+ null,
+ token
+ );
+ cursor = result.NextCursor;
+
+ await CallAndCreate(result.Items);
+ }
+ catch (ResponseException ex) when (ex.Code == 404)
+ {
+ log.LogWarning("Table or database not found: {Message}", ex.Message);
+ break;
+ }
+ } while (cursor != null);
+
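+ // Finally call the builder with null so that nodes without an existing row are also created.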
+ await CallAndCreate(null);
+
+ log.LogInformation("Updated or created {Count} rows in CDF Raw", count);
+ }
+
+ /// <summary>
+ /// Ensure that rows with the given keys exist in CDF Raw, creating any that are missing
+ /// </summary>
+ /// <param name="dbName">Name of metadata database in CDF</param>
+ /// <param name="tableName">Name of metadata table in CDF</param>
+ /// <param name="keys">Keys of the rows that should exist</param>
+ /// <param name="dtoBuilder">Callback building the rows to create, given the missing keys</param>
+ /// <param name="options">Json serialization options</param>
+ /// <param name="token">Cancellation token</param>
+ private async Task EnsureRows<T>(string dbName, string tableName, IEnumerable<string> keys,
+ Func<IEnumerable<string>, IDictionary<string, T>> dtoBuilder, JsonSerializerOptions options, CancellationToken token)
+ {
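+ // Request a column name that does not exist ("," here) so only row keys are returned, which is enough to find the missing rows.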
+ var rows = await GetRows(dbName, tableName, new[] { "," }, token);
+ var existing = rows.Select(row => row.Key);
+
+ var toCreate = keys.Except(existing);
+ if (!toCreate.Any())
+ return;
+ log.LogInformation("Creating {Count} raw rows in CDF", toCreate.Count());
+
+ var createDtos = dtoBuilder(toCreate);
+
+ await destination.InsertRawRowsAsync(dbName, tableName, createDtos, options, token);
+ }
+
+ public async Task<Result> PushReferences(string database, string table, IEnumerable<RelationshipCreate> relationships, CancellationToken token)
+ {
+ var result = new Result { Created = 0, Updated = 0 };
+ await EnsureRows(
+ database,
+ table,
+ relationships.Select(rel => rel.ExternalId),
+ ids =>
+ {
+ var idSet = ids.ToHashSet();
+ var creates = relationships
+ .Where(rel => idSet.Contains(rel.ExternalId))
+ .ToDictionary(rel => rel.ExternalId);
+ result.Created += creates.Count;
+ return creates;
+ },
+ new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase },
+ token
+ );
+ return result;
+ }
+ }
+}
diff --git a/Extractor/Pushers/Writers/RelationshipsWriter.cs b/Extractor/Pushers/Writers/RelationshipsWriter.cs
new file mode 100644
index 00000000..dfd025be
--- /dev/null
+++ b/Extractor/Pushers/Writers/RelationshipsWriter.cs
@@ -0,0 +1,106 @@
+/* Cognite Extractor for OPC-UA
+Copyright (C) 2021 Cognite AS
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
+
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.Extractor.Common;
+using Cognite.Extractor.Utils;
+using Cognite.OpcUa.Config;
+using Cognite.OpcUa.Pushers.Writers.Dtos;
+using Cognite.OpcUa.Pushers.Writers.Interfaces;
+using CogniteSdk;
+using Microsoft.Extensions.Logging;
+
+namespace Cognite.OpcUa.Pushers.Writers
+{
+ public class RelationshipsWriter : IRelationshipsWriter
+ {
+ private readonly ILogger<RelationshipsWriter> log;
+ private readonly FullConfig config;
+ private readonly CogniteDestination destination;
+
+ public RelationshipsWriter(ILogger<RelationshipsWriter> logger, CogniteDestination destination, FullConfig config)
+ {
+ this.log = logger;
+ this.config = config;
+ this.destination = destination;
+ }
+
+ /// <summary>
+ /// Push all references to CDF relationships
+ /// </summary>
+ /// <param name="relationships">List of sanitized references</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>A result reporting items created/updated</returns>
+ public async Task<Result> PushReferences(IEnumerable<RelationshipCreate> relationships, CancellationToken token)
+ {
+ var result = new Result{ Created = 0, Updated = 0 };
+ var counts = await Task.WhenAll(
+ relationships.ChunkBy(1000).Select(chunk => PushReferencesChunk(chunk, token))
+ );
+ result.Created += counts.Sum();
+ return result;
+ }
+
+ /// <summary>
+ /// Push a chunk of references to CDF
+ /// </summary>
+ /// <param name="relationships">List of sanitized references</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>Number of relationships created</returns>
+ private async Task<int> PushReferencesChunk(IEnumerable<RelationshipCreate> relationships, CancellationToken token)
+ {
+ if (!relationships.Any())
+ return 0;
+ try
+ {
+ await destination.CogniteClient.Relationships.CreateAsync(relationships, token);
+ return relationships.Count();
+ }
+ catch (ResponseException ex)
+ {
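+ // If the request failed because some relationships already exist, drop the duplicates and retry with the remainder.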
+ if (ex.Duplicated.Any())
+ {
+ var existing = new HashSet<string>();
+ foreach (var dict in ex.Duplicated)
+ {
+ if (dict.TryGetValue("externalId", out var value))
+ {
+ if (value is MultiValue.String strValue)
+ {
+ existing.Add(strValue.Value);
+ }
+ }
+ }
+ if (!existing.Any())
+ throw;
+
+ relationships = relationships
+ .Where(rel => !existing.Contains(rel.ExternalId))
+ .ToList();
+ return await PushReferencesChunk(relationships, token);
+ }
+ else
+ {
+ throw;
+ }
+ }
+ }
+ }
+}
diff --git a/Extractor/Pushers/Writers/TimeseriesWriter.cs b/Extractor/Pushers/Writers/TimeseriesWriter.cs
new file mode 100644
index 00000000..b20cfe51
--- /dev/null
+++ b/Extractor/Pushers/Writers/TimeseriesWriter.cs
@@ -0,0 +1,99 @@
+/* Cognite Extractor for OPC-UA
+Copyright (C) 2021 Cognite AS
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
+
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.Extensions;
+using Cognite.Extractor.Utils;
+using Cognite.OpcUa.Config;
+using Cognite.OpcUa.Nodes;
+using Cognite.OpcUa.Pushers.Writers.Dtos;
+using CogniteSdk;
+using Microsoft.Extensions.Logging;
+using Opc.Ua;
+
+namespace Cognite.OpcUa.Pushers.Writers
+{
+ public class TimeseriesWriter : BaseTimeseriesWriter
+ {
+ public TimeseriesWriter(ILogger<TimeseriesWriter> logger, CogniteDestination destination, FullConfig config)
+ : base(logger, destination, config)
+ { }
+
+ protected override IEnumerable<TimeSeriesCreate> BuildTimeseries(IDictionary<string, UAVariable> tsMap,
+ IEnumerable<string> ids, UAExtractor extractor, IDictionary<NodeId, long> nodeToAssetIds, Result result)
+ {
+ var tss = ids.Select(id => tsMap[id]);
+ var creates = tss.Select(
+ ts =>
+ ts.ToTimeseries(
+ config,
+ extractor,
+ extractor,
+ config.Cognite?.DataSet?.Id,
+ nodeToAssetIds,
+ config.Cognite?.MetadataMapping?.Timeseries
+ )
+ )
+ .Where(ts => ts != null);
+ result.Created += creates.Count();
+ return creates;
+ }
+
+ /// <summary>
+ /// Update CDF timeseries from UAVariables
+ /// </summary>
+ /// <param name="extractor">UAExtractor instance</param>
+ /// <param name="tsMap">Map from external ids to the variables to update</param>
+ /// <param name="timeseries">List of existing timeseries in CDF</param>
+ /// <param name="nodeToAssetIds">Map from node ids to asset ids</param>
+ /// <param name="update">Type update configuration</param>
+ /// <param name="result">Operation result</param>
+ /// <param name="token">Cancellation token</param>
+ protected override async Task UpdateTimeseries(UAExtractor extractor, IDictionary<string, UAVariable> tsMap,
+ IEnumerable<TimeSeries> timeseries, IDictionary<NodeId, long> nodeToAssetIds, TypeUpdateConfig update, Result result, CancellationToken token)
+ {
+ var updates = new List<TimeSeriesUpdateItem>();
+ var existing = timeseries.ToDictionary(asset => asset.ExternalId);
+ foreach (var kvp in tsMap)
+ {
+ if (existing.TryGetValue(kvp.Key, out var ts))
+ {
+ var tsUpdate = PusherUtils.GetTSUpdate(config, extractor, ts, kvp.Value, update, nodeToAssetIds);
+ if (tsUpdate == null) continue;
+ if (tsUpdate.AssetId != null || tsUpdate.Description != null
+ || tsUpdate.Name != null || tsUpdate.Metadata != null)
+ {
+ updates.Add(new TimeSeriesUpdateItem(ts.ExternalId) { Update = tsUpdate });
+ }
+ }
+ }
+
+ if (updates.Any())
+ {
+ var res = await destination.UpdateTimeSeriesAsync(updates, RetryMode.OnError, SanitationMode.Clean, token);
+
+ logger.LogResult(res, RequestType.UpdateTimeSeries, false);
+ res.ThrowOnFatal();
+
+ result.Updated += res.Results?.Count() ?? 0;
+ }
+ }
+ }
+}
diff --git a/Extractor/Pushers/Writers/WriterUtils.cs b/Extractor/Pushers/Writers/WriterUtils.cs
new file mode 100644
index 00000000..e4db3ff3
--- /dev/null
+++ b/Extractor/Pushers/Writers/WriterUtils.cs
@@ -0,0 +1,75 @@
+using System.Threading;
+using Cognite.Extractor.Utils;
+using Cognite.OpcUa.Config;
+using Cognite.OpcUa.Pushers.FDM;
+using Cognite.OpcUa.Pushers.Writers.Interfaces;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+
+namespace Cognite.OpcUa.Pushers.Writers
+{
+ public static class WriterUtils
+ {
+ public static void AddWriters(this IServiceCollection services, CancellationToken token, FullConfig config)
+ {
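+ // The full timeseries writer is only used when clean timeseries metadata is enabled; otherwise a minimal writer creates timeseries without metadata.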
+ services.AddSingleton<ITimeseriesWriter>(provider => {
+ var destination = provider.GetRequiredService<CogniteDestination>();
+ var config = provider.GetRequiredService<FullConfig>();
+ return (config.Cognite?.MetadataTargets?.Clean?.Timeseries ?? false)
+ ? new TimeseriesWriter(provider.GetRequiredService<ILogger<TimeseriesWriter>>(), destination, config)
+ : new MinimalTimeseriesWriter(provider.GetRequiredService<ILogger<MinimalTimeseriesWriter>>(), destination, config);
+ });
+ if (config.Cognite?.MetadataTargets?.Clean?.Assets ?? false)
+ {
+ services.AddSingleton<IAssetsWriter>(provider => {
+ var destination = provider.GetRequiredService<CogniteDestination>();
+ return new AssetsWriter(
+ provider.GetRequiredService<ILogger<AssetsWriter>>(),
+ destination,
+ config
+ );
+ });
+ }
+ if (config.Cognite?.MetadataTargets?.Clean?.Relationships ?? false)
+ {
+ services.AddSingleton<IRelationshipsWriter>(provider => {
+ var destination = provider.GetRequiredService<CogniteDestination>();
+ return new RelationshipsWriter(
+ provider.GetRequiredService<ILogger<RelationshipsWriter>>(),
+ destination,
+ config
+ );
+ });
+ }
+ if (config.Cognite?.MetadataTargets?.Raw is not null)
+ {
+ services.AddSingleton<IRawWriter>(provider => {
+ var destination = provider.GetRequiredService<CogniteDestination>();
+ return new RawWriter(
+ provider.GetRequiredService<ILogger<RawWriter>>(),
+ destination,
+ config
+ );
+ });
+ }
+ if (config.Cognite?.MetadataTargets?.DataModels != null && config.Cognite.MetadataTargets.DataModels.Enabled)
+ {
+ services.AddSingleton<FDMWriter>(provider => {
+ var destination = provider.GetRequiredService<CogniteDestination>();
+ return new FDMWriter(provider.GetRequiredService<FullConfig>(), destination,
+ provider.GetRequiredService<ILogger<FDMWriter>>());
+ });
+ }
+ services.AddSingleton