Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async exception issue #1207

Merged
merged 13 commits into from
Mar 17, 2021
98 changes: 98 additions & 0 deletions persistent-postgresql/conn-killed/Main.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
{-# LANGUAGE ScopedTypeVariables, StandaloneDeriving, GeneralizedNewtypeDeriving, DerivingStrategies #-}
{-# LANGUAGE OverloadedStrings, QuantifiedConstraints #-}
{-# LANGUAGE TypeApplications #-}
{-# language OverloadedStrings #-}

-- | This executable is a test of the issue raised in #1199.
module Main where

import Prelude hiding (show)
import qualified Prelude

import qualified Data.Text as Text
import Control.Monad.IO.Class
import qualified Control.Monad as Monad
import qualified UnliftIO.Concurrent as Concurrent
import qualified UnliftIO.Exception as Exception
import qualified Database.Persist as Persist
import qualified Database.Persist.Sql as Persist
import qualified Database.Persist.Postgresql as Persist
import qualified Control.Monad.Logger as Logger
import Control.Monad.Logger
import qualified Data.ByteString as BS
import qualified Data.Pool as Pool
import Data.Time
import UnliftIO
import Data.Coerce
import Control.Monad.Trans.Reader
import Control.Monad.Trans

newtype LogPrefixT m a = LogPrefixT { runLogPrefixT :: ReaderT LogStr m a }
deriving newtype
(Functor, Applicative, Monad, MonadIO, MonadTrans)

instance MonadLogger m => MonadLogger (LogPrefixT m) where
monadLoggerLog loc src lvl msg = LogPrefixT $ ReaderT $ \prefix ->
monadLoggerLog loc src lvl (toLogStr prefix <> toLogStr msg)

deriving newtype instance (forall a b. Coercible a b => Coercible (m a) (m b), MonadUnliftIO m) => MonadUnliftIO (LogPrefixT m)

prefixLogs :: Text.Text -> LogPrefixT m a -> m a
prefixLogs prefix =
flip runReaderT (toLogStr $! mconcat ["[", prefix, "] "]) . runLogPrefixT

infixr 5 `prefixLogs`
show :: Show a => a -> Text.Text
show = Text.pack . Prelude.show

main :: IO ()
main = runStdoutLoggingT $ Concurrent.myThreadId >>= \tid -> prefixLogs (show tid) $ do

-- I started a postgres server with:
-- docker run --rm --name some-postgres -p 5432:5432 -e POSTGRES_PASSWORD=secret postgres
pool <- Logger.runNoLoggingT $ Persist.createPostgresqlPool "postgresql://postgres:secret@localhost:5433/postgres" 1

logInfoN "creating table..."
Monad.void $ liftIO $ createTableFoo pool

liftIO getCurrentTime >>= \now ->
simulateFailedLongRunningPostgresCall pool

-- logInfoN "destroying resources"
-- liftIO $ Pool.destroyAllResources pool

logInfoN "pg_sleep"
result :: Either Exception.SomeException [Persist.Single (Maybe String)] <-
Exception.try . (liftIO . (flip Persist.runSqlPersistMPool) pool) $ do
Persist.rawSql @(Persist.Single (Maybe String)) "select pg_sleep(2)" []

-- when we try the above we get back:
-- 'result: Left libpq: failed (another command is already in progress'
-- this is because the connection went back into the pool before it was ready
-- or perhaps it should have been destroyed and a new connection created and put into the pool?
logInfoN $ "result: " <> show result

createTableFoo :: Pool.Pool Persist.SqlBackend -> IO ()
createTableFoo pool = (flip Persist.runSqlPersistMPool) pool $ do
Persist.rawExecute "CREATE table if not exists foo(id int);" []

simulateFailedLongRunningPostgresCall
:: (MonadLogger m, MonadUnliftIO m, forall a b. Coercible a b => Coercible (m a) (m b)) => Pool.Pool Persist.SqlBackend -> m ()
simulateFailedLongRunningPostgresCall pool = do
threadId <- Concurrent.forkIO
$ (do
me <- Concurrent.myThreadId
prefixLogs (show me) $ do
let numThings :: Int = 100000000
logInfoN $ "start inserting " <> show numThings <> " things"

(`Persist.runSqlPool` pool) $ do
logInfoN "inside of thing"
Monad.forM_ [1 .. numThings] $ \i -> do
Monad.when (i `mod` 1000 == 0) $
logInfoN $ "Thing #: " <> show i
Persist.rawExecute "insert into foo values(1);" []
)
Concurrent.threadDelay 1000000
Monad.void $ Concurrent.killThread threadId
logInfoN "killed thread"
9 changes: 8 additions & 1 deletion persistent/ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Changelog for persistent

## 2.12
## 2.12 (unreleased)

* [#1162](https://github.com/yesodweb/persistent/pull/1162)
* Replace `askLogFunc` with `askLoggerIO`
Expand All @@ -12,6 +12,13 @@
* [#1179](https://github.com/yesodweb/persistent/pull/1179)
* Added `Compatible`, a newtype for marking a backend as compatible with another. Use it with `DerivingVia` to derive simple instances based on backend compatibility.
* Added `makeCompatibleInstances` and `makeCompatibleKeyInstances`, TemplateHaskell invocations for auto-generating standalone derivations using `Compatible` and `DerivingVia`.
* [#1207](https://github.com/yesodweb/persistent/pull/1207)
* @codygman discovered a bug in [issue #1199](https://github.com/yesodweb/persistent/issues/1199) where postgres connections were being returned to the `Pool SqlBackend` in an inconsistent state.
@parsonsmatt debugged the issue and determined that it had something to do with asynchronous exceptions.
Declaring it to be "out of his pay grade," he ripped the `poolToAcquire` function out and replaced it with `Data.Pool.withResource`, which doesn't exhibit the bug.
Fortunately, this doesn't affect the public API, and can be a mere bug release.
* Removed the functions `unsafeAcquireSqlConnFromPool`, `acquireASqlConnFromPool`, and `acquireSqlConnFromPoolWithIsolation`.
For a replacement, see `runSqlPoolNoTransaction` and `runSqlPoolWithHooks`.

## 2.11.0.2
* Fix a bug where an empty entity definition would break parsing of `EntityDef`s. [#1176](https://github.com/yesodweb/persistent/issues/1176)
Expand Down
2 changes: 1 addition & 1 deletion persistent/Database/Persist/Sql.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import Database.Persist
import Database.Persist.Sql.Types
import Database.Persist.Sql.Types.Internal (IsolationLevel (..))
import Database.Persist.Sql.Class
import Database.Persist.Sql.Run hiding (withResourceTimeout, rawAcquireSqlConn)
import Database.Persist.Sql.Run hiding (rawAcquireSqlConn, rawRunSqlPool)
import Database.Persist.Sql.Raw
import Database.Persist.Sql.Migration
import Database.Persist.Sql.Internal
Expand Down
159 changes: 77 additions & 82 deletions persistent/Database/Persist/Sql/Run.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
{-# LANGUAGE ScopedTypeVariables #-}
module Database.Persist.Sql.Run where

import Control.Exception (bracket, mask, onException)
import Control.Monad (liftM)
import Control.Monad.IO.Unlift
import qualified UnliftIO.Exception as UE
import Control.Monad.Logger.CallStack
Expand All @@ -12,74 +10,30 @@ import Control.Monad.Trans.Reader hiding (local)
import Control.Monad.Trans.Resource
import Data.Acquire (Acquire, ReleaseType(..), mkAcquireType, with)
import Data.IORef (readIORef)
import Data.Pool (Pool, LocalPool)
import Data.Pool (Pool)
import Data.Pool as P
import Data.Pool.Acquire (poolToAcquire)
import qualified Data.Map as Map
import qualified Data.Text as T
import System.Timeout (timeout)

import Database.Persist.Class.PersistStore
import Database.Persist.Sql.Types
import Database.Persist.Sql.Types.Internal (IsolationLevel)
import Database.Persist.Sql.Raw

-- | The returned 'Acquire' gets a connection from the pool, but does __NOT__
-- start a new transaction. Used to implement 'acquireSqlConnFromPool' and
-- 'acquireSqlConnFromPoolWithIsolation', this is useful for performing actions
-- on a connection that cannot be done within a transaction, such as VACUUM in
-- Sqlite.
--
-- @since 2.10.5
unsafeAcquireSqlConnFromPool
:: forall backend m
. (MonadReader (Pool backend) m, BackendCompatible SqlBackend backend)
=> m (Acquire backend)
unsafeAcquireSqlConnFromPool = MonadReader.asks poolToAcquire
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait... These things are definitely part of the public API. So I can't just remove them without pointing users to a better thing! Damn.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I'll deprecate them when I backport this fix.



-- | The returned 'Acquire' gets a connection from the pool, starts a new
-- transaction and gives access to the prepared connection.
--
-- When the acquired connection is released the transaction is committed and
-- the connection returned to the pool.
--
-- Upon an exception the transaction is rolled back and the connection
-- destroyed.
--
-- This is equivalent to 'runSqlPool' but does not incur the 'MonadUnliftIO'
-- constraint, meaning it can be used within, for example, a 'Conduit'
-- pipeline.
--
-- @since 2.10.5
acquireSqlConnFromPool
:: (MonadReader (Pool backend) m, BackendCompatible SqlBackend backend)
=> m (Acquire backend)
acquireSqlConnFromPool = do
connFromPool <- unsafeAcquireSqlConnFromPool
return $ connFromPool >>= acquireSqlConn

-- | Like 'acquireSqlConnFromPool', but lets you specify an explicit isolation
-- level.
--
-- @since 2.10.5
acquireSqlConnFromPoolWithIsolation
:: (MonadReader (Pool backend) m, BackendCompatible SqlBackend backend)
=> IsolationLevel -> m (Acquire backend)
acquireSqlConnFromPoolWithIsolation isolation = do
connFromPool <- unsafeAcquireSqlConnFromPool
return $ connFromPool >>= acquireSqlConnWithIsolation isolation

-- | Get a connection from the pool, run the given action, and then return the
-- connection to the pool.
--
-- This function performs the given action in a transaction. If an
-- exception occurs during the action, then the transaction is rolled back.
--
-- Note: This function previously timed out after 2 seconds, but this behavior
-- was buggy and caused more problems than it solved. Since version 2.1.2, it
-- performs no timeout checks.
runSqlPool
:: forall backend m a. (MonadUnliftIO m, BackendCompatible SqlBackend backend)
=> ReaderT backend m a -> Pool backend -> m a
runSqlPool r pconn = with (acquireSqlConnFromPool pconn) $ runReaderT r
runSqlPool r pconn = do
rawRunSqlPool r pconn Nothing

-- | Like 'runSqlPool', but supports specifying an isolation level.
--
Expand All @@ -88,29 +42,67 @@ runSqlPoolWithIsolation
:: forall backend m a. (MonadUnliftIO m, BackendCompatible SqlBackend backend)
=> ReaderT backend m a -> Pool backend -> IsolationLevel -> m a
runSqlPoolWithIsolation r pconn i =
with (acquireSqlConnFromPoolWithIsolation i pconn) $ runReaderT r
rawRunSqlPool r pconn (Just i)

-- | Like 'withResource', but times out the operation if resource
-- allocation does not complete within the given timeout period.
-- | Like 'runSqlPool', but does not surround the action in a transaction.
-- This action might leave your database in a weird state.
--
-- @since 2.0.0
withResourceTimeout
:: forall a m b. (MonadUnliftIO m)
=> Int -- ^ Timeout period in microseconds
-> Pool a
-> (a -> m b)
-> m (Maybe b)
{-# SPECIALIZE withResourceTimeout :: Int -> Pool a -> (a -> IO b) -> IO (Maybe b) #-}
withResourceTimeout ms pool act = withRunInIO $ \runInIO -> mask $ \restore -> do
mres <- timeout ms $ takeResource pool
case mres of
Nothing -> runInIO $ return (Nothing :: Maybe b)
Just (resource, local) -> do
ret <- restore (runInIO (liftM Just $ act resource)) `onException`
destroyResource pool local resource
putResource local resource
return ret
{-# INLINABLE withResourceTimeout #-}
-- @since 2.12.0.0
runSqlPoolNoTransaction
:: forall backend m a. (MonadUnliftIO m, BackendCompatible SqlBackend backend)
=> ReaderT backend m a -> Pool backend -> Maybe IsolationLevel -> m a
runSqlPoolNoTransaction r pconn i =
runSqlPoolWithHooks r pconn i (\_ -> pure ()) (\_ -> pure ()) (\_ _ -> pure ())

rawRunSqlPool
:: forall backend m a. (MonadUnliftIO m, BackendCompatible SqlBackend backend)
=> ReaderT backend m a -> Pool backend -> Maybe IsolationLevel -> m a
rawRunSqlPool r pconn mi =
runSqlPoolWithHooks r pconn mi before after onException
where
before conn = do
let sqlBackend = projectBackend conn
let getter = getStmtConn sqlBackend
liftIO $ connBegin sqlBackend getter mi
after conn = do
let sqlBackend = projectBackend conn
let getter = getStmtConn sqlBackend
liftIO $ connCommit sqlBackend getter
onException conn _ = do
let sqlBackend = projectBackend conn
let getter = getStmtConn sqlBackend
liftIO $ connRollback sqlBackend getter

-- | This function is how 'runSqlPool' and 'runSqlPoolNoTransaction' are
-- defined. In addition to the action to be performed and the 'Pool' of
-- conections to use, we give you the opportunity to provide three actions
-- - initialize, afterwards, and onException.
--
-- @since 2.12.0.0
runSqlPoolWithHooks
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shiny new functionality! should be equivalent in terms of power.

@merijn I think you wrote the original Acquire-based stuff, do you mind trying this out? Specifically the runSqlPoolNoTransaction stuff should be what you need.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is an extremely late reaction, but I finally got around to (trying to) bump my code to GHC 9.2 and 9.4.

runSqlPoolNoTransaction is fundamentally incompatible in doing what I was using unsafeAcquire for, as it incurs a MonadUnliftIO constraint. The Acquire interface (like the surviving acquireSqlConn) only requires MonadResource. There is no MonadUnliftIO instance for ConduitT, which means that runSqlPoolNoTransaction is unusable in large parts of my code.

:: forall backend m a before after onException. (MonadUnliftIO m, BackendCompatible SqlBackend backend)
=> ReaderT backend m a
-> Pool backend
-> Maybe IsolationLevel
-> (backend -> m before)
-- ^ Run this action immediately before the action is performed.
-> (backend -> m after)
-- ^ Run this action immediately after the action is completed.
-> (backend -> UE.SomeException -> m onException)
-- ^ This action is performed when an exception is received. The
-- exception is provided as a convenience - it is rethrown once this
-- cleanup function is complete.
-> m a
runSqlPoolWithHooks r pconn i before after onException =
withRunInIO $ \runInIO ->
withResource pconn $ \conn -> do
runInIO $ before conn
a <- runInIO (runReaderT r conn)
`UE.catchAny` \e -> do
runInIO $ onException conn e
UE.throwIO e
runInIO $ after conn
pure a

rawAcquireSqlConn
:: forall backend m
Expand All @@ -129,7 +121,8 @@ rawAcquireSqlConn isolation = do

finishTransaction :: backend -> ReleaseType -> IO ()
finishTransaction _ relType = case relType of
ReleaseException -> connRollback rawConn getter
ReleaseException -> do
connRollback rawConn getter
_ -> connCommit rawConn getter

return $ mkAcquireType beginTransaction finishTransaction
Expand Down Expand Up @@ -202,7 +195,7 @@ withSqlPoolWithConfig
-> ConnectionPoolConfig
-> (Pool backend -> m a)
-> m a
withSqlPoolWithConfig mkConn poolConfig f = withUnliftIO $ \u -> bracket
withSqlPoolWithConfig mkConn poolConfig f = withUnliftIO $ \u -> UE.bracket
(unliftIO u $ createSqlPoolWithConfig mkConn poolConfig)
destroyAllResources
(unliftIO u . f)
Expand All @@ -227,12 +220,14 @@ createSqlPoolWithConfig mkConn config = do
-- Resource pool will swallow any exceptions from close. We want to log
-- them instead.
let loggedClose :: backend -> IO ()
loggedClose backend = close' backend `UE.catchAny` \e -> runLoggingT
(logError $ T.pack $ "Error closing database connection in pool: " ++ show e)
logFunc
liftIO $ createPool
(mkConn logFunc)
loggedClose
loggedClose backend = close' backend `UE.catchAny` \e -> do
runLoggingT
(logError $ T.pack $ "Error closing database connection in pool: " ++ show e)
logFunc
UE.throwIO e
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a somewhat semantic change here. Instead of swallowing the exception, we allow resource-pool to handle it somehow.

liftIO $ createPool
(mkConn logFunc)
loggedClose
(connectionPoolConfigStripes config)
(connectionPoolConfigIdleTimeout config)
(connectionPoolConfigSize config)
Expand Down Expand Up @@ -294,7 +289,7 @@ withSqlConn
=> (LogFunc -> IO backend) -> (backend -> m a) -> m a
withSqlConn open f = do
logFunc <- askLoggerIO
withRunInIO $ \run -> bracket
withRunInIO $ \run -> UE.bracket
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly semantic change - bringing it in line with UnliftIO.Exception semantics should make things easier in the future.

(open logFunc)
close'
(run . f)
Expand Down
1 change: 0 additions & 1 deletion persistent/persistent.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ library
, path-pieces >= 0.2
, resource-pool >= 0.2.3
, resourcet >= 1.1.10
, resourcet-pool
, scientific
, silently
, template-haskell >= 2.4
Expand Down
3 changes: 0 additions & 3 deletions stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,3 @@ packages:
- ./persistent-postgresql
- ./persistent-redis
- ./persistent-qq

extra-deps:
- resourcet-pool-0.1.0.0