Case Study: Abstracting Threading

Published: 2021-05-21
Edited: 2021-06-16, 2021-09-09

There were a few issues that I encountered while abstracting threading that I thought might be of interest, so I'll discuss them here.

The first was finding the right abstraction, the second was implementing any threading abstraction.

Finding the Right Abstraction

In my initial approach to abstracting threading, I was focused on code reuse. I'd already defined a mutable-state abstraction to hold the application's shared state and thought to reuse that to hold the progress updates and results of long-running threaded operations -- with a separate typeclass to abstract actually running something in a thread.

Here's some pseudo-code for what I'd defined:

data State r
  = Waiting
  | Running Float
  | Cancelled
  | Errored String
  | Completed r

class Monad a => Container a where
  create :: state -> a var
  read :: var -> a state
  update :: var -> (state -> state) -> a ()
  -- `state` contains either the operation's progress or result

class Monad a => Operation a where
  runInThread :: a () -> a ()
          

These interfaces seemed to be ok, their concerns were pretty orthogonal and the threading code could run operations regardless of whether they updated their progress or not -- a state variable to hold progress updates could be passed to the function being run in a thread separately as an argument.

But when I started implementing things, this abstraction didn't feel right; I was trying to do too many things in the wrong places.

For one thing, I'd been thinking to use the state variable to control the thread from the outside -- setting the Cancelled state externally would tell the thread to quit -- thus using the same channel that the function would send its output on as an input. But that's too much for one thing to do; I was unable to ensure that the output of the function wouldn't overwrite a control signal in, and vice-versa. (I'm ashamed of the naïveté this approach displays, as I actually have a decent amount of good threading experience and should know better.)

Disjoining the input and output responsibilities into separate State types and passing in two different state variables to a threaded method would have been cleaner, but it still wouldn't have ameliorated everything else that was wrong.

Passing a state variable to a threaded function at all was problematic because (a) it polluted what could otherwise have been pure code with the knowledge and use of the state abstraction and (b) it added the onerous responsibility of keeping track of the thread run-state to a function that already had its own work to get done.

As I was wrestling with these disadvantageous qualities, the following shifts in thinking took place:

  1. Have the threaded method return its output as a return-value, not via a state variable (duh)
  2. Move threading control (i.e. cancellation) into the threading abstraction instead of making the threaded function care about it and check a state variable for instructions
  3. Move progress-reporting into the threading abstraction (since I was only going to be threading operations that I wanted to know the running status of anyway)
  4. Hide the progress-reporting mechanism within the threading abstraction and pass a monadic function to the threaded method for it to call to report progress updates

Point 3 meant I could avoid having to pass around and keep track of a state variable and a thread handle in external code, and point 4 meant the threaded function could be otherwise essentially pure code (so easier to test).

After making these changes, I arrived at the following abstraction:

data Status r
  = Completed r
  | Errored SomeException
  | Progressing Float -- 0-1 of progress

class Monad m => Operation m o | m -> o where
  start :: ((Float -> m ()) -> m r) -> m (o r)
  cancel :: o r -> m ()
  poll :: o r -> m (Status r)
          

If the type-declaration of the start method seems confusing, it's just saying that the function that's going to be threaded has to take as an argument a monadic function it should call with progress updates, which the abstraction implementation will provide to it.

Implementing an Abstraction

With a good abstraction established, I thought things would move forward smoothly thenceforth. However, when I went to implement it, one detail proved problematic:

Threads in Haskell, so far as I've seen at least, can only be run explicitly in one of two base monads: IO or ST.

The base monad of our App type, which is what we're abstracting from, is already IO, so this felt like a non-issue, but since the threading abstraction was being implemented at the App level, the function being passed in to run in a thread would be of the App type as well, and not of type IO.

Whoops.

In order run a method of type App in an IO thread, I'd have to somehow go through the ReaderT GlobalContext that encased IO in the App type to get at the base monad.

Thankfully there are a couple of libraries to do just that; I ended up choosing to go with the monad-unlift library since it lets one unwrap from either IO or ST, and I thought I might want to use the latter in testing.

Here's the code to implement the abstraction:

data IORunState r
  = IORunState
    { asyncOf :: A.Async r
    , progressOf :: IORef Float
    }

instance Operation App IORunState where
  start op = do
    ioRef <- liftIO (newIORef 0)
    let report = liftIO . writeIORef ioRef
    UnliftBase runInBase <- askUnliftBase
    -- libs we're using appear to need `forkOS` to be used,
    -- which `asyncBound` uses
    async <- liftIO (Async.asyncBound (runInBase (op report)))
    return (IORunState async ioRef)
  cancel = liftIO . Async.cancel . asyncOf
  poll (IORunState a r) = do
    status <- liftIO (Async.poll a)
    case status of
      Nothing -> do
        p <- liftIO (readIORef r)
        return (Progressing p)
      Just (Left e) ->
        return (Errored e)
      Just (Right v) ->
        return (Completed v)
          

I was mildly surprised when this code actually compiled and ran without issue.

This code creates a schism in the monadic stack between the code in the main thread and the code in the function being run in a child thread, with the only remaining connection between them being the IO monad. With this schism, only the return value from the threaded function or effects in IO will become available to the caller; any modifications made to other parts of the thread-local copy of the monadic stack by the child thread will be lost once that thread has completed. Since the progress updates are written to an IO state variable behind the scenes, in-thread calls to the progress-reporting function will be visible to the outside world.

A few notes:

Continuing with the building metaphor from the Foundation post, this implementation would be equivalent to having two buildings each built on a common ground floor; the child building would have a copy of the information in the parent building's levels, but any modifications made in the child building (e.g. on a StateT level) wouldn't make it over to the same level in the parent building unless some extra work was done to shuttle it across on the ground floor.