Skip to content

Commit

Permalink
[MM-48342] Stateful timers (#375)
Browse files Browse the repository at this point in the history
Co-authored-by: Lev <[email protected]>
  • Loading branch information
hanzei and levb authored Feb 21, 2023
1 parent d5d265e commit 72fbf7a
Show file tree
Hide file tree
Showing 18 changed files with 413 additions and 13 deletions.
13 changes: 13 additions & 0 deletions apps/appclient/mattermost_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ func (c *Client) Unsubscribe(sub *apps.Subscription) error {
return nil
}

func (c *Client) CreateTimer(t *apps.Timer) error {
res, err := c.ClientPP.CreateTimer(t)
if err != nil {
return err
}

if res.StatusCode != http.StatusCreated && res.StatusCode != http.StatusOK {
return errors.Errorf("returned with status %d", res.StatusCode)
}

return nil
}

func (c *Client) StoreOAuth2App(oauth2App apps.OAuth2App) error {
res, err := c.ClientPP.StoreOAuth2App(oauth2App)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions apps/appclient/mattermost_client_pp.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,20 @@ func (c *ClientPP) Unsubscribe(sub *apps.Subscription) (*model.Response, error)
return model.BuildResponse(r), nil
}

func (c *ClientPP) CreateTimer(t *apps.Timer) (*model.Response, error) {
data, err := json.Marshal(t)
if err != nil {
return nil, err
}
r, err := c.DoAPIPOST(c.apipath(appspath.TimerCreate), string(data)) // nolint:bodyclose
if err != nil {
return model.BuildResponse(r), err
}
defer c.closeBody(r)

return model.BuildResponse(r), nil
}

func (c *ClientPP) StoreOAuth2App(oauth2App apps.OAuth2App) (*model.Response, error) {
r, err := c.DoAPIPOST(c.apipath(appspath.OAuth2App), utils.ToJSON(oauth2App)) // nolint:bodyclose
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions apps/path/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
OAuth2User = "/oauth2/user"
Subscribe = "/subscribe"
Unsubscribe = "/unsubscribe"
TimerCreate = "/timer"

// Invoke.
Call = "/call"
Expand Down
46 changes: 46 additions & 0 deletions apps/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2020-present Mattermost, Inc. All Rights Reserved.
// See License for license information.

package apps

import (
"time"

"github.com/hashicorp/go-multierror"

"github.com/mattermost/mattermost-plugin-apps/utils"
)

// Timer s submitted by an app to the Timer API. It determines when
// the app would like to be notified, and how these notifications
// should be invoked.
type Timer struct {
// At is the unix time in milliseconds when the timer should be executed.
At int64 `json:"at"`

// Call is the (one-way) call to make upon the timers execution.
Call Call `json:"call"`

// ChannelID is a channel ID that is used for expansion of the Call (optional).
ChannelID string `json:"channel_id,omitempty"`
// TeamID is a team ID that is used for expansion of the Call (optional).
TeamID string `json:"team_id,omitempty"`
}

func (t Timer) Validate() error {
var result error
emptyCall := Call{}
if t.Call == emptyCall {
result = multierror.Append(result, utils.NewInvalidError("call must not be empty"))
}

if t.At <= 0 {
result = multierror.Append(result, utils.NewInvalidError("at must be positive"))
}

if time.Until(time.UnixMilli(t.At)) < 1*time.Second {
result = multierror.Append(result, utils.NewInvalidError("at most be at least 1 second in the future"))
}

return result
}
1 change: 1 addition & 0 deletions build/custom.mk
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ifneq ($(HAS_SERVER),)
mockgen -destination server/mocks/mock_upstream/mock_upstream.go github.com/mattermost/mattermost-plugin-apps/upstream Upstream
mockgen -destination server/mocks/mock_store/mock_appstore.go github.com/mattermost/mattermost-plugin-apps/server/store AppStore
mockgen -destination server/mocks/mock_store/mock_session.go github.com/mattermost/mattermost-plugin-apps/server/store SessionStore
mockgen -destination server/mocks/mock_store/mock_app.go github.com/mattermost/mattermost-plugin-apps/server/store AppStore
endif

## Generates mock golang interfaces for testing
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-getter v1.6.2
github.com/hashicorp/go-multierror v1.1.1
github.com/mattermost/mattermost-plugin-api v0.1.1
github.com/mattermost/mattermost-plugin-api v0.1.2-0.20221110071900-f8b73bc6795e
// mmgoget: github.com/mattermost/mattermost-server/[email protected] is replaced by -> github.com/mattermost/mattermost-server/v6@ea08d47f60
github.com/mattermost/mattermost-server/v6 v6.0.0-20230113170349-ea08d47f6051
github.com/nicksnyder/go-i18n/v2 v2.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1095,8 +1095,8 @@ github.com/mattermost/ldap v0.0.0-20201202150706-ee0e6284187d h1:/RJ/UV7M5c7L2TQ
github.com/mattermost/ldap v0.0.0-20201202150706-ee0e6284187d/go.mod h1:HLbgMEI5K131jpxGazJ97AxfPDt31osq36YS1oxFQPQ=
github.com/mattermost/logr/v2 v2.0.15 h1:+WNbGcsc3dBao65eXlceB6dTILNJRIrvubnsTl3zBew=
github.com/mattermost/logr/v2 v2.0.15/go.mod h1:mpPp935r5dIkFDo2y9Q87cQWhFR/4xXpNh0k/y8Hmwg=
github.com/mattermost/mattermost-plugin-api v0.1.1 h1:bNnPbWCLWZpT/k2kjUxNnzCfUggU8WKs2ddz7hNjg1U=
github.com/mattermost/mattermost-plugin-api v0.1.1/go.mod h1:9yZhtg0bBj3kqSTjXnjYBMZoTsWbe3ajdFMdl9/Jz34=
github.com/mattermost/mattermost-plugin-api v0.1.2-0.20221110071900-f8b73bc6795e h1:7iT66sN3DzSg4ZrVpSf4igNHkcoEZhfj0/q2JoQauTQ=
github.com/mattermost/mattermost-plugin-api v0.1.2-0.20221110071900-f8b73bc6795e/go.mod h1:9yZhtg0bBj3kqSTjXnjYBMZoTsWbe3ajdFMdl9/Jz34=
github.com/mattermost/mattermost-server/v6 v6.0.0-20230113170349-ea08d47f6051 h1:bL3nQUUQmQotteHuA6ltKga3S3PbR03ElM5qJRXSeyY=
github.com/mattermost/mattermost-server/v6 v6.0.0-20230113170349-ea08d47f6051/go.mod h1:U3gSM0I15WSMHPpDEU30mmc4JrbSDk+8F1+MFLOHWD0=
github.com/mattermost/morph v1.0.5-0.20221115094356-4c18a75b1f5e h1:VfNz+fvJ3DxOlALM22Eea8ONp5jHrybKBCcCtDPVlss=
Expand Down
49 changes: 45 additions & 4 deletions server/appservices/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@
package appservices

import (
"github.com/pkg/errors"

"github.com/mattermost/mattermost-plugin-api/cluster"

"github.com/mattermost/mattermost-plugin-apps/apps"
"github.com/mattermost/mattermost-plugin-apps/server/config"
"github.com/mattermost/mattermost-plugin-apps/server/incoming"
"github.com/mattermost/mattermost-plugin-apps/server/store"
"github.com/mattermost/mattermost-plugin-apps/utils"
)

type Service interface {
Expand All @@ -17,6 +23,10 @@ type Service interface {
Unsubscribe(*incoming.Request, apps.Event) error
UnsubscribeApp(*incoming.Request, apps.AppID) error

// Timer

CreateTimer(*incoming.Request, apps.Timer) error

// KV

KVSet(_ *incoming.Request, prefix, id string, data []byte) (bool, error)
Expand All @@ -33,14 +43,45 @@ type Service interface {
GetOAuth2User(_ *incoming.Request) ([]byte, error)
}

type Caller interface {
InvokeCall(*incoming.Request, apps.CallRequest) (*apps.App, apps.CallResponse)
NewIncomingRequest() *incoming.Request
}

type AppServices struct {
store *store.Service
store *store.Service
scheduler *cluster.JobOnceScheduler
caller Caller

conf config.Service
log utils.Logger
}

var _ Service = (*AppServices)(nil)

func NewService(store *store.Service) *AppServices {
return &AppServices{
store: store,
// SetCaller must be called before calling any other methods of AppsServies.
// TODO: Remove this uggly hack.
func (a *AppServices) SetCaller(caller Caller) {
a.caller = caller
}

func NewService(log utils.Logger, confService config.Service, store *store.Service, scheduler *cluster.JobOnceScheduler) (*AppServices, error) {
service := &AppServices{
store: store,
scheduler: scheduler,
conf: confService,
log: log,
}

err := scheduler.SetCallback(service.ExecuteTimer)
if err != nil {
return nil, errors.Wrap(err, "failed to set timer callback")
}

err = scheduler.Start()
if err != nil {
return nil, errors.Wrap(err, "failed to start timer scheduler")
}

return service, nil
}
109 changes: 109 additions & 0 deletions server/appservices/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) 2020-present Mattermost, Inc. All Rights Reserved.
// See License for license information.

package appservices

import (
"context"
"strconv"
"time"

"github.com/pkg/errors"

"github.com/mattermost/mattermost-plugin-apps/apps"
"github.com/mattermost/mattermost-plugin-apps/server/config"
"github.com/mattermost/mattermost-plugin-apps/server/incoming"
)

type storedTimer struct {
Call apps.Call `json:"call"`
AppID apps.AppID `json:"app_id"`
UserID string `json:"user_id"`
ChannelID string `json:"channel_id,omitempty"`
TeamID string `json:"team_id,omitempty"`
}

func (t storedTimer) Key(appID apps.AppID, at int64) string {
return string(appID) + t.UserID + strconv.FormatInt(at, 10)
}

func (t storedTimer) Loggable() []interface{} {
props := []interface{}{"user_id", t.UserID}
props = append(props, "app_id", t.AppID)
if t.ChannelID != "" {
props = append(props, "call_team_id", t.TeamID)
}
if t.TeamID != "" {
props = append(props, "call_channel_id", t.ChannelID)
}
return props
}

func (a *AppServices) CreateTimer(r *incoming.Request, t apps.Timer) error {
err := r.Check(
r.RequireActingUser,
r.RequireSourceApp,
t.Validate,
)
if err != nil {
return err
}

st := storedTimer{
Call: t.Call,
AppID: r.SourceAppID(),
UserID: r.ActingUserID(),
ChannelID: t.ChannelID,
TeamID: t.TeamID,
}

_, err = a.scheduler.ScheduleOnce(st.Key(r.SourceAppID(), t.At), time.UnixMilli(t.At), st)
if err != nil {
return errors.Wrap(err, "faild to schedule timer job")
}

return nil
}

func (a *AppServices) ExecuteTimer(key string, props interface{}) {
t, ok := props.(storedTimer)
if !ok {
a.log.Debugw("Timer contained unknown props. Inoring the timer.", "key", key, "props", props)
return
}

r := a.caller.NewIncomingRequest()

r.Log = r.Log.With(t)

ctx, cancel := context.WithTimeout(context.Background(), config.RequestTimeout)
defer cancel()
r = r.WithCtx(ctx)

r = r.WithDestination(t.AppID)
r = r.WithActingUserID(t.UserID)

context := &apps.Context{
UserAgentContext: apps.UserAgentContext{
AppID: t.AppID,
TeamID: t.TeamID,
ChannelID: t.ChannelID,
},
}

creq := apps.CallRequest{
Call: t.Call,
Context: *context,
}
r.Log = r.Log.With(creq)
_, cresp := a.caller.InvokeCall(r, creq)
if cresp.Type == apps.CallResponseTypeError {
if a.conf.Get().DeveloperMode {
r.Log.WithError(cresp).Errorf("Timer execute failed")
}
return
}
r.Log = r.Log.With(cresp)

r.Log.Debugf("Timer executed")
}
1 change: 1 addition & 0 deletions server/httpin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func NewService(proxy proxy.Service, appservices appservices.Service, conf confi
h.HandleFunc(path.Subscribe, h.GetSubscriptions).Methods(http.MethodGet)
h.HandleFunc(path.Subscribe, h.Subscribe).Methods(http.MethodPost)
h.HandleFunc(path.Unsubscribe, h.Unsubscribe).Methods(http.MethodPost)
h.HandleFunc(path.TimerCreate, h.CreateTimer).Methods(http.MethodPost)

// Admin API, can be used by plugins, external services, or the user agent.
h.HandleFunc(path.DisableApp, h.DisableApp).Methods(http.MethodPost)
Expand Down
35 changes: 35 additions & 0 deletions server/httpin/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2020-present Mattermost, Inc. All Rights Reserved.
// See License for license information.

package httpin

import (
"encoding/json"
"net/http"

"github.com/mattermost/mattermost-plugin-apps/apps"
"github.com/mattermost/mattermost-plugin-apps/server/incoming"
"github.com/mattermost/mattermost-plugin-apps/utils/httputils"
)

// CreateTimer create or updates a new statefull timer.
//
// Path: /api/v1/timer
// Method: POST
// Input: JSON {at, call, state}
// Output: None
func (s *Service) CreateTimer(r *incoming.Request, w http.ResponseWriter, req *http.Request) {
var t apps.Timer

err := json.NewDecoder(req.Body).Decode(&t)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

err = s.AppServices.CreateTimer(r, t)
if err != nil {
http.Error(w, err.Error(), httputils.ErrorToStatus(err))
return
}
}
10 changes: 9 additions & 1 deletion server/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ func (p *Plugin) OnActivate() (err error) {
return errors.Wrap(err, "failed to initialize persistent store")
}
p.store.App.InitBuiltin(builtin.App(conf))
p.appservices = appservices.NewService(p.store)
scheduler := cluster.GetJobOnceScheduler(p.API)
appservice, err := appservices.NewService(log, p.conf, p.store, scheduler)
if err != nil {
return errors.Wrapf(err, "failed to initialize appservices")
}
p.appservices = appservice

p.sessionService = session.NewService(mm, p.store)
log.Debugf("initialized API and persistent store")

Expand All @@ -110,6 +116,7 @@ func (p *Plugin) OnActivate() (err error) {
if err != nil {
return errors.Wrapf(err, "failed creating cluster mutex")
}

p.proxy = proxy.NewService(p.conf, p.store, mutex, p.httpOut, p.sessionService, p.appservices)
err = p.proxy.Configure(conf, log)
if err != nil {
Expand All @@ -121,6 +128,7 @@ func (p *Plugin) OnActivate() (err error) {
)
log.Debugf("initialized the app proxy")

appservice.SetCaller(p.proxy)
p.httpIn = httpin.NewService(p.proxy, p.appservices, p.conf)
log.Debugf("initialized incoming HTTP")

Expand Down
1 change: 1 addition & 0 deletions server/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/mattermost/mattermost-plugin-apps/apps"
"github.com/mattermost/mattermost-plugin-apps/apps/appclient"

"github.com/mattermost/mattermost-plugin-apps/server/appservices"
"github.com/mattermost/mattermost-plugin-apps/server/config"
"github.com/mattermost/mattermost-plugin-apps/server/httpout"
Expand Down
Loading

0 comments on commit 72fbf7a

Please sign in to comment.