Down the Concurrency Rabbit Hole

For a client a while ago we had to build a program to ingest the OPRA options feed. The wire format was fairly compact, and the feed spiked above 8 gigabits per second and well over 10 million messages a second. We needed to process the messages and pass them on to different components. The new .NET async pipelines seemed like a good fit for this, but they were too slow. Mailbox processors were too slow. I ended up writing an abstraction over a dedicated thread that has served me well on a number of projects since.

open System
open System.Collections.Concurrent
open System.Threading

module ActorStatus =

    [<Literal>]
    let Idle = 0L

    [<Literal>]
    let Occupied = 1L

    [<Literal>]
    let Stopped = 2L

/// <summary> An actor pulls a message off of a (thread-safe) queue and executes it in the isolated context of the actor. </summary>
/// <typeparam name = "tmsg"> the type of messages that this actor will manage </typeparam>
/// <param name = "bodyFn"> the function to execute when the thread is available </param>
type ThreadedActor<'tmsg>(bodyFn: 'tmsg * ThreadedActor<'tmsg> -> unit) as this =

    let mutable status: int64 = ActorStatus.Idle

    //Do not expose these
    let signal = new ManualResetEventSlim(false)
    let queue = ConcurrentQueue<'tmsg>()
    let mutable count = 0
    // This is the main thread function (delegate). It gets executed when the thread is started/signaled
    let threadFn () =
        // While the actor is not stopped, check the queue and process any awaiting messages
        while Interlocked.Read(&status) <> ActorStatus.Stopped do
            while not queue.IsEmpty do
                //assert(queue.Count = Threading.Volatile.Read(&count)) race condition
                // If the thread is idle, update it to 'occupied'
                Interlocked.CompareExchange(&status, ActorStatus.Occupied, ActorStatus.Idle)
                |> ignore
                // Try to get the next message in the queue
                let isSuccessful, message = queue.TryDequeue()
                // If we successfully retrieved the next message, execute it in the context of this thread
                if isSuccessful then 
                    bodyFn (message, this)
                    if Threading.Interlocked.Decrement(&count) = 0 then 
                        signal.Reset()
            // If the thread is 'occupied', mark it as idle
            Interlocked.CompareExchange(&status, ActorStatus.Idle, ActorStatus.Occupied) |> ignore
            if Interlocked.Read(&status) <> ActorStatus.Stopped  then signal.Wait()
        // If the thread is stopped, dispose of it
        signal.Dispose()

    // The thread associated with this actor (one-to-one relationship)
    // Pass the threadFn delegate to the constructor
    let thread =
        Thread(ThreadStart(threadFn), IsBackground = true, Name = "ActorThread")

    // Start the thread
    do thread.Start()

    /// Enqueue a new message for the thread to pick up and execute
    member this.Enqueue(msg: 'tmsg) =
        if Interlocked.Read(&status) <> ActorStatus.Stopped then
            queue.Enqueue(msg)
            if Threading.Interlocked.Increment(&count) = 1 then 
                signal.Set()
        else
            failwith "Cannot queue to stopped actor."

    // Get the length of the actor's message queue
    member this.QueueCount = queue.Count

    // Stops the actor
    member this.Stop() =
        Interlocked.Exchange(&status, ActorStatus.Stopped)
        |> ignore

        signal.Set()

    interface IDisposable with
        member __.Dispose() =
            this.Stop()
            thread.Join()
            

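Here is a minimal usage sketch (mine, not from the original project): a ThreadedActor<string> whose body just prints each message. The bodyFn also receives the actor itself, so a handler can enqueue follow-up work onto the same thread.

let printer =
    new ThreadedActor<string>(fun (msg, _self) ->
        // Runs on the actor's dedicated thread, one message at a time
        printfn "processed: %s" msg)

printer.Enqueue "hello"
printer.Enqueue "world"

// Give the actor a moment to drain before tearing it down (illustration only);
// Dispose() calls Stop() and then joins the thread.
System.Threading.Thread.Sleep 100
(printer :> System.IDisposable).Dispose()
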
However, recently I've been working on a project with a more complex processing pipeline, and I suspected that the number of threads and the number of different message types could become a problem, so I began looking at different approaches. Along the way I was reminded that interlocked operations are not cheap: each one is a full memory barrier, so while ordinary instructions can be pipelined several to a cycle, an interlocked instruction serializes that part of the pipeline.
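
As a rough illustration of that cost (my own sketch, not a benchmark from the original post), a tight loop of interlocked increments can be compared against plain increments:

open System.Diagnostics
open System.Threading

let iterations = 100_000_000

// Plain, non-atomic increments: the CPU is free to pipeline these
let mutable plain = 0L
let sw1 = Stopwatch.StartNew()
for _ in 1 .. iterations do
    plain <- plain + 1L
printfn "plain increment:       %d ms" sw1.ElapsedMilliseconds

// Interlocked increments: every one is a full memory barrier
let mutable locked = 0L
let sw2 = Stopwatch.StartNew()
for _ in 1 .. iterations do
    Interlocked.Increment(&locked) |> ignore
printfn "Interlocked.Increment: %d ms" sw2.ElapsedMilliseconds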

| Method           | Messages | Mean      | Error     | StdDev    | Ratio | RatioSD | Gen0      | Completed Work Items | Lock Contentions | Allocated | Alloc Ratio |
|------------------|----------|-----------|-----------|-----------|-------|---------|-----------|----------------------|------------------|-----------|-------------|
| Hammer           | 100000   | 1.525 ms  | 0.0926 ms | 0.2730 ms | 0.05  | 0.01    | -         | -                    | -                | 2 MB      | 0.04        |
| Payload          | 100000   | 6.702 ms  | 0.1874 ms | 0.5408 ms | 0.24  | 0.02    | -         | -                    | -                | 2 MB      | 0.04        |
| ThreadedActor    | 100000   | 11.309 ms | 0.2252 ms | 0.4118 ms | 0.38  | 0.02    | -         | -                    | -                | 5.06 MB   | 0.09        |
| ThreadedActor3   | 100000   | 6.029 ms  | 0.1201 ms | 0.3100 ms | 0.21  | 0.01    | -         | -                    | 3.0000           | 2 MB      | 0.04        |
| MailBoxProcessor | 100000   | 29.356 ms | 0.5870 ms | 1.4288 ms | 1.00  | 0.00    | 6000.0000 | 1.0000               | 100.0000         | 56.26 MB  | 1.00        |

The internal compute time is the Payload time minus the Hammer time: 6.702 - 1.525 = 5.177 ms. The payload is a simple sum aggregation over a thousand ints, just enough compute to exercise the high-frequency use case without turning it into a memory benchmark. The Hammer is another thread that sends 100k messages to the actor as fast as it can, which on its own takes 1.525 ms. That puts the ThreadedActor3 overhead at 6.029 - 5.177 = 0.852 ms per 100k messages, compared to a ThreadedActor overhead of 11.309 - 5.177 = 6.132 ms per 100k messages, a 7.2x improvement, and 29.356 - 5.177 = 24.179 ms per 100k messages for MailBoxProcessor, a 28.3x improvement. What changed in ThreadedActor3? First, a spin-wait before yielding the thread through the reset event, and second, the removal of all Interlocked operations.

 type ThreadedActor3<'tmsg>(bodyFn: 'tmsg -> unit,  ct : CancellationToken ) as this =
 
     let queue =  ConcurrentQueue<'tmsg>()
     let signal = new ManualResetEventSlim(false, 1000) // spinCount 1000: spin briefly before falling back to a kernel wait

 
     let threadFn () =
         while not <| ct.IsCancellationRequested do
             let isSuccessful, message = queue.TryDequeue()
             // If we successfully retrieved the next message, execute it in the context of this thread
             if isSuccessful then 
                 bodyFn message
             else signal.Reset()
             if queue.IsEmpty && (not <| ct.IsCancellationRequested) then 
                 signal.Wait()
                 

         signal.Dispose()
 
 
     let thread =
         Thread(ThreadStart(threadFn), IsBackground = true, Name = "ActorThread")
 
     do thread.Start()
 
 
     member this.Enqueue(msg: 'tmsg) =
         queue.Enqueue(msg)
         if not <| signal.IsSet then signal.Set()

 
 
     member this.QueueCount = queue.Count
 
     interface IDisposable with
         member __.Dispose() =
             thread.Join() 

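Usage changes slightly: the actor's lifetime is now tied to a CancellationToken rather than a Stop method (again, a sketch of mine):

open System.Threading

let cts = new CancellationTokenSource()

// The body now only receives the message; lifetime is controlled by the token
let actor3 = new ThreadedActor3<int>((fun n -> printfn "got %d" n), cts.Token)

for i in 1 .. 10 do
    actor3.Enqueue i

// Cancelling ends the loop; note that a worker parked in signal.Wait() only
// observes cancellation the next time an Enqueue sets the signal (the thread
// is a background thread, so it will not keep the process alive either way)
cts.Cancel()
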
What more could be done? I thought surely this approach wouldn't scale, so I wanted to put together a benchmark that would surely show that, at some point, it becomes more efficient to run on the thread pool through async or tasks.

| Method           | Threads | Messages | Payload | Mean         | Error      | StdDev     | Median       | Ratio | RatioSD | Completed Work Items | Lock Contentions | Gen0         | Gen1      | Allocated     | Alloc Ratio |
|------------------|---------|----------|---------|--------------|------------|------------|--------------|-------|---------|----------------------|------------------|--------------|-----------|---------------|-------------|
| ThreadedActor3   | 100     | 100000   | 100     | 275.406 ms   | 10.3832 ms | 6.8678 ms  | 273.359 ms   | 1.00  | 0.00    | -                    | -                | -            | -         | 843.91 KB     | 1.00        |
| MailBoxProcessor | 100     | 100000   | 100     | 5,587.295 ms | 64.6290 ms | 42.7481 ms | 5,600.417 ms | 20.30 | 0.50    | 9920532.0000         | 836.0000         | 1171000.0000 | 1000.0000 | 9499865.83 KB | 11,256.91   |

OK... I didn't expect that. 100 threads, and I thought context switching was supposed to be expensive. There could be several factors behind this; one is the simplicity of each thread's execution context: a minimal loop with no significant working set to cycle through the CPU caches. Since I was caught off guard by this result, I don't think my speculations are worth much here.

One thing that was obvious is that the allocation the mailbox processors were doing was excessive. Async workflows are known to be slower than tasks, so how would a similar workflow shape up built on tasks? Conveniently, there is System.Threading.Channels.
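
The Channel variant of the benchmark isn't shown here, but its shape is roughly the following sketch (the System.Threading.Channels API is real; the function name and structure are mine):

open System.Threading
open System.Threading.Channels

// Roughly the task-based counterpart of ThreadedActor3: an unbounded channel
// with a single consumer loop scheduled on the thread pool.
let startChannelConsumer<'tmsg> (bodyFn: 'tmsg -> unit) (ct: CancellationToken) =
    let channel = Channel.CreateUnbounded<'tmsg>(UnboundedChannelOptions(SingleReader = true))
    let consumer =
        task {
            let mutable more = true
            while more do
                // Parks the consumer without holding a thread until data arrives
                let! available = channel.Reader.WaitToReadAsync(ct)
                more <- available
                // Drain whatever is currently buffered
                let mutable draining = more
                while draining do
                    match channel.Reader.TryRead() with
                    | true, msg -> bodyFn msg
                    | _ -> draining <- false
        }
    channel.Writer, consumer

// Producers enqueue with: writer.TryWrite(msg) |> ignore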

| Method         | T   | N      | P     | Mean         | Error      | StdDev      | Ratio | RatioSD | Gen0        | Completed Work Items | Lock Contentions | Gen1      | Gen2      | Allocated     | Alloc Ratio |
|----------------|-----|--------|-------|--------------|------------|-------------|-------|---------|-------------|----------------------|------------------|-----------|-----------|---------------|-------------|
| ThreadedActor3 | 1   | 100000 | False | 5.894 ms     | 1.155 ms   | 0.7637 ms   | 1.00  | 0.00    | -           | -                    | 2.0000           | -         | -         | 2051.88 KB    | 1.00        |
| Channel        | 1   | 100000 | False | 11.862 ms    | 1.695 ms   | 1.1215 ms   | 2.04  | 0.32    | -           | 1.0000               | -                | -         | -         | 2052.4 KB     | 1.00        |
| ThreadedActor3 | 1   | 100000 | True  | 49.052 ms    | 9.460 ms   | 6.2569 ms   | 1.00  | 0.00    | 2000.0000   | -                    | 1.0000           | -         | -         | 19239.2 KB    | 1.00        |
| Channel        | 1   | 100000 | True  | 52.764 ms    | 8.348 ms   | 5.5220 ms   | 1.10  | 0.21    | 2000.0000   | 1.0000               | -                | -         | -         | 19239.73 KB   | 1.00        |
| ThreadedActor3 | 20  | 100000 | False | 102.835 ms   | 5.534 ms   | 3.6603 ms   | 1.00  | 0.00    | -           | -                    | 1.0000           | -         | -         | 422.17 KB     | 1.00        |
| Channel        | 20  | 100000 | False | 866.496 ms   | 51.184 ms  | 33.8551 ms  | 8.44  | 0.44    | -           | 1958891.0000         | 12.0000          | -         | -         | 116.91 KB     | 0.28        |
| ThreadedActor3 | 20  | 100000 | True  | 170.972 ms   | 14.906 ms  | 9.8591 ms   | 1.00  | 0.00    | 42000.0000  | -                    | -                | 1000.0000 | -         | 384770.88 KB  | 1.00        |
| Channel        | 20  | 100000 | True  | 773.445 ms   | 28.016 ms  | 18.5307 ms  | 4.54  | 0.32    | 42000.0000  | 1790185.0000         | 12.0000          | 1000.0000 | -         | 344117.42 KB  | 0.89        |
| ThreadedActor3 | 100 | 100000 | False | 269.136 ms   | 8.376 ms   | 5.5399 ms   | 1.00  | 0.00    | -           | -                    | 2.0000           | -         | -         | 833.09 KB     | 1.00        |
| Channel        | 100 | 100000 | False | 3,844.147 ms | 49.555 ms  | 32.7776 ms  | 14.29 | 0.32    | -           | 9970796.0000         | 14.0000          | -         | -         | 278.78 KB     | 0.33        |
| ThreadedActor3 | 100 | 100000 | True  | 1,432.064 ms | 341.260 ms | 225.7225 ms | 1.00  | 0.00    | 214000.0000 | -                    | 1.0000           | 2000.0000 | -         | 1897220.5 KB  | 1.00        |
| Channel        | 100 | 100000 | True  | 1,135.409 ms | 677.421 ms | 448.0719 ms | 0.79  | 0.30    | 216000.0000 | 2064.0000            | 4.0000           | 8000.0000 | 2000.0000 | 1821499.04 KB | 0.96        |

Here P represents whether the payload computes a sha512 hash or just a sum. When does context switching outweigh the cost of async? Not until 100 threads with the larger compute payload. Or was something else going on here? It looks like the JIT found an optimization path that reduced the completed work items, which we would otherwise expect to be close to 10 million, as in the lower-compute version. The hash compute still seems to run correctly, though, since the allocations are close to those of the threaded version. So the obvious way to make better use of the thread pool through tasks is a larger compute payload per work item, which points toward mini-batching (sketched below). However, batches that are too large could clog the thread pool and add latency for everything else using it; then again, spawning 100 dedicated threads could do that to the rest of the system as well.
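
To make the mini-batching idea concrete, here is a rough sketch (hypothetical, not one of the benchmarked variants): messages are pulled off a ChannelReader in chunks so that each thread-pool work item carries a meaningful amount of compute.

open System.Threading.Channels

// Hypothetical helper: process up to batchSize messages per wake-up instead of
// paying the scheduling overhead once per message.
let consumeInBatches<'tmsg> (reader: ChannelReader<'tmsg>) (batchSize: int) (bodyFn: 'tmsg[] -> unit) =
    task {
        let buffer = ResizeArray<'tmsg>(batchSize)
        let mutable more = true
        while more do
            let! available = reader.WaitToReadAsync()
            more <- available
            buffer.Clear()
            // Pull up to batchSize messages that are already buffered
            let mutable draining = more
            while draining && buffer.Count < batchSize do
                match reader.TryRead() with
                | true, msg -> buffer.Add msg
                | _ -> draining <- false
            if buffer.Count > 0 then
                bodyFn (buffer.ToArray())
    }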

So far the examples have used unbounded queues. What if we use bounded queues instead and see whether adding a back-pressure mechanism makes a difference?

type ThreadedActor4<'tmsg>(bodyFn: 'tmsg -> unit, qDepth : uint64, ct : CancellationToken ) as this =

     let queue =  ConcurrentQueue<'tmsg>()
     let signal = new ManualResetEventSlim(false, 1000) // wakes the worker; spinCount 1000 spins briefly before a kernel wait
     let rsignal = new ManualResetEventSlim(false, 1000) // throttles producers once the queue exceeds qDepth
 
     let threadFn () =
         while not <| ct.IsCancellationRequested do
             let isSuccessful, message = queue.TryDequeue()
             if isSuccessful then 
                 bodyFn message
             else signal.Reset()
             if uint64 queue.Count <= qDepth && (not rsignal.IsSet) then rsignal.Set()
             if queue.IsEmpty && (not <| ct.IsCancellationRequested) then 
                 signal.Wait()
                 

         signal.Dispose()
 
 
     let thread =
         Thread(ThreadStart(threadFn), IsBackground = true, Name = "ActorThread")
 
     do thread.Start()
 
 
     member this.Enqueue(msg: 'tmsg) =
        if uint64 queue.Count < qDepth then
            queue.Enqueue(msg)
            if not <| signal.IsSet then signal.Set()

        else
            rsignal.Reset()
            if uint64 queue.Count >= qDepth then rsignal.Wait() //blocking throttle
            queue.Enqueue(msg)


     member this.QueueCount = queue.Count
 
     interface IDisposable with
         member __.Dispose() =
             thread.Join() 

This version adds a second ManualResetEventSlim to make Enqueue wait when the queue exceeds its defined bound. BEWARE of this code, though: it went through several iterations of subtle race-condition bugs before it ran stably enough to pass the benchmark. A lot can happen between the condition that triggers a wait and the wait itself, and the actor can end up in a state where both sides are waiting and nothing will ever wake them. Adding the spin counts to the ManualResetEventSlim helped a lot, as did thinking about how to recover from an unfortunate race.

| Method         | T  | N      | P     | Mean      | Error      | StdDev     | Median    | Ratio | RatioSD | Gen0       | Completed Work Items | Lock Contentions | Gen1      | Allocated    | Alloc Ratio |
|----------------|----|--------|-------|-----------|------------|------------|-----------|-------|---------|------------|----------------------|------------------|-----------|--------------|-------------|
| ThreadedActor4 | 1  | 100000 | False | 13.20 ms  | 0.319 ms   | 0.211 ms   | 13.23 ms  | 1.00  | 0.00    | -          | -                    | -                | -         | 5.73 KB      | 1.00        |
| Channel        | 1  | 100000 | False | 23.86 ms  | 4.300 ms   | 2.844 ms   | 23.61 ms  | 1.81  | 0.22    | -          | 10512.0000           | 1.0000           | -         | 892.59 KB    | 155.87      |
| ThreadedActor4 | 1  | 100000 | True  | 56.45 ms  | 10.095 ms  | 6.677 ms   | 54.00 ms  | 1.00  | 0.00    | 2000.0000  | -                    | -                | -         | 17193.05 KB  | 1.00        |
| Channel        | 1  | 100000 | True  | 74.40 ms  | 14.239 ms  | 9.418 ms   | 72.67 ms  | 1.32  | 0.13    | 2000.0000  | 73682.0000           | 2.0000           | -         | 22973.02 KB  | 1.34        |
| ThreadedActor4 | 15 | 100000 | False | 40.93 ms  | 5.924 ms   | 3.919 ms   | 41.67 ms  | 1.00  | 0.00    | -          | -                    | 4.0000           | -         | 75.88 KB     | 1.00        |
| Channel        | 15 | 100000 | False | 71.77 ms  | 51.068 ms  | 33.778 ms  | 53.28 ms  | 1.81  | 0.94    | -          | 66489.0000           | 25.0000          | -         | 4557.92 KB   | 60.07       |
| ThreadedActor4 | 15 | 100000 | True  | 153.95 ms | 14.809 ms  | 9.795 ms   | 154.46 ms | 1.00  | 0.00    | 31000.0000 | -                    | 17.0000          | -         | 257885.78 KB | 1.00        |
| Channel        | 15 | 100000 | True  | 136.68 ms | 10.014 ms  | 6.624 ms   | 135.38 ms | 0.89  | 0.06    | 33000.0000 | 170032.0000          | 10.0000          | -         | 271112.13 KB | 1.05        |
| ThreadedActor4 | 30 | 100000 | False | 68.97 ms  | 9.827 ms   | 6.500 ms   | 71.24 ms  | 1.00  | 0.00    | -          | -                    | 1.0000           | -         | 151.33 KB    | 1.00        |
| Channel        | 30 | 100000 | False | 73.10 ms  | 37.890 ms  | 25.062 ms  | 65.95 ms  | 1.07  | 0.37    | -          | 87009.0000           | 22.0000          | -         | 4806.56 KB   | 31.76       |
| ThreadedActor4 | 30 | 100000 | True  | 299.31 ms | 25.703 ms  | 17.001 ms  | 299.14 ms | 1.00  | 0.00    | 64000.0000 | -                    | 30.0000          | -         | 515771.48 KB | 1.00        |
| Channel        | 30 | 100000 | True  | 237.35 ms | 15.009 ms  | 9.927 ms   | 239.31 ms | 0.80  | 0.07    | 65000.0000 | 76520.0000           | 48.0000          | 1000.0000 | 520281.57 KB | 1.01        |

This benchmark was run with both the bounded Channel and ThreadedActor4 bounded to 100, which is a bit low for a realistic workload. One pattern that stands out is that the Channel version doesn't handle the low-compute workload all that well, but performs really well when saturating the CPU.

How do Channels work? The key ingredient that makes them go brrr is IValueTaskSource, which allows (in a limited set of cases) the internal wait structure of a task to be reused by a new ValueTask, whereas with plain tasks a new Task would have to be allocated for every 'loop'.

open System
open System.Collections.Concurrent
open System.Runtime.ExceptionServices
open System.Threading
open System.Threading.Tasks
open System.Threading.Tasks.Sources

type ResettableValueTaskSource(cancellationToken : CancellationToken) as this =

    let mutable _waitSource = ManualResetValueTaskSourceCore<bool>()
    do _waitSource.RunContinuationsAsynchronously <- false
    let mutable _waitSourceCancellation = cancellationToken.UnsafeRegister((fun x -> (x :?> ResettableValueTaskSource).CancelWaiter(cancellationToken)), this)
    let mutable _hasWaiter = 0
   

    member this.SignalWaiter() = 
        if Interlocked.Exchange(&_hasWaiter, 0) = 1
        then _waitSource.SetResult(true)
        ()
    member private this.CancelWaiter(cancellationToken : CancellationToken) = 
        if Interlocked.Exchange(&_hasWaiter, 0) = 1
        then _waitSource.SetException (ExceptionDispatchInfo.SetCurrentStackTrace (new OperationCanceledException(cancellationToken)))
        ()

    member this.WaitAsync() =
        if _hasWaiter <> 0
        then raise (new InvalidOperationException("Concurrent use is not supported") :> System.Exception)
        _waitSource.Reset ()
        Volatile.Write(&_hasWaiter, 1)
        new ValueTask(this :> IValueTaskSource, _waitSource.Version)
        
    interface IValueTaskSource with
        member this.GetResult(token : int16) = 
            _waitSource.GetResult (token) |> ignore

        member this.GetStatus(token : int16) = 
               (_waitSource.GetStatus (token)) :> ValueTaskSourceStatus

        member this.OnCompleted(continuation : Action<obj>, state : obj, token : System.Int16, flags : ValueTaskSourceOnCompletedFlags) = 
            _waitSource.OnCompleted (continuation, state, token, flags)

    interface IValueTaskSource<bool> with
        member this.GetResult(token : int16) = 
            _waitSource.GetResult (token)

        member this.GetStatus(token : int16) = 
               (_waitSource.GetStatus (token)) :> ValueTaskSourceStatus

        member this.OnCompleted(continuation : Action<obj>, state : obj, token : System.Int16, flags : ValueTaskSourceOnCompletedFlags) = 
            _waitSource.OnCompleted (continuation, state, token, flags)

            
type VACore<'tmsg> = 
    {
        mutable count: uint64
        queueDepth: uint64
        queue: ConcurrentQueue<'tmsg>
        vts: ResettableValueTaskSource
        vtsR: ResettableValueTaskSource
        ct: CancellationToken
    }

[<RequireQualifiedAccess>]    
module AsyncChannels =
    


    let init<'t> qDepth ct =
        let queue = ConcurrentQueue<'t>()
        let vts = ResettableValueTaskSource(ct) //slow down the enqueue (when queue reaches depth)
        let vtsR = ResettableValueTaskSource(ct) //prevents cpu cycling (when queue is empty)
        let mutable count = queue.Count |> uint64
        {
            count = count
            queueDepth = qDepth
            queue = queue
            vts = vts
            vtsR = vtsR
            ct = ct 
        }

    let enqueue (core : VACore<'t>) (item : 't) =
        core.queue.Enqueue(item)
        Interlocked.Increment(&core.count) |> ignore
        core.vtsR.SignalWaiter()
        // Back pressure: once the queue exceeds its depth, hand the producer a ValueTask to await
        if Interlocked.Read(&core.count) > core.queueDepth then
            core.vts.WaitAsync()
        else
            ValueTask.CompletedTask

    let dequeue (core : VACore<'t>) =
        let isSuccessful, message = core.queue.TryDequeue()
        if isSuccessful then
            Interlocked.Decrement(&core.count) |> ignore
            core.vts.SignalWaiter()
            ValueSome(message)
        else ValueNone


    let iter1 (core : VACore<'t>) (fn : 't -> unit) =
        task {
            while not <| core.ct.IsCancellationRequested do
                let dequeue = dequeue core
                if dequeue.IsSome then
                    fn dequeue.Value
                if core.queue.IsEmpty && Interlocked.Read(&core.count) = 0UL then 
                    do! core.vtsR.WaitAsync()
                   
        }
    
        

The code here is extremely simplified. The ResettableValueTaskSource takes the role of the ManualResetEventSlim, but instead of scheduling a thread wake-up with the kernel it schedules with the task machinery: when the ResettableValueTaskSource is signaled, the one and only ValueTask currently being awaited completes and runs its continuation on the thread pool as if it were a normal task.
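
Wiring it together looks roughly like this (my sketch on top of the code above, reusing its opens and definitions):

let cts = new CancellationTokenSource()
let core = AsyncChannels.init<int> 100UL cts.Token

// Consumer: one task-based loop that drains the queue and parks on vtsR when empty
let consumer = AsyncChannels.iter1 core (fun n -> printfn "got %d" n)

// Producer: awaiting the returned ValueTask applies back pressure once the
// queue holds more than 100 items
let producer =
    task {
        for i in 1 .. 1000 do
            do! AsyncChannels.enqueue core i
    }

// Later: producer.Wait() then cts.Cancel() to end the consumer loop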

| Method         | T  | N      | P     | Mean        | Error      | StdDev     | Ratio | RatioSD | Gen0      | Completed Work Items | Lock Contentions | Allocated    | Alloc Ratio |
|----------------|----|--------|-------|-------------|------------|------------|-------|---------|-----------|----------------------|------------------|--------------|-------------|
| ValueActor     | 1  | 100000 | False | 38.47 ms    | 109.297 ms | 28.384 ms  | 1.00  | 0.00    | -         | -                    | -                | 46.68 KB     | 1.00        |
| BoundedChannel | 1  | 100000 | False | 20.65 ms    | 12.617 ms  | 3.277 ms   | 1.13  | 1.09    | -         | 4857.0000            | 8.0000           | 383.81 KB    | 8.22        |
| ValueActor     | 1  | 100000 | True  | 99.34 ms    | 97.873 ms  | 25.417 ms  | 1.00  | 0.00    | -         | -                    | -                | 17234.01 KB  | 1.00        |
| BoundedChannel | 1  | 100000 | True  | 128.94 ms   | 29.091 ms  | 7.555 ms   | 1.36  | 0.32    | -         | 98900.0000           | 8.0000           | 24944.06 KB  | 1.45        |
| ValueActor     | 15 | 100000 | False | 34.45 ms    | 5.824 ms   | 1.513 ms   | 1.00  | 0.00    | -         | -                    | -                | 482.76 KB    | 1.00        |
| BoundedChannel | 15 | 100000 | False | 76.75 ms    | 41.562 ms  | 10.794 ms  | 2.23  | 0.32    | -         | 111201.0000          | 52.0000          | 8843.37 KB   | 18.32       |
| ValueActor     | 15 | 100000 | True  | 551.72 ms   | 26.926 ms  | 6.992 ms   | 1.00  | 0.00    | 3000.0000 | -                    | -                | 258284.42 KB | 1.00        |
| BoundedChannel | 15 | 100000 | True  | 569.16 ms   | 28.188 ms  | 7.320 ms   | 1.03  | 0.01    | 3000.0000 | 803896.0000          | 635.0000         | 320931.56 KB | 1.24        |
| ValueActor     | 30 | 100000 | False | 93.57 ms    | 51.640 ms  | 13.411 ms  | 1.00  | 0.00    | -         | -                    | -                | 950.22 KB    | 1.00        |
| BoundedChannel | 30 | 100000 | False | 127.36 ms   | 32.105 ms  | 8.338 ms   | 1.38  | 0.13    | -         | 54908.0000           | 1191.0000        | 3560.92 KB   | 3.75        |
| ValueActor     | 30 | 100000 | True  | 1,075.95 ms | 40.364 ms  | 10.482 ms  | 1.00  | 0.00    | 6000.0000 | -                    | -                | 516553.65 KB | 1.00        |
| BoundedChannel | 30 | 100000 | True  | 1,073.02 ms | 24.850 ms  | 6.453 ms   | 1.00  | 0.01    | 6000.0000 | 59617.0000           | 2196.0000        | 520664.98 KB | 1.01        |

In a scenario with 10,000 actors/loops that last the length of the program, can something even more efficient be done? I think so, but that is all for today.

https://github.com/musheddev/PerfBuf