fix: fix several repack in place bugs
1. Block repacking in place from replica_2_9 (for now)
2. Make sure the unpacked_padded sync record is cleaned up
3. Truncate the repack batch size at the end of the sector to avoid
   processing the same chunk multiple times
4. Fix an off-by-one error in the repack_in_place cursor advancement
5. When repacking in place, always delete the old chunks'
   ar_chunk_storage sync_record
JamesPiechota committed Jan 13, 2025
1 parent faaa10e commit 7a21819
Showing 8 changed files with 134 additions and 92 deletions.
apps/arweave/e2e/ar_e2e.erl (17 changes: 8 additions & 9 deletions)

@@ -18,9 +18,6 @@
 %% WARNING: ONLY SET TO true IF YOU KNOW WHAT YOU ARE DOING!
 -define(UPDATE_CHUNK_FIXTURES, false).
 
--define(E2E_WAIT_TIME, 60 * 1000).
-
-
 -spec fixture_dir(atom()) -> binary().
 fixture_dir(FixtureType) ->
     Dir = filename:dirname(?FILE),
@@ -218,7 +215,7 @@ assert_syncs_range(Node, StartOffset, EndOffset) ->
     ar_util:do_until(
         fun() -> has_range(Node, StartOffset, EndOffset) end,
         100,
-        ?E2E_WAIT_TIME
+        60_000
     ),
     iolist_to_binary(io_lib:format(
         "~s Failed to sync range ~p - ~p", [Node, StartOffset, EndOffset]))).
@@ -227,7 +224,7 @@ assert_does_not_sync_range(Node, StartOffset, EndOffset) ->
     ar_util:do_until(
         fun() -> has_range(Node, StartOffset, EndOffset) end,
         1000,
-        ?E2E_WAIT_TIME
+        60_000
     ),
     ?assertEqual(false, has_range(Node, StartOffset, EndOffset),
         iolist_to_binary(io_lib:format(
@@ -242,11 +239,13 @@ assert_partition_size(Node, PartitionNumber, Packing, Size) ->
                 [PartitionNumber, Packing]) >= Size
         end,
         100,
-        ?E2E_WAIT_TIME
+        60_000
     ),
     iolist_to_binary(io_lib:format(
-        "~s partition ~p,~p failed to reach size ~p", [Node, PartitionNumber,
-        ar_serialize:encode_packing(Packing, true), Size]))).
+        "~s partition ~p,~p failed to reach size ~p. Current size: ~p.",
+        [Node, PartitionNumber, ar_serialize:encode_packing(Packing, true), Size,
+            ar_test_node:remote_call(Node, ar_mining_stats, get_partition_data_size,
+                [PartitionNumber, Packing])]))).
 
 assert_empty_partition(Node, PartitionNumber, Packing) ->
     ar_util:do_until(
@@ -255,7 +254,7 @@ assert_empty_partition(Node, PartitionNumber, Packing) ->
                 [PartitionNumber, Packing]) > 0
         end,
         100,
-        ?E2E_WAIT_TIME
+        30_000
     ),
     ?assertEqual(
         0,
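
The assertion helpers above all follow one pattern: poll a condition with ar_util:do_until(Fun, IntervalMs, TimeoutMs), then assert on the outcome with a descriptive failure message. The commit inlines the timeouts (60_000, 30_000) in place of the shared ?E2E_WAIT_TIME macro so each assertion states its own budget (Erlang accepts underscores in integer literals since OTP 23), and assert_partition_size's failure message now reports the current size alongside the expected one. Below is a minimal sketch of such a polling loop, inferred from the call sites; the real ar_util:do_until/3 may differ in return values and edge cases.

%% Hedged sketch of a do_until/3-style polling loop (condition fun, retry
%% interval in ms, overall timeout in ms). Inferred from the call sites
%% above; not the actual ar_util code.
do_until(Fun, _IntervalMs, TimeoutMs) when TimeoutMs =< 0 ->
    %% Out of time: report whatever the condition says right now.
    case Fun() of
        true -> true;
        _ -> {error, timeout}
    end;
do_until(Fun, IntervalMs, TimeoutMs) ->
    case Fun() of
        true ->
            true;
        _ ->
            timer:sleep(IntervalMs),
            do_until(Fun, IntervalMs, TimeoutMs - IntervalMs)
    end.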
apps/arweave/e2e/ar_repack_in_place_mine_tests.erl (49 changes: 25 additions & 24 deletions)

@@ -4,35 +4,36 @@
 -include_lib("arweave/include/ar_consensus.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
+-define(REPACK_IN_PLACE_MINE_TEST_TIMEOUT, 600).
+
 %% --------------------------------------------------------------------------------------------
 %% Test Registration
 %% --------------------------------------------------------------------------------------------
+%%
+%% Note:
+%% Repacking in place *from* replica_2_9 to any format is not currently supported.
 repack_in_place_mine_test_() ->
+    Timeout = ?REPACK_IN_PLACE_MINE_TEST_TIMEOUT,
     [
-        % XXX {timeout, 300, {with, {unpacked, replica_2_9}, [fun test_repack_in_place_mine/1]}},
-        % XXX {timeout, 300, {with, {unpacked, spora_2_6}, [fun test_repack_in_place_mine/1]}},
-        % XXX {timeout, 300, {with, {unpacked, composite_1}, [fun test_repack_in_place_mine/1]}},
-        % XXX {timeout, 300, {with, {unpacked, composite_2}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {replica_2_9, replica_2_9}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {replica_2_9, spora_2_6}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {replica_2_9, composite_1}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {replica_2_9, composite_2}, [fun test_repack_in_place_mine/1]}},
-        % XXX {timeout, 300, {with, {replica_2_9, unpacked}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {spora_2_6, replica_2_9}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {spora_2_6, spora_2_6}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {spora_2_6, composite_1}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {spora_2_6, composite_2}, [fun test_repack_in_place_mine/1]}},
-        % XXX {timeout, 300, {with, {spora_2_6, unpacked}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {composite_1, replica_2_9}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {composite_1, spora_2_6}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {composite_1, composite_1}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {composite_1, composite_2}, [fun test_repack_in_place_mine/1]}},
-        % XXX {timeout, 300, {with, {composite_1, unpacked}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {composite_2, replica_2_9}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {composite_2, spora_2_6}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {composite_2, composite_1}, [fun test_repack_in_place_mine/1]}},
-        {timeout, 300, {with, {composite_2, composite_2}, [fun test_repack_in_place_mine/1]}}
-        % XXX {timeout, 300, {with, {composite_2, unpacked}, [fun test_repack_in_place_mine/1]}}
+        % XXX {timeout, Timeout, {with, {unpacked, replica_2_9}, [fun test_repack_in_place_mine/1]}},
+        % XXX {timeout, Timeout, {with, {unpacked, spora_2_6}, [fun test_repack_in_place_mine/1]}},
+        % XXX {timeout, Timeout, {with, {unpacked, composite_1}, [fun test_repack_in_place_mine/1]}},
+        % XXX {timeout, Timeout, {with, {unpacked, composite_2}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {spora_2_6, replica_2_9}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {spora_2_6, spora_2_6}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {spora_2_6, composite_1}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {spora_2_6, composite_2}, [fun test_repack_in_place_mine/1]}},
+        % XXX {timeout, Timeout, {with, {spora_2_6, unpacked}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {composite_1, replica_2_9}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {composite_1, spora_2_6}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {composite_1, composite_1}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {composite_1, composite_2}, [fun test_repack_in_place_mine/1]}},
+        % XXX {timeout, Timeout, {with, {composite_1, unpacked}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {composite_2, replica_2_9}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {composite_2, spora_2_6}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {composite_2, composite_1}, [fun test_repack_in_place_mine/1]}},
+        {timeout, Timeout, {with, {composite_2, composite_2}, [fun test_repack_in_place_mine/1]}}
+        % XXX {timeout, Timeout, {with, {composite_2, unpacked}, [fun test_repack_in_place_mine/1]}}
     ].
 
 %% --------------------------------------------------------------------------------------------
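
The matrix above spells out every {FromPacking, ToPacking} pair by hand, which makes it trivial to toggle unsupported pairs with % XXX; this commit drops the replica_2_9 source rows entirely (item 1 of the commit message). For comparison only, a hedged sketch of generating the currently supported pairs with a list comprehension; this is not part of the commit:

%% Illustrative alternative: build the supported {From, To} pairs from the
%% packing atoms used above. Unsupported sources (unpacked, replica_2_9)
%% and destinations (unpacked) are simply omitted.
repack_in_place_mine_generated_test_() ->
    Timeout = ?REPACK_IN_PLACE_MINE_TEST_TIMEOUT,
    Froms = [spora_2_6, composite_1, composite_2],
    Tos = [replica_2_9, spora_2_6, composite_1, composite_2],
    [{timeout, Timeout, {with, {From, To}, [fun test_repack_in_place_mine/1]}}
        || From <- Froms, To <- Tos].

The explicit list remains the better fit here, since individual pairs get switched on and off as support changes.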
apps/arweave/include/ar_config.hrl (4 changes: 4 additions & 0 deletions)

@@ -105,7 +105,11 @@
 -define(CHUNK_GROUP_SIZE, (256 * 1024 * 8000)). % 2 GiB.
 
 %% The default number of chunks fetched from disk at a time during in-place repacking.
+-ifdef(AR_TEST).
+-define(DEFAULT_REPACK_BATCH_SIZE, 2).
+-else.
 -define(DEFAULT_REPACK_BATCH_SIZE, 100).
+-endif.
 
 %% default filtering value for the peer list (30days)
 -define(CURRENT_PEERS_LIST_FILTER, 30*60*60*24).
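
Under AR_TEST the batch shrinks to 2 chunks, so the e2e tests exercise multiple batches (and the batch-boundary bugs fixed here) without reading 100 chunks at a time. The truncation itself (item 3 of the commit message) lands in repack code not expanded in this view; below is a hedged sketch of the idea, with illustrative names only:

%% Hedged sketch of item 3: clamp a repack batch so it never reads past the
%% end of the current sector; otherwise the next batch would start before
%% the sector boundary and process the same chunks again. Names are
%% illustrative, not the actual repack-in-place code.
truncate_batch_size(BatchSize, Cursor, SectorEnd, ChunkSize) ->
    ChunksLeftInSector = max(0, (SectorEnd - Cursor) div ChunkSize),
    min(BatchSize, ChunksLeftInSector).

For example, with a 100-chunk batch and 3 chunks left before the sector boundary, only 3 chunks are read.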
apps/arweave/src/ar_chunk_storage.erl (1 change: 0 additions & 1 deletion)

@@ -389,7 +389,6 @@ handle_cast(do_prepare_replica_2_9, State) ->
         store_id = StoreID, repack_cursor = RepackCursor } = State,
 
     ?LOG_DEBUG([{event, do_prepare_replica_2_9},
-        {tags, [repack_in_place]},
         {storage_module, StoreID},
         {start, Start},
         {sub_chunk_start, SubChunkStart},
apps/arweave/src/ar_config.erl (13 changes: 10 additions & 3 deletions)

@@ -926,13 +926,20 @@ validate_repack_in_place(Config) ->
 validate_repack_in_place([], _Modules) ->
     true;
 validate_repack_in_place([{Module, _ToPacking} | L], Modules) ->
+    {_BucketSize, _Bucket, Packing} = Module,
     ID = ar_storage_module:id(Module),
-    case lists:member(ID, Modules) of
-        true ->
+    PackingType = ar_mining_server:get_packing_type(Packing),
+    ModuleInUse = lists:member(ID, Modules),
+    RepackingFromReplica29 = PackingType == replica_2_9,
+    case {ModuleInUse, RepackingFromReplica29} of
+        {true, _} ->
             io:format("~nCannot use the storage module ~s "
                 "while it is being repacked in place.~n~n", [ID]),
             false;
-        false ->
+        {_, true} ->
+            io:format("~nCannot repack in place from replica_2_9 to any format.~n~n"),
+            false;
+        _ ->
             validate_repack_in_place(L, Modules)
     end.
 
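Matching on the {ModuleInUse, RepackingFromReplica29} pair keeps both rejection reasons in one case expression, with the in-use check taking precedence. Illustratively, assuming ar_mining_server:get_packing_type({replica_2_9, Addr}) returns replica_2_9 (as the variable naming above suggests) and using invented bucket values, a replica_2_9 source entry now fails validation:

%% Illustrative only: the module tuple shape {BucketSize, Bucket, Packing}
%% matches the destructuring above; the concrete values are made up.
Addr = <<"example-mining-addr">>,
Module = {3_600_000_000_000, 0, {replica_2_9, Addr}},
%% Prints "Cannot repack in place from replica_2_9 to any format." and
%% returns false.
false = validate_repack_in_place([{Module, {spora_2_6, Addr}}], []).

This is the enforcement point for item 1 of the commit message: the restriction is applied at config validation time, before the node starts repacking.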
apps/arweave/src/ar_entropy_storage.erl (13 changes: 9 additions & 4 deletions)

@@ -62,6 +62,11 @@ update_sync_records(IsComplete, PaddedEndOffset, StoreID, RewardAddr) ->
     case IsComplete of
         true ->
             StartOffset = PaddedEndOffset - ?DATA_CHUNK_SIZE,
+            %% update_sync_records is only called when an unpacked_padded chunk has
+            %% been written to disk before entropy was generated. In this case we have
+            %% to remove the unpacked_padded sync record before we add the replica_2_9
+            %% sync record.
+            ar_sync_record:delete(PaddedEndOffset, StartOffset, ar_data_sync, StoreID),
             ar_sync_record:add_async(replica_2_9_entropy_with_chunk,
                 PaddedEndOffset,
                 StartOffset,
@@ -271,14 +276,14 @@ record_chunk(PaddedEndOffset, Chunk, RewardAddr, StoreID, FileIndex, IsPrepared)
             Result;
         {_EndOffset, Entropy} ->
             Packing = {replica_2_9, RewardAddr},
-            release_semaphore(Filepath),
             PackedChunk = ar_packing_server:encipher_replica_2_9_chunk(Chunk, Entropy),
-            ar_chunk_storage:record_chunk(
-                PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex)
+            Result = ar_chunk_storage:record_chunk(
+                PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex),
+            release_semaphore(Filepath),
+            Result
     end.
 
 record_entropy(ChunkEntropy, PaddedEndOffset, StoreID, RewardAddr) ->
-
     true = byte_size(ChunkEntropy) == ?DATA_CHUNK_SIZE,
 
     IsUnpackedChunkRecorded = ar_sync_record:is_recorded(
(Diffs for the remaining two changed files were not loaded in this view.)