Add PostgreSQL Messaging Pub/Sub Watermill #1680

Merged on Nov 22, 2023 (11 commits)

@teodor-yanev teodor-yanev commented Nov 16, 2023

Issue described in: #1636

Testing

2023-11-16 10:20:18 {"level":"info","Timestamp":1700122818822984756,"message":"Initializing logger in level: debug"}
2023-11-16 10:20:18 {"level":"info","Timestamp":1700122818822995839,"message":"No cloud provider credentials specified, using password"}
2023-11-16 10:20:18 {"level":"info","Timestamp":1700122818827850756,"message":"Connected to DB"}
2023-11-16 10:20:18 {"level":"debug","component":"watermill","count":"1","Timestamp":1700122818827921006,"message":"Adding publisher decorators"}
2023-11-16 10:20:18 {"level":"debug","component":"watermill","count":"1","Timestamp":1700122818827937214,"message":"Adding subscriber decorators"}
2023-11-16 10:20:18 {"level":"debug","component":"watermill","count":"1","Timestamp":1700122818827971464,"message":"Adding middleware"}
2023-11-16 10:20:18 {"level":"debug","component":"watermill","count":"3","Timestamp":1700122818827978714,"message":"Adding middleware"}
2023-11-16 10:20:18 {"level":"info","component":"watermill","topic":"internal.entity.event","handler_name":"github.com/stacklok/minder/internal/engine.(*Executor).HandleEntityEvent-fm-internal.entity.event","Timestamp":1700122818837505464,"message":"Adding handler"}
2023-11-16 10:20:18 {"level":"info","component":"watermill","handler_name":"github.com/stacklok/minder/internal/reconcilers.(*Reconciler).handleRepoReconcilerEvent-fm-internal.repo.reconciler.event","topic":"internal.repo.reconciler.event","Timestamp":1700122818837799547,"message":"Adding handler"}
2023-11-16 10:20:18 {"level":"info","component":"watermill","topic":"internal.profile.init.event","handler_name":"github.com/stacklok/minder/internal/reconcilers.(*Reconciler).handleProfileInitEvent-fm-internal.profile.init.event","Timestamp":1700122818837813172,"message":"Adding handler"}
2023-11-16 10:20:18 {"level":"debug","component":"watermill","Timestamp":1700122818837889714,"message":"Loading plugins"}
2023-11-16 10:20:18 {"level":"info","component":"watermill","count":3,"Timestamp":1700122818837920672,"message":"Running router handlers"}
2023-11-16 10:20:18 {"level":"debug","component":"watermill","subscriber_name":"github.com/stacklok/minder/internal/engine.(*Executor).HandleEntityEvent-fm-internal.entity.event","topic":"internal.entity.event","Timestamp":1700122818838012047,"message":"Subscribing to topic"}
2023-11-16 10:20:18 {"Timestamp":1700122818838127922,"message":"2023/11/16 08:20:18 Starting gRPC server on 0.0.0.0:8090"}
2023-11-16 10:20:18 {"Timestamp":1700122818838998756,"message":"2023/11/16 08:20:18 Starting HTTP server on 0.0.0.0:8080"}
2023-11-16 10:20:18 {"Timestamp":1700122818839487422,"message":"2023/11/16 08:20:18 Starting metrics server on 0.0.0.0:9090"}
2023-11-16 10:20:18 [watermill] 2023/11/16 08:20:18.838062 schema.go:28: level=INFO  msg="Initializing subscriber schema" query="[ 
2023-11-16 10:20:18 CREATE TABLE IF NOT EXISTS "watermill_internal.entity.event" (
2023-11-16 10:20:18 "offset" SERIAL,
2023-11-16 10:20:18 "uuid" VARCHAR(36) NOT NULL,
2023-11-16 10:20:18 "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
2023-11-16 10:20:18 "payload" JSON DEFAULT NULL,
2023-11-16 10:20:18 "metadata" JSON DEFAULT NULL,
2023-11-16 10:20:18 "transaction_id" xid8 NOT NULL,
2023-11-16 10:20:18 PRIMARY KEY ("transaction_id", "offset")
2023-11-16 10:20:18 );
2023-11-16 10:20:18  
2023-11-16 10:20:18 CREATE TABLE IF NOT EXISTS "watermill_offsets_internal.entity.event" (
2023-11-16 10:20:18 consumer_group VARCHAR(255) NOT NULL,
2023-11-16 10:20:18 offset_acked BIGINT,
2023-11-16 10:20:18 last_processed_transaction_id xid8 NOT NULL,
2023-11-16 10:20:18 PRIMARY KEY(consumer_group)
2023-11-16 10:20:18 )]" subscriber_id=01HFBKEY8BZBT23P4EEANZD004 
2023-11-16 10:20:18 [watermill] 2023/11/16 08:20:18.845592 schema.go:28: level=INFO  msg="Initializing subscriber schema" query="[ 
2023-11-16 10:20:18 CREATE TABLE IF NOT EXISTS "watermill_internal.repo.reconciler.event" (
2023-11-16 10:20:18 "offset" SERIAL,
2023-11-16 10:20:18 "uuid" VARCHAR(36) NOT NULL,
2023-11-16 10:20:18 {"level":"debug","component":"watermill","subscriber_name":"github.com/stacklok/minder/internal/reconcilers.(*Reconciler).handleRepoReconcilerEvent-fm-internal.repo.reconciler.event","topic":"internal.repo.reconciler.event","Timestamp":1700122818845564006,"message":"Subscribing to topic"}
2023-11-16 10:20:18 {"level":"info","component":"watermill","subscriber_name":"github.com/stacklok/minder/internal/engine.(*Executor).HandleEntityEvent-fm-internal.entity.event","topic":"internal.entity.event","Timestamp":1700122818845722756,"message":"Starting handler"}
2023-11-16 10:20:18 {"level":"debug","component":"watermill","subscriber_name":"github.com/stacklok/minder/internal/reconcilers.(*Reconciler).handleProfileInitEvent-fm-internal.profile.init.event","topic":"internal.profile.init.event","Timestamp":1700122818849883131,"message":"Subscribing to topic"}
2023-11-16 10:20:18 {"level":"info","component":"watermill","subscriber_name":"github.com/stacklok/minder/internal/reconcilers.(*Reconciler).handleRepoReconcilerEvent-fm-internal.repo.reconciler.event","topic":"internal.repo.reconciler.event","Timestamp":1700122818849898172,"message":"Starting handler"}
2023-11-16 10:20:18 {"level":"info","component":"watermill","topic":"internal.profile.init.event","subscriber_name":"github.com/stacklok/minder/internal/reconcilers.(*Reconciler).handleProfileInitEvent-fm-internal.profile.init.event","Timestamp":1700122818860212756,"message":"Starting handler"}
2023-11-16 10:20:33 {"level":"error","Resource":{"service":"minder.v1.UserService","method":"GetUser"},"Attributes":{"http.code":"NotFound","http.content-type":["application/grpc"],"http.duration":"2.34775ms","http.user_agent":["minder-cli/v0.0.15+ref.d8c9ae80 (darwin) arm64 (go1.21.3) grpc-go/1.59.0"],"exception.message":"rpc error: code = NotFound desc = user not found"},"Timestamp":1700122833197264929}
2023-11-16 10:20:33 {"level":"info","Resource":{"service":"minder.v1.UserService","method":"CreateUser"},"Attributes":{"http.code":"OK","http.content-type":["application/grpc"],"http.duration":"6.48ms","http.user_agent":["minder-cli/v0.0.15+ref.d8c9ae80 (darwin) arm64 (go1.21.3) grpc-go/1.59.0"]},"Timestamp":1700122833204679846}
2023-11-16 10:20:18 "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
2023-11-16 10:20:18 "payload" JSON DEFAULT NULL,
2023-11-16 10:20:18 "metadata" JSON DEFAULT NULL,
2023-11-16 10:20:18 "transaction_id" xid8 NOT NULL,
2023-11-16 10:20:18 PRIMARY KEY ("transaction_id", "offset")
2023-11-16 10:20:18 );
2023-11-16 10:20:18  
2023-11-16 10:20:18 CREATE TABLE IF NOT EXISTS "watermill_offsets_internal.repo.reconciler.event" (
2023-11-16 10:20:18 consumer_group VARCHAR(255) NOT NULL,
2023-11-16 10:20:18 offset_acked BIGINT,
2023-11-16 10:20:18 last_processed_transaction_id xid8 NOT NULL,
2023-11-16 10:20:18 PRIMARY KEY(consumer_group)
2023-11-16 10:20:18 )]" subscriber_id=01HFBKEY8BZBT23P4EEANZD004 
2023-11-16 10:20:18 [watermill] 2023/11/16 08:20:18.849949 schema.go:28: level=INFO  msg="Initializing subscriber schema" query="[ 
2023-11-16 10:20:18 CREATE TABLE IF NOT EXISTS "watermill_internal.profile.init.event" (
2023-11-16 10:20:18 "offset" SERIAL,
2023-11-16 10:20:18 "uuid" VARCHAR(36) NOT NULL,
2023-11-16 10:20:18 "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
2023-11-16 10:20:18 "payload" JSON DEFAULT NULL,
2023-11-16 10:20:18 "metadata" JSON DEFAULT NULL,
2023-11-16 10:20:18 "transaction_id" xid8 NOT NULL,
2023-11-16 10:20:18 PRIMARY KEY ("transaction_id", "offset")
2023-11-16 10:20:18 );
2023-11-16 10:20:18  
2023-11-16 10:20:18 CREATE TABLE IF NOT EXISTS "watermill_offsets_internal.profile.init.event" (
2023-11-16 10:20:18 consumer_group VARCHAR(255) NOT NULL,
2023-11-16 10:20:18 offset_acked BIGINT,
2023-11-16 10:20:18 last_processed_transaction_id xid8 NOT NULL,
2023-11-16 10:20:18 PRIMARY KEY(consumer_group)
2023-11-16 10:20:18 )]" subscriber_id=01HFBKEY8BZBT23P4EEANZD004 

Screenshot 2023-11-16 at 10 24 50

Update: after adding the second database "minder_queue", we have a nice separation of concerns:
image

@stacklokbot stacklokbot left a comment

Minder analyzed this PR and found no vulnerable dependencies.

jhrozek commented Nov 16, 2023

Maybe a stupid question, but since there appears to be a single table, how does publishing and subscribing work with multiple minder processes? Does the subscriber implement some locking with e.g. SELECT FOR UPDATE?

@JAORMX JAORMX left a comment

I see that the postgres driver brings some new tables to the mix. Where are those created? The minder-server database user does not have permissions to create tables, only the migration user is able to do this. I'm guessing this would be an issue.

@teodor-yanev teodor-yanev force-pushed the postresql-messaging-watermill branch from 3a4d2be to ec775c0 Compare November 17, 2023 11:44

@jhrozek jhrozek left a comment

I'll remove the block once we discuss the env variable, but at first glance it seems incorrect.

This removes the boilerplate from other parts of the code and instead moves
the responsibility to the eventer.

Env var will be added as an extraEnv in the mediator-app.yaml in infra

teodor-yanev commented Nov 21, 2023

> Maybe a stupid question, but since there appears to be a single table, how does publishing and subscribing work with multiple minder processes? Does the subscriber implement some locking with e.g. SELECT FOR UPDATE?

Good question. To answer it, I dug through https://github.com/ThreeDotsLabs/watermill-sql/blob/master/pkg/sql/offsets_adapter_postgresql.go and https://github.com/ThreeDotsLabs/watermill/blob/master/_examples/real-world-examples/exactly-once-delivery-counter/README.md, as well as the scarce documentation at https://watermill.io/pubsubs/sql/. Here is an overall idea of how it works:

There are essentially two tables per topic: a message table, and an offsets table that holds one row per consumer group:

Message Table:

  • Contains the actual messages that are published.
  • Each message typically has a unique identifier, the payload, and possibly other metadata.
  • This table is used for both publishing (inserting messages) and subscribing (reading messages).

Offsets Table:

  • Used for tracking the progress of each consumer or consumer group.
  • It records the last message offset (or ID) that has been acknowledged or processed by a consumer group.
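  • Lets each consumer group resume after the last acknowledged message, so an acknowledged message is not handed to the same group again, and helps maintain the order of message processing. As far as I can tell from the linked example, strictly exactly-once processing additionally requires the handler's work to commit in the same transaction as the offset update.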
  • Uses mechanisms like SELECT ... FOR UPDATE to lock rows and manage concurrent access by different consumers.

Relationship Between the Tables

The message table and the offsets table are related in the sense that the offsets table keeps track of which messages from the message table have been processed. The offsets table stores an identifier (like an offset or a transaction ID) that corresponds to records in the message table.
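
The relationship described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions, not watermill-sql's implementation: `topicStore`, `publish`, and `consumeNext` are hypothetical names, a slice stands in for the message table, a map stands in for the offsets table, and a mutex stands in for the row lock that SELECT ... FOR UPDATE would take.

```go
package main

import (
	"fmt"
	"sync"
)

// message mirrors one row of a per-topic message table (offset and payload only).
type message struct {
	offset  int64
	payload string
}

// topicStore is a hypothetical in-memory stand-in for the two tables of one topic.
type topicStore struct {
	mu       sync.Mutex       // stands in for the row lock of SELECT ... FOR UPDATE
	messages []message        // the message table, ordered by offset
	acked    map[string]int64 // the offsets table: consumer group -> last acked offset
}

func newTopicStore() *topicStore {
	return &topicStore{acked: make(map[string]int64)}
}

// publish appends a message and returns its offset (similar to a SERIAL column).
func (s *topicStore) publish(payload string) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	off := int64(len(s.messages) + 1)
	s.messages = append(s.messages, message{offset: off, payload: payload})
	return off
}

// consumeNext passes the next unacknowledged message of the group to handle.
// The group's offset advances only if handle succeeds, and the lock is held
// throughout, so two workers of one group never process the same message at once.
func (s *topicStore) consumeNext(group string, handle func(string) error) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	last := s.acked[group]
	for _, m := range s.messages {
		if m.offset > last {
			if err := handle(m.payload); err != nil {
				return false, err // offset not advanced; the message stays pending
			}
			s.acked[group] = m.offset
			return true, nil
		}
	}
	return false, nil // nothing new for this group
}

func main() {
	store := newTopicStore()
	for _, p := range []string{"a", "b", "c"} {
		store.publish(p)
	}

	handled := map[string]int{}
	var wg sync.WaitGroup
	// Two workers share the consumer group "minder", like two minder processes.
	for w := 0; w < 2; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				ok, _ := store.consumeNext("minder", func(p string) error {
					handled[p]++ // safe: the handler runs while the store lock is held
					return nil
				})
				if !ok {
					return
				}
			}
		}()
	}
	wg.Wait()
	fmt.Println(handled)
}
```

In this model each payload should be counted once for the group, regardless of which worker picked it up, and a second consumer group would start again from the first message because it has its own offset entry.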

@teodor-yanev teodor-yanev requested a review from jhrozek November 21, 2023 19:31

jhrozek commented Nov 21, 2023

One more question, but I guess this can be a follow-up: how is the InitSchema attribute used? I can't seem to find any usage:

jakub@flatline-2 ~/devel/mediator (postresql-messaging-watermill●)$ git grep init_schema
internal/config/events.go:      InitSchema bool           `mapstructure:"init_schema" default:"true"`
jakub@flatline-2 ~/devel/mediator (postresql-messaging-watermill●)$ git grep InitSchema
internal/config/events.go:      // InitSchema is whether or not to initialize the schema
internal/config/events.go:      InitSchema bool           `mapstructure:"init_schema" default:"true"`

JAORMX commented Nov 22, 2023

It isn't as of now. I made it configurable because I wanna figure out if it's possible to move this logic out of the workload, so then we could just toggle that.

@teodor-yanev

> It isn't as of now. I made it configurable because I wanna figure out if it's possible to move this logic out of the workload, so then we could just toggle that.

Should we remove it for now and re-add it when it's present in the application logic?

@JAORMX JAORMX requested a review from stacklokbot November 22, 2023 09:23
@JAORMX JAORMX merged commit c063e24 into main Nov 22, 2023
12 checks passed
@JAORMX JAORMX deleted the postresql-messaging-watermill branch November 22, 2023 09:25