Günümüzde bir uygulamanın aynı anda birden fazla işi yapabilmesi kritik bir hale geldi. Milyonlarca isteği işleyen web servisleri, arka planda kuyruk tüketen API bağlantıları, dosya işleme sistemleri, büyük veri akışları gibi birçok alan buna dayanıyor. Producer-Consumer pattern en temel ve en çok kullanılan modellerden birisidir.
Producer–Consumer, concurrency (eşzamanlı) programlama mimarilerin en temel desenlerinden biridir. Bu pattern bize üreten (producer) ve bunu tüketen (consumer) bir çalışma modeli sunar. Aradaki iletişim genellikle bir kuyruk (queue) üzerinden yürür. RabbitMQ, Kafka gibi uygulamalar bunun üzerine tasarlanmıştır. Amaç hız farklarını yönetmek ve thread-safe iletişim sağlamaktır. Bu pattern bildirim kuyrukları, log toplama, sipariş işleme gibi onlarca alanda kullanılır.
İki taraf arasındaki akış doğru yönetilmezse veri kaybı, race condition sorunu ve kaynak aşımı gibi sorunlar ortaya çıkar. Tam bu noktada Microsoft tarafından geliştirilen BlockingCollection sınıfını devreye girer. C#’ta producer-consumer modelini güvenli ve performanslı şekilde yönetmek için en ideal yapılardan biri bu sınıftır.
Producer–Consumer Pattern Sorunları
.NET dünyasında BlockingCollection .NET 4.0 ile 2010 yılında geldi. Bu sınıf öncesinde thread lock ile yönetiliyor, manuel semaphore kullanılıyor, kuyruklar thread-safe değildi, producer-consumer pattern hataya çok açıktı ve deadlock üretilmesi çok kolaydı.
Şimdi basit bir örnek ile sorunları izah edelim.
Queue<int> queue = new Queue<int>();
object lockObj = new object();
void Producer() {
lock(lockObj) {
queue.Enqueue(1);
Monitor.Pulse(lockObj);
}
}
void Consumer() {
lock(lockObj) {
while (queue.Count == 0)
Monitor.Wait(lockObj);
var item = queue.Dequeue();
}
}Burada yanlış sırada “wait” kullanılırsa deadlock olacaktır. En önemlisi queue overflow yönetimi yok.
Biraz daha açalım. Kuyruk dolduğunda producer ne yapacak? Bekleyip block mu olacak yoksa bir exception mı fırlatacak? Alternatif olarak mesajı başka bir kuyruğa yönlendirmek de mümkün. Bu noktada sistem kendisini nasıl koruyacak? Rate limiting uygulanabilir veya trafik artışı anormal olarak işaretlenebilir. Bu durumlar doğru yönetilmezse producer tarafındaki thread uzun süre block olabilir, CPU ve RAM tüketimi artabilir ve API tarafında timeout oluşabilir.
Bugün bile Kafka ve RabbitMQ gibi sistemlerde en büyük sorun hala backpressure, concurrency, doğru sıralama ve deadlock riskidir.
BlockingCollection
C#’ta BlockingCollection sınıfı, bir veya birden fazla producer/consumer varlığında veri üretilip tüketildiği senaryolar için tasarlanan thread-safe bir koleksiyon sağlar.
En basit haliyle bu şekilde kapasite kontrolü yapabilirsiniz:
var queue = new BlockingCollection<string>(boundedCapacity: 1000);Kuyruk dolduğunda producer otomatik olarak block olur. Kapasite doluysa Add() metodu bekler. Bu sayede overflow sorunu otomatik yönetilir. Dolaylı olarak ise manuel lock, semaphore veya queue overflow yönetimi yazmaya da gerek yok.
Elle yazsak lock kullanmak zorunda kalırdık:
lock(_queue) {
_queue.Enqueue(item);
}BlockingCollection içindeki yapı (özellikle IProducerConsumerCollection uygulamaları) thread-safe olduğundan ötürü aynı anda 10 producer da çalışabilir, aynı anda 10 consumer da çalışabilir. Hiçbir race condition durumu doğmaz.
Birden fazla producer ve tüketiciyi destekler. Aynı koleksiyon üzerinde birden fazla iş parçacığı veri üretebilir ve aynı anda başka iş parçacıkları bu verileri işleyebilir. Bunun ne kadar kritik öneme sahip olduğunu büyük ölçekli projelerde anlıyoruz.
BlockingCollection, ConcurrentQueue gibi thread-safe bir koleksiyonu alır ve üzerine otomatik bekleme, senkronizasyon ve producer–consumer yönetimi ekler. Böylece hem veri yapısı hem de tam bir concurrency kontrol mekanizması olur.
BlockingCollection Örneği
BlockingCollection .NET’te producer–consumer modelini kolaylaştıran güçlü bir yapı sunuyor. Bu yapının en kritik özelliklerinden biri bounding yani koleksiyon için bir üst kapasite belirleyebilme yeteneğidir.
public class TaskDispatcher
{
private readonly BlockingCollection<SystemTask> _taskQueue;
private readonly int _publishTimeoutMs;
public TaskDispatcher(int maxQueueSize, int publishTimeoutMs)
{
_publishTimeoutMs = publishTimeoutMs;
_taskQueue = new BlockingCollection<SystemTask>(new ConcurrentQueue<SystemTask>(), maxQueueSize);
}
// Producer method
public async Task EnqueueAsync(int producerId, CancellationToken token)
{
var task = new SystemTask
{
Id = Guid.NewGuid(),
ProducerId = producerId,
CreatedAt = DateTime.UtcNow,
TaskType = GetRandomTaskType()
};
// Simulated external operation
await Task.Delay(Random.Shared.Next(10, 60), token);
bool queueAdded = _taskQueue.TryAdd(task, _publishTimeoutMs, token);
if (queueAdded)
Console.WriteLine($"[Producer {producerId}] Task queued: {task}");
else
Console.WriteLine($"[Producer {producerId}] Queue FULL — Task dropped: {task.Id}");
}
// Consumer method
public async Task ProcessTasksAsync(int consumerId, CancellationToken token)
{
foreach (var task in _taskQueue.GetConsumingEnumerable(token))
{
Console.WriteLine($"--> [Consumer {consumerId}] Processing {task.TaskType} ({task.Id})");
await Task.Delay(Random.Shared.Next(20, 80), token);
Console.WriteLine($"--> [Consumer {consumerId}] Completed {task.Id}");
}
}
private static TaskType GetRandomTaskType()
{
var values = Enum.GetValues<TaskType>();
return values[Random.Shared.Next(values.Length)];
}
}
// Task model
public class SystemTask
{
public Guid Id { get; set; }
public int ProducerId { get; set; }
public DateTime CreatedAt { get; set; }
public TaskType TaskType { get; set; }
public override string ToString() =>
$"{TaskType} (Producer={ProducerId}, CreatedAt={CreatedAt:HH:mm:ss.fff})";
}
public enum TaskType
{
Email,
Log,
ReportGeneration
}Bu yapının en kritik özelliklerinden biri bounding yani koleksiyon için bir kapasite belirleyebilme yeteneğidir. Koleksiyonun constructor’da boundedCapacity değeri verildiğinde, koleksiyona aynı anda kaç öğe eklenebileceği sınırlandırılır. Bu ise üreticinin tüketiciden çok daha hızlı olduğu senaryolarda sınırlama getirir.
Birden fazla producer thread, kuyruğa aynı anda güvenli şekilde öğe ekleyebiliyor. Koleksiyon kapasite sınırına ulaştığında artık producer otomatik olarak bloklanır. Bu bloklama tamamen yönetilen bir bekleme türüdür yani thread CPU’yu meşgul etmez. Consumer kuyruktan bir öğe çıkardığında boşluk açılır ve bekleyen producer thread otomatik olarak getirilir.
Producer thread’in beklemesini istemiyorsanız, BlockingCollection’da TryAdd yöntemi ile ekleme işlemine timeout tanımlayabilirsiniz. Bu daha güvenli bir yöntemdir. Kuyruk dolu olduğunda log verebilir veya işlemi atlayabilirsiniz.
Kuyruk boşken consumer ne yapacak? Thread sürekli kuyruğu kontrol ederse CPU’yu meşgul eder. Buna busy waiting denir. BlockingCollection ise consumer thread’i uykuya geçirir. Thread CPU kullanmaz ve bekleme sırasında 0 işlem yapılır. Producer kuyruğa bir veri eklediğinde bu uyutulan thread otomatik uyanır.
Şimdi oluşturduğumuz bu sınıfı kullanalım.
public static async Task Main(string[] args)
{
const int producerCount = 10;
const int consumerCount = 4;
const int queueCapacity = 50;
const int publishTimeoutMs = 150;
var dispatcher = new TaskDispatcher(queueCapacity, publishTimeoutMs);
var cancelSource = new CancellationTokenSource();
Console.WriteLine("Task Dispatcher running. Press enter to stop");
var producerTasks = Enumerable
.Range(1, producerCount)
.Select(id => dispatcher.EnqueueTaskAsync(id, cancelSource.Token));
var consumerTasks = Enumerable
.Range(1, consumerCount)
.Select(id => dispatcher.ProcessTasksAsync(id, cancelSource.Token));
var allTasks = producerTasks.Concat(consumerTasks).ToArray();
Console.ReadLine();
cancelSource.Cancel();
Console.WriteLine("Stopping, please wait...");
await Task.WhenAll(allTasks);
}
Şimdi geliştirdiğimiz TaskDispatcher sınıfının nasıl çalıştığına bakalım.

Kafka ve RabbitMQ gibi sistemler kurulum, bakım ve konfigürasyon ister. BlockingCollection ise sadece birkaç satır kodla kullanılır. Uygulama ile aynı process içinde çalışır. Thread safe ve producer-consumer senaryolarına doğrudan uygundur. Farklı servisler arasında mesajlaşma gerekmiyorsa, tek uygulama içinde iş parçacıkları birbiriyle konuşacaksa ideal çözüm olur.
Mesajların kaybolmaması zorunluysa elbette RabbitMQ gibi sistemler tercih edilir. BlockingCollection ise in-memory çalışır. Bu sebeple genellikle log kayıtları, arka planda yürütülecek bildirimler gibi işlemler için tercih edilebilir. Multi-thread pipeline (bir veriyi adım adım işlerken her adımı farklı thread ile eş zamanlı yürütme) için kusursuzdur. Bir thread dosya okur, başka thread işler veya başka bir thread output yazar. BlockingCollection ile bu pipeline eş zamanlı olarak kolaylıkla yürütülebilir.



Yorum bırakın