Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add epoll support to ws origin requests #921

Open
wants to merge 36 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
b7fbcea
chore: wip add epoll
jensneuse Oct 14, 2024
0d47057
chore: wip add epoll
jensneuse Oct 14, 2024
89385f1
Merge branch 'master' into feat/add-epoll-to-origin-ws
jensneuse Oct 14, 2024
5fae3c8
chore: finish epoll implementation for subgraph requests
jensneuse Oct 17, 2024
64e9734
Merge branch 'master' into feat/add-epoll-to-origin-ws
jensneuse Oct 17, 2024
bbdd192
chore: add error handling to subscribe
jensneuse Oct 17, 2024
9586289
chore: cleanup subscriptions impl
jensneuse Oct 17, 2024
9d854ba
chore: fix lint
jensneuse Oct 17, 2024
898d704
Merge branch 'master' into feat/add-epoll-to-origin-ws
jensneuse Oct 17, 2024
b0bd97a
chore: skip epoll if not supported
jensneuse Oct 17, 2024
406ab38
chore: skip epoll tests if not supported
jensneuse Oct 17, 2024
05944f0
chore: skip epoll tests if not supported
jensneuse Oct 17, 2024
071349c
chore: skip epoll tests if not supported
jensneuse Oct 17, 2024
d9b985c
chore: skip epoll tests if not supported
jensneuse Oct 17, 2024
cf48a53
chore: skip epoll tests if not supported
jensneuse Oct 17, 2024
7bb424f
chore: fix race
jensneuse Oct 18, 2024
7609a0d
chore: add tests for epoll disabled
jensneuse Oct 18, 2024
7b40eb8
fix: revert to previous library, move epoll lib to engine
StarpTech Oct 20, 2024
8b30a1d
chore: move heartbeat into resolver, address PR feedback
jensneuse Oct 21, 2024
3eb74ca
Merge branch 'master' into feat/add-epoll-to-origin-ws
jensneuse Oct 21, 2024
8bb9a05
chore: cleanup
jensneuse Oct 21, 2024
8d19b96
chore: fix lint
jensneuse Oct 21, 2024
e968003
chore: fix tests
jensneuse Oct 21, 2024
3c47efc
chore: fix tests
jensneuse Oct 21, 2024
90bf1ec
chore: fix tests
jensneuse Oct 21, 2024
90498eb
chore: cleanup
jensneuse Oct 21, 2024
44de1f6
chore: use websocket library to close conn, fix race with trigger cancel
StarpTech Oct 21, 2024
b7035ac
chore: use http client for upgrades
jensneuse Oct 22, 2024
2281d9d
chore: improve heap usage and epoll unsubscribe
jensneuse Oct 22, 2024
b97e73b
feat: limit concurrency, decrease read deadline when reading messages
StarpTech Oct 23, 2024
d006b22
fix: dead lock, test
StarpTech Oct 23, 2024
1ed429e
fix: tests
StarpTech Oct 23, 2024
b71bef3
chore: unsubscribe in seperate routine
StarpTech Oct 23, 2024
07cc7c4
fix: lint
StarpTech Oct 23, 2024
264dacb
chore: handle unsubscribe async batched
jensneuse Oct 24, 2024
8aed613
Merge remote-tracking branch 'origin/feat/add-epoll-to-origin-ws' int…
jensneuse Oct 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (

require (
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/coder/websocket v1.8.12 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMU
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo=
github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
18 changes: 18 additions & 0 deletions v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -1738,6 +1738,8 @@ type GraphQLSubscriptionClient interface {
// Subscribe to the origin source. The implementation must not block the calling goroutine.
Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error
UniqueRequestID(ctx *resolve.Context, options GraphQLSubscriptionOptions, hash *xxhash.Digest) (err error)
SubscribeAsync(ctx *resolve.Context, id uint64, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error
Unsubscribe(id uint64)
}

type GraphQLSubscriptionOptions struct {
Expand All @@ -1763,6 +1765,22 @@ type SubscriptionSource struct {
client GraphQLSubscriptionClient
}

func (s *SubscriptionSource) AsyncStart(ctx *resolve.Context, id uint64, input []byte, updater resolve.SubscriptionUpdater) error {
var options GraphQLSubscriptionOptions
err := json.Unmarshal(input, &options)
if err != nil {
return err
}
if options.Body.Query == "" {
return resolve.ErrUnableToResolve
}
return s.client.SubscribeAsync(ctx, id, options, updater)
}

func (s *SubscriptionSource) AsyncStop(id uint64) {
s.client.Unsubscribe(id)
}

// Start the subscription. The updater is called on new events. Start needs to be called in a separate goroutine.
func (s *SubscriptionSource) Start(ctx *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error {
var options GraphQLSubscriptionOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8834,6 +8834,14 @@ var errSubscriptionClientFail = errors.New("subscription client fail error")

type FailingSubscriptionClient struct{}

func (f *FailingSubscriptionClient) SubscribeAsync(ctx *resolve.Context, id uint64, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error {
return errSubscriptionClientFail
}

func (f *FailingSubscriptionClient) Unsubscribe(id uint64) {

}

func (f *FailingSubscriptionClient) Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error {
return errSubscriptionClientFail
}
Expand Down
61 changes: 34 additions & 27 deletions v2/pkg/engine/datasource/graphql_datasource/graphql_sse_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,65 +25,68 @@ var (
)

type gqlSSEConnectionHandler struct {
conn *http.Client
ctx context.Context
log log.Logger
options GraphQLSubscriptionOptions
conn *http.Client
requestContext, engineContext context.Context
log log.Logger
options GraphQLSubscriptionOptions
updater resolve.SubscriptionUpdater
}

func newSSEConnectionHandler(ctx *resolve.Context, conn *http.Client, opts GraphQLSubscriptionOptions, l log.Logger) *gqlSSEConnectionHandler {
func newSSEConnectionHandler(requestContext, engineContext context.Context, conn *http.Client, updater resolve.SubscriptionUpdater, options GraphQLSubscriptionOptions, l log.Logger) *gqlSSEConnectionHandler {
return &gqlSSEConnectionHandler{
conn: conn,
ctx: ctx.Context(),
log: l,
options: opts,
conn: conn,
requestContext: requestContext,
engineContext: engineContext,
log: l,
updater: updater,
options: options,
}
}

func (h *gqlSSEConnectionHandler) StartBlocking(sub Subscription) {
reqCtx := sub.ctx

func (h *gqlSSEConnectionHandler) StartBlocking() {
dataCh := make(chan []byte)
errCh := make(chan []byte)
defer func() {
close(dataCh)
close(errCh)
sub.updater.Done()
h.updater.Done()
}()

go h.subscribe(reqCtx, sub, dataCh, errCh)
go h.subscribe(dataCh, errCh)

ticker := time.NewTicker(resolve.HearbeatInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
sub.updater.Heartbeat()
h.updater.Heartbeat()
case data := <-dataCh:
ticker.Reset(resolve.HearbeatInterval)
sub.updater.Update(data)
h.updater.Update(data)
case data := <-errCh:
ticker.Reset(resolve.HearbeatInterval)
sub.updater.Update(data)
h.updater.Update(data)
return
case <-h.requestContext.Done():
return
case <-reqCtx.Done():
case <-h.engineContext.Done():
return
}
}
}

func (h *gqlSSEConnectionHandler) subscribe(ctx context.Context, sub Subscription, dataCh, errCh chan []byte) {
resp, err := h.performSubscriptionRequest(ctx)
func (h *gqlSSEConnectionHandler) subscribe(dataCh, errCh chan []byte) {
resp, err := h.performSubscriptionRequest()
if err != nil {
h.log.Error("failed to perform subscription request", log.Error(err))

if ctx.Err() != nil {
if h.requestContext.Err() != nil {
// request context was canceled do not send an error as channel will be closed
return
}

sub.updater.Update([]byte(internalError))
h.updater.Update([]byte(internalError))

return
}
Expand All @@ -94,8 +97,12 @@ func (h *gqlSSEConnectionHandler) subscribe(ctx context.Context, sub Subscriptio
reader := sse.NewEventStreamReader(resp.Body, math.MaxInt)

for {
if ctx.Err() != nil {
select {
case <-h.requestContext.Done():
return
case <-h.engineContext.Done():
return
default:
}

msg, err := reader.ReadEvent()
Expand Down Expand Up @@ -126,7 +133,7 @@ func (h *gqlSSEConnectionHandler) subscribe(ctx context.Context, sub Subscriptio
continue
}

if ctx.Err() != nil {
if h.requestContext.Err() != nil {
// request context was canceled do not send an error as channel will be closed
return
}
Expand Down Expand Up @@ -205,16 +212,16 @@ func trim(data []byte) []byte {
return data
}

func (h *gqlSSEConnectionHandler) performSubscriptionRequest(ctx context.Context) (*http.Response, error) {
func (h *gqlSSEConnectionHandler) performSubscriptionRequest() (*http.Response, error) {

var req *http.Request
var err error

// default to GET requests when SSEMethodPost is not enabled in the SubscriptionConfiguration
if h.options.SSEMethodPost {
req, err = h.buildPOSTRequest(ctx)
req, err = h.buildPOSTRequest(h.requestContext)
} else {
req, err = h.buildGETRequest(ctx)
req, err = h.buildGETRequest(h.requestContext)
}

if err != nil {
Expand Down
Loading
Loading