
Reconnect after all nodes were offline #375

Merged
Changes from 7 commits
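In short, judging from the diff below: the control connection now tells the cluster pool that its node is up as soon as it connects, all peer status changes (and the matching load-balancing callbacks) are funneled through a new set_host_status/3 helper, and maybe_start_pools/1 runs after every stop_pool/2, so a cluster whose nodes were all offline reconnects once any node comes back.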
3 changes: 3 additions & 0 deletions lib/xandra/cluster/control_connection.ex
@@ -121,6 +121,9 @@ defmodule Xandra.Cluster.ControlConnection do
 
     execute_telemetry(state, [:control_connection, :connected], %{}, %{})
 
+    # We know that the node we just connected to is up
+    send(state.cluster_pid, {:host_up, state.ip, state.port})
+
     # We set up a timer to periodically refresh the topology.
     schedule_refresh_topology(state.refresh_topology_interval)
 
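Since the pool and the control connection run as separate processes, this is a plain message send; the test suite (see cluster_test.exs below) exercises the same contract directly:

    # As in test/xandra/cluster_test.exs: simulate the control connection
    # reporting a node as reachable again (cluster_pid is assumed here to be
    # the Xandra.Cluster.Pool process).
    send(cluster_pid, {:host_up, {127, 0, 0, 1}, 9042})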
58 changes: 35 additions & 23 deletions lib/xandra/cluster/pool.ex
@@ -241,27 +241,21 @@ defmodule Xandra.Cluster.Pool do
 
   def handle_event(:info, {:host_up, address, port}, _state, %__MODULE__{} = data) do
     # Set the host's status as :up if its state had been previously marked as :down.
-    {%Host{} = host, data} =
-      get_and_update_in(data.peers[{address, port}], fn
-        %{status: :down, host: host} = peer -> {host, %{peer | status: :up}}
-        %{host: host} = peer -> {host, peer}
-      end)
-
-    data = update_in(data.load_balancing_state, &data.load_balancing_module.host_up(&1, host))
+    data =
+      case data.peers[{address, port}] do
+        %{status: :down} -> set_host_status(data, {address, port}, :up)
+        _ -> data
+      end
 
     data = maybe_start_pools(data)
     {:keep_state, data}
   end
 
   def handle_event(:info, {:host_down, address, port}, _state, %__MODULE__{} = data) do
     # Set the host's status as :down, regardless of its current state.
-    {%Host{} = host, data} =
-      get_and_update_in(data.peers[{address, port}], fn %{host: host} = peer ->
-        {host, %{peer | status: :down}}
-      end)
-
-    data = update_in(data.load_balancing_state, &data.load_balancing_module.host_down(&1, host))
-
+    peername = {address, port}
+    data = set_host_status(data, peername, :down)
+    host = data.peers[peername].host
     data = stop_pool(data, host)
     data = maybe_start_pools(data)
     {:keep_state, data}
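For orientation (an inference from the fields accessed in this module, not part of the diff), each value in data.peers appears to be a map of this shape:

    # Assumed shape of a data.peers entry, keyed by {address, port} peername:
    %{
      host: %Xandra.Cluster.Host{address: {127, 0, 0, 1}, port: 9042},
      status: :up,      # one of :up | :down | :connected
      pool_pid: nil,    # pid of the per-host connection pool, if running
      pool_ref: nil     # monitor reference for that pool
    }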
@@ -305,11 +299,7 @@ defmodule Xandra.Cluster.Pool do
         %__MODULE__{} = data
       )
       when is_peername(peername) do
-    data = put_in(data.peers[peername].status, :connected)
-    host = data.peers[peername].host
-
-    data =
-      update_in(data.load_balancing_state, &data.load_balancing_module.host_connected(&1, host))
+    data = set_host_status(data, peername, :connected)
 
     if alias = data.sync_connect_alias do
       send(alias, {alias, :connected})
@@ -327,8 +317,9 @@
       )
       when is_peername(peername) do
     # Not connected anymore, but we're not really sure if the whole host is down.
-    data = put_in(data.peers[peername].status, :up)
+    data = set_host_status(data, peername, :up)
     data = stop_pool(data, data.peers[peername].host)
+    data = maybe_start_pools(data)
     {:keep_state, data}
   end

Review thread on the stop_pool call in this handler (resolved):

Owner: Wait, why were we doing this, and why are we not doing it anymore?

@harunzengin (Contributor, author — Dec 9, 2024): Good catch, we should also call maybe_start_pools after stop_pool.
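The exchange above is why maybe_start_pools/1 now follows every stop_pool/2: stopping a pool leaves the peer without one, and the subsequent sweep restarts pools for any host that can still take traffic. The real maybe_start_pools/1 is not shown in this diff; a hypothetical sketch of such a sweep (start_pool/2 is an assumed helper):

    # Hypothetical sketch only, not the actual implementation in pool.ex.
    defp maybe_start_pools(data) do
      Enum.reduce(data.peers, data, fn
        # A usable host with no running pool gets its pool (re)started.
        {peername, %{status: status, pool_pid: nil}}, acc when status in [:up, :connected] ->
          start_pool(acc, peername)

        _peer, acc ->
          acc
      end)
    end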

@@ -340,8 +331,10 @@
         %__MODULE__{} = data
       ) do
     if data.peers[peername] do
-      data = put_in(data.peers[peername].status, :down)
-      data = stop_pool(data, data.peers[peername].host)
+      data = set_host_status(data, peername, :down)
+      host = data.peers[peername].host
+      data = stop_pool(data, host)
+      data = maybe_start_pools(data)
       {:keep_state, data}
     else
       {:keep_state, data}
Expand Down Expand Up @@ -374,7 +367,7 @@ defmodule Xandra.Cluster.Pool do
{peername, _info} = Enum.find(data.peers, fn {_peername, info} -> info.pool_ref == ref end)
data = put_in(data.peers[peername].pool_pid, nil)
data = put_in(data.peers[peername].pool_ref, nil)
data = put_in(data.peers[peername].status, :up)
data = set_host_status(data, peername, :up)
data = maybe_start_pools(data)
{:keep_state, data}
end
@@ -502,6 +495,25 @@
     end
   end
 
+  defp set_host_status(data, peername, new_status) when new_status in [:up, :down, :connected] do
+    {%Host{} = host, data} =
+      get_and_update_in(data.peers[peername], fn %{host: host} = peer ->
+        {host, %{peer | status: new_status}}
+      end)
+
+    lb_callback =
+      case new_status do
+        :up -> :host_up
+        :down -> :host_down
+        :connected -> :host_connected
+      end
+
+    update_in(
+      data.load_balancing_state,
+      &apply(data.load_balancing_module, lb_callback, [&1, host])
+    )
+  end
+
   defp stop_pool(data, %Host{} = host) do
     peername = Host.to_peername(host)
 
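set_host_status/3 funnels every status change through the configured load-balancing policy (data.load_balancing_module). An illustrative-only sketch of the three callbacks it dispatches to, assuming the policy state is a list of {%Host{}, status} pairs (which is what the test helper at the bottom of this diff enumerates):

    # Illustrative only; a real policy implements the full
    # Xandra.Cluster.LoadBalancingPolicy behaviour, not just these callbacks.
    def host_up(state, host), do: set_status(state, host, :up)
    def host_down(state, host), do: set_status(state, host, :down)
    def host_connected(state, host), do: set_status(state, host, :connected)

    defp set_status(state, host, status) do
      Enum.map(state, fn {h, old_status} ->
        if Xandra.Cluster.Host.to_peername(h) == Xandra.Cluster.Host.to_peername(host),
          do: {h, status},
          else: {h, old_status}
      end)
    end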
30 changes: 26 additions & 4 deletions test/xandra/cluster_test.exs
@@ -424,7 +424,8 @@ defmodule Xandra.ClusterTest do
       }
 
       cluster_state = get_state(pid)
-      assert %{status: :down} = cluster_state.peers[{{127, 0, 0, 1}, 8092}]
+      assert %{status: :down, host: host} = cluster_state.peers[{{127, 0, 0, 1}, 8092}]
+      assert get_load_balancing_state(get_state(pid), host) == :down
     end
 
     @tag telemetry_events: [
@@ -759,6 +760,8 @@ defmodule Xandra.ClusterTest do
       assert %{status: :down, pool_pid: nil, host: %Host{address: {127, 0, 0, 1}, port: @port}} =
               get_state(pid).peers[Host.to_peername(host)]
 
+      assert get_load_balancing_state(get_state(pid), host) == :down
+
       # Send the UP event.
       send(pid, {:host_up, host.address, host.port})
 
@@ -771,6 +774,8 @@
         host: %Host{address: {127, 0, 0, 1}, port: @port}
       } = get_state(pid).peers[Host.to_peername(host)]
 
+      assert get_load_balancing_state(get_state(pid), host) == :up
+
       assert is_pid(pool_pid)
     end

@@ -834,15 +839,21 @@ defmodule Xandra.ClusterTest do
       opts = Keyword.merge(opts, sync_connect: 1000)
       cluster = start_link_supervised!({Cluster, opts})
 
-      assert %{pool_pid: pool_pid} = get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
+      assert %{pool_pid: pool_pid, status: :connected, host: host} =
+               get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
+
+      assert get_load_balancing_state(get_state(cluster), host) == :connected
 
       assert {:ok, [{conn_pid, %Host{}}]} = Pool.checkout(cluster)
       ref = Process.monitor(conn_pid)
 
       Process.exit(conn_pid, :kill)
       assert_receive {:DOWN, ^ref, _, _, _}
 
-      assert %{pool_pid: ^pool_pid} = get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
+      assert %{pool_pid: ^pool_pid, status: :connected, host: host} =
+               get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
+
+      assert get_load_balancing_state(get_state(cluster), host) == :connected
     end

@tag :capture_log
@@ -857,8 +868,10 @@
       Process.exit(pool_pid, :kill)
       assert_receive {:DOWN, ^ref, _, _, _}
 
-      assert %{status: :up, pool_pid: nil, pool_ref: nil} =
+      assert %{status: :up, pool_pid: nil, pool_ref: nil, host: host} =
               get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
+
+      assert get_load_balancing_state(get_state(cluster), host) == :up
     end

@tag :capture_log
@@ -960,6 +973,15 @@
     data
   end
 
+  defp get_load_balancing_state(%Pool{load_balancing_state: lbs}, %Host{} = host) do
+    {_host, state} =
+      Enum.find(lbs, fn {lbs_host, _state} ->
+        Host.to_peername(lbs_host) == Host.to_peername(host)
+      end)
+
+    state
+  end
+
   defp assert_control_connection_started(test_ref) do
     assert_receive {^test_ref, ControlConnectionMock, :init_called, _start_args}
   end
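A note on the new get_load_balancing_state/2 test helper: it matches entries by Host.to_peername/1 rather than comparing %Host{} structs with ==, presumably because the host structs held in the load-balancing state can carry metadata that differs from the structs in data.peers, which would make struct equality brittle.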