Commit 5ef30e9

Reconnect after all nodes were offline (#375)

Co-authored-by: Andrea Leopardi <an.leopardi@gmail.com>

harunzengin and whatyouhide authored Dec 11, 2024
1 parent fc1bbdd commit 5ef30e9
Showing 3 changed files with 66 additions and 27 deletions.
3 changes: 3 additions & 0 deletions lib/xandra/cluster/control_connection.ex
@@ -121,6 +121,9 @@ defmodule Xandra.Cluster.ControlConnection do
 
       execute_telemetry(state, [:control_connection, :connected], %{}, %{})
 
+      # We know that the node we just connected to is up
+      send(state.cluster_pid, {:host_up, state.ip, state.port})
+
       # We set up a timer to periodically refresh the topology.
       schedule_refresh_topology(state.refresh_topology_interval)
 
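This is the heart of the change: after all nodes had gone offline and been marked :down, no other event was left to flip a host back to :up, so the cluster never reconnected. The control connection now notifies the cluster process about the node it just connected to, and the pool reacts by marking the host up and restarting pools (see pool.ex below). The contract is plain message passing; a minimal sketch, with hypothetical pid and peer values:

# What the control connection sends on a successful (re)connect:
send(cluster_pid, {:host_up, {127, 0, 0, 1}, 9042})

# The cluster pool's :gen_statem matches on the same tuple; see the
# {:host_up, address, port} clause in lib/xandra/cluster/pool.ex below.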
60 changes: 37 additions & 23 deletions lib/xandra/cluster/pool.ex
@@ -241,27 +241,21 @@ defmodule Xandra.Cluster.Pool do
 
   def handle_event(:info, {:host_up, address, port}, _state, %__MODULE__{} = data) do
     # Set the host's status as :up if its state had been previously marked as :down.
-    {%Host{} = host, data} =
-      get_and_update_in(data.peers[{address, port}], fn
-        %{status: :down, host: host} = peer -> {host, %{peer | status: :up}}
-        %{host: host} = peer -> {host, peer}
-      end)
-
-    data = update_in(data.load_balancing_state, &data.load_balancing_module.host_up(&1, host))
+    data =
+      case data.peers[{address, port}] do
+        %{status: :down} -> set_host_status(data, {address, port}, :up)
+        _ -> data
+      end
 
     data = maybe_start_pools(data)
     {:keep_state, data}
   end
 
   def handle_event(:info, {:host_down, address, port}, _state, %__MODULE__{} = data) do
     # Set the host's status as :down, regardless of its current state.
-    {%Host{} = host, data} =
-      get_and_update_in(data.peers[{address, port}], fn %{host: host} = peer ->
-        {host, %{peer | status: :down}}
-      end)
-
-    data = update_in(data.load_balancing_state, &data.load_balancing_module.host_down(&1, host))
-
+    peername = {address, port}
+    data = set_host_status(data, peername, :down)
+    host = data.peers[peername].host
     data = stop_pool(data, host)
     data = maybe_start_pools(data)
     {:keep_state, data}
@@ -305,11 +299,7 @@ defmodule Xandra.Cluster.Pool do
         %__MODULE__{} = data
       )
       when is_peername(peername) do
-    data = put_in(data.peers[peername].status, :connected)
-    host = data.peers[peername].host
-
-    data =
-      update_in(data.load_balancing_state, &data.load_balancing_module.host_connected(&1, host))
+    data = set_host_status(data, peername, :connected)
 
     if alias = data.sync_connect_alias do
       send(alias, {alias, :connected})
@@ -327,8 +317,10 @@
       )
       when is_peername(peername) do
     # Not connected anymore, but we're not really sure if the whole host is down.
-    data = put_in(data.peers[peername].status, :up)
+    data = set_host_status(data, peername, :up)
     data = stop_pool(data, data.peers[peername].host)
+    # There might be more hosts that we could connect to instead of the stopped one
+    data = maybe_start_pools(data)
     {:keep_state, data}
   end
 
@@ -340,8 +332,11 @@
        %__MODULE__{} = data
      ) do
     if data.peers[peername] do
-      data = put_in(data.peers[peername].status, :down)
-      data = stop_pool(data, data.peers[peername].host)
+      data = set_host_status(data, peername, :down)
+      host = data.peers[peername].host
+      data = stop_pool(data, host)
+      # There might be more hosts that we could connect to instead of the stopped one
+      data = maybe_start_pools(data)
       {:keep_state, data}
     else
       {:keep_state, data}
@@ -374,7 +369,7 @@ defmodule Xandra.Cluster.Pool do
     {peername, _info} = Enum.find(data.peers, fn {_peername, info} -> info.pool_ref == ref end)
     data = put_in(data.peers[peername].pool_pid, nil)
     data = put_in(data.peers[peername].pool_ref, nil)
-    data = put_in(data.peers[peername].status, :up)
+    data = set_host_status(data, peername, :up)
     data = maybe_start_pools(data)
     {:keep_state, data}
   end
@@ -502,6 +497,25 @@ defmodule Xandra.Cluster.Pool do
     end
   end
 
+  defp set_host_status(data, peername, new_status) when new_status in [:up, :down, :connected] do
+    {%Host{} = host, data} =
+      get_and_update_in(data.peers[peername], fn %{host: host} = peer ->
+        {host, %{peer | status: new_status}}
+      end)
+
+    lb_callback =
+      case new_status do
+        :up -> :host_up
+        :down -> :host_down
+        :connected -> :host_connected
+      end
+
+    update_in(
+      data.load_balancing_state,
+      &apply(data.load_balancing_module, lb_callback, [&1, host])
+    )
+  end
+
   defp stop_pool(data, %Host{} = host) do
     peername = Host.to_peername(host)
 
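For reference, the new set_host_status/3 helper folds together the two steps each event handler previously performed inline: updating the peer's status in data.peers and invoking the matching load-balancing callback. A sketch of the equivalence for the :down case, with data and peername as in the handlers above (an illustration, not code from the commit):

# After this commit, handlers call:
data = set_host_status(data, peername, :down)

# ...which replaces the pattern each handler previously repeated inline:
{%Host{} = host, data} =
  get_and_update_in(data.peers[peername], fn %{host: host} = peer ->
    {host, %{peer | status: :down}}
  end)

data = update_in(data.load_balancing_state, &data.load_balancing_module.host_down(&1, host))

Centralizing this makes it impossible for a handler to update the status map while forgetting the load-balancing callback; the guard new_status in [:up, :down, :connected] and the single lb_callback dispatch keep the two in lockstep.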
30 changes: 26 additions & 4 deletions test/xandra/cluster_test.exs
@@ -424,7 +424,8 @@ defmodule Xandra.ClusterTest do
       }
 
       cluster_state = get_state(pid)
-      assert %{status: :down} = cluster_state.peers[{{127, 0, 0, 1}, 8092}]
+      assert %{status: :down, host: host} = cluster_state.peers[{{127, 0, 0, 1}, 8092}]
+      assert get_load_balancing_state(get_state(pid), host) == :down
     end
 
     @tag telemetry_events: [
@@ -759,6 +760,8 @@
       assert %{status: :down, pool_pid: nil, host: %Host{address: {127, 0, 0, 1}, port: @port}} =
               get_state(pid).peers[Host.to_peername(host)]
 
+      assert get_load_balancing_state(get_state(pid), host) == :down
+
       # Send the UP event.
       send(pid, {:host_up, host.address, host.port})
 
@@ -771,6 +774,8 @@
         host: %Host{address: {127, 0, 0, 1}, port: @port}
       } = get_state(pid).peers[Host.to_peername(host)]
 
+      assert get_load_balancing_state(get_state(pid), host) == :up
+
       assert is_pid(pool_pid)
     end
 
@@ -834,15 +839,21 @@ defmodule Xandra.ClusterTest do
       opts = Keyword.merge(opts, sync_connect: 1000)
       cluster = start_link_supervised!({Cluster, opts})
 
-      assert %{pool_pid: pool_pid} = get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
+      assert %{pool_pid: pool_pid, status: :connected, host: host} =
+               get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
+
+      assert get_load_balancing_state(get_state(cluster), host) == :connected
 
       assert {:ok, [{conn_pid, %Host{}}]} = Pool.checkout(cluster)
       ref = Process.monitor(conn_pid)
 
       Process.exit(conn_pid, :kill)
       assert_receive {:DOWN, ^ref, _, _, _}
 
-      assert %{pool_pid: ^pool_pid} = get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
+      assert %{pool_pid: ^pool_pid, status: :connected, host: host} =
+               get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
+
+      assert get_load_balancing_state(get_state(cluster), host) == :connected
     end
 
     @tag :capture_log
@@ -857,8 +868,10 @@
       Process.exit(pool_pid, :kill)
       assert_receive {:DOWN, ^ref, _, _, _}
 
-      assert %{status: :up, pool_pid: nil, pool_ref: nil} =
+      assert %{status: :up, pool_pid: nil, pool_ref: nil, host: host} =
               get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
+
+      assert get_load_balancing_state(get_state(cluster), host) == :up
     end
 
     @tag :capture_log
@@ -960,6 +973,15 @@ defmodule Xandra.ClusterTest do
     data
   end
 
+  defp get_load_balancing_state(%Pool{load_balancing_state: lbs}, %Host{} = host) do
+    {_host, state} =
+      Enum.find(lbs, fn {lbs_host, _state} ->
+        Host.to_peername(lbs_host) == Host.to_peername(host)
+      end)
+
+    state
+  end
+
   defp assert_control_connection_started(test_ref) do
     assert_receive {^test_ref, ControlConnectionMock, :init_called, _start_args}
   end
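The new get_load_balancing_state/2 test helper assumes the pool's load_balancing_state enumerates as {%Host{}, state} tuples, and it compares hosts by peername rather than struct equality, so a %Host{} refreshed with new metadata still resolves to the same entry. A hypothetical value of the shape it expects:

# Illustrative only; addresses and statuses are made up:
lbs = [
  {%Host{address: {127, 0, 0, 1}, port: 9042}, :connected},
  {%Host{address: {127, 0, 0, 2}, port: 9042}, :down}
]

# Host.to_peername/1 reduces each host to {address, port}, which is the
# comparison key the helper uses in Enum.find/2.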
