Parallel.ForEach and a blocking saga

I was recently doing a bit of weekend homework and came across a piece of code resembling the following (with all the grubby implementation details and business-domain logic removed, of course).

using System;
using System.Linq;
using System.Threading.Tasks;

namespace ConsoleApp8
{
    public struct Bond
    {
        public Bond(string currency, decimal issueSize) => (Currency, IssueSize) = (currency, issueSize);
        public readonly string Currency;
        public readonly decimal IssueSize;
        //and loads of other stuff omitted
    }

    class Program
    {
        static void Main(string[] args)
        {
            Bond[] bonds = new[]
                {
                    new Bond("EUR", 1m),
                    new Bond("EUR", 2m),
                    new Bond("EUR", 3m),
                    new Bond("EUR", 4m),
                    new Bond("GBP", 5m),
                    new Bond("NZD", 6m),
                    new Bond("CHF", 7m),
                    new Bond("CHF", 8m),
                    new Bond("AUD", 9m),
                    new Bond("FJD", 10m),
                    new Bond("GBP", 11m),
                    new Bond("GBP", 12m),
                };

            Parallel.ForEach(bonds, new ParallelOptions { MaxDegreeOfParallelism = 5 }, (bond) => { /* do stuff */ });
        }
    }
}

The code has a bug: the logic behind /* do stuff */ isn’t thread safe when two bonds with the same currency are processed concurrently.

What I needed was a fix that was not pervasive. I didn’t want to embark on some arduous hunt for non-reproducible threading bugs that manifested way down the application stack as a database deadlock in some convoluted transaction saga. I also wanted the change to be safe, in the sense that I could guarantee it addressed what it was supposed to address and no more, while still being covered by the same suite of unit tests.

My approach was simple (and for this type of didactic code I am unconcerned with GC issues): prevent unsafe concurrent processing by guaranteeing that the array passed to Parallel.ForEach (TPL) contains at most one entry per currency on each invocation.

This can be done with Linq, one of my “goto” technologies. The approach is to group by currency and take the first bond of each group (guaranteeing only one entry per currency in the array), perform the parallel processing, and, once it all completes, remove the processed entries from the pending set and proceed to the next loop iteration.

            while (bonds.Length > 0)
            {
                Console.WriteLine("Tranche");
                var b = bonds.GroupBy(c => c.Currency).Select(g => g.First()).ToArray<Bond>();
                foreach (var bond in b)
                    Console.WriteLine($"{bond.Currency}    {bond.IssueSize}");

                //the /* do stuff */ below can only handle one of the same currency concurrently
                Parallel.ForEach(b, new ParallelOptions { MaxDegreeOfParallelism = 5 }, (bond) => { /* do stuff*/ });

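                //Except compares the structs by value and returns distinct elements, so this assumes no two bonds are identical
                bonds = bonds.Except(b).ToArray<Bond>();
            }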

Above is the code patch, which replaces the Parallel.ForEach call in Main, with a little output to the screen to demonstrate the grouping. Note that each tranche mentions a given currency just once.
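To run the patch end to end, it can be assembled into one self-contained console program. This is a sketch under a few assumptions: the tranche building is pulled out into a hypothetical `Tranching.BuildTranches` helper so planning is separate from processing, `DoStuff` is a hypothetical placeholder for the real `/* do stuff */` body, and, as with the patch, no two bonds may be identical, because `Except` compares the structs by value.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public struct Bond
{
    public Bond(string currency, decimal issueSize) => (Currency, IssueSize) = (currency, issueSize);
    public readonly string Currency;
    public readonly decimal IssueSize;
}

public static class Tranching
{
    // Hypothetical helper: splits bonds into tranches holding at most one bond per currency,
    // using the same GroupBy/First/Except steps as the patch.
    // Assumes no two bonds are identical: Except compares structs by value and returns distinct elements.
    public static List<Bond[]> BuildTranches(Bond[] bonds)
    {
        var tranches = new List<Bond[]>();
        while (bonds.Length > 0)
        {
            // First bond of each currency, in order of first appearance.
            var tranche = bonds.GroupBy(c => c.Currency).Select(g => g.First()).ToArray();
            tranches.Add(tranche);
            // Drop the bonds just scheduled and go round again.
            bonds = bonds.Except(tranche).ToArray();
        }
        return tranches;
    }
}

public static class Program
{
    public static void Main()
    {
        Bond[] bonds =
        {
            new Bond("EUR", 1m), new Bond("EUR", 2m), new Bond("EUR", 3m), new Bond("EUR", 4m),
            new Bond("GBP", 5m), new Bond("NZD", 6m), new Bond("CHF", 7m), new Bond("CHF", 8m),
            new Bond("AUD", 9m), new Bond("FJD", 10m), new Bond("GBP", 11m), new Bond("GBP", 12m),
        };

        foreach (var tranche in Tranching.BuildTranches(bonds))
        {
            Console.WriteLine("Tranche");
            foreach (var bond in tranche)
                Console.WriteLine($"{bond.Currency}    {bond.IssueSize}");

            // No currency appears twice in this tranche, and Parallel.ForEach returns
            // only after every bond in it has been processed.
            Parallel.ForEach(tranche, new ParallelOptions { MaxDegreeOfParallelism = 5 }, bond => DoStuff(bond));
        }
    }

    // Hypothetical placeholder for the original /* do stuff */ body.
    static void DoStuff(Bond bond) { }
}
```

With the sample data this prints four tranches: EUR 1, GBP 5, NZD 6, CHF 7, AUD 9 and FJD 10; then EUR 2, CHF 8 and GBP 11; then EUR 3 and GBP 12; and finally EUR 4. Building every tranche up front gives the same result as the in-loop version, since processing never changes which bonds remain, and it lets the tranche plan be unit tested on its own.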

There you go, another problem solved, and yet again using a bit of Linq.
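One caveat with the patch: `Except` compares the `Bond` structs by value and, like `Distinct`, returns each remaining element only once, so two bonds with identical currency and issue size would both be dropped after only one had been processed. A possible alternative that stays with Linq but avoids `Except` numbers each bond within its currency group and then groups by that number, so tranche k holds the k-th bond of every currency. This is a sketch, not the original patch; `RankTranching.BuildTranches` is a hypothetical name and the same `Bond` struct is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public struct Bond
{
    public Bond(string currency, decimal issueSize) => (Currency, IssueSize) = (currency, issueSize);
    public readonly string Currency;
    public readonly decimal IssueSize;
}

public static class RankTranching
{
    // Hypothetical helper: tranche k contains the k-th bond (in source order) of each currency,
    // so no tranche holds two bonds of the same currency and identical bonds are all kept.
    public static List<Bond[]> BuildTranches(IEnumerable<Bond> bonds) =>
        bonds.GroupBy(b => b.Currency)
             // Number each bond within its currency: 0 for the first EUR bond, 1 for the second, ...
             .SelectMany(g => g.Select((bond, rank) => (bond, rank)))
             // Group by that number; GroupBy keeps keys in order of first appearance (0, 1, 2, ...).
             .GroupBy(x => x.rank, x => x.bond)
             .Select(g => g.ToArray())
             .ToList();
}

public static class Program
{
    public static void Main()
    {
        var bonds = new[] { new Bond("EUR", 1m), new Bond("EUR", 1m), new Bond("GBP", 5m) };
        foreach (var tranche in RankTranching.BuildTranches(bonds))
            Console.WriteLine(string.Join(", ", tranche.Select(b => $"{b.Currency} {b.IssueSize}")));
    }
}
```

With two identical EUR 1 bonds plus GBP 5, this yields two tranches (EUR 1 and GBP 5, then EUR 1), whereas the `Except`-based loop stops after the first tranche. On the post's sample data it produces the same four tranches, although the bonds within a tranche may come out in a different order.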

— Published by Mike, 14:07:12 31 August 2022 (CEST)
