Skip to content

Commit

Permalink
refactor: use two, linked buffer blocks instead of a single buffer block
Browse files Browse the repository at this point in the history
  • Loading branch information
the-avid-engineer committed Oct 21, 2024
1 parent e9eeece commit 4ea3d5e
Showing 1 changed file with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ namespace EntityDb.Common.Transactions.Subscribers.ProcessorQueues;
internal class BufferBlockTransactionQueue<TTransactionProcessor> : BackgroundService, ITransactionProcessorQueue<TTransactionProcessor>
where TTransactionProcessor : ITransactionProcessor
{
private readonly BufferBlock<ITransaction> _transactionQueue = new();
private readonly BufferBlock<ITransaction> _foregroundQueue = new();
private readonly BufferBlock<ITransaction> _backgroundQueue = new();
private readonly IDisposable _link;
private readonly ILogger<BufferBlockTransactionQueue<TTransactionProcessor>> _logger;
private readonly TTransactionProcessor _transactionProcessor;

public BufferBlockTransactionQueue(ILogger<BufferBlockTransactionQueue<TTransactionProcessor>> logger, TTransactionProcessor transactionProcessor)
{
_link = _foregroundQueue.LinkTo(_backgroundQueue);

_logger = logger;
_transactionProcessor = transactionProcessor;
}
Expand All @@ -25,18 +29,18 @@ public void Enqueue(ITransaction transaction)
{
_logger.LogInformation("Enqueueing Transaction {TransactionId} to Transaction Queue.", transaction.Id.Value);

var enqueued = _transactionQueue.Post(transaction);
var enqueued = _foregroundQueue.Post(transaction);

_logger.LogInformation("{Enqueued} Transaction {TransactionId} to Transaction Queue.", enqueued, transaction.Id.Value);
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (await _transactionQueue.OutputAvailableAsync(stoppingToken))
while (await _backgroundQueue.OutputAvailableAsync(stoppingToken))
{
try
{
var transaction = await _transactionQueue.ReceiveAsync(stoppingToken);
var transaction = await _backgroundQueue.ReceiveAsync(stoppingToken);

await _transactionProcessor.ProcessTransaction(transaction, stoppingToken);
}
Expand All @@ -46,4 +50,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}
}
}

public override void Dispose()
{
base.Dispose();

_link.Dispose();
}
}

0 comments on commit 4ea3d5e

Please sign in to comment.