Skip to content

Commit

Permalink
Fix Gun Message Leaks
Browse files Browse the repository at this point in the history
ar_http client was using internal functions
from inet (`inet:start_timer/1` and `inet:stop_timer/1`).
Those functions are wrapper around `timer` module and
send a message to the current active process doing the
http request. Unfortunately, this is not compatible with
gun:await/3 function and breaks its loop. Gun messages
are after send to the active process.

This commit also enforce request cancellation when
something goes wrong (error/timeout) to avoid leaking
messages coming in the current process.
  • Loading branch information
humaite committed Dec 3, 2024
1 parent 5f4544a commit e7042b4
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 29 deletions.
45 changes: 31 additions & 14 deletions apps/arweave/src/ar_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ unblock_peer_connections() ->
-ifdef(DEBUG).
req(#{ peer := {_, _} } = Args) ->
req(Args, false);
req(Args) ->
#{ peer := Peer } = Args,
req(#{ peer := Peer } = Args) ->
{ok, Config} = application:get_env(arweave, config),
case Config#config.port == element(5, Peer) of
true ->
Expand Down Expand Up @@ -297,17 +296,21 @@ method_to_list(_) ->
"unknown".

request(PID, Args) ->
Timer = inet:start_timer(maps:get(timeout, Args, ?HTTP_REQUEST_SEND_TIMEOUT)),
Timeout = maps:get(timeout, Args, ?HTTP_REQUEST_SEND_TIMEOUT),
Ref = request2(PID, Args),
ResponseArgs = #{ pid => PID, stream_ref => Ref,
timer => Timer, limit => maps:get(limit, Args, infinity),
counter => 0, acc => [], start => os:system_time(microsecond),
is_peer_request => maps:get(is_peer_request, Args, true) },
ResponseArgs = #{ pid => PID
, stream_ref => Ref
, timeout => Timeout
, limit => maps:get(limit, Args, infinity)
, counter => 0
, acc => []
, start => os:system_time(microsecond)
, is_peer_request => maps:get(is_peer_request, Args, true)
},
Response = await_response(maps:merge(Args, ResponseArgs)),
Method = maps:get(method, Args),
Path = maps:get(path, Args),
record_response_status(Method, Path, Response),
inet:stop_timer(Timer),
Response.

request2(PID, #{ path := Path } = Args) ->
Expand All @@ -324,16 +327,18 @@ request2(PID, #{ path := Path } = Args) ->
merge_headers(HeadersA, HeadersB) ->
lists:ukeymerge(1, lists:keysort(1, HeadersB), lists:keysort(1, HeadersA)).

await_response(Args) ->
#{ pid := PID, stream_ref := Ref, timer := Timer, start := Start, limit := Limit,
counter := Counter, acc := Acc, method := Method, path := Path } = Args,
case gun:await(PID, Ref, inet:timeout(Timer)) of
await_response( #{ pid := PID, stream_ref := Ref, timeout := Timeout
, start := Start, limit := Limit, counter := Counter
, acc := Acc, method := Method, path := Path } = Args) ->
case gun:await(PID, Ref, Timeout) of
{response, fin, Status, Headers} ->
End = os:system_time(microsecond),
upload_metric(Args),
{ok, {{integer_to_binary(Status), <<>>}, Headers, <<>>, Start, End}};

{response, nofin, Status, Headers} ->
await_response(Args#{ status => Status, headers => Headers });

{data, nofin, Data} ->
case Limit of
infinity ->
Expand All @@ -349,24 +354,32 @@ await_response(Args) ->
{error, too_much_data}
end
end;

{data, fin, Data} ->
End = os:system_time(microsecond),
FinData = iolist_to_binary([Acc | Data]),
download_metric(FinData, Args),
upload_metric(Args),
{ok, {gen_code_rest(maps:get(status, Args)), maps:get(headers, Args), FinData,
Start, End}};
ResponseCode = gen_code_rest(maps:get(status, Args)),
ResponseHeaders = maps:get(headers, Args),
Response = {ResponseCode, ResponseHeaders, FinData, Start, End},
{ok, Response};

{error, timeout} = Response ->
record_response_status(Method, Path, Response),
gun:cancel(PID, Ref),
log(warn, gun_await_process_down, Args, Response),
Response;

{error, Reason} = Response when is_tuple(Reason) ->
record_response_status(Method, Path, Response),
gun:cancel(PID, Ref),
log(warn, gun_await_process_down, Args, Reason),
Response;

Response ->
record_response_status(Method, Path, Response),
gun:cancel(PID, Ref),
log(warn, gun_await_unknown, Args, Response),
Response
end.
Expand Down Expand Up @@ -416,8 +429,12 @@ gen_code_rest(201) ->
{<<"201">>, <<"Created">>};
gen_code_rest(202) ->
{<<"202">>, <<"Accepted">>};
gen_code_rest(208) ->
{<<"208">>, <<"Transaction already processed">>};
gen_code_rest(400) ->
{<<"400">>, <<"Bad Request">>};
gen_code_rest(419) ->
{<<"419">>, <<"419 Missing Chunk">>};
gen_code_rest(421) ->
{<<"421">>, <<"Misdirected Request">>};
gen_code_rest(429) ->
Expand Down
15 changes: 0 additions & 15 deletions apps/arweave/src/ar_tx_emitter_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,6 @@ handle_cast(Msg, State) ->
handle_info({event, tx, _}, State) ->
{noreply, State};

handle_info({gun_down, _, http, normal, _, _}, State) ->
{noreply, State};
handle_info({gun_down, _, http, closed, _, _}, State) ->
{noreply, State};
handle_info({gun_down, _, http, {error,econnrefused}, _, _}, State) ->
{noreply, State};
handle_info({gun_up, _, http}, State) ->
{noreply, State};
handle_info({gun_response, _, _, _, _, _}, State) ->
{noreply, State};
handle_info({gun_data, _, _, _, _}, state) ->
{noreply, state};
handle_info({gun_error, _, _, _}, State) ->
{noreply, State};

handle_info(Info, State) ->
?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {info, Info}]),
{noreply, State}.
Expand Down
4 changes: 4 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
0},
{<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},1},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},1},
{<<"recon">>,
{git,"https://github.com/ferd/recon.git",
{ref,"c2a76855be3a226a3148c0dfc21ce000b6186ef8"}},
0},
{<<"rocksdb">>,
{git,"https://github.com/ArweaveTeam/erlang-rocksdb.git",
{ref,"f580865c0bc18b0302a6190d7fa85e68ec0762e0"}},
Expand Down

0 comments on commit e7042b4

Please sign in to comment.