Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cached store part #3: test command #459

Open
wants to merge 1 commit into
base: lev-cached-2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/appservices/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,9 @@ func (a *AppServices) KVDebugAppInfo(r *incoming.Request, appID apps.AppID) (*st
}
return appInfo, nil
}

func (a *AppServices) RunCachedStoreTest(r *incoming.Request, params store.CachedStoreTestParams) {
go func() {
a.store.RunCachedStoreTest(r, params)
}()
}
1 change: 1 addition & 0 deletions server/appservices/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Service interface {
// Internal

DeleteAppData(r *incoming.Request, appID apps.AppID, force bool) error
RunCachedStoreTest(r *incoming.Request, s store.CachedStoreTestParams)
}

type AppServices struct {
Expand Down
93 changes: 49 additions & 44 deletions server/builtin/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,37 +60,41 @@ const (
fOverrides = "overrides"
fPage = "page"
fSecret = "secret"
fNumberPuts = "num_puts"
fWaitToSync = "wait_to_sync"
fStoreKind = "store_kind"
fSessionID = "session_id"
fURL = "url"
)

const (
PathDebugClean = "/debug/clean"
PathDebugKVInfo = "/debug/kv/info"
PathDebugKVList = "/debug/kv/list"
PathDebugStoreList = "/debug/store/list"
PathDebugStorePollute = "/debug/store/pollute"
PathDebugSessionsList = "/debug/session/list"
pDebugBindings = "/debug/bindings"
pDebugKVClean = "/debug/kv/clean"
pDebugKVCreate = "/debug/kv/create"
pDebugKVEdit = "/debug/kv/edit"
pDebugKVEditModal = "/debug/kv/edit-modal"
pDebugLogs = "/debug/logs"
pDebugOAuthConfigView = "/debug/oauth/config/view"
pDebugSessionsRevoke = "/debug/session/delete"
pDebugSessionsView = "/debug/session/view"
pDisable = "/disable"
pEnable = "/enable"
pInfo = "/info"
pInstallConsentModal = "/install-consent"
pInstallConsentSource = "/install-consent/form"
pInstallHTTP = "/install-http"
pInstallListed = "/install-listed"
pList = "/list"
pSettingsModalSave = "/settings/save"
pSettingsModalSource = "/settings/form"
pUninstall = "/uninstall"
PathDebugClean = "/debug/clean"
PathDebugKVInfo = "/debug/kv/info"
PathDebugKVList = "/debug/kv/list"
PathDebugStoreList = "/debug/store/list"
PathDebugStorePollute = "/debug/store/pollute"
PathDebugSessionsList = "/debug/session/list"
pDebugBindings = "/debug/bindings"
pDebugClusterTestCached = "/debug/cluster/test-cached"
pDebugKVClean = "/debug/kv/clean"
pDebugKVCreate = "/debug/kv/create"
pDebugKVEdit = "/debug/kv/edit"
pDebugKVEditModal = "/debug/kv/edit-modal"
pDebugLogs = "/debug/logs"
pDebugOAuthConfigView = "/debug/oauth/config/view"
pDebugSessionsRevoke = "/debug/session/delete"
pDebugSessionsView = "/debug/session/view"
pDisable = "/disable"
pEnable = "/enable"
pInfo = "/info"
pInstallConsentModal = "/install-consent"
pInstallConsentSource = "/install-consent/form"
pInstallHTTP = "/install-http"
pInstallListed = "/install-listed"
pList = "/list"
pSettingsModalSave = "/settings/save"
pSettingsModalSource = "/settings/form"
pUninstall = "/uninstall"
)

const (
Expand Down Expand Up @@ -138,24 +142,25 @@ func NewBuiltinApp(api config.API, proxy proxy.Service, appservices appservices.
PathDebugStoreList: requireAdmin(a.debugStoreList),
PathDebugStorePollute: requireAdmin(a.debugStorePollute),

pDebugBindings: requireAdmin(a.debugBindings),
pDebugKVClean: requireAdmin(a.debugKVClean),
pDebugKVCreate: requireAdmin(a.debugKVCreate),
pDebugKVEdit: requireAdmin(a.debugKVEdit),
pDebugKVEditModal: requireAdmin(a.debugKVEdit),
pDebugOAuthConfigView: requireAdmin(a.debugOAuthConfigView),
pDebugSessionsRevoke: requireAdmin(a.debugSessionsRevoke),
pDebugSessionsView: requireAdmin(a.debugSessionsView),
pDisable: requireAdmin(a.disable),
pEnable: requireAdmin(a.enable),
pInstallConsentModal: requireAdmin(a.installConsent),
pInstallConsentSource: requireAdmin(a.installConsentForm),
pInstallHTTP: requireAdmin(a.installHTTP),
pInstallListed: requireAdmin(a.installListed),
pList: requireAdmin(a.list),
pSettingsModalSave: requireAdmin(a.settingsSave),
pSettingsModalSource: requireAdmin(a.settingsForm),
pUninstall: requireAdmin(a.uninstall),
pDebugBindings: requireAdmin(a.debugBindings),
pDebugClusterTestCached: requireAdmin(a.debugClusterTestCached),
pDebugKVClean: requireAdmin(a.debugKVClean),
pDebugKVCreate: requireAdmin(a.debugKVCreate),
pDebugKVEdit: requireAdmin(a.debugKVEdit),
pDebugKVEditModal: requireAdmin(a.debugKVEdit),
pDebugOAuthConfigView: requireAdmin(a.debugOAuthConfigView),
pDebugSessionsRevoke: requireAdmin(a.debugSessionsRevoke),
pDebugSessionsView: requireAdmin(a.debugSessionsView),
pDisable: requireAdmin(a.disable),
pEnable: requireAdmin(a.enable),
pInstallConsentModal: requireAdmin(a.installConsent),
pInstallConsentSource: requireAdmin(a.installConsentForm),
pInstallHTTP: requireAdmin(a.installHTTP),
pInstallListed: requireAdmin(a.installListed),
pList: requireAdmin(a.list),
pSettingsModalSave: requireAdmin(a.settingsSave),
pSettingsModalSource: requireAdmin(a.settingsForm),
pUninstall: requireAdmin(a.uninstall),

// Lookups.
pLookupAppID: requireAdmin(a.lookupAppID),
Expand Down
14 changes: 14 additions & 0 deletions server/builtin/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ func (a *builtinApp) debugCommandBinding(loc *i18n.Localizer) apps.Binding {
Bindings: []apps.Binding{
a.debugBindingsCommandBinding(loc),
a.debugCleanCommandBinding(loc),
{
Location: "cluster",
Label: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{
ID: "command.debug.cluster.label",
Other: "cluster",
}),
Description: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{
ID: "command.debug.cluster.description",
Other: "Test high availability (cluster) functionality.",
}),
Bindings: []apps.Binding{
a.debugClusterTestCachedCommandBinding(loc),
},
},
{
Location: "kv",
Label: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{
Expand Down
109 changes: 109 additions & 0 deletions server/builtin/debug_cluster_test_cached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) 2019-present Mattermost, Inc. All Rights Reserved.
// See License for license information.

package builtin

import (
"strconv"
"time"

"github.com/nicksnyder/go-i18n/v2/i18n"

"github.com/mattermost/mattermost-plugin-apps/apps"
"github.com/mattermost/mattermost-plugin-apps/server/incoming"
"github.com/mattermost/mattermost-plugin-apps/server/store"
"github.com/mattermost/mattermost-server/v6/model"
)

func (a *builtinApp) debugClusterTestCachedCommandBinding(loc *i18n.Localizer) apps.Binding {
return apps.Binding{
Location: "test-cached",
Label: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{
ID: "command.debug.cluster.test_cached.label",
Other: "test-cached",
}),
Description: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{
ID: "command.debug.cluster.test_cached.description",
Other: "Runs a test of cluster aware cached store.",
}),
Hint: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{
ID: "command.debug.cluster.test_cached.hint",
Other: "[ options... ]",
}),
Form: &apps.Form{
Submit: newUserCall(pDebugClusterTestCached),
Fields: []apps.Field{
{
Name: fNumberPuts,
Type: apps.FieldTypeText,
ModalLabel: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{
ID: "field.cluster.num_puts.modal_label",
Other: "Number of PUTs",
}),
Value: "10",
},
{
Name: fWaitToSync,
Type: apps.FieldTypeText,
ModalLabel: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{
ID: "field.cluster.wait_to_sync.modal_label",
Other: "Wait before checking the index, in seconds",
}),
Value: "2",
},
{
Name: fStoreKind,
Type: apps.FieldTypeStaticSelect,
SelectStaticOptions: []apps.SelectOption{
{
Label: string(store.SimpleCachedStoreKind),
Value: string(store.SimpleCachedStoreKind),
},
{
Label: string(store.SingleWriterCachedStoreKind),
Value: string(store.SingleWriterCachedStoreKind),
},
{
Label: string(store.MutexCachedStoreKind),
Value: string(store.MutexCachedStoreKind),
},
{
Label: string(store.TestCachedStoreKind),
Value: string(store.TestCachedStoreKind),
},
},
ModalLabel: a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{
ID: "field.cluster.store_type.modal_label",
Other: "Cluster replication type",
}),
Value: apps.SelectOption{
Label: string(store.MutexCachedStoreKind),
Value: string(store.MutexCachedStoreKind),
},
},
},
},
}
}

func (a *builtinApp) debugClusterTestCached(r *incoming.Request, creq apps.CallRequest) apps.CallResponse {
numPuts, _ := strconv.Atoi(creq.GetValue(fNumberPuts, ""))
waitSeconds, _ := strconv.Atoi(creq.GetValue(fWaitToSync, ""))
storeKind := store.CachedStoreClusterKind(creq.GetValue(fStoreKind, ""))
name := "test/" + model.NewId()

a.appservices.RunCachedStoreTest(r,
store.CachedStoreTestParams{
Kind: storeKind,
Name: name,
NumberOfPuts: numPuts,
WaitForIndexToSync: time.Duration(waitSeconds) * time.Second,
},
)

loc := a.newLocalizer(creq)
return apps.NewTextResponse(a.api.I18N.LocalizeDefaultMessage(loc, &i18n.Message{
ID: "message.cluster.started_test",
Other: "Started the test. Will report as a direct conversation with @appsbot.",
}))
}
14 changes: 7 additions & 7 deletions server/httpin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer cancel()
r = r.WithCtx(ctx)

if s.Config.Get().DeveloperMode {
r.Log.With(
"method", req.Method,
"url", req.URL.String(),
"user_agent", req.Header.Get("User-Agent"),
).Debugf("Received HTTP request")
}
// if s.Config.Get().DeveloperMode {
// r.Log.With(
// "method", req.Method,
// "url", req.URL.String(),
// "user_agent", req.Header.Get("User-Agent"),
// ).Debugf("Received HTTP request")
// }

// Output panics in dev. mode.
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion server/proxy/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (p *Proxy) GetBindings(r *incoming.Request, cc apps.Context) (ret []apps.Bi
if err != nil {
log.WithError(err).Warnf("GetBindings failed")
} else {
log.Debugf("GetBindings: returned bindings for %v apps", len(allApps))
// log.Debugf("GetBindings: returned bindings for %v apps", len(allApps))
}
}()

Expand Down
4 changes: 2 additions & 2 deletions server/proxy/invoke_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ func (p *Proxy) callAppImpl(r *incoming.Request, app *apps.App, creq apps.CallRe
case cresp.Type == apps.CallResponseTypeError:
log.Debugf("Call returned an error from app: %v", cresp.Error())
case cresp.Type == apps.CallResponseTypeOK && cresp.Text != "":
log.Debugf("Called %s:%s -> %s: %s", app.AppID, creq.Path, cresp.Type, utils.FirstN(cresp.Text, 32))
// log.Debugf("Called %s:%s -> %s: %s", app.AppID, creq.Path, cresp.Type, utils.FirstN(cresp.Text, 32))
default:
log.Debugf("Called %s:%s -> %s", app.AppID, creq.Path, cresp.Type)
// log.Debugf("Called %s:%s -> %s", app.AppID, creq.Path, cresp.Type)
}
}()

Expand Down
2 changes: 1 addition & 1 deletion server/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (p *Proxy) initUpstream(typ apps.DeployType, newConfig config.Config, log u
log.WithError(err).Errorf("Failed to initialize upstream %s.", typ)
default:
p.upstreams.Store(typ, up)
log.Debugw("available upstream", "type", typ)
log.Debugf("available upstream: %s", typ)
}
} else {
p.upstreams.Delete(typ)
Expand Down
1 change: 1 addition & 0 deletions server/store/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type CachedStore[T Cloneable[T]] interface {
Index() CachedIndex[T]
Get(key string) (value *T)
Put(r *incoming.Request, key string, value *T) error
Stop()
}

func MakeCachedStore[T Cloneable[T]](name string, cluster *CachedStoreCluster, log utils.Logger) (CachedStore[T], error) {
Expand Down
15 changes: 13 additions & 2 deletions server/store/cached_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ type CachedStoreCluster struct {
type eventHandler func(r *incoming.Request, ev model.PluginClusterEvent) error

const (
putEventID = "cached_store_data"
syncEventID = "cached_store_sync"
putEventID = "cached_store_data"
syncEventID = "cached_store_sync"
testInitEventID = "cached_store_test_init"
testRunEventID = "cached_store_test_run"
testReportEventID = "cached_store_test_report"
)

// cachedStoreClusterEvent is a cluster event sent between nodes. It works for
Expand All @@ -60,6 +63,10 @@ func NewCachedStoreCluster(api config.API, kind CachedStoreClusterKind) *CachedS
}

func (s *Service) OnPluginClusterEvent(r *incoming.Request, ev model.PluginClusterEvent) {
if done := s.processTestPluginClusterEvent(r, ev); done {
return
}

f, err := s.cluster.getEventHandler(ev)
if err != nil {
r.Log.WithError(err).Errorw("failed to find a handler for plugin cluster event")
Expand Down Expand Up @@ -107,3 +114,7 @@ func (c *CachedStoreCluster) getEventHandler(ev model.PluginClusterEvent) (event
func (c *CachedStoreCluster) setEventHandler(eventID string, h eventHandler) {
c.eventHandlers.Store(eventID, h)
}

func (c *CachedStoreCluster) removeEventHandler(eventID string) {
c.eventHandlers.Delete(eventID)
}
2 changes: 0 additions & 2 deletions server/store/cached_mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ func (s *MutexCachedStore[T]) Put(r *incoming.Request, key string, value *T) err
return err
}

r.Log.Debugf("<>/<> 1 %v", changed)

if changed {
event := s.newPluginClusterEvent(key, value, updatedIndex.hash())
s.cluster.broadcastEvent(r, s.eventID(), event)
Expand Down
4 changes: 4 additions & 0 deletions server/store/cached_simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type SimpleCachedStore[T Cloneable[T]] struct {
name string
}

var _ CachedStore[testDataType] = (*SimpleCachedStore[testDataType])(nil)

func MakeSimpleCachedStore[T Cloneable[T]](name string, api config.API, log utils.Logger) (*SimpleCachedStore[T], error) {
s := &SimpleCachedStore[T]{
cache: &sync.Map{},
Expand All @@ -38,6 +40,8 @@ func MakeSimpleCachedStore[T Cloneable[T]](name string, api config.API, log util
return s, nil
}

func (s *SimpleCachedStore[T]) Stop() {}

func (s *SimpleCachedStore[T]) Index() CachedIndex[T] {
out := CachedIndex[T]{}
s.cache.Range(func(key, mapv interface{}) bool {
Expand Down
5 changes: 5 additions & 0 deletions server/store/cached_single_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func MakeSingleWriterCachedStore[T Cloneable[T]](name string, c *CachedStoreClus
return s, nil
}

func (s *SingleWriterCachedStore[T]) Stop() {
s.cluster.removeEventHandler(s.getPutEventID())
s.cluster.removeEventHandler(s.getSyncEventID())
}

const syncBroadcastDelay = 500 * time.Millisecond

func (s *SingleWriterCachedStore[T]) broadcastSyncLater(r *incoming.Request, hash string) {
Expand Down
Loading