This post is an investigation of persistent issue #1199 where an asynchronous exception caused a database connnection to be improperly returned to the pool.
The linked issue contains some debugging notes, along with the PR that fixes the problem.
While I was able to identify the problem and provide a fix, I don’t really understand what happened - it’s a complex bit of work.
So I’m going to write this up as an exploration into the exact code paths that are happening.
Data.Poolresource-pool is a how persistent manages concurrent pooling and sharing of database connections.
When you create a Pool, you specify how to create resources, destroy them, and then some information around resource management: how long to keep an unused resource open, how many sub-pools to maintain, and how many resources per sub-pool (aka stripe).
persistent calls createPool here.
The database client libraries provide a LogFunc -> IO SqlBackend that is used to create new database connections, and the close' delegates to the connClose field on the SqlBackend record.
While resource-pool isn’t seeing much maintenance activity, it’s relatively well tested and reliable.
Once you’ve got a Pool a from createPool, the recommended way to use it is withResource:
withResource
:: (MonadBaseControl IO m)
=> Pool a
-> (a -> m b)
-> m b
withResource pool act = control $ \runInIO -> mask $ \restore -> do
(resource, local) <- takeResource pool
ret <- restore (runInIO (act resource)) `onException`
destroyResource pool local resource
putResource local resource
return ret
Data.AcquireNow, in persistent-2.10.5, a new API based on the resourcet package’s Data.Acquire was introduced, and this API became the underlying implementation for the runSqlPool family of functions.
The underlying functionality is in the new function unsafeAcquireSqlConnFromPool, which was later factored out into resourcet-pool.
This change was introduced because resource-pool operates in MonadBaseControl, which is incompatible with many other monad transformers - specifically, ConduitT.
Acquire is based on MonadUnliftIO, which is compatible.
In hindsight, we could have just changed the code to use MonadUnliftIO - it’s relatively straightforward to do.
A term with a single constrant like MonadBaseControl IO m => m a can be specialized to IO a, and we can then run that using withRunInIO from unliftio.
toUnliftIO
:: MonadUnliftIO n
=> (forall m. MonadBaseControl IO m => m a)
-> n a
toUnliftIO mbc =
withRunInIO $ \runInIO -> do
mbc
toPlainIO
:: (forall m. MonadBaseControl IO m => m a)
-> IO a
toPlainIO mbc = mbc
toMonadIO
:: MonadIO n
=> (forall m. MonadBaseControl IO m => m a)
-> n a
toMonadIO mbc = liftIO (toPlainIO mbc)
Acquire vs PoolI didn’t realize this at the time, but Data.Acquire is inherently a weaker tool than Data.Pool.
Data.Acquire provides a means of creating a new resource, and also freeing it automatically when a scope is exited.
Data.Pool keeps track of resources, resource counts, and occasionally destroys them if they’re unsued.
So let’s look at our conversion function:
unsafeAcquireSqlConnFromPool = do
pool <- MonadReader.ask
let freeConn :: (backend, LocalPool backend) -> ReleaseType -> IO ()
freeConn (res, localPool) relType = case relType of
ReleaseException -> P.destroyResource pool localPool res
_ -> P.putResource localPool res
return $ fst <$> mkAcquireType (P.takeResource pool) freeConn
mkAcquireType is analogous to createPool - it creates a handle Acquire a that can be used with a function named with:
with :: MonadUnliftIO m
=> Acquire a
-> (a -> m b)
-> m b
with (Acquire f) g = withRunInIO $ \run -> E.mask $ \restore -> do
Allocated x free <- f restore
res <- restore (run (g x)) `E.onException` free ReleaseException
free ReleaseNormal
return res
with is aliased to withAcquire, which I’ll use from here on out to disambiguate.
You may notice that withAcquire and withResource are implemented nearly identically.
withResource uses MonadBaseControl and withAcquire uses MonadUnliftIO, and that’s the whole of the difference.
They have the same async exception handling with mask and use the same onException functions.
All the exception handling stuff is from Control.Exception, so we’re not using UnliftIO.Exception or Control.Monad.Catch or Control.Exception.Safe here.
These are really similar.
When we look at how the unsafeSqlConnFromPool works, it should provide identical behavior.
For free, we case on ReleaseType and do destroyResource on exception and putResource on any other exit.
We’re not handling ReleaseEarly specially - this constructor is only used when we use ResourceT’s release function on a value.
Using withAcquire, we’ll only ever pass ReleaseNormal and ReleaseException.
So this is locally safe.
Weirdly enough, resourcet doesn’t really depend on the Acquire type at all, at least not directly - the ReleaseMap type contains a function ReleaseType -> IO () for freeing resources, but doesn’t mention anything else about it.
Anyway, let’s get back on track.
Since withAcquire and withResource are nearly identical, it may be our translating code that is the problem.
We can use algebraic substitution to check this out.
Let’s look at mkAcquireType:
mkAcquireType
:: IO a -- ^ acquire the resource
-> (a -> ReleaseType -> IO ()) -- ^ free the resource
-> Acquire a
mkAcquireType create free = Acquire $ \_ -> do
x <- create
return $! Allocated x (free x)
The ignored parameter in the lambda there is a function that looks like restore - and we’re ignoring it.
So, this action gets run when we unpack the Acquire in withAcquire.
Let’s plug in our create and free parameters:
mkAcquireType
:: IO a -- ^ acquire the resource
-> (a -> ReleaseType -> IO ()) -- ^ free the resource
-> Acquire a
mkAcquireType (create = P.takeResource pool) (free = freeConn) = Acquire $ \_ -> do
x <- (P.takeResource pool)
return $! Allocated x (freeConn x)
where
freeConn (res, localPool) relType = case relType of
ReleaseException -> P.destroyResource pool localPool res
_ -> P.putResource localPool res
The pool variable is captured in the closure.
Now we can look at withAcquire, and plug in our behavior:
withAcquire (Acquire f) g = withRunInIO $ \run -> E.mask $ \restore -> do
-- `f` ignores the `restore` argument: possible bug?
Allocated x free <- f restore
-- so `x` here comes from `P.takeResource pool`
-- free = freeConn
ret <- restore (run (g x))
`E.onException` free ReleaseException
free ReleaseNormal
return ret
Let’s plug in the specific case for free:
withAcquire (Acquire f) g = withRunInIO $ \run -> E.mask $ \restore -> do
-- `f` ignores the `restore` argument: possible bug?
Allocated x free <- f restore
-- so `x` here comes from `P.takeResource pool`
-- free = freeConn
ret <- restore (run (g x))
`E.onException` P.destroyResource pool localPool res
P.putResource localPool res
return ret
Closer, closer… Let’s unpack the Allocated stuff:
withAcquire (Acquire _) g = withRunInIO $ \run -> E.mask $ \restore -> do
-- `f` ignores the `restore` argument: possible bug?
Allocated x free <- f restore
-- so `x` here comes from `P.takeResource pool`
-- free = freeConn
ret <- restore (run (g x))
`E.onException` P.destroyResource pool localPool res
P.putResource localPool res
return ret
where
f _ = do
x <- (P.takeResource pool)
return $! Allocated x (freeConn x)
-- OK, let's splice in the definition of `f`:
withAcquire (Acquire _) g = withRunInIO $ \run -> E.mask $ \restore -> do
-- `f` ignores the `restore` argument: possible bug?
Allocated x free <- do
x <- P.takeResource pool
return $! Allocated x (freeConn x)
-- so `x` here comes from `P.takeResource pool`
-- free = freeConn
ret <- restore (run (g x))
`E.onException` P.destroyResource pool localPool res
P.putResource localPool res
return ret
-- Now let's remove the `Allocated` constructor:
withAcquire (Acquire _) g = withRunInIO $ \run -> E.mask $ \restore -> do
x@(res, localPool) <- P.takeResource pool
ret <- restore (run (g x))
`E.onException` P.destroyResource pool localPool res
P.putResource localPool res
return ret
With this, we’re now nearly identical with withResource (copied again):
withResource
:: (MonadBaseControl IO m)
=> Pool a
-> (a -> m b)
-> m b
withResource pool act =
control $ \runInIO -> mask $ \restore -> do
(resource, local) <- takeResource pool
ret <- restore (runInIO (act resource)) `onException`
destroyResource pool local resource
putResource local resource
return ret
The only difference here is that Acquire also passes the LocalPool to the given action.
In the persistent code, we use fmap fst so that it only passes the resource to the callback.
So, I’m not sure this function is at fault. Let’s see how we call this function.
>>= doing there??acquireSqlConnFromPool is what’s actually called by runSqlPool in this version of the code.
acquireSqlConnFromPool
:: (MonadReader (Pool backend) m, BackendCompatible SqlBackend backend)
=> m (Acquire backend)
acquireSqlConnFromPool = do
connFromPool <- unsafeAcquireSqlConnFromPool
return $ connFromPool >>= acquireSqlConn
That >>= is weird. What’s going on here?
We have return :: a -> m a, and then f >>= g.
f :: Acquire backend - so then g must have the type g :: backend -> Acquire backend, meaning that we’re using the >>= of Acquire a -> (a -> Acquire b) -> Acquire b.
acquireSqlConn cashes out to rawAcquireSqlConn:
rawAcquireSqlConn
:: forall backend m
. (MonadReader backend m, BackendCompatible SqlBackend backend)
=> Maybe IsolationLevel -> m (Acquire backend)
rawAcquireSqlConn isolation = do
conn <- MonadReader.ask
let rawConn :: SqlBackend
rawConn = projectBackend conn
getter :: T.Text -> IO Statement
getter = getStmtConn rawConn
beginTransaction :: IO backend
beginTransaction = conn <$ connBegin rawConn getter isolation
finishTransaction :: backend -> ReleaseType -> IO ()
finishTransaction _ relType = case relType of
ReleaseException -> connRollback rawConn getter
_ -> connCommit rawConn getter
return $ mkAcquireType beginTransaction finishTransaction
So, in the investigation, the exception (libpq: failed (another command is already in progress)) would happen (as best as I can tell) when we try to call connRollback.
The problem is somewhere around here.
Um excuse me what? This is also operating in m (Acquire backend), not Acquire backend.
How is it possibly being used on the RHS of a >>=?
… Oh, right.
Just like MonadBaseControl IO m => m a can be concretized to IO a, we can concretize MonadReader r m => m a to r -> a.
So what’s happening here is we’re picking the spcialized type:
rawAcquireSqlConn
:: Maybe IsolationLevel -> backend -> Acquire backend
Wild.
Well, let’s look at >>= for Acquire:
instance Monad Acquire where
return = pure
Acquire f >>= g' = Acquire $ \restore -> do
Allocated x free1 <- f restore
let Acquire g = g' x
Allocated y free2 <- g restore `E.onException` free1 ReleaseException
return $! Allocated y (\rt -> free2 rt `E.finally` free1 rt)
Hmm! This smells funny. The problem occurs when we try to roll back the transaction. So let’s apply some more substitution here.
Acquire f contains:
\_ -> do
x <- P.takeResource pool
pure $ Allocated x (freeConn x)
And g' contains (simplifying a tiny bit):
\sqlBackend -> do
Acquire $ \_ -> do
_ <- beginTransaction sqlBackend getter isolation
pure $ Allocated sqlBackend $ \case
ReleaseException ->
connRollback sqlBackend
_ ->
connCommit sqlBackend
So, we can start inlining.
Acquire $ \restore -> do
Allocated x free1 <- (\_ -> do
x <- P.takeResource pool
pure $ Allocated x (freeConn x)) restore
let Acquire g = g' x
Allocated y free2 <- g restore `E.onException` free1 ReleaseException
return $! Allocated y (\rt -> free2 rt `E.finally` free1 rt)
-- (\_ -> x) restore = x
Acquire $ \restore -> do
Allocated x free1 <- do
x <- P.takeResource pool
pure $ Allocated x (freeConn x)
let Acquire g = g' x
Allocated y free2 <- g restore `E.onException` free1 ReleaseException
return $! Allocated y (\rt -> free2 rt `E.finally` free1 rt)
-- float `c` and `freeConn` up
Acquire $ \restore -> do
x <- P.takeResource pool
let free1 = freeConn x
let Acquire g = g' x
Allocated y free2 <- g restore `E.onException` free1 ReleaseException
return $! Allocated y (\rt -> free2 rt `E.finally` free1 rt)
-- inline g'
Acquire $ \restore -> do
x <- P.takeResource pool
let free1 = freeConn x
let sqlBackend = x
let Acquire g =
Acquire $ \_ -> do
_ <- beginTransaction sqlBackend getter isolation
pure $ Allocated sqlBackend $ \case
ReleaseException ->
connRollback sqlBackend
_ ->
connCommit sqlBackend
Allocated y free2 <- g restore `E.onException` free1 ReleaseException
return $! Allocated y (\rt -> free2 rt `E.finally` free1 rt)
-- Remove `Acquire` constructor:
Acquire $ \restore -> do
x <- P.takeResource pool
let free1 = freeConn x
let sqlBackend = x
let g _ = do
_ <- beginTransaction sqlBackend getter isolation
pure $ Allocated sqlBackend $ \case
ReleaseException ->
connRollback sqlBackend
_ ->
connCommit sqlBackend
Allocated y free2 <- g restore `E.onException` free1 ReleaseException
return $! Allocated y (\rt -> free2 rt `E.finally` free1 rt)
-- Inline `g`, ignore `restore` parameter
Acquire $ \restore -> do
x <- P.takeResource pool
let free1 = freeConn x
let sqlBackend = x
Allocated y free2 <-
(do
_ <- beginTransaction sqlBackend getter isolation
pure $ Allocated sqlBackend $ \case
ReleaseException ->
connRollback sqlBackend
_ ->
connCommit sqlBackend
) `E.onException` free1 ReleaseException
return $! Allocated y (\rt -> free2 rt `E.finally` free1 rt)
Now, this next transformation feels a bit tricky.
I’m going to float beginTransaction up and put the E.onException only on it.
Note that we’re not actually running the free2 action here - just preparing it.
Then I’ll assign it with a let.
Acquire $ \restore -> do
x <- P.takeResource pool
let free1 = freeConn x
let (sqlBackend, localPool) = x
_ <- beginTransaction sqlBackend getter isolation
`E.onException` free1 ReleaseException
let free2 = \case
ReleaseException ->
connRollback sqlBackend
_ ->
connCommit sqlBackend
return $! Allocated y (\rt -> free2 rt `E.finally` free1 rt)
-- Inline free1 and free2
Acquire $ \restore -> do
x <- P.takeResource pool
let (sqlBackend, localPool) = x
_ <- beginTransaction sqlBackend getter isolation
`E.onException` freeConn x ReleaseException
return $! Allocated y $ \rt ->
(\case
ReleaseException ->
connRollback sqlBackend
_ ->
connCommit sqlBackend) rt
`E.finally`
(freeConn x rt)
-- Inline freeConn
Acquire $ \restore -> do
x <- P.takeResource pool
let (sqlBackend, localPool) = x
_ <- beginTransaction sqlBackend getter isolation
`E.onException`
P.destroyResource pool localPool sqlBackend
return $! Allocated y $ \rt ->
(case rt of
ReleaseException ->
connRollback sqlBackend
_ ->
connCommit sqlBackend)
`E.finally` do
case rt of
ReleaseException ->
P.destroyResource pool localPool sqlBackend
_ ->
P.putResource localPool sqlBackend
I think it’s important to note that, again we don’t ever actually call restore.
So the masking state is inherited and not ever changed.
It feels important but I’m not sure if it actually is.
Let’s plug this into withAcquire now.
withAcquire (Acquire f) g = withRunInIO $ \run -> E.mask $ \restore -> do
Allocated x free <- f restore
res <- restore (run (g x)) `E.onException` free ReleaseException
free ReleaseNormal
return res
-- Inline `f`. Since `restore` is never called, we can omit passing it as
-- a parameter.
withAcquire (Acquire f) g = withRunInIO $ \run -> E.mask $ \restore -> do
Allocated x free <- do
x <- P.takeResource pool
let (sqlBackend, localPool) = x
_ <- beginTransaction sqlBackend getter isolation
`E.onException` free1 ReleaseException
return $! Allocated x $ \rt ->
(case rt of
ReleaseException ->
connRollback sqlBackend
_ ->
connCommit sqlBackend)
`E.finally` do
case rt of
ReleaseException ->
P.destroyResource pool localPool sqlBackend
_ ->
P.putResource localPool sqlBackend
res <- restore (run (g x)) `E.onException` free ReleaseException
free ReleaseNormal
return res
-- float `x <- P.takeResource pool` to the top, and define `free` using `let`
withAcquire (Acquire f) g = withRunInIO $ \run -> E.mask $ \restore -> do
x <- P.takeResource pool
let free1 = freeConn x
let (sqlBackend, localPool) = x
_ <- beginTransaction sqlBackend getter isolation
`E.onException` free1 ReleaseException
let free rt =
(case rt of
ReleaseException ->
connRollback sqlBackend
_ ->
connCommit sqlBackend)
`E.finally` do
case rt of
ReleaseException ->
P.destroyResource pool localPool sqlBackend
_ ->
P.putResource localPool sqlBackend
res <- restore (run (g x)) `E.onException` free ReleaseException
free ReleaseNormal
return res
-- inline `free` for each case:
withAcquire (Acquire f) g = withRunInIO $ \run -> E.mask $ \restore -> do
x <- P.takeResource pool
let (sqlBackend, localPool) = x
_ <- beginTransaction sqlBackend getter isolation
`E.onException`
P.destroyResource pool localPool sqlBackend
res <- restore (run (g x)) `E.onException` do
connRollback sqlBackend
`E.finally`
P.destroyResource pool localPool sqlBackend
do -- ReleaseNormal
connCommit sqlBackend
`E.finally` do
P.putResource localPool sqlBackend
return res
Let’s consider our masking state.
We’re masked for everything except for the restoure (run (g x)) call.
Including beginning the transaction and committing the transaction.
But we can still receive asynchronous exceptions during interruptible operations. Interruptible operations include “anything that can block or perform IO,” which seems very likely to include the Postgres code here.
Let’s compare this with the original code.
The original code delegated to runSqlConn after acquiring a SqlBackend from the Pool in MonadUnliftIO.
runSqlConn :: (MonadUnliftIO m, BackendCompatible SqlBackend backend) => ReaderT backend m a -> backend -> m a
runSqlConn r conn = withRunInIO $ \runInIO -> mask $ \restore -> do
let conn' = projectBackend conn
getter = getStmtConn conn'
restore $ connBegin conn' getter Nothing
x <- onException
(restore $ runInIO $ runReaderT r conn)
(restore $ connRollback conn' getter)
restore $ connCommit conn' getter
return x
We’ll inline this into runSqlPool, so we’ll now see:
runSqlPool r pconn =
withRunInIO $ \run ->
withResource pconn $ run . runSqlConn r
-- expand lambda
runSqlPool r pconn =
withRunInIO $ \run ->
withResource pconn $ \conn ->
run $ runSqlConn r conn
-- inline runSqlConn
runSqlPool r pconn =
withRunInIO $ \run ->
withResource pconn $ \conn ->
run $ withRunInIO $ \runInIO ->
mask $ \restore -> do
let conn' = projectBackend conn
getter = getStmtConn conn'
restore $ connBegin conn' getter Nothing
x <- onException
(restore $ runInIO $ runReaderT r conn)
(restore $ connRollback conn' getter)
restore $ connCommit conn' getter
return x
Kind of a lot of withStuff going on, including two withRunInIOs lol.
Let’s make it even worse by inlining withResource:
-- abstract action to a variable
runSqlPool r pconn =
withRunInIO $ \run ->
withResource pconn $ \conn ->
let act =
run $ withRunInIO $ \runInIO ->
mask $ \restore -> do
let conn' = projectBackend conn
getter = getStmtConn conn'
restore $ connBegin conn' getter Nothing
x <- onException
(restore $ runInIO $ runReaderT r conn)
(restore $ connRollback conn' getter)
restore $ connCommit conn' getter
return x
in act
-- inline withResource
runSqlPool r pconn =
withRunInIO $ \run ->
-- withResource pconn $ \conn ->
control $ \runInIO0 ->
mask $ \restore0 -> do
let act conn =
run $ withRunInIO $ \runInIO1 ->
mask $ \restore1 -> do
let conn' = projectBackend conn
getter = getStmtConn conn'
restore1 $ connBegin conn' getter Nothing
x <- onException
(restore1 $ runInIO1 $ runReaderT r conn)
(restore1 $ connRollback conn' getter)
restore1 $ connCommit conn' getter
return x
(resource, local) <- takeResource pool
ret <- restore0 (runInIO0 (act resource)) `onException`
destroyResource pool local resource
putResource local resource
return ret
-- inline `act`
runSqlPool r pconn =
withRunInIO $ \run ->
-- withResource pconn $ \conn ->
control $ \runInIO0 ->
mask $ \restore0 -> do
(resource, local) <- takeResource pool
ret <- restore0 (runInIO0 (
run $ withRunInIO $ \runInIO1 ->
mask $ \restore1 -> do
let conn = resource
let conn' = projectBackend conn
getter = getStmtConn conn'
restore1 $ connBegin conn' getter Nothing
x <- onException
(restore1 $ runInIO1 $ runReaderT r conn)
(restore1 $ connRollback conn' getter)
restore1 $ connCommit conn' getter
return x)) `onException`
destroyResource pool local resource
putResource local resource
return ret
The restore paratmer in mask doesn’t unmask it completely - it restores the existing masking state before the mask was entered.
So mask $ \restore -> mask $ \restore -> restore (print 10) doesn’t have print 10 in an unmasked state, but the same mask as before.
However, here, we have this pattern:
mask $ \restore -> do
restore $ do
mask $ \restore' -> do
...
Which is interesting!
runSqlPool r pconn =
-- Unmasked
withRunInIO $ \run ->
control $ \runInIO0 ->
mask $ \restore0 -> do
-- Masked
(resource, local) <- takeResource pool
ret <- restore0
-- Unmasked
(runInIO0 $ run $ withRunInIO $ \runInIO1 ->
-- Masked
mask $ \restore1 -> do
let conn = resource
let conn' = projectBackend conn
getter = getStmtConn conn'
-- Unmasked
restore1 $ connBegin conn' getter Nothing
x <- onException
-- Unmasked
(restore1 $ runInIO1 $ runReaderT r conn)
-- Unmasked
(restore1 $ connRollback conn' getter)
restore1 $ do --unmasked
connCommit conn' getter
return x)
-- Masked
`onException`
destroyResource pool local resource
-- Still masked
putResource local resource
return ret
So our masked actions are:
takeResource poolonExceptiononException and then destroyResourceputResourceUnmasked, we have:
connBeginr (the action passed to runSqlConn)connRollbackconnCommitLet’s compare with withAcquire which was all inlined above:
takeResourcebeginTransactiondestroyResourceconnRollbackdestroyResource againconnCommitputResourcerun (g x) – the action passed to withAcquire and runSqlConn.So withAcquire actually has quite a bit more masking going on!
Interesting.
Remembering, the problem occurs when the thread killed exception happens and the connRollback function is called, causing libpq to die with the “command in progress” error.
So, we throw a killThread at our withAcquire function.
It’ll land as soon as we’re unmasked, or an interruptible action occurs.
Since almost all of it is masked, we need to determine what the interruptible operations are.
takeResource might be interruptible - it has an STM transaction, which does call retry.
I don’t know if any code with retry triggers an interrupt, or if only actually calling retry can trigger an interruptible state.
Based on a quick and bewildering look at the GHC source, I think it’s just that retry itself can be interrupted.
retry occurs when there are no available entries in the local pool and we’re at max resources for the pool.
This is exactly the scenario this test is exercising: a single stripe with a single resource that’s constantly in use.
beginTransaction kicks off an IO action to postgres, so it is almost definitely interruptible.
Same for connRollback and connCommit.
So the masked-state for these items in withAcquire is probably not a big deal - but we could check by using uninterruptibleMask on them.
I wish I had a more satisfying conclusion here, but I’m all out of time to write on this for now. Please comment on the relevant GitHub issues if you’re interested or have some insight!