Skip to content

Commit

Permalink
items propagation and groups propagation now have a step by step …
Browse files Browse the repository at this point in the history
…transaction.

For now in the worst case, the items propagation took ~6s. Since the process takes a lot of write locks, it makes other select queries for other services wait for this amount of time.
This doesn't fully solve the waiting problem though. It just makes the wait faster because the transactions are now smaller.
The code is a bit weird because we still have two cases where the group propagation have to be executed inside a bigger transaction:
- The `CreateRelation` function, used in the `groupAddChild` and the accessTokenCreate and `refreshToken` (for the badges) services.
- The `DeleteRelation` function, used only in the `groupRemoveChild` service.
Once those cases are removed, the code can be simplified, and look more like the other propagations.

Signed-off-by: Geoffrey Huck <[email protected]>
  • Loading branch information
GeoffreyHuck committed May 3, 2024
1 parent f50ebb8 commit 073cb2a
Showing 1 changed file with 70 additions and 42 deletions.
112 changes: 70 additions & 42 deletions app/database/ancestors.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package database

import (
"database/sql"
"time"

"github.com/France-ioi/AlgoreaBackend/app/logging"
)

const groups = "groups"

// createNewAncestorsQueries contains the SQL queries needed for createNewAncestors.
type createNewAncestorsQueries struct {
markAsProcessingQuery string
recomputeQueries []string
markAsDoneQuery string
}

// createNewAncestors inserts new rows into
// the objectName_ancestors table (items_ancestors or groups_ancestors)
// for all rows marked with ancestors_computation_state="todo" in objectName_propagate
Expand All @@ -24,12 +30,34 @@ func (s *DataStore) createNewAncestors(objectName, singleObjectName string) { /*
mustNotBeError(s.InTransaction(func(s *DataStore) error {
initTransactionTime := time.Now()

s.createNewAncestorsInsideTransaction(objectName, singleObjectName)
s.createNewAncestorsInsideTransactionInitStep(objectName, singleObjectName)

logging.Debugf("Duration of %v_ancestors propagation: %v", objectName, time.Since(initTransactionTime))
logging.Debugf("Duration of %v_ancestors propagation init step: %v", objectName, time.Since(initTransactionTime))

return nil
}))

queries := s.constructCreateNewAncestorsQueries(objectName, singleObjectName)

hasChanges := true
for hasChanges {
mustNotBeError(s.InTransaction(func(s *DataStore) error {
initStepTransactionTime := time.Now()

rowsAffected := s.createNewAncestorsInsideTransactionStep(queries)

logging.Debugf(
"Duration of %v_ancestors propagation step: %d rows affected, took %v",
objectName,
rowsAffected,
time.Since(initStepTransactionTime),
)

hasChanges = rowsAffected > 0

return nil
}))
}
}

// createNewAncestorsInsideTransaction does the sql work of createNewAncestors.
Expand All @@ -46,6 +74,22 @@ func (s *DataStore) createNewAncestors(objectName, singleObjectName string) { /*
func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObjectName string) {
s.mustBeInTransaction()

s.createNewAncestorsInsideTransactionInitStep(objectName, singleObjectName)

queries := s.constructCreateNewAncestorsQueries(objectName, singleObjectName)

hasChanges := true
for hasChanges {
rowsAffected := s.createNewAncestorsInsideTransactionStep(queries)

hasChanges = rowsAffected > 0
}
}

// createNewAncestorsInsideTransactionInitStep does the sql work of the initialization step of createNewAncestors.
func (s *DataStore) createNewAncestorsInsideTransactionInitStep(objectName, singleObjectName string) {
s.mustBeInTransaction()

// We mark as 'todo' all descendants of objects marked as 'todo'
query := `
INSERT INTO ` + objectName + `_propagate (id, ancestors_computation_state)
Expand All @@ -60,9 +104,22 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
ON DUPLICATE KEY UPDATE ancestors_computation_state = 'todo'` /* #nosec */

mustNotBeError(s.db.Exec(query).Error)
}

hasChanges := true
// createNewAncestorsInsideTransactionStep does the sql work of a step of createNewAncestors.
func (s *DataStore) createNewAncestorsInsideTransactionStep(queries createNewAncestorsQueries) int64 {
s.mustBeInTransaction()

mustNotBeError(s.Exec(queries.markAsProcessingQuery).Error())
for i := 0; i < len(queries.recomputeQueries); i++ {
mustNotBeError(s.Exec(queries.recomputeQueries[i]).Error())
}

return s.Exec(queries.markAsDoneQuery).RowsAffected()
}

// constructCreateNewAncestorsQueries constructs the SQL queries needed for the main steps of createNewAncestors.
func (s *DataStore) constructCreateNewAncestorsQueries(objectName, singleObjectName string) (queries createNewAncestorsQueries) {
relationsTable := objectName + "_" + objectName

var additionalJoin string
Expand All @@ -75,7 +132,7 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
// This way we prevent infinite looping as we never process objects that are descendants of themselves

/* #nosec */
query = `
queries.markAsProcessingQuery = `
UPDATE ` + objectName + `_propagate AS children
SET children.ancestors_computation_state='processing'
WHERE children.ancestors_computation_state = 'todo' AND NOT EXISTS (
Expand All @@ -90,9 +147,6 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
FOR UPDATE
) has_undone_parents FOR UPDATE
)`
markAsProcessing, err := s.db.CommonDB().Prepare(query)
mustNotBeError(err)
defer func() { mustNotBeError(markAsProcessing.Close()) }()

expiresAtColumn := ""
expiresAtValueJoin := ""
Expand All @@ -105,8 +159,8 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
}

// For every object marked as 'processing', we compute all its ancestors
recomputeQueries := make([]string, 0, 3)
recomputeQueries = append(recomputeQueries, `
queries.recomputeQueries = make([]string, 0, 3)
queries.recomputeQueries = append(queries.recomputeQueries, `
DELETE `+objectName+`_ancestors
FROM `+objectName+`_ancestors
JOIN `+objectName+`_propagate
Expand All @@ -133,12 +187,12 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
WHERE
`+objectName+`_propagate.ancestors_computation_state = 'processing'`) // #nosec
if objectName == groups {
recomputeQueries[1] += `
queries.recomputeQueries[1] += `
AND NOW() < groups_groups.expires_at AND
NOW() < LEAST(groups_ancestors_join.expires_at, groups_groups.expires_at)
ON DUPLICATE KEY UPDATE
expires_at = GREATEST(groups_ancestors.expires_at, LEAST(groups_ancestors_join.expires_at, groups_groups.expires_at))`
recomputeQueries = append(recomputeQueries, `
queries.recomputeQueries = append(queries.recomputeQueries, `
INSERT IGNORE INTO `+objectName+`_ancestors
(
ancestor_`+singleObjectName+`_id,
Expand All @@ -151,8 +205,8 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
WHERE groups_propagate.ancestors_computation_state = 'processing'
FOR UPDATE`) // #nosec
} else {
recomputeQueries[1] += ` FOR UPDATE`
recomputeQueries = append(recomputeQueries, `
queries.recomputeQueries[1] += ` FOR UPDATE`
queries.recomputeQueries = append(queries.recomputeQueries, `
INSERT IGNORE INTO items_ancestors (ancestor_item_id, child_item_id)
SELECT items_items.parent_item_id, items_items.child_item_id
FROM items_items
Expand All @@ -161,37 +215,11 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
FOR UPDATE`) // #nosec
}

recomputeAncestors := make([]*sql.Stmt, len(recomputeQueries))
for i := 0; i < len(recomputeQueries); i++ {
recomputeAncestors[i], err = s.db.CommonDB().Prepare(recomputeQueries[i])
mustNotBeError(err)

defer func(i int) { mustNotBeError(recomputeAncestors[i].Close()) }(i)
}

// Objects marked as 'processing' are now marked as 'done'
query = `
queries.markAsDoneQuery = `
UPDATE ` + objectName + `_propagate
SET ancestors_computation_state = 'done'
WHERE ancestors_computation_state = 'processing'` // #nosec
markAsDone, err := s.db.CommonDB().Prepare(query)
mustNotBeError(err)
defer func() { mustNotBeError(markAsDone.Close()) }()

for hasChanges {
_, err = markAsProcessing.Exec()
mustNotBeError(err)
for i := 0; i < len(recomputeAncestors); i++ {
_, err = recomputeAncestors[i].Exec()
mustNotBeError(err)
}

var result sql.Result
result, err = markAsDone.Exec()
mustNotBeError(err)
var rowsAffected int64
rowsAffected, err = result.RowsAffected()
mustNotBeError(err)
hasChanges = rowsAffected > 0
}
return queries
}

0 comments on commit 073cb2a

Please sign in to comment.