From 0f90044c9e619f446a626fbd948fb26b4d0880ec Mon Sep 17 00:00:00 2001 From: Einar Omang Date: Wed, 30 Oct 2024 12:18:07 +0100 Subject: [PATCH 1/4] Some improvements to reconnect There is an issue here, this may help, not sure if this is actually the cause of anything major though. --- Extractor/SessionManager.cs | 53 ++++++++++++++++++++++++++++++------- manifest.yml | 5 ++++ 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/Extractor/SessionManager.cs b/Extractor/SessionManager.cs index 72b87a5c..2386dcc4 100644 --- a/Extractor/SessionManager.cs +++ b/Extractor/SessionManager.cs @@ -13,7 +13,11 @@ namespace Cognite.OpcUa { - + enum SessionManagerState + { + Connecting, + Connected + } public class SessionManager : IDisposable { private readonly SourceConfig config; @@ -28,11 +32,13 @@ public class SessionManager : IDisposable private readonly SemaphoreSlim redundancyReconnectLock = new SemaphoreSlim(1); private ISession? session; - private bool currentSessionIsDead; private CancellationToken liveToken; private bool disposedValue; private int timeout; + private SessionManagerState state = SessionManagerState.Connecting; + private object stateLock = new object(); + private static readonly Counter connects = Metrics .CreateCounter("opcua_connects", "Number of times the client has connected to and mapped the opcua server"); private static readonly Gauge connected = Metrics @@ -114,10 +120,34 @@ private void SetNewSession(ISession session) session.KeepAlive += ClientKeepAlive; session.PublishError -= OnPublishError; session.PublishError += OnPublishError; + log.LogInformation("Registered new session with ID: {Id}", session.SessionId); + if (session.SubscriptionCount > 0) + { + foreach (var sub in session.Subscriptions) + { + log.LogDebug("Session already has subscription: {Sub} with {Count} monitored items", sub.DisplayName, sub.MonitoredItemCount); + } + } + lock (stateLock) + { + state = SessionManagerState.Connected; + } lock (sessionWaitExtLock) { + if (this.session != session && this.session != null) + { + try + { + this.session.KeepAlive -= ClientKeepAlive; + this.session.PublishError -= OnPublishError; + this.session.Dispose(); + } + catch (Exception ex) + { + log.LogError(ex, "Failed to dispose old session: {M}", ex.Message); + } + } this.session = session; - currentSessionIsDead = false; Context.UpdateFromSession(session); for (; sessionWaiters > 0; sessionWaiters--) { @@ -420,11 +450,8 @@ private async Task CreateSessionWithRedundancy(IEnumera { try { - if (currentSessionIsDead) - { - log.LogInformation("Attempting to reconnect to the current server before switching to another"); - initial.Session = await CreateSessionDirect(initial.EndpointUrl, initial.Session); - } + log.LogInformation("Attempting to reconnect to the current server before switching to another"); + initial.Session = await CreateSessionDirect(initial.EndpointUrl, initial.Session); initial.ServiceLevel = await ReadServiceLevel(initial.Session); if (initial.ServiceLevel >= config.Redundancy.ServiceLevelThreshold) { @@ -501,13 +528,21 @@ private void ClientKeepAlive(ISession sender, KeepAliveEventArgs eventArgs) client.LogDump("Keep Alive", eventArgs); if (eventArgs.Status == null || !ServiceResult.IsNotGood(eventArgs.Status)) return; log.LogWarning("Keep alive failed: {Status}", eventArgs.Status); + lock (stateLock) + { + if (state == SessionManagerState.Connecting) + { + log.LogTrace("Session already connecting, skipping"); + return; + } + state = SessionManagerState.Connecting; + } if (liveToken.IsCancellationRequested) return; connected.Set(0); log.LogWarning("--- RECONNECTING ---"); if (!liveToken.IsCancellationRequested) { - currentSessionIsDead = true; client.Callbacks.TaskScheduler.ScheduleTask(null, async (_) => { log.LogInformation("Attempting to reconnect to server"); diff --git a/manifest.yml b/manifest.yml index 572fd931..a12bc3a2 100644 --- a/manifest.yml +++ b/manifest.yml @@ -67,6 +67,11 @@ schema: - "https://raw.githubusercontent.com/" versions: + "2.31.1": + description: Minor improvements to reconnect reliability with large, unstable servers. + changelog: + fixed: + - Avoid a potential race condition when reconnecting to the server. "2.31.0": description: Support for writing timeseries to the core data models. changelog: From 7e74c069285c2f63d7dc25ac1294cd1c1e74f987 Mon Sep 17 00:00:00 2001 From: Einar Omang Date: Wed, 30 Oct 2024 13:22:51 +0100 Subject: [PATCH 2/4] More time to start tests... --- Test/Utils/BaseExtractorTestFixture.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Test/Utils/BaseExtractorTestFixture.cs b/Test/Utils/BaseExtractorTestFixture.cs index 28132da7..b8298724 100644 --- a/Test/Utils/BaseExtractorTestFixture.cs +++ b/Test/Utils/BaseExtractorTestFixture.cs @@ -271,7 +271,7 @@ public void WipeEventHistory() public virtual async Task InitializeAsync() { var startTask = Start(); - var resultTask = await Task.WhenAny(startTask, Task.Delay(20000)); + var resultTask = await Task.WhenAny(startTask, Task.Delay(30000)); Assert.Equal(startTask, resultTask); if (startTask.Exception != null) { From 3d5a58c84aa81a535b3999755d2f0605a0490c66 Mon Sep 17 00:00:00 2001 From: Einar Omang Date: Wed, 30 Oct 2024 14:33:46 +0100 Subject: [PATCH 3/4] Disable pubsub tests --- Test/Unit/PubSubTests.cs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/Test/Unit/PubSubTests.cs b/Test/Unit/PubSubTests.cs index bb821f6a..3a491385 100644 --- a/Test/Unit/PubSubTests.cs +++ b/Test/Unit/PubSubTests.cs @@ -1,4 +1,7 @@ -using Cognite.Extractor.Common; +/* +PubSub is currently broken in the SDK, see https://github.com/OPCFoundation/UA-.NETStandard/issues/2827 + +using Cognite.Extractor.Common; using Cognite.Extractor.Testing; using Cognite.OpcUa.Config; using Cognite.OpcUa.PubSub; @@ -49,7 +52,7 @@ public PubSubTests(ITestOutputHelper output, PubSubTestFixture tester) tester.Client.TypeManager.Reset(); } - [Theory(Timeout = 20000)] + [Theory(Timeout = 30000)] [InlineData(true)] [InlineData(false)] public async Task TestPubSubConfiguration(bool uadp) @@ -100,7 +103,7 @@ public async Task TestPubSubConfiguration(bool uadp) Assert.Equal(Attributes.Value, vb.AttributeId); } } - [Theory(Timeout = 20000)] + [Theory(Timeout = 30000)] [InlineData(true)] [InlineData(false)] public async Task TestPubSubData(bool uadp) @@ -131,3 +134,4 @@ public async Task TestPubSubData(bool uadp) } } } +*/ \ No newline at end of file From 1f455aa79a86f714dc54607242d3ad8262b974a3 Mon Sep 17 00:00:00 2001 From: Einar Omang Date: Wed, 30 Oct 2024 14:39:56 +0100 Subject: [PATCH 4/4] Format --- Test/Unit/PubSubTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Test/Unit/PubSubTests.cs b/Test/Unit/PubSubTests.cs index 3a491385..febc9a10 100644 --- a/Test/Unit/PubSubTests.cs +++ b/Test/Unit/PubSubTests.cs @@ -134,4 +134,4 @@ public async Task TestPubSubData(bool uadp) } } } -*/ \ No newline at end of file +*/