Skip to content

Commit

Permalink
Merge pull request #1180 from France-ioi/results_propagation_recomput…
Browse files Browse the repository at this point in the history
…e_by_chunks

Speed up the results propagation and make it less locking + Introduce a command recomputing all the results of chapters/skills
  • Loading branch information
zenovich authored Sep 30, 2024
2 parents d7d0740 + 81077f1 commit c3ee4e8
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 60 deletions.
6 changes: 4 additions & 2 deletions app/api/currentuser/get_full_dump.feature
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ Feature: Export the current user's data
"tasks_with_help": 0, "score_obtained_at": null, "hints_requested": null,
"latest_activity_at": "2019-05-29T11:00:00Z", "latest_submission_at": null,
"latest_hint_at": null, "score_edit_comment": null,
"started": 0, "started_at": null, "validated_at": null, "help_requested": 0
"started": 0, "started_at": null, "validated_at": null, "help_requested": 0,
"recomputing_state": "unchanged"
},
{
"attempt_id": "0", "validated": 0,
Expand All @@ -125,7 +126,8 @@ Feature: Export the current user's data
"tasks_with_help": 0, "score_obtained_at": null, "hints_requested": null,
"latest_activity_at": "2019-05-30T11:00:00Z", "latest_submission_at": null,
"latest_hint_at": null, "score_edit_comment": null,
"started": 0, "started_at": null, "validated_at": null, "help_requested": 0
"started": 0, "started_at": null, "validated_at": null, "help_requested": 0,
"recomputing_state": "unchanged"
}
],
"groups_groups": [
Expand Down
12 changes: 7 additions & 5 deletions app/api/items/update_item.feature
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Background:
| 0 | 11 |
And the database has the following table 'results':
| attempt_id | participant_id | item_id | score_computed |
| 0 | 11 | 21 | 0 |
| 0 | 11 | 21 | 1 |
| 0 | 11 | 50 | 10 |
| 0 | 11 | 70 | 20 |
And the database has the following table 'languages':
Expand Down Expand Up @@ -317,9 +317,10 @@ Background:
| 11 | 70 | solution | solution_with_grant | answer_with_grant | all_with_grant | true |
| 11 | 112 | solution | content | answer | all | false |
And the table "attempts" should stay unchanged
And the table "results" should stay unchanged but the row with item_id "50"
And the table "results" at item_id "50" should be:
And the table "results" should stay unchanged but the rows with item_ids "21,50"
And the table "results" at item_ids "21,50" should be:
| attempt_id | participant_id | item_id | score_computed |
| 0 | 11 | 21 | 0 |
| 0 | 11 | 50 | 0 |
And the table "results_propagate" should be empty

Expand Down Expand Up @@ -358,9 +359,10 @@ Background:
And the table "groups" should stay unchanged
And the table "permissions_granted" should stay unchanged
And the table "attempts" should stay unchanged
And the table "results" should stay unchanged but the row with item_id "50"
And the table "results" at item_id "50" should be:
And the table "results" should stay unchanged but the rows with item_ids "21,50"
And the table "results" at item_ids "21,50" should be:
| attempt_id | participant_id | item_id | score_computed |
| 0 | 11 | 21 | 0 |
| 0 | 11 | 50 | 0 |
And the table "results_propagate" should be empty

Expand Down
138 changes: 85 additions & 53 deletions app/database/result_store_propagate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

const (
resultsPropagationLockName = "listener_propagate"
resultsPropagationLockWaitTimeout = 10 * time.Second
resultsPropagationChunkSize = 200
resultsPropagationLockName = "listener_propagate"
resultsPropagationLockWaitTimeout = 10 * time.Second
resultsPropagationPropagationChunkSize = 200
resultsPropagationRecomputationChunkSize = 1000
)

// propagate recomputes fields of results
Expand All @@ -25,11 +26,15 @@ const (
// 3. For results marked as 'propagating', we insert new permissions_granted for each unlocked item
// according to corresponding item_dependencies.
// 4. We unmark all results marked as 'propagating'.
// 5. We process all objects that are marked as 'to_be_recomputed' and that have no children marked as 'to_be_recomputed'.
// Then, if an object has children, we update
// latest_activity_at, tasks_tried, tasks_with_help, validated_at.
// 6. We mark all results marked as 'to_be_recomputed' as 'to_be_propagated'.
// 7. We repeat from step 1.
// 5. We atomically process results marked as 'to_be_recomputed' by chunks.
// a) We mark as 'recomputing' a chunk of results that are marked as 'to_be_recomputed' and
// that have no children marked as 'to_be_recomputed'.
// b) For each object marked as 'recomputing', we update
// latest_activity_at, tasks_tried, tasks_with_help, validated_at, score_computed.
// c) We mark all modified results marked as 'recomputing' as 'to_be_propagated' and
// unmark all unchanged results marked as 'recomputing'.
// We repeat this step until there are no more results marked as 'to_be_recomputed'.
// 6. We repeat from step 1.
//
// The `results_propagate` rows are marked in code as well as in the following SQL Triggers:
// - after_insert_groups_groups/items_items
Expand All @@ -55,7 +60,7 @@ func (s *ResultStore) propagate() (err error) {
// First we take a chunk of results marked as 'to_be_propagated' and mark them as 'propagating'.
// Then we create missing results for their parents and mark those parent results as 'to_be_recomputed'.
CallBeforePropagationStepHook(PropagationStepResultsInsideNamedLockMarkAndInsertResults)
markAsPropagatingSomeResultsMarkedAsToBePropagatedAndMarkTheirParentsAsToBeRecomputed(s, resultsPropagationChunkSize)
markAsPropagatingSomeResultsMarkedAsToBePropagatedAndMarkTheirParentsAsToBeRecomputed(s, resultsPropagationPropagationChunkSize)

// Now we unlock dependent items for results marked as 'propagating' and unmark them.
CallBeforePropagationStepHook(PropagationStepResultsInsideNamedLockItemUnlocking)
Expand All @@ -69,7 +74,7 @@ func (s *ResultStore) propagate() (err error) {

// Now there are no 'propagating' results left, so we can recompute results marked as 'to_be_recomputed'
// and mark them as 'to_be_propagated'.
recomputeResultsMarkedAsToBeRecomputedAndMarkThemAsToBePropagated(s)
recomputeResultsMarkedAsToBeRecomputedAndMarkThemAsToBePropagated(s, resultsPropagationRecomputationChunkSize)

// From here, there can be only results marked as 'to_be_propagated'.
}
Expand Down Expand Up @@ -198,7 +203,7 @@ func markAsPropagatingSomeResultsMarkedAsToBePropagatedAndMarkTheirParentsAsToBe
}))
}

func recomputeResultsMarkedAsToBeRecomputedAndMarkThemAsToBePropagated(s *DataStore) {
func recomputeResultsMarkedAsToBeRecomputedAndMarkThemAsToBePropagated(s *DataStore, chunkSize int) {
hasChanges := true

for hasChanges {
Expand All @@ -218,42 +223,52 @@ func recomputeResultsMarkedAsToBeRecomputedAndMarkThemAsToBePropagated(s *DataSt
// - children_validated as the number of children items with validated == 1
// - validated, depending on the items_items.category and items.validation_type
// (an item should have at least one validated child to become validated itself by the propagation)

// Process only those results marked as 'to_be_recomputed' that do not have child results marked as 'to_be_recomputed'.
// Start from marking them as 'recomputing'. It's important that the 'recomputing' state never leaks outside the transaction.
// Instead of marking all the suitable results as 'recomputing' at once, we do it in chunks to avoid locking the table for too long.
result := s.Exec(`
WITH
marked_to_be_recomputed AS (SELECT participant_id, attempt_id, item_id FROM results_propagate WHERE state='to_be_recomputed')
UPDATE results_propagate AS target_results_propagate
SET state = 'recomputing'
WHERE
state = 'to_be_recomputed' AND
NOT EXISTS (
SELECT 1
FROM items_items
JOIN marked_to_be_recomputed AS children
ON children.participant_id = target_results_propagate.participant_id AND
children.attempt_id = target_results_propagate.attempt_id AND
children.item_id = items_items.child_item_id
WHERE items_items.parent_item_id = target_results_propagate.item_id
) AND NOT EXISTS (
SELECT 1
FROM items_items
JOIN attempts
ON attempts.participant_id = target_results_propagate.participant_id AND
attempts.parent_attempt_id = target_results_propagate.attempt_id AND
attempts.root_item_id = items_items.child_item_id
JOIN marked_to_be_recomputed AS children
ON children.participant_id = target_results_propagate.participant_id AND
children.attempt_id = attempts.id AND
children.item_id = items_items.child_item_id
WHERE items_items.parent_item_id = target_results_propagate.item_id
)
LIMIT ?`, chunkSize)
mustNotBeError(result.Error())
rowsAffected := result.RowsAffected()

if rowsAffected == 0 {
hasChanges = false
return nil
}

const updateQuery = `
UPDATE results_propagate AS target_propagate ` +
// process only those results marked as 'to_be_recomputed' that do not have child results marked as 'to_be_recomputed'
`JOIN (
SELECT *
FROM (
WITH marked_to_be_recomputed AS (SELECT participant_id, attempt_id, item_id FROM results_propagate WHERE state='to_be_recomputed')
SELECT DISTINCT inner_parent.participant_id, inner_parent.attempt_id, inner_parent.item_id
FROM marked_to_be_recomputed AS inner_parent
WHERE
NOT EXISTS (
SELECT 1
FROM items_items
JOIN marked_to_be_recomputed AS children
ON children.participant_id = inner_parent.participant_id AND
children.attempt_id = inner_parent.attempt_id AND
children.item_id = items_items.child_item_id
WHERE items_items.parent_item_id = inner_parent.item_id
) AND NOT EXISTS (
SELECT 1
FROM items_items
JOIN attempts
ON attempts.participant_id = inner_parent.participant_id AND
attempts.parent_attempt_id = inner_parent.attempt_id AND
attempts.root_item_id = items_items.child_item_id
JOIN marked_to_be_recomputed AS children
ON children.participant_id = inner_parent.participant_id AND
children.attempt_id = attempts.id AND
children.item_id = items_items.child_item_id
WHERE items_items.parent_item_id = inner_parent.item_id
)
) AS tmp2
) AS tmp USING(participant_id, attempt_id, item_id)
JOIN results AS target_results USING(participant_id, attempt_id, item_id)
UPDATE results AS target_results
JOIN results_propagate USING (participant_id, attempt_id, item_id)
JOIN items
ON items.id = target_propagate.item_id
ON items.id = target_results.item_id
LEFT JOIN LATERAL (
SELECT
target_results.participant_id,
Expand Down Expand Up @@ -287,8 +302,8 @@ func recomputeResultsMarkedAsToBeRecomputedAndMarkThemAsToBePropagated(s *DataSt
attempts.id = children_results.attempt_id
WHERE children_results.participant_id = target_results.participant_id AND
children_results.item_id = items_items.child_item_id AND
(children_results.attempt_id = target_results.attempt_id OR
(attempts.root_item_id = items_items.child_item_id AND
(children_results.attempt_id = target_results.attempt_id OR
(attempts.root_item_id = items_items.child_item_id AND
attempts.parent_attempt_id = target_results.attempt_id))
GROUP BY children_results.participant_id, children_results.item_id
) AS aggregated_children_results ON 1
Expand Down Expand Up @@ -323,14 +338,31 @@ func recomputeResultsMarkedAsToBeRecomputedAndMarkThemAsToBePropagated(s *DataSt
WHEN 'set' THEN target_results.score_edit_value
WHEN 'diff' THEN children_stats.average_score + target_results.score_edit_value
ELSE children_stats.average_score
END, 0), 100)), 0),
target_propagate.state = 'to_be_propagated'`

rowsAffected := s.Exec(updateQuery).RowsAffected()
END, 0), 100)), 0),` +
// We set the 'recomputing_state' to 'recomputing' asking the before_update_results trigger to check if the result has changed.
// The trigger will set it to 'modified' if the result has changed and to 'unchanged' otherwise.
// Results with latest_activity_at = '1000-01-01 00:00:00' are always considered modified in order
// to propagate the newly created results.
`
target_results.recomputing_state = 'recomputing'
WHERE results_propagate.state = 'recomputing'`

mustNotBeError(s.Exec(updateQuery).Error())

// We mark all modified results marked as 'recomputing' as 'to_be_propagated'.
result = s.Exec(`
UPDATE results_propagate
JOIN results USING(participant_id, attempt_id, item_id)
SET results_propagate.state = 'to_be_propagated'
WHERE results_propagate.state = 'recomputing' AND results.recomputing_state = 'modified'`)
mustNotBeError(result.Error())
rowsModified := result.RowsAffected()

logging.Debugf("Duration of step of results propagation: %d rows affected, took %v", rowsAffected, time.Since(initTransactionTime))
// Finally we unmark all unchanged results marked as 'recomputing'.
mustNotBeError(s.Exec(`DELETE FROM results_propagate WHERE state = 'recomputing'`).Error())

hasChanges = rowsAffected > 0
logging.Debugf("Duration of step of results propagation: %d rows affected, %d rows modified, took %v",
rowsAffected, rowsModified, time.Since(initTransactionTime))

return nil
}))
Expand Down
61 changes: 61 additions & 0 deletions cmd/recompute_results_for_chapters_and_skills.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package cmd

import (
"fmt"
"log"
"os"

_ "github.com/go-sql-driver/mysql" // use to force database/sql to use mysql
"github.com/spf13/cobra"

"github.com/France-ioi/AlgoreaBackend/v2/app"
"github.com/France-ioi/AlgoreaBackend/v2/app/appenv"
"github.com/France-ioi/AlgoreaBackend/v2/app/database"
)

// init registers the "recompute-results" command on rootCmd.
//
// The command accepts an optional environment name as its only argument.
// It iterates over all items whose type is 'Chapter' or 'Skill' and, for each
// of them, opens a separate transaction that inserts the item id into
// results_recompute_for_items and schedules the results propagation.
// Handling one item per transaction keeps each transaction small.
//
// The process exits with status 1 if creating the application or processing
// any of the items fails.
func init() { //nolint:gochecknoinits
	recomputeResultsCmd := &cobra.Command{
		Use:   "recompute-results [environment]",
		Short: "recompute results for chapters and skills",
		Long:  `for each chapter/skill marks all results linked to it as to_be_recomputed and runs the results propagation`,
		Args:  cobra.MaximumNArgs(1),
		Run: func(cmd *cobra.Command, args []string) {
			// Set the environment.
			if len(args) > 0 {
				appenv.SetEnv(args[0])
			}

			application, err := app.New()
			if err != nil {
				log.Fatal(err)
			}

			store := database.NewDataStore(application.Database)
			itemNumber := 0
			err = store.Items().Where("type = 'Chapter' OR type = 'Skill'").Select("id").
				ScanAndHandleMaps(func(item map[string]interface{}) error {
					itemNumber++
					// txStore is the transaction-bound store; it is named differently from the outer
					// store so that it is obvious which one each statement runs on.
					return store.InTransaction(func(txStore *database.DataStore) error {
						// item["id"] is an interface{} whose dynamic type depends on the driver,
						// so it is formatted with %v, which works for both strings and integers.
						log.Printf("Recomputing results for item %v (#%d)\n", item["id"], itemNumber)
						// A closure-local error is used here instead of assigning to the err variable
						// of the enclosing function.
						execErr := txStore.Exec("INSERT IGNORE INTO results_recompute_for_items (item_id) values (?)", item["id"]).Error()
						if execErr != nil {
							return execErr
						}
						txStore.ScheduleResultsPropagation()
						return nil
					})
				}).Error()
			if err != nil {
				fmt.Println("Error while recomputing results: ", err)
				os.Exit(1)
			}

			fmt.Println("Done.")
		},
	}

	rootCmd.AddCommand(recomputeResultsCmd)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +migrate Up
-- Adds the `recomputing_state` column to `results`.
-- It can hold 'recomputing', 'modified' or 'unchanged'; existing and new rows default to 'unchanged'.
-- The column is placed right after `help_requested`.
ALTER TABLE `results`
ADD COLUMN `recomputing_state` ENUM('recomputing', 'modified', 'unchanged') NOT NULL DEFAULT 'unchanged'
COMMENT 'State of the result, used during recomputing' AFTER `help_requested`;

-- +migrate Down
-- Removes the column added by the Up migration.
ALTER TABLE `results` DROP COLUMN `recomputing_state`;
27 changes: 27 additions & 0 deletions db/migrations/2409290724_create_trigger_before_update_results.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- +migrate Up
-- (Re)creates the `before_update_results` trigger on `results`.
-- When a row is updated with recomputing_state = 'recomputing', the trigger replaces that value
-- with 'unchanged' if none of the compared columns changed, and with 'modified' otherwise.
-- Updates that set any other recomputing_state are left untouched.
DROP TRIGGER IF EXISTS `before_update_results`;
-- +migrate StatementBegin
CREATE TRIGGER `before_update_results`
BEFORE UPDATE
ON `results`
FOR EACH ROW
BEGIN
IF NEW.recomputing_state = 'recomputing' THEN
SET NEW.recomputing_state = IF(
-- `<=>` is the NULL-safe equality operator: two NULLs compare as equal,
-- so a column staying NULL is not reported as a change.
NEW.latest_activity_at <=> OLD.latest_activity_at AND
NEW.tasks_tried <=> OLD.tasks_tried AND
NEW.tasks_with_help <=> OLD.tasks_with_help AND
NEW.validated_at <=> OLD.validated_at AND
NEW.score_computed <=> OLD.score_computed AND
-- We always consider results with the default latest_activity_at as changed
-- because they look like a newly inserted result for a chapter/skill.
-- This way we make sure that a newly inserted result is propagated.
NEW.latest_activity_at <> '1000-01-01 00:00:00',
'unchanged',
'modified');
END IF;
END;
-- +migrate StatementEnd

-- +migrate Down
-- Drops the trigger created by the Up migration.
DROP TRIGGER IF EXISTS `before_update_results`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- +migrate Up
-- Extends the `results_propagate`.`state` enum with the new 'recomputing' value.
ALTER TABLE `results_propagate` MODIFY `state` ENUM('to_be_propagated','to_be_recomputed','propagating', 'recomputing') NOT NULL COMMENT '"to_be_propagated" means that ancestors should be recomputed';

-- +migrate Down
-- Restores the previous enum definition without 'recomputing'.
-- NOTE(review): presumably this requires that no row is still in the 'recomputing' state when it runs - confirm.
ALTER TABLE `results_propagate` MODIFY `state` ENUM('to_be_propagated','to_be_recomputed','propagating') NOT NULL COMMENT '"to_be_propagated" means that ancestors should be recomputed';

0 comments on commit c3ee4e8

Please sign in to comment.