.NET 雲原生架構師訓練營(模組二 基礎鞏固 RabbitMQ Masstransit 異常處理)–學習筆記

2.6.8 RabbitMQ — Masstransit 異常處理

  • 異常處理
  • 其他
  • 高級功能

異常處理

  • 異常與重試
  • 重試配置
  • 重試條件
  • 重新投遞資訊
  • 信箱

異常與重試

Exception

public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context)
    {
        throw new Exception("Very bad things happened");
    }
}

UseMessageRetry

var sessionFactory = CreateSessionFactory();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host("rabbitmq://localhost/");

    cfg.ReceiveEndpoint("submit-order", e =>
    {
        e.UseMessageRetry(r => r.Immediate(5));

        e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
    });
});

重試配置

// 立即重試:一共連續重試10次
ep.UseMessageRetry(r => r.Immediate(10));

// 間隔重試:一共重試10次,每次間隔10秒
ep.UseMessageRetry(r => r.Interval(10, TimeSpan.FromSeconds(10)));

// 多個間隔重試:5秒後第一次,5+10秒後第二次,5+10+15秒後第三次
ep.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15)));

// 指數級間隔重試:共10次,每次間隔:當前重試次數 * 60秒
ep.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(60), TimeSpan.FromHours(24), TimeSpan.FromSeconds(60)));

// 每次疊加50秒
ep.UseMessageRetry(r => r.Incremental(10, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(50)));

重試條件

e.UseMessageRetry(r => 
{
    r.Handle<ArgumentNullException>();
    r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
    r.Ignore<ArgumentException>(t => t.ParamName == "orderTotal");
});

過濾某些異常類型不進行重試

重新投遞資訊

cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
    e.UseMessageRetry(r => r.Immediate(5));
    e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});

消息沖隊列移除之後,在一定時間之後重新投入消息隊列。需要配置調度模組(scheduling)

信箱

cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
    e.UseMessageRetry(r => r.Immediate(5));
    e.UseInMemoryOutbox();

    e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});

有些消息是在 consume 方法中發送或發布的,如果在發送之後 consume 中產生了異常,那原來發出去的消息就需要撤回,如果使用信箱之後,在 consume 中要發布/發送的消息就會先暫存在記憶體中直到 consume 方法成功之後才真正發出去

其他

  • Fault
  • Consuming Faults
  • Error Pipe
  • Dead-Letter Pipe

Fault

public interface Fault<T>
    where T : class
{
    Guid FaultId { get; }
    Guid? FaultedMessageId { get; }
    DateTime Timestamp { get; }
    ExceptionInfo[] Exceptions { get; }
    HostInfo Host { get; }
    T Message { get; }
}

Fault 消息在異常的時候會發布出來

Consuming Faults

public class DashboardFaultConsumer :
    IConsumer<Fault<SubmitOrder>>
{
    public async Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
    {
        // update the dashboard
    }
}

Fault 消息也是可以進行訂閱的

Error Pipe

cfg.ReceiveEndpoint("input-queue", ec =>
{
    ec.DiscardFaultedMessages();
});

默認情況下錯誤的消息會被投遞到了 _error 隊列,可以配置直接拋棄錯誤資訊

Dead-Letter Pipe

cfg.ReceiveEndpoint("input-queue", ec =>
{
    ec.DiscardSkippedMessages();
});

死信隊列:沒有消費者的消息會被移到 _skipped 隊列,但可以配置為不移到 _skipped 隊列

高級功能

  • 持久化
  • Saga 事件串
  • 調度
  • Courier 最終一致性
  • 監控

知識共享許可協議

本作品採用知識共享署名-非商業性使用-相同方式共享 4.0 國際許可協議進行許可。

歡迎轉載、使用、重新發布,但務必保留文章署名 鄭子銘 (包含鏈接: //www.cnblogs.com/MingsonZheng/ ),不得用於商業目的,基於本文修改後的作品務必以相同的許可發布。

如有任何疑問,請與我聯繫 ([email protected]) 。