Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure the HTTP call is never made inside a database transaction. #1099

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
1e4c869
Make sure `SchedulePropagation` is never called inside a transaction …
GeoffreyHuck Jun 16, 2024
acd5bf0
Inject the endpoint in the request context and allow to access the co…
GeoffreyHuck Jul 5, 2024
2b87da4
This constant is only used in this file, so it doesn't have to be pub…
GeoffreyHuck Jul 5, 2024
faf2c9f
Rename the function into `StartAsyncPropagation`, it's clearer what i…
GeoffreyHuck Jul 5, 2024
3e53ee0
The propagation is now called after the current transaction. Update t…
GeoffreyHuck Jul 5, 2024
4ecc6bf
`SchedulePropagation`: make sure the types of propagation are unique …
GeoffreyHuck Jul 8, 2024
d4ecc20
Fix lint order of imports.
GeoffreyHuck Jul 8, 2024
469b28f
Fix lint cyclomatic complexity: schedule propagation in `updateChildr…
GeoffreyHuck Jul 8, 2024
354d2e5
Update test: we have one more middleware to inject the endpoint confi…
GeoffreyHuck Jul 8, 2024
dd140bf
Cleanup: function not used anymore.
GeoffreyHuck Jul 8, 2024
f74233d
All the propagations are now async.
GeoffreyHuck Jul 8, 2024
46b8a27
Fix typo.
GeoffreyHuck Jul 8, 2024
3c34f0e
In the migrate command, we need to call the propagation now and not s…
GeoffreyHuck Jul 8, 2024
2626486
In the unit tests, the propagation endpoint hasn't been injected. Thi…
GeoffreyHuck Jul 8, 2024
460f923
The group propagation, when scheduled, should be run before the async…
GeoffreyHuck Jul 9, 2024
bd079a0
Fix test. Since we run all propagations async, and the system does al…
GeoffreyHuck Jul 9, 2024
27d5a62
Fix test. Since we run all propagations async, and the system does al…
GeoffreyHuck Jul 9, 2024
0415b3b
Merge pull request #1100 from France-ioi/1047
GeoffreyHuck Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/api/contests/set_additional_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,6 @@ func setAdditionalTimeForGroupInContest(
service.MustNotBeError(store.Exec("DROP TEMPORARY TABLE new_expires_at").Error())
if groupsGroupsModified {
store.ScheduleGroupsAncestorsPropagation()
store.ScheduleResultsPropagation()
store.SchedulePropagation([]string{"results"})
}
}
11 changes: 6 additions & 5 deletions app/api/groups/update_permissions.feature
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Feature: Change item access rights for a group
| attempt_id | participant_id | item_id |
| 0 | 21 | 103 |

Scenario Outline: Create a new permissions_granted row (with results propagation)
Scenario Outline: Create a new permissions_granted row
Given I am the user with id "21"
And the database table 'permissions_generated' has also the following rows:
| group_id | item_id | can_view_generated | can_grant_view_generated | can_watch_generated | can_edit_generated | is_owner_generated |
Expand Down Expand Up @@ -99,7 +99,7 @@ Feature: Change item access rights for a group
| {"can_view":"info","can_make_session_official":true} | info | none | none | none | false | true | info | none | none | none | none | none | none | none |
| {"is_owner":true} | none | none | none | none | true | false | solution | solution_with_grant | answer_with_grant | all_with_grant | content | solution | answer | all |

Scenario Outline: Create a new permissions_granted row (without results propagation)
Scenario Outline: Create a new permissions_granted row (more)
Given I am the user with id "21"
And the database table 'permissions_generated' has also the following rows:
| group_id | item_id | can_view_generated | can_grant_view_generated | can_watch_generated | can_edit_generated | is_owner_generated |
Expand Down Expand Up @@ -129,9 +129,10 @@ Feature: Change item access rights for a group
| 23 | 102 | none | none | none | none | false |
| 23 | 103 | none | none | none | none | false |
And the table "attempts" should stay unchanged
And the table "results_propagate" should be:
| attempt_id | participant_id | item_id | state |
| 0 | 21 | 103 | to_be_propagated |
And the table "results" should be:
| attempt_id | participant_id | item_id |
| 0 | 21 | 102 |
| 0 | 21 | 103 |
Examples:
| json | can_enter_from | can_enter_until |
| {"can_enter_from":"2019-05-30T11:00:00Z"} | 2019-05-30 11:00:00 | 9999-12-31 23:59:59 |
Expand Down
4 changes: 2 additions & 2 deletions app/api/groups/update_permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,10 +728,10 @@ func savePermissionsIntoDB(groupID, itemID, sourceGroupID int64, dbMap map[strin

permissionGrantedStore := s.PermissionsGranted()
service.MustNotBeError(permissionGrantedStore.InsertOrUpdateMap(dbMap, columnsToUpdate))
s.SchedulePermissionsPropagation()
s.SchedulePropagation([]string{"permissions"})
if dbMap["can_view"] != nil && dbMap["can_view"] != none || dbMap["is_owner"] != nil && dbMap["is_owner"].(bool) {
// permissionGrantedStore.After() implicitly (via triggers) marks some attempts as to_be_propagated
// when an item becomes visible, so we should propagate attempts here
s.ScheduleResultsPropagation()
s.SchedulePropagation([]string{"results"})
}
}
4 changes: 2 additions & 2 deletions app/api/items/apply_dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ func (srv *Service) applyDependency(rw http.ResponseWriter, httpReq *http.Reques
// If items have been unlocked, need to recompute access
if groupsUnlocked > 0 {
// generate permissions_generated from permissions_granted
store.SchedulePermissionsPropagation()
store.SchedulePropagation([]string{"permissions"})
// we should compute attempts again as new permissions were set and
// triggers on permissions_generated likely marked some attempts as 'to_be_propagated'
store.ScheduleResultsPropagation()
store.SchedulePropagation([]string{"results"})
}
return err
})
Expand Down
2 changes: 1 addition & 1 deletion app/api/items/create_attempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (srv *Service) createAttempt(w http.ResponseWriter, r *http.Request) servic
attemptID, err = store.Attempts().CreateNew(participantID, parentAttemptID, itemID, user.GroupID)
service.MustNotBeError(err)

store.ScheduleResultsPropagation()
store.SchedulePropagation([]string{"results"})
return nil
})
if apiError != service.NoError {
Expand Down
8 changes: 1 addition & 7 deletions app/api/items/create_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ func (srv *Service) createItem(w http.ResponseWriter, r *http.Request) service.A
func validateAndInsertItem(srv *Service, r *http.Request) (itemID int64, apiError service.APIError, err error) {
user := srv.GetUser(r)
store := srv.GetStore(r)
propagationsToRun := []string{}
err = store.InTransaction(func(store *database.DataStore) error {
input := NewItemRequest{}
formData := formdata.NewFormData(&input)
Expand Down Expand Up @@ -269,15 +268,10 @@ func validateAndInsertItem(srv *Service, r *http.Request) (itemID int64, apiErro

setNewItemAsRootActivityOrSkill(store, formData, &input, itemID)

propagationsToRun = append(propagationsToRun, "items_ancestors")
store.SchedulePropagation([]string{"items_ancestors", "permissions", "results"})

return nil
})
if err == nil {
propagationsToRun = append(propagationsToRun, "permissions", "results")

service.SchedulePropagation(store, srv.GetPropagationEndpoint(), propagationsToRun)
}

return itemID, apiError, err
}
Expand Down
2 changes: 1 addition & 1 deletion app/api/items/enter.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (srv *Service) enter(w http.ResponseWriter, r *http.Request) service.APIErr
store.ScheduleGroupsAncestorsPropagation()
// Upserting into groups_groups may mark some attempts as 'to_be_propagated',
// so we need to recompute them
store.ScheduleResultsPropagation()
store.SchedulePropagation([]string{"results"})
} else {
logging.GetLogEntry(r).Warnf("items.participants_group_id is not set for the item with id = %d", entryState.itemID)
}
Expand Down
6 changes: 3 additions & 3 deletions app/api/items/path_from_root_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,9 +817,9 @@ func Test_FindItemPath(t *testing.T) {
var got []items.ItemPath
assert.NoError(t, store.InTransaction(func(s *database.DataStore) error {
s.ScheduleGroupsAncestorsPropagation()
s.ScheduleItemsAncestorsPropagation()
s.SchedulePermissionsPropagation()
s.ScheduleResultsPropagation()
s.SchedulePropagation([]string{"items_ancestors"})
s.SchedulePropagation([]string{"permissions"})
s.SchedulePropagation([]string{"results"})

return nil
}))
Expand Down
2 changes: 1 addition & 1 deletion app/api/items/start_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (srv *Service) startResult(w http.ResponseWriter, r *http.Request) service.
resultStore := store.Results()
service.MustNotBeError(resultStore.MarkAsToBePropagated(participantID, attemptID, itemID, false))

service.SchedulePropagation(store, srv.GetPropagationEndpoint(), []string{"results"})
store.SchedulePropagation([]string{"results"})
}
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion app/api/items/start_result_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (srv *Service) startResultPath(w http.ResponseWriter, r *http.Request) serv
service.MustNotBeError(resultStore.InsertOrUpdateMaps(rowsToInsert, []string{"started_at", "latest_activity_at"}))
service.MustNotBeError(resultStore.InsertIgnoreMaps("results_propagate", rowsToInsertPropagate))

service.SchedulePropagation(store, srv.GetPropagationEndpoint(), []string{"results"})
store.SchedulePropagation([]string{"results"})
}

return nil
Expand Down
15 changes: 8 additions & 7 deletions app/api/items/update_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ func (srv *Service) updateItem(w http.ResponseWriter, r *http.Request) service.A
input := updateItemRequest{}
formData := formdata.NewFormData(&input)

var propagationsToRun []string

apiError := service.NoError
err = store.InTransaction(func(store *database.DataStore) error {
var itemInfo struct {
Expand Down Expand Up @@ -187,14 +185,15 @@ func (srv *Service) updateItem(w http.ResponseWriter, r *http.Request) service.A
return apiError.Error // rollback
}

propagationsToRun, apiError, err = updateChildrenAndRunListeners(
apiError, err = updateChildrenAndRunListeners(
formData,
store,
itemID,
&input,
childrenInfoMap,
oldPropagationLevelsMap,
)

return err
})

Expand All @@ -203,8 +202,6 @@ func (srv *Service) updateItem(w http.ResponseWriter, r *http.Request) service.A
}
service.MustNotBeError(err)

service.SchedulePropagation(store, srv.GetPropagationEndpoint(), propagationsToRun)

// response
service.MustNotBeError(render.Render(w, r, service.UpdateSuccess(nil)))
return service.NoError
Expand Down Expand Up @@ -240,7 +237,9 @@ func updateChildrenAndRunListeners(
input *updateItemRequest,
childrenPermissionMap map[int64]permissionAndType,
oldPropagationLevelsMap map[int64]*itemsRelationData,
) (propagationsToRun []string, apiError service.APIError, err error) {
) (apiError service.APIError, err error) {
var propagationsToRun []string

if formData.IsSet("children") {
err = store.ItemItems().WithItemsRelationsLock(func(lockedStore *database.DataStore) error {
deleteStatement := lockedStore.ItemItems().DB.
Expand Down Expand Up @@ -280,7 +279,9 @@ func updateChildrenAndRunListeners(
propagationsToRun = append(propagationsToRun, "results")
}

return propagationsToRun, apiError, err
store.SchedulePropagation(propagationsToRun)

return apiError, err
}

// constructUpdateItemChildTypeNonSkillValidator constructs a validator for the Children field that checks
Expand Down
10 changes: 10 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
package app

import (
"context"
crand "crypto/rand"
"encoding/binary"
"fmt"
"net/http"

"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
Expand Down Expand Up @@ -97,6 +99,14 @@ func (app *Application) Reset(config *viper.Viper) error {
router.Use(corsConfig().Handler) // no need for CORS if served through the same domain
router.Use(domain.Middleware(domainsConfig, serverConfig.GetString("domainOverride")))

router.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), "propagation_endpoint", serverConfig.GetString("propagation_endpoint"))

next.ServeHTTP(w, r.WithContext(ctx))
})
})

if appenv.IsEnvDev() {
router.Mount("/debug", middleware.Profiler())
}
Expand Down
4 changes: 2 additions & 2 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestNew_Success(t *testing.T) {
assert.NotNil(app.Database)
assert.NotNil(app.HTTPHandler)
assert.NotNil(app.apiCtx)
assert.Len(app.HTTPHandler.Middlewares(), 8)
assert.Len(app.HTTPHandler.Middlewares(), 9)
assert.True(len(app.HTTPHandler.Routes()) > 0)
assert.Equal("/*", app.HTTPHandler.Routes()[0].Pattern) // test default val
}
Expand All @@ -46,7 +46,7 @@ func TestNew_SuccessNoCompress(t *testing.T) {
_ = os.Setenv("ALGOREA_SERVER__COMPRESS", "false")
defer func() { _ = os.Unsetenv("ALGOREA_SERVER__COMPRESS") }()
app, _ := New()
assert.Len(app.HTTPHandler.Middlewares(), 7)
assert.Len(app.HTTPHandler.Middlewares(), 8)
}

func TestNew_NotDefaultRootPath(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion app/database/badges.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *GroupStore) StoreBadges(badges []Badge, userID int64, newUser bool) (er

if ancestorsCalculationNeeded {
s.ScheduleGroupsAncestorsPropagation()
s.ScheduleResultsPropagation()
s.SchedulePropagation([]string{"results"})
}

if len(managedBadgeGroups) > 0 {
Expand Down
38 changes: 30 additions & 8 deletions app/database/data_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

"github.com/France-ioi/AlgoreaBackend/app/utils"

"github.com/France-ioi/AlgoreaBackend/app/rand"
)

Expand Down Expand Up @@ -176,10 +178,11 @@ func (s *DataStore) NewID() int64 {
}

type awaitingTriggers struct {
ItemAncestors bool
GroupAncestors bool
Permissions bool
Results bool
ItemAncestors bool
GroupAncestors bool
Permissions bool
Results bool
SchedulePropagationTypes []string
}
type dbContextKey string

Expand All @@ -206,6 +209,16 @@ func (s *DataStore) InTransaction(txFunc func(*DataStore) error) error {
triggersToRun.GroupAncestors = false
s.createNewAncestors("groups", "group")
}
if len(triggersToRun.SchedulePropagationTypes) > 0 {
types := triggersToRun.SchedulePropagationTypes
triggersToRun.SchedulePropagationTypes = []string{}

propagationEndpoint := ""
if s.Context().Value("propagation_endpoint") != nil {
propagationEndpoint = s.Context().Value("propagation_endpoint").(string)
}
StartAsyncPropagation(s, propagationEndpoint, types)
}
if triggersToRun.ItemAncestors {
triggersToRun.ItemAncestors = false
s.createNewAncestors("items", "item")
Expand All @@ -222,6 +235,14 @@ func (s *DataStore) InTransaction(txFunc func(*DataStore) error) error {
return err
}

// SchedulePropagation schedules a run of the propagation for the given types after the transaction commit.
func (s *DataStore) SchedulePropagation(types []string) {
s.mustBeInTransaction()

triggersToRun := s.DB.ctx.Value(triggersContextKey).(*awaitingTriggers)
triggersToRun.SchedulePropagationTypes = utils.UniqueStrings(append(triggersToRun.SchedulePropagationTypes, types...))
Copy link
Collaborator

@zenovich zenovich Jul 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the order of types doesn't matter, it would be much cleaner to store map[string]struct{} in triggersToRun.SchedulePropagationTypes instead of creating utils.UniqueStrings() and tests for it. Also, the map works faster. But a struct containing all the possible values as boolean fields (like 'awaitingTriggers') would be even better than the map: it would store only several booleans while the map stores strings. Also, the struct provides immediate access to its fields, while the map uses hashing. Also, with the struct, it is not possible to mistype a value which is an often issue with maps/slices.

If the order of types matters, UniqueString() breaks it anyway since maps don't preserve the order of keys.

}

// ScheduleResultsPropagation schedules a run of ResultStore::propagate() after the transaction commit.
func (s *DataStore) ScheduleResultsPropagation() {
s.mustBeInTransaction()
Expand Down Expand Up @@ -262,10 +283,6 @@ func (s *DataStore) WithForeignKeyChecksDisabled(blockFunc func(*DataStore) erro
})
}

func (s *DataStore) IsInTransaction() bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a nice useful method.

return s.DB.isInTransaction()
}

// WithNamedLock wraps the given function in GET_LOCK/RELEASE_LOCK.
func (s *DataStore) WithNamedLock(lockName string, timeout time.Duration, txFunc func(*DataStore) error) error {
return s.withNamedLock(lockName, timeout, func(db *DB) error {
Expand Down Expand Up @@ -327,3 +344,8 @@ func (s *DataStore) InsertOrUpdateMap(dataMap map[string]interface{}, updateColu
func (s *DataStore) InsertOrUpdateMaps(dataMap []map[string]interface{}, updateColumns []string) error {
return s.DB.insertOrUpdateMaps(s.tableName, dataMap, updateColumns)
}

// MustNotBeInTransaction panics if the store is in a transaction.
func (s *DataStore) MustNotBeInTransaction() {
s.DB.mustNotBeInTransaction()
}
12 changes: 12 additions & 0 deletions app/database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ func (conn *DB) New() *DB {
return newDB(conn.ctx, conn.db.New())
}

// Context returns the context of the current db connection.
func (conn *DB) Context() context.Context {
return conn.ctx
}

func (conn *DB) inTransaction(txFunc func(*DB) error) (err error) {
return conn.inTransactionWithCount(txFunc, 0)
}
Expand Down Expand Up @@ -639,6 +644,13 @@ func (conn *DB) withForeignKeyChecksDisabled(blockFunc func(*DB) error) (err err
//}
}

// mustNotBeInTransaction panics if the current connection is in a transaction.
func (conn *DB) mustNotBeInTransaction() {
if conn.isInTransaction() {
panic("should not be executed in a transaction")
}
}

func mustNotBeError(err error) {
if err != nil {
panic(err)
Expand Down
6 changes: 3 additions & 3 deletions app/database/group_group_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *GroupGroupStore) CreateRelation(parentGroupID, childGroupID int64) (err
// and it might not work if we call CreateRelation again in this same transaction.
s.createNewAncestorsInsideTransaction("groups", "group")

s.ScheduleResultsPropagation()
s.SchedulePropagation([]string{"results"})
return nil
}))
return err
Expand Down Expand Up @@ -119,7 +119,7 @@ func (s *GroupGroupStore) DeleteRelation(parentGroupID, childGroupID int64, shou
s.ScheduleGroupsAncestorsPropagation()

if shouldPropagatePermissions {
s.SchedulePermissionsPropagation()
s.SchedulePropagation([]string{"permissions"})
}
}

Expand Down Expand Up @@ -183,7 +183,7 @@ func (s *GroupGroupStore) deleteGroupAndOrphanedDescendants(groupID int64) {
// cascading deletes from many tables including groups_ancestors, groups_propagate, permission_granted & permissions_generated
mustNotBeError(s.Groups().Delete("id IN (?)", idsToDelete).Error())

s.SchedulePermissionsPropagation()
s.SchedulePropagation([]string{"permissions"})
}

func (s *GroupGroupStore) deleteObjectsLinkedToGroups(groupIDs []int64) *DB {
Expand Down
2 changes: 1 addition & 1 deletion app/database/group_group_store_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestGroupGroupStore_DeleteRelation(t *testing.T) {
assert.NoError(t, dataStore.Table("groups_propagate").UpdateColumn("ancestors_computation_state", "done").Error())

assert.NoError(t, dataStore.InTransaction(func(s *database.DataStore) error {
s.SchedulePermissionsPropagation()
s.SchedulePropagation([]string{"permissions"})
return nil
}))
err := dataStore.InTransaction(func(s *database.DataStore) error {
Expand Down
Loading