Deferred messages disappear (not handled) #1171

Open
muhammetsahin opened this issue Aug 2, 2024 · 2 comments

muhammetsahin commented Aug 2, 2024

Hi there,

I've noticed some cases where deferred (snoozed) messages disappear without ever being handled.

Here is my startup.cs:

string connectionString = config.GetSection("DatabaseSettings").GetValue<string>("ConnectionString");
services.AddRebus(rebus =>
    rebus.Routing(r => r.TypeBased().MapAssemblyOf<ApplicationAssembly>("transfer-queue"))
        .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "transfer-queue"))
        .Sagas(s =>
            s.StoreInPostgres(connectionString,
                "sagas",
                "saga_indexes", schemaName: "saga"))
        .Options(o =>
        {
            o.SetNumberOfWorkers(1);
            o.SetMaxParallelism(50);
            o.RetryStrategy(maxDeliveryAttempts: 0);
            o.LogPipeline(verbose: true);
        })
        .Timeouts(t => t.StoreInPostgres(connectionString, "saga_mq_timeouts"))
        .Logging(l =>
        {
            l.ColoredConsole(Rebus.Logging.LogLevel.Info);
        }));

services.AutoRegisterHandlersFromAssemblyOf<ApplicationAssembly>();

And my saga class is:

public class MoneyTransferSaga : Saga<MoneyTransferSagaData>,
    IAmInitiatedBy<StartMoneyTransferEvent>,
    IHandleMessages<ApproveMoneyTransferEvent>,
    IHandleMessages<ValidateMoneyTransferEvent>,
    IHandleMessages<CheckBalanceForMoneyTransferEvent>,
    IHandleMessages<SendMoneyTransferToTheBankEvent>,
    IHandleMessages<CheckStatusMoneyTransferFromBankEvent>,
    IHandleMessages<CompleteMoneyTransferEvent>
{
    private readonly IBus _bus;
    private readonly IMessageContext _messageContext;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<MoneyTransferSaga> _logger;

    public MoneyTransferSaga(IBus bus,
        IMessageContext messageContext,
        IServiceProvider serviceProvider,
        ILogger<MoneyTransferSaga> logger)
    {
        _bus = bus;
        _messageContext = messageContext;
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    protected override void CorrelateMessages(ICorrelationConfig<MoneyTransferSagaData> config)
    {
        config.Correlate<StartMoneyTransferEvent>(m => m.DocumentId, s => s.DocumentId);
        config.Correlate<ApproveMoneyTransferEvent>(m => m.DocumentId, s => s.DocumentId);
        config.Correlate<ValidateMoneyTransferEvent>(m => m.DocumentId, s => s.DocumentId);
        config.Correlate<CheckBalanceForMoneyTransferEvent>(m => m.DocumentId, s => s.DocumentId);
        config.Correlate<SendMoneyTransferToTheBankEvent>(m => m.DocumentId, s => s.DocumentId);
        config.Correlate<CheckStatusMoneyTransferFromBankEvent>(m => m.DocumentId, s => s.DocumentId);
        config.Correlate<CompleteMoneyTransferEvent>(m => m.DocumentId, s => s.DocumentId);
    }

    public async Task Handle(StartMoneyTransferEvent message)
    {
        if (!IsNew)
        {
            return;
        }

        _messageContext.Headers.TryGetValue("X-UserId", out string userId);
        _messageContext.Headers.TryGetValue("X-TenantId", out string tenantId);

        await _bus.Send(new ApproveMoneyTransferEvent(message.DocumentId, userId, tenantId));
    }

    public async Task Handle(ApproveMoneyTransferEvent message)
    {
        Data.IsApprovedMoneyTransfer = true;

        var tenantId = message.TenantId;
        var userId = message.UserId;

        using var scope = _serviceProvider.CreateScope();

        scope.ServiceProvider.GetRequiredService<ITenantResolver>().SetTenantInfo(tenantId, null);
        scope.ServiceProvider.GetRequiredService<ICurrentUserInitializer>().SetCurrentUserId(userId);
        var dbContext = scope.ServiceProvider.GetRequiredService<IApplicationDbContext>();

        var document = dbContext.Documents.Find(message.DocumentId);
        if (document is null)
        {
            MarkAsComplete();
            return;
        }

       

        await _bus.Send(new ValidateMoneyTransferEvent(document.Id, userId, tenantId));
    }

    public async Task Handle(ValidateMoneyTransferEvent message)
    {
        Data.IsValidatedMoneyTransfer = true;

        var tenantId = message.TenantId;
        var userId = message.UserId;

        using var scope = _serviceProvider.CreateScope();

        scope.ServiceProvider.GetRequiredService<ITenantResolver>().SetTenantInfo(tenantId, null);
        scope.ServiceProvider.GetRequiredService<ICurrentUserInitializer>().SetCurrentUserId(userId);
        var dbContext = scope.ServiceProvider.GetRequiredService<IApplicationDbContext>();

        var document = dbContext.Documents.Find(message.DocumentId);
        if (document is null)
        {
            MarkAsComplete();
            return;
        }

        if (document.ActionType != RuleActionType.Accept)
        {
            MarkAsComplete();
            return;
        }

        await _bus.Send(new CheckBalanceForMoneyTransferEvent(message.DocumentId, message.UserId, message.TenantId));
    }

    public async Task Handle(CheckBalanceForMoneyTransferEvent message)
    {
        Data.IsCheckedBalanceForMoneyTransfer = true;
        await _bus.Send(new SendMoneyTransferToTheBankEvent(message.DocumentId, message.UserId, message.TenantId));
    }


    public async Task Handle(SendMoneyTransferToTheBankEvent message)
    {
        Data.IsSentMoneyTransferToTheBank = true;

        var tenantId = message.TenantId;
        var userId = message.UserId;

        using var scope = _serviceProvider.CreateScope();

        scope.ServiceProvider.GetRequiredService<ITenantResolver>().SetTenantInfo(tenantId, null);
        scope.ServiceProvider.GetRequiredService<ICurrentUserInitializer>().SetCurrentUserId(userId);
        var dbContext = scope.ServiceProvider.GetRequiredService<IApplicationDbContext>();
        var moneyTransferWorkflow = scope.ServiceProvider.GetRequiredService<IMoneyTransferWorkflow>();
        var moneyTransferTransactionRepository =
            scope.ServiceProvider.GetRequiredService<IMoneyTransferTransactionRepository>();

        var document = dbContext.Documents.Find(message.DocumentId);
        if (document is null)
        {
            MarkAsComplete();
            return;
        }

        var moneyTransferRequest = document.MoneyTransferRequest;

        var moneyTransferResult = await moneyTransferWorkflow.ProcessAsync(moneyTransferRequest, tenantId);

        moneyTransferTransactionRepository.UpdateMoneyTransaction(moneyTransferRequest, moneyTransferResult);

      
        document.MoneyTransferRequest.FromIban = moneyTransferResult.FromIban;
        document.MoneyTransferRequest.FromAccountNo = moneyTransferResult.FromAccountNo;
        document.MoneyTransferRequest.DueDate = moneyTransferResult.DueDate;
        dbContext.Documents.Update(document);

        _ = await dbContext.SaveChangesAsync(CancellationToken.None);

        await _bus.Defer(TimeSpan.FromSeconds(30),
            new CheckStatusMoneyTransferFromBankEvent(message.DocumentId, moneyTransferResult, userId, tenantId));
    }

    public async Task Handle(CheckStatusMoneyTransferFromBankEvent message)
    {
        Data.IsCheckedStatusMoneyTransferFromBank = true;

        var tenantId = message.TenantId;
        var userId = message.UserId;

        using var scope = _serviceProvider.CreateScope();

        scope.ServiceProvider.GetRequiredService<ITenantResolver>().SetTenantInfo(tenantId, null);
        scope.ServiceProvider.GetRequiredService<ICurrentUserInitializer>().SetCurrentUserId(userId);
        var dbContext = scope.ServiceProvider.GetRequiredService<IApplicationDbContext>();
        var moneyTransferContext = scope.ServiceProvider.GetRequiredService<IMoneyTransferContext>();
        var moneyTransferTransactionRepository =
            scope.ServiceProvider.GetRequiredService<IMoneyTransferTransactionRepository>();

        var document = dbContext.Documents.Find(message.DocumentId);
        if (document is null)
        {
            MarkAsComplete();
            return;
        }

      
        var moneyTransferRequest = document.MoneyTransferRequest;
        var moneyTransferResult = message.MoneyTransferResult;

        var result = await moneyTransferContext.CheckAsync(document.TenantId, moneyTransferRequest);

        moneyTransferTransactionRepository.SetStatusAndTransactionEvent(result, moneyTransferRequest.ExtTrackingId);

        await _bus.Send(new CompleteMoneyTransferEvent(message.DocumentId, moneyTransferRequest, moneyTransferResult,
            userId, tenantId));
    }

    public async Task Handle(CompleteMoneyTransferEvent message)
    {
        Data.IsCompletedMoneyTransfer = true;
        //Some logic code is here
        .........

        MarkAsComplete();
    }

 
}

In some cases, I see that the message triggered by the Defer method is not handled.
What could be the reason for this?

Screenshot 2024-08-02 15 56 00

I also looked at another issue that reports the same problem, but I could not solve it.

#838

Thank you for your help.

@mookid8000
Member

Could it be caused by the fact that you're using the in-mem transport, which isn't durable?
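
To make the question concrete: the in-memory transport keeps queued messages only in the memory of the running process, so anything still sitting in the queue is gone if the process restarts. Below is a minimal sketch of the same setup with a durable transport instead. It assumes the `UsePostgreSql(connectionString, tableName, inputQueueName)` transport extension from the Rebus.PostgreSql package, and the table name `"messages"` is hypothetical. The `.Timeouts(...)` line is left out on the assumption that this transport handles deferred messages itself; if that does not hold for your version, keep it.

```csharp
using Rebus.Config;
using Rebus.Routing.TypeBased;

// Sketch only, not a verified fix: swaps the in-memory transport for a
// PostgreSQL-backed one so queued messages survive a process restart.
// "messages" is a hypothetical table name; everything else mirrors the
// configuration posted above.
services.AddRebus(rebus =>
    rebus.Routing(r => r.TypeBased().MapAssemblyOf<ApplicationAssembly>("transfer-queue"))
        // Assumption: UsePostgreSql comes from the Rebus.PostgreSql package.
        .Transport(t => t.UsePostgreSql(connectionString, "messages", "transfer-queue"))
        .Sagas(s =>
            s.StoreInPostgres(connectionString,
                "sagas",
                "saga_indexes", schemaName: "saga"))
        .Options(o =>
        {
            o.SetNumberOfWorkers(1);
            o.SetMaxParallelism(50);
            o.RetryStrategy(maxDeliveryAttempts: 0);
            o.LogPipeline(verbose: true);
        })
        .Logging(l =>
        {
            l.ColoredConsole(Rebus.Logging.LogLevel.Info);
        }));
```

If the deferred messages stop disappearing with a durable transport, that would point to the in-memory queue as the place where they were lost.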

@mookid8000
Member

Hi @muhammetsahin, did you figure something out?
