From a0e916a1682e3aabbe5e32dc77c2009736bfd76b Mon Sep 17 00:00:00 2001 From: Arnaud Bouchez Date: Mon, 27 Feb 2023 19:41:22 +0100 Subject: [PATCH] prepare IEventFD use in mormot.net.async - disabled by now until further testing - current numbers are not better than our own algorithm --- src/mormot.commit.inc | 2 +- src/net/mormot.net.async.pas | 89 ++++++++++++++++++++++++------------ 2 files changed, 62 insertions(+), 29 deletions(-) diff --git a/src/mormot.commit.inc b/src/mormot.commit.inc index 26b83ea29..792ebabb9 100644 --- a/src/mormot.commit.inc +++ b/src/mormot.commit.inc @@ -1 +1 @@ -'2.0.4973' +'2.0.4974' diff --git a/src/net/mormot.net.async.pas b/src/net/mormot.net.async.pas index a0fe69394..78166933c 100644 --- a/src/net/mormot.net.async.pas +++ b/src/net/mormot.net.async.pas @@ -417,6 +417,7 @@ TAsyncConnectionsThread = class(TSynThread) fOwner: TAsyncConnections; fProcess: TAsyncConnectionsThreadProcess; fWaitForReadPending: boolean; + fWakeUpFromSlowProcess: boolean; fExecuteState: THttpServerExecuteState; fIndex: integer; fEvent: TSynEvent; @@ -425,6 +426,7 @@ TAsyncConnectionsThread = class(TSynThread) fThreadPollingLastWakeUpCount: integer; fCustomObject: TObject; procedure Execute; override; + function GetNextRead(out notif: TPollSocketResult): boolean; public /// initialize the thread constructor Create(aOwner: TAsyncConnections; @@ -528,6 +530,7 @@ TAsyncConnections = class(TNotifiedThread) fThreadPollingWakeupSafe: TLightLock; fThreadPollingWakeupLoad: integer; fThreadPollingLastWakeUpTix: integer; + fThreadPollingEventFD: IEventFD; fGC: array[1..2] of TAsyncConnectionsGC; function AllThreadsStarted: boolean; virtual; procedure AddGC(aConnection: TPollAsyncConnection); @@ -1767,7 +1770,6 @@ destructor TAsyncConnectionsThread.Destroy; procedure TAsyncConnectionsThread.Execute; var new, pending, ms: integer; - didwakeupduetoslowprocess: boolean; start: Int64; notif: TPollSocketResult; begin @@ -1841,7 +1843,10 @@ procedure TAsyncConnectionsThread.Execute; //fOwner.fClients.fRead.PendingLogDebug('Wakeup'); fEvent.ResetEvent; fWaitForReadPending := true; // should be set before wakeup - fOwner.ThreadPollingWakeup(pending); + if Assigned(fOwner.fThreadPollingEventFD) then + fOwner.fThreadPollingEventFD.SetEvent(new) + else + fOwner.ThreadPollingWakeup(pending); //fOwner.DoLog(sllCustom1, 'Execute: WaitFor ReadPending', [], self); if not Terminated then fEvent.WaitFor(20); @@ -1851,37 +1856,36 @@ procedure TAsyncConnectionsThread.Execute; end; end; atpReadPending: + // secondary threads wait, then read and process pending events begin - // secondary threads wait, then read and process pending events - fWaitForReadPending := true; - fEvent.WaitForEver; - if Terminated then - break; - didwakeupduetoslowprocess := false; - //{$I-}system.writeln(Name,' start loop ',fThreadPollingLastWakeUpCount); - while fOwner.fClients.fRead.GetOnePending(notif, fName) and - not Terminated do + if Assigned(fOwner.fThreadPollingEventFD) then + begin + // Linux/eventfd algorithm: let the kernel awake the thread + if fOwner.fThreadPollingEventFD.WaitFor(5000) then + while fOwner.fThreadPollingEventFD.GetNext and + fOwner.fClients.fRead.GetOnePending(notif, fName) and + not Terminated do + fOwner.fClients.ProcessRead(self, notif); + end + else begin - if (acoThreadSmooting in fOwner.Options) and - (fThreadPollingLastWakeUpTix <> fOwner.fThreadPollingLastWakeUpTix) and - not didwakeupduetoslowprocess then + // regular algorithm: the threads are waken using SetEvent + fWaitForReadPending := true; + fEvent.WaitForEver; + if Terminated then + break; + //{$I-}system.writeln(Name,' start loop ',fThreadPollingLastWakeUpCount); + fWakeUpFromSlowProcess := false; + while GetNextRead(notif) do + fOwner.fClients.ProcessRead(self, notif); + if acoThreadSmooting in fOwner.Options then begin - // ProcessRead() did take some time: wake up another thread - // - slow down a little bit the wrk RPS - // - but seems to reduce the wrk max latency - didwakeupduetoslowprocess := true; // do it once per loop - //{$I-}system.writeln(Name,' didwakeupduetoslowprocess'); - fOwner.ThreadPollingWakeup(1); // one thread is enough + fOwner.fThreadPollingWakeupSafe.Lock; + fThreadPollingLastWakeUpTix := 0; // this thread will now need to wakeup + fOwner.fThreadPollingWakeupSafe.UnLock; end; - fOwner.fClients.ProcessRead(self, notif); - end; - if acoThreadSmooting in fOwner.Options then - begin - fOwner.fThreadPollingWakeupSafe.Lock; - fThreadPollingLastWakeUpTix := 0; // this thread will now need to wakeup - fOwner.fThreadPollingWakeupSafe.UnLock; + //{$I-}system.writeln(Name,' stop loop ',fThreadPollingLastWakeUpCount); end; - //{$I-}system.writeln(Name,' stop loop ',fThreadPollingLastWakeUpCount); // release atpReadPoll lock above with fOwner.fThreadReadPoll do if fWaitForReadPending then @@ -1906,6 +1910,25 @@ procedure TAsyncConnectionsThread.Execute; fExecuteState := esFinished; end; +function TAsyncConnectionsThread.GetNextRead( + out notif: TPollSocketResult): boolean; +begin + result := fOwner.fClients.fRead.GetOnePending(notif, fName) and + not Terminated; + if result then + if (acoThreadSmooting in fOwner.Options) and + (fThreadPollingLastWakeUpTix <> fOwner.fThreadPollingLastWakeUpTix) and + not fWakeUpFromSlowProcess then + begin + // ProcessRead() did take some time: wake up another thread + // - slow down a little bit the wrk RPS + // - but seems to reduce the wrk max latency + fWakeUpFromSlowProcess := true; // do it once per Execute loop + //{$I-}system.writeln(Name,' didwakeupduetoslowprocess'); + fOwner.ThreadPollingWakeup(1); // one thread is enough + end; +end; + { TAsyncConnections } @@ -1955,6 +1978,7 @@ constructor TAsyncConnections.Create(const OnStart, OnStop: TOnNotifyThread; // prepare this main thread: fThreads[] requires proper fOwner.OnStart/OnStop inherited Create({suspended=}false, OnStart, OnStop, ProcessName); // initiate the read/receive thread(s) + // fThreadPollingEventFD := NewEventFD; // nil if unsupported fThreadPoolCount := aThreadPoolCount; SetLength(fThreads, fThreadPoolCount); if aThreadPoolCount = 1 then @@ -2090,6 +2114,8 @@ procedure TAsyncConnections.Shutdown; begin for i := 0 to high(fThreads) do fThreads[i].Terminate; // set the Terminated flag + if Assigned(fThreadPollingEventFD) then + fThreadPollingEventFD.SetEvent(1000); // release all sub threads now p := 0; endtix := mormot.core.os.GetTickCount64 + 10000; // wait up to 10 seconds repeat @@ -2189,6 +2215,13 @@ function TAsyncConnections.ThreadPollingWakeup(Events: PtrInt): PtrInt; tix: integer; // 32-bit is enough to check for ndx: array[byte] of byte; // wake up to 256 threads at once begin + // on Linux, use the Linux Kernel eventfd() + if Assigned(fThreadPollingEventFD) then + begin + fThreadPollingEventFD.SetEvent(Events); + result := Events; + exit; + end; // simple thread-safe fair round-robin over fThreads[] if Events > high(ndx) then Events := high(ndx); // paranoid avoid ndx[] buffer overflow