Skip to content

Commit

Permalink
prepare IEventFD use in mormot.net.async
Browse files Browse the repository at this point in the history
- disabled by now until further testing
- current numbers are not better than our own algorithm
  • Loading branch information
Arnaud Bouchez committed Feb 27, 2023
1 parent d598ebf commit a0e916a
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/mormot.commit.inc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
'2.0.4973'
'2.0.4974'
89 changes: 61 additions & 28 deletions src/net/mormot.net.async.pas
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ TAsyncConnectionsThread = class(TSynThread)
fOwner: TAsyncConnections;
fProcess: TAsyncConnectionsThreadProcess;
fWaitForReadPending: boolean;
fWakeUpFromSlowProcess: boolean;
fExecuteState: THttpServerExecuteState;
fIndex: integer;
fEvent: TSynEvent;
Expand All @@ -425,6 +426,7 @@ TAsyncConnectionsThread = class(TSynThread)
fThreadPollingLastWakeUpCount: integer;
fCustomObject: TObject;
procedure Execute; override;
function GetNextRead(out notif: TPollSocketResult): boolean;
public
/// initialize the thread
constructor Create(aOwner: TAsyncConnections;
Expand Down Expand Up @@ -528,6 +530,7 @@ TAsyncConnections = class(TNotifiedThread)
fThreadPollingWakeupSafe: TLightLock;
fThreadPollingWakeupLoad: integer;
fThreadPollingLastWakeUpTix: integer;
fThreadPollingEventFD: IEventFD;
fGC: array[1..2] of TAsyncConnectionsGC;
function AllThreadsStarted: boolean; virtual;
procedure AddGC(aConnection: TPollAsyncConnection);
Expand Down Expand Up @@ -1767,7 +1770,6 @@ destructor TAsyncConnectionsThread.Destroy;
procedure TAsyncConnectionsThread.Execute;
var
new, pending, ms: integer;
didwakeupduetoslowprocess: boolean;
start: Int64;
notif: TPollSocketResult;
begin
Expand Down Expand Up @@ -1841,7 +1843,10 @@ procedure TAsyncConnectionsThread.Execute;
//fOwner.fClients.fRead.PendingLogDebug('Wakeup');
fEvent.ResetEvent;
fWaitForReadPending := true; // should be set before wakeup
fOwner.ThreadPollingWakeup(pending);
if Assigned(fOwner.fThreadPollingEventFD) then
fOwner.fThreadPollingEventFD.SetEvent(new)
else
fOwner.ThreadPollingWakeup(pending);
//fOwner.DoLog(sllCustom1, 'Execute: WaitFor ReadPending', [], self);
if not Terminated then
fEvent.WaitFor(20);
Expand All @@ -1851,37 +1856,36 @@ procedure TAsyncConnectionsThread.Execute;
end;
end;
atpReadPending:
// secondary threads wait, then read and process pending events
begin
// secondary threads wait, then read and process pending events
fWaitForReadPending := true;
fEvent.WaitForEver;
if Terminated then
break;
didwakeupduetoslowprocess := false;
//{$I-}system.writeln(Name,' start loop ',fThreadPollingLastWakeUpCount);
while fOwner.fClients.fRead.GetOnePending(notif, fName) and
not Terminated do
if Assigned(fOwner.fThreadPollingEventFD) then
begin
// Linux/eventfd algorithm: let the kernel awake the thread
if fOwner.fThreadPollingEventFD.WaitFor(5000) then
while fOwner.fThreadPollingEventFD.GetNext and
fOwner.fClients.fRead.GetOnePending(notif, fName) and
not Terminated do
fOwner.fClients.ProcessRead(self, notif);
end
else
begin
if (acoThreadSmooting in fOwner.Options) and
(fThreadPollingLastWakeUpTix <> fOwner.fThreadPollingLastWakeUpTix) and
not didwakeupduetoslowprocess then
// regular algorithm: the threads are waken using SetEvent
fWaitForReadPending := true;
fEvent.WaitForEver;
if Terminated then
break;
//{$I-}system.writeln(Name,' start loop ',fThreadPollingLastWakeUpCount);
fWakeUpFromSlowProcess := false;
while GetNextRead(notif) do
fOwner.fClients.ProcessRead(self, notif);
if acoThreadSmooting in fOwner.Options then
begin
// ProcessRead() did take some time: wake up another thread
// - slow down a little bit the wrk RPS
// - but seems to reduce the wrk max latency
didwakeupduetoslowprocess := true; // do it once per loop
//{$I-}system.writeln(Name,' didwakeupduetoslowprocess');
fOwner.ThreadPollingWakeup(1); // one thread is enough
fOwner.fThreadPollingWakeupSafe.Lock;
fThreadPollingLastWakeUpTix := 0; // this thread will now need to wakeup
fOwner.fThreadPollingWakeupSafe.UnLock;
end;
fOwner.fClients.ProcessRead(self, notif);
end;
if acoThreadSmooting in fOwner.Options then
begin
fOwner.fThreadPollingWakeupSafe.Lock;
fThreadPollingLastWakeUpTix := 0; // this thread will now need to wakeup
fOwner.fThreadPollingWakeupSafe.UnLock;
//{$I-}system.writeln(Name,' stop loop ',fThreadPollingLastWakeUpCount);
end;
//{$I-}system.writeln(Name,' stop loop ',fThreadPollingLastWakeUpCount);
// release atpReadPoll lock above
with fOwner.fThreadReadPoll do
if fWaitForReadPending then
Expand All @@ -1906,6 +1910,25 @@ procedure TAsyncConnectionsThread.Execute;
fExecuteState := esFinished;
end;

function TAsyncConnectionsThread.GetNextRead(
out notif: TPollSocketResult): boolean;
begin
result := fOwner.fClients.fRead.GetOnePending(notif, fName) and
not Terminated;
if result then
if (acoThreadSmooting in fOwner.Options) and
(fThreadPollingLastWakeUpTix <> fOwner.fThreadPollingLastWakeUpTix) and
not fWakeUpFromSlowProcess then
begin
// ProcessRead() did take some time: wake up another thread
// - slow down a little bit the wrk RPS
// - but seems to reduce the wrk max latency
fWakeUpFromSlowProcess := true; // do it once per Execute loop
//{$I-}system.writeln(Name,' didwakeupduetoslowprocess');
fOwner.ThreadPollingWakeup(1); // one thread is enough
end;
end;


{ TAsyncConnections }

Expand Down Expand Up @@ -1955,6 +1978,7 @@ constructor TAsyncConnections.Create(const OnStart, OnStop: TOnNotifyThread;
// prepare this main thread: fThreads[] requires proper fOwner.OnStart/OnStop
inherited Create({suspended=}false, OnStart, OnStop, ProcessName);
// initiate the read/receive thread(s)
// fThreadPollingEventFD := NewEventFD; // nil if unsupported
fThreadPoolCount := aThreadPoolCount;
SetLength(fThreads, fThreadPoolCount);
if aThreadPoolCount = 1 then
Expand Down Expand Up @@ -2090,6 +2114,8 @@ procedure TAsyncConnections.Shutdown;
begin
for i := 0 to high(fThreads) do
fThreads[i].Terminate; // set the Terminated flag
if Assigned(fThreadPollingEventFD) then
fThreadPollingEventFD.SetEvent(1000); // release all sub threads now
p := 0;
endtix := mormot.core.os.GetTickCount64 + 10000; // wait up to 10 seconds
repeat
Expand Down Expand Up @@ -2189,6 +2215,13 @@ function TAsyncConnections.ThreadPollingWakeup(Events: PtrInt): PtrInt;
tix: integer; // 32-bit is enough to check for
ndx: array[byte] of byte; // wake up to 256 threads at once
begin
// on Linux, use the Linux Kernel eventfd()
if Assigned(fThreadPollingEventFD) then
begin
fThreadPollingEventFD.SetEvent(Events);
result := Events;
exit;
end;
// simple thread-safe fair round-robin over fThreads[]
if Events > high(ndx) then
Events := high(ndx); // paranoid avoid ndx[] buffer overflow
Expand Down

0 comments on commit a0e916a

Please sign in to comment.