Quick Scoops
Bite-size scoops of Tech-tips, Stories and more
Monday, January 05, 2026
[C#] Multiple Concurrent Producers and Consumers pattern for a Task Queue
If you need a basic concurrent producers/consumers pattern for your application, here is a sample C# program to refer to: it uses a queue with bounded capacity, allowing multiple producers and consumers to enqueue and dequeue work items for processing.
// File: Program.cs
// Build & run:
//   dotnet new console -n MultiProdCons
//   cd MultiProdCons
//   dotnet run
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

record WorkItem(int Id, int ProducerId, string Payload);

class ProducersConsumers
{
    static async Task Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (s, e) =>
        {
            Console.WriteLine("Cancellation requested (Ctrl+C). Shutting down…");
            e.Cancel = true; // keep the process alive so we can drain gracefully
            cts.Cancel();
        };

        // Bounded capacity provides backpressure to producers
        var queue = new BlockingCollection<WorkItem>(
            new ConcurrentQueue<WorkItem>(),
            boundedCapacity: 100);

        int producerCount = 3;
        int consumerCount = 2;
        int itemsPerProducer = 50;

        var producerTasks = new List<Task>();
        for (int p = 0; p < producerCount; p++)
        {
            int producerId = p; // capture a copy for the closure
            producerTasks.Add(Task.Run(() => ProducerAsync(producerId, itemsPerProducer, queue, cts.Token)));
        }

        var consumerTasks = new List<Task>();
        for (int c = 0; c < consumerCount; c++)
        {
            int consumerId = c;
            consumerTasks.Add(Task.Run(() => ConsumerAsync(consumerId, queue, cts.Token)));
        }

        // Wait for all producers to finish; then signal no more adds
        await Task.WhenAll(producerTasks);
        queue.CompleteAdding();

        // Wait for consumers to drain and finish
        await Task.WhenAll(consumerTasks);
        Console.WriteLine("All done.");
    }

    static async Task ProducerAsync(int producerId, int count, BlockingCollection<WorkItem> queue, CancellationToken ct)
    {
        var rnd = new Random(unchecked(Environment.TickCount * (producerId + 1)));
        for (int i = 0; i < count && !ct.IsCancellationRequested; i++)
        {
            var item = new WorkItem(
                Id: i,
                ProducerId: producerId,
                Payload: $"Data-{producerId}-{i}");
            try
            {
                // Simulate work. The delay lives inside the try so a cancellation here
                // doesn't fault the task and blow up Task.WhenAll before CompleteAdding().
                await Task.Delay(rnd.Next(10, 40), ct);

                // Respect backpressure: this will block if the queue is full
                queue.Add(item, ct);
                Console.WriteLine($"[P{producerId}] Produced item {item.Id}");
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine($"[P{producerId}] Cancelled.");
                break;
            }
        }
        Console.WriteLine($"[P{producerId}] Completed.");
    }

    static async Task ConsumerAsync(int consumerId, BlockingCollection<WorkItem> queue, CancellationToken ct)
    {
        var rnd = new Random(unchecked(Environment.TickCount * (consumerId + 1000)));
        try
        {
            // Iterates until CompleteAdding() is called and the queue is drained
            foreach (var item in queue.GetConsumingEnumerable(ct))
            {
                Console.WriteLine($"  [C{consumerId}] Consuming item {item.Id} from P{item.ProducerId}");
                // Simulate processing
                await Task.Delay(rnd.Next(20, 60), ct);
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"  [C{consumerId}] Cancelled.");
        }
        Console.WriteLine($"  [C{consumerId}] Completed.");
    }
}
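If you are on .NET Core 3.0 or later, System.Threading.Channels is a fully asynchronous alternative to BlockingCollection: waiting producers and consumers suspend instead of blocking thread-pool threads. Here is a minimal sketch of the same bounded pattern; the class and names are just for illustration, not part of the program above.
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class ChannelSketch
{
    static async Task Main()
    {
        // Bounded channel: WriteAsync waits when full, giving the same backpressure
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 50; i++)
                await channel.Writer.WriteAsync(i); // suspends instead of blocking a thread
            channel.Writer.Complete();              // same role as CompleteAdding()
        });

        var consumer = Task.Run(async () =>
        {
            // ReadAllAsync completes once the writer is done and the channel drains
            await foreach (var item in channel.Reader.ReadAllAsync())
                Console.WriteLine($"Consumed {item}");
        });

        await Task.WhenAll(producer, consumer);
    }
}
Channels also scale naturally to multiple writers and readers: just start several producer and consumer tasks against the same channel, as in the BlockingCollection version above.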
Thursday, January 01, 2026
[NPM Debugging] Automatically reload the node process when editing files during development
If you are looking for a command that reloads the node process while you are continuously editing files in a working folder, this is the command to the rescue:
$c:\parag\NodeExample>node --watch-path=./src src/app-server.mjs
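Note that --watch-path is only supported on macOS and Windows. If you simply want Node to restart whenever the entry script or any module it imports changes, the plain --watch flag (available since Node.js 18.11) does that:
$c:\parag\NodeExample>node --watch src/app-server.mjs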
Thursday, March 06, 2025
Copilot and Vibe Coding experience
I have been using GitHub Copilot agents to assist with code writing, and I am hugely impressed by what they can do. Yes, they could do better, but you need to explain and instruct clearly what you are asking for. I was able to get this small program written by the agent in a single prompt. It is not necessarily the best solution, but it is a good skeleton to begin with, and you can build on top of it.
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Blobs.Models;
using System.Data.SQLite;

namespace SampleUploadFileToCloudVibeAgent
{
    public class LargeFileUploadRetryable
    {
        private readonly string _connectionString;
        private readonly int _chunkSize;

        public LargeFileUploadRetryable(string dbPath, int chunkSize = 4 * 1024 * 1024)
        {
            _connectionString = $"Data Source={dbPath};Version=3;";
            _chunkSize = chunkSize;
            InitializeDb();
        }

        private void InitializeDb()
        {
            using var conn = new SQLiteConnection(_connectionString);
            conn.Open();
            using var cmd = conn.CreateCommand();
            cmd.CommandText = @"CREATE TABLE IF NOT EXISTS UploadChunks (
                FilePath TEXT,
                BlockId TEXT,
                Offset INTEGER,
                Size INTEGER,
                Uploaded INTEGER,
                PRIMARY KEY (FilePath, BlockId)
            );";
            cmd.ExecuteNonQuery();
        }

        public async Task UploadFileAsync(string filePath, string blobConnectionString, string containerName, string blobName)
        {
            // Check if file exists
            if (!File.Exists(filePath))
                throw new FileNotFoundException("File not found.", filePath);

            // Check for internet connectivity
            if (!IsInternetAvailable())
                throw new InvalidOperationException("No internet connection available.");

            var blobClient = new BlockBlobClient(blobConnectionString, containerName, blobName);
            var fileLength = new FileInfo(filePath).Length;
            var blockIds = new List<string>();

            using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
            int blockNum = 0;
            long offset = 0;
            while (offset < fileLength)
            {
                int bytesToRead = (int)Math.Min(_chunkSize, fileLength - offset);
                // Block IDs must all have the same length; a fixed-width int gives us that
                string blockId = Convert.ToBase64String(BitConverter.GetBytes(blockNum));
                if (!IsBlockUploaded(filePath, blockId))
                {
                    byte[] buffer = new byte[bytesToRead];
                    fileStream.Seek(offset, SeekOrigin.Begin);
                    int read = await fileStream.ReadAsync(buffer, 0, bytesToRead);
                    bool uploaded = false;
                    int retry = 0;
                    while (!uploaded && retry < 5)
                    {
                        try
                        {
                            using var ms = new MemoryStream(buffer, 0, read);
                            await blobClient.StageBlockAsync(blockId, ms);
                            MarkBlockUploaded(filePath, blockId, offset, read);
                            uploaded = true;
                        }
                        catch
                        {
                            retry++;
                            if (!IsInternetAvailable())
                            {
                                // Wait for connectivity, then retry pending chunks
                                await Task.Delay(TimeSpan.FromMinutes(2));
                                await RetryPendingChunksAsync(filePath, blobConnectionString, containerName, blobName);
                                return;
                            }
                            if (retry >= 5)
                                throw new InvalidOperationException("Failed to upload chunk after multiple retries.");
                            // Back off before the next attempt
                            await Task.Delay(1000 * retry);
                        }
                    }
                }
                blockIds.Add(blockId);
                offset += bytesToRead;
                blockNum++;
            }
            await blobClient.CommitBlockListAsync(blockIds);
        }

        private async Task RetryPendingChunksAsync(string filePath, string blobConnectionString, string containerName, string blobName)
        {
            var blobClient = new BlockBlobClient(blobConnectionString, containerName, blobName);
            var pendingBlocks = new List<(string BlockId, long Offset, int Size)>();
            using (var conn = new SQLiteConnection(_connectionString))
            {
                conn.Open();
                using var cmd = conn.CreateCommand();
                cmd.CommandText = @"SELECT BlockId, Offset, Size FROM UploadChunks
                                    WHERE FilePath = @f AND Uploaded = 0
                                    ORDER BY Offset ASC";
                cmd.Parameters.AddWithValue("@f", filePath);
                using var reader = cmd.ExecuteReader();
                while (reader.Read())
                {
                    string blockId = reader.GetString(0);
                    long offset = reader.GetInt64(1);
                    int size = reader.GetInt32(2);
                    pendingBlocks.Add((blockId, offset, size));
                }
            }
            if (pendingBlocks.Count == 0)
                return;

            using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
            foreach (var (blockId, offset, size) in pendingBlocks)
            {
                byte[] buffer = new byte[size];
                fileStream.Seek(offset, SeekOrigin.Begin);
                int read = await fileStream.ReadAsync(buffer, 0, size);
                bool uploaded = false;
                int retry = 0;
                while (!uploaded && retry < 5)
                {
                    try
                    {
                        using var ms = new MemoryStream(buffer, 0, read);
                        await blobClient.StageBlockAsync(blockId, ms);
                        MarkBlockUploaded(filePath, blockId, offset, read);
                        uploaded = true;
                    }
                    catch
                    {
                        await Task.Delay(1000 * (retry + 1));
                        retry++;
                        if (!IsInternetAvailable())
                        {
                            // Wait and try again later
                            await Task.Delay(TimeSpan.FromMinutes(2));
                            // Optionally, you could recursively call this method, but avoid infinite recursion.
                            // NOTE: the block list still needs committing once all chunks are staged.
                            return;
                        }
                        if (retry >= 5)
                            throw new InvalidOperationException("Failed to upload chunk after multiple retries.");
                    }
                }
            }
        }

        // Simple internet connectivity check (ping to a reliable host)
        private bool IsInternetAvailable()
        {
            try
            {
                using var client = new System.Net.NetworkInformation.Ping();
                var reply = client.Send("8.8.8.8", 2000);
                return reply.Status == System.Net.NetworkInformation.IPStatus.Success;
            }
            catch
            {
                return false;
            }
        }

        private bool IsBlockUploaded(string filePath, string blockId)
        {
            using var conn = new SQLiteConnection(_connectionString);
            conn.Open();
            using var cmd = conn.CreateCommand();
            cmd.CommandText = "SELECT Uploaded FROM UploadChunks WHERE FilePath = @f AND BlockId = @b";
            cmd.Parameters.AddWithValue("@f", filePath);
            cmd.Parameters.AddWithValue("@b", blockId);
            var result = cmd.ExecuteScalar();
            return result != null && Convert.ToInt32(result) == 1;
        }

        private void MarkBlockUploaded(string filePath, string blockId, long offset, int size)
        {
            using var conn = new SQLiteConnection(_connectionString);
            conn.Open();
            using var cmd = conn.CreateCommand();
            cmd.CommandText = @"INSERT OR REPLACE INTO UploadChunks (FilePath, BlockId, Offset, Size, Uploaded) VALUES (@f, @b, @o, @s, 1);";
            cmd.Parameters.AddWithValue("@f", filePath);
            cmd.Parameters.AddWithValue("@b", blockId);
            cmd.Parameters.AddWithValue("@o", offset);
            cmd.Parameters.AddWithValue("@s", size);
            cmd.ExecuteNonQuery();
        }
    }
}
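To try the skeleton out, the calling code could look roughly like this. The database path, file path, connection string, and container name below are placeholders for illustration, not values from the post.
using System;
using System.Threading.Tasks;
using SampleUploadFileToCloudVibeAgent;

class Demo
{
    static async Task Main()
    {
        // State database tracking which chunks have been staged (placeholder path)
        var uploader = new LargeFileUploadRetryable(dbPath: @"c:\temp\upload-state.db");

        // Upload a large local file as a block blob, resuming past any
        // already-staged chunks recorded in the SQLite state database
        await uploader.UploadFileAsync(
            filePath: @"c:\temp\big-file.bin",
            blobConnectionString: "<your-storage-connection-string>",
            containerName: "uploads",
            blobName: "big-file.bin");

        Console.WriteLine("Upload complete.");
    }
}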