If you need a basic concurrent producer–consumer pattern in your application, here is a sample C# program you can use as a reference. It uses a queue with bounded capacity so that multiple producers can enqueue tasks and multiple consumers can dequeue them for processing.
// File: Program.cs
// Build & run:
// dotnet new console -n MultiProdCons
// cd MultiProdCons
// dotnet run
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Immutable unit of work passed from a producer to a consumer through the shared queue.
/// </summary>
/// <param name="Id">Sequence number of the item within its producer (the producer's loop index), so it is not unique across producers.</param>
/// <param name="ProducerId">Zero-based index of the producer that created the item.</param>
/// <param name="Payload">Data carried by the item; the producers in this file fill it with a "Data-{producerId}-{id}" string.</param>
record WorkItem(int Id, int ProducerId, string Payload);
/// <summary>
/// Demonstrates a multi-producer / multi-consumer pipeline built on a bounded
/// <see cref="BlockingCollection{T}"/>, with cooperative shutdown on Ctrl+C.
/// </summary>
class ProducersConsumers
{
    /// <summary>
    /// Starts the producers and consumers, waits for all producers to finish, marks the queue
    /// as complete for adding, and then waits for the consumers to drain it.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        using var cts = new CancellationTokenSource();

        // Keep a reference to the handler so it can be unsubscribed before cts is disposed;
        // otherwise a late Ctrl+C would call Cancel() on a disposed source.
        ConsoleCancelEventHandler onCancelKeyPress = (s, e) =>
        {
            Console.WriteLine("Cancellation requested (Ctrl+C). Shutting down…");
            e.Cancel = true; // do not kill the process; let the workers stop cooperatively
            cts.Cancel();
        };
        Console.CancelKeyPress += onCancelKeyPress;

        try
        {
            // Bounded capacity provides backpressure to producers
            using var queue = new BlockingCollection<WorkItem>(
                new ConcurrentQueue<WorkItem>(),
                boundedCapacity: 100);

            int producerCount = 3;
            int consumerCount = 2;
            int itemsPerProducer = 50;

            var producerTasks = new List<Task>();
            for (int p = 0; p < producerCount; p++)
            {
                int producerId = p; // per-iteration copy so each lambda captures its own id
                producerTasks.Add(Task.Run(() => ProducerAsync(producerId, itemsPerProducer, queue, cts.Token)));
            }

            var consumerTasks = new List<Task>();
            for (int c = 0; c < consumerCount; c++)
            {
                int consumerId = c;
                consumerTasks.Add(Task.Run(() => ConsumerAsync(consumerId, queue, cts.Token)));
            }

            try
            {
                // Wait for all producers to finish
                await Task.WhenAll(producerTasks);
            }
            finally
            {
                // Always signal "no more adds", even if a producer faulted; without it the
                // consuming enumerables never see the end of the stream.
                queue.CompleteAdding();
            }

            // Wait for consumers to drain and finish
            await Task.WhenAll(consumerTasks);
            Console.WriteLine("All done.");
        }
        finally
        {
            Console.CancelKeyPress -= onCancelKeyPress;
        }
    }

    /// <summary>
    /// Produces up to <paramref name="count"/> work items, pausing a random 10–39 ms before each
    /// one, and adds them to <paramref name="queue"/>. Stops early, without throwing, when
    /// <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <param name="producerId">Identifier stamped on every produced item and used in log output.</param>
    /// <param name="count">Maximum number of items to produce.</param>
    /// <param name="queue">Shared bounded queue; <c>Add</c> blocks while it is full.</param>
    /// <param name="ct">Token that cancels both the simulated work and a blocked add.</param>
    static async Task ProducerAsync(int producerId, int count, BlockingCollection<WorkItem> queue, CancellationToken ct)
    {
        // Mix the producer id into the seed so producers started in the same tick differ.
        var rnd = new Random(unchecked(Environment.TickCount * (producerId + 1)));
        for (int i = 0; i < count && !ct.IsCancellationRequested; i++)
        {
            // Simulate work. Task.Delay throws on cancellation, so it must be guarded too;
            // otherwise the exception escapes and faults the wait in Main.
            try
            {
                await Task.Delay(rnd.Next(10, 40), ct);
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine($"[P{producerId}] Cancelled while producing.");
                break;
            }

            var item = new WorkItem(
                Id: i,
                ProducerId: producerId,
                Payload: $"Data-{producerId}-{i}");

            // Respect backpressure: this will block if the queue is full
            try
            {
                queue.Add(item, ct);
                Console.WriteLine($"[P{producerId}] Produced item {item.Id}");
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine($"[P{producerId}] Cancelled while adding.");
                break;
            }
        }
        Console.WriteLine($"[P{producerId}] Completed.");
    }

    /// <summary>
    /// Takes items from <paramref name="queue"/> until it is marked complete and empty, or until
    /// <paramref name="ct"/> is cancelled, pausing a random 20–59 ms per item to simulate processing.
    /// </summary>
    /// <param name="consumerId">Identifier used in log output.</param>
    /// <param name="queue">Shared queue to consume from.</param>
    /// <param name="ct">Token that cancels both the blocking take and the simulated processing.</param>
    static async Task ConsumerAsync(int consumerId, BlockingCollection<WorkItem> queue, CancellationToken ct)
    {
        var rnd = new Random(unchecked(Environment.TickCount * (consumerId + 1000)));
        try
        {
            // Iterates until CompleteAdding() and queue drained
            foreach (var item in queue.GetConsumingEnumerable(ct))
            {
                Console.WriteLine($" [C{consumerId}] Consuming item {item.Id} from P{item.ProducerId}");
                // Simulate processing
                await Task.Delay(rnd.Next(20, 60), ct);
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($" [C{consumerId}] Cancelled.");
        }
        Console.WriteLine($" [C{consumerId}] Completed.");
    }
}
No comments:
Post a Comment