Monday, January 05, 2026

[C#] Mutliple Concurrent Producers and Consumers pattern for a Task Queue

If you are in need of a basic concurrent Producers consumers pattern to be used in your application, here is a sample C# program to refer to have a queue with bounded capacity for allowing multiple producers and consumers, enqueue and dequeue task for processing. 



// File: Program.cs
// Build & run:
//   dotnet new console -n MultiProdCons
//   cd MultiProdCons
//   dotnet run

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

record WorkItem(int Id, int ProducerId, string Payload);

class ProducersConsumers
{
    static async Task Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (s, e) =>
        {
            Console.WriteLine("Cancellation requested (Ctrl+C). Shutting down…");
            e.Cancel = true;
            cts.Cancel();
        };

        // Bounded capacity provides backpressure to producers
        var queue = new BlockingCollection<WorkItem>(
            new ConcurrentQueue<WorkItem>(),
            boundedCapacity: 100);

        int producerCount = 3;
        int consumerCount = 2;
        int itemsPerProducer = 50;

        var producerTasks = new List<Task>();
        for (int p = 0; p < producerCount; p++)
        {
            int producerId = p;
            producerTasks.Add(Task.Run(() => ProducerAsync(producerId, itemsPerProducer, queue, cts.Token)));
        }

        var consumerTasks = new List<Task>();
        for (int c = 0; c < consumerCount; c++)
        {
            int consumerId = c;
            consumerTasks.Add(Task.Run(() => ConsumerAsync(consumerId, queue, cts.Token)));
        }

        // Wait for all producers to finish; then signal no more adds
        await Task.WhenAll(producerTasks);
        queue.CompleteAdding();

        // Wait for consumers to drain and finish
        await Task.WhenAll(consumerTasks);

        Console.WriteLine("All done.");
    }

    static async Task ProducerAsync(int producerId, int count, BlockingCollection<WorkItem> queue, CancellationToken ct)
    {
        var rnd = new Random(unchecked(Environment.TickCount * (producerId + 1)));
        for (int i = 0; i < count && !ct.IsCancellationRequested; i++)
        {
            // Simulate work
            await Task.Delay(rnd.Next(10, 40), ct);

            var item = new WorkItem(
                Id: i,
                ProducerId: producerId,
                Payload: $"Data-{producerId}-{i}");

            // Respect backpressure: this will block if the queue is full
            try
            {
                queue.Add(item, ct);
                Console.WriteLine($"[P{producerId}] Produced item {item.Id}");
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine($"[P{producerId}] Cancelled while adding.");
                break;
            }
        }

        Console.WriteLine($"[P{producerId}] Completed.");
    }

    static async Task ConsumerAsync(int consumerId, BlockingCollection<WorkItem> queue, CancellationToken ct)
    {
        var rnd = new Random(unchecked(Environment.TickCount * (consumerId + 1000)));

        try
        {
            // Iterates until CompleteAdding() and queue drained
            foreach (var item in queue.GetConsumingEnumerable(ct))
            {
                Console.WriteLine($"    [C{consumerId}] Consuming item {item.Id} from P{item.ProducerId}");
                // Simulate processing
                await Task.Delay(rnd.Next(20, 60), ct);
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"    [C{consumerId}] Cancelled.");
        }

        Console.WriteLine($"    [C{consumerId}] Completed.");
    }
}

No comments:

[C#] Mutliple Concurrent Producers and Consumers pattern for a Task Queue

If you are in need of a basic concurrent Producers consumers pattern to be used in your application, here is a sample C# program to refer to...