From ab9a4f3c91332106008a3d66dea4c3bd05b02a0e Mon Sep 17 00:00:00 2001 From: 42Atomys Date: Fri, 1 Jul 2022 23:36:29 +0200 Subject: [PATCH] feat: add consumers to consume rabbitmq queue for users and locations queues (#150) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(cluster): install secrets for webhooked * feat(cluster): install webhooked 🎉 * feat(cluster): configure the webhook routing and enable intra settings * feat(cluster): add prometheus scrape on webhooked & set a rollout strategy * fix(cluster): disable istio sidecar for cronjob * feat: implement a basic consumer * feat: implement webhook rabbitmq consumer for all campus * feat: add webhhoks-processor on production * fix: check the qos error of rmq channel * fix: log of webhooks have wrong type --- .env.example | 1 + .github/workflows/deployer.yaml | 4 + cmd/locations.go | 34 +----- cmd/webhooks.go | 67 ++++++++++++ .../app/crawler-locations/base/cronjob.yaml | 2 +- deploy/app/webhooked/base/kustomization.yaml | 3 + .../webhooks-processor/base/deployment.yaml | 73 +++++++++++++ .../base/kustomization.yaml | 10 ++ .../overlays/live/kustomization.yaml | 17 +++ .../live/use-live-primary-postgres.yaml | 14 +++ go.mod | 1 + go.sum | 2 + internal/api/api.resolvers.go | 13 +-- internal/models/utils.go | 88 +++++++++++++++ internal/webhooks/location.go | 55 ++++++++++ internal/webhooks/serve.go | 100 ++++++++++++++++++ internal/webhooks/user.go | 60 +++++++++++ pkg/duoapi/campus.go | 2 + pkg/duoapi/endpoints.go | 1 + pkg/duoapi/locations.go | 8 +- pkg/duoapi/structs.go | 81 +++++++++----- pkg/duoapi/time.go | 46 ++++++++ pkg/duoapi/user.go | 14 +++ pkg/duoapi/webhooks.go | 98 +++++++++++++++++ pkg/duoapi/webhooks_processors.go | 79 ++++++++++++++ 25 files changed, 804 insertions(+), 69 deletions(-) create mode 100644 cmd/webhooks.go create mode 100644 deploy/app/webhooks-processor/base/deployment.yaml create mode 100644 deploy/app/webhooks-processor/base/kustomization.yaml create mode 100644 deploy/app/webhooks-processor/overlays/live/kustomization.yaml create mode 100644 deploy/app/webhooks-processor/overlays/live/use-live-primary-postgres.yaml create mode 100644 internal/models/utils.go create mode 100644 internal/webhooks/location.go create mode 100644 internal/webhooks/serve.go create mode 100644 internal/webhooks/user.go create mode 100644 pkg/duoapi/time.go create mode 100644 pkg/duoapi/user.go create mode 100644 pkg/duoapi/webhooks.go create mode 100644 pkg/duoapi/webhooks_processors.go diff --git a/.env.example b/.env.example index 2b3aa168..0cbfac46 100644 --- a/.env.example +++ b/.env.example @@ -2,6 +2,7 @@ GO_ENV=development APP_VERSION=indev CORS_ORIGIN=http://localhost:3000 DATABASE_URL=postgresql://postgres:postgres@localhost:5432/stud42_dev +AMQP_URL=amqp://rabbitmq:s42@localhost:5672/ S42_SERVICE_TOKEN=private-cross-service-token GITHUB_TOKEN= diff --git a/.github/workflows/deployer.yaml b/.github/workflows/deployer.yaml index 82393a1c..af8dd6f8 100644 --- a/.github/workflows/deployer.yaml +++ b/.github/workflows/deployer.yaml @@ -171,4 +171,8 @@ jobs: cd ${{github.workspace }}/deploy/app/crawler-locations/overlays/live kustomize edit set image app=ghcr.io/42atomys/stud42:${{ github.event.release.tag_name }} + kustomize build . | kubectl apply -f - + + cd ${{github.workspace }}/deploy/app/webhooks-processor/overlays/live + kustomize edit set image app=ghcr.io/42atomys/stud42:${{ github.event.release.tag_name }} kustomize build . | kubectl apply -f - \ No newline at end of file diff --git a/cmd/locations.go b/cmd/locations.go index 8034a95f..c5704585 100644 --- a/cmd/locations.go +++ b/cmd/locations.go @@ -34,7 +34,6 @@ import ( modelgen "atomys.codes/stud42/internal/models/generated" "atomys.codes/stud42/internal/models/generated/campus" "atomys.codes/stud42/internal/models/generated/location" - "atomys.codes/stud42/internal/models/generated/user" "atomys.codes/stud42/pkg/duoapi" ) @@ -71,42 +70,17 @@ For any closed locations, the location will be marked as inactive in the databas client := modelsutils.Client() bulk := []*modelgen.LocationCreate{} for _, l := range locations { - u, err := client.User.Query().Where(user.DuoID(l.User.ID)).Only(cmd.Context()) + u, err := modelsutils.UserFirstOrCreateFromComplexLocation(cmd.Context(), l) if err != nil { - if modelgen.IsNotFound(err) { - err := client.User.Create(). - SetEmail(l.User.Email). - SetDuoID(l.User.ID). - SetDuoLogin(l.User.Login). - SetFirstName(l.User.FirstName). - SetLastName(l.User.LastName). - SetUsualFirstName(l.User.UsualFirstName). - SetPhone(l.User.Phone). - SetPoolMonth(l.User.PoolMonth). - SetPoolYear(l.User.PoolYear). - SetIsStaff(l.User.Staff). - SetIsAUser(false). - OnConflictColumns(user.FieldDuoID). - DoNothing(). - Exec(cmd.Context()) - if err != nil { - log.Fatal().Err(err).Msg("Failed to create user") - } - u, err = client.User.Query().Where(user.DuoID(l.User.ID)).Only(cmd.Context()) - if err != nil { - log.Fatal().Err(err).Msg("Failed to get user 1") - } - } else { - log.Fatal().Err(err).Msg("Failed to get user 2") - } + log.Fatal().Err(err).Msg("Failed to create user") } bulk = append(bulk, client.Location.Create(). SetCampus(campus). SetUser(u). SetDuoID(l.ID). - SetBeginAt(l.BeginAt). - SetNillableEndAt(l.EndAt). + SetBeginAt(l.BeginAt.Time()). + SetNillableEndAt(l.EndAt.NillableTime()). SetIdentifier(l.Host). SetUserDuoID(l.User.ID). SetUserDuoLogin(l.User.Login)) diff --git a/cmd/webhooks.go b/cmd/webhooks.go new file mode 100644 index 00000000..8c03f32e --- /dev/null +++ b/cmd/webhooks.go @@ -0,0 +1,67 @@ +/* +Copyright © 2022 42Atomys + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ +package cmd + +import ( + "os" + + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + + "atomys.codes/stud42/internal/webhooks" +) + +// webhooksCmd represents the webhooks command +var webhooksCmd = &cobra.Command{ + Use: "webhooks", + Short: "A brief description of your command", + Long: `A longer description that spans multiple lines and likely contains examples +and usage of using your command. For example: + +Cobra is a CLI library for Go that empowers applications. +This application is a tool to generate the needed files +to quickly create a Cobra application.`, + Run: func(cmd *cobra.Command, args []string) { + var amqpURL = os.Getenv("AMQP_URL") + if amqpURL == "" { + log.Fatal().Msg("AMQP_URL not set") + } + + if err := webhooks.New().Serve(amqpURL, "webhooks-deliveries"); err != nil { + log.Fatal().Err(err).Msg("failed to start rabbitmq consumer") + } + }, +} + +func init() { + jobsCmd.AddCommand(webhooksCmd) + + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // webhooksCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // webhooksCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} diff --git a/deploy/app/crawler-locations/base/cronjob.yaml b/deploy/app/crawler-locations/base/cronjob.yaml index d455222d..f5627c45 100644 --- a/deploy/app/crawler-locations/base/cronjob.yaml +++ b/deploy/app/crawler-locations/base/cronjob.yaml @@ -4,7 +4,7 @@ metadata: name: crawler-locations spec: # Each minute - schedule: "* * * * *" + schedule: "* */12 * * *" successfulJobsHistoryLimit: 2 failedJobsHistoryLimit: 5 concurrencyPolicy: Forbid diff --git a/deploy/app/webhooked/base/kustomization.yaml b/deploy/app/webhooked/base/kustomization.yaml index 5cfe67cb..3b15cf9e 100644 --- a/deploy/app/webhooked/base/kustomization.yaml +++ b/deploy/app/webhooked/base/kustomization.yaml @@ -5,7 +5,10 @@ resources: - virtual-service.yaml commonLabels: + app: webhooked + version: '0.6' kubernetes.io/name: webhooked + app.kubernetes.io/version: '0.6' app.kubernetes.io/component: micro-service app.kubernetes.io/part-of: s42-app app.kubernetes.io/managed-by: kustomize diff --git a/deploy/app/webhooks-processor/base/deployment.yaml b/deploy/app/webhooks-processor/base/deployment.yaml new file mode 100644 index 00000000..a7ec984b --- /dev/null +++ b/deploy/app/webhooks-processor/base/deployment.yaml @@ -0,0 +1,73 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: webhooks-processor +spec: + selector: {} + revisionHistoryLimit: 1 + template: + spec: + imagePullSecrets: + - name: ghcr-creds + containers: + - name: service + image: app + args: [ '--config', '/config/stud42.yaml', jobs, webhooks ] + command: [ stud42cli ] + env: + - name: GO_ENV + value: production + - name: APP_VERSION + value: latest + - name: SENTRY_DSN + valueFrom: + secretKeyRef: + key: 'WEBHOOKS_PROCESSOR_DSN' + name: 'sentry-dsns' + - name: DATABASE_URL + value: postgresql://$(DATABASE_USERNAME):$(DATABASE_PASSWORD)@$(DATABASE_HOST):5432/$(DATABASE_NAME)?sslmode=disable + - name: DATABASE_HOST + value: primary-postgres + - name: DATABASE_USERNAME + value: postgres + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + key: POSTGRES_PASSWORD + name: primary-postgres-credentials + - name: DATABASE_NAME + value: s42 + - name: AMQP_URL + value: amqp://$(AMQP_USERNAME):$(AMQP_PASSWORD)@$(AMQP_HOST):$(AMQP_PORT)/ + - name: AMQP_USERNAME + valueFrom: + secretKeyRef: + key: username + name: prod-primary-rabbitmq-default-user + - name: AMQP_PASSWORD + valueFrom: + secretKeyRef: + key: password + name: prod-primary-rabbitmq-default-user + - name: AMQP_HOST + valueFrom: + secretKeyRef: + key: host + name: prod-primary-rabbitmq-default-user + - name: AMQP_PORT + valueFrom: + secretKeyRef: + key: port + name: prod-primary-rabbitmq-default-user + volumeMounts: + - name: config + mountPath: /config + readOnly: true + resources: + limits: + memory: "42Mi" + cpu: "200m" + volumes: + - name: config + configMap: + name: stud42-config \ No newline at end of file diff --git a/deploy/app/webhooks-processor/base/kustomization.yaml b/deploy/app/webhooks-processor/base/kustomization.yaml new file mode 100644 index 00000000..36052210 --- /dev/null +++ b/deploy/app/webhooks-processor/base/kustomization.yaml @@ -0,0 +1,10 @@ +resources: +- deployment.yaml + +commonLabels: + kubernetes.io/name: webhooks-processor + app.kubernetes.io/version: '0.1' + app.kubernetes.io/component: micro-service + app.kubernetes.io/part-of: s42-app + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/created-by: github-actions \ No newline at end of file diff --git a/deploy/app/webhooks-processor/overlays/live/kustomization.yaml b/deploy/app/webhooks-processor/overlays/live/kustomization.yaml new file mode 100644 index 00000000..077ec4cd --- /dev/null +++ b/deploy/app/webhooks-processor/overlays/live/kustomization.yaml @@ -0,0 +1,17 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +namePrefix: prod- +resources: +- ../../base +namespace: production + +commonAnnotations: + sidecar.istio.io/inject: "false" + +images: +- name: app + newName: ghcr.io/42atomys/stud42 + newTag: latest + +patchesStrategicMerge: +- ./use-live-primary-postgres.yaml \ No newline at end of file diff --git a/deploy/app/webhooks-processor/overlays/live/use-live-primary-postgres.yaml b/deploy/app/webhooks-processor/overlays/live/use-live-primary-postgres.yaml new file mode 100644 index 00000000..0863d329 --- /dev/null +++ b/deploy/app/webhooks-processor/overlays/live/use-live-primary-postgres.yaml @@ -0,0 +1,14 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: webhooks-processor +spec: + jobTemplate: + spec: + template: + spec: + containers: + - name: service + env: + - name: DATABASE_HOST + value: prod-primary-postgres \ No newline at end of file diff --git a/go.mod b/go.mod index 615b88e5..a65c88ce 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/shurcooL/githubv4 v0.0.0-20220115235240-a14260e6f8a2 github.com/spf13/cobra v1.4.0 github.com/spf13/viper v1.10.1 + github.com/streadway/amqp v1.0.0 github.com/stretchr/testify v1.7.1 github.com/vektah/gqlparser/v2 v2.4.1 github.com/vmihailenco/msgpack/v5 v5.0.0-beta.9 diff --git a/go.sum b/go.sum index a557743f..dfcddfca 100644 --- a/go.sum +++ b/go.sum @@ -403,6 +403,8 @@ github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q github.com/spf13/viper v1.10.1 h1:nuJZuYpG7gTj/XqiUwg8bA0cp1+M2mC3J4g5luUYBKk= github.com/spf13/viper v1.10.1/go.mod h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8qy1rU= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= diff --git a/internal/api/api.resolvers.go b/internal/api/api.resolvers.go index 16a1712a..d4d345ce 100644 --- a/internal/api/api.resolvers.go +++ b/internal/api/api.resolvers.go @@ -7,6 +7,13 @@ import ( "context" "os" + "github.com/bwmarrin/discordgo" + "github.com/google/uuid" + "github.com/rs/zerolog/log" + "github.com/shurcooL/githubv4" + "github.com/spf13/viper" + "golang.org/x/oauth2" + apigen "atomys.codes/stud42/internal/api/generated" typesgen "atomys.codes/stud42/internal/api/generated/types" "atomys.codes/stud42/internal/models/generated" @@ -14,12 +21,6 @@ import ( "atomys.codes/stud42/internal/models/generated/campus" "atomys.codes/stud42/internal/models/generated/location" "atomys.codes/stud42/internal/models/generated/user" - "github.com/bwmarrin/discordgo" - "github.com/google/uuid" - "github.com/rs/zerolog/log" - "github.com/shurcooL/githubv4" - "github.com/spf13/viper" - "golang.org/x/oauth2" ) func (r *mutationResolver) CreateFriendship(ctx context.Context, userID uuid.UUID) (bool, error) { diff --git a/internal/models/utils.go b/internal/models/utils.go new file mode 100644 index 00000000..64ef393f --- /dev/null +++ b/internal/models/utils.go @@ -0,0 +1,88 @@ +package modelsutils + +import ( + "context" + "fmt" + + "github.com/rs/zerolog/log" + + modelgen "atomys.codes/stud42/internal/models/generated" + "atomys.codes/stud42/internal/models/generated/user" + "atomys.codes/stud42/pkg/duoapi" +) + +func UserFirstOrCreateFromComplexLocation(ctx context.Context, l *duoapi.Location[duoapi.ComplexLocationUser]) (*modelgen.User, error) { + u, err := client.User.Query().Where(user.DuoID(l.User.ID)).Only(ctx) + if err != nil { + if modelgen.IsNotFound(err) { + err := client.User.Create(). + SetEmail(l.User.Email). + SetDuoID(l.User.ID). + SetDuoLogin(l.User.Login). + SetFirstName(l.User.FirstName). + SetLastName(l.User.LastName). + SetUsualFirstName(l.User.UsualFirstName). + SetPhone(l.User.Phone). + SetPoolMonth(l.User.PoolMonth). + SetPoolYear(l.User.PoolYear). + SetIsStaff(l.User.Staff). + SetIsAUser(false). + OnConflictColumns(user.FieldDuoID). + DoNothing(). + Exec(ctx) + if err != nil { + return nil, err + } + u, err = client.User.Query().Where(user.DuoID(l.User.ID)).Only(ctx) + if err != nil { + return nil, err + } + } else { + return nil, err + } + } + + return u, err +} + +func UserFirstOrCreateFromLocation(ctx context.Context, l *duoapi.Location[duoapi.LocationUser]) (*modelgen.User, error) { + u, err := client.User.Query().Where(user.DuoID(l.User.ID)).Only(ctx) + if err != nil { + if modelgen.IsNotFound(err) { + log.Debug().Str("login", l.User.Login).Int("duoID", l.User.ID).Msg("user don't exist import it with duopai") + duoUser, err := duoapi.UserGet(ctx, fmt.Sprint(l.User.ID)) + if err != nil { + return nil, err + } + + id, err := client.User.Create(). + SetEmail(duoUser.Email). + SetDuoID(l.User.ID). + SetDuoLogin(l.User.Login). + SetFirstName(duoUser.FirstName). + SetLastName(duoUser.LastName). + SetUsualFirstName(duoUser.UsualFirstName). + SetPhone(duoUser.Phone). + SetPoolMonth(duoUser.PoolMonth). + SetPoolYear(duoUser.PoolYear). + SetIsStaff(duoUser.Staff). + SetIsAUser(false). + OnConflictColumns(user.FieldDuoID). + DoNothing(). + ID(ctx) + if err != nil { + log.Debug().Err(err).Msg("error creating user") + return nil, err + } + u, err = client.User.Get(ctx, id) + if err != nil { + log.Debug().Err(err).Msg("failed to get user") + return nil, err + } + } else { + return nil, err + } + } + + return u, nil +} diff --git a/internal/webhooks/location.go b/internal/webhooks/location.go new file mode 100644 index 00000000..6a111c38 --- /dev/null +++ b/internal/webhooks/location.go @@ -0,0 +1,55 @@ +package webhooks + +import ( + "database/sql" + "errors" + + "github.com/rs/zerolog/log" + + modelsutils "atomys.codes/stud42/internal/models" + "atomys.codes/stud42/internal/models/generated/campus" + "atomys.codes/stud42/internal/models/generated/location" + "atomys.codes/stud42/pkg/duoapi" +) + +type locationProcessor struct { + *processor + duoapi.LocationWebhookProcessor[duoapi.LocationUser] +} + +func (p *locationProcessor) Create(loc *duoapi.Location[duoapi.LocationUser], metadata *duoapi.WebhookMetadata) error { + campus, err := p.db.Campus.Query().Where(campus.DuoID(loc.CampusID)).Only(p.ctx) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + log.Fatal().Msgf("Campus %d not found", loc.CampusID) + } + log.Fatal().Err(err).Msg("Failed to get campus") + } + + user, err := modelsutils.UserFirstOrCreateFromLocation(p.ctx, loc) + if err != nil { + log.Error().Err(err).Msg("Failed to create user") + } + + return p.db.Location.Create(). + SetCampus(campus). + SetDuoID(loc.ID). + SetBeginAt(loc.BeginAt.Time()). + SetNillableEndAt(loc.EndAt.NillableTime()). + SetIdentifier(loc.Host). + SetUser(user). + SetUserDuoID(loc.User.ID). + SetUserDuoLogin(user.DuoLogin). + Exec(p.ctx) +} +func (p *locationProcessor) Close(loc *duoapi.Location[duoapi.LocationUser], metadata *duoapi.WebhookMetadata) error { + return p.db.Location.Update(). + SetNillableEndAt(loc.EndAt.NillableTime()). + SetIdentifier(loc.Host). + Where(location.DuoID(loc.ID)). + Exec(p.ctx) +} +func (p *locationProcessor) Destroy(loc *duoapi.Location[duoapi.LocationUser], metadata *duoapi.WebhookMetadata) error { + _, err := p.db.Location.Delete().Where(location.DuoID(loc.ID)).Exec(p.ctx) + return err +} diff --git a/internal/webhooks/serve.go b/internal/webhooks/serve.go new file mode 100644 index 00000000..593a6f45 --- /dev/null +++ b/internal/webhooks/serve.go @@ -0,0 +1,100 @@ +package webhooks + +import ( + "context" + "encoding/json" + + "github.com/rs/zerolog/log" + "github.com/streadway/amqp" + + modelsutils "atomys.codes/stud42/internal/models" + modelgen "atomys.codes/stud42/internal/models/generated" + "atomys.codes/stud42/pkg/duoapi" +) + +type processor struct { + db *modelgen.Client + ctx context.Context +} + +func New() *processor { + if err := modelsutils.Connect(); err != nil { + log.Fatal().Err(err).Msg("failed to connect to database") + } + + return &processor{ + ctx: context.Background(), + db: modelsutils.Client(), + } +} + +func (p *processor) Serve(amqpUrl, channel string) error { + + conn, err := amqp.Dial(amqpUrl) + if err != nil { + return err + } + defer conn.Close() + + ch, err := conn.Channel() + if err != nil { + return err + } + defer ch.Close() + + if err := ch.Qos(5, 0, false); err != nil { + return err + } + + msgs, err := ch.Consume( + channel, // queue + "webhooks-processor", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return err + } + + log.Info().Msg("Consumer ready. Waiting for messages...") + for d := range msgs { + log.Debug().Msg("Received a message") + err := p.handler(d.Body) + if err != nil { + if err = d.Nack(false, true); err != nil { + log.Error().Err(err).Msg("Cannot nack the message") + } + continue + } + if err := d.Ack(false); err != nil { + log.Error().Err(err).Msg("Cannot ack the message") + } + } + return nil +} + +func (p *processor) handler(data []byte) error { + md := &duoapi.Webhook{} + + if err := json.Unmarshal(data, &md); err != nil { + log.Error().Err(err).Msg("Failed to unmarshal webhook metadata") + } + + log.Debug().Msgf("Received a message(%s.%s): %+v", md.Metadata.Model, md.Metadata.Event, md.Payload) + var err error + switch md.Metadata.Model { + case "location": + err = md.Payload.ProcessWebhook(p.ctx, md.Metadata, &locationProcessor{processor: p}) + case "user": + err = md.Payload.ProcessWebhook(p.ctx, md.Metadata, &userProcessor{processor: p}) + } + if err != nil { + log.Error().Err(err).Str("model", md.Metadata.Model).Str("event", md.Metadata.Event).Msg("Failed to process webhook") + return err + } + + return nil +} diff --git a/internal/webhooks/user.go b/internal/webhooks/user.go new file mode 100644 index 00000000..bf20ab01 --- /dev/null +++ b/internal/webhooks/user.go @@ -0,0 +1,60 @@ +package webhooks + +import ( + "fmt" + + typesgen "atomys.codes/stud42/internal/api/generated/types" + modelgen "atomys.codes/stud42/internal/models/generated" + "atomys.codes/stud42/internal/models/generated/user" + "atomys.codes/stud42/pkg/duoapi" +) + +type userProcessor struct { + *processor + duoapi.UserWebhookProcessor +} + +func (p *userProcessor) Create(u *duoapi.User, metadata *duoapi.WebhookMetadata) error { + return p.db.User.Create(). + AddAccounts(&modelgen.Account{ + Provider: string(typesgen.ProviderDuo), + ProviderAccountID: fmt.Sprint(u.ID), + Username: u.Login, + }). + SetDuoID(u.ID). + SetDuoLogin(u.Login). + SetFirstName(u.FirstName). + SetLastName(u.LastName). + SetEmail(u.Email). + SetIsStaff(u.Staff). + SetNillableUsualFirstName(&u.UsualFirstName). + OnConflictColumns(user.FieldDuoID). + UpdateNewValues(). + Exec(p.ctx) +} + +func (p *userProcessor) Update(u *duoapi.User, metadata *duoapi.WebhookMetadata) error { + err := p.db.User.Update(). + SetDuoID(u.ID). + SetDuoLogin(u.Login). + SetFirstName(u.FirstName). + SetLastName(u.LastName). + SetEmail(u.Email). + SetIsStaff(u.Staff). + SetPhone(u.Phone). + SetPoolYear(u.PoolYear). + SetPoolMonth(u.PoolMonth). + SetNillableUsualFirstName(&u.UsualFirstName). + Where(user.DuoID(u.ID)). + Exec(p.ctx) + + if modelgen.IsNotFound(err) { + return p.Create(u, metadata) + } + + return err +} + +func (p *userProcessor) Alumnize(u *duoapi.User, metadata *duoapi.WebhookMetadata) error { + return p.Update(u, metadata) +} diff --git a/pkg/duoapi/campus.go b/pkg/duoapi/campus.go index 8ed1543b..da6a4a55 100644 --- a/pkg/duoapi/campus.go +++ b/pkg/duoapi/campus.go @@ -6,6 +6,7 @@ import ( "github.com/rs/zerolog/log" ) +// CampusAll returns the list of all campuses in the 42 ecosystem func CampusAll(ctx context.Context) ([]*Campus, error) { var campus = make([]*Campus, 0) err := requestCollection(ctx, EndpointCampus, map[string]string{"per_page": "100"}, &campus) @@ -16,6 +17,7 @@ func CampusAll(ctx context.Context) ([]*Campus, error) { return campus, nil } +// CampusGet returns the campus with the given ID or nil if not found func CampusGet(ctx context.Context, campusID string) (*Campus, error) { var campus = &Campus{} err := request(ctx, EndpointCampus+"/"+campusID, nil, &campus) diff --git a/pkg/duoapi/endpoints.go b/pkg/duoapi/endpoints.go index 917b1416..cfbe87bd 100644 --- a/pkg/duoapi/endpoints.go +++ b/pkg/duoapi/endpoints.go @@ -7,4 +7,5 @@ var ( EndpointCampus = EndpointBaseAPI + EndpointVersion + "/campus" EndpointLocations = EndpointBaseAPI + EndpointVersion + "/locations" + EndpointUsers = EndpointBaseAPI + EndpointVersion + "/users" ) diff --git a/pkg/duoapi/locations.go b/pkg/duoapi/locations.go index 85a603ba..638da022 100644 --- a/pkg/duoapi/locations.go +++ b/pkg/duoapi/locations.go @@ -1,10 +1,12 @@ package duoapi -import "context" +import ( + "context" +) // LocationsActive returns the list of active locations of a campus -func LocationsActive(ctx context.Context, campusID string) ([]*Location, error) { - var locations = make([]*Location, 0) +func LocationsActive(ctx context.Context, campusID string) ([]*Location[ComplexLocationUser], error) { + var locations = make([]*Location[ComplexLocationUser], 0) err := requestCollection(ctx, EndpointLocations, map[string]string{"campus_id": campusID, "per_page": "100", "filter[active]": "true"}, &locations) if err != nil { return nil, err diff --git a/pkg/duoapi/structs.go b/pkg/duoapi/structs.go index 37b5d30e..2f1df999 100644 --- a/pkg/duoapi/structs.go +++ b/pkg/duoapi/structs.go @@ -1,7 +1,5 @@ package duoapi -import "time" - type HeaderLink map[string]string type Campus struct { @@ -30,38 +28,63 @@ type Language struct { Identifier string `json:"identifier"` } -type Location struct { +type Location[UserType ILocationUser] struct { ID int `json:"id"` - BeginAt time.Time `json:"begin_at"` - EndAt *time.Time `json:"end_at"` + BeginAt DuoTime `json:"begin_at"` + EndAt *DuoTime `json:"end_at"` Primary bool `json:"primary"` Host string `json:"host"` CampusID int `json:"campus_id"` - User LocationUser `json:"user"` + User UserType `json:"user"` +} + +type ILocationUser interface { + LocationUser | ComplexLocationUser } type LocationUser struct { - ID int `json:"id"` - Email string `json:"email"` - Login string `json:"login"` - FirstName string `json:"first_name"` - LastName string `json:"last_name"` - UsualFullName string `json:"usual_full_name"` - UsualFirstName string `json:"usual_first_name"` - URL string `json:"url"` - Phone string `json:"phone"` - Displayname string `json:"displayname"` - ImageURL string `json:"image_url"` - NewImageURL string `json:"new_image_url"` - Staff bool `json:"staff?"` - CorrectionPoint int `json:"correction_point"` - PoolMonth string `json:"pool_month"` - PoolYear string `json:"pool_year"` - Location string `json:"location"` - Wallet int `json:"wallet"` - AnonymizeDate time.Time `json:"anonymize_date"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` - Alumni bool `json:"alumni"` - IsLaunched bool `json:"is_launched?"` + ID int `json:"id"` + Login string `json:"login"` + URL string `json:"url"` +} + +type ComplexLocationUser struct { + ID int `json:"id"` + Login string `json:"login"` + URL string `json:"url"` + Email string `json:"email"` + FirstName string `json:"first_name"` + LastName string `json:"last_name"` + UsualFullName string `json:"usual_full_name"` + UsualFirstName string `json:"usual_first_name"` + Phone string `json:"phone"` + Displayname string `json:"displayname"` + ImageURL string `json:"image_url"` + NewImageURL string `json:"new_image_url"` + Staff bool `json:"staff?"` + CorrectionPoint int `json:"correction_point"` + PoolMonth string `json:"pool_month"` + PoolYear string `json:"pool_year"` + Location string `json:"location"` + Wallet int `json:"wallet"` + AnonymizeDate DuoTime `json:"anonymize_date"` + CreatedAt DuoTime `json:"created_at"` + UpdatedAt DuoTime `json:"updated_at"` + Alumni bool `json:"alumni"` + IsLaunched bool `json:"is_launched?"` +} + +type User struct { + ID int `json:"id"` + Email string `json:"email"` + Login string `json:"login"` + FirstName string `json:"first_name"` + LastName string `json:"last_name"` + UsualFullName string `json:"usual_full_name"` + UsualFirstName string `json:"usual_first_name"` + URL string `json:"url"` + Staff bool `json:"staff?"` + Phone string `json:"phone"` + PoolMonth string `json:"pool_month"` + PoolYear string `json:"pool_year"` } diff --git a/pkg/duoapi/time.go b/pkg/duoapi/time.go new file mode 100644 index 00000000..a9946344 --- /dev/null +++ b/pkg/duoapi/time.go @@ -0,0 +1,46 @@ +package duoapi + +import ( + "encoding/json" + "strings" + "time" +) + +type DuoTime time.Time + +const duoTimeFormat = "2006-01-02 15:04:05 MST" + +func (dt *DuoTime) UnmarshalJSON(b []byte) error { + s := strings.Trim(string(b), "\"") + t, err := time.Parse(duoTimeFormat, s) + if err != nil { + return err + } + *dt = DuoTime(t) + return nil +} + +func (dt DuoTime) MarshalJSON() ([]byte, error) { + return json.Marshal(dt.Time()) +} + +func (dt DuoTime) Time() time.Time { + return time.Time(dt) +} + +func (dt *DuoTime) NillableTime() *time.Time { + if dt == nil { + return nil + } + + nt := dt.Time() + return &nt +} + +func (dt DuoTime) Format(s string) string { + return dt.Time().Format(s) +} + +func (dt DuoTime) String() string { + return dt.Time().String() +} diff --git a/pkg/duoapi/user.go b/pkg/duoapi/user.go new file mode 100644 index 00000000..9e1241e5 --- /dev/null +++ b/pkg/duoapi/user.go @@ -0,0 +1,14 @@ +package duoapi + +import ( + "context" +) + +func UserGet(ctx context.Context, userID string) (*User, error) { + var user = &User{} + err := request(ctx, EndpointUsers+"/"+userID, nil, &user) + if err != nil { + return nil, err + } + return user, nil +} diff --git a/pkg/duoapi/webhooks.go b/pkg/duoapi/webhooks.go new file mode 100644 index 00000000..f085cfcc --- /dev/null +++ b/pkg/duoapi/webhooks.go @@ -0,0 +1,98 @@ +package duoapi + +import ( + "context" + "encoding/json" + "errors" +) + +// Webhooks System is an advanced mechanism of the 42 API. Most applications +// will not need to use it and cannot have access to the Webhooks system. +// +// The Webhooks system is used to send notifications to applications when +// certain events happen in the 42 ecosystem. +// +// To use the s42 webhooks system, you need to reformat the JSON payloads +// that are sent to the webhooks system. The following is an example of a +// webhook payload: +// { +// "metadata": { +// "event": "create", +// "model": "location", +// "deliveryID": "6e9004ca-160e-4712-b3df-c2670f49c03f" +// }, +// "payload": { +// "campus_id": "12345", +// "id": "12345", +// "active": true, +// "host": "e1r1p1", +// } +// } +// +// +// A Wiki page will be created in the future to explain the webhooks system +// and how to use it. For now, please refer to the implementation of the +// webhooks system in the s42 API. (internal/webhooks/*.go) + +type IWebhookPayload interface { + ProcessWebhook(ctx context.Context, metadata *WebhookMetadata, processor WebhookProcessor) error +} + +// WebhookMetadata contains the metadata of a webhook. +// This informations is sended originally by the 42 API on the Header of the +// webhook. +type WebhookMetadata struct { + // Event is the event that triggered the webhook. (Header: X-Event) + // Possible values are listed on the interface {Model}WebhookProcessor. + Event string `json:"event"` + // Model is the model that triggered the webhook. (Header: X-Model) + // Possible values are all models with WebhookProcessor implemented. + Model string `json:"model"` + // DeliveryID is the ID of the webhook delivery. (Header: X-Delivery) + DeliveryID string `json:"deliveryID"` +} + +// Webhook is the JSON structure of a FORMATTED webhook payload. +// This is the format used acutally internally by the S42 project. +type Webhook struct { + Metadata *WebhookMetadata `json:"metadata"` + Payload IWebhookPayload `json:"payload"` +} + +// WebhookProcessor is the interface that must be implemented by all webhook +// processors and used to know which models have webhooks. +type WebhookProcessor interface { + HasWebhooks() bool +} + +// ErrInvalidWebhookProcessor is returned when the processor is not valid for +// the requested model. To know which methods are valid for a model, please +// refer to the implementation of {Model}WebhookProcessor. +// +// Example: for `Location` refers to `LocationWebhookProcessor`` +var ErrInvalidWebhookProcessor = errors.New("invalid webhook processor for current type") + +// UnmarshalJSON unmarshals the JSON FORMATTED payload into a Webhook struct. +// This is the format used acutally internally by the S42 project. +func (w *Webhook) UnmarshalJSON(data []byte) error { + type tp struct { + Metadata struct { + Model string `json:"model"` + } `json:"metadata"` + } + var typ = tp{} + + if err := json.Unmarshal(data, &typ); err != nil { + return err + } + + switch typ.Metadata.Model { + case "location": + w.Payload = new(Location[LocationUser]) + case "user": + w.Payload = new(User) + } + + type tmp Webhook // avoids infinite recursion + return json.Unmarshal(data, (*tmp)(w)) +} diff --git a/pkg/duoapi/webhooks_processors.go b/pkg/duoapi/webhooks_processors.go new file mode 100644 index 00000000..2c0aaad7 --- /dev/null +++ b/pkg/duoapi/webhooks_processors.go @@ -0,0 +1,79 @@ +package duoapi + +import ( + "context" +) + +// LocationWebhookProcessor is the interface that must be implemented by a +// webhook processor for the Location model. +type LocationWebhookProcessor[T ILocationUser] interface { + WebhookProcessor + + // Create is called when a new location is created. + Create(location *Location[T], metadata *WebhookMetadata) error + // Close is called when a location is closed. + // (When an user disconnects from a location) + Close(location *Location[T], metadata *WebhookMetadata) error + // Destroy is called when a location is destroyed. + // (When a location is invalid and removed by the system) + Destroy(location *Location[T], metadata *WebhookMetadata) error +} + +// HasWebhooks returns true because the Location model has webhooks. +func (*Location[LocationUser]) HasWebhooks() bool { + return true +} + +// ProcessWebhook processes a webhook for the Location model. +func (l *Location[LocationUser]) ProcessWebhook(ctx context.Context, metadata *WebhookMetadata, processor WebhookProcessor) error { + p, ok := processor.(LocationWebhookProcessor[LocationUser]) + if !ok { + return ErrInvalidWebhookProcessor + } + + switch metadata.Event { + case "create": + return p.Create(l, metadata) + case "close": + return p.Close(l, metadata) + case "destroy": + return p.Destroy(l, metadata) + } + return nil +} + +// UserWebhookProcessor is the interface that must be implemented by a +// webhook processor for the User model. +type UserWebhookProcessor interface { + WebhookProcessor + + // Create is called when a new user is created. + Create(location *User, metadata *WebhookMetadata) error + // Update is called when an user is updated. + Update(location *User, metadata *WebhookMetadata) error + // Alumnize is called when an user is alumnized. + Alumnize(location *User, metadata *WebhookMetadata) error +} + +// HasWebhooks returns true because the User model has webhooks. +func (*User) HasWebhooks() bool { + return true +} + +// ProcessWebhook processes a webhook for the User model. +func (l *User) ProcessWebhook(ctx context.Context, metadata *WebhookMetadata, processor WebhookProcessor) error { + p, ok := processor.(UserWebhookProcessor) + if !ok { + return ErrInvalidWebhookProcessor + } + + switch metadata.Event { + case "create": + return p.Create(l, metadata) + case "update": + return p.Update(l, metadata) + case "alumnize": + return p.Alumnize(l, metadata) + } + return nil +}