diff --git a/server/appservices/kv.go b/server/appservices/kv.go index e2fd65d6c..9102e2707 100644 --- a/server/appservices/kv.go +++ b/server/appservices/kv.go @@ -85,3 +85,9 @@ func (a *AppServices) KVDebugAppInfo(r *incoming.Request, appID apps.AppID) (*st } return appInfo, nil } + +func (a *AppServices) RunCachedStoreTest(r *incoming.Request, params store.CachedStoreTestParams) { + go func() { + a.store.RunCachedStoreTest(r, params) + }() +} diff --git a/server/appservices/service.go b/server/appservices/service.go index 7d08e7c5b..d0793dd6e 100644 --- a/server/appservices/service.go +++ b/server/appservices/service.go @@ -34,6 +34,7 @@ type Service interface { // Internal DeleteAppData(r *incoming.Request, appID apps.AppID, force bool) error + RunCachedStoreTest(r *incoming.Request, s store.CachedStoreTestParams) } type AppServices struct { diff --git a/server/builtin/app.go b/server/builtin/app.go index 9cd3a141c..8562ac54d 100644 --- a/server/builtin/app.go +++ b/server/builtin/app.go @@ -60,37 +60,41 @@ const ( fOverrides = "overrides" fPage = "page" fSecret = "secret" + fNumberPuts = "num_puts" + fWaitToSync = "wait_to_sync" + fStoreKind = "store_kind" fSessionID = "session_id" fURL = "url" ) const ( - PathDebugClean = "/debug/clean" - PathDebugKVInfo = "/debug/kv/info" - PathDebugKVList = "/debug/kv/list" - PathDebugStoreList = "/debug/store/list" - PathDebugStorePollute = "/debug/store/pollute" - PathDebugSessionsList = "/debug/session/list" - pDebugBindings = "/debug/bindings" - pDebugKVClean = "/debug/kv/clean" - pDebugKVCreate = "/debug/kv/create" - pDebugKVEdit = "/debug/kv/edit" - pDebugKVEditModal = "/debug/kv/edit-modal" - pDebugLogs = "/debug/logs" - pDebugOAuthConfigView = "/debug/oauth/config/view" - pDebugSessionsRevoke = "/debug/session/delete" - pDebugSessionsView = "/debug/session/view" - pDisable = "/disable" - pEnable = "/enable" - pInfo = "/info" - pInstallConsentModal = "/install-consent" - pInstallConsentSource = "/install-consent/form" - pInstallHTTP = "/install-http" - pInstallListed = "/install-listed" - pList = "/list" - pSettingsModalSave = "/settings/save" - pSettingsModalSource = "/settings/form" - pUninstall = "/uninstall" + PathDebugClean = "/debug/clean" + PathDebugKVInfo = "/debug/kv/info" + PathDebugKVList = "/debug/kv/list" + PathDebugStoreList = "/debug/store/list" + PathDebugStorePollute = "/debug/store/pollute" + PathDebugSessionsList = "/debug/session/list" + pDebugBindings = "/debug/bindings" + pDebugClusterTestCached = "/debug/cluster/test-cached" + pDebugKVClean = "/debug/kv/clean" + pDebugKVCreate = "/debug/kv/create" + pDebugKVEdit = "/debug/kv/edit" + pDebugKVEditModal = "/debug/kv/edit-modal" + pDebugLogs = "/debug/logs" + pDebugOAuthConfigView = "/debug/oauth/config/view" + pDebugSessionsRevoke = "/debug/session/delete" + pDebugSessionsView = "/debug/session/view" + pDisable = "/disable" + pEnable = "/enable" + pInfo = "/info" + pInstallConsentModal = "/install-consent" + pInstallConsentSource = "/install-consent/form" + pInstallHTTP = "/install-http" + pInstallListed = "/install-listed" + pList = "/list" + pSettingsModalSave = "/settings/save" + pSettingsModalSource = "/settings/form" + pUninstall = "/uninstall" ) const ( @@ -138,24 +142,25 @@ func NewBuiltinApp(api config.API, proxy proxy.Service, appservices appservices. PathDebugStoreList: requireAdmin(a.debugStoreList), PathDebugStorePollute: requireAdmin(a.debugStorePollute), - pDebugBindings: requireAdmin(a.debugBindings), - pDebugKVClean: requireAdmin(a.debugKVClean), - pDebugKVCreate: requireAdmin(a.debugKVCreate), - pDebugKVEdit: requireAdmin(a.debugKVEdit), - pDebugKVEditModal: requireAdmin(a.debugKVEdit), - pDebugOAuthConfigView: requireAdmin(a.debugOAuthConfigView), - pDebugSessionsRevoke: requireAdmin(a.debugSessionsRevoke), - pDebugSessionsView: requireAdmin(a.debugSessionsView), - pDisable: requireAdmin(a.disable), - pEnable: requireAdmin(a.enable), - pInstallConsentModal: requireAdmin(a.installConsent), - pInstallConsentSource: requireAdmin(a.installConsentForm), - pInstallHTTP: requireAdmin(a.installHTTP), - pInstallListed: requireAdmin(a.installListed), - pList: requireAdmin(a.list), - pSettingsModalSave: requireAdmin(a.settingsSave), - pSettingsModalSource: requireAdmin(a.settingsForm), - pUninstall: requireAdmin(a.uninstall), + pDebugBindings: requireAdmin(a.debugBindings), + pDebugClusterTestCached: requireAdmin(a.debugClusterTestCached), + pDebugKVClean: requireAdmin(a.debugKVClean), + pDebugKVCreate: requireAdmin(a.debugKVCreate), + pDebugKVEdit: requireAdmin(a.debugKVEdit), + pDebugKVEditModal: requireAdmin(a.debugKVEdit), + pDebugOAuthConfigView: requireAdmin(a.debugOAuthConfigView), + pDebugSessionsRevoke: requireAdmin(a.debugSessionsRevoke), + pDebugSessionsView: requireAdmin(a.debugSessionsView), + pDisable: requireAdmin(a.disable), + pEnable: requireAdmin(a.enable), + pInstallConsentModal: requireAdmin(a.installConsent), + pInstallConsentSource: requireAdmin(a.installConsentForm), + pInstallHTTP: requireAdmin(a.installHTTP), + pInstallListed: requireAdmin(a.installListed), + pList: requireAdmin(a.list), + pSettingsModalSave: requireAdmin(a.settingsSave), + pSettingsModalSource: requireAdmin(a.settingsForm), + pUninstall: requireAdmin(a.uninstall), // Lookups. pLookupAppID: requireAdmin(a.lookupAppID), diff --git a/server/builtin/debug.go b/server/builtin/debug.go index d89b15e12..d130344f0 100644 --- a/server/builtin/debug.go +++ b/server/builtin/debug.go @@ -19,6 +19,20 @@ func (a *builtinApp) debugCommandBinding(loc *i18n.Localizer) apps.Binding { Bindings: []apps.Binding{ a.debugBindingsCommandBinding(loc), a.debugCleanCommandBinding(loc), + { + Location: "cluster", + Label: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{ + ID: "command.debug.cluster.label", + Other: "cluster", + }), + Description: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{ + ID: "command.debug.cluster.description", + Other: "Test high availability (cluster) functionality.", + }), + Bindings: []apps.Binding{ + a.debugClusterTestCachedCommandBinding(loc), + }, + }, { Location: "kv", Label: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{ diff --git a/server/builtin/debug_cluster_test_cached.go b/server/builtin/debug_cluster_test_cached.go new file mode 100644 index 000000000..844c52a1d --- /dev/null +++ b/server/builtin/debug_cluster_test_cached.go @@ -0,0 +1,109 @@ +// Copyright (c) 2019-present Mattermost, Inc. All Rights Reserved. +// See License for license information. + +package builtin + +import ( + "strconv" + "time" + + "github.com/nicksnyder/go-i18n/v2/i18n" + + "github.com/mattermost/mattermost-plugin-apps/apps" + "github.com/mattermost/mattermost-plugin-apps/server/incoming" + "github.com/mattermost/mattermost-plugin-apps/server/store" + "github.com/mattermost/mattermost-server/v6/model" +) + +func (a *builtinApp) debugClusterTestCachedCommandBinding(loc *i18n.Localizer) apps.Binding { + return apps.Binding{ + Location: "test-cached", + Label: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{ + ID: "command.debug.cluster.test_cached.label", + Other: "test-cached", + }), + Description: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{ + ID: "command.debug.cluster.test_cached.description", + Other: "Runs a test of cluster aware cached store.", + }), + Hint: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{ + ID: "command.debug.cluster.test_cached.hint", + Other: "[ options... ]", + }), + Form: &apps.Form{ + Submit: newUserCall(pDebugClusterTestCached), + Fields: []apps.Field{ + { + Name: fNumberPuts, + Type: apps.FieldTypeText, + ModalLabel: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{ + ID: "field.cluster.num_puts.modal_label", + Other: "Number of PUTs", + }), + Value: "10", + }, + { + Name: fWaitToSync, + Type: apps.FieldTypeText, + ModalLabel: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{ + ID: "field.cluster.wait_to_sync.modal_label", + Other: "Wait before checking the index, in seconds", + }), + Value: "2", + }, + { + Name: fStoreKind, + Type: apps.FieldTypeStaticSelect, + SelectStaticOptions: []apps.SelectOption{ + { + Label: string(store.SimpleCachedStoreKind), + Value: string(store.SimpleCachedStoreKind), + }, + { + Label: string(store.SingleWriterCachedStoreKind), + Value: string(store.SingleWriterCachedStoreKind), + }, + { + Label: string(store.MutexCachedStoreKind), + Value: string(store.MutexCachedStoreKind), + }, + { + Label: string(store.TestCachedStoreKind), + Value: string(store.TestCachedStoreKind), + }, + }, + ModalLabel: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{ + ID: "field.cluster.store_type.modal_label", + Other: "Cluster replication type", + }), + Value: apps.SelectOption{ + Label: string(store.MutexCachedStoreKind), + Value: string(store.MutexCachedStoreKind), + }, + }, + }, + }, + } +} + +func (a *builtinApp) debugClusterTestCached(r *incoming.Request, creq apps.CallRequest) apps.CallResponse { + numPuts, _ := strconv.Atoi(creq.GetValue(fNumberPuts, "")) + waitSeconds, _ := strconv.Atoi(creq.GetValue(fWaitToSync, "")) + storeKind := store.CachedStoreClusterKind(creq.GetValue(fStoreKind, "")) + name := "test/" + model.NewId() + + a.appservices.RunCachedStoreTest(r, + store.CachedStoreTestParams{ + Kind: storeKind, + Name: name, + NumberOfPuts: numPuts, + WaitForIndexToSync: time.Duration(waitSeconds) * time.Second, + }, + ) + + loc := a.newLocalizer(creq) + return apps.NewTextResponse(a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{ + ID: "message.cluster.started_test", + Other: "Started the test. Will report as a direct conversation with @appsbot.", + })) +} diff --git a/server/httpin/service.go b/server/httpin/service.go index 54c5a3889..37e03eac7 100644 --- a/server/httpin/service.go +++ b/server/httpin/service.go @@ -137,13 +137,13 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, req *http.Request) { defer cancel() r = r.WithCtx(ctx) - if s.Config.Get().DeveloperMode { - r.Log.With( - "method", req.Method, - "url", req.URL.String(), - "user_agent", req.Header.Get("User-Agent"), - ).Debugf("Received HTTP request") - } + // if s.Config.Get().DeveloperMode { + // r.Log.With( + // "method", req.Method, + // "url", req.URL.String(), + // "user_agent", req.Header.Get("User-Agent"), + // ).Debugf("Received HTTP request") + // } // Output panics in dev. mode. defer func() { diff --git a/server/proxy/bindings.go b/server/proxy/bindings.go index c266597a1..2d98264ca 100644 --- a/server/proxy/bindings.go +++ b/server/proxy/bindings.go @@ -47,7 +47,7 @@ func (p *Proxy) GetBindings(r *incoming.Request, cc apps.Context) (ret []apps.Bi if err != nil { log.WithError(err).Warnf("GetBindings failed") } else { - log.Debugf("GetBindings: returned bindings for %v apps", len(allApps)) + // log.Debugf("GetBindings: returned bindings for %v apps", len(allApps)) } }() diff --git a/server/proxy/invoke_call.go b/server/proxy/invoke_call.go index 29d13c85f..310ccf21c 100644 --- a/server/proxy/invoke_call.go +++ b/server/proxy/invoke_call.go @@ -166,9 +166,9 @@ func (p *Proxy) callAppImpl(r *incoming.Request, app *apps.App, creq apps.CallRe case cresp.Type == apps.CallResponseTypeError: log.Debugf("Call returned an error from app: %v", cresp.Error()) case cresp.Type == apps.CallResponseTypeOK && cresp.Text != "": - log.Debugf("Called %s:%s -> %s: %s", app.AppID, creq.Path, cresp.Type, utils.FirstN(cresp.Text, 32)) + // log.Debugf("Called %s:%s -> %s: %s", app.AppID, creq.Path, cresp.Type, utils.FirstN(cresp.Text, 32)) default: - log.Debugf("Called %s:%s -> %s", app.AppID, creq.Path, cresp.Type) + // log.Debugf("Called %s:%s -> %s", app.AppID, creq.Path, cresp.Type) } }() diff --git a/server/proxy/service.go b/server/proxy/service.go index e98f8dd6b..ce8ec56e2 100644 --- a/server/proxy/service.go +++ b/server/proxy/service.go @@ -228,7 +228,7 @@ func (p *Proxy) initUpstream(typ apps.DeployType, newConfig config.Config, log u log.WithError(err).Errorf("Failed to initialize upstream %s.", typ) default: p.upstreams.Store(typ, up) - log.Debugw("available upstream", "type", typ) + log.Debugf("available upstream: %s", typ) } } else { p.upstreams.Delete(typ) diff --git a/server/store/cached.go b/server/store/cached.go index f8708ecc1..1124b4466 100644 --- a/server/store/cached.go +++ b/server/store/cached.go @@ -21,6 +21,7 @@ type CachedStore[T Cloneable[T]] interface { Index() CachedIndex[T] Get(key string) (value *T) Put(r *incoming.Request, key string, value *T) error + Stop() } func MakeCachedStore[T Cloneable[T]](name string, cluster *CachedStoreCluster, log utils.Logger) (CachedStore[T], error) { diff --git a/server/store/cached_cluster.go b/server/store/cached_cluster.go index 79d301f05..05a403d86 100644 --- a/server/store/cached_cluster.go +++ b/server/store/cached_cluster.go @@ -32,8 +32,11 @@ type CachedStoreCluster struct { type eventHandler func(r *incoming.Request, ev model.PluginClusterEvent) error const ( - putEventID = "cached_store_data" - syncEventID = "cached_store_sync" + putEventID = "cached_store_data" + syncEventID = "cached_store_sync" + testInitEventID = "cached_store_test_init" + testRunEventID = "cached_store_test_run" + testReportEventID = "cached_store_test_report" ) // cachedStoreClusterEvent is a cluster event sent between nodes. It works for @@ -60,6 +63,10 @@ func NewCachedStoreCluster(api config.API, kind CachedStoreClusterKind) *CachedS } func (s *Service) OnPluginClusterEvent(r *incoming.Request, ev model.PluginClusterEvent) { + if done := s.processTestPluginClusterEvent(r, ev); done { + return + } + f, err := s.cluster.getEventHandler(ev) if err != nil { r.Log.WithError(err).Errorw("failed to find a handler for plugin cluster event") @@ -107,3 +114,7 @@ func (c *CachedStoreCluster) getEventHandler(ev model.PluginClusterEvent) (event func (c *CachedStoreCluster) setEventHandler(eventID string, h eventHandler) { c.eventHandlers.Store(eventID, h) } + +func (c *CachedStoreCluster) removeEventHandler(eventID string) { + c.eventHandlers.Delete(eventID) +} diff --git a/server/store/cached_mutex.go b/server/store/cached_mutex.go index eb189e97e..c69c0ea7e 100644 --- a/server/store/cached_mutex.go +++ b/server/store/cached_mutex.go @@ -66,8 +66,6 @@ func (s *MutexCachedStore[T]) Put(r *incoming.Request, key string, value *T) err return err } - r.Log.Debugf("<>/<> 1 %v", changed) - if changed { event := s.newPluginClusterEvent(key, value, updatedIndex.hash()) s.cluster.broadcastEvent(r, s.eventID(), event) diff --git a/server/store/cached_simple.go b/server/store/cached_simple.go index 37186a382..a6ec1424c 100644 --- a/server/store/cached_simple.go +++ b/server/store/cached_simple.go @@ -23,6 +23,8 @@ type SimpleCachedStore[T Cloneable[T]] struct { name string } +var _ CachedStore[testDataType] = (*SimpleCachedStore[testDataType])(nil) + func MakeSimpleCachedStore[T Cloneable[T]](name string, api config.API, log utils.Logger) (*SimpleCachedStore[T], error) { s := &SimpleCachedStore[T]{ cache: &sync.Map{}, @@ -38,6 +40,8 @@ func MakeSimpleCachedStore[T Cloneable[T]](name string, api config.API, log util return s, nil } +func (s *SimpleCachedStore[T]) Stop() {} + func (s *SimpleCachedStore[T]) Index() CachedIndex[T] { out := CachedIndex[T]{} s.cache.Range(func(key, mapv interface{}) bool { diff --git a/server/store/cached_single_writer.go b/server/store/cached_single_writer.go index c8efb2ed0..46bb1513d 100644 --- a/server/store/cached_single_writer.go +++ b/server/store/cached_single_writer.go @@ -46,6 +46,11 @@ func MakeSingleWriterCachedStore[T Cloneable[T]](name string, c *CachedStoreClus return s, nil } +func (s *SingleWriterCachedStore[T]) Stop() { + s.cluster.removeEventHandler(s.getPutEventID()) + s.cluster.removeEventHandler(s.getSyncEventID()) +} + const syncBroadcastDelay = 500 * time.Millisecond func (s *SingleWriterCachedStore[T]) broadcastSyncLater(r *incoming.Request, hash string) { diff --git a/server/store/cached_test_cluster.go b/server/store/cached_test_cluster.go new file mode 100644 index 000000000..a835d0078 --- /dev/null +++ b/server/store/cached_test_cluster.go @@ -0,0 +1,293 @@ +// Copyright (c) 2020-present Mattermost, Inc. All Rights Reserved. +// See License for license information. + +package store + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/mattermost/mattermost-plugin-api/cluster" + "github.com/mattermost/mattermost-server/v6/model" + + "github.com/mattermost/mattermost-plugin-apps/server/incoming" + "github.com/mattermost/mattermost-plugin-apps/utils" +) + +type testDataType struct { + Key string + UpdatedByRequestID string +} + +func (t testDataType) Clone() *testDataType { + return &t +} + +type CachedStoreTestParams struct { + Kind CachedStoreClusterKind + Name string + NumberOfPuts int + WaitForIndexToSync time.Duration + ActingUserID string + RootPostID string +} + +type CachedStoreTestReport struct { + Errors []string + HostNickname string + IndexSHA string + Params CachedStoreTestParams + Index *StoredIndex[testDataType] +} + +func (s *Service) RunCachedStoreTest(r *incoming.Request, params CachedStoreTestParams) { + mutex, err := cluster.NewMutex(r.API.Plugin, "cluster_cached_store_test_mutex") + if err != nil { + r.Log.WithError(err).Errorf("Failed to create cluster mutex") + return + } + mutex.Lock() + defer mutex.Unlock() + if s.getTestReportChannel() != nil { + r.Log.Errorf("Another cluster test is already running on this host (unreachable)") + } + testReportChan := make(chan CachedStoreTestReport) + s.setTestReportChannel(testReportChan) + defer func() { s.setTestReportChannel(nil) }() + + params.ActingUserID = r.ActingUserID() + params.RootPostID = dm(r, params, "Starting a cluster test run from host %s. Parameters:\n%s", utils.HostNickname(), utils.JSONBlock(params)) + + // Initialize the stores on all hosts. + store, report := s.initLocalCachedStoreTest(r, params) + if store == nil || len(report.Errors) > 0 { + dm(r, params, "(Local) host %s failed to initialize: %v", report.HostNickname, report.Errors) + return + } + defer store.Stop() + + nOtherHosts := 0 + s.broadcastTestEvent(r, testInitEventID, params, params) + + acks, _ := s.collectReports(r, params, 1*time.Second, -1) + responded := []string{} + for _, report := range acks { + if len(report.Errors) > 0 { + dm(r, params, "Host %s failed to initialize: %v", report.HostNickname, report.Errors) + return + } + responded = append(responded, report.HostNickname) + } + nOtherHosts = len(responded) + dm(r, params, "Found %v other hosts in the cluster (%v)", nOtherHosts, responded) + + // Run the test locally. + go func() { + s.getTestReportChannel() <- s.runLocalCachedStoreTest(r, params, store) + }() + + // Run the test on other hosts, and receive all reports including the local + // one. + timeout := 10 * time.Second + s.broadcastTestEvent(r, testRunEventID, params, params) + reports, timedOut := s.collectReports(r, params, timeout, 1+nOtherHosts) + if !timedOut { + dm(r, params, "Finished with %v reports", len(reports)) + } else { + dm(r, params, "Timed out after %v, received %v reports", timeout, len(reports)) + } + + // Check the test reports + sha := "" + for _, report := range reports { + switch { + case sha == "": + sha = report.IndexSHA + dm(r, params, "Test report from `%s`: ok: set SHA to `%s`: ok", report.HostNickname, utils.FirstN(report.IndexSHA, 10)) + case sha == report.IndexSHA: + dm(r, params, "Test report from `%s`: `%s`: ok: matched\n", report.HostNickname, utils.FirstN(report.IndexSHA, 10)) + default: + dm(r, params, "Test report from `%s`: `%s`: FAIL: expected `%s`", report.HostNickname, utils.FirstN(report.IndexSHA, 10), utils.FirstN(sha, 10)) + return + } + } + + // Verify against the stored index, again. + if params.Kind == TestCachedStoreKind { + dm(r, params, "Skipping final verification sine KV was never updated.") + } else { + dm(r, params, "Starting final verification against the KV") + store.Stop() + time.Sleep(500 * time.Millisecond) + store, err = makeCachedStore[testDataType](params.Kind, params.Name, s.cluster, r.Log) + if err != nil { + dm(r, params, "FAIL: final verification from %s", params.Name) + return + } + if sha != store.Index().Stored().hash() { + dm(r, params, "FAIL: final verification from %s: expected %s, got %s", params.Name, sha, store.Index().Stored()) + return + } + dm(r, params, "Final verification: OK") + } +} + +func (s *Service) processTestPluginClusterEvent(r *incoming.Request, ev model.PluginClusterEvent) bool { + switch ev.Id { + case testInitEventID, testRunEventID: + params := CachedStoreTestParams{} + if err := json.Unmarshal(ev.Data, ¶ms); err != nil { + r.Log.WithError(err).Errorw("failed to unmarshal test params") + return true + } + + go func() { + var report CachedStoreTestReport + if ev.Id == testInitEventID { + // dm(r, params, "received cluster message INIT TEST %s", params.Name) + store, initReport := s.initLocalCachedStoreTest(r, params) + s.setTestStore(store) + report = initReport + } else { // run + // dm(r, params, "received cluster message RUN TEST %s", params.Name) + report = s.runLocalCachedStoreTest(r, params, s.getTestStore()) + } + s.broadcastTestEvent(r, testReportEventID, report.Params, report) + }() + return true + + case testReportEventID: + report := CachedStoreTestReport{} + if err := json.Unmarshal(ev.Data, &report); err != nil { + r.Log.WithError(err).Errorf("failed to unmarshal test report: %v", err) + return true + } + if ch := s.getTestReportChannel(); ch != nil { + // dm(r, report.Params, "received cluster message TEST REPORT %s:\n%s", report.Params.Name, utils.JSONBlock(report)) + ch <- report + } + return true + + default: + return false + } +} + +func (s *Service) initLocalCachedStoreTest(r *incoming.Request, params CachedStoreTestParams) (store CachedStore[testDataType], report CachedStoreTestReport) { + var allErrors []string + hostNickname := utils.HostNickname() + defer func() { + report.HostNickname = hostNickname + report.Params = params + report.Errors = allErrors + }() + dm(r, params, "preparing a local test run on host %s.", hostNickname) + + store, err := makeCachedStore[testDataType](params.Kind, params.Name, s.cluster, r.Log) + if err != nil { + allErrors = append(allErrors, err.Error()) + return nil, report + } + return store, CachedStoreTestReport{ + IndexSHA: store.Index().Stored().hash(), + } +} + +func (s *Service) runLocalCachedStoreTest(r *incoming.Request, params CachedStoreTestParams, store CachedStore[testDataType]) (report CachedStoreTestReport) { + var allErrors []string + hostNickname := utils.HostNickname() + defer func() { + report.HostNickname = hostNickname + report.Errors = allErrors + report.Params = params + dm(r, params, "Done running local test.") + }() + + for i := 0; i < params.NumberOfPuts; i++ { + cloneR := r.Clone() + key := fmt.Sprintf("%s-test-%d", hostNickname, i) + value := testDataType{ + Key: key, + UpdatedByRequestID: cloneR.RequestID, + } + if err := store.Put(r.Clone(), key, &value); err != nil { + allErrors = append(allErrors, err.Error()) + } + } + + stored := store.Index().Stored() + dm(r, params, "Done with %v puts, index has %v items, waiting %v for index to sync...", params.NumberOfPuts, len(stored.Data), params.WaitForIndexToSync) + time.Sleep(params.WaitForIndexToSync) + stored = store.Index().Stored() + dm(r, params, "Done waiting, index has %v items", len(stored.Data)) + + store.Stop() + return CachedStoreTestReport{ + IndexSHA: stored.hash(), + Index: stored, + } +} + +func dm(r *incoming.Request, params CachedStoreTestParams, message string, args ...interface{}) string { + r.Log.Debugf(message, args...) + post := &model.Post{ + Message: fmt.Sprintf(utils.HostNickname()+": "+message, args...), + RootId: params.RootPostID, + } + if err := r.API.Mattermost.Post.DM( + r.Config.Get().BotUserID, + params.ActingUserID, + post, + ); err != nil { + return "" + } + return post.Id +} + +func (s *Service) collectReports(r *incoming.Request, params CachedStoreTestParams, timeout time.Duration, n int) (out []CachedStoreTestReport, timedOut bool) { + timer := time.NewTimer(timeout) + for { + select { + case report := <-s.getTestReportChannel(): + out = append(out, report) + if n >= 0 && len(out) >= n { + return out, false + } + + case <-timer.C: + return out, true + } + } +} + +func (s *Service) setTestReportChannel(ch chan CachedStoreTestReport) { + s.testDataMutex.Lock() + defer s.testDataMutex.Unlock() + s.testReportChan = ch +} + +func (s *Service) getTestReportChannel() chan CachedStoreTestReport { + s.testDataMutex.RLock() + defer s.testDataMutex.RUnlock() + return s.testReportChan +} + +func (s *Service) setTestStore(store CachedStore[testDataType]) { + s.testDataMutex.Lock() + defer s.testDataMutex.Unlock() + s.testStore = store +} + +func (s *Service) getTestStore() CachedStore[testDataType] { + s.testDataMutex.RLock() + defer s.testDataMutex.RUnlock() + return s.testStore +} + +func (s *Service) broadcastTestEvent(r *incoming.Request, id string, params CachedStoreTestParams, data any) { + runRemoteTests := params.Kind == SingleWriterCachedStoreKind || params.Kind == MutexCachedStoreKind + if runRemoteTests { + s.cluster.broadcastEvent(r, id, data) + } +} diff --git a/server/store/cached_test_memory_store.go b/server/store/cached_test_memory_store.go index da7bebc70..d9765710d 100644 --- a/server/store/cached_test_memory_store.go +++ b/server/store/cached_test_memory_store.go @@ -17,6 +17,8 @@ func MakeTestCachedStore[T Cloneable[T]](name string, c *CachedStoreCluster, log return TestingCachedStore[T]{}, nil } +func (s TestingCachedStore[T]) Stop() {} + func (s TestingCachedStore[T]) Get(key string) *T { value, ok := s[key] if ok { diff --git a/server/store/service.go b/server/store/service.go index fa7f5d962..405764299 100644 --- a/server/store/service.go +++ b/server/store/service.go @@ -6,6 +6,7 @@ package store import ( "encoding/ascii85" "strings" + "sync" "github.com/pkg/errors" "golang.org/x/crypto/sha3" @@ -83,6 +84,15 @@ type Service struct { Session Sessions cluster *CachedStoreCluster + + // testReportChan is used in the cluster test to receive test reports from + // other hosts. It is initialized only on the host that actually runs the + // test. It receives reports from all nodes in the cluster. It should be + // synchronized, but is modified only from the test command, rarely, so it's + // ok not to. + testReportChan chan CachedStoreTestReport + testStore CachedStore[testDataType] + testDataMutex *sync.RWMutex } // MakeService creates and initializes a persistent storage Service. defaultKind @@ -94,6 +104,8 @@ func MakeService(conf config.Service, defaultKind CachedStoreClusterKind) (*Serv AppKV: &KVStore{}, OAuth2: &OAuth2Store{}, Session: &SessionStore{}, + + testDataMutex: &sync.RWMutex{}, } log := conf.NewBaseLogger()