diff --git a/apps/blockchain/lib/blockchain/blocktree.ex b/apps/blockchain/lib/blockchain/blocktree.ex index 498787fd0..1c01afbed 100644 --- a/apps/blockchain/lib/blockchain/blocktree.ex +++ b/apps/blockchain/lib/blockchain/blocktree.ex @@ -55,15 +55,21 @@ defmodule Blockchain.Blocktree do else: {:valid, trie} with {:valid, trie} <- validation do - {:ok, {block_hash, updated_trie}} = Block.put_block(block, trie, specified_block_hash) + add_block_without_validation(blocktree, block, trie, specified_block_hash) + end + end - # Cache computed block hash - block = %{block | block_hash: block_hash} + @spec add_block_without_validation(t, Block.t(), TrieStorage.t(), EVM.hash() | nil) :: + {:ok, {t, TrieStorage.t(), EVM.hash()}} + def add_block_without_validation(blocktree, block, trie, specified_block_hash \\ nil) do + {:ok, {block_hash, updated_trie}} = Block.put_block(block, trie, specified_block_hash) - updated_blocktree = update_best_block(blocktree, block) + # Cache computed block hash + block = %{block | block_hash: block_hash} - {:ok, {updated_blocktree, updated_trie, block_hash}} - end + updated_blocktree = update_best_block(blocktree, block) + + {:ok, {updated_blocktree, updated_trie, block_hash}} end @spec update_best_block(t, Block.t()) :: t diff --git a/apps/evm/lib/evm/log_entry.ex b/apps/evm/lib/evm/log_entry.ex index 3a7405f55..b31e88ca5 100644 --- a/apps/evm/lib/evm/log_entry.ex +++ b/apps/evm/lib/evm/log_entry.ex @@ -59,6 +59,26 @@ defmodule EVM.LogEntry do } end + @spec serialize(t) :: ExRLP.t() + def serialize(log) do + [ + log.address, + log.topics, + log.data + ] + end + + @spec deserialize(ExRLP.t()) :: t + def deserialize(rlp) do + [ + address, + topics, + data + ] = rlp + + new(address, topics, data) + end + @doc """ Converts log struct to standard Ethereum list representation. diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex index cfec882f3..59e617426 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex @@ -11,18 +11,19 @@ defmodule ExWire.Packet.Capability.Eth.Receipts do """ require Logger + alias Blockchain.Transaction.Receipt @behaviour ExWire.Packet @type t :: %__MODULE__{ - receipts: [any()] + receipts: [[Receipt.t()]] } defstruct [ :receipts ] - @spec new([any()]) :: t() + @spec new([[Receipt.t()]]) :: t() def new(receipts) do %__MODULE__{ receipts: receipts @@ -45,7 +46,11 @@ defmodule ExWire.Packet.Capability.Eth.Receipts do @impl true @spec serialize(t) :: ExRLP.t() def serialize(packet = %__MODULE__{}) do - for receipt <- packet.receipts, do: receipt + for receipts <- packet.receipts do + for receipt <- receipts do + Receipt.serialize(receipt) + end + end end @doc """ @@ -55,9 +60,14 @@ defmodule ExWire.Packet.Capability.Eth.Receipts do @impl true @spec deserialize(ExRLP.t()) :: t def deserialize(rlp) do - receipts = for receipt <- rlp, do: receipt + block_receipts = + for receipts <- rlp do + for receipt <- receipts do + Receipt.deserialize(receipt) + end + end - new(receipts) + new(block_receipts) end @doc """ diff --git a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex index 3d884c817..ebc2efe44 100644 --- a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex +++ b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex @@ -18,10 +18,12 @@ defmodule ExWire.Struct.BlockQueue do alias Block.Header alias ExWire.Struct.Block, as: BlockStruct alias Blockchain.{Block, Blocktree, Chain} + alias Blockchain.Transaction.Receipt alias MerklePatriciaTree.Trie require Logger + @max_receipts_to_request 500 # These will be used to help us determine if a block is empty @empty_trie MerklePatriciaTree.Trie.empty_trie_root_hash() @empty_hash [] |> ExRLP.encode() |> ExthCrypto.Hash.Keccak.kec() @@ -29,11 +31,16 @@ defmodule ExWire.Struct.BlockQueue do defstruct queue: %{}, backlog: %{}, do_validation: true, - block_numbers: MapSet.new() + block_numbers: MapSet.new(), + fast_sync_in_progress: false, + block_receipts_set: MapSet.new(), + block_receipts_to_request: [], + block_receipts_requested: [] @type block_item :: %{ commitments: list(binary()), block: Block.t(), + receipts_added: boolean(), ready: boolean() } @@ -45,7 +52,11 @@ defmodule ExWire.Struct.BlockQueue do queue: %{integer() => block_map}, backlog: %{EVM.hash() => list(Block.t())}, do_validation: boolean(), - block_numbers: MapSet.t() + block_numbers: MapSet.t(), + fast_sync_in_progress: boolean(), + block_receipts_set: MapSet.t(), + block_receipts_to_request: [EVM.hash()], + block_receipts_requested: [EVM.hash()] } @doc """ @@ -65,7 +76,12 @@ defmodule ExWire.Struct.BlockQueue do Trie.t() ) :: {t, Blocktree.t(), Trie.t(), boolean()} def add_header( - block_queue = %__MODULE__{queue: queue}, + block_queue = %__MODULE__{ + queue: queue, + fast_sync_in_progress: fast_sync_in_progress, + block_receipts_set: block_receipts_set, + block_receipts_to_request: block_receipts_to_request + }, block_tree, header, header_hash, @@ -74,8 +90,9 @@ defmodule ExWire.Struct.BlockQueue do trie ) do block_map = Map.get(queue, header.number, %{}) + header_num_and_hash = {header.number, header_hash} - {block_map, should_request_body} = + {block_map, should_request_body, receipts_to_request, receipts_set} = case Map.get(block_map, header_hash) do nil -> # may already be ready, already. @@ -85,22 +102,47 @@ defmodule ExWire.Struct.BlockQueue do Map.put(block_map, header_hash, %{ commitments: MapSet.new([remote_id]), block: %Block{header: header}, + receipts_added: is_empty, ready: is_empty }) - {block_map, not is_empty} + {receipts_set, receipts_to_request} = + if fast_sync_in_progress and not is_empty do + { + MapSet.put(block_receipts_set, header_num_and_hash), + block_queue.block_receipts_to_request ++ [header_num_and_hash] + } + else + {block_receipts_set, block_receipts_to_request} + end + + {block_map, not is_empty, receipts_to_request, receipts_set} block_item -> + {receipts_set, receipts_to_request} = + if fast_sync_in_progress and header.number != 0 and + Enum.empty?(block_item.block.receipts) and + not MapSet.member?(block_receipts_set, header_num_and_hash) do + { + MapSet.put(block_receipts_set, header_num_and_hash), + block_queue.block_receipts_to_request ++ [header_num_and_hash] + } + else + {block_receipts_set, block_receipts_to_request} + end + {Map.put(block_map, header_hash, %{ block_item | commitments: MapSet.put(block_item.commitments, remote_id) - }), false} + }), false, receipts_to_request, receipts_set} end updated_block_queue = %{ block_queue | queue: Map.put(queue, header.number, block_map), - block_numbers: MapSet.put(block_queue.block_numbers, header.number) + block_numbers: MapSet.put(block_queue.block_numbers, header.number), + block_receipts_set: receipts_set, + block_receipts_to_request: receipts_to_request } {new_block_queue, new_block_tree, new_trie} = @@ -164,6 +206,103 @@ defmodule ExWire.Struct.BlockQueue do process_block_queue(updated_block_queue, block_tree, chain, trie) end + @doc """ + Returns the collection of block hashes for which Receipts are needed, as well as the + updated BlockQueue accounting for the requested hashes, if fast sync is in progress and + a request is not already in flight. + """ + @spec get_receipts_to_request(t()) :: {:ok, [EVM.hash()], t()} | :do_not_request + def get_receipts_to_request( + block_queue = %__MODULE__{ + fast_sync_in_progress: is_fast, + block_receipts_to_request: to_request, + block_receipts_requested: requested + } + ) do + if is_fast and Enum.empty?(requested) and not Enum.empty?(to_request) do + {new_requests, to_request_tail} = Enum.split(to_request, @max_receipts_to_request) + + { + :ok, + new_requests |> Enum.map(fn {_number, hash} -> hash end), + %{ + block_queue + | block_receipts_to_request: to_request_tail, + block_receipts_requested: new_requests + } + } + else + :do_not_request + end + end + + @doc """ + Processes the provided Receipts, verifying them against stored Headers and adding + them to the Blocks stored in the BlockQueue. + This will return the updated BlockQueue and the hashes of the blocks to request Receipts for next. + """ + @spec add_receipts(t(), [[Receipt.t()]]) :: {t(), [EVM.hash()]} | {t(), []} + def add_receipts(block_queue = %__MODULE__{block_receipts_requested: req}, receipts) + when length(req) != length(receipts) do + :ok = + Logger.warn(fn -> + "[Block Queue] Received Receipts of different length than requested. Cannot match them to blocks. Receipts # [#{ + Enum.count(receipts) + }], Requested # [#{Enum.count(req)}]" + end) + + {block_queue, req |> Enum.map(fn {_number, hash} -> hash end)} + end + + def add_receipts( + block_queue = %__MODULE__{ + queue: queue, + block_receipts_set: block_receipts_set, + block_receipts_requested: requested + }, + block_receipts + ) do + number_hash_tuple_receipts = Enum.zip(requested, block_receipts) + + updated_queue = + Enum.reduce(number_hash_tuple_receipts, queue, fn {{number, hash}, receipts}, + updated_queue -> + block_map = Map.get(queue, number) + block_item = Map.get(block_map, hash) + block = Map.get(block_item, :block) + updated_block = %{block | receipts: receipts} + + # TODO: Build Trie and verify that Receipts Root matches header.receipts_root + + Map.put( + updated_queue, + number, + Map.put(block_map, hash, %{ + block_item + | receipts_added: true, + block: updated_block + }) + ) + end) + + updated_receipts_set = MapSet.difference(block_receipts_set, MapSet.new(requested)) + + updated_block_queue = %{ + block_queue + | queue: updated_queue, + block_receipts_requested: [], + block_receipts_set: updated_receipts_set + } + + case get_receipts_to_request(updated_block_queue) do + {:ok, hashes, block_queue_to_return} -> + {block_queue_to_return, hashes} + + :do_not_request -> + {updated_block_queue, []} + end + end + @doc """ Processes a the block queue, adding any blocks which are complete and pass the number of confirmations to the block tree. These blocks are then removed @@ -191,6 +330,36 @@ defmodule ExWire.Struct.BlockQueue do defp do_process_blocks([], block_queue, block_tree, _chain, trie), do: {block_queue, block_tree, trie} + defp do_process_blocks( + [block | rest], + block_queue = %__MODULE__{fast_sync_in_progress: true}, + block_tree, + chain, + trie + ) do + {:ok, {updated_blocktree, updated_trie, block_hash}} = + Blocktree.add_block_without_validation(block_tree, block, trie) + + :ok = + Logger.debug(fn -> + "[Block Queue] Added block #{block.header.number} (0x#{ + Base.encode16(block_hash, case: :lower) + }) to new block tree without validation during fast sync." + end) + + {backlogged_blocks, new_backlog} = Map.pop(block_queue.backlog, block_hash, []) + + new_block_queue = %{block_queue | backlog: new_backlog} + + do_process_blocks( + backlogged_blocks ++ rest, + new_block_queue, + updated_blocktree, + chain, + updated_trie + ) + end + defp do_process_blocks([block | rest], block_queue, block_tree, chain, trie) do {new_block_tree, new_trie, new_backlog, extra_blocks} = case Blocktree.verify_and_add_block( @@ -321,12 +490,14 @@ defmodule ExWire.Struct.BlockQueue do } """ @spec get_complete_blocks(t) :: {t, [Block.t()]} - def get_complete_blocks(block_queue = %__MODULE__{queue: queue}) do + def get_complete_blocks( + block_queue = %__MODULE__{queue: queue, fast_sync_in_progress: fast_syncing} + ) do {queue, blocks} = Enum.reduce(queue, {queue, []}, fn {number, block_map}, {queue, blocks} -> {final_block_map, new_blocks} = Enum.reduce(block_map, {block_map, []}, fn {hash, block_item}, {block_map, blocks} -> - if block_item.ready and + if block_item.ready and (not fast_syncing or block_item.receipts_added) and MapSet.size(block_item.commitments) >= ExWire.Config.commitment_count() do {Map.delete(block_map, hash), [block_item.block | blocks]} else diff --git a/apps/ex_wire/lib/ex_wire/sync.ex b/apps/ex_wire/lib/ex_wire/sync.ex index 330c1e514..5fa1e8a1a 100644 --- a/apps/ex_wire/lib/ex_wire/sync.ex +++ b/apps/ex_wire/lib/ex_wire/sync.ex @@ -28,7 +28,9 @@ defmodule ExWire.Sync do BlockBodies, BlockHeaders, GetBlockBodies, - GetBlockHeaders + GetBlockHeaders, + GetReceipts, + Receipts } alias ExWire.Packet.Capability.Par.{ @@ -204,6 +206,10 @@ defmodule ExWire.Sync do {:noreply, handle_snapshot_data(snapshot_data, peer, state)} end + def handle_info({:packet, %Receipts{} = receipts_data, peer}, state) do + {:noreply, handle_receipts(receipts_data, peer, state)} + end + def handle_info({:packet, packet, peer}, state) do :ok = Exth.trace(fn -> "[Sync] Ignoring packet #{packet.__struct__} from #{peer}" end) @@ -357,6 +363,51 @@ defmodule ExWire.Sync do %{state | warp_queue: next_warp_queue} end + @doc """ + When we receive the Receipts payload that we requested, add them to the BlockQueue, + request new receipts if there are any to be fetched, and return the state with the + updated BlockQueue that accounts for the processed Receipts as well as any newly-requested ones. + """ + @spec handle_receipts(Receipts.t(), Peer.t(), state()) :: state() + def handle_receipts( + %Receipts{receipts: blocks_receipts}, + _peer, + state = %{ + trie: trie, + chain: chain, + block_tree: block_tree, + block_queue: block_queue + } + ) do + updated_block_queue = + case BlockQueue.add_receipts(block_queue, blocks_receipts) do + {updated_block_queue, []} -> + :ok = Logger.debug("Processed receipts, no new ones queued to fetch.") + updated_block_queue + + {updated_block_queue, hashes_to_request} -> + :ok = + Logger.debug(fn -> + "[Sync] Sending GetReceipts request for [#{Enum.count(hashes_to_request)}] receipts." + end) + + _ = send_with_retry(GetReceipts.new(hashes_to_request), :random, :request_receipts) + updated_block_queue + end + + {final_queue, final_tree, final_trie} = + BlockQueue.process_block_queue(updated_block_queue, block_tree, chain, trie) + + :ok = maybe_request_next_block(final_queue) + + %{ + state + | trie: final_trie, + block_tree: final_tree, + block_queue: final_queue + } + end + @doc """ When we get block headers from peers, we add them to our current block queue to incorporate the blocks into our state chain. @@ -386,7 +437,7 @@ defmodule ExWire.Sync do :ok = maybe_request_next_block(block_queue) state else - {next_highest_block_number, next_block_queue, next_block_tree, next_trie, header_hashes} = + {next_highest_block_number, updated_block_queue, next_block_tree, next_trie, header_hashes} = Enum.reduce( block_headers.headers, {highest_block_number, block_queue, block_tree, trie, []}, @@ -420,13 +471,31 @@ defmodule ExWire.Sync do end ) - :ok = - PeerSupervisor.send_packet( - %GetBlockBodies{ - hashes: header_hashes - }, - :random - ) + if not Enum.empty?(header_hashes) do + _ = + send_with_retry( + %GetBlockBodies{ + hashes: header_hashes + }, + :random, + :request_block_bodies + ) + end + + next_block_queue = + case BlockQueue.get_receipts_to_request(updated_block_queue) do + {:ok, block_hashes, modified_queue} -> + :ok = + Logger.debug(fn -> + "[Sync] Sending GetReceipts request for [#{Enum.count(block_hashes)}] receipts." + end) + + _ = send_with_retry(GetReceipts.new(block_hashes), :random, :request_receipts) + modified_queue + + :do_not_request -> + updated_block_queue + end next_maybe_saved_trie = maybe_save(block_tree, next_block_tree, next_trie) :ok = maybe_request_next_block(next_block_queue) @@ -586,7 +655,11 @@ defmodule ExWire.Sync do @spec send_with_retry( Packet.packet(), PeerSupervisor.node_selector(), - :request_manifest | :request_next_block | {:request_chunk, EVM.hash()} + :request_manifest + | :request_next_block + | :request_block_bodies + | :request_receipts + | {:request_chunk, EVM.hash()} ) :: boolean() defp send_with_retry(packet, node_selector, retry_message) do send_packet_result =