Skip to main content

Dual-write problemi

Bir komut iki yere yazmak ister: (1) iş verisi (PostgreSQL), (2) integration event (RabbitMQ). Bunlar ayrı sistemler olduğu için atomik değildir:
  • DB commit oldu, ardından RabbitMQ publish başarısız → event kayboldu, downstream tutarsız.
  • RabbitMQ publish oldu, ardından DB commit rollback → “olmayan” bir olay yayınlandı.
Çözüm: Event’i RabbitMQ’ya doğrudan yazmak yerine, aynı DB transaction’ında bir outbox tablosuna yaz. DB commit’i ile event kaydı atomik olur. Ayrı bir background servis outbox satırlarını okuyup RabbitMQ’ya teslim eder.

MassTransit EF Core Outbox kurulumu

Outbox, building block içinde değil host projede (DbContext’in olduğu yerde) kurulur — AddMassTransitRabbitMqEventBus bunu configureExtra callback’i ile kabul eder. Application/DependencyInjection.cs:
builder.Services.AddMassTransitRabbitMqEventBus(builder.Configuration, x =>
{
    x.AddEntityFrameworkOutbox<DiyanetCleanArchitectureDbContext>(o =>
    {
        o.UsePostgres();
        o.UseBusOutbox();                                  // publish → outbox_message (RabbitMQ'ya değil)
        o.QueryDelay = TimeSpan.FromSeconds(1);            // outbox tarama aralığı
        o.DuplicateDetectionWindow = TimeSpan.FromMinutes(30);
    });
})
.AddMassTransitSubscription<CacheInvalidationIntegrationEvent, CacheInvalidationIntegrationEventHandler>();
AyarDeğerAnlamı
UsePostgres()Outbox/inbox tablolarını PostgreSQL üzerinde tutar.
UseBusOutbox()IPublishEndpoint.Publish artık RabbitMQ’ya değil, aynı transaction’da outbox_message’a yazar.
QueryDelay1sOutboxDeliveryService her saniye pending satırları tarar ve kuyruğa publish eder.
DuplicateDetectionWindow30 dkInbox tarafında aynı mesaj iki kez gelirse atlanır (consumer idempotency güvencesi).

messaging şeması tabloları

Tablolar iş aggregate’lerinden ayrı, ayrı bir şemada tutulur (MESSAGING_SCHEMA = "messaging"). OnModelCreating’de map edilir (DiyanetCleanArchitectureDbContext.cs):
public const string MESSAGING_SCHEMA = "messaging";

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    // MassTransit EF Core Outbox + Inbox tabloları — `messaging` şemasında.
    modelBuilder.AddInboxStateEntity(e   => e.ToTable("inbox_state",   MESSAGING_SCHEMA));
    modelBuilder.AddOutboxStateEntity(e  => e.ToTable("outbox_state",  MESSAGING_SCHEMA));
    modelBuilder.AddOutboxMessageEntity(e => e.ToTable("outbox_message", MESSAGING_SCHEMA));
    // ...
    base.OnModelCreating(modelBuilder);
}
TabloRol
messaging.outbox_messageGönderilmeyi bekleyen integration event’lerin gövdesi.
messaging.outbox_stateOutbox delivery ilerlemesi/lock state’i.
messaging.inbox_stateConsume edilen mesajların kaydı — duplicate detection bunun üzerinden çalışır.
Bu tablolar EF Core migration’larıyla oluşur. messaging şeması, business (public) şemasından ayrıdır; bkz. Veri Katmanı.

Aynı transaction garantisi & lazy publish

UseBusOutbox aktifken IPublishEndpoint MassTransit pipeline’ında outbox wrapper’la sarılır. IEventBus’ı DbContext ctor’unda inject ettiğimiz için DI graph döngüye girebilir — host startup’ta (örn. MigrationService bir scope açıp DbContext resolve ettiğinde) soft deadlock gözlemlenir. Bu yüzden MassTransitEventBus IPublishEndpoint’i lazy resolve eder:
public async Task PublishAsync(IntegrationEvent @event, CancellationToken ct = default)
{
    // Lazy resolve — publish anında. ctor injection zincirine IPublishEndpoint girmez,
    // deadlock kırılır. Bu an SaveChanges sırasıdır → DbContext zaten scope'ta, outbox'a yazılır.
    var publishEndpoint = serviceProvider.GetRequiredService<IPublishEndpoint>();
    await publishEndpoint.Publish(@event, @event.GetType(), context => { /* routing key */ }, ct);
}
Sonuç: SaveEntitiesAsyncSaveChanges → domain event dispatch → PublishAsync → outbox satırı aynı transaction’a yazılır. DB commit fail olursa outbox da yazılmaz → exactly-once’a yaklaşır; RabbitMQ down ise satırlar outbox’ta birikir ve broker toparlanınca teslim edilir.

Inbox ile duplicate detection

Tüketici tarafında inbox_state, DuplicateDetectionWindow boyunca işlenen mesaj kimliklerini tutar. Aynı Id’li mesaj (retry/redelivery sonucu) tekrar gelirse atlanır. Bu, IntegrationEvent’in Id’sini serileştirmede koruması (bkz. Integration Events) ile birlikte çalışır. Yine de handler’ı idempotent yazmak en güvenli yoldur.

İlgili

RabbitMQ Topology

Outbox’tan çıkan mesajın gittiği exchange/queue.

Integration Events

IntegrationEvent base ve idempotency.

Event Akışı

Command → SaveEntities → Outbox tam akışı.

Veri Katmanı

messaging şeması ve migration’lar.