Skip to content

Commit

Permalink
Merge pull request #1064 from France-ioi/updateItemsSlowLocks
Browse files Browse the repository at this point in the history
`items propagation` and `groups propagation` step by step transactions
  • Loading branch information
GeoffreyHuck authored May 6, 2024
2 parents f50ebb8 + 073cb2a commit a5b3c35
Showing 1 changed file with 70 additions and 42 deletions.
112 changes: 70 additions & 42 deletions app/database/ancestors.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package database

import (
"database/sql"
"time"

"github.com/France-ioi/AlgoreaBackend/app/logging"
)

const groups = "groups"

// createNewAncestorsQueries contains the SQL queries needed for createNewAncestors.
type createNewAncestorsQueries struct {
markAsProcessingQuery string
recomputeQueries []string
markAsDoneQuery string
}

// createNewAncestors inserts new rows into
// the objectName_ancestors table (items_ancestors or groups_ancestors)
// for all rows marked with ancestors_computation_state="todo" in objectName_propagate
Expand All @@ -24,12 +30,34 @@ func (s *DataStore) createNewAncestors(objectName, singleObjectName string) { /*
mustNotBeError(s.InTransaction(func(s *DataStore) error {
initTransactionTime := time.Now()

s.createNewAncestorsInsideTransaction(objectName, singleObjectName)
s.createNewAncestorsInsideTransactionInitStep(objectName, singleObjectName)

logging.Debugf("Duration of %v_ancestors propagation: %v", objectName, time.Since(initTransactionTime))
logging.Debugf("Duration of %v_ancestors propagation init step: %v", objectName, time.Since(initTransactionTime))

return nil
}))

queries := s.constructCreateNewAncestorsQueries(objectName, singleObjectName)

hasChanges := true
for hasChanges {
mustNotBeError(s.InTransaction(func(s *DataStore) error {
initStepTransactionTime := time.Now()

rowsAffected := s.createNewAncestorsInsideTransactionStep(queries)

logging.Debugf(
"Duration of %v_ancestors propagation step: %d rows affected, took %v",
objectName,
rowsAffected,
time.Since(initStepTransactionTime),
)

hasChanges = rowsAffected > 0

return nil
}))
}
}

// createNewAncestorsInsideTransaction does the sql work of createNewAncestors.
Expand All @@ -46,6 +74,22 @@ func (s *DataStore) createNewAncestors(objectName, singleObjectName string) { /*
func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObjectName string) {
s.mustBeInTransaction()

s.createNewAncestorsInsideTransactionInitStep(objectName, singleObjectName)

queries := s.constructCreateNewAncestorsQueries(objectName, singleObjectName)

hasChanges := true
for hasChanges {
rowsAffected := s.createNewAncestorsInsideTransactionStep(queries)

hasChanges = rowsAffected > 0
}
}

// createNewAncestorsInsideTransactionInitStep does the sql work of the initialization step of createNewAncestors.
func (s *DataStore) createNewAncestorsInsideTransactionInitStep(objectName, singleObjectName string) {
s.mustBeInTransaction()

// We mark as 'todo' all descendants of objects marked as 'todo'
query := `
INSERT INTO ` + objectName + `_propagate (id, ancestors_computation_state)
Expand All @@ -60,9 +104,22 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
ON DUPLICATE KEY UPDATE ancestors_computation_state = 'todo'` /* #nosec */

mustNotBeError(s.db.Exec(query).Error)
}

hasChanges := true
// createNewAncestorsInsideTransactionStep does the sql work of a step of createNewAncestors.
func (s *DataStore) createNewAncestorsInsideTransactionStep(queries createNewAncestorsQueries) int64 {
s.mustBeInTransaction()

mustNotBeError(s.Exec(queries.markAsProcessingQuery).Error())
for i := 0; i < len(queries.recomputeQueries); i++ {
mustNotBeError(s.Exec(queries.recomputeQueries[i]).Error())
}

return s.Exec(queries.markAsDoneQuery).RowsAffected()
}

// constructCreateNewAncestorsQueries constructs the SQL queries needed for the main steps of createNewAncestors.
func (s *DataStore) constructCreateNewAncestorsQueries(objectName, singleObjectName string) (queries createNewAncestorsQueries) {
relationsTable := objectName + "_" + objectName

var additionalJoin string
Expand All @@ -75,7 +132,7 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
// This way we prevent infinite looping as we never process objects that are descendants of themselves

/* #nosec */
query = `
queries.markAsProcessingQuery = `
UPDATE ` + objectName + `_propagate AS children
SET children.ancestors_computation_state='processing'
WHERE children.ancestors_computation_state = 'todo' AND NOT EXISTS (
Expand All @@ -90,9 +147,6 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
FOR UPDATE
) has_undone_parents FOR UPDATE
)`
markAsProcessing, err := s.db.CommonDB().Prepare(query)
mustNotBeError(err)
defer func() { mustNotBeError(markAsProcessing.Close()) }()

expiresAtColumn := ""
expiresAtValueJoin := ""
Expand All @@ -105,8 +159,8 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
}

// For every object marked as 'processing', we compute all its ancestors
recomputeQueries := make([]string, 0, 3)
recomputeQueries = append(recomputeQueries, `
queries.recomputeQueries = make([]string, 0, 3)
queries.recomputeQueries = append(queries.recomputeQueries, `
DELETE `+objectName+`_ancestors
FROM `+objectName+`_ancestors
JOIN `+objectName+`_propagate
Expand All @@ -133,12 +187,12 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
WHERE
`+objectName+`_propagate.ancestors_computation_state = 'processing'`) // #nosec
if objectName == groups {
recomputeQueries[1] += `
queries.recomputeQueries[1] += `
AND NOW() < groups_groups.expires_at AND
NOW() < LEAST(groups_ancestors_join.expires_at, groups_groups.expires_at)
ON DUPLICATE KEY UPDATE
expires_at = GREATEST(groups_ancestors.expires_at, LEAST(groups_ancestors_join.expires_at, groups_groups.expires_at))`
recomputeQueries = append(recomputeQueries, `
queries.recomputeQueries = append(queries.recomputeQueries, `
INSERT IGNORE INTO `+objectName+`_ancestors
(
ancestor_`+singleObjectName+`_id,
Expand All @@ -151,8 +205,8 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
WHERE groups_propagate.ancestors_computation_state = 'processing'
FOR UPDATE`) // #nosec
} else {
recomputeQueries[1] += ` FOR UPDATE`
recomputeQueries = append(recomputeQueries, `
queries.recomputeQueries[1] += ` FOR UPDATE`
queries.recomputeQueries = append(queries.recomputeQueries, `
INSERT IGNORE INTO items_ancestors (ancestor_item_id, child_item_id)
SELECT items_items.parent_item_id, items_items.child_item_id
FROM items_items
Expand All @@ -161,37 +215,11 @@ func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObject
FOR UPDATE`) // #nosec
}

recomputeAncestors := make([]*sql.Stmt, len(recomputeQueries))
for i := 0; i < len(recomputeQueries); i++ {
recomputeAncestors[i], err = s.db.CommonDB().Prepare(recomputeQueries[i])
mustNotBeError(err)

defer func(i int) { mustNotBeError(recomputeAncestors[i].Close()) }(i)
}

// Objects marked as 'processing' are now marked as 'done'
query = `
queries.markAsDoneQuery = `
UPDATE ` + objectName + `_propagate
SET ancestors_computation_state = 'done'
WHERE ancestors_computation_state = 'processing'` // #nosec
markAsDone, err := s.db.CommonDB().Prepare(query)
mustNotBeError(err)
defer func() { mustNotBeError(markAsDone.Close()) }()

for hasChanges {
_, err = markAsProcessing.Exec()
mustNotBeError(err)
for i := 0; i < len(recomputeAncestors); i++ {
_, err = recomputeAncestors[i].Exec()
mustNotBeError(err)
}

var result sql.Result
result, err = markAsDone.Exec()
mustNotBeError(err)
var rowsAffected int64
rowsAffected, err = result.RowsAffected()
mustNotBeError(err)
hasChanges = rowsAffected > 0
}
return queries
}

0 comments on commit a5b3c35

Please sign in to comment.