Skip to content

Commit

Permalink
fix workflow and traversal remove manager
Browse files Browse the repository at this point in the history
removing temporal and memory distinction for now until a clearer understanding of how to integrate
  • Loading branch information
Chris committed Aug 21, 2023
1 parent dfba71a commit 65880db
Show file tree
Hide file tree
Showing 53 changed files with 1,566 additions and 1,063 deletions.
256 changes: 187 additions & 69 deletions gen/http/http.pb.go

Large diffs are not rendered by default.

438 changes: 214 additions & 224 deletions gen/project.pb.go

Large diffs are not rendered by default.

49 changes: 30 additions & 19 deletions gen/reason/reason.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"@types/react-dom": "18.0.11",
"@types/react-syntax-highlighter": "^15.5.6",
"bootstrap": "^5.3.0",
"d3-hierarchy": "^3.1.2",
"esbuild-plugin-swc": "^1.0.1",
"esbuild-style-plugin": "^1.6.1",
"eslint": "8.39.0",
Expand Down
4 changes: 0 additions & 4 deletions pkg/graph/edge/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ func (b *Base) ID() string {
return b.Edge.Id
}

func (b *Base) CanWire() bool {
return true
}

func NewBase(edge *gen.Edge) *Base {
return &Base{
Edge: edge,
Expand Down
4 changes: 0 additions & 4 deletions pkg/graph/edge/provides.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ func NewProvides(edge *gen.Edge, p *gen.Provides) graph.Edge {
}
}

func (p *Provides) CanWire() bool {
return false
}

func (p *Provides) Connect(from, to graph.Node) error {
return to.SetProvider(from)
}
7 changes: 3 additions & 4 deletions pkg/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package graph
import (
"context"
"github.com/pkg/errors"
"github.com/protoflow-labs/protoflow/gen"
"github.com/protoflow-labs/protoflow/pkg/grpc"
"github.com/protoflow-labs/protoflow/pkg/grpc/manager"
"github.com/reactivex/rxgo/v2"
Expand Down Expand Up @@ -44,9 +45,8 @@ type Node interface {
ID() string
// TODO breadchris type should probably just return a message descriptor
Type() (*Info, error)
// Method returns the method descriptor for input and output of the node.
// Method() (*Info, error)
Init() (func(), error)

Provide() ([]*gen.Node, error)

// Provider returns the node that this node depends on. (eg. a grpc method node will return the service node)
Provider() (Node, error)
Expand Down Expand Up @@ -75,5 +75,4 @@ type Edge interface {
From() string
To() string
Connect(from, to Node) error
CanWire() bool
}
4 changes: 2 additions & 2 deletions pkg/graph/node/base/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func (n *Node) Method() (*graph.Info, error) {
return nil, errors.New("not implemented")
}

func (n *Node) Init() (func(), error) {
return func() {}, nil
func (n *Node) Provide() ([]*gen.Node, error) {
return []*gen.Node{}, nil
}

// TODO breadchris this should be more robust and take into consideration type types of edges into the node
Expand Down
40 changes: 5 additions & 35 deletions pkg/graph/node/code/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ package code
import (
"context"
"github.com/pkg/errors"
"github.com/protoflow-labs/protoflow/gen"
"github.com/protoflow-labs/protoflow/gen/code"
"github.com/protoflow-labs/protoflow/pkg/graph"
"github.com/protoflow-labs/protoflow/pkg/graph/node/base"
"github.com/protoflow-labs/protoflow/pkg/graph/node/grpc"
"github.com/rs/zerolog/log"
"net"
"net/url"
"time"
igrpc "github.com/protoflow-labs/protoflow/pkg/grpc"
)

type Server struct {
Expand Down Expand Up @@ -46,42 +44,14 @@ func (r *Server) GetServer() *grpc.Server {
return r.GRPC
}

func (r *Server) Init() (func(), error) {
if r.Grpc != nil {
return r.GRPC.Init()
}
return nil, nil
}

func (r *Server) Type() (*graph.Info, error) {
return nil, errors.New("implement me")
}

func (r *Server) Wire(ctx context.Context, input graph.IO) (graph.IO, error) {
//TODO implement me
panic("implement me")
return input, nil
}

func ensureRunning(host string) error {
maxRetries := 1
retryInterval := 2 * time.Second

u, err := url.Parse(host)
if err != nil {
return errors.Wrapf(err, "unable to parse url %s", host)
}

log.Debug().Str("host", host).Msg("waiting for host to come online")
for i := 1; i <= maxRetries; i++ {
conn, err := net.DialTimeout("tcp", u.Host, time.Second)
if err == nil {
conn.Close()
log.Debug().Str("host", host).Msg("host is not listening")
return nil
} else {
log.Debug().Err(err).Int("attempt", i).Int("max", maxRetries).Msg("error connecting to host")
time.Sleep(retryInterval)
}
}
return errors.New("host did not come online in time")
func (r *Server) Provide() ([]*gen.Node, error) {
return igrpc.EnumerateResourceBlocks(r.Grpc, false)
}
2 changes: 1 addition & 1 deletion pkg/graph/node/grpc/method.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewMethodProto(packageService, m string) *pgrpc.GRPC {

func GetMethodFromServer(r *Server, n *Method, protocol bufcurl.ReflectProtocol) (protoreflect.MethodDescriptor, error) {
// TODO breadchris I think a grpc resource should have a host that has a protocol
m := manager.NewReflectionManager("http://"+r.Host, manager.WithProtocol(protocol))
m := manager.NewReflectionManager(r.Host, manager.WithProtocol(protocol))
cleanup, err := m.Init()
if err != nil {
return nil, errors.Wrapf(err, "error initializing reflection manager")
Expand Down
24 changes: 10 additions & 14 deletions pkg/graph/node/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package grpc

import (
"context"
"github.com/protoflow-labs/protoflow/gen"
pgrpc "github.com/protoflow-labs/protoflow/gen/grpc"
"github.com/protoflow-labs/protoflow/pkg/graph"
"github.com/protoflow-labs/protoflow/pkg/graph/node/base"
"github.com/protoflow-labs/protoflow/pkg/grpc"
"strings"
)

Expand All @@ -18,6 +20,10 @@ type ServerProvider interface {
}

func NewServer(b *base.Node, n *pgrpc.Server) *Server {
// TODO breadchris this shouldn't be here
if !strings.HasPrefix(n.Host, "http://") {
n.Host = "http://" + n.Host
}
return &Server{
Node: b,
Server: n,
Expand All @@ -38,20 +44,10 @@ func (n *Server) GetServer() *Server {
return n
}

func (n *Server) Init() (func(), error) {
// TODO breadchris this is a hack to get the grpc server running, this is not ideal
if !strings.HasPrefix(n.Host, "http://") {
n.Host = "http://" + n.Host
}
//if err := ensureRunning(r.Host); err != nil {
// // TODO breadchris ignore errors for now
// // return nil, errors.Wrapf(err, "unable to get the %s grpc server running", r.Name())
// return nil, nil
//}
return nil, nil
func (n *Server) Wire(ctx context.Context, input graph.IO) (graph.IO, error) {
return input, nil
}

func (n *Server) Wire(ctx context.Context, input graph.IO) (graph.IO, error) {
//TODO implement me
panic("implement me")
func (n *Server) Provide() ([]*gen.Node, error) {
return grpc.EnumerateResourceBlocks(n.Server, false)
}
7 changes: 2 additions & 5 deletions pkg/graph/node/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,13 @@ type Router struct {
var _ graph.Node = &Router{}

func NewRouterNode(b *base.Node, node *http.Router) *Router {
return &Router{
r := &Router{
Node: b,
Router: node,
}
}

func (r *Router) Init() (func(), error) {
// TODO breadchris proper dependency injection will need to be figured out to make this work
r.HTTPStream = NewHTTPEventStream()
return nil, nil
return r
}

func (r *Router) Wire(ctx context.Context, input graph.IO) (graph.IO, error) {
Expand Down
14 changes: 14 additions & 0 deletions pkg/graph/node/http/node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package http

import (
"github.com/google/uuid"
"github.com/protoflow-labs/protoflow/gen"
"github.com/protoflow-labs/protoflow/gen/http"
"github.com/protoflow-labs/protoflow/pkg/graph"
"github.com/protoflow-labs/protoflow/pkg/graph/node/base"
Expand All @@ -14,7 +16,19 @@ func New(b *base.Node, node *http.HTTP) graph.Node {
return NewRouterNode(b, t.Router)
case *http.HTTP_Template:
return NewTemplateNode(b, t.Template)
case *http.HTTP_TemplateFs:
return NewTemplateFSNode(b, t.TemplateFs)
default:
return nil
}
}

func NewProto(name string, c *http.HTTP) *gen.Node {
return &gen.Node{
Id: uuid.NewString(),
Name: name,
Type: &gen.Node_Http{
Http: c,
},
}
}
55 changes: 55 additions & 0 deletions pkg/graph/node/http/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package http

import (
"context"
"fmt"
"github.com/protoflow-labs/protoflow/gen/http"
"github.com/protoflow-labs/protoflow/pkg/graph"
"github.com/protoflow-labs/protoflow/pkg/graph/node/base"
"github.com/protoflow-labs/protoflow/pkg/util/rx"
"github.com/reactivex/rxgo/v2"
)

type Response struct {
*base.Node
*http.Response
}

var _ graph.Node = &Response{}

func NewResponse(b *base.Node, node *http.Response) *Response {
return &Response{
Node: b,
Response: node,
}
}

func (n *Response) Wire(ctx context.Context, input graph.IO) (graph.IO, error) {
output := make(chan rxgo.Item)
p, err := n.Provider()
if err != nil {
return graph.IO{}, err
}
routerResource, ok := p.(*Router)
if !ok {
return graph.IO{}, fmt.Errorf("error getting http router resource: %v", n.Response)
}

input.Observable.ForEach(func(item any) {
r, ok := item.(*http.Response)
if !ok {
output <- rx.NewError(fmt.Errorf("error getting http request from stream"))
return
}
routerResource.HTTPStream.Responses <- r
output <- rx.NewItem(r)
}, func(err error) {
output <- rx.NewError(err)
}, func() {
close(output)
})

return graph.IO{
Observable: rxgo.FromChannel(output, rxgo.WithPublishStrategy()),
}, nil
}
Loading

0 comments on commit 65880db

Please sign in to comment.