Skip to content

Commit

Permalink
feat: manual triggers and give clients a hook into step run events (#141
Browse files Browse the repository at this point in the history
)

* feat: pubsub for clients, more qol stuff

* fix: generate sqlc files

* chore: linting and comments
  • Loading branch information
abelanger5 authored Feb 2, 2024
1 parent aed11c3 commit 82d7995
Show file tree
Hide file tree
Showing 52 changed files with 2,056 additions and 425 deletions.
38 changes: 38 additions & 0 deletions api-contracts/dispatcher/dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ service Dispatcher {

rpc Listen(WorkerListenRequest) returns (stream AssignedAction) {}

rpc SubscribeToWorkflowEvents(SubscribeToWorkflowEventsRequest) returns (stream WorkflowEvent) {}

rpc SendStepActionEvent(StepActionEvent) returns (ActionEventResponse) {}

rpc SendGroupKeyActionEvent(GroupKeyActionEvent) returns (ActionEventResponse) {}
Expand Down Expand Up @@ -167,3 +169,39 @@ message ActionEventResponse {
// the id of the worker
string workerId = 2;
}

message SubscribeToWorkflowEventsRequest {
// the id of the workflow run
string workflowRunId = 1;
}

enum ResourceType {
RESOURCE_TYPE_UNKNOWN = 0;
RESOURCE_TYPE_STEP_RUN = 1;
RESOURCE_TYPE_WORKFLOW_RUN = 2;
}

enum ResourceEventType {
RESOURCE_EVENT_TYPE_UNKNOWN = 0;
RESOURCE_EVENT_TYPE_STARTED = 1;
RESOURCE_EVENT_TYPE_COMPLETED = 2;
RESOURCE_EVENT_TYPE_FAILED = 3;
RESOURCE_EVENT_TYPE_CANCELLED = 4;
RESOURCE_EVENT_TYPE_TIMED_OUT = 5;
}

message WorkflowEvent {
// the id of the workflow run
string workflowRunId = 1;

ResourceType resourceType = 2;

ResourceEventType eventType = 3;

string resourceId = 4;

google.protobuf.Timestamp eventTimestamp = 5;

// the event payload
string eventPayload = 6;
}
13 changes: 12 additions & 1 deletion api-contracts/workflows/workflows.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ service WorkflowService {
rpc ListWorkflows(ListWorkflowsRequest) returns (ListWorkflowsResponse);
rpc PutWorkflow(PutWorkflowRequest) returns (WorkflowVersion);
rpc ScheduleWorkflow(ScheduleWorkflowRequest) returns (WorkflowVersion);
rpc TriggerWorkflow(TriggerWorkflowRequest) returns (TriggerWorkflowResponse);
rpc GetWorkflowByName(GetWorkflowByNameRequest) returns (Workflow);
rpc ListWorkflowsForEvent(ListWorkflowsForEventRequest) returns (ListWorkflowsResponse);
rpc DeleteWorkflow(DeleteWorkflowRequest) returns (Workflow);

}

message PutWorkflowRequest {
Expand Down Expand Up @@ -161,4 +161,15 @@ message DeleteWorkflowRequest {

message GetWorkflowByNameRequest {
string name = 1;
}

message TriggerWorkflowRequest {
string name = 1;

// (optional) the input data for the workflow
string input = 2;
}

message TriggerWorkflowResponse {
string workflow_run_id = 1;
}
74 changes: 74 additions & 0 deletions examples/manual-trigger/trigger/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package main

import (
"fmt"
"time"

"github.com/joho/godotenv"

"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
)

type userCreateEvent struct {
Username string `json:"username"`
UserID string `json:"user_id"`
Data map[string]string `json:"data"`
}

type stepOutput struct {
Message string `json:"message"`
}

func main() {
err := godotenv.Load()
if err != nil {
panic(err)
}

events := make(chan string, 50)
if err := run(cmdutils.InterruptChan(), events); err != nil {
panic(err)
}
}

func run(ch <-chan interface{}, events chan<- string) error {
c, err := client.New()

if err != nil {
return fmt.Errorf("error creating client: %w", err)
}

time.Sleep(1 * time.Second)

// trigger workflow
workflowRunId, err := c.Admin().RunWorkflow(
"post-user-update",
&userCreateEvent{
Username: "echo-test",
UserID: "1234",
Data: map[string]string{
"test": "test",
},
},
)

if err != nil {
return fmt.Errorf("error running workflow: %w", err)
}

fmt.Println("workflow run id:", workflowRunId)

interruptCtx, cancel := cmdutils.InterruptContextFromChan(ch)
defer cancel()

err = c.Run().On(interruptCtx, workflowRunId, func(event *client.StepRunEvent) error {
if event.Type == client.StepRunEventTypeCompleted {
fmt.Println(string(event.Payload))
}

return nil
})

return err
}
136 changes: 136 additions & 0 deletions examples/manual-trigger/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package main

import (
"fmt"
"time"

"github.com/joho/godotenv"

"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
"github.com/hatchet-dev/hatchet/pkg/worker"
)

type userCreateEvent struct {
Username string `json:"username"`
UserID string `json:"user_id"`
Data map[string]string `json:"data"`
}

type stepOutput struct {
Message string `json:"message"`
}

func main() {
err := godotenv.Load()
if err != nil {
panic(err)
}

events := make(chan string, 50)
if err := run(cmdutils.InterruptChan(), events); err != nil {
panic(err)
}
}

func run(ch <-chan interface{}, events chan<- string) error {
c, err := client.New()

if err != nil {
return fmt.Errorf("error creating client: %w", err)
}

// Create a worker. This automatically reads in a TemporalClient from .env and workflow files from the .hatchet
// directory, but this can be customized with the `worker.WithTemporalClient` and `worker.WithWorkflowFiles` options.
w, err := worker.NewWorker(
worker.WithClient(
c,
),
)
if err != nil {
return fmt.Errorf("error creating worker: %w", err)
}

testSvc := w.NewService("test")

err = testSvc.On(
worker.Events("user:create:simple"),
&worker.WorkflowJob{
Name: "post-user-update",
Description: "This runs after an update to the user model.",
Steps: []*worker.WorkflowStep{
worker.Fn(func(ctx worker.HatchetContext) (result *stepOutput, err error) {
input := &userCreateEvent{}
ctx.WorkflowInput(input)

time.Sleep(1 * time.Second)

return &stepOutput{
Message: "Step 1 got username: " + input.Username,
}, nil
},
).SetName("step-one"),
worker.Fn(func(ctx worker.HatchetContext) (result *stepOutput, err error) {
input := &userCreateEvent{}
ctx.WorkflowInput(input)

time.Sleep(2 * time.Second)

return &stepOutput{
Message: "Step 2 got username: " + input.Username,
}, nil
}).SetName("step-two"),
worker.Fn(func(ctx worker.HatchetContext) (result *stepOutput, err error) {
step1Out := &stepOutput{}
ctx.StepOutput("step-one", step1Out)

step2Out := &stepOutput{}
ctx.StepOutput("step-two", step2Out)

time.Sleep(3 * time.Second)

return &stepOutput{
Message: "Step 3: has parents 1 and 2:" + step1Out.Message + ", " + step2Out.Message,
}, nil
}).SetName("step-three").AddParents("step-one", "step-two"),
worker.Fn(func(ctx worker.HatchetContext) (result *stepOutput, err error) {
step1Out := &stepOutput{}
ctx.StepOutput("step-one", step1Out)

step3Out := &stepOutput{}
ctx.StepOutput("step-three", step3Out)

time.Sleep(4 * time.Second)

return &stepOutput{
Message: "Step 4: has parents 1 and 3" + step1Out.Message + ", " + step3Out.Message,
}, nil
}).SetName("step-four").AddParents("step-one", "step-three"),
worker.Fn(func(ctx worker.HatchetContext) (result *stepOutput, err error) {
step4Out := &stepOutput{}
ctx.StepOutput("step-four", step4Out)

time.Sleep(5 * time.Second)

return &stepOutput{
Message: "Step 5: has parent 4" + step4Out.Message,
}, nil
}).SetName("step-five").AddParents("step-four"),
},
},
)
if err != nil {
return fmt.Errorf("error registering workflow: %w", err)
}

interruptCtx, cancel := cmdutils.InterruptContextFromChan(ch)
defer cancel()

err = w.Start(interruptCtx)

if err != nil {
panic(err)
}

return nil
}
3 changes: 3 additions & 0 deletions frontend/app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"@heroicons/react": "^2.0.18",
"@hookform/resolvers": "^3.3.2",
"@lukemorales/query-key-factory": "^1.3.2",
"@monaco-editor/react": "^4.6.0",
"@radix-ui/react-accordion": "^1.1.2",
"@radix-ui/react-avatar": "^1.0.4",
"@radix-ui/react-checkbox": "^1.0.4",
Expand All @@ -32,6 +33,7 @@
"@radix-ui/react-select": "^2.0.0",
"@radix-ui/react-separator": "^1.0.3",
"@radix-ui/react-slot": "^1.0.2",
"@radix-ui/react-tabs": "^1.0.4",
"@radix-ui/react-toast": "^1.1.5",
"@tanstack/react-query": "^5.12.1",
"@tanstack/react-table": "^8.10.7",
Expand All @@ -54,6 +56,7 @@
"cronstrue": "^2.47.0",
"dagre": "^0.8.5",
"jotai": "^2.6.0",
"monaco-themes": "^0.4.4",
"prism-react-renderer": "^2.3.0",
"qs": "^6.11.2",
"react": "^18.2.0",
Expand Down
Loading

0 comments on commit 82d7995

Please sign in to comment.