Skip to content

Commit

Permalink
Plug pour vérifier le bon fonctionnement du worker (#4399)
Browse files Browse the repository at this point in the history
* Ajout plug healthcheck spécifique pour le worker

* Add tests

* Even more tests

* Add AppSignal metric

* PR review
  • Loading branch information
AntoineAugusti authored Dec 19, 2024
1 parent f9f563d commit d1730e6
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 1 deletion.
2 changes: 1 addition & 1 deletion apps/transport/lib/transport_web/plugs/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule TransportWeb.Plugs.Router do
use Plug.Router

plug(TransportWeb.Plugs.HealthCheck, at: "/health-check")
plug(TransportWeb.Plugs.Halt, if: {Transport.Application, :worker_only?}, message: "UP (WORKER-ONLY)")
plug(TransportWeb.Plugs.WorkerHealthcheck, if: {Transport.Application, :worker_only?})

plug(:match)
plug(:dispatch)
Expand Down
82 changes: 82 additions & 0 deletions apps/transport/lib/transport_web/plugs/worker_healthcheck.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
defmodule TransportWeb.Plugs.WorkerHealthcheck do
  @moduledoc """
  A plug for the worker.

  It can be conditionally enabled by passing an `:if` condition that will be evaluated.

  It displays:
  - when the app was started
  - the last attempt for Oban jobs
  - if the system is healthy

  The system is considered healthy if the app was started recently or
  if Oban attempted jobs recently.
  """
  import Plug.Conn

  # Grace period after boot during which the worker is reported healthy
  # even if no Oban job has completed yet.
  @app_start_waiting_delay {20, :minute}
  # Maximum age of the latest completed job attempt before the worker is
  # reported unhealthy (once the grace period above has elapsed).
  @oban_max_delay_since_last_attempt {60, :minute}

  @doc "Plug callback: the options are handed to `call/2` unchanged."
  def init(options), do: options

  @doc """
  Plug callback.

  `opts[:if]` must be a `{module, function}` tuple naming a zero-arity function.
  When that function returns a truthy value, the connection is answered with a
  plain-text report (HTTP 200 when healthy, 503 otherwise) and halted.
  Otherwise the connection is returned untouched.

  Raises `MatchError` when the `:if` option is missing or is not a 2-tuple.
  """
  def call(conn, opts) do
    {mod, fun} = opts[:if]

    if apply(mod, fun, []) do
      # Take a single snapshot of every input so that the status code, the
      # report body and the metric all describe the same instant, and so the
      # database is queried only once per request.
      started_at = app_start_datetime()
      last_attempted_at = oban_last_attempted_at()
      now = DateTime.utc_now()

      started_recently = started_recently?(started_at, now)
      attempted_recently = attempted_recently?(last_attempted_at, now)
      healthy = started_recently or attempted_recently

      record_delay_metric(last_attempted_at, now)
      status_code = if healthy, do: 200, else: 503

      conn
      |> put_resp_content_type("text/plain")
      |> send_resp(status_code, """
      UP (WORKER-ONLY)
      App start time: #{started_at}
      App started recently?: #{started_recently}
      Oban last attempt: #{last_attempted_at}
      Oban attempted jobs recently?: #{attempted_recently}
      Healthy state?: #{healthy}
      """)
      |> halt()
    else
      conn
    end
  end

  @doc """
  Sends to AppSignal, as a distribution value, the number of seconds elapsed
  since the latest completed Oban job attempt.

  The value is positive for an attempt located in the past.
  """
  def store_last_attempted_at_delay_metric do
    record_delay_metric(oban_last_attempted_at(), DateTime.utc_now())
  end

  @doc """
  Returns `true` when the app was started recently or when Oban attempted
  jobs recently.
  """
  @spec healthy_state?() :: boolean()
  def healthy_state? do
    app_started_recently?() or oban_attempted_jobs_recently?()
  end

  @doc "Returns `true` when the app start time is less than 20 minutes old."
  @spec app_started_recently?() :: boolean()
  def app_started_recently? do
    now = DateTime.utc_now()
    started_recently?(app_start_datetime(), now)
  end

  @doc """
  Returns the datetime stored in the cache under
  `app_start_datetime_cache_key_name/0`.

  On a cache miss the current UTC time is computed and stored with `expire: nil`.
  """
  def app_start_datetime do
    Transport.Cache.fetch(app_start_datetime_cache_key_name(), fn -> DateTime.utc_now() end, expire: nil)
  end

  @doc "Returns the cache key under which the app start datetime is stored."
  @spec app_start_datetime_cache_key_name() :: String.t()
  def app_start_datetime_cache_key_name, do: "#{__MODULE__}::app_start_datetime"

  @doc """
  Returns `true` when the latest completed Oban job was attempted less than
  60 minutes ago.
  """
  @spec oban_attempted_jobs_recently?() :: boolean()
  def oban_attempted_jobs_recently? do
    attempted_recently?(oban_last_attempted_at(), DateTime.utc_now())
  end

  @doc """
  Returns, as a UTC `DateTime`, the most recent `attempted_at` among the rows
  of `oban_jobs` whose state is `completed`.

  When no such row exists, the Unix epoch (1970-01-01 00:00:00 UTC) is returned
  so that callers always get a comparable datetime.
  """
  @spec oban_last_attempted_at() :: DateTime.t()
  def oban_last_attempted_at do
    %Postgrex.Result{rows: [[last_attempted_at]]} =
      DB.Repo.query!("""
      SELECT MAX(attempted_at)
      FROM oban_jobs
      WHERE state = 'completed'
      """)

    case last_attempted_at do
      # MAX() over zero rows yields NULL.
      nil -> DateTime.new!(~D[1970-01-01], ~T[00:00:00.000], "Etc/UTC")
      %NaiveDateTime{} = naive -> DateTime.from_naive!(naive, "Etc/UTC")
      # Accept a zoned value too, in case the driver hands back a `DateTime`.
      %DateTime{} = datetime -> datetime
    end
  end

  # True when `started_at` is less than the configured waiting delay before `now`.
  defp started_recently?(started_at, now) do
    {delay, unit} = @app_start_waiting_delay
    DateTime.diff(now, started_at, unit) < delay
  end

  # True when `last_attempted_at` is strictly after `now` minus the configured maximum delay.
  defp attempted_recently?(last_attempted_at, now) do
    {delay, unit} = @oban_max_delay_since_last_attempt
    DateTime.after?(last_attempted_at, DateTime.add(now, -delay, unit))
  end

  # Reports `now - last_attempted_at` in seconds. `DateTime.diff/3` subtracts its
  # second argument from its first, so `now` must come first for a positive delay.
  defp record_delay_metric(last_attempted_at, now) do
    value = DateTime.diff(now, last_attempted_at, :second)
    Appsignal.add_distribution_value("oban.last_attempted_at_delay", value)
  end
end
129 changes: 129 additions & 0 deletions apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
defmodule TransportWeb.Plugs.WorkerHealthcheckTest do
  # Must run with `async: false`: these tests rely on the real in-memory cache
  # and swap application config, both of which are shared state.
  use TransportWeb.ConnCase, async: false
  alias TransportWeb.Plugs.WorkerHealthcheck

  @cache_name Transport.Cache.Cachex.cache_name()
  @cache_key WorkerHealthcheck.app_start_datetime_cache_key_name()

  setup do
    # Switch to the real in-memory cache so that the caching mechanism itself is exercised
    previous_impl = Application.fetch_env!(:transport, :cache_impl)
    Application.put_env(:transport, :cache_impl, Transport.Cache.Cachex)

    on_exit(fn ->
      Application.put_env(:transport, :cache_impl, previous_impl)
      Cachex.reset(@cache_name)
    end)

    Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo)
  end

  describe "healthy_state?" do
    test "app was started recently, no Oban jobs" do
      assert WorkerHealthcheck.app_started_recently?()
      refute WorkerHealthcheck.oban_attempted_jobs_recently?()
      assert WorkerHealthcheck.healthy_state?()
    end

    test "app was not started recently, Oban jobs have not been attempted recently" do
      pretend_app_started_at(minutes_ago(30))

      refute WorkerHealthcheck.app_started_recently?()
      refute WorkerHealthcheck.oban_attempted_jobs_recently?()
      refute WorkerHealthcheck.healthy_state?()
    end

    test "app was not started recently, Oban jobs have been attempted recently" do
      pretend_app_started_at(minutes_ago(30))

      # The latest completed job was attempted 55 minutes ago
      insert_completed_job(minutes_ago(55))

      refute WorkerHealthcheck.app_started_recently?()
      assert WorkerHealthcheck.oban_attempted_jobs_recently?()
      assert WorkerHealthcheck.healthy_state?()
    end
  end

  describe "app_started_recently?" do
    test "value is set when executed for the first time" do
      assert {:ok, false} == Cachex.exists?(@cache_name, @cache_key)
      # The first call populates the cache key
      assert WorkerHealthcheck.app_started_recently?()
      assert {:ok, true} == Cachex.exists?(@cache_name, @cache_key)

      # Subsequent calls leave the stored value untouched
      initial_start = WorkerHealthcheck.app_start_datetime()
      WorkerHealthcheck.app_started_recently?()
      assert initial_start == WorkerHealthcheck.app_start_datetime()

      # The key has no TTL
      assert {:ok, nil} == Cachex.ttl(@cache_name, @cache_key)
    end

    test "acceptable delay is 20 minutes" do
      # Within the window
      pretend_app_started_at(minutes_ago(19))
      assert WorkerHealthcheck.app_started_recently?()

      # Outside the window
      pretend_app_started_at(minutes_ago(21))
      refute WorkerHealthcheck.app_started_recently?()
    end
  end

  describe "oban_attempted_jobs_recently?" do
    test "job attempted recently" do
      # Less than 60 minutes ago
      insert_completed_job(minutes_ago(59))

      assert WorkerHealthcheck.oban_attempted_jobs_recently?()
    end

    test "job attempted too long ago" do
      # More than 60 minutes ago
      insert_completed_job(minutes_ago(61))

      refute WorkerHealthcheck.oban_attempted_jobs_recently?()
    end
  end

  describe "call" do
    test "healthy system", %{conn: conn} do
      assert WorkerHealthcheck.app_started_recently?()
      refute WorkerHealthcheck.oban_attempted_jobs_recently?()
      assert WorkerHealthcheck.healthy_state?()

      response = call_plug(conn)
      assert text_response(response, 200)
    end

    test "unhealthy system", %{conn: conn} do
      pretend_app_started_at(minutes_ago(30))

      refute WorkerHealthcheck.app_started_recently?()
      refute WorkerHealthcheck.oban_attempted_jobs_recently?()
      refute WorkerHealthcheck.healthy_state?()

      response = call_plug(conn)
      assert text_response(response, 503)
    end
  end

  # Referenced by name through the plug's `:if` option, hence public.
  def plug_enabled?, do: true

  # The current UTC time shifted `minutes` minutes into the past.
  defp minutes_ago(minutes), do: DateTime.add(DateTime.utc_now(), -minutes, :minute)

  # Overwrites the cached app start time read by the plug.
  defp pretend_app_started_at(datetime), do: Cachex.put(@cache_name, @cache_key, datetime)

  # Inserts an Oban job, then marks it completed with the given `attempted_at`.
  defp insert_completed_job(attempted_at) do
    %{resource_id: 1}
    |> Transport.Jobs.ResourceUnavailableJob.new()
    |> Oban.insert!()
    |> Ecto.Changeset.change(attempted_at: attempted_at, state: "completed")
    |> DB.Repo.update!()
  end

  # Runs the plug with its `:if` condition always enabled.
  defp call_plug(conn), do: WorkerHealthcheck.call(conn, if: {__MODULE__, :plug_enabled?})
end

0 comments on commit d1730e6

Please sign in to comment.