Skip to content

Commit

Permalink
Use FailureThresholdManager to Determine History Read Operation Succe…
Browse files Browse the repository at this point in the history
…ss (#426)

* bump utils and SDK versions

* use FailureThresholdManager to manage run status

* add parameter to example config

* Add test, update config, make errors more granular

* Fix tests

* Update utils

Remove some old dependencies in MQTTCDFBridge, they are included
indirectly

* Handle bizarre double/array issue

* Update manifest

---------

Co-authored-by: Einar Marstrander Omang <[email protected]>
  • Loading branch information
ozangoktan and einarmo authored Jul 4, 2023
1 parent a64aef8 commit 56d162e
Show file tree
Hide file tree
Showing 25 changed files with 173 additions and 111 deletions.
9 changes: 8 additions & 1 deletion Extractor/Config/HistoryConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */

using Cognite.Extractor.Common;
using System;
using System.ComponentModel;
using System.ComponentModel.DataAnnotations;
using Cognite.Extractor.Common;

namespace Cognite.OpcUa.Config
{
Expand Down Expand Up @@ -137,5 +137,12 @@ public ContinuationPointThrottlingConfig Throttling
/// </summary>
[DefaultValue(true)]
public bool LogBadValues { get; set; } = true;

/// <summary>
/// Threshold for the percentage of read operations failed before the run is considered erroneous.
/// Example: 10.0 -> History read operation would consider the run as failed if more that %10 of read operations fail.
/// </summary>
[DefaultValue(10.0)]
public double ErrorThreshold { get; set; } = 10.0;
}
}
10 changes: 5 additions & 5 deletions Extractor/Extractor.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AdysTech.InfluxDB.Client.Net.Core" Version="0.25.0" />
<PackageReference Include="Cognite.ExtractorUtils" Version="1.8.0" />
<PackageReference Include="Cognite.ExtractorUtils" Version="1.12.3" />
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client" Version="1.4.371.60" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Configuration" Version="1.4.371.60" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.PubSub" Version="1.4.371.60-beta" />
<PackageReference Include="System.ComponentModel.Annotations" Version="5.0.0" />
<PackageReference Include="MQTTnet" Version="3.1.2" />
<PackageReference Include="CogniteSdk" Version="3.8.1" />
<PackageReference Include="System.Text.Json" Version="7.0.3" />
<PackageReference Include="MQTTnet" Version="3.1.2" />
<PackageReference Include="CogniteSdk" Version="3.11.1" />
<PackageReference Include="System.Text.Json" Version="7.0.3" />
</ItemGroup>
<!--ItemGroup>
<None Include="$(SolutionDir)config\**" CopyToOutputDirectory="Always" LinkBase="config\" />
</ItemGroup-->
</Project>
</Project>
4 changes: 3 additions & 1 deletion Extractor/History/HistoryReadParams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public DateTime Time
public IEncodeable? LastResult { get; set; }
public DateTime EndTime { get; set; }
public DateTime? StartTime { get; set; }
public StatusCode? LastStatus { get; set; }
public bool IsFailed => LastStatus != null && StatusCode.IsBad(LastStatus.Value);
}
/// <summary>
/// Parameter class containing the state of a single history read operation.
Expand All @@ -107,7 +109,7 @@ public HistoryReadParams(IEnumerable<HistoryReadNode> nodes, HistoryReadDetails

public bool Completed(HistoryReadNode item)
{
return item.Completed || Exception != null;
return item.Completed || Exception != null || item.IsFailed;
}
}
}
47 changes: 36 additions & 11 deletions Extractor/History/HistoryScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extractor.Common;
using Cognite.OpcUa.Config;
using Cognite.OpcUa.TypeCollectors;
using Cognite.OpcUa.Types;
using Microsoft.Extensions.Logging;
using Opc.Ua;
using Prometheus;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Cognite.OpcUa.History
{
Expand Down Expand Up @@ -86,7 +86,8 @@ public class HistoryScheduler : SharedResourceScheduler<HistoryReadNode>

private readonly TimeSpan? maxReadLength;

private readonly List<Exception> exceptions = new List<Exception>();
private readonly FailureThresholdManager<NodeId, Exception> thresholdManager;
private IReadOnlyDictionary<NodeId, Exception>? exceptions;

public HistoryScheduler(
ILogger log,
Expand Down Expand Up @@ -121,6 +122,12 @@ public HistoryScheduler(

nodeCount = count;

thresholdManager = new FailureThresholdManager<NodeId, Exception>(config.ErrorThreshold, nodeCount, (x) =>
{
exceptions = x;
TokenSource.Cancel();
});

historyStartTime = GetStartTime(config.StartTime);
if (!string.IsNullOrWhiteSpace(config.EndTime)) historyEndTime = CogniteTime.ParseTimestampString(config.EndTime)!;

Expand Down Expand Up @@ -332,9 +339,9 @@ protected override IEnumerable<IChunk<HistoryReadNode>> GetNextChunks(
{
log.LogWarning("ServiceLevel is low, so destination ranges are not currently beign updated. History will run again once ServiceLevel improves");
}
if (exceptions.Any())
if (exceptions != null && exceptions.Any())
{
throw new AggregateException(exceptions);
throw new AggregateException(exceptions.Select(x => x.Value));
}
}

Expand Down Expand Up @@ -392,13 +399,26 @@ protected override IEnumerable<HistoryReadNode> HandleTaskResult(IChunk<HistoryR
if (chunk.Exception != null)
{
LogReadFailure(chunk);
exceptions.Add(chunk.Exception);
foreach (var node in chunk.Items)
{
thresholdManager.Failed(node.Id, chunk.Exception);
}
AbortChunk(chunk, token);
return Enumerable.Empty<HistoryReadNode>();
}

var failed = new List<(string id, string status)>();
foreach (var node in chunk.Items)
{
if (node.IsFailed)
{
thresholdManager.Failed(node.Id, new ServiceResultException(node.LastStatus!.Value));
var symbolicId = StatusCode.LookupSymbolicId((uint)node.LastStatus.Value);
log.LogDebug("HistoryRead {Type} failed for node {Node}: {Status}", type, node.State.Id, symbolicId);
failed.Add((node.State.Id, symbolicId));
continue;
}

if (Data)
{
HistoryDataHandler(node);
Expand All @@ -416,7 +436,12 @@ protected override IEnumerable<HistoryReadNode> HandleTaskResult(IChunk<HistoryR
metrics.NumItems.Inc(node.LastRead);
}

var toTerminate = chunk.Items.Where(node => node.Completed).ToList();
if (failed.Any())
{
log.LogError("HistoryRead {Type} failed for {Nodes}", type, string.Join(", ", failed.Select(f => $"{f.id}: {f.status}")));
}

var toTerminate = chunk.Items.Where(node => node.Completed && !node.IsFailed).ToList();
LogHistoryTermination(log, toTerminate, type);

return Enumerable.Empty<HistoryReadNode>();
Expand Down
18 changes: 9 additions & 9 deletions Extractor/UAClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,10 @@ public async Task DoHistoryRead(HistoryReadParams readParams, CancellationToken
{
var data = results[i];
var node = readParams.Nodes[i];
node.LastStatus = data.StatusCode;

if (StatusCode.IsBad(data.StatusCode)) continue;

LogDump("HistoryRead node", node);
LogDump("HistoryRead data", data);

Expand Down Expand Up @@ -823,13 +827,6 @@ private async Task<HistoryReadResultCollection> ReadHistoryChunk(HistoryReadPara

numHistoryReads.Inc();

foreach (var result in results)
{
if (StatusCode.IsBad(result.StatusCode))
{
throw new ServiceResultException(result.StatusCode);
}
}
return results;
}

Expand Down Expand Up @@ -1418,8 +1415,11 @@ public static double ConvertToDouble(object datavalue)
if (datavalue is IEnumerable enumerable)
{
var enumerator = enumerable.GetEnumerator();
enumerator.MoveNext();
return ConvertToDouble(enumerator.Current);
if (enumerator.MoveNext())
{
return ConvertToDouble(enumerator.Current);
}
return 0;
}
// Give up if there is no clear way to convert it
if (!typeof(IConvertible).IsAssignableFrom(datavalue.GetType())) return 0;
Expand Down
8 changes: 2 additions & 6 deletions MQTTCDFBridge/MQTTCDFBridge.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Cognite.ExtractorUtils" Version="1.8.0" />
<PackageReference Include="Cognite.ExtractorUtils" Version="1.12.3" />
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Http.Polly" Version="7.0.8" />
<PackageReference Include="MQTTnet" Version="3.1.2" />
<PackageReference Include="Polly" Version="7.2.4" />
<PackageReference Include="Polly.Extensions.Http" Version="3.0.0" />
</ItemGroup>
</Project>
</Project>
1 change: 1 addition & 0 deletions Server/DebugMasterNodeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ServerIssueConfig
public int MaxHistoryNodes { get; set; }
public int RemainingBrowseCount { get; set; }
public int BrowseFailDenom { get; set; }
public Dictionary<NodeId, StatusCode> HistoryReadStatusOverride { get; } = new();
}
/// <summary>
/// The master node manager is called from the server with most "regular" service calls.
Expand Down
27 changes: 22 additions & 5 deletions Server/NodeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,27 @@ internal sealed class TestNodeManager : CustomNodeManager2

private readonly PubSubManager pubSub;
private IEnumerable<NodeSetBundle> nodeSetFiles;
private readonly ServerIssueConfig issues;

public TestNodeManager(IServerInternal server, ApplicationConfiguration configuration, IServiceProvider provider, IEnumerable<NodeSetBundle> nodeSetFiles = null)
public TestNodeManager(IServerInternal server, ApplicationConfiguration configuration, IServiceProvider provider, ServerIssueConfig issues, IEnumerable<NodeSetBundle> nodeSetFiles = null)
: base(server, configuration, GetNamespaces(nodeSetFiles))
{
SystemContext.NodeIdFactory = this;
store = new HistoryMemoryStore(provider.GetRequiredService<ILogger<HistoryMemoryStore>>());
log = provider.GetRequiredService<ILogger<TestNodeManager>>();
Ids = new NodeIdReference();
this.nodeSetFiles = nodeSetFiles;
this.issues = issues;
}

public TestNodeManager(IServerInternal server,
ApplicationConfiguration configuration,
IEnumerable<PredefinedSetup> predefinedNodes,
string mqttUrl,
IServiceProvider provider,
ServerIssueConfig issues,
IEnumerable<NodeSetBundle> nodeSetFiles = null) :
this(server, configuration, provider, nodeSetFiles)
this(server, configuration, provider, issues, nodeSetFiles)
{
this.predefinedNodes = predefinedNodes;
pubSub = new PubSubManager(mqttUrl, provider.GetRequiredService<ILogger<PubSubManager>>());
Expand Down Expand Up @@ -1771,12 +1774,19 @@ protected override void HistoryReadRawModified(
var (rawData, final) = store.ReadHistory(request);
var data = new HistoryData();

if (issues.HistoryReadStatusOverride.TryGetValue(nodeToRead.NodeId, out var code))
{
errors[handle.Index] = code;
}
else
{
errors[handle.Index] = ServiceResult.Good;
}

data.DataValues.AddRange(rawData);

log.LogInformation("Read raw modified: {Cnt}", rawData.Count());

errors[handle.Index] = ServiceResult.Good;

if (!final)
{
result.ContinuationPoint = SaveContinuationPoint(context, request);
Expand Down Expand Up @@ -1901,7 +1911,14 @@ protected override void HistoryReadEvents(

log.LogInformation("Read events: {Cnt}", rawData.Count());

errors[handle.Index] = ServiceResult.Good;
if (issues.HistoryReadStatusOverride.TryGetValue(nodeToRead.NodeId, out var code))
{
errors[handle.Index] = code;
}
else
{
errors[handle.Index] = ServiceResult.Good;
}

if (!final)
{
Expand Down
8 changes: 4 additions & 4 deletions Server/Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
<None Include="$(SolutionDir)config\**" CopyToOutputDirectory="Always" LinkBase="config\" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Cognite.ExtractorUtils" Version="1.8.0" />
<PackageReference Include="Cognite.ExtractorUtils" Version="1.12.3" />
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
<PackageReference Include="Microsoft.NETCore.Platforms" Version="7.0.3" />
<PackageReference Include="MQTTnet" Version="3.1.2" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Configuration" Version="1.4.371.60" />
<PackageReference Include="MQTTnet" Version="3.1.2" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Configuration" Version="1.4.371.60" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.PubSub" Version="1.4.371.60-beta" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Server" Version="1.4.371.60" />
</ItemGroup>
</Project>
</Project>
2 changes: 1 addition & 1 deletion Server/TestServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ protected override void OnServerStarted(IServerInternal server)

protected override MasterNodeManager CreateMasterNodeManager(IServerInternal server, ApplicationConfiguration configuration)
{
custom = new TestNodeManager(server, configuration, setups, mqttUrl, provider, BuildNodeSetFiles(server.DefaultSystemContext));
custom = new TestNodeManager(server, configuration, setups, mqttUrl, provider, Issues, BuildNodeSetFiles(server.DefaultSystemContext));
var nodeManagers = new List<INodeManager> { custom };
// create the custom node managers.

Expand Down
44 changes: 0 additions & 44 deletions Test/CDFMockHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class CDFMockHandler
public bool BlockAllConnections { get; set; }
public bool AllowPush { get; set; } = true;
public bool AllowEvents { get; set; } = true;
public bool AllowConnectionTest { get; set; } = true;
public bool StoreDatapoints { get; set; }
public MockMode mode { get; set; }
private HttpResponseMessage GetFailedRequest(HttpStatusCode code)
Expand Down Expand Up @@ -122,14 +121,6 @@ private async Task<HttpResponseMessage> MessageHandler(HttpRequestMessage req, C
return GetFailedRequest(HttpStatusCode.InternalServerError);
}

if (req.RequestUri.AbsolutePath == "/login/status")
{
var res = HandleLoginStatus();
res.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
res.Headers.Add("x-request-id", (requestIdCounter++).ToString(CultureInfo.InvariantCulture));
return res;
}

if (req.RequestUri.AbsolutePath == $"/api/playground/projects/{project}/functions/1234/call")
{
var funcContent = await req.Content.ReadAsStringAsync(cancellationToken);
Expand Down Expand Up @@ -679,41 +670,6 @@ private HttpResponseMessage HandleCreateEvents(string content)
};
}

private HttpResponseMessage HandleLoginStatus()
{
if (!AllowConnectionTest)
{
return new HttpResponseMessage(HttpStatusCode.InternalServerError)
{
Content = new StringContent(JsonConvert.SerializeObject(new ErrorWrapper
{
error = new ErrorContent
{
code = 501,
message = "bad something or other"
}
}))
};
}
var status = new LoginInfo
{
apiKeyId = 1,
loggedIn = true,
project = project,
projectId = 1,
user = "user"
};
var result = new LoginInfoWrapper
{
data = status
};
var data = JsonConvert.SerializeObject(result);
return new HttpResponseMessage(HttpStatusCode.OK)
{
Content = new StringContent(data)
};
}

private HttpResponseMessage HandleGetDatapoints(string content)
{
// Ignore query for now, this is only used to read the first point, so we just respond with that.
Expand Down
Loading

0 comments on commit 56d162e

Please sign in to comment.