Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure the HTTP call is never made inside a database transaction. #1099

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
1e4c869
Make sure `SchedulePropagation` is never called inside a transaction …
GeoffreyHuck Jun 16, 2024
acd5bf0
Inject the endpoint in the request context and allow to access the co…
GeoffreyHuck Jul 5, 2024
2b87da4
This constant is only used in this file, so it doesn't have to be pub…
GeoffreyHuck Jul 5, 2024
faf2c9f
Rename the function into `StartAsyncPropagation`, it's clearer what i…
GeoffreyHuck Jul 5, 2024
3e53ee0
The propagation is now called after the current transaction. Update t…
GeoffreyHuck Jul 5, 2024
4ecc6bf
`SchedulePropagation`: make sure the types of propagation are unique …
GeoffreyHuck Jul 8, 2024
d4ecc20
Fix lint order of imports.
GeoffreyHuck Jul 8, 2024
469b28f
Fix lint cyclomatic complexity: schedule propagation in `updateChildr…
GeoffreyHuck Jul 8, 2024
354d2e5
Update test: we have one more middleware to inject the endpoint confi…
GeoffreyHuck Jul 8, 2024
dd140bf
Cleanup: function not used anymore.
GeoffreyHuck Jul 8, 2024
f74233d
All the propagations are now async.
GeoffreyHuck Jul 8, 2024
46b8a27
Fix typo.
GeoffreyHuck Jul 8, 2024
3c34f0e
In the migrate command, we need to call the propagation now and not s…
GeoffreyHuck Jul 8, 2024
2626486
In the unit tests, the propagation endpoint hasn't been injected. Thi…
GeoffreyHuck Jul 8, 2024
460f923
The group propagation, when scheduled, should be run before the async…
GeoffreyHuck Jul 9, 2024
bd079a0
Fix test. Since we run all propagations async, and the system does al…
GeoffreyHuck Jul 9, 2024
27d5a62
Fix test. Since we run all propagations async, and the system does al…
GeoffreyHuck Jul 9, 2024
0415b3b
Merge pull request #1100 from France-ioi/1047
GeoffreyHuck Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions app/api/items/create_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ func (srv *Service) createItem(w http.ResponseWriter, r *http.Request) service.A
func validateAndInsertItem(srv *Service, r *http.Request) (itemID int64, apiError service.APIError, err error) {
user := srv.GetUser(r)
store := srv.GetStore(r)
propagationsToRun := []string{}
err = store.InTransaction(func(store *database.DataStore) error {
input := NewItemRequest{}
formData := formdata.NewFormData(&input)
Expand Down Expand Up @@ -269,15 +268,10 @@ func validateAndInsertItem(srv *Service, r *http.Request) (itemID int64, apiErro

setNewItemAsRootActivityOrSkill(store, formData, &input, itemID)

propagationsToRun = append(propagationsToRun, "items_ancestors")
store.SchedulePropagation([]string{"items_ancestors", "permissions", "results"})

return nil
})
if err == nil {
propagationsToRun = append(propagationsToRun, "permissions", "results")

service.SchedulePropagation(store, srv.GetPropagationEndpoint(), propagationsToRun)
}

return itemID, apiError, err
}
Expand Down
2 changes: 1 addition & 1 deletion app/api/items/start_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (srv *Service) startResult(w http.ResponseWriter, r *http.Request) service.
resultStore := store.Results()
service.MustNotBeError(resultStore.MarkAsToBePropagated(participantID, attemptID, itemID, false))

service.SchedulePropagation(store, srv.GetPropagationEndpoint(), []string{"results"})
store.SchedulePropagation([]string{"results"})
}
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion app/api/items/start_result_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (srv *Service) startResultPath(w http.ResponseWriter, r *http.Request) serv
service.MustNotBeError(resultStore.InsertOrUpdateMaps(rowsToInsert, []string{"started_at", "latest_activity_at"}))
service.MustNotBeError(resultStore.InsertIgnoreMaps("results_propagate", rowsToInsertPropagate))

service.SchedulePropagation(store, srv.GetPropagationEndpoint(), []string{"results"})
store.SchedulePropagation([]string{"results"})
}

return nil
Expand Down
15 changes: 8 additions & 7 deletions app/api/items/update_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ func (srv *Service) updateItem(w http.ResponseWriter, r *http.Request) service.A
input := updateItemRequest{}
formData := formdata.NewFormData(&input)

var propagationsToRun []string

apiError := service.NoError
err = store.InTransaction(func(store *database.DataStore) error {
var itemInfo struct {
Expand Down Expand Up @@ -187,14 +185,15 @@ func (srv *Service) updateItem(w http.ResponseWriter, r *http.Request) service.A
return apiError.Error // rollback
}

propagationsToRun, apiError, err = updateChildrenAndRunListeners(
apiError, err = updateChildrenAndRunListeners(
formData,
store,
itemID,
&input,
childrenInfoMap,
oldPropagationLevelsMap,
)

return err
})

Expand All @@ -203,8 +202,6 @@ func (srv *Service) updateItem(w http.ResponseWriter, r *http.Request) service.A
}
service.MustNotBeError(err)

service.SchedulePropagation(store, srv.GetPropagationEndpoint(), propagationsToRun)

// response
service.MustNotBeError(render.Render(w, r, service.UpdateSuccess(nil)))
return service.NoError
Expand Down Expand Up @@ -240,7 +237,9 @@ func updateChildrenAndRunListeners(
input *updateItemRequest,
childrenPermissionMap map[int64]permissionAndType,
oldPropagationLevelsMap map[int64]*itemsRelationData,
) (propagationsToRun []string, apiError service.APIError, err error) {
) (apiError service.APIError, err error) {
var propagationsToRun []string

if formData.IsSet("children") {
err = store.ItemItems().WithItemsRelationsLock(func(lockedStore *database.DataStore) error {
deleteStatement := lockedStore.ItemItems().DB.
Expand Down Expand Up @@ -280,7 +279,9 @@ func updateChildrenAndRunListeners(
propagationsToRun = append(propagationsToRun, "results")
}

return propagationsToRun, apiError, err
store.SchedulePropagation(propagationsToRun)

return apiError, err
}

// constructUpdateItemChildTypeNonSkillValidator constructs a validator for the Children field that checks
Expand Down
10 changes: 10 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
package app

import (
"context"
crand "crypto/rand"
"encoding/binary"
"fmt"
"net/http"

"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
Expand Down Expand Up @@ -97,6 +99,14 @@ func (app *Application) Reset(config *viper.Viper) error {
router.Use(corsConfig().Handler) // no need for CORS if served through the same domain
router.Use(domain.Middleware(domainsConfig, serverConfig.GetString("domainOverride")))

router.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), "propagation_endpoint", serverConfig.GetString("propagation_endpoint"))

next.ServeHTTP(w, r.WithContext(ctx))
})
})

if appenv.IsEnvDev() {
router.Mount("/debug", middleware.Profiler())
}
Expand Down
4 changes: 2 additions & 2 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestNew_Success(t *testing.T) {
assert.NotNil(app.Database)
assert.NotNil(app.HTTPHandler)
assert.NotNil(app.apiCtx)
assert.Len(app.HTTPHandler.Middlewares(), 8)
assert.Len(app.HTTPHandler.Middlewares(), 9)
assert.True(len(app.HTTPHandler.Routes()) > 0)
assert.Equal("/*", app.HTTPHandler.Routes()[0].Pattern) // test default val
}
Expand All @@ -46,7 +46,7 @@ func TestNew_SuccessNoCompress(t *testing.T) {
_ = os.Setenv("ALGOREA_SERVER__COMPRESS", "false")
defer func() { _ = os.Unsetenv("ALGOREA_SERVER__COMPRESS") }()
app, _ := New()
assert.Len(app.HTTPHandler.Middlewares(), 7)
assert.Len(app.HTTPHandler.Middlewares(), 8)
}

func TestNew_NotDefaultRootPath(t *testing.T) {
Expand Down
34 changes: 26 additions & 8 deletions app/database/data_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

"github.com/France-ioi/AlgoreaBackend/app/utils"

"github.com/France-ioi/AlgoreaBackend/app/rand"
)

Expand Down Expand Up @@ -176,10 +178,11 @@ func (s *DataStore) NewID() int64 {
}

type awaitingTriggers struct {
ItemAncestors bool
GroupAncestors bool
Permissions bool
Results bool
ItemAncestors bool
GroupAncestors bool
Permissions bool
Results bool
SchedulePropagationTypes []string
}
type dbContextKey string

Expand All @@ -202,6 +205,12 @@ func (s *DataStore) InTransaction(txFunc func(*DataStore) error) error {

triggersToRun := s.ctx.Value(triggersContextKey).(*awaitingTriggers)

if len(triggersToRun.SchedulePropagationTypes) > 0 {
types := triggersToRun.SchedulePropagationTypes
triggersToRun.SchedulePropagationTypes = []string{}

StartAsyncPropagation(s, s.Context().Value("propagation_endpoint").(string), types)
}
if triggersToRun.GroupAncestors {
triggersToRun.GroupAncestors = false
s.createNewAncestors("groups", "group")
Expand All @@ -222,6 +231,14 @@ func (s *DataStore) InTransaction(txFunc func(*DataStore) error) error {
return err
}

// SchedulePropagation schedules a run of the propagation for the given types after the transaction commit.
func (s *DataStore) SchedulePropagation(types []string) {
s.mustBeInTransaction()

triggersToRun := s.DB.ctx.Value(triggersContextKey).(*awaitingTriggers)
triggersToRun.SchedulePropagationTypes = utils.UniqueStrings(append(triggersToRun.SchedulePropagationTypes, types...))
Copy link
Collaborator

@zenovich zenovich Jul 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the order of types doesn't matter, it would be much cleaner to store map[string]struct{} in triggersToRun.SchedulePropagationTypes instead of creating utils.UniqueStrings() and tests for it. Also, the map works faster. But a struct containing all the possible values as boolean fields (like 'awaitingTriggers') would be even better than the map: it would store only several booleans while the map stores strings. Also, the struct provides immediate access to its fields, while the map uses hashing. Also, with the struct, it is not possible to mistype a value which is an often issue with maps/slices.

If the order of types matters, UniqueString() breaks it anyway since maps don't preserve the order of keys.

}

// ScheduleResultsPropagation schedules a run of ResultStore::propagate() after the transaction commit.
func (s *DataStore) ScheduleResultsPropagation() {
s.mustBeInTransaction()
Expand Down Expand Up @@ -262,10 +279,6 @@ func (s *DataStore) WithForeignKeyChecksDisabled(blockFunc func(*DataStore) erro
})
}

func (s *DataStore) IsInTransaction() bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a nice useful method.

return s.DB.isInTransaction()
}

// WithNamedLock wraps the given function in GET_LOCK/RELEASE_LOCK.
func (s *DataStore) WithNamedLock(lockName string, timeout time.Duration, txFunc func(*DataStore) error) error {
return s.withNamedLock(lockName, timeout, func(db *DB) error {
Expand Down Expand Up @@ -327,3 +340,8 @@ func (s *DataStore) InsertOrUpdateMap(dataMap map[string]interface{}, updateColu
func (s *DataStore) InsertOrUpdateMaps(dataMap []map[string]interface{}, updateColumns []string) error {
return s.DB.insertOrUpdateMaps(s.tableName, dataMap, updateColumns)
}

// MustNotBeInTransaction panics if the store is in a transaction.
func (s *DataStore) MustNotBeInTransaction() {
s.DB.mustNotBeInTransaction()
}
12 changes: 12 additions & 0 deletions app/database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ func (conn *DB) New() *DB {
return newDB(conn.ctx, conn.db.New())
}

// Context returns the context of the current db connection.
func (conn *DB) Context() context.Context {
return conn.ctx
}

func (conn *DB) inTransaction(txFunc func(*DB) error) (err error) {
return conn.inTransactionWithCount(txFunc, 0)
}
Expand Down Expand Up @@ -639,6 +644,13 @@ func (conn *DB) withForeignKeyChecksDisabled(blockFunc func(*DB) error) (err err
//}
}

// mustNotBeInTransaction panics if the current connection is in a transaction.
func (conn *DB) mustNotBeInTransaction() {
if conn.isInTransaction() {
panic("should not be executed in a transaction")
}
}

func mustNotBeError(err error) {
if err != nil {
panic(err)
Expand Down
31 changes: 13 additions & 18 deletions app/service/propagation.go → app/database/propagation.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
package service
package database

import (
"net/http"
"strings"
"time"

"github.com/France-ioi/AlgoreaBackend/app/database"
"github.com/France-ioi/AlgoreaBackend/app/logging"
)

const PropagationEndpointTimeout = 3 * time.Second
const endpointTimeout = 3 * time.Second

// SchedulePropagation schedules asynchronous propagation of the given types.
// StartAsyncPropagation schedules asynchronous propagation of the given types.
// If endpoint is an empty string, it will be done synchronously.
func SchedulePropagation(store *database.DataStore, endpoint string, types []string) {
func StartAsyncPropagation(store *DataStore, endpoint string, types []string) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would move this method into app/service. It can be called right from the endpoint handler in case when the async propagation is chosen. The database package is for database-related things only.

// Must not be called in a transaction because it calls an endpoint, which can take a long time.
store.MustNotBeInTransaction()

endpointFailed := false
if endpoint != "" {
// Async.
client := http.Client{
Timeout: PropagationEndpointTimeout,
Timeout: endpointTimeout,
}

callTime := time.Now()
Expand All @@ -43,20 +45,13 @@ func SchedulePropagation(store *database.DataStore, endpoint string, types []str
}

if endpoint == "" || endpointFailed {
// Sync.
if store.IsInTransaction() {
err := store.InTransaction(func(store *DataStore) error {
store.ScheduleItemsAncestorsPropagation()
store.SchedulePermissionsPropagation()
store.ScheduleResultsPropagation()
} else {
err := store.InTransaction(func(store *database.DataStore) error {
store.ScheduleItemsAncestorsPropagation()
store.SchedulePermissionsPropagation()
store.ScheduleResultsPropagation()

return nil
})
MustNotBeError(err)
}

return nil
})
mustNotBeError(err)
Copy link
Collaborator

@zenovich zenovich Jul 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Public methods cannot return errors via panic, panic/recover pattern should be used only within a package (see https://go.dev/doc/effective_go#recover). Also, it is an "architecture decision" made in March 2019, which we have always been following since then.

}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package service_test
package database_test

import (
"fmt"
"net/http"
"testing"

"github.com/France-ioi/AlgoreaBackend/app/database"

"github.com/stretchr/testify/assert"
"github.com/thingful/httpmock"

"github.com/France-ioi/AlgoreaBackend/app/database"
"github.com/France-ioi/AlgoreaBackend/app/logging"
"github.com/France-ioi/AlgoreaBackend/app/loggingtest"
"github.com/France-ioi/AlgoreaBackend/app/service"
"github.com/France-ioi/AlgoreaBackend/testhelpers"
)

func TestSchedulePropagation(t *testing.T) {
func TestStartAsyncPropagation(t *testing.T) {
type args struct {
endpoint string
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestSchedulePropagation(t *testing.T) {
}
}

service.SchedulePropagation(store, tt.args.endpoint, []string{"permissions"})
database.StartAsyncPropagation(store, tt.args.endpoint, []string{"permissions"})

exists, err := store.Permissions().Where("item_id = 1").HasRows()
assert.NoError(t, err)
Expand All @@ -122,3 +122,17 @@ func TestSchedulePropagation(t *testing.T) {
})
}
}

func TestStartAsyncPropagation_ShouldPanicWhenCalledInsideTransaction(t *testing.T) {
db := testhelpers.SetupDBWithFixtureString("")
defer func() { _ = db.Close() }()
store := database.NewDataStore(db)

assert.Panics(t, func() {
err := store.InTransaction(func(store *database.DataStore) error {
database.StartAsyncPropagation(store, "", []string{"permissions"})
return nil
})
assert.NoError(t, err)
})
}
15 changes: 15 additions & 0 deletions app/utils/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,18 @@ func Capitalize(str string) string {

return string(r)
}

// UniqueStrings returns a new slice containing only the unique strings from the input slice.
func UniqueStrings(slice []string) []string {
uniques := make(map[string]bool, len(slice))
for _, s := range slice {
uniques[s] = true
}

result := make([]string, 0, len(uniques))
for s := range uniques {
result = append(result, s)
}

return result
}
Loading