Skip to content

Commit

Permalink
improvement: Add flag to enable transactions for write actions.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimsynz committed Oct 20, 2024
1 parent 28af181 commit 882527c
Show file tree
Hide file tree
Showing 13 changed files with 402 additions and 13 deletions.
1 change: 1 addition & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ spark_locals_without_parens = [
code?: 1,
deferrable: 1,
down: 1,
enable_write_transactions?: 1,
exclusion_constraint_names: 1,
foreign_key_names: 1,
identity_index_names: 1,
Expand Down
3 changes: 2 additions & 1 deletion documentation/dsls/DSL:-AshSqlite.DataLayer.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ end

| Name | Type | Default | Docs |
|------|------|---------|------|
| [`repo`](#sqlite-repo){: #sqlite-repo .spark-required} | `atom` | | The repo that will be used to fetch your data. See the `AshSqlite.Repo` documentation for more |
| [`repo`](#sqlite-repo){: #sqlite-repo .spark-required} | `module \| (any, any -> any)` | | The repo that will be used to fetch your data. See the `AshSqlite.Repo` documentation for more |
| [`migrate?`](#sqlite-migrate?){: #sqlite-migrate? } | `boolean` | `true` | Whether or not to include this resource in the generated migrations with `mix ash.generate_migrations` |
| [`migration_types`](#sqlite-migration_types){: #sqlite-migration_types } | `keyword` | `[]` | A keyword list of attribute names to the ecto migration type that should be used for that attribute. Only necessary if you need to override the defaults. |
| [`migration_defaults`](#sqlite-migration_defaults){: #sqlite-migration_defaults } | `keyword` | `[]` | A keyword list of attribute names to the ecto migration default that should be used for that attribute. The string you use will be placed verbatim in the migration. Use fragments like `fragment(\\"now()\\")`, or for `nil`, use `\\"nil\\"`. |
Expand All @@ -48,6 +48,7 @@ end
| [`migration_ignore_attributes`](#sqlite-migration_ignore_attributes){: #sqlite-migration_ignore_attributes } | `list(atom)` | `[]` | A list of attributes that will be ignored when generating migrations. |
| [`table`](#sqlite-table){: #sqlite-table } | `String.t` | | The table to store and read the resource from. If this is changed, the migration generator will not remove the old table. |
| [`polymorphic?`](#sqlite-polymorphic?){: #sqlite-polymorphic? } | `boolean` | `false` | Declares this resource as polymorphic. See the [polymorphic resources guide](/documentation/topics/resources/polymorphic-resources.md) for more. |
| [`enable_write_transactions?`](#sqlite-enable_write_transactions?){: #sqlite-enable_write_transactions? } | `boolean` | `false` | Enable write transactions for this resource. See the [SQLite transaction guide](/documentation/topics/about-as-sqlite/transactions.md) for more. |


## sqlite.custom_indexes
Expand Down
175 changes: 175 additions & 0 deletions documentation/topics/about-ash-sqlite/transactions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# Transactions and SQLite

By default SQLite3 allows only one write transaction to be running at a time. Any attempt to commit a transaction while another is running will result in an error. Because Elixir is a highly concurrent environment and [Ecto](https://hexdocs.pm/ecto) uses a connection pool by default, AshSqlite disables transactions by default. This can lead to some surprising behaviours if you're used to working with AshPostgres - for example after action hooks which fail will leave records behind, but the action which ran them will return an error. This document discusses some strategies for working around this constraint.

## Replacing transactions with Reactor sagas

A saga is a way of making transaction-like behaviour by explicitly telling the system to undo any changes it has made up until the point of failure. This works well for remote resources such as web APIs, but also for working with Ash data layers that do not support transactions. As a general rule; anything you could model with action lifecycle hooks can also be modelled with Reactor, with the addition of a bit more ceremony.

For our example, we'll use the idea of a system where posting a comment needs to increment an engagement score. Here's how you could naively implement it:

```elixir
defmodule MyApp.Blog.Comment do
use Ash.Resource,
data_layer: AshSqlite.DataLayer,
domain: MyApp.Blog

attributes do
uuid_primary_key :id

attribute :body, :string, allow_nil?: false, public?: true

create_timestamp :inserted_at
end


actions do
defaults [:read, :destroy, update: :*]

create :create do
argument :post_id, :uuid, allow_nil?: false, public?: true

primary? true

change manage_relationsip(:post_id, :post, type: :append)
change relate_to_actor(:author)

change after_action(fn _changeset, record, context ->
context.actor
|> Ash.Changeset.for_update(:increment_engagement)
|> Ash.update(actor: context.actor)
|> case do
{:ok, _} -> {:ok, record}
{:error, reason} -> {:error, reason}
end
end)
end
end

relationships do
belongs_to :author, MyApp.Blog.User, public?: true, writable?: true
belongs_to :post, MyApp.Blog.Post, public?: true, writable?: true
end
end

defmodule MyApp.Blog.User do
use Ash.Resource,
data_layer: AshSqlite.DataLayer,
domain: MyApp.Blog

attributes do
uuid_primary_key :id

attribute :name, :string, allow_nil?: false, public?: true
attribute :engagement_level, :integer, allow_nil?: false, default: 0, public?: true

create_timestamp :inserted_at
update_timestamp :updated_at
end

actions do
defaults [:read, :destroy, create: :*, update: :*]

update :increment_engagement do
public? true
change increment(:engagement_level, amount: 1)
end

update :decrement_engagement do
public? true
change increment(:engagement_level, amount: -1)
end
end

relationships do
has_many :posts, MyApp.Blog.Post, public?: true, destination_attribute: :author_id
has_many :comments, MyApp.Blog.Comment, public?: true, destination_attribute: :author_id
end
end
```

This would work as expected - as long as everything goes according to plan - if, however, there is a transient failure, or some kind of validation error in one of the hooks could cause the comment to be created, but the create action to still return an error.

Converting the create into a Reactor requires us to be explicit about how our steps are composed and what the dependencies between them are:

```elixir
defmodule MyApp.Blog.Comment do
# ...

actions do
defaults [:read, :destroy, update: :*, create: :*]

action :post_comment, :struct do
constraints instance_of: __MODULE__
argument :body, :string, allow_nil?: false, public?: true
argument :post_id, :uuid, allow_nil?: false, public?: true
argument :author_id, :uuid, allow_nil?: false, public?: true
run MyApp.Blog.PostCommentReactor
end
end

# ...
end

defmodule MyApp.Blog.PostCommentReactor do
use Reactor, extensions: [Ash.Reactor]

input :body
input :post_id
input :author_id

read_one :get_author, MyApp.Blog.User, :get_by_id do
inputs %{id: input(:author_id)}
fail_on_not_found? true
authorize? false
end

create :create_comment, MyApp.Blog.Comment, :create do
inputs %{
body: input(:body),
post_id: input(:post_id),
author_id: input(:author_id)
}

actor result(:get_author)
undo :always
undo_action :destroy
end

update :update_author_engagement, MyApp.Blog.User, :increment_engagement do
initial result(:get_author)
actor result(:get_author)
undo :always
undo_action :decrement_engagement
end

return :create_comment
end
```

> {: .neutral}
> Note that the examples above are edited for brevity and will not run with without modification
## Enabling transactions

Sometimes you really just want to be able to use a database transaction, in which case you can set `enable_write_transactions?` to `true` in the `sqlite` DSL block:

```elixir
defmodule MyApp.Blog.Post do
use Ash.Resource,
data_layer: AshSqlite.DataLayer,
domain: MyApp.Blog

sqlite do
repo MyApp.Repo
enable_write_transactions? true
end
end
```

This will allow you to set `transaction? true` on actions. Doing this needs very careful management to ensure that all transactions are serialised.

Strategies for serialising transactions include:

- Running all writes through a single `GenServer`.
- Using a separate write repo with a pool size of 1.
46 changes: 44 additions & 2 deletions lib/data_layer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ defmodule AshSqlite.DataLayer do
],
schema: [
repo: [
type: :atom,
type: {:or, [{:behaviour, Ecto.Repo}, {:fun, 2}]},
required: true,
doc:
"The repo that will be used to fetch your data. See the `AshSqlite.Repo` documentation for more"
Expand Down Expand Up @@ -278,6 +278,13 @@ defmodule AshSqlite.DataLayer do
doc: """
Declares this resource as polymorphic. See the [polymorphic resources guide](/documentation/topics/resources/polymorphic-resources.md) for more.
"""
],
enable_write_transactions?: [
type: :boolean,
default: false,
doc: """
Enable write transactions for this resource. See the [SQLite transaction guide](/documentation/topics/about-as-sqlite/transactions.md) for more.
"""
]
]
}
Expand All @@ -296,13 +303,40 @@ defmodule AshSqlite.DataLayer do
AshSqlite.Transformers.ValidateReferences,
AshSqlite.Transformers.VerifyRepo,
AshSqlite.Transformers.EnsureTableOrPolymorphic
],
verifiers: [
AshSqlite.Verifiers.VerifyTransactions
]

def migrate(args) do
# TODO: take args that we care about
Mix.Task.run("ash_sqlite.migrate", args)
end

@impl true
def transaction(resource, func, timeout \\ nil, reason \\ %{type: :custom, metadata: %{}}) do
repo =
case reason[:data_layer_context] do
%{repo: repo} when not is_nil(repo) ->
repo

_ ->
AshSqlite.DataLayer.Info.repo(resource, :read)
end

func = fn ->
repo.on_transaction_begin(reason)

func.()
end

if timeout do
repo.transaction(func, timeout: timeout)
else
repo.transaction(func)
end
end

def rollback(args) do
repos = AshSqlite.Mix.Helpers.repos!([], args)

Expand Down Expand Up @@ -385,7 +419,10 @@ defmodule AshSqlite.DataLayer do
def can?(_, :bulk_create), do: true
def can?(_, {:lock, _}), do: false

def can?(_, :transact), do: false
def can?(resource, :transact) do
Spark.Dsl.Extension.get_opt(resource, [:sqlite], :enable_write_transactions?, false)
end

def can?(_, :composite_primary_key), do: true
def can?(_, {:atomic, :update}), do: true
def can?(_, {:atomic, :upsert}), do: true
Expand Down Expand Up @@ -446,6 +483,11 @@ defmodule AshSqlite.DataLayer do
def can?(_, {:sort, _}), do: true
def can?(_, _), do: false

@impl true
def in_transaction?(resource) do
AshSqlite.DataLayer.Info.repo(resource, :mutate).in_transaction?()
end

@impl true
def limit(query, nil, _), do: {:ok, query}

Expand Down
10 changes: 8 additions & 2 deletions lib/data_layer/info.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@ defmodule AshSqlite.DataLayer.Info do
alias Spark.Dsl.Extension

@doc "The configured repo for a resource"
def repo(resource) do
Extension.get_opt(resource, [:sqlite], :repo, nil, true)
def repo(resource, type \\ :mutate) do
case Extension.get_opt(resource, [:sqlite], :repo, nil, true) do
fun when is_function(fun, 2) ->
fun.(resource, type)

repo ->
repo
end
end

@doc "The configured table for a resource"
Expand Down
21 changes: 13 additions & 8 deletions lib/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@ defmodule AshSqlite.Repo do
@doc "Use this to inform the data layer about what extensions are installed"
@callback installed_extensions() :: [String.t()]

@doc """
Use this to inform the data layer about the oldest potential sqlite version it will be run on.

Must be an integer greater than or equal to 13.
"""
@callback min_pg_version() :: integer()
@doc "Called when a transaction starts"
@callback on_transaction_begin(reason :: Ash.DataLayer.transaction_reason()) :: term

@doc "The path where your migrations are stored"
@callback migrations_path() :: String.t() | nil
Expand All @@ -39,7 +36,6 @@ defmodule AshSqlite.Repo do
def installed_extensions, do: []
def migrations_path, do: nil
def override_migration_type(type), do: type
def min_pg_version, do: 10

def init(_, config) do
new_config =
Expand All @@ -51,6 +47,8 @@ defmodule AshSqlite.Repo do
{:ok, new_config}
end

def on_transaction_begin(_reason), do: :ok

def insert(struct_or_changeset, opts \\ []) do
struct_or_changeset
|> to_ecto()
Expand Down Expand Up @@ -82,6 +80,13 @@ defmodule AshSqlite.Repo do
end)
|> from_ecto()
end

def transaction!(fun) do
case fun.() do
{:ok, value} -> value
{:error, error} -> raise Ash.Error.to_error_class(error)
end
end

def from_ecto({:ok, result}), do: {:ok, from_ecto(result)}
def from_ecto({:error, _} = other), do: other
Expand Down Expand Up @@ -148,8 +153,8 @@ defmodule AshSqlite.Repo do

defoverridable init: 2,
installed_extensions: 0,
override_migration_type: 1,
min_pg_version: 0
on_transaction_begin: 1,
override_migration_type: 1
end
end
end
Loading

0 comments on commit 882527c

Please sign in to comment.