Skip to content

Commit

Permalink
Merge refs/heads/master into renovate/vstest-monorepo
Browse files Browse the repository at this point in the history
  • Loading branch information
cognite-bulldozer[bot] authored Aug 22, 2024
2 parents 96627ed + e3d2850 commit 03a0d9e
Show file tree
Hide file tree
Showing 18 changed files with 209 additions and 114 deletions.
3 changes: 1 addition & 2 deletions Extractor/SessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public class SessionManager : IDisposable

public SessionContext Context { get; }


public SessionManager(FullConfig config, UAClient parent, ILogger log)
{
client = parent;
Expand Down Expand Up @@ -457,9 +456,9 @@ private async Task<SessionRedundancyResult> CreateSessionWithRedundancy(IEnumera
{
if (current != null)
{
await current.Session.CloseAsync();
current.Session.KeepAlive -= ClientKeepAlive;
current.Session.PublishError -= OnPublishError;
await current.Session.CloseAsync();
current.Session.Dispose();
}
current = new SessionRedundancyResult(session, url, serviceLevel);
Expand Down
4 changes: 2 additions & 2 deletions Extractor/State.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public class State
private readonly ConcurrentDictionary<NodeId, MappedNode> mappedNodes =
new ConcurrentDictionary<NodeId, MappedNode>();

public IEnumerable<VariableExtractionState> NodeStates => nodeStates.Values;
public IEnumerable<EventExtractionState> EmitterStates => emitterStates.Values;
public ICollection<VariableExtractionState> NodeStates => nodeStates.Values;
public ICollection<EventExtractionState> EmitterStates => emitterStates.Values;

/// <summary>
/// Return a NodeExtractionState by externalId
Expand Down
2 changes: 2 additions & 0 deletions Extractor/UAClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,8 @@ public async Task ReadNodeData(IEnumerable<BaseUANode> nodes, CancellationToken
$"Too few results in ReadNodeData, this is a bug in the OPC-UA server implementation, total : {total}, expected: {expected}");
}

token.ThrowIfCancellationRequested();

int idx = 0;
foreach (var node in nodes)
{
Expand Down
72 changes: 56 additions & 16 deletions Extractor/UAExtractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ public class UAExtractor : BaseExtractor<FullConfig>, IUAClientAccess, IClientCa

public bool Started { get; private set; }

private int subscribed;
private bool subscribeFlag;

private static readonly Gauge startTime = Metrics
.CreateGauge("opcua_start_time", "Start time for the extractor");

Expand Down Expand Up @@ -108,6 +105,9 @@ public class UAExtractor : BaseExtractor<FullConfig>, IUAClientAccess, IClientCa
!Config.Source.Redundancy.MonitorServiceLevel
|| uaClient.SessionManager.CurrentServiceLevel >= Config.Source.Redundancy.ServiceLevelThreshold;

// Active subscriptions, used in tests for WaitForSubscription().
private HashSet<SubscriptionName> activeSubscriptions = new();

/// <summary>
/// Construct extractor with list of pushers
/// </summary>
Expand Down Expand Up @@ -384,8 +384,10 @@ protected override async Task OnStop()
/// </summary>
public async Task RestartExtractor()
{
subscribed = 0;
subscribeFlag = false;
lock (activeSubscriptions)
{
activeSubscriptions.Clear();
}

historyReader?.AddIssue(HistoryReader.StateIssue.NodeHierarchyRead);

Expand Down Expand Up @@ -506,15 +508,23 @@ public async Task Rebrowse()
/// Used for testing, wait for subscriptions to be created, with given timeout.
/// </summary>
/// <param name="timeout">Timeout in 10ths of a second</param>
public async Task WaitForSubscriptions(int timeout = 100)
public async Task WaitForSubscription(SubscriptionName name, int timeout = 100)
{
int time = 0;
while (!subscribeFlag && subscribed < 2 && time++ < timeout) await Task.Delay(100);
if (time >= timeout && !subscribeFlag && subscribed < 2)
while (time++ < timeout)
{
throw new TimeoutException("Waiting for push timed out");
lock (activeSubscriptions)
{
if (activeSubscriptions.Contains(name))
{
log.LogDebug("Waited {TimeS} milliseconds for subscriptions", time * 100);

return;
}
}
await Task.Delay(100);
}
log.LogDebug("Waited {TimeS} milliseconds for subscriptions", time * 100);
throw new TimeoutException("Waiting for subscriptions timed out");
}

public void TriggerHistoryRestart()
Expand Down Expand Up @@ -545,6 +555,10 @@ public void OnSubscriptionFailure(SubscriptionName subscription)

public void OnCreatedSubscription(SubscriptionName subscription)
{
lock (activeSubscriptions)
{
activeSubscriptions.Add(subscription);
}
switch (subscription)
{
case SubscriptionName.Events:
Expand All @@ -555,6 +569,22 @@ public void OnCreatedSubscription(SubscriptionName subscription)
break;
}
}

public void RemoveKnownSubscription(SubscriptionName name)
{
lock (activeSubscriptions)
{
activeSubscriptions.Remove(name);
}
}

public void ClearKnownSubscriptions()
{
lock (activeSubscriptions)
{
activeSubscriptions.Clear();
}
}
#endregion

#region Mapping
Expand Down Expand Up @@ -959,6 +989,14 @@ private async Task PushNodes(PusherInput input)

bool initial = input.Variables.Count() + input.Objects.Count() >= State.NumActiveNodes;

if (initial && !input.Variables.Any() && Config.Subscriptions.DataPoints)
{
log.LogWarning("No variables found, the extractor can run without any variables, but will not read history. " +
"There may be issues reported at the debug log level, or this may be a configuration issue. " +
"If this is intentional, and you do not want to read datapoints at all, you should disable " +
"data point subscriptions by setting `subscriptions.data-points` to false");
}

var pushTasks = pushers.Select(pusher => PushNodes(input, pusher, initial));

if (Config.DryRun)
Expand Down Expand Up @@ -1018,6 +1056,7 @@ private async Task PushNodes(PusherInput input)
}
}


pushTasks = pushTasks.ToList();
log.LogInformation("Waiting for pushes on pushers");
await Task.WhenAll(pushTasks);
Expand Down Expand Up @@ -1057,9 +1096,6 @@ await uaClient.SubscriptionManager.EnqueueTaskAndWait(new EventSubscriptionTask(
this),
Source.Token);
}

Interlocked.Increment(ref subscribed);
if (!State.NodeStates.Any() || subscribed > 1) subscribeFlag = true;
}

/// <summary>
Expand All @@ -1080,9 +1116,6 @@ await uaClient.SubscriptionManager.EnqueueTaskAndWait(new DataPointSubscriptionT
this),
Source.Token);
}

Interlocked.Increment(ref subscribed);
if (!State.EmitterStates.Any() || subscribed > 1) subscribeFlag = true;
}

/// <summary>
Expand Down Expand Up @@ -1248,8 +1281,15 @@ protected override async ValueTask DisposeAsyncCore()
await base.DisposeAsyncCore();
}

private int disposed = 0;

protected override void Dispose(bool disposing)
{
// Only run dispose once...
if (Interlocked.CompareExchange(ref disposed, 1, 0) == 0)
{
return;
}
if (disposing)
{
Starting.Set(0);
Expand Down
2 changes: 1 addition & 1 deletion ExtractorLauncher/ExtractorStarter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private static void VerifyAndBuildConfig(

if (options != null)
{
options.Restart |= config.Source.ExitOnFailure;
options.Restart &= !config.Source.ExitOnFailure;
}

string? configResult = VerifyConfig(log, config);
Expand Down
12 changes: 11 additions & 1 deletion Server/ServerController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ sealed public class ServerController : IDisposable
private readonly IServiceProvider provider;
private bool running;
private IEnumerable<string> nodeSetFiles;
private readonly string certPath;

public ServerController(
IEnumerable<PredefinedSetup> setups,
Expand All @@ -53,7 +54,8 @@ public ServerController(
string mqttUrl = "mqtt://localhost:4060",
string endpointUrl = "opc.tcp://localhost",
bool logTrace = false,
IEnumerable<string> nodeSetFiles = null)
IEnumerable<string> nodeSetFiles = null,
string certPath = null)
{
this.setups = setups;
this.port = port;
Expand All @@ -63,6 +65,7 @@ public ServerController(
log = provider.GetRequiredService<ILogger<ServerController>>();
this.provider = provider;
this.nodeSetFiles = nodeSetFiles;
this.certPath = certPath;
}

public void Dispose()
Expand All @@ -83,6 +86,13 @@ public async Task Start()
{
var cfg = await app.LoadApplicationConfiguration(Path.Join("config", $"{ConfigRoot}.Config.xml"), false);
var address = cfg.ServerConfiguration.BaseAddresses[0] = $"{endpointUrl}:{port}";
if (certPath != null)
{
cfg.SecurityConfiguration.ApplicationCertificate.StorePath = $"{certPath}/pki/own";
cfg.SecurityConfiguration.TrustedIssuerCertificates.StorePath = $"{certPath}/pki/issuer";
cfg.SecurityConfiguration.TrustedPeerCertificates.StorePath = $"{certPath}/pki/trusted";
cfg.SecurityConfiguration.RejectedCertificateStore.StorePath = $"{certPath}/pki/rejected";
}
await app.CheckApplicationInstanceCertificate(false, 0);
Server = new TestServer(setups, mqttUrl, provider, logTrace, nodeSetFiles);
await Task.Run(async () => await app.Start(Server));
Expand Down
2 changes: 2 additions & 0 deletions Test/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
using Xunit;
[assembly: CollectionBehavior(DisableTestParallelization = true)]
Loading

0 comments on commit 03a0d9e

Please sign in to comment.