Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend:read-from-master-db-replica-failover #630

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 39 additions & 18 deletions lib/mobility-core/src/Kernel/Beam/Functions.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import Database.Beam hiding (timestamp)
import Database.Beam.MySQL ()
import Database.Beam.Postgres
import qualified EulerHS.KVConnector.Flow as KV
import EulerHS.KVConnector.Types (DBCommandVersion (..), KVConnector (..), MeshConfig (..), MeshMeta, TableMappings)
import EulerHS.KVConnector.Types (DBCommandVersion (..), KVConnector (..), MeshConfig (..), MeshError (MDBError), MeshMeta, MeshResult, TableMappings)
import EulerHS.KVConnector.Utils
import qualified EulerHS.Language as L
import EulerHS.Types hiding (Log, V1)
Expand Down Expand Up @@ -190,6 +190,13 @@ getReplicaLocationDbConfig = do
Just dbCnf' -> pure dbCnf'
Nothing -> L.throwException $ InternalError "Replica LocationDB Config not found"

getReadDBConfigInternal :: (HasCallStack, L.MonadFlow m) => Text -> m (DBConfig Pg)
getReadDBConfigInternal modelName = do
tables <- L.getOption KBT.Tables
let dbConfig = maybe getReplicaDbConfig (\tables' -> if modelName `elem` tables'.readFromMasterDb then getMasterDBConfig else getReplicaDbConfig) tables
isMasterReadEnabled <- L.getOptionLocal MasterReadEnabled
maybe dbConfig (\isMasterReadEnabled' -> if isMasterReadEnabled' then getMasterDBConfig else getReplicaDbConfig) isMasterReadEnabled

type BeamTableFlow table m =
( HasCallStack,
BeamTable table,
Expand All @@ -207,6 +214,22 @@ type BeamTable table =
Show (table Identity)
)

--- db failover function
runWithMasterDbIfReplicaFails ::
forall m a.
L.MonadFlow m =>
m (MeshResult a) ->
m (MeshResult a)
runWithMasterDbIfReplicaFails action = do
res <- action
case res of
Left (MDBError (DBError errType _)) | errType `elem` [ConnectionFailed, ConnectionDoesNotExist] -> do
L.setOptionLocal MasterReadEnabled True
result <- action
L.setOptionLocal MasterReadEnabled False
pure result
_ -> pure res

-- findOne --

findOneWithKV ::
Expand Down Expand Up @@ -310,8 +333,9 @@ findAllWithKVAndConditionalDB ::
m [a]
findAllWithKVAndConditionalDB where' orderBy = do
updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig
dbConf' <- getReadDBConfigInternal (modelTableName @table)
result <- KV.findAllWithKVAndConditionalDBInternal dbConf' updatedMeshConfig where' orderBy
result <- runWithMasterDbIfReplicaFails $ do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
KV.findAllWithKVAndConditionalDBInternal dbConf' updatedMeshConfig where' orderBy
case result of
Right res -> do
res' <- mapM fromTType' res
Expand Down Expand Up @@ -348,8 +372,9 @@ findAllWithOptionsKV' ::
m [a]
findAllWithOptionsKV' where' mbLimit mbOffset = do
updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig
dbConf' <- getReadDBConfigInternal (modelTableName @table)
result <- KV.findAllWithOptionsKVConnector' dbConf' updatedMeshConfig where' mbLimit mbOffset
result <- runWithMasterDbIfReplicaFails $ do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
KV.findAllWithOptionsKVConnector' dbConf' updatedMeshConfig where' mbLimit mbOffset
case result of
Right res -> do
res' <- mapM fromTType' res
Expand Down Expand Up @@ -466,8 +491,9 @@ findOneInternal ::
Where Postgres table ->
m (Maybe a)
findOneInternal updatedMeshConfig fromTType where' = do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
result <- KV.findWithKVConnector dbConf' updatedMeshConfig where'
result <- runWithMasterDbIfReplicaFails $ do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
KV.findWithKVConnector dbConf' updatedMeshConfig where'
logQueryData "findOneInternal" (show $ getFieldsAndValuesFromClause meshModelTableEntityDescriptor (And where')) ("Nothing" :: Text) (show result) (meshEnabled updatedMeshConfig) (modelTableName @table)
case result of
Right (Just res) -> fromTType res
Expand All @@ -482,8 +508,9 @@ findAllInternal ::
Where Postgres table ->
m [a]
findAllInternal updatedMeshConfig fromTType where' = do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
result <- KV.findAllWithKVConnector dbConf' updatedMeshConfig where'
result <- runWithMasterDbIfReplicaFails $ do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
KV.findAllWithKVConnector dbConf' updatedMeshConfig where'
logQueryData "findAllInternal" (show $ getFieldsAndValuesFromClause meshModelTableEntityDescriptor (And where')) ("Nothing" :: Text) (show result) (meshEnabled updatedMeshConfig) (modelTableName @table)
case result of
Right res -> do
Expand All @@ -502,22 +529,16 @@ findAllWithOptionsInternal ::
Maybe Int ->
m [a]
findAllWithOptionsInternal updatedMeshConfig fromTType where' orderBy mbLimit mbOffset = do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
result <- KV.findAllWithOptionsKVConnector dbConf' updatedMeshConfig where' orderBy mbLimit mbOffset
result <- runWithMasterDbIfReplicaFails $ do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
KV.findAllWithOptionsKVConnector dbConf' updatedMeshConfig where' orderBy mbLimit mbOffset
logQueryData "findAllWithOptionsInternal" (show $ getFieldsAndValuesFromClause meshModelTableEntityDescriptor (And where')) ("Nothing" :: Text) (show result) (meshEnabled updatedMeshConfig) (modelTableName @table)
case result of
Right res -> do
res' <- mapM fromTType res
pure $ catMaybes res'
Left err -> throwError $ InternalError $ show err

getReadDBConfigInternal :: (HasCallStack, L.MonadFlow m) => Text -> m (DBConfig Pg)
getReadDBConfigInternal modelName = do
tables <- L.getOption KBT.Tables
let dbConfig = maybe getReplicaDbConfig (\tables' -> if modelName `elem` tables'.readFromMasterDb then getMasterDBConfig else getReplicaDbConfig) tables
isMasterReadEnabled <- L.getOptionLocal MasterReadEnabled
maybe dbConfig (\isMasterReadEnabled' -> if isMasterReadEnabled' then getMasterDBConfig else getReplicaDbConfig) isMasterReadEnabled

updateInternal ::
forall table m r.
(BeamTableFlow table m, EsqDBFlow m r) =>
Expand Down