diff --git a/.changeset/lemon-taxis-stare.md b/.changeset/lemon-taxis-stare.md new file mode 100644 index 0000000000..a674890837 --- /dev/null +++ b/.changeset/lemon-taxis-stare.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Avoid stopping the beam process when an unrecoverable error is encountered. Instead, stop the main OTP supervisor. Required for multi-tenancy. diff --git a/integration-tests/tests/invalidated-replication-slot.lux b/integration-tests/tests/invalidated-replication-slot.lux index b7a1baada8..abf1653fef 100644 --- a/integration-tests/tests/invalidated-replication-slot.lux +++ b/integration-tests/tests/invalidated-replication-slot.lux @@ -6,12 +6,13 @@ [my invalidated_slot_error= """ - [error] :gen_statem {:"Elixir.Electric.ProcessRegistry:single_stack", {Electric.Postgres.ReplicationClient, nil}} terminating ** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_integration" This slot has been invalidated because it exceeded the maximum reserved size. """] +[my stack_id="single_stack"] + ### ## Start a new Postgres cluster configured for easy replication slot invalidation. @@ -25,8 +26,11 @@ [shell electric] ??[info] Starting replication from postgres - # Reset the failure pattern because we'll be matching on an error. - - + # Verify that the stack supervisor is registered using regular process registration. If we + # change this at any point, the line below will catch it and we'll be able to correct the + # check further down that verifies that the stack supervisor is no longer running. + !IO.puts("Stack supervisor pid: #{inspect Process.whereis(Electric.StackSupervisor)}") + ??Stack supervisor pid: #PID< ## Seed the database with enough data to exceed max_wal_size and force a checkpoint that ## will invalidate the replication slot. @@ -36,21 +40,28 @@ [shell pg] ?invalidating slot "electric_slot_integration" because its restart_lsn [\d\w]+/[\d\w]+ exceeds max_slot_wal_keep_size +[macro verify_connection_and_stack_supervisors_shutdown stack_id invalidated_slot_error] + ??$invalidated_slot_error + ??[error] Stopping connection supervisor with stack_id=$stack_id due to an unrecoverable error + + !IO.puts("Stack supervisor pid: #{inspect Process.whereis(Electric.StackSupervisor)}") + ??Stack supervisor pid: nil +[endmacro] + ## Observe the fatal connection error. [shell electric] - ??$invalidated_slot_error + # Reset the failure pattern because we'll be matching on an error. + - - # Confirm Electric process exit. - ??$PS1 + [invoke verify_connection_and_stack_supervisors_shutdown $stack_id $invalidated_slot_error] -## Start the sync service once again to verify that it crashes due to the invalidated slot error. -[invoke setup_electric] + # Restart the OTP application to verify that the supervisors shut down again due to the invalidated slot. + !:ok = Application.stop(:electric) + !:ok = Application.start(:electric) -[shell electric] ??[info] Starting replication from postgres - - - ??$invalidated_slot_error - ??$PS1 + + [invoke verify_connection_and_stack_supervisors_shutdown $stack_id $invalidated_slot_error] [cleanup] [invoke teardown] diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index f9b885024a..4cea30b463 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -71,7 +71,8 @@ defmodule Electric.Application do ], pool_opts: [pool_size: Electric.Config.get_env(:db_pool_size)], storage: storage, - chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold) + chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold), + name: Electric.StackSupervisor }, {Electric.Telemetry, stack_id: stack_id, storage: storage}, {Bandit, diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 2939d19e29..4f932d1221 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -365,13 +365,13 @@ defmodule Electric.Connection.Manager do # connection and the DB pool. If any of the latter two shut down, Connection.Manager will # itself terminate to be restarted by its supervisor in a clean state. def handle_info({:EXIT, pid, reason}, %State{replication_client_pid: pid} = state) do - halt_if_fatal_error!(reason) + with false <- stop_if_fatal_error(reason, state) do + Logger.debug( + "Handling the exit of the replication client #{inspect(pid)} with reason #{inspect(reason)}" + ) - Logger.debug( - "Handling the exit of the replication client #{inspect(pid)} with reason #{inspect(reason)}" - ) - - {:noreply, %{state | replication_client_pid: nil}, {:continue, :start_replication_client}} + {:noreply, %{state | replication_client_pid: nil}, {:continue, :start_replication_client}} + end end # The most likely reason for the lock connection or the DB pool to exit is the database @@ -524,8 +524,13 @@ defmodule Electric.Connection.Manager do end defp handle_connection_error(error, state, mode) do - halt_if_fatal_error!(error) + with false <- stop_if_fatal_error(error, state) do + state = schedule_reconnection_after_error(error, state, mode) + {:noreply, state} + end + end + defp schedule_reconnection_after_error(error, state, mode) do message = case error do %DBConnection.ConnectionError{message: message} -> @@ -553,8 +558,7 @@ defmodule Electric.Connection.Manager do is_nil(state.pool_pid) -> :start_connection_pool end - state = schedule_reconnection(step, state) - {:noreply, state} + schedule_reconnection(step, state) end defp pg_error_extra_info(pg_error) do @@ -573,23 +577,25 @@ defmodule Electric.Connection.Manager do end end - @invalid_slot_detail "This slot has been invalidated because it exceeded the maximum reserved size." - - defp halt_if_fatal_error!( + defp stop_if_fatal_error( %Postgrex.Error{ postgres: %{ code: :object_not_in_prerequisite_state, - detail: @invalid_slot_detail, - pg_code: "55000", - routine: "StartLogicalReplication" + detail: "This slot has been invalidated" <> _, + pg_code: "55000" } - } = error + } = error, + state ) do - System.stop(1) - exit(error) + # Perform supervisor shutdown in a task to avoid a circular dependency where the manager + # process is waiting for the supervisor to shut down its children, one of which is the + # manager process itself. + Task.start(Electric.Connection.Supervisor, :shutdown, [state.stack_id, error]) + + {:noreply, state} end - defp halt_if_fatal_error!(_), do: nil + defp stop_if_fatal_error(_, _), do: false defp schedule_reconnection(step, %State{backoff: {backoff, _}} = state) do {time, backoff} = :backoff.fail(backoff) diff --git a/packages/sync-service/lib/electric/connection/supervisor.ex b/packages/sync-service/lib/electric/connection/supervisor.ex index 2fca1ef689..d75de952c5 100644 --- a/packages/sync-service/lib/electric/connection/supervisor.ex +++ b/packages/sync-service/lib/electric/connection/supervisor.ex @@ -18,7 +18,14 @@ defmodule Electric.Connection.Supervisor do has successfully initialized a database connection pool. """ - use Supervisor + # This supervisor is meant to be a child of Electric.StackSupervisor. + # + # The `restart: :transient, significant: true` combo allows for shutting the supervisor down + # and signalling the parent supervisor to shut itself down as well if that one has + # `:auto_shutdown` set to `:any_significant` or `:all_significant`. + use Supervisor, restart: :transient, significant: true + + require Logger def name(opts) do Electric.ProcessRegistry.name(opts[:stack_id], __MODULE__) @@ -28,6 +35,15 @@ defmodule Electric.Connection.Supervisor do Supervisor.start_link(__MODULE__, opts, name: name(opts)) end + def shutdown(stack_id, reason) do + Logger.error( + "Stopping connection supervisor with stack_id=#{inspect(stack_id)} " <> + "due to an unrecoverable error: #{inspect(reason)}" + ) + + Supervisor.stop(name(stack_id: stack_id), {:shutdown, reason}, 1_000) + end + def init(opts) do Process.set_label({:connection_supervisor, opts[:stack_id]}) Logger.metadata(stack_id: opts[:stack_id]) diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index 71c355a3cd..a35ae1dbde 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -28,6 +28,8 @@ defmodule Electric.StackSupervisor do 2. `Electric.Replication.ShapeLogCollector` collects transactions from the replication connection, fanning them out to `Electric.Shapes.Consumer` (4.1.1.2) 3. `Electric.ShapeCache` coordinates shape creation and handle allocation, shape metadata """ + + # Setting `restart: :transient` is required for passing the `:auto_shutdown` to `Supervisor.init()` below. use Supervisor, restart: :transient @opts_schema NimbleOptions.new!( @@ -119,6 +121,11 @@ defmodule Electric.StackSupervisor do Registry.register(registry, {:stack_status, stack_id}, value) end + # noop if there's no registry running + def dispatch_stack_event(nil, _stack_id, _event) do + :ok + end + def dispatch_stack_event(registry, stack_id, event) do Registry.dispatch(registry, {:stack_status, stack_id}, fn entries -> for {pid, ref} <- entries do