diff --git a/lib/ecto/adapters/myxql.ex b/lib/ecto/adapters/myxql.ex index bfd9e41d..81b6165c 100644 --- a/lib/ecto/adapters/myxql.ex +++ b/lib/ecto/adapters/myxql.ex @@ -149,12 +149,12 @@ defmodule Ecto.Adapters.MyXQL do ## Custom MySQL types @impl true - def loaders({:map, _}, type), do: [&json_decode/1, &Ecto.Type.embedded_load(type, &1, :json)] - def loaders(:map, type), do: [&json_decode/1, type] - def loaders(:float, type), do: [&float_decode/1, type] - def loaders(:boolean, type), do: [&bool_decode/1, type] - def loaders(:binary_id, type), do: [Ecto.UUID, type] - def loaders(_, type), do: [type] + def loaders({:map, _}, type), do: [&json_decode/1, &Ecto.Type.embedded_load(type, &1, :json)] + def loaders(:map, type), do: [&json_decode/1, type] + def loaders(:float, type), do: [&float_decode/1, type] + def loaders(:boolean, type), do: [&bool_decode/1, type] + def loaders(:binary_id, type), do: [Ecto.UUID, type] + def loaders(_, type), do: [type] defp bool_decode(<<0>>), do: {:ok, false} defp bool_decode(<<1>>), do: {:ok, true} @@ -174,14 +174,19 @@ defmodule Ecto.Adapters.MyXQL do @impl true def storage_up(opts) do - database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration" + database = + Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration" + opts = Keyword.delete(opts, :database) charset = opts[:charset] || "utf8mb4" - check_existence_command = "SELECT TRUE FROM information_schema.schemata WHERE schema_name = '#{database}'" + check_existence_command = + "SELECT TRUE FROM information_schema.schemata WHERE schema_name = '#{database}'" + case run_query(check_existence_command, opts) do {:ok, %{num_rows: 1}} -> {:error, :already_up} + _ -> create_command = ~s(CREATE DATABASE `#{database}` DEFAULT CHARACTER SET = #{charset}) @@ -190,36 +195,46 @@ defmodule Ecto.Adapters.MyXQL do case run_query(create_command, opts) do {:ok, _} -> :ok + {:error, %{mysql: %{name: :ER_DB_CREATE_EXISTS}}} -> {:error, :already_up} + {:error, error} -> {:error, Exception.message(error)} + {:exit, exit} -> {:error, exit_to_exception(exit)} end end end - defp concat_if(content, nil, _fun), do: content + defp concat_if(content, nil, _fun), do: content defp concat_if(content, value, fun), do: content <> " " <> fun.(value) @impl true def storage_down(opts) do - database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration" + database = + Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration" + opts = Keyword.delete(opts, :database) command = "DROP DATABASE `#{database}`" case run_query(command, opts) do {:ok, _} -> :ok + {:error, %{mysql: %{name: :ER_DB_DROP_EXISTS}}} -> {:error, :already_down} + {:error, %{mysql: %{name: :ER_BAD_DB_ERROR}}} -> {:error, :already_down} + {:error, error} -> {:error, Exception.message(error)} + {:exit, :killed} -> {:error, :already_down} + {:exit, exit} -> {:error, exit_to_exception(exit)} end @@ -227,10 +242,13 @@ defmodule Ecto.Adapters.MyXQL do @impl Ecto.Adapter.Storage def storage_status(opts) do - database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration" + database = + Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration" + opts = Keyword.delete(opts, :database) - check_database_query = "SELECT schema_name FROM information_schema.schemata WHERE schema_name = '#{database}'" + check_database_query = + "SELECT schema_name FROM information_schema.schemata WHERE schema_name = '#{database}'" case run_query(check_database_query, opts) do {:ok, %{num_rows: 0}} -> :down @@ -252,7 +270,7 @@ defmodule Ecto.Adapters.MyXQL do Ecto.Adapters.SQL.raise_migration_pool_size_error() end - opts = Keyword.merge(opts, [timeout: :infinity, telemetry_options: [schema_migration: true]]) + opts = Keyword.merge(opts, timeout: :infinity, telemetry_options: [schema_migration: true]) {:ok, result} = transaction(meta, opts, fn -> @@ -277,11 +295,13 @@ defmodule Ecto.Adapters.MyXQL do key = primary_key!(schema_meta, returning) {fields, values} = :lists.unzip(params) sql = @conn.insert(prefix, source, fields, [fields], on_conflict, [], []) - opts = if is_nil(Keyword.get(opts, :cache_statement)) do - [{:cache_statement, "ecto_insert_#{source}_#{length(fields)}"} | opts] - else - opts - end + + opts = + if is_nil(Keyword.get(opts, :cache_statement)) do + [{:cache_statement, "ecto_insert_#{source}_#{length(fields)}"} | opts] + else + opts + end case Ecto.Adapters.SQL.query(adapter_meta, sql, values ++ query_params, opts) do {:ok, %{num_rows: 0}} -> @@ -297,7 +317,7 @@ defmodule Ecto.Adapters.MyXQL do {:error, err} -> case @conn.to_constraints(err, source: source) do - [] -> raise err + [] -> raise err constraints -> {:invalid, constraints} end end @@ -305,9 +325,11 @@ defmodule Ecto.Adapters.MyXQL do defp primary_key!(%{autogenerate_id: {_, key, _type}}, [key]), do: key defp primary_key!(_, []), do: nil + defp primary_key!(%{schema: schema}, returning) do - raise ArgumentError, "MySQL does not support :read_after_writes in schemas for non-primary keys. " <> - "The following fields in #{inspect schema} are tagged as such: #{inspect returning}" + raise ArgumentError, + "MySQL does not support :read_after_writes in schemas for non-primary keys. " <> + "The following fields in #{inspect(schema)} are tagged as such: #{inspect(returning)}" end defp last_insert_id(nil, _last_insert_id), do: [] @@ -369,7 +391,12 @@ defmodule Ecto.Adapters.MyXQL do def structure_load(default, config) do path = config[:dump_path] || Path.join(default, "structure.sql") - args = ["--execute", "SET FOREIGN_KEY_CHECKS = 0; SOURCE #{path}; SET FOREIGN_KEY_CHECKS = 1", "--database", config[:database]] + args = [ + "--execute", + "SET FOREIGN_KEY_CHECKS = 0; SOURCE #{path}; SET FOREIGN_KEY_CHECKS = 1", + "--database", + config[:database] + ] case run_with_cmd("mysql", config, args) do {_output, 0} -> {:ok, path} @@ -401,23 +428,27 @@ defmodule Ecto.Adapters.MyXQL do |> Keyword.put(:backoff_type, :stop) |> Keyword.put(:max_restarts, 0) - task = Task.Supervisor.async_nolink(Ecto.Adapters.SQL.StorageSupervisor, fn -> - {:ok, conn} = MyXQL.start_link(opts) + task = + Task.Supervisor.async_nolink(Ecto.Adapters.SQL.StorageSupervisor, fn -> + {:ok, conn} = MyXQL.start_link(opts) - value = MyXQL.query(conn, sql, [], opts) - GenServer.stop(conn) - value - end) + value = MyXQL.query(conn, sql, [], opts) + GenServer.stop(conn) + value + end) timeout = Keyword.get(opts, :timeout, 15_000) case Task.yield(task, timeout) || Task.shutdown(task) do {:ok, {:ok, result}} -> {:ok, result} + {:ok, {:error, error}} -> {:error, error} + {:exit, exit} -> {:exit, exit} + nil -> {:error, RuntimeError.exception("command timed out")} end @@ -432,7 +463,7 @@ defmodule Ecto.Adapters.MyXQL do defp run_with_cmd(cmd, opts, opt_args, cmd_opts \\ []) do unless System.find_executable(cmd) do raise "could not find executable `#{cmd}` in path, " <> - "please guarantee it is available before running ecto commands" + "please guarantee it is available before running ecto commands" end env = @@ -442,8 +473,8 @@ defmodule Ecto.Adapters.MyXQL do [] end - host = opts[:hostname] || System.get_env("MYSQL_HOST") || "localhost" - port = opts[:port] || System.get_env("MYSQL_TCP_PORT") || "3306" + host = opts[:hostname] || System.get_env("MYSQL_HOST") || "localhost" + port = opts[:port] || System.get_env("MYSQL_TCP_PORT") || "3306" protocol = opts[:cli_protocol] || System.get_env("MYSQL_CLI_PROTOCOL") || "tcp" user_args = @@ -455,10 +486,13 @@ defmodule Ecto.Adapters.MyXQL do args = [ - "--host", host, - "--port", to_string(port), - "--protocol", protocol - ] ++ user_args ++ opt_args + "--host", + host, + "--port", + to_string(port), + "--protocol", + protocol + ] ++ user_args ++ opt_args cmd_opts = cmd_opts diff --git a/lib/ecto/adapters/postgres.ex b/lib/ecto/adapters/postgres.ex index 9058df44..ae05323b 100644 --- a/lib/ecto/adapters/postgres.ex +++ b/lib/ecto/adapters/postgres.ex @@ -149,10 +149,10 @@ defmodule Ecto.Adapters.Postgres do # Support arrays in place of IN @impl true - def dumpers({:map, _}, type), do: [&Ecto.Type.embedded_dump(type, &1, :json)] + def dumpers({:map, _}, type), do: [&Ecto.Type.embedded_dump(type, &1, :json)] def dumpers({:in, sub}, {:in, sub}), do: [{:array, sub}] - def dumpers(:binary_id, type), do: [type, Ecto.UUID] - def dumpers(_, type), do: [type] + def dumpers(:binary_id, type), do: [type, Ecto.UUID] + def dumpers(_, type), do: [type] ## Query API @@ -162,7 +162,7 @@ defmodule Ecto.Adapters.Postgres do unless valid_prepare?(prepare) do raise ArgumentError, - "expected option `:prepare` to be either `:named` or `:unnamed`, got: #{inspect(prepare)}" + "expected option `:prepare` to be either `:named` or `:unnamed`, got: #{inspect(prepare)}" end Ecto.Adapters.SQL.execute(prepare, adapter_meta, query_meta, query, params, opts) @@ -209,23 +209,29 @@ defmodule Ecto.Adapters.Postgres do end end - defp concat_if(content, nil, _), do: content - defp concat_if(content, false, _), do: content + defp concat_if(content, nil, _), do: content + defp concat_if(content, false, _), do: content defp concat_if(content, value, fun), do: content <> " " <> fun.(value) @impl true def storage_down(opts) do - database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration" - command = "DROP DATABASE \"#{database}\"" - |> concat_if(opts[:force_drop], fn _ -> "WITH (FORCE)" end) + database = + Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration" + + command = + "DROP DATABASE \"#{database}\"" + |> concat_if(opts[:force_drop], fn _ -> "WITH (FORCE)" end) + maintenance_database = Keyword.get(opts, :maintenance_database, @default_maintenance_database) opts = Keyword.put(opts, :database, maintenance_database) case run_query(command, opts) do {:ok, _} -> :ok + {:error, %{postgres: %{code: :invalid_catalog_name}}} -> {:error, :already_down} + {:error, error} -> {:error, Exception.message(error)} end @@ -233,11 +239,14 @@ defmodule Ecto.Adapters.Postgres do @impl Ecto.Adapter.Storage def storage_status(opts) do - database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration" + database = + Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration" + maintenance_database = Keyword.get(opts, :maintenance_database, @default_maintenance_database) opts = Keyword.put(opts, :database, maintenance_database) - check_database_query = "SELECT datname FROM pg_catalog.pg_database WHERE datname = '#{database}'" + check_database_query = + "SELECT datname FROM pg_catalog.pg_database WHERE datname = '#{database}'" case run_query(check_database_query, opts) do {:ok, %{num_rows: 0}} -> :down @@ -259,7 +268,7 @@ defmodule Ecto.Adapters.Postgres do Ecto.Adapters.SQL.raise_migration_pool_size_error() end - opts = Keyword.merge(opts, [timeout: :infinity, telemetry_options: [schema_migration: true]]) + opts = Keyword.merge(opts, timeout: :infinity, telemetry_options: [schema_migration: true]) config = repo.config() lock_strategy = Keyword.get(config, :migration_lock, :table_lock) do_lock_for_migrations(lock_strategy, meta, opts, config, fun) @@ -285,7 +294,9 @@ defmodule Ecto.Adapters.Postgres do # # https://www.postgresql.org/docs/9.4/explicit-locking.html source = Keyword.get(opts, :migration_source, "schema_migrations") table = if prefix = opts[:prefix], do: ~s|"#{prefix}"."#{source}"|, else: ~s|"#{source}"| - {:ok, _} = Ecto.Adapters.SQL.query(meta, "LOCK TABLE #{table} IN SHARE UPDATE EXCLUSIVE MODE", [], opts) + lock_statement = "LOCK TABLE #{table} IN SHARE UPDATE EXCLUSIVE MODE" + {:ok, _} = Ecto.Adapters.SQL.query(meta, lock_statement, [], opts) + fun.() end) @@ -293,18 +304,20 @@ defmodule Ecto.Adapters.Postgres do end defp advisory_lock(meta, opts, lock, retry_state, fun) do - result = checkout(meta, opts, fn -> - case Ecto.Adapters.SQL.query(meta, "SELECT pg_try_advisory_lock(#{lock})", [], opts) do - {:ok, %{rows: [[true]]}} -> - try do - {:ok, fun.()} - after - release_advisory_lock(meta, opts, lock) - end - _ -> - :no_advisory_lock - end - end) + result = + checkout(meta, opts, fn -> + case Ecto.Adapters.SQL.query(meta, "SELECT pg_try_advisory_lock(#{lock})", [], opts) do + {:ok, %{rows: [[true]]}} -> + try do + {:ok, fun.()} + after + release_advisory_lock(meta, opts, lock) + end + + _ -> + :no_advisory_lock + end + end) case result do {:ok, fun_result} -> @@ -319,6 +332,7 @@ defmodule Ecto.Adapters.Postgres do case Ecto.Adapters.SQL.query(meta, "SELECT pg_advisory_unlock(#{lock})", [], opts) do {:ok, %{rows: [[true]]}} -> :ok + _ -> raise "failed to release advisory lock" end @@ -331,7 +345,9 @@ defmodule Ecto.Adapters.Postgres do raise "failed to obtain advisory lock. Tried #{max_tries} times waiting #{interval}ms between tries" else if Keyword.get(opts, :log_migrator_sql, false) do - Logger.info("Migration lock occupied for #{inspect(meta.repo)}. Retry #{tries + 1}/#{max_tries} at #{interval}ms intervals.") + Logger.info( + "Migration lock occupied for #{inspect(meta.repo)}. Retry #{tries + 1}/#{max_tries} at #{interval}ms intervals." + ) end Process.sleep(interval) @@ -343,6 +359,7 @@ defmodule Ecto.Adapters.Postgres do @impl true def structure_dump(default, config) do table = config[:migration_source] || "schema_migrations" + with {:ok, versions} <- select_versions(table, config), {:ok, path} <- pg_dump(default, config), do: append_versions(table, versions, path) @@ -354,7 +371,7 @@ defmodule Ecto.Adapters.Postgres do result = Enum.reduce_while(prefixes, [], fn prefix, versions -> case run_query(~s[SELECT version FROM #{prefix}."#{table}" ORDER BY version], config) do - {:ok, %{rows: rows}} -> {:cont, Enum.map(rows, &{prefix, hd(&1)}) ++ versions } + {:ok, %{rows: rows}} -> {:cont, Enum.map(rows, &{prefix, hd(&1)}) ++ versions} {:error, %{postgres: %{code: :undefined_table}}} -> {:cont, versions} {:error, _} = error -> {:halt, error} end @@ -381,6 +398,7 @@ defmodule Ecto.Adapters.Postgres do case run_with_cmd("pg_dump", config, args) do {_output, 0} -> {:ok, path} + {output, _} -> {:error, output} end @@ -407,9 +425,10 @@ defmodule Ecto.Adapters.Postgres do def structure_load(default, config) do path = config[:dump_path] || Path.join(default, "structure.sql") args = ["--quiet", "--file", path, "-vON_ERROR_STOP=1", "--single-transaction"] + case run_with_cmd("psql", config, args) do {_output, 0} -> {:ok, path} - {output, _} -> {:error, output} + {output, _} -> {:error, output} end end @@ -429,26 +448,31 @@ defmodule Ecto.Adapters.Postgres do |> Keyword.put(:backoff_type, :stop) |> Keyword.put(:max_restarts, 0) - task = Task.Supervisor.async_nolink(Ecto.Adapters.SQL.StorageSupervisor, fn -> - {:ok, conn} = Postgrex.start_link(opts) + task = + Task.Supervisor.async_nolink(Ecto.Adapters.SQL.StorageSupervisor, fn -> + {:ok, conn} = Postgrex.start_link(opts) - value = Postgrex.query(conn, sql, [], opts) - GenServer.stop(conn) - value - end) + value = Postgrex.query(conn, sql, [], opts) + GenServer.stop(conn) + value + end) timeout = Keyword.get(opts, :timeout, 15_000) case Task.yield(task, timeout) || Task.shutdown(task) do {:ok, {:ok, result}} -> {:ok, result} + {:ok, {:error, error}} -> {:error, error} + {:exit, {%{__struct__: struct} = error, _}} - when struct in [Postgrex.Error, DBConnection.Error] -> + when struct in [Postgrex.Error, DBConnection.Error] -> {:error, error} - {:exit, reason} -> + + {:exit, reason} -> {:error, RuntimeError.exception(Exception.format_exit(reason))} + nil -> {:error, RuntimeError.exception("command timed out")} end @@ -457,26 +481,22 @@ defmodule Ecto.Adapters.Postgres do defp run_with_cmd(cmd, opts, opt_args, cmd_opts \\ []) do unless System.find_executable(cmd) do raise "could not find executable `#{cmd}` in path, " <> - "please guarantee it is available before running ecto commands" + "please guarantee it is available before running ecto commands" end - env = - [{"PGCONNECT_TIMEOUT", "10"}] + env = [{"PGCONNECT_TIMEOUT", "10"}] + env = if password = opts[:password] do - [{"PGPASSWORD", password}|env] + [{"PGPASSWORD", password} | env] else env end - args = - [] - args = - if username = opts[:username], do: ["--username", username | args], else: args - args = - if port = opts[:port], do: ["--port", to_string(port) | args], else: args - args = - if database = opts[:database], do: ["--dbname", database | args], else: args + args = [] + args = if username = opts[:username], do: ["--username", username | args], else: args + args = if port = opts[:port], do: ["--port", to_string(port) | args], else: args + args = if database = opts[:database], do: ["--dbname", database | args], else: args host = opts[:socket_dir] || opts[:hostname] || System.get_env("PGHOST") || "localhost"