Monolitik sistemlerde veritabanı işlemleri ACID prensiplerine dayanır. Bir işlem ya tamamen gerçekleşir ya da hiç gerçekleşmez. Ancak mikroservislerde veri dağıtıktır. Bir servisin veriyi kaydetmesi yetmez, diğer servislerin de bundan haberdar olması gerekir.
Bu makalede .NET üzerinde Transactional Outbox Pattern uygulayarak bu iki işlemi tek bir atomik birime nasıl indirgeyeceğimizi ve Eventual Consistency prensibiyle sistemler arası veri bütünlüğünü nasıl garanti altına alacağımızı inceleyeceğiz.
Dual-Write Problemi
Bunu daha iyi açıklamak için bir kod örneği gösterelim.
public async Task SaveAsync(ReserveCarCommand request)
{
using (var uow = uowProvider.Create())
{
var car = await uow.Cars.GetById(request.CarId);
car.Reserve(request.CustomerId, request.Days);
uow.Cars.Update(car);
await uow.CommitChanges();
var evt = new CarReservedEvent(car.Id);
await eventPublisher.PublishMessage(evt);
return new ReservationResult {
ReservationId = car.CurrentReservationId,
IsSuccess = true
};
}
}Burada kullanılan temel strateji, veritabanı kaydını sağlayıp diğer servislere bilgi vermek. Önce mesajı gönderip (event publish) veritabanı işlemini (commit gönderme) sona bıraksaydık mesaj çoktan yola çıkmışken veritabanında bir kısıtlama hatası veya bağlantı kopması yaşanabilirdi. Diğer tabirle CommitChanges metodunun PublishMessage’dan önce gelmesi tesadüf değil. Bu verinin doğruluğunu koruma çabasıdır. Commit sırasında hata alınırsa veritabanına hiç veri yazılmadığı için tutarsızlık olmaz. Fakat mesaj kuyruğu erişilemez durumdaysa veya bir ağ sorunu varsa ve mesaj yayınlama işlemi gerçekleşmezse ne olur? Örnekteki gibi araç kiralam isteği veritabanımızda olur ama diğer servisler bundan haberdar olmaz, dolayısıyla nihai tutarlılığa ulamazsınız. Bu soruna “Dual-Write” problemi denir.
Birçok yazılımcı “RabbitMQ asla çökmez ve mesaj göndermede sorun olmaz” yanılgısına düşer. RabbitMQ sağlam bir altyapı (Erlang/OTP) üzerine kuruludur. Kendini iyileştirebilir. Fakat RabbitMQ sağlam olsa bile, ona giden ağ kopabilir, sunucunun diski dolabilir veya RAM arızası yaşanabilir. Dolayısıyla bunu bir hata olarak değil beklenen bir durum olarak ele almalısınız.
Polly gibi kütüphanelerle sunulan “Retry” mekanizması, aslında sadece anlık ve geçici ağ kopmalarını tolere etmeye yarar ve sorunu kökten çözmek yerine sadece semptomları hafifletir. Teknik olarak bu yaklaşım mesajı hala uygulama belleğinde (RAM) tuttuğu için sistemin bütününe karşı bir risk taşır. Polly bu mesajı göndermek için deneme sınırlarını aşarsa o mesaj silinip kaybolacaktır. Dolayısıyla Polly paketi hata olasılığını matematiksel olarak düşürse de “Dual Write” sorununu mimari olarak ortadan kaldırmaz.
Microservice mimarisinde veritabanı işlemleri ile mesaj kuyruğu arasındaki atomik bağı kurarak veri tutarsızlığını kalıcı olarak nasıl çözersiniz?
Outbox Pattern
Outbox ile mesajı mesaj kuyruğuna ağ üzerinden doğrudan göndermek yerine, mevcut işlemin bir parçası olarak mikroservis veritabanımıza kaydederiz. Bu sayede servisimiz içinde iç tutarlılık sağlarız. Eğer işlem geri alınırsa hiçbir mesaj gönderilmez. Mesajımız veritabanında “outbox tablosu“nda saklandığına göre, artık onu mesaj kuyruğuna iletecek bir sürece ihtiyacımız doğar. Bu tabloyu belirli periyotlarda kontrol edecek bir background job oluşturulursa ve bu job mesaj kuyruğunu kullanarak göndermeyi denerse sonucuna göre tablonuzdan da durumunu günceller. Dolayısıyla kesin olarak event publish yapıldığından emin oluruz.
Örneğimiz üzerinden açıklayalım. Müşteri arabayı rezerve ettiğinde arabanın durumunu “rezerve” yapmak ile “rezerve edildi” mesajını Outbox tablosuna yazmak tek bir veritabanı paketidir. Bu iç tutarlılığı sağlar çünkü arabayı rezerve etmediğin halde fatura servisine “fatura kes” demek düşük ihtimaldir. Arka planda çalışan worker aynı mesajı ikinci kez atmaya çalışırsa çift mesaj sorunu ortaya çıkar fakat bunu alıcı taraf düzeltmeli. Alıcı servis mesajdaki ID’yi kontrol eder ve “ben bu rezervasyon için zaten fatura kesmişim” der ve gönderdiğiniz mesajı da çöpe atar. Hata artık kalmadı!
Outbox pattern sayesinde ana sisteminizin ağ hatalarından etkilenmesini öleyebilirsiniz, mesajın kaybolmamasını garanti edebilirsiniz ve tek riski (çift mesaj problemini) alıcı tarafa bırakarak sistemi tamamen güvenli hale getirebilirsiniz.
Outbox Pattern yapısından bahsettiğim diğer bir makale: https://recepserit.com/c-sql-ve-nosql-arasinda-distributed-transaction-sorunu-ve-outbox-pattern
Pratik
SQL üzerindeki Outbox tablosu için basit bir örnek.
CREATE TABLE app.OutboxMessages
(
"[Id] UNIQUEIDENTIFIER NOT NULL,
[CreatedDate] DATETIME2 NOT NULL,
[Type] VARCHAR(255) NOT NULL,
[Data] VARCHAR(MAX) NOT NULL,
[IsProcessCompleted] BIT NOT NULL"
)Bu tablo için oluşturacağımız entity object böyle olabilir.
public class OutboxMessage
{
public Guid Id { get; private set; }
public DateTime CreatedDate { get; private set; }
/// <summary>
/// Full name of message type.
/// </summary>
public string Type { get; private set; }
/// <summary>
/// Serialzed to JSON.
/// </summary>
public string Data { get; private set; }
public bool IsProcessCompleted { get; private set; }
private OutboxMessage()
{
}
internal OutboxMessage(string type, string data)
{
this.Id = Guid.NewGuid();
this.CreatedDate = DateTime.Now;
this.Type = type;
this.Data = data;
this.IsProcessCompleted = false;
}
}Şimdi mesajların kaydedilmesine geçelim. Her command handler için Outbox tablosuna mesajı ekleme işlemini tekrar tekrar yazdırmak yanlış tercih olur. Bunu biraz daha merkezileştirmek gerek. Kullanıcı verisi ile Outbox mesajını aynı pakete koyup veritabanına göndermeliyiz. Veritabanı ya ikisini de kabul eder ya da ikisini de reddeder. Böylece veritabanı güncellendi ama mesaj kayboldu riski de olmaz. Diğer yandan her seferinde tek tek Outbox’a kaydetme kodu yazark da kendimizi tekrar etmiş oluruz. Bunu sistemin otomatik yapması gerek.
Unit of Work için commit metodumuzu oluşturalım.
public async Task<int> CommitAsync(CancellationToken token = default)
{
var notifications = await _domainEventsDispatcher.DispatchEventsAsync(this);
if (!notifications.Any())
{
return await base.SaveChangesAsync(token);
}
var messages = notifications.Select(notification =>
{
var type = notification.GetType().FullName;
var cont = JsonSerializer.Serialize(notification);
return new OutboxMessage(type, cnt);
}).ToList();
await this.OutboxMessages.AddRangeAsync(messages, token);
return await base.SaveChangesAsync(cancellationToken);
}Buradaki DispatchEventsAsync(this) satırı, olayların mesaj kuyruklarına veya diğer servislere asıl yayınlanma anı değil, aksine bu olayların veritabanına paketlenip saklanmadan önceki son hazırlık aşamasıdır. Bu süreçte domain entity üzerinde o ana kadar birikmiş olan tüm domain events öğeleri toplanır ve bunlar veritabanına kaydedilmeye uygun “notification” nesnelerine dönüştürülerek bir liste halinde geri döndürülür. Bu verileri güvenli bir şekilde veritabanındaki “Outbox” tablosuna yazdırmamız gerekir. Böylece asıl yapılacak veritabanı kaydı ile burada diğer servislere gönderilecek bildirimler aynı veritabanı transaction içine alınarak sistemin tam tutarlılığı garanti altına alınmış olur.
Domain event oluşturalım.
public record OrderCreatedEvent(Guid OrderId, decimal TotalAmount, Guid CustomerId) : IDomainEvent;
public class OrderCreatedNotification : DomainNotificationBase<OrderCreatedEvent>
{
public Guid OrderId { get; init; }
public decimal TotalAmount { get; init; }
public OrderCreatedNotification(OrderCreatedEvent domainEvent) : base(domainEvent)
{
this.OrderId = domainEvent.OrderId;
this.TotalAmount = domainEvent.TotalAmount;
}
// Bu attribute buraya ait örneği okurken oluşturabilir.
[JsonConstructor]
public OrderCreatedNotification(Guid orderId, decimal totalAmount) : base(null)
{
this.OrderId = orderId;
this.TotalAmount = totalAmount;
}
}Burada OrderCreatedNotification sınıfının sahip olduğu 2 adet constructor alanı mevcut. İlki domain event üzerinden bir bildirim oluşturmak içindir. İkincisi işleme süreciyle ilgili, veritabanından kayıtlar okunurken deserialize ederek oluşturmak içindir.
Asenkron sürecimizi uygulamak için BackgroundService’den yararlanacağız. Hangfire veya Quartz.net paketlerini kullanabilirdik ancak küçük çözümümüz için bunların gereğinden fazla olduğunu düşünüyorum. Bunun yerine .NET Core özelliklerinden birini kullanmayı tercih ediyoruz. Bu kütüphaneler işlerin veritabanında saklanması, başarısız olan işlerin tekrar denenmesi gibi gelişmiş özellikler sunar. Burada basit bir tabloyu tarayıp mesaj gönderecek küçük bir uygulama için gereksiz karmaşıklık getirir.
BackgroundService için gerekli doküman: https://learn.microsoft.com/en-us/dotnet/api/microsoft.extensions.hosting.backgroundservice
Hosted service yapımızı oluşturalım.
Belirli saniyelerde çalışacak bir Timer olmalı. Belirteceğimiz fonksiyonu 10 saniyede bir çağırmalı. Fonksiyon aynı anda iki kez çalışmamalı çünkü çift gönderim olabilir, bu sebeple “Monitor” kullandık. Fonksiyon çalıştığında kuyruğu okur ve her birini göndermeye çalışır. Her mesaj için yeni bir transaction açarız ve mesajı RawRabbit kullanarak göndermeyi deneyeceğiz ve başarılı olursa mesajı veritabanından sileriz.
Outbox tablosundaki verileri işleyen “processor” sınıf.
public class OutboxProcessor
{
private readonly AppDbContext _context;
private readonly IMessageBus _bus;
public OutboxProcessor(AppDbContext context, IMessageBus bus)
{
_context = context;
_bus = bus;
}
public async Task ProcessMessagesAsync()
{
var messages = await _context.OutboxMessages
.OrderBy(m => m.OccurredOn)
.Take(100)
.ToListAsync();
foreach (var message in messages)
{
try
{
var notificationType = Type.GetType(message.Type);
var notification = JsonSerializer.Deserialize(message.Content, notificationType);
// Event publish
await _bus.PublishAsync(notification);
// Başarılıysa mesajı siler
_context.OutboxMessages.Remove(message);
await _context.SaveChangesAsync();
}
catch (Exception ex)
{
// Hata olursa günlüğü tut
Console.WriteLine($"Hata: {ex.Message}");
break;
}
}
}
}Arka plan servisimiz:
public class OutboxSendingService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly TimeSpan _period = TimeSpan.FromSeconds(5);
public OutboxSendingService(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
using (var scope = _serviceProvider.CreateScope())
{
var processor = scope.ServiceProvider.GetRequiredService<OutboxProcessor>();
await processor.ProcessMessagesAsync();
}
await Task.Delay(_period, stoppingToken);
}
}
}BackgroundService içinde await Task.Delay kullandığımız için bir döngü bitmeden diğeri başlamaz. Veritabanı işlemleri (DbContext) genellikle “Scoped” ömre sahiptir. Arka plan servisleri ise “Singleton” olur. Bu yüzden her döngüde CreateScope() kullanarak veritabanına erişmek teknik bir zorunluluktur. Bir hata oluşursa “break” olur ve hata üretmek yerine bir sonraki periyotta tekrar dener.
Program.cs içeriği böyle olur:
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDbContext<AppDbContext>(options =>
options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection")));
// Veritabanına eriştiği için Scoped olması gerekir.
builder.Services.AddScoped<OutboxProcessor>();
builder.Services.AddSingleton<IMessageBus, RabbitMQBus>();
// Uygulama başladığı anda arka planda sonsuz döngüye girer
builder.Services.AddHostedService<OutboxSendingService>();
var app = builder.Build();
app.Run();Sonuç
Sonuç olarak bu mimari dağıtık sistemlerde en sık karşılaşılan veri tutarsızlığı problemini güvenilir bir şekilde çözer. Transactional Outbox Pattern sayesinde, hem ana iş verinizi hem de bu işle ilgili bildirimleri aynı transaction altında tutarak birinin başarılı olup diğerinin başarısız olması riskini tamamen ortadan kaldırmış olursunuz.



Yorum bırakın