diff --git a/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs b/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs index 6522e98..71a4381 100644 --- a/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs +++ b/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs @@ -209,6 +209,7 @@ public void MultipleErrorsAttachedToCorrectItems() processor.Error += (exception, item) => { Assert.NotNull(exception); + Assert.Equal(exception, item.Exception); gotCorrectExceptionForItem1 |= Equals(item.Data, obj1.Data) && exception.Message == "1"; gotCorrectExceptionForItem2 |= Equals(item.Data, obj2.Data) && exception.Message == "2"; diff --git a/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs b/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs index 63d5cc9..db9c80b 100644 --- a/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs +++ b/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs @@ -21,7 +21,14 @@ protected override void ProcessResults(IEnumerable items) { foreach (var item in items) { - Received?.Invoke(item); + try + { + Received?.Invoke(item); + } + catch (Exception e) + { + item.Exception = e; + } } } diff --git a/osu.Server.QueueProcessor/QueueItem.cs b/osu.Server.QueueProcessor/QueueItem.cs index 9e622fb..823a28e 100644 --- a/osu.Server.QueueProcessor/QueueItem.cs +++ b/osu.Server.QueueProcessor/QueueItem.cs @@ -12,11 +12,21 @@ namespace osu.Server.QueueProcessor [Serializable] public abstract class QueueItem { + [IgnoreDataMember] + private bool failed; + /// /// Set to true to mark this item is failed. This will cause it to be retried. /// [IgnoreDataMember] - public bool Failed { get; set; } + public bool Failed + { + get => failed || Exception != null; + set => failed = value; + } + + [IgnoreDataMember] + public Exception? Exception { get; set; } /// /// The number of times processing this item has been retried. Handled internally by . diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs index 8fd2df0..0136590 100644 --- a/osu.Server.QueueProcessor/QueueProcessor.cs +++ b/osu.Server.QueueProcessor/QueueProcessor.cs @@ -132,11 +132,11 @@ public void Run(CancellationToken cancellation = default) // individual processing should not be cancelled as we have already grabbed from the queue. Task.Factory.StartNew(() => { ProcessResults(items); }, CancellationToken.None, TaskCreationOptions.HideScheduler, threadPool) - .ContinueWith(t => + .ContinueWith(_ => { foreach (var item in items) { - if (t.Exception != null || item.Failed) + if (item.Failed) { Interlocked.Increment(ref totalErrors); @@ -145,12 +145,12 @@ public void Run(CancellationToken cancellation = default) Interlocked.Increment(ref consecutiveErrors); - Error?.Invoke(t.Exception, item); + Error?.Invoke(item.Exception, item); - if (t.Exception != null) - SentrySdk.CaptureException(t.Exception); + if (item.Exception != null) + SentrySdk.CaptureException(item.Exception); - Console.WriteLine($"Error processing {item}: {t.Exception}"); + Console.WriteLine($"Error processing {item}: {item.Exception}"); attemptRetry(item); } else @@ -197,8 +197,6 @@ private void setupSentry(SentryOptions options) private void attemptRetry(T item) { - item.Failed = false; - if (item.TotalRetries++ < config.MaxRetries) { Console.WriteLine($"Re-queueing for attempt {item.TotalRetries} / {config.MaxRetries}"); @@ -274,11 +272,26 @@ protected virtual void ProcessResult(T item) /// /// Implement to process batches of items from the queue. /// + /// + /// In most cases, you should only need to override and implement . + /// Only override this if you need more efficient batch processing. + /// + /// If overriding this method, you should try-catch for exceptions, and set any exception against + /// the relevant . If this is not done, failures will not be handled correctly. /// The items to process. protected virtual void ProcessResults(IEnumerable items) { foreach (var item in items) - ProcessResult(item); + { + try + { + ProcessResult(item); + } + catch (Exception e) + { + item.Exception = e; + } + } } } }