diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/TcpListenerChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/TcpListenerChannel.cs index f00813f7f..763b71f1a 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/TcpListenerChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/TcpListenerChannel.cs @@ -233,10 +233,10 @@ protected void ForceChannelFault(ServiceResult reason) } bool close = false; - if (State != TcpChannelState.Connecting) + if (State != TcpChannelState.Connecting && State != TcpChannelState.Opening) { - int socketHandle = (Socket != null) ? Socket.Handle : 0; - if (socketHandle != -1) + int? socketHandle = Socket?.Handle; + if (socketHandle != null && socketHandle != -1) { Utils.LogError( "{0} ForceChannelFault Socket={1:X8}, ChannelId={2}, TokenId={3}, Reason={4}", @@ -249,7 +249,7 @@ protected void ForceChannelFault(ServiceResult reason) } else { - // Close immediately if the client never got out of connecting state + // Close immediately if the client never got out of connecting or opening state close = true; } @@ -268,13 +268,11 @@ protected void ForceChannelFault(ServiceResult reason) if (close) { // close channel immediately. - ChannelClosed(); - } - else - { - // notify any monitors. - NotifyMonitors(reason, false); + ChannelFaulted(); } + + // notify any monitors. + NotifyMonitors(reason, close); } } @@ -313,15 +311,13 @@ private void OnCleanup(object state) /// /// Closes the channel and releases resources. + /// Sets state to Closed and notifies monitors. /// protected void ChannelClosed() { try { - if (Socket != null) - { - Socket.Close(); - } + Socket?.Close(); } finally { @@ -333,6 +329,23 @@ protected void ChannelClosed() } } + /// + /// Closes the channel and releases resources. + /// Sets state to Faulted. + /// + protected void ChannelFaulted() + { + try + { + Socket?.Close(); + } + finally + { + State = TcpChannelState.Faulted; + m_listener.ChannelClosed(ChannelId); + } + } + /// /// Sends an error message over the socket. /// diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/TcpTransportListener.cs b/Stack/Opc.Ua.Core/Stack/Tcp/TcpTransportListener.cs index ea7711512..bb0d93bf7 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/TcpTransportListener.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/TcpTransportListener.cs @@ -247,7 +247,7 @@ public void ChannelClosed(uint channelId) } else { - Utils.LogInfo("ChannelId {0}: closed channel not found", channelId); + Utils.LogInfo("ChannelId {0}: closed, but channel was not found", channelId); } } @@ -526,63 +526,72 @@ private void OnAccept(object sender, SocketAsyncEventArgs e) return; } - int channelCount = m_channels?.Count ?? 0; - bool serveChannel = !(m_maxChannelCount > 0 && m_maxChannelCount < channelCount); - if (!serveChannel) + var channels = m_channels; + if (channels != null) { - Utils.LogError("OnAccept: Maximum number of channels {0} reached, serving channels is stopped until number is lower or equal than {1} ", - channelCount, m_maxChannelCount); - Utils.SilentDispose(e.AcceptSocket); - } + // TODO: .Count is flagged as hotpath, implement separate counter + int channelCount = channels.Count; + bool serveChannel = !(m_maxChannelCount > 0 && m_maxChannelCount < channelCount); + if (!serveChannel) + { + Utils.LogError("OnAccept: Maximum number of channels {0} reached, serving channels is stopped until number is lower or equal than {1} ", + channelCount, m_maxChannelCount); + Utils.SilentDispose(e.AcceptSocket); + } - // check if the accept socket has been created. - if (serveChannel && e.AcceptSocket != null && e.SocketError == SocketError.Success && m_channels != null) - { - try + // check if the accept socket has been created. + if (serveChannel && e.AcceptSocket != null && e.SocketError == SocketError.Success) { - if (m_reverseConnectListener) + try { - // create the channel to manage incoming reverse connections. - channel = new TcpReverseConnectChannel( - m_listenerId, - this, - m_bufferManager, - m_quotas, - m_descriptions); + if (m_reverseConnectListener) + { + // create the channel to manage incoming reverse connections. + channel = new TcpReverseConnectChannel( + m_listenerId, + this, + m_bufferManager, + m_quotas, + m_descriptions); + } + else + { + // create the channel to manage incoming connections. + channel = new TcpServerChannel( + m_listenerId, + this, + m_bufferManager, + m_quotas, + m_serverCertificate, + m_serverCertificateChain, + m_descriptions); + } + + if (m_callback != null) + { + channel.SetRequestReceivedCallback(new TcpChannelRequestEventHandler(OnRequestReceived)); + channel.SetReportOpenSecureChannelAuditCallback(new ReportAuditOpenSecureChannelEventHandler(OnReportAuditOpenSecureChannelEvent)); + channel.SetReportCloseSecureChannelAuditCallback(new ReportAuditCloseSecureChannelEventHandler(OnReportAuditCloseSecureChannelEvent)); + channel.SetReportCertificateAuditCallback(new ReportAuditCertificateEventHandler(OnReportAuditCertificateEvent)); + } + + uint channelId; + do + { + // get channel id + channelId = GetNextChannelId(); + + // save the channel for shutdown and reconnects. + // retry to get a channel id if it is already in use. + } while (!channels.TryAdd(channelId, channel)); + + // start accepting messages on the channel. + channel.Attach(channelId, e.AcceptSocket); } - else + catch (Exception ex) { - // create the channel to manage incoming connections. - channel = new TcpServerChannel( - m_listenerId, - this, - m_bufferManager, - m_quotas, - m_serverCertificate, - m_serverCertificateChain, - m_descriptions); + Utils.LogError(ex, "Unexpected error accepting a new connection."); } - - if (m_callback != null) - { - channel.SetRequestReceivedCallback(new TcpChannelRequestEventHandler(OnRequestReceived)); - channel.SetReportOpenSecureChannelAuditCallback(new ReportAuditOpenSecureChannelEventHandler(OnReportAuditOpenSecureChannelEvent)); - channel.SetReportCloseSecureChannelAuditCallback(new ReportAuditCloseSecureChannelEventHandler(OnReportAuditCloseSecureChannelEvent)); - channel.SetReportCertificateAuditCallback(new ReportAuditCertificateEventHandler(OnReportAuditCertificateEvent)); - } - - // get channel id - uint channelId = GetNextChannelId(); - - // start accepting messages on the channel. - channel.Attach(channelId, e.AcceptSocket); - - // save the channel for shutdown and reconnects. - m_channels.TryAdd(channelId, channel); - } - catch (Exception ex) - { - Utils.LogError(ex, "Unexpected error accepting a new connection."); } } @@ -616,18 +625,19 @@ private void OnAccept(object sender, SocketAsyncEventArgs e) /// private void DetectInactiveChannels(object state = null) { - List channels; + var channels = new List(); - channels = new List(); + bool cleanup = false; foreach (var chEntry in m_channels) { if (chEntry.Value.ElapsedSinceLastActiveTime > m_quotas.ChannelLifetime) { channels.Add(chEntry.Value); + cleanup = true; } } - if (channels.Count > 0) + if (cleanup) { Utils.LogInfo("TCPLISTENER: {0} channels scheduled for IdleCleanup.", channels.Count); foreach (var channel in channels) @@ -748,20 +758,10 @@ private void OnProcessRequestComplete(IAsyncResult result) /// private uint GetNextChannelId() { - lock (m_lock) - { - do - { - uint nextChannelId = ++m_lastChannelId; - if (nextChannelId != 0 && m_channels?.ContainsKey(nextChannelId) != true) - { - return nextChannelId; - } - } while (true); - } + // wraps at Int32.MaxValue back to 1 + return (uint)Utils.IncrementIdentifier(ref m_lastChannelId); } - /// /// Sets the URI for the listener. /// @@ -806,7 +806,7 @@ private void SetUri(Uri baseAddress, string relativeAddress) private ChannelQuotas m_quotas; private X509Certificate2 m_serverCertificate; private X509Certificate2Collection m_serverCertificateChain; - private uint m_lastChannelId; + private int m_lastChannelId; private Socket m_listeningSocket; private Socket m_listeningSocketIPv6; private ConcurrentDictionary m_channels;