Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some improvements to reconnect #727

Merged
merged 4 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions Extractor/SessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

namespace Cognite.OpcUa
{

enum SessionManagerState
{
Connecting,
Connected
}
public class SessionManager : IDisposable
{
private readonly SourceConfig config;
Expand All @@ -28,11 +32,13 @@ public class SessionManager : IDisposable
private readonly SemaphoreSlim redundancyReconnectLock = new SemaphoreSlim(1);

private ISession? session;
private bool currentSessionIsDead;
private CancellationToken liveToken;
private bool disposedValue;
private int timeout;

private SessionManagerState state = SessionManagerState.Connecting;
private object stateLock = new object();

private static readonly Counter connects = Metrics
.CreateCounter("opcua_connects", "Number of times the client has connected to and mapped the opcua server");
private static readonly Gauge connected = Metrics
Expand Down Expand Up @@ -114,10 +120,34 @@ private void SetNewSession(ISession session)
session.KeepAlive += ClientKeepAlive;
session.PublishError -= OnPublishError;
session.PublishError += OnPublishError;
log.LogInformation("Registered new session with ID: {Id}", session.SessionId);
if (session.SubscriptionCount > 0)
{
foreach (var sub in session.Subscriptions)
{
log.LogDebug("Session already has subscription: {Sub} with {Count} monitored items", sub.DisplayName, sub.MonitoredItemCount);
}
}
lock (stateLock)
{
state = SessionManagerState.Connected;
}
lock (sessionWaitExtLock)
{
if (this.session != session && this.session != null)
{
try
{
this.session.KeepAlive -= ClientKeepAlive;
this.session.PublishError -= OnPublishError;
this.session.Dispose();
}
catch (Exception ex)
{
log.LogError(ex, "Failed to dispose old session: {M}", ex.Message);
}
}
this.session = session;
currentSessionIsDead = false;
Context.UpdateFromSession(session);
for (; sessionWaiters > 0; sessionWaiters--)
{
Expand Down Expand Up @@ -420,11 +450,8 @@ private async Task<SessionRedundancyResult> CreateSessionWithRedundancy(IEnumera
{
try
{
if (currentSessionIsDead)
{
log.LogInformation("Attempting to reconnect to the current server before switching to another");
initial.Session = await CreateSessionDirect(initial.EndpointUrl, initial.Session);
}
log.LogInformation("Attempting to reconnect to the current server before switching to another");
initial.Session = await CreateSessionDirect(initial.EndpointUrl, initial.Session);
initial.ServiceLevel = await ReadServiceLevel(initial.Session);
if (initial.ServiceLevel >= config.Redundancy.ServiceLevelThreshold)
{
Expand Down Expand Up @@ -501,13 +528,21 @@ private void ClientKeepAlive(ISession sender, KeepAliveEventArgs eventArgs)
client.LogDump("Keep Alive", eventArgs);
if (eventArgs.Status == null || !ServiceResult.IsNotGood(eventArgs.Status)) return;
log.LogWarning("Keep alive failed: {Status}", eventArgs.Status);
lock (stateLock)
{
if (state == SessionManagerState.Connecting)
{
log.LogTrace("Session already connecting, skipping");
return;
}
state = SessionManagerState.Connecting;
}
if (liveToken.IsCancellationRequested) return;
connected.Set(0);

log.LogWarning("--- RECONNECTING ---");
if (!liveToken.IsCancellationRequested)
{
currentSessionIsDead = true;
client.Callbacks.TaskScheduler.ScheduleTask(null, async (_) =>
{
log.LogInformation("Attempting to reconnect to server");
Expand Down
10 changes: 7 additions & 3 deletions Test/Unit/PubSubTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using Cognite.Extractor.Common;
/*
PubSub is currently broken in the SDK, see https://github.com/OPCFoundation/UA-.NETStandard/issues/2827

using Cognite.Extractor.Common;
using Cognite.Extractor.Testing;
using Cognite.OpcUa.Config;
using Cognite.OpcUa.PubSub;
Expand Down Expand Up @@ -49,7 +52,7 @@ public PubSubTests(ITestOutputHelper output, PubSubTestFixture tester)
tester.Client.TypeManager.Reset();
}

[Theory(Timeout = 20000)]
[Theory(Timeout = 30000)]
[InlineData(true)]
[InlineData(false)]
public async Task TestPubSubConfiguration(bool uadp)
Expand Down Expand Up @@ -100,7 +103,7 @@ public async Task TestPubSubConfiguration(bool uadp)
Assert.Equal(Attributes.Value, vb.AttributeId);
}
}
[Theory(Timeout = 20000)]
[Theory(Timeout = 30000)]
[InlineData(true)]
[InlineData(false)]
public async Task TestPubSubData(bool uadp)
Expand Down Expand Up @@ -131,3 +134,4 @@ public async Task TestPubSubData(bool uadp)
}
}
}
*/
2 changes: 1 addition & 1 deletion Test/Utils/BaseExtractorTestFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void WipeEventHistory()
public virtual async Task InitializeAsync()
{
var startTask = Start();
var resultTask = await Task.WhenAny(startTask, Task.Delay(20000));
var resultTask = await Task.WhenAny(startTask, Task.Delay(30000));
Assert.Equal(startTask, resultTask);
if (startTask.Exception != null)
{
Expand Down
5 changes: 5 additions & 0 deletions manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ schema:
- "https://raw.githubusercontent.com/"

versions:
"2.31.1":
description: Minor improvements to reconnect reliability with large, unstable servers.
changelog:
fixed:
- Avoid a potential race condition when reconnecting to the server.
"2.31.0":
description: Support for writing timeseries to the core data models.
changelog:
Expand Down
Loading