improvement: implement a subscription notification batcher #217

Merged · 8 commits · Oct 7, 2024
2 changes: 1 addition & 1 deletion .formatter.exs
@@ -77,7 +77,7 @@ spark_locals_without_parens = [
]

[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
inputs: ["{mix,.formatter}.exs", "{config,lib,test,benchmarks}/**/*.{ex,exs}"],
locals_without_parens: spark_locals_without_parens,
export: [
locals_without_parens: spark_locals_without_parens
125 changes: 125 additions & 0 deletions benchmarks/subscriptions.exs
@@ -0,0 +1,125 @@
alias AshGraphql.Test.PubSub
alias AshGraphql.Test.Schema

{:ok, _pubsub} = PubSub.start_link()
{:ok, _absinthe_sub} = Absinthe.Subscription.start_link(PubSub)

# Application.put_env(:ash_graphql, :simulate_subscription_processing_time, 1000)
:ok

admin = %{
id: 0,
role: :admin
}

create_mutation = """
mutation CreateSubscribable($input: CreateSubscribableInput) {
createSubscribable(input: $input) {
result{
id
text
}
errors{
message
}
}
}
"""

AshGraphql.Subscription.Batcher.start_link()

Benchee.run(
%{
"1 mutation" => fn _input ->
Absinthe.run(create_mutation, Schema,
variables: %{"input" => %{"text" => "foo"}},
context: %{actor: admin}
)
end
},
inputs: %{
"25 same subscribers" => {25, :same},
"500 same subscribers" => {500, :same},
"50 mixed subscribers" => {25, [:same, :different]},
"1000 mixed subscribers" => {500, [:same, :different]}
},
after_scenario: fn _ ->
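# recursively drain the test process mailbox, counting every message received during the scenario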
count = fn counter ->
receive do
_msg ->
1 + counter.(counter)
after
0 -> 0
end
end

AshGraphql.Subscription.Batcher.drain()

IO.puts("Received #{count.(count)} messages")
end,
before_scenario: fn {input, types} ->
Application.put_env(PubSub, :notifier_test_pid, self())

if :different in List.wrap(types) do
Enum.each(1..input, fn i ->
actor = %{
id: i,
role: :admin
}

{:ok, %{"subscribed" => _topic}} =
Absinthe.run(
"""
subscription {
subscribableEvents {
created {
id
text
}
updated {
id
text
}
destroyed
}
}
""",
Schema,
context: %{actor: actor, pubsub: PubSub}
)
end)
end

if :same in List.wrap(types) do
Enum.each(1..input, fn _i ->
actor = %{
id: -1,
role: :admin
}

{:ok, %{"subscribed" => _topic}} =
Absinthe.run(
"""
subscription {
subscribableEvents {
created {
id
text
}
updated {
id
text
}
destroyed
}
}
""",
Schema,
context: %{actor: actor, pubsub: PubSub}
)
end)
end
end
)

AshGraphql.Subscription.Batcher.drain()
6 changes: 4 additions & 2 deletions config/config.exs
@@ -4,13 +4,15 @@ config :ash, :disable_async?, true
config :ash, :validate_domain_resource_inclusion?, false
config :ash, :validate_domain_config_inclusion?, false

config :logger, level: :warning

config :ash, :pub_sub, debug?: true
config :logger, level: :info

config :ash_graphql, :subscriptions, true

if Mix.env() == :test do
config :ash_graphql, :simulate_subscription_slowness?, true
end

if Mix.env() == :dev do
config :git_ops,
mix_project: AshGraphql.MixProject,
27 changes: 26 additions & 1 deletion documentation/topics/use-subscriptions-with-graphql.md
@@ -32,14 +32,39 @@ end

The subscription DSL is currently in beta; before using it, you have to enable it in your config.

> ### Subscription response order {: .warning}
>
> The order in which the subscription responses are sent to the client is not guaranteed to be the
> same as the order in which the mutations were executed.

```elixir
config :ash_graphql, :subscriptions, true
```

First you'll need to do some setup: follow the [setup guide](https://hexdocs.pm/absinthe/subscriptions.html#absinthe-phoenix-setup)
in the Absinthe docs, but instead of using `Absinthe.Phoenix.Endpoint`, use `AshGraphql.Subscription.Endpoint`.
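
As a rough sketch (module and OTP app names here are placeholders), the endpoint ends up looking something like this:

```elixir
defmodule MyAppWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :my_app

  # drop-in replacement for `use Absinthe.Phoenix.Endpoint`
  use AshGraphql.Subscription.Endpoint

  # ...sockets, plugs and the rest of your endpoint configuration
end
```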

By default, subscriptions are resolved synchronously as part of the mutation. This means a resolver is run for every
subscriber that is not deduplicated. If you have a lot of subscribers, you can add `AshGraphql.Subscription.Batcher` to
your supervision tree; it batches up notifications and runs subscription resolution out of band.

```elixir
@impl true
def start(_type, _args) do
children = [
...,
{Absinthe.Subscription, MyAppWeb.Endpoint},
AshGraphql.Subscription.Batcher
]

# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: MyAppWeb.Supervisor]
Supervisor.start_link(children, opts)
end
```

Afterwards, add an empty subscription block to your schema module.

```elixir
defmodule MyAppWeb.Schema do
168 changes: 168 additions & 0 deletions lib/graphql/resolver.ex
@@ -506,6 +506,166 @@ defmodule AshGraphql.Graphql.Resolver do
end
end

def resolve(
%{root_value: {:pre_resolved, item}} = resolution,
{_, _, %AshGraphql.Resource.Subscription{}, _}
) do
Absinthe.Resolution.put_result(
resolution,
{:ok, item}
)
end

def resolve(
%{arguments: args, context: context, root_value: notifications} = resolution,
{domain, resource,
%AshGraphql.Resource.Subscription{read_action: read_action, name: name}, _input?}
)
when is_list(notifications) do
case handle_arguments(resource, read_action, args) do
{:ok, args} ->
metadata = %{
domain: domain,
resource: resource,
resource_short_name: Ash.Resource.Info.short_name(resource),
actor: Map.get(context, :actor),
tenant: Map.get(context, :tenant),
action: read_action,
source: :graphql,
subscription: name,
authorize?: AshGraphql.Domain.Info.authorize?(domain)
}

trace domain,
resource,
:gql_subscription,
name,
metadata do
opts = [
actor: Map.get(context, :actor),
action: read_action,
authorize?: AshGraphql.Domain.Info.authorize?(domain),
tenant: Map.get(context, :tenant)
]

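# group notifications by action type so created/updated/destroyed events are handled together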
subscription_events =
notifications
|> Enum.group_by(& &1.action.type)
|> Enum.map(fn {type, notifications} ->
subscription_field = subcription_field_from_action_type(type)
key = String.to_existing_atom(subscription_field)

if type in [:create, :update] do
data = Enum.map(notifications, & &1.data)
{filter, args} = Map.pop(args, :filter)

read_action =
read_action || Ash.Resource.Info.primary_action!(resource, :read).name

# read the records that were just created/updated
query =
resource
|> Ash.Query.do_filter(massage_filter(resource, filter))
|> Ash.Query.for_read(read_action, args, opts)
|> AshGraphql.Subscription.query_for_subscription(
domain,
resolution,
subscription_result_type(name),
[subscription_field]
)

query_with_authorization_rules =
Ash.can(
query,
opts[:actor],
tenant: opts[:tenant],
run_queries?: false,
alter_source?: true
)

current_filter = query.filter

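# partition the notified records: those whose visibility can be decided in memory against the
# authorization filter, and those that must be refetched from the data layer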
{known_results, need_refetch} =
case query_with_authorization_rules do
{:ok, true, %{authorize_results: [], filter: nil} = query} ->
{data, []}

{:ok, true,
%{authorize_results: [], filter: %Ash.Filter{expression: nil}} = query} ->
{data, []}

{:ok, true, %{authorize_results: []} = query} ->
Enum.reduce(data, {[], []}, fn record, {known, refetch} ->
case Ash.Expr.eval(query.filter,
record: record,
unknown_on_unknown_refs?: true
) do
{:ok, true} ->
{[record | known], refetch}

{:ok, false} ->
{known, refetch}

_ ->
{known, [record | refetch]}
end
end)

{:error, false, _} ->
{[], []}

_ ->
{[], data}
end

primary_key = Ash.Resource.Info.primary_key(resource)

primary_key_matches =
Enum.map(need_refetch, fn record ->
Map.take(record, primary_key)
end)

with {:ok, known_results} <- Ash.load(known_results, query),
{:ok, need_refetch} <- do_refetch(query, primary_key_matches) do
known_results
|> Stream.concat(need_refetch)
|> Enum.map(fn record ->
%{key => record}
end)
else
{:error, error} ->
# caught by the batch resolver
raise Ash.Error.to_error_class(error)
end
else
Enum.map(notifications, fn notification ->
%{key => AshGraphql.Resource.encode_id(notification.data, false)}
end)
end
end)

case List.flatten(subscription_events) do
[] ->
Absinthe.Resolution.put_result(
resolution,
{:error, to_errors([Ash.Error.Query.NotFound.exception()], context, domain)}
)

[first | rest] ->
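# resolve the first event now; the rest are stashed in the process dictionary for the batch resolver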
Process.put(:batch_resolved, rest)

Absinthe.Resolution.put_result(
resolution,
{:ok, first}
)
end
end

{:error, error} ->
{:error, error}
end
end

def resolve(
%{arguments: args, context: context, root_value: notification} = resolution,
{domain, resource,
@@ -631,6 +791,14 @@ defmodule AshGraphql.Graphql.Resolver do
end
end

defp do_refetch(_query, []) do
{:ok, []}
end

defp do_refetch(query, primary_key_matches) do
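# re-read only the records whose visibility could not be determined in memory, matched by primary key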
Ash.read(Ash.Query.do_filter(query, or: primary_key_matches))
end

defp subcription_field_from_action_type(:create), do: "created"
defp subcription_field_from_action_type(:update), do: "updated"
defp subcription_field_from_action_type(:destroy), do: "destroyed"