Skip to content

Commit

Permalink
Release testing: Fix channel remove issue and channel exhaustion on r…
Browse files Browse the repository at this point in the history
…econnect (#2749)

- During a reconnect fault the channels remain open and are quickly exhausted. Immediately close channels in Opening state similar to channels in Connecting state instead of waiting for timeout cleanup.
- The channel concurrentDictionary sometimes failed to remove a channel. Removing ContainsKey and retry with TryAdd fixes the problem.
  • Loading branch information
mregen authored Sep 6, 2024
1 parent 48d64b5 commit 41363bd
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 81 deletions.
41 changes: 27 additions & 14 deletions Stack/Opc.Ua.Core/Stack/Tcp/TcpListenerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,10 @@ protected void ForceChannelFault(ServiceResult reason)
}

bool close = false;
if (State != TcpChannelState.Connecting)
if (State != TcpChannelState.Connecting && State != TcpChannelState.Opening)
{
int socketHandle = (Socket != null) ? Socket.Handle : 0;
if (socketHandle != -1)
int? socketHandle = Socket?.Handle;
if (socketHandle != null && socketHandle != -1)
{
Utils.LogError(
"{0} ForceChannelFault Socket={1:X8}, ChannelId={2}, TokenId={3}, Reason={4}",
Expand All @@ -249,7 +249,7 @@ protected void ForceChannelFault(ServiceResult reason)
}
else
{
// Close immediately if the client never got out of connecting state
// Close immediately if the client never got out of connecting or opening state
close = true;
}

Expand All @@ -268,13 +268,11 @@ protected void ForceChannelFault(ServiceResult reason)
if (close)
{
// close channel immediately.
ChannelClosed();
}
else
{
// notify any monitors.
NotifyMonitors(reason, false);
ChannelFaulted();
}

// notify any monitors.
NotifyMonitors(reason, close);
}
}

Expand Down Expand Up @@ -313,15 +311,13 @@ private void OnCleanup(object state)

/// <summary>
/// Closes the channel and releases resources.
/// Sets state to Closed and notifies monitors.
/// </summary>
protected void ChannelClosed()
{
try
{
if (Socket != null)
{
Socket.Close();
}
Socket?.Close();
}
finally
{
Expand All @@ -333,6 +329,23 @@ protected void ChannelClosed()
}
}

/// <summary>
/// Closes the channel and releases resources.
/// Sets state to Faulted.
/// </summary>
protected void ChannelFaulted()
{
try
{
Socket?.Close();
}
finally
{
State = TcpChannelState.Faulted;
m_listener.ChannelClosed(ChannelId);
}
}

/// <summary>
/// Sends an error message over the socket.
/// </summary>
Expand Down
134 changes: 67 additions & 67 deletions Stack/Opc.Ua.Core/Stack/Tcp/TcpTransportListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void ChannelClosed(uint channelId)
}
else
{
Utils.LogInfo("ChannelId {0}: closed channel not found", channelId);
Utils.LogInfo("ChannelId {0}: closed, but channel was not found", channelId);
}
}

Expand Down Expand Up @@ -526,63 +526,72 @@ private void OnAccept(object sender, SocketAsyncEventArgs e)
return;
}

int channelCount = m_channels?.Count ?? 0;
bool serveChannel = !(m_maxChannelCount > 0 && m_maxChannelCount < channelCount);
if (!serveChannel)
var channels = m_channels;
if (channels != null)
{
Utils.LogError("OnAccept: Maximum number of channels {0} reached, serving channels is stopped until number is lower or equal than {1} ",
channelCount, m_maxChannelCount);
Utils.SilentDispose(e.AcceptSocket);
}
// TODO: .Count is flagged as hotpath, implement separate counter
int channelCount = channels.Count;
bool serveChannel = !(m_maxChannelCount > 0 && m_maxChannelCount < channelCount);
if (!serveChannel)
{
Utils.LogError("OnAccept: Maximum number of channels {0} reached, serving channels is stopped until number is lower or equal than {1} ",
channelCount, m_maxChannelCount);
Utils.SilentDispose(e.AcceptSocket);
}

// check if the accept socket has been created.
if (serveChannel && e.AcceptSocket != null && e.SocketError == SocketError.Success && m_channels != null)
{
try
// check if the accept socket has been created.
if (serveChannel && e.AcceptSocket != null && e.SocketError == SocketError.Success)
{
if (m_reverseConnectListener)
try
{
// create the channel to manage incoming reverse connections.
channel = new TcpReverseConnectChannel(
m_listenerId,
this,
m_bufferManager,
m_quotas,
m_descriptions);
if (m_reverseConnectListener)
{
// create the channel to manage incoming reverse connections.
channel = new TcpReverseConnectChannel(
m_listenerId,
this,
m_bufferManager,
m_quotas,
m_descriptions);
}
else
{
// create the channel to manage incoming connections.
channel = new TcpServerChannel(
m_listenerId,
this,
m_bufferManager,
m_quotas,
m_serverCertificate,
m_serverCertificateChain,
m_descriptions);
}

if (m_callback != null)
{
channel.SetRequestReceivedCallback(new TcpChannelRequestEventHandler(OnRequestReceived));
channel.SetReportOpenSecureChannelAuditCallback(new ReportAuditOpenSecureChannelEventHandler(OnReportAuditOpenSecureChannelEvent));
channel.SetReportCloseSecureChannelAuditCallback(new ReportAuditCloseSecureChannelEventHandler(OnReportAuditCloseSecureChannelEvent));
channel.SetReportCertificateAuditCallback(new ReportAuditCertificateEventHandler(OnReportAuditCertificateEvent));
}

uint channelId;
do
{
// get channel id
channelId = GetNextChannelId();

// save the channel for shutdown and reconnects.
// retry to get a channel id if it is already in use.
} while (!channels.TryAdd(channelId, channel));

// start accepting messages on the channel.
channel.Attach(channelId, e.AcceptSocket);
}
else
catch (Exception ex)
{
// create the channel to manage incoming connections.
channel = new TcpServerChannel(
m_listenerId,
this,
m_bufferManager,
m_quotas,
m_serverCertificate,
m_serverCertificateChain,
m_descriptions);
Utils.LogError(ex, "Unexpected error accepting a new connection.");
}

if (m_callback != null)
{
channel.SetRequestReceivedCallback(new TcpChannelRequestEventHandler(OnRequestReceived));
channel.SetReportOpenSecureChannelAuditCallback(new ReportAuditOpenSecureChannelEventHandler(OnReportAuditOpenSecureChannelEvent));
channel.SetReportCloseSecureChannelAuditCallback(new ReportAuditCloseSecureChannelEventHandler(OnReportAuditCloseSecureChannelEvent));
channel.SetReportCertificateAuditCallback(new ReportAuditCertificateEventHandler(OnReportAuditCertificateEvent));
}

// get channel id
uint channelId = GetNextChannelId();

// start accepting messages on the channel.
channel.Attach(channelId, e.AcceptSocket);

// save the channel for shutdown and reconnects.
m_channels.TryAdd(channelId, channel);
}
catch (Exception ex)
{
Utils.LogError(ex, "Unexpected error accepting a new connection.");
}
}

Expand Down Expand Up @@ -616,18 +625,19 @@ private void OnAccept(object sender, SocketAsyncEventArgs e)
/// <param name="state"></param>
private void DetectInactiveChannels(object state = null)
{
List<TcpListenerChannel> channels;
var channels = new List<TcpListenerChannel>();

channels = new List<TcpListenerChannel>();
bool cleanup = false;
foreach (var chEntry in m_channels)
{
if (chEntry.Value.ElapsedSinceLastActiveTime > m_quotas.ChannelLifetime)
{
channels.Add(chEntry.Value);
cleanup = true;
}
}

if (channels.Count > 0)
if (cleanup)
{
Utils.LogInfo("TCPLISTENER: {0} channels scheduled for IdleCleanup.", channels.Count);
foreach (var channel in channels)
Expand Down Expand Up @@ -748,20 +758,10 @@ private void OnProcessRequestComplete(IAsyncResult result)
/// </summary>
private uint GetNextChannelId()
{
lock (m_lock)
{
do
{
uint nextChannelId = ++m_lastChannelId;
if (nextChannelId != 0 && m_channels?.ContainsKey(nextChannelId) != true)
{
return nextChannelId;
}
} while (true);
}
// wraps at Int32.MaxValue back to 1
return (uint)Utils.IncrementIdentifier(ref m_lastChannelId);
}


/// <summary>
/// Sets the URI for the listener.
/// </summary>
Expand Down Expand Up @@ -806,7 +806,7 @@ private void SetUri(Uri baseAddress, string relativeAddress)
private ChannelQuotas m_quotas;
private X509Certificate2 m_serverCertificate;
private X509Certificate2Collection m_serverCertificateChain;
private uint m_lastChannelId;
private int m_lastChannelId;
private Socket m_listeningSocket;
private Socket m_listeningSocketIPv6;
private ConcurrentDictionary<uint, TcpListenerChannel> m_channels;
Expand Down

0 comments on commit 41363bd

Please sign in to comment.