Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: include last checkout verification before idle termination #292

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -175,30 +175,34 @@ defmodule Finch.HTTP1.Pool do
# Register our pool with our module name as the key. This allows the caller
# to determine the correct pool module to use to make the request
{:ok, _} = Registry.register(registry, shp, __MODULE__)
{:ok, {registry, shp, pool_idx, metric_ref, opts}}

{:ok, {registry, shp, pool_idx, metric_ref, opts, System.monotonic_time(:millisecond)}}
end

@impl NimblePool
def init_worker({_name, {scheme, host, port}, _pool_idx, _metric_ref, opts} = pool_state) do
def init_worker(
{_name, {scheme, host, port}, _pool_idx, _metric_ref, opts, _last_checkout_at} =
pool_state
) do
{:ok, Conn.new(scheme, host, port, opts, self()), pool_state}
end

@impl NimblePool
def handle_checkout(:checkout, _, %{mint: nil} = conn, pool_state) do
{_name, _shp, _pool_idx, metric_ref, _opts} = pool_state
{_name, _shp, _pool_idx, metric_ref, _opts, _last_checkout_ts} = pool_state
idle_time = System.monotonic_time() - conn.last_checkin
PoolMetrics.maybe_add(metric_ref, in_use_connections: 1)
{:ok, {:fresh, conn, idle_time}, conn, pool_state}
end

def handle_checkout(:checkout, _from, conn, pool_state) do
idle_time = System.monotonic_time() - conn.last_checkin
{_name, {scheme, host, port}, _pool_idx, metric_ref, _opts} = pool_state
{_name, {scheme, host, port}, _pool_idx, metric_ref, _opts, _last_checkout_ts} = pool_state

with true <- Conn.reusable?(conn, idle_time),
{:ok, conn} <- Conn.set_mode(conn, :passive) do
PoolMetrics.maybe_add(metric_ref, in_use_connections: 1)
{:ok, {:reuse, conn, idle_time}, conn, pool_state}
{:ok, {:reuse, conn, idle_time}, conn, update_last_checkout_ts(pool_state)}
else
false ->
meta = %{
Expand All @@ -221,7 +225,7 @@ defmodule Finch.HTTP1.Pool do

@impl NimblePool
def handle_checkin(checkin, _from, _old_conn, pool_state) do
{_name, _shp, _pool_idx, metric_ref, _opts} = pool_state
{_name, _shp, _pool_idx, metric_ref, _opts, _last_checkout_ts} = pool_state
PoolMetrics.maybe_add(metric_ref, in_use_connections: -1)

with {:ok, conn} <- checkin,
Expand All @@ -248,18 +252,27 @@ defmodule Finch.HTTP1.Pool do
end

@impl NimblePool
def handle_ping(_conn, pool_state) do
{_name, {scheme, host, port}, _pool_idx, _metric_ref, _opts} = pool_state
def handle_ping(conn, pool_state) do
{_name, {scheme, host, port}, _pool_idx, _metric_ref, opts, last_checkout_ts} = pool_state

meta = %{
scheme: scheme,
host: host,
port: port
}
max_idle_time = Map.get(opts, :pool_max_idle_time, :infinity)
now = System.monotonic_time(:millisecond)
diff_from_last_checkout = now - last_checkout_ts

cond do
is_number(max_idle_time) and diff_from_last_checkout > max_idle_time ->
meta = %{
scheme: scheme,
host: host,
port: port
}

Telemetry.event(:pool_max_idle_time_exceeded, %{}, meta)
Telemetry.event(:pool_max_idle_time_exceeded, %{}, meta)
{:stop, :idle_timeout}

{:stop, :idle_timeout}
true ->
{:ok, conn}
end
end

@impl NimblePool
Expand All @@ -272,7 +285,7 @@ defmodule Finch.HTTP1.Pool do

@impl NimblePool
def handle_cancelled(:checked_out, pool_state) do
{_name, _shp, _pool_idx, metric_ref, _opts} = pool_state
{_name, _shp, _pool_idx, metric_ref, _opts, _last_checkout_ts} = pool_state
PoolMetrics.maybe_add(metric_ref, in_use_connections: -1)
:ok
end
Expand Down Expand Up @@ -301,4 +314,7 @@ defmodule Finch.HTTP1.Pool do

defp pool_idle_timeout(:infinity), do: nil
defp pool_idle_timeout(pool_max_idle_time), do: pool_max_idle_time

defp update_last_checkout_ts(pool_state),
do: put_elem(pool_state, 5, System.monotonic_time(:millisecond))
end
69 changes: 69 additions & 0 deletions test/finch/http1/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,72 @@ defmodule Finch.HTTP1.PoolTest do
:telemetry.detach(test_name)
end

@tag capture_log: true
test "should consider last checkout timestamp on pool idle termination", %{
bypass: bypass,
finch_name: finch_name
} do
parent = self()

start_supervised!(
{Finch,
name: finch_name,
pools: %{
default: [count: 1, size: 2, pool_max_idle_time: 200]
}}
)

Bypass.expect(bypass, fn conn ->
{"delay", str_delay} =
Enum.find(conn.req_headers, fn h -> match?({"delay", _}, h) end)

Process.sleep(String.to_integer(str_delay))
Plug.Conn.send_resp(conn, 200, "OK")
end)

delay_exec = fn ref, delay ->
send(parent, {ref, :start})

resp =
Finch.build(:get, endpoint(bypass), [{"delay", "#{delay}"}])
|> Finch.request(finch_name)

send(parent, {ref, :done})

resp
end

ref1 = make_ref()
Task.async(fn -> delay_exec.(ref1, 10) end)

ref2 = make_ref()
Task.async(fn -> delay_exec.(ref2, 10) end)

assert_receive {^ref1, :done}, 150

assert_receive {^ref2, :done}, 150

# after here the next idle termination will trigger in =~ ms

assert [{pool, _pool_mod}] = Registry.lookup(finch_name, shp(bypass))

Process.monitor(pool)

refute_receive {:DOWN, _, :process, ^pool, {:shutdown, :idle_timeout}}, 200

ref3 = make_ref()

Task.async(fn -> assert {:ok, %{status: 200}} = delay_exec.(ref3, 10) end)

assert_receive {^ref3, :done}, 150

refute_receive {:DOWN, _, :process, ^pool, {:shutdown, :idle_timeout}}, 200

assert_receive {:DOWN, _, :process, ^pool, {:shutdown, :idle_timeout}}, 200

assert [] = DynamicSupervisor.which_children(:"#{finch_name}.PoolSupervisor")
end

describe "async_request" do
@describetag bypass: false

Expand Down Expand Up @@ -139,4 +205,7 @@ defmodule Finch.HTTP1.PoolTest do
assert_receive {:DOWN, ^ref, _, _, _}, 500
end
end

defp shp(%{port: port}), do: {:http, "localhost", port}
defp shp({scheme, {:local, unix_socket}}), do: {scheme, {:local, unix_socket}, 0}
end
Loading