From e79c9176cc57d4e474950c02287c622342efcfcb Mon Sep 17 00:00:00 2001 From: Hecate2 <2474101468@qq.com> Date: Tue, 14 Feb 2023 17:37:47 +0800 Subject: [PATCH] more reliable websocket --- Fairy.WebSocket.Subscribe.cs | 43 ++++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/Fairy.WebSocket.Subscribe.cs b/Fairy.WebSocket.Subscribe.cs index 4ea706e..3f12091 100644 --- a/Fairy.WebSocket.Subscribe.cs +++ b/Fairy.WebSocket.Subscribe.cs @@ -1,17 +1,30 @@ using Neo.Json; using Neo.Ledger; using Neo.Network.P2P.Payloads; +using System.Collections.Concurrent; using System.Net.WebSockets; namespace Neo.Plugins { public partial class Fairy : RpcServer { - protected TaskCompletionSource committedBlock = new(); + protected Block committedBlock; + protected ConcurrentDictionary committedBlockSemaphores = new(); protected void RegisterBlockchainEvents() { - Blockchain.Committed += delegate (NeoSystem @system, Block @block) { committedBlock.SetResult(block); }; + Blockchain.Committed += delegate (NeoSystem @system, Block @block) + { + committedBlock = block; + List keysToRemove = new(); + foreach (var item in committedBlockSemaphores) + if (item.Value.State == WebSocketState.Open) + item.Key.Release(); + else + keysToRemove.Add(item.Key); + foreach (SemaphoreSlim key in keysToRemove) + committedBlockSemaphores.TryRemove(key, out _); + }; } [WebsocketControlMethod] @@ -47,18 +60,30 @@ protected virtual Action SubscribeCommittedBlock(WebSocket webSocket, JArray _pa { return async () => { + SemaphoreSlim semaphore = new(1); + committedBlockSemaphores[semaphore] = webSocket; while (true) { - Block block = await committedBlock.Task; - if (webSocket.State == WebSocketState.Open) + try { - await webSocket.SendAsync(block.ToJson(system.Settings).ToByteArray(false), WebSocketMessageType.Text, true, CancellationToken.None); - if (cancellationToken.IsCancellationRequested) - return; + await semaphore.WaitAsync(); + switch (webSocket.State) + { + case WebSocketState.Open: + await webSocket.SendAsync(committedBlock.ToJson(system.Settings).ToByteArray(false), WebSocketMessageType.Text, true, CancellationToken.None); + if (cancellationToken.IsCancellationRequested) + return; + break; + case WebSocketState.Closed: + await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, webSocket.State.ToString(), CancellationToken.None); + webSocket.Dispose(); + return; + default: + break; + } } - else + catch { - await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, webSocket.State.ToString(), CancellationToken.None); webSocket.Dispose(); return; }