There were a few issues that I encountered while abstracting threading that I thought might be of interest, so I'll discuss them here.
The first was finding the right abstraction, the second was implementing any threading abstraction.
Finding the Right Abstraction
In my initial approach to abstracting threading, I was focused on code reuse. I'd already defined a mutable-state abstraction to hold the application's shared state and thought to reuse that to hold the progress updates and results of long-running threaded operations -- with a separate typeclass to abstract actually running something in a thread.
Here's some pseudo-code for what I'd defined:
data State r
= Waiting
| Running Float
| Cancelled
| Errored String
| Completed r
class Monad a => Container a where
create :: state -> a var
read :: var -> a state
update :: var -> (state -> state) -> a ()
-- `state` contains either the operation's progress or result
class Monad a => Operation a where
runInThread :: a () -> a ()
These interfaces seemed to be ok, their concerns were pretty orthogonal and the threading code could run operations regardless of whether they updated their progress or not -- a state variable to hold progress updates could be passed to the function being run in a thread separately as an argument.
But when I started implementing things, this abstraction didn't feel right; I was trying to do too many things in the wrong places.
For one thing, I'd been thinking to use the state variable to control the thread from the outside -- setting the Cancelled
state externally would tell the thread to quit -- thus using the same channel that the function would send its output on as an input. But that's too much for one thing to do; I was unable to ensure that the output of the function wouldn't overwrite a control signal in, and vice-versa. (I'm ashamed of the naïveté this approach displays, as I actually have a decent amount of good threading experience and should know better.)
Disjoining the input and output responsibilities into separate State
types and passing in two different state variables to a threaded method would have been cleaner, but it still wouldn't have ameliorated everything else that was wrong.
Passing a state variable to a threaded function at all was problematic because (a) it polluted what could otherwise have been pure code with the knowledge and use of the state abstraction and (b) it added the onerous responsibility of keeping track of the thread run-state to a function that already had its own work to get done.
As I was wrestling with these disadvantageous qualities, the following shifts in thinking took place:
- Have the threaded method return its output as a return-value, not via a state variable (duh)
- Move threading control (i.e. cancellation) into the threading abstraction instead of making the threaded function care about it and check a state variable for instructions
- Move progress-reporting into the threading abstraction (since I was only going to be threading operations that I wanted to know the running status of anyway)
- Hide the progress-reporting mechanism within the threading abstraction and pass a monadic function to the threaded method for it to call to report progress updates
Point 3 meant I could avoid having to pass around and keep track of a state variable and a thread handle in external code, and point 4 meant the threaded function could be otherwise essentially pure code (so easier to test).
After making these changes, I arrived at the following abstraction:
data Status r
= Completed r
| Errored SomeException
| Progressing Float -- 0-1 of progress
class Monad m => Operation m o | m -> o where
start :: ((Float -> m ()) -> m r) -> m (o r)
cancel :: o r -> m ()
poll :: o r -> m (Status r)
If the type-declaration of the start
method seems confusing, it's just saying that the function that's going to be threaded has to take as an argument a monadic function it should call with progress updates, which the abstraction implementation will provide to it.
Implementing an Abstraction
With a good abstraction established, I thought things would move forward smoothly thenceforth. However, when I went to implement it, one detail proved problematic:
Threads in Haskell, so far as I've seen at least, can only be run explicitly in one of two base monads: IO
or ST
.
The base monad of our App
type, which is what we're abstracting from, is already IO
, so this felt like a non-issue, but since the threading abstraction was being implemented at the App
level, the function being passed in to run in a thread would be of the App
type as well, and not of type IO
.
Whoops.
In order run a method of type App
in an IO
thread, I'd have to somehow go through the ReaderT GlobalContext
that encased IO
in the App
type to get at the base monad.
Thankfully there are a couple of libraries to do just that; I ended up choosing to go with the monad-unlift library since it lets one unwrap from either IO
or ST
, and I thought I might want to use the latter in testing.
Here's the code to implement the abstraction:
data IORunState r
= IORunState
{ asyncOf :: A.Async r
, progressOf :: IORef Float
}
instance Operation App IORunState where
start op = do
ioRef <- liftIO (newIORef 0)
let report = liftIO . writeIORef ioRef
UnliftBase runInBase <- askUnliftBase
-- libs we're using appear to need `forkOS` to be used,
-- which `asyncBound` uses
async <- liftIO (Async.asyncBound (runInBase (op report)))
return (IORunState async ioRef)
cancel = liftIO . Async.cancel . asyncOf
poll (IORunState a r) = do
status <- liftIO (Async.poll a)
case status of
Nothing -> do
p <- liftIO (readIORef r)
return (Progressing p)
Just (Left e) ->
return (Errored e)
Just (Right v) ->
return (Completed v)
I was mildly surprised when this code actually compiled and ran without issue.
This code creates a schism in the monadic stack between the code in the main thread and the code in the function being run in a child thread, with the only remaining connection between them being the IO
monad. With this schism, only the return value from the threaded function or effects in IO
will become available to the caller; any modifications made to other parts of the thread-local copy of the monadic stack by the child thread will be lost once that thread has completed. Since the progress updates are written to an IO
state variable behind the scenes, in-thread calls to the progress-reporting function will be visible to the outside world.
A few notes:
- I decided to change from using
IO
threads directly to using theControl.Concurrent.Async
module, since it supports easier cancellation, butIO
is still used to run the threads underneath it - The readme of the monad-unlift library says that the library has some ways to make coordinating logic between threads more convenient, if more communication between threads is desired, but I'm just using an
IORef
for progress reporting
Continuing with the building metaphor from the Foundation post, this implementation would be equivalent to having two buildings each built on a common ground floor; the child building would have a copy of the information in the parent building's levels, but any modifications made in the child building (e.g. on a StateT
level) wouldn't make it over to the same level in the parent building unless some extra work was done to shuttle it across on the ground floor.