diff --git a/Cargo.lock b/Cargo.lock index 209d4d895a9..a1f941c1c55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4072,9 +4072,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -4107,9 +4107,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -4117,15 +4117,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -4135,9 +4135,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" @@ -4169,9 +4169,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -4210,15 +4210,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-ticker" @@ -4239,9 +4239,9 @@ checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -5497,7 +5497,7 @@ dependencies = [ [[package]] name = "libp2p" version = "0.54.1" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "bytes", "either", @@ -5526,7 +5526,7 @@ dependencies = [ "libp2p-yamux 0.46.0", "multiaddr 0.18.2", "pin-project", - "rw-stream-sink 0.4.0 (git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0)", + "rw-stream-sink 0.4.0 (git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8)", "thiserror", ] @@ -5545,7 +5545,7 @@ dependencies = [ [[package]] name = "libp2p-allow-block-list" version = "0.4.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "libp2p-core 0.42.0", "libp2p-identity", @@ -5556,7 +5556,7 @@ dependencies = [ [[package]] name = "libp2p-autonat" version = "0.13.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "async-trait", "asynchronous-codec 0.7.0", @@ -5594,7 +5594,7 @@ dependencies = [ [[package]] name = "libp2p-connection-limits" version = "0.4.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "libp2p-core 0.42.0", "libp2p-identity", @@ -5633,7 +5633,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.42.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "either", "fnv", @@ -5642,13 +5642,13 @@ dependencies = [ "libp2p-identity", "multiaddr 0.18.2", "multihash 0.19.1", - "multistream-select 0.13.0 (git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0)", + "multistream-select 0.13.0 (git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8)", "once_cell", "parking_lot 0.12.3", "pin-project", "quick-protobuf", "rand", - "rw-stream-sink 0.4.0 (git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0)", + "rw-stream-sink 0.4.0 (git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8)", "serde", "smallvec", "thiserror", @@ -5677,7 +5677,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.42.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "async-trait", "futures", @@ -5692,7 +5692,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.47.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "asynchronous-codec 0.7.0", "base64 0.22.1", @@ -5746,7 +5746,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.45.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "asynchronous-codec 0.7.0", "either", @@ -5816,7 +5816,7 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.46.1" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "arrayvec", "asynchronous-codec 0.7.0", @@ -5866,7 +5866,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.46.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "data-encoding", "futures", @@ -5903,7 +5903,7 @@ dependencies = [ [[package]] name = "libp2p-metrics" version = "0.15.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "futures", "libp2p-core 0.42.0", @@ -5946,7 +5946,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.45.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "asynchronous-codec 0.7.0", "bytes", @@ -5989,7 +5989,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.45.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "either", "futures", @@ -6006,7 +6006,7 @@ dependencies = [ [[package]] name = "libp2p-plaintext" version = "0.42.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "asynchronous-codec 0.7.0", "bytes", @@ -6045,7 +6045,7 @@ dependencies = [ [[package]] name = "libp2p-quic" version = "0.11.1" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "bytes", "futures", @@ -6086,7 +6086,7 @@ dependencies = [ [[package]] name = "libp2p-request-response" version = "0.27.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "async-trait", "futures", @@ -6128,7 +6128,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.45.1" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "async-std", "either", @@ -6139,7 +6139,7 @@ dependencies = [ "libp2p-identity", "libp2p-swarm-derive 0.35.0", "lru", - "multistream-select 0.13.0 (git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0)", + "multistream-select 0.13.0 (git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8)", "once_cell", "rand", "smallvec", @@ -6165,7 +6165,7 @@ dependencies = [ [[package]] name = "libp2p-swarm-derive" version = "0.35.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -6176,7 +6176,7 @@ dependencies = [ [[package]] name = "libp2p-swarm-test" version = "0.4.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "async-trait", "futures", @@ -6211,7 +6211,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.42.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "async-io 2.3.4", "futures", @@ -6247,7 +6247,7 @@ dependencies = [ [[package]] name = "libp2p-tls" version = "0.5.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "futures", "futures-rustls 0.26.0", @@ -6281,7 +6281,7 @@ dependencies = [ [[package]] name = "libp2p-upnp" version = "0.3.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "futures", "futures-timer", @@ -6344,7 +6344,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.46.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "either", "futures", @@ -7048,7 +7048,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.13.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "bytes", "futures", @@ -8873,7 +8873,7 @@ dependencies = [ [[package]] name = "quick-protobuf-codec" version = "0.3.1" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "asynchronous-codec 0.7.0", "bytes", @@ -9675,7 +9675,7 @@ dependencies = [ [[package]] name = "rw-stream-sink" version = "0.4.0" -source = "git+https://github.com/autonomys/rust-libp2p?rev=458d22ef382641a6f42f7baddff99b70f33cdcc0#458d22ef382641a6f42f7baddff99b70f33cdcc0" +source = "git+https://github.com/autonomys/rust-libp2p?rev=ae7527453146df24aff6afed5f5b9efdffbc15b8#ae7527453146df24aff6afed5f5b9efdffbc15b8" dependencies = [ "futures", "pin-project", @@ -12591,6 +12591,19 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "subspace-gateway-rpc" +version = "0.1.0" +dependencies = [ + "hex", + "jsonrpsee", + "serde", + "subspace-core-primitives", + "subspace-data-retrieval", + "thiserror", + "tracing", +] + [[package]] name = "subspace-kzg" version = "0.1.0" diff --git a/crates/pallet-subspace/Cargo.toml b/crates/pallet-subspace/Cargo.toml index 8b4bec25243..828ada518fa 100644 --- a/crates/pallet-subspace/Cargo.toml +++ b/crates/pallet-subspace/Cargo.toml @@ -31,7 +31,7 @@ subspace-verification = { version = "0.1.0", path = "../subspace-verification", [dev-dependencies] env_logger = "0.11.5" -futures = "0.3.30" +futures = "0.3.31" pallet-balances = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } rand = { version = "0.8.5", features = ["min_const_gen"] } sp-io = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/crates/sc-consensus-subspace-rpc/Cargo.toml b/crates/sc-consensus-subspace-rpc/Cargo.toml index 17e245872ac..701917d88ac 100644 --- a/crates/sc-consensus-subspace-rpc/Cargo.toml +++ b/crates/sc-consensus-subspace-rpc/Cargo.toml @@ -14,7 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] async-oneshot = "0.5.9" -futures = "0.3.30" +futures = "0.3.31" futures-timer = "3.0.3" jsonrpsee = { version = "0.24.5", features = ["server", "macros"] } parking_lot = "0.12.2" diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index 54aae745bb8..8fc484d2edf 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -172,7 +172,7 @@ pub trait SubspaceRpcApi { #[method(name = "subspace_lastSegmentHeaders")] async fn last_segment_headers(&self, limit: u32) -> Result>, Error>; - /// Block/transaction object mappings subscription + /// DSN object mappings subscription #[subscription( name = "subspace_subscribeObjectMappings" => "subspace_object_mappings", unsubscribe = "subspace_unsubscribeObjectMappings", @@ -181,7 +181,7 @@ pub trait SubspaceRpcApi { )] fn subscribe_object_mappings(&self); - /// Filtered block/transaction object mappings subscription + /// Filtered DSN object mappings subscription #[subscription( name = "subspace_subscribeFilteredObjectMappings" => "subspace_filtered_object_mappings", unsubscribe = "subspace_unsubscribeFilteredObjectMappings", diff --git a/crates/sc-consensus-subspace/Cargo.toml b/crates/sc-consensus-subspace/Cargo.toml index 4d6d650e609..6bb4fa38f6c 100644 --- a/crates/sc-consensus-subspace/Cargo.toml +++ b/crates/sc-consensus-subspace/Cargo.toml @@ -15,7 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] async-trait = "0.1.83" codec = { package = "parity-scale-codec", version = "3.6.12", features = ["derive"] } -futures = "0.3.30" +futures = "0.3.31" parking_lot = "0.12.2" rand = "0.8.5" rand_chacha = "0.3.1" diff --git a/crates/sc-proof-of-time/Cargo.toml b/crates/sc-proof-of-time/Cargo.toml index 21459d71f7d..a350872dd0c 100644 --- a/crates/sc-proof-of-time/Cargo.toml +++ b/crates/sc-proof-of-time/Cargo.toml @@ -13,7 +13,7 @@ include = [ [dependencies] core_affinity = "0.8.1" derive_more = { version = "1.0.0", features = ["full"] } -futures = "0.3.30" +futures = "0.3.31" parity-scale-codec = { version = "3.6.12", features = ["derive"] } parking_lot = "0.12.2" rayon = "1.10.0" diff --git a/crates/sc-subspace-block-relay/Cargo.toml b/crates/sc-subspace-block-relay/Cargo.toml index df46c9629d3..326ad1c4a9b 100644 --- a/crates/sc-subspace-block-relay/Cargo.toml +++ b/crates/sc-subspace-block-relay/Cargo.toml @@ -15,7 +15,7 @@ async-channel = "1.9.0" async-trait = "0.1.83" codec = { package = "parity-scale-codec", version = "3.6.12", default-features = false, features = ["derive"] } derive_more = { version = "1.0.0", features = ["full"] } -futures = "0.3.30" +futures = "0.3.31" parking_lot = "0.12.2" sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } sc-network = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/crates/sp-domains-fraud-proof/Cargo.toml b/crates/sp-domains-fraud-proof/Cargo.toml index f2c456e12f1..3a742235d14 100644 --- a/crates/sp-domains-fraud-proof/Cargo.toml +++ b/crates/sp-domains-fraud-proof/Cargo.toml @@ -49,7 +49,7 @@ fp-rpc = { version = "3.0.0-dev", git = "https://github.com/autonomys/frontier", fp-self-contained = { version = "1.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968", features = ['default'] } frame-support = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } frame-system = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } -futures = "0.3.30" +futures = "0.3.31" libsecp256k1 = { version = "0.7.1", features = ["static-context", "hmac"] } pallet-balances = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } pallet-ethereum = { git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968", features = ['default'] } diff --git a/crates/subspace-farmer-components/Cargo.toml b/crates/subspace-farmer-components/Cargo.toml index 8edc69aa4ef..5ca9c4ae554 100644 --- a/crates/subspace-farmer-components/Cargo.toml +++ b/crates/subspace-farmer-components/Cargo.toml @@ -23,7 +23,7 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] } bitvec = "1.0.1" # TODO: Switch to fs4 once https://github.com/al8n/fs4-rs/issues/15 is resolved fs2 = "0.4.3" -futures = "0.3.30" +futures = "0.3.31" hex = "0.4.3" libc = "0.2.159" parity-scale-codec = "3.6.12" @@ -48,7 +48,7 @@ winapi = "0.3.9" [dev-dependencies] criterion = "0.5.1" -futures = "0.3.30" +futures = "0.3.31" subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" } subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space" } diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index d252bf85f56..69a5fa8af46 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -279,7 +279,7 @@ where pieces_to_download .entry(piece_index) .or_default() - .push((record, metadata)) + .push((record, metadata)); } // This map will be mutated, removing piece indices we have already processed let pieces_to_download = AsyncMutex::new(pieces_to_download); @@ -788,6 +788,15 @@ where } } + if final_result.is_ok() && !pieces_to_download.is_empty() { + return Err(PlottingError::FailedToRetrievePieces { + error: anyhow::anyhow!( + "Successful result, but not all pieces were downloaded, this is likely a piece \ + getter implementation bug" + ), + }); + } + final_result } diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 9b5008d6ace..cfe35a8588c 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -33,7 +33,7 @@ event-listener = "5.3.1" event-listener-primitives = "2.0.1" fdlimit = { version = "0.3.0", optional = true } fs4 = "0.9.1" -futures = "0.3.30" +futures = "0.3.31" hex = { version = "0.4.3", features = ["serde"] } hwlocality = { version = "1.0.0-alpha.6", features = ["vendored"], optional = true } jsonrpsee = { version = "0.24.5", features = ["ws-client"] } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs index 94c496a5334..8f211e50469 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs @@ -14,7 +14,7 @@ use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; -use std::future::{pending, ready, Future}; +use std::future::{ready, Future}; use std::mem; use std::pin::{pin, Pin}; use std::sync::Arc; @@ -139,9 +139,7 @@ pub(super) async fn maintain_farms( // Farm that is being added/removed right now (if any) let mut farm_add_remove_in_progress = (Box::pin(ready(None)) as AddRemoveFuture).fuse(); // Initialize with pending future so it never ends - let mut farms = FuturesUnordered::from_iter([ - Box::pin(pending()) as Pin)>>> - ]); + let mut farms = FuturesUnordered::new(); let farmer_identify_subscription = pin!(nats_client .subscribe_to_broadcasts::(None, None) @@ -253,7 +251,7 @@ pub(super) async fn maintain_farms( } result = farm_add_remove_in_progress => { if let Some((farm_index, expired_receiver, farm)) = result { - farms.push(Box::pin(async move { + farms.push(async move { select! { result = farm.run().fuse() => { (farm_index, result) @@ -263,7 +261,7 @@ pub(super) async fn maintain_farms( (farm_index, Ok(())) } } - })); + }); } } } diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index a5bb7574e66..e8239c174e7 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -9,7 +9,7 @@ use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast; use crate::cluster::nats_client::{ - GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, StreamRequest, + GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, }; use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset}; use anyhow::anyhow; @@ -17,8 +17,7 @@ use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::{select, stream, FutureExt, Stream, StreamExt}; use parity_scale_codec::{Decode, Encode}; -use std::future::{pending, Future}; -use std::pin::{pin, Pin}; +use std::pin::Pin; use std::time::{Duration, Instant}; use subspace_core_primitives::pieces::{Piece, PieceIndex}; use tokio::time::MissedTickBehavior; @@ -377,6 +376,7 @@ where ) }, ) + .instrument(info_span!("", cache_id = %cache_details.cache_id)) .await }) .collect::>() @@ -409,6 +409,7 @@ where ) }, ) + .instrument(info_span!("", cache_id = %cache_details.cache_id)) .await }) .collect::>() @@ -441,6 +442,7 @@ where ) }, ) + .instrument(info_span!("", cache_id = %cache_details.cache_id)) .await }) .collect::>() @@ -458,93 +460,31 @@ where { caches_details .iter() - .enumerate() - .map(|(cache_index, cache_details)| async move { - // Initialize with pending future so it never ends - let mut processing = FuturesUnordered::from_iter([ - Box::pin(pending()) as Pin + Send>> - ]); - let mut subscription = nats_client - .subscribe_to_stream_requests( - Some(&cache_details.cache_id_string), + .map(|cache_details| async move { + nats_client + .stream_request_responder::<_, _, Pin + Send>>, _>( + Some(cache_details.cache_id_string.as_str()), Some(cache_details.cache_id_string.clone()), + |_request: ClusterCacheContentsRequest| async move { + Some(match cache_details.cache.contents().await { + Ok(contents) => Box::pin(contents.map(|maybe_cache_element| { + maybe_cache_element.map_err(|error| error.to_string()) + })) as _, + Err(error) => { + error!(%error, "Failed to get contents"); + + Box::pin(stream::once(async move { + Err(format!("Failed to get contents: {error}")) + })) as _ + } + }) + }, ) + .instrument(info_span!("", cache_id = %cache_details.cache_id)) .await - .map_err(|error| { - anyhow!( - "Failed to subscribe to contents requests for cache {}: {}", - cache_details.cache_id, - error - ) - })? - .fuse(); - - loop { - select! { - maybe_message = subscription.next() => { - let Some(message) = maybe_message else { - break; - }; - - // Create background task for concurrent processing - processing.push(Box::pin( - process_contents_request( - nats_client, - cache_details, - message, - ) - .instrument(info_span!("", %cache_index)) - )); - } - _ = processing.next() => { - // Nothing to do here - } - } - } - - Ok(()) }) .collect::>() .next() .await .ok_or_else(|| anyhow!("No caches"))? } - -async fn process_contents_request( - nats_client: &NatsClient, - cache_details: &CacheDetails<'_, C>, - request: StreamRequest, -) where - C: PieceCache, -{ - trace!(?request, "Contents request"); - - match cache_details.cache.contents().await { - Ok(contents) => { - nats_client - .stream_response::( - request.response_subject, - contents.map(|maybe_cache_element| { - maybe_cache_element.map_err(|error| error.to_string()) - }), - ) - .await; - } - Err(error) => { - error!( - %error, - cache_id = %cache_details.cache_id, - "Failed to get contents" - ); - - nats_client - .stream_response::( - request.response_subject, - pin!(stream::once(async move { - Err(format!("Failed to get contents: {error}")) - })), - ) - .await; - } - } -} diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index dcc81ba2378..4c184c1b2fc 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -9,7 +9,7 @@ use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; use crate::cluster::nats_client::{ - GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, StreamRequest, + GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, }; use crate::farm::{ Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader, @@ -23,7 +23,7 @@ use futures::channel::mpsc; use futures::stream::FuturesUnordered; use futures::{select, stream, FutureExt, Stream, StreamExt}; use parity_scale_codec::{Decode, Encode}; -use std::future::{pending, Future}; +use std::future::Future; use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -33,7 +33,7 @@ use subspace_core_primitives::sectors::SectorIndex; use subspace_farmer_components::plotting::PlottedSector; use subspace_rpc_primitives::SolutionResponse; use tokio::time::MissedTickBehavior; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, info_span, trace, warn, Instrument}; const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000; const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); @@ -591,46 +591,33 @@ async fn plotted_sectors_responder( farms_details .iter() .map(|farm_details| async move { - // Initialize with pending future so it never ends - let mut processing = FuturesUnordered::from_iter([ - Box::pin(pending()) as Pin + Send>> - ]); - let mut subscription = nats_client - .subscribe_to_stream_requests( + nats_client + .stream_request_responder::<_, _, Pin + Send>>, _>( Some(&farm_details.farm_id_string), Some(farm_details.farm_id_string.clone()), + |_request: ClusterFarmerPlottedSectorsRequest| async move { + Some(match farm_details.plotted_sectors.get().await { + Ok(plotted_sectors) => { + Box::pin(plotted_sectors.map(|maybe_plotted_sector| { + maybe_plotted_sector.map_err(|error| error.to_string()) + })) as _ + } + Err(error) => { + error!( + %error, + farm_id = %farm_details.farm_id, + "Failed to get plotted sectors" + ); + + Box::pin(stream::once(async move { + Err(format!("Failed to get plotted sectors: {error}")) + })) as _ + } + }) + }, ) + .instrument(info_span!("", cache_id = %farm_details.farm_id)) .await - .map_err(|error| { - anyhow!( - "Failed to subscribe to plotted sectors requests for farm {}: {}", - farm_details.farm_id, - error - ) - })? - .fuse(); - - loop { - select! { - maybe_message = subscription.next() => { - let Some(message) = maybe_message else { - break; - }; - - // Create background task for concurrent processing - processing.push(Box::pin(process_plotted_sectors_request( - nats_client, - farm_details, - message, - ))); - } - _ = processing.next() => { - // Nothing to do here - } - } - } - - Ok(()) }) .collect::>() .next() @@ -638,43 +625,6 @@ async fn plotted_sectors_responder( .ok_or_else(|| anyhow!("No farms"))? } -async fn process_plotted_sectors_request( - nats_client: &NatsClient, - farm_details: &FarmDetails, - request: StreamRequest, -) { - trace!(?request, "Plotted sectors request"); - - match farm_details.plotted_sectors.get().await { - Ok(plotted_sectors) => { - nats_client - .stream_response::( - request.response_subject, - plotted_sectors.map(|maybe_plotted_sector| { - maybe_plotted_sector.map_err(|error| error.to_string()) - }), - ) - .await; - } - Err(error) => { - error!( - %error, - farm_id = %farm_details.farm_id, - "Failed to get plotted sectors" - ); - - nats_client - .stream_response::( - request.response_subject, - pin!(stream::once(async move { - Err(format!("Failed to get plotted sectors: {error}")) - })), - ) - .await; - } - } -} - async fn read_piece_responder( nats_client: &NatsClient, farms_details: &[FarmDetails], @@ -696,6 +646,7 @@ async fn read_piece_responder( ) }, ) + .instrument(info_span!("", cache_id = %farm_details.farm_id)) .await }) .collect::>() diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs index 7619894f4e2..be2d53b508d 100644 --- a/crates/subspace-farmer/src/cluster/nats_client.rs +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -28,7 +28,7 @@ use futures::{select, FutureExt, Stream, StreamExt}; use parity_scale_codec::{Decode, Encode}; use std::any::type_name; use std::collections::VecDeque; -use std::future::{pending, Future}; +use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; @@ -57,7 +57,8 @@ pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static { type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static; } -/// Generic stream request where response is streamed using [`NatsClient::stream_response`]. +/// Generic stream request where response is streamed using +/// [`NatsClient::stream_request_responder`]. /// /// Used for cases where a large payload that doesn't fit into NATS message needs to be sent or /// there is a very large number of messages to send. For simple request/response patten @@ -66,15 +67,15 @@ pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'st /// Request subject with optional `*` in place of application instance to receive the request const SUBJECT: &'static str; /// Response type that corresponds to this stream request. These responses are send as a stream - /// of [`GenericStreamResponses`] messages. + /// of messages. type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static; } -/// Messages sent in response to [`StreamRequest`]. +/// Messages sent in response to [`GenericStreamRequest`]. /// /// Empty list of responses means the end of the stream. #[derive(Debug, Encode, Decode)] -pub enum GenericStreamResponses { +enum GenericStreamResponses { /// Some responses, but the stream didn't end yet Continue { /// Monotonically increasing index of responses in a stream @@ -132,35 +133,6 @@ impl GenericStreamResponses { } } -/// Generic stream request that expects a stream of responses. -/// -/// Internally it is expected that [`GenericStreamResponses`] messages will be -/// sent to auto-generated subject specified in `response_subject` field. -#[derive(Debug, Encode, Decode)] -#[non_exhaustive] -pub struct StreamRequest -where - Request: GenericStreamRequest, -{ - /// Request - pub request: Request, - /// Topic to send a stream of [`GenericStreamResponses`]s to - pub response_subject: String, -} - -impl StreamRequest -where - Request: GenericStreamRequest, -{ - /// Create new stream request - pub fn new(request: Request) -> Self { - Self { - request, - response_subject: format!("stream-response.{}", Ulid::new()), - } - } -} - /// Stream request error #[derive(Debug, Error)] pub enum StreamRequestError { @@ -172,8 +144,8 @@ pub enum StreamRequestError { Publish(#[from] PublishError), } -/// Wrapper around subscription that transforms [`GenericStreamResponses`] messages into a -/// normal `Response` stream. +/// Wrapper around subscription that transforms stream of wrapped response messages into a normal +/// `Response` stream. #[derive(Debug, Deref, DerefMut)] #[pin_project::pin_project] pub struct StreamResponseSubscriber { @@ -550,25 +522,17 @@ impl NatsClient { OP: Fn(Request) -> F + Send + Sync, { // Initialize with pending future so it never ends - let mut processing = FuturesUnordered::from_iter([ - Box::pin(pending()) as Pin + Send>> - ]); + let mut processing = FuturesUnordered::new(); - let subject = subject_with_instance(Request::SUBJECT, instance); - let subscription = if let Some(queue_group) = queue_group { - self.inner - .client - .queue_subscribe(subject, queue_group) - .await - } else { - self.inner.client.subscribe(subject).await - } - .map_err(|error| { - anyhow!( - "Failed to subscribe to {} requests for {instance:?}: {error}", - type_name::(), - ) - })?; + let subscription = self + .common_subscribe(Request::SUBJECT, instance, queue_group) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to {} requests for {instance:?}: {error}", + type_name::(), + ) + })?; debug!( request_type = %type_name::(), @@ -579,19 +543,22 @@ impl NatsClient { loop { select! { - maybe_message = subscription.next() => { - let Some(message) = maybe_message else { - break; - }; - + message = subscription.select_next_some() => { // Create background task for concurrent processing - processing.push(Box::pin(self.process_request( - message, - &process, - ))); - } + processing.push( + self + .process_request( + message, + &process, + ) + .in_current_span(), + ); + }, _ = processing.next() => { // Nothing to do here + }, + complete => { + break; } } } @@ -663,36 +630,163 @@ impl NatsClient { where Request: GenericStreamRequest, { - let stream_request = StreamRequest::new(request); + let stream_request_subject = subject_with_instance(Request::SUBJECT, instance); + let stream_response_subject = format!("stream-response.{}", Ulid::new()); let subscriber = self .inner .client - .subscribe(stream_request.response_subject.clone()) + .subscribe(stream_response_subject.clone()) .await?; - let stream_request_subject = subject_with_instance(Request::SUBJECT, instance); debug!( request_type = %type_name::(), %stream_request_subject, + %stream_response_subject, ?subscriber, "Stream request subscription" ); self.inner .client - .publish(stream_request_subject, stream_request.encode().into()) + .publish_with_reply( + stream_request_subject, + stream_response_subject.clone(), + request.encode().into(), + ) .await?; Ok(StreamResponseSubscriber::new( subscriber, - stream_request.response_subject, + stream_response_subject, self.clone(), )) } + /// Responds to stream requests from the given subject using the provided processing function. + /// + /// This will create a subscription on the subject for the given instance (if provided) and + /// queue group. Incoming messages will be deserialized as the request type `Request` and passed + /// to the `process` function to produce a stream response of type `Request::Response`. The + /// stream response will then be sent back on the reply subject from the original request. + /// + /// Each request is processed in a newly created async tokio task. + /// + /// # Arguments + /// + /// * `instance` - Optional instance name to use in place of the `*` in the subject + /// * `group` - The queue group name for the subscription + /// * `process` - The function to call with the decoded request to produce a response + pub async fn stream_request_responder( + &self, + instance: Option<&str>, + queue_group: Option, + process: OP, + ) -> anyhow::Result<()> + where + Request: GenericStreamRequest, + F: Future> + Send, + S: Stream + Unpin, + OP: Fn(Request) -> F + Send + Sync, + { + // Initialize with pending future so it never ends + let mut processing = FuturesUnordered::new(); + + let subscription = self + .common_subscribe(Request::SUBJECT, instance, queue_group) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to {} stream requests for {instance:?}: {error}", + type_name::(), + ) + })?; + + debug!( + request_type = %type_name::(), + ?subscription, + "Stream requests subscription" + ); + let mut subscription = subscription.fuse(); + + loop { + select! { + message = subscription.select_next_some() => { + // Create background task for concurrent processing + processing.push( + self + .process_stream_request( + message, + &process, + ) + .in_current_span(), + ); + }, + _ = processing.next() => { + // Nothing to do here + }, + complete => { + break; + } + } + } + + Ok(()) + } + + async fn process_stream_request(&self, message: Message, process: OP) + where + Request: GenericStreamRequest, + F: Future> + Send, + S: Stream + Unpin, + OP: Fn(Request) -> F + Send + Sync, + { + let Some(reply_subject) = message.reply else { + return; + }; + + let message_payload_size = message.payload.len(); + let request = match Request::decode(&mut message.payload.as_ref()) { + Ok(request) => { + // Free allocation early + drop(message.payload); + request + } + Err(error) => { + warn!( + request_type = %type_name::(), + %error, + message = %hex::encode(message.payload), + "Failed to decode request" + ); + return; + } + }; + + // Avoid printing large messages in logs + if message_payload_size > 1024 { + trace!( + request_type = %type_name::(), + %reply_subject, + "Processing request" + ); + } else { + trace!( + request_type = %type_name::(), + ?request, + %reply_subject, + "Processing request" + ); + } + + if let Some(stream) = process(request).await { + self.stream_response::(reply_subject, stream) + .await; + } + } + /// Helper method to send responses to requests initiated with [`Self::stream_request`] - pub async fn stream_response(&self, response_subject: String, response_stream: S) + async fn stream_response(&self, response_subject: Subject, response_stream: S) where Request: GenericStreamRequest, S: Stream + Unpin, @@ -975,20 +1069,6 @@ impl NatsClient { .await } - /// Simple subscription that will produce decoded stream requests, while skipping messages that - /// fail to decode - pub async fn subscribe_to_stream_requests( - &self, - instance: Option<&str>, - queue_group: Option, - ) -> Result>, SubscribeError> - where - Request: GenericStreamRequest, - { - self.simple_subscribe(Request::SUBJECT, instance, queue_group) - .await - } - /// Simple subscription that will produce decoded notifications, while skipping messages that /// fail to decode pub async fn subscribe_to_notifications( @@ -1028,6 +1108,30 @@ impl NatsClient { where Message: Decode, { + let subscriber = self + .common_subscribe(subject, instance, queue_group) + .await?; + debug!( + %subject, + message_type = %type_name::(), + ?subscriber, + "Simple subscription" + ); + + Ok(SubscriberWrapper { + subscriber, + _phantom: PhantomData, + }) + } + + /// Simple subscription that will produce decoded messages, while skipping messages that fail to + /// decode + async fn common_subscribe( + &self, + subject: &'static str, + instance: Option<&str>, + queue_group: Option, + ) -> Result { let subscriber = if let Some(queue_group) = queue_group { self.inner .client @@ -1039,17 +1143,8 @@ impl NatsClient { .subscribe(subject_with_instance(subject, instance)) .await? }; - debug!( - %subject, - message_type = %type_name::(), - ?subscriber, - "Simple subscription" - ); - Ok(SubscriberWrapper { - subscriber, - _phantom: PhantomData, - }) + Ok(subscriber) } } diff --git a/crates/subspace-farmer/src/cluster/plotter.rs b/crates/subspace-farmer/src/cluster/plotter.rs index 1b112e40333..15856d0a2e1 100644 --- a/crates/subspace-farmer/src/cluster/plotter.rs +++ b/crates/subspace-farmer/src/cluster/plotter.rs @@ -6,9 +6,7 @@ //! implementation designed to work with cluster plotter and a service function to drive the backend //! part of the plotter. -use crate::cluster::nats_client::{ - GenericRequest, GenericStreamRequest, NatsClient, StreamRequest, -}; +use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient}; use crate::plotter::{Plotter, SectorPlottingProgress}; use crate::utils::AsyncJoinOnDrop; use anyhow::anyhow; @@ -19,14 +17,16 @@ use bytes::Bytes; use derive_more::Display; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::mpsc; +use futures::future::FusedFuture; use futures::stream::FuturesUnordered; use futures::{select, stream, FutureExt, Sink, SinkExt, StreamExt}; use parity_scale_codec::{Decode, Encode}; use std::error::Error; -use std::future::{pending, Future}; +use std::future::pending; use std::num::NonZeroUsize; -use std::pin::{pin, Pin}; +use std::pin::pin; use std::sync::Arc; +use std::task::Poll; use std::time::{Duration, Instant}; use subspace_core_primitives::sectors::SectorIndex; use subspace_core_primitives::PublicKey; @@ -754,56 +754,51 @@ where { let plotter_id_string = plotter_id.to_string(); - // Initialize with pending future so it never ends - let mut processing = FuturesUnordered::from_iter([ - Box::pin(pending()) as Pin + Send>> - ]); - let subscription = nats_client - .subscribe_to_stream_requests(Some(&plotter_id_string), Some(plotter_id_string.clone())) - .await - .map_err(|error| anyhow!("Failed to subscribe to plot sector requests: {}", error))?; - debug!(?subscription, "Plot sector subscription"); - let mut subscription = subscription.fuse(); - - loop { - select! { - maybe_message = subscription.next() => { - let Some(message) = maybe_message else { - break; - }; + nats_client + .stream_request_responder( + Some(&plotter_id_string), + Some(plotter_id_string.clone()), + |request| async move { + let (progress_sender, mut progress_receiver) = mpsc::channel(10); + + let fut = + process_plot_sector_request(nats_client, plotter, request, progress_sender); + let mut fut = Box::pin(fut.fuse()); + + Some( + // Drive above future and stream back any pieces that were downloaded so far + stream::poll_fn(move |cx| { + if !fut.is_terminated() { + // Result doesn't matter, we'll need to poll stream below anyway + let _ = fut.poll_unpin(cx); + } - // Create background task for concurrent processing - processing.push(Box::pin(process_plot_sector_request( - nats_client, - plotter, - message, - ))); - } - _ = processing.next() => { - // Nothing to do here - } - } - } + if let Poll::Ready(maybe_result) = progress_receiver.poll_next_unpin(cx) { + return Poll::Ready(maybe_result); + } - Ok(()) + // Exit will be done by the stream above + Poll::Pending + }), + ) + }, + ) + .await } async fn process_plot_sector_request

( nats_client: &NatsClient, plotter: &P, - request: StreamRequest, + request: ClusterPlotterPlotSectorRequest, + mut response_proxy_sender: mpsc::Sender, ) where P: Plotter, { - let StreamRequest { - request: - ClusterPlotterPlotSectorRequest { - public_key, - sector_index, - farmer_protocol_info, - pieces_in_sector, - }, - response_subject, + let ClusterPlotterPlotSectorRequest { + public_key, + sector_index, + farmer_protocol_info, + pieces_in_sector, } = request; // Wrapper future just for instrumentation below @@ -825,26 +820,16 @@ async fn process_plot_sector_request

( { debug!("Plotter is currently occupied and can't plot more sectors"); - nats_client - .stream_response::( - response_subject, - pin!(stream::once(async move { - ClusterSectorPlottingProgress::Occupied - })), - ) - .await; + if let Err(error) = response_proxy_sender + .send(ClusterSectorPlottingProgress::Occupied) + .await + { + warn!(%error, "Failed to send plotting progress"); + return; + } return; } - let (mut response_proxy_sender, response_proxy_receiver) = mpsc::channel(10); - - let response_streaming_fut = nats_client - .stream_response::( - response_subject, - response_proxy_receiver, - ) - .fuse(); - let mut response_streaming_fut = pin!(response_streaming_fut); let progress_proxy_fut = { let mut response_proxy_sender = response_proxy_sender.clone(); let approximate_max_message_size = nats_client.approximate_max_message_size(); @@ -877,11 +862,6 @@ async fn process_plot_sector_request

( }; select! { - _ = response_streaming_fut => { - warn!("Response sending ended early"); - - return; - } _ = progress_proxy_fut.fuse() => { // Done } @@ -890,9 +870,6 @@ async fn process_plot_sector_request

( } } - // Drain remaining progress messages - response_streaming_fut.await; - info!("Finished plotting sector successfully"); }; diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 86409eceeb6..9168e0b25f6 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -16,6 +16,7 @@ use crate::utils::run_future_in_dedicated_thread; use async_lock::RwLock as AsyncRwLock; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::mpsc; +use futures::future::FusedFuture; use futures::stream::{FuturesOrdered, FuturesUnordered}; use futures::{select, stream, FutureExt, SinkExt, Stream, StreamExt}; use prometheus_client::registry::Registry; @@ -1188,7 +1189,7 @@ where let (tx, mut rx) = mpsc::unbounded(); - let mut fut = Box::pin(async move { + let fut = async move { let tx = &tx; let mut reading_from_piece_cache = reading_from_piece_cache @@ -1300,17 +1301,22 @@ where }; join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await - }); + }; + let mut fut = Box::pin(fut.fuse()); // Drive above future and stream back any pieces that were downloaded so far stream::poll_fn(move |cx| { - let end_result = fut.poll_unpin(cx); + if !fut.is_terminated() { + // Result doesn't matter, we'll need to poll stream below anyway + let _ = fut.poll_unpin(cx); + } - if let Ok(maybe_result) = rx.try_next() { + if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) { return Poll::Ready(maybe_result); } - end_result.map(|((), ())| None) + // Exit will be done by the stream above + Poll::Pending }) } diff --git a/crates/subspace-farmer/src/farmer_piece_getter.rs b/crates/subspace-farmer/src/farmer_piece_getter.rs index 14b5fc49ace..e28ca167a8c 100644 --- a/crates/subspace-farmer/src/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/farmer_piece_getter.rs @@ -9,6 +9,7 @@ use backoff::backoff::Backoff; use backoff::future::retry; use backoff::ExponentialBackoff; use futures::channel::mpsc; +use futures::future::FusedFuture; use futures::stream::FuturesUnordered; use futures::{stream, FutureExt, Stream, StreamExt}; use std::fmt; @@ -288,7 +289,7 @@ where { let (tx, mut rx) = mpsc::unbounded(); - let mut fut = Box::pin(async move { + let fut = async move { let tx = &tx; debug!("Getting pieces from farmer cache"); @@ -373,6 +374,10 @@ where .collect::>() .await; + if pieces_not_found_on_node.is_empty() { + return; + } + debug!( remaining_piece_count = %pieces_not_found_on_node.len(), "Some pieces were not easily reachable" @@ -389,17 +394,22 @@ where // Simply drain everything .for_each(|()| async {}) .await; - }); + }; + let mut fut = Box::pin(fut.fuse()); // Drive above future and stream back any pieces that were downloaded so far Ok(Box::new(stream::poll_fn(move |cx| { - let end_result = fut.poll_unpin(cx); + if !fut.is_terminated() { + // Result doesn't matter, we'll need to poll stream below anyway + let _ = fut.poll_unpin(cx); + } - if let Ok(maybe_result) = rx.try_next() { + if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) { return Poll::Ready(maybe_result); } - end_result.map(|()| None) + // Exit will be done by the stream above + Poll::Pending }))) } } diff --git a/crates/subspace-gateway-rpc/Cargo.toml b/crates/subspace-gateway-rpc/Cargo.toml new file mode 100644 index 00000000000..4380666357a --- /dev/null +++ b/crates/subspace-gateway-rpc/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "subspace-gateway-rpc" +version = "0.1.0" +authors = ["Teor "] +description = "A Subspace Network data gateway." +edition = "2021" +license = "MIT OR Apache-2.0" +homepage = "https://subspace.network" +repository = "https://github.com/autonomys/subspace" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[dependencies] +hex = "0.4.3" +jsonrpsee = { version = "0.24.5", features = ["server", "macros"] } +serde = { version = "1.0.110", default-features = false, features = ["alloc", "derive"] } +subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } +subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" } +thiserror = "1.0.64" +tracing = "0.1.40" diff --git a/crates/subspace-gateway-rpc/README.md b/crates/subspace-gateway-rpc/README.md new file mode 100644 index 00000000000..781d715a8a3 --- /dev/null +++ b/crates/subspace-gateway-rpc/README.md @@ -0,0 +1,43 @@ +## Subspace Gateway RPCs + +RPC API for Subspace Gateway. + +### Using the gateway RPCs + +The gateway RPCs can fetch data using object mappings supplied by a node. + +Launch a node using the instructions in its README, and wait for mappings from the node RPCs: +```bash +$ websocat --jsonrpc ws://127.0.0.1:9944 +subspace_subscribeObjectMappings +``` + +```json +{ + "jsonrpc": "2.0", + "method": "subspace_archived_object_mappings", + "params": { + "subscription": "o7M85uu9ir39R5PJ", + "result": { + "v0": { + "objects": [ + ["0000000000000000000000000000000000000000000000000000000000000000", 0, 0] + ] + } + } + } +} +``` + +Then use those mappings to get object data from the gateway RPCs: +```bash +$ websocat --jsonrpc ws://127.0.0.1:9955 +subspace_fetchObject ["v0": { "objects": [["0000000000000000000000000000000000000000000000000000000000000000", 0, 0]]}] +``` + +```json +{ + "jsonrpc": "2.0", + "result": ["00000000"] +} +``` diff --git a/crates/subspace-gateway-rpc/src/lib.rs b/crates/subspace-gateway-rpc/src/lib.rs new file mode 100644 index 00000000000..af04fed933f --- /dev/null +++ b/crates/subspace-gateway-rpc/src/lib.rs @@ -0,0 +1,157 @@ +//! RPC API for the Subspace Gateway. + +use jsonrpsee::core::async_trait; +use jsonrpsee::proc_macros::rpc; +use jsonrpsee::types::{ErrorObject, ErrorObjectOwned}; +use std::fmt; +use std::ops::{Deref, DerefMut}; +use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash}; +use subspace_core_primitives::objects::GlobalObjectMapping; +use subspace_data_retrieval::object_fetcher::{self, ObjectFetcher}; +use tracing::debug; + +const SUBSPACE_ERROR: i32 = 9000; + +/// The maximum number of objects that can be requested in a single RPC call. +/// +/// If the returned objects are large, they could overflow the RPC server (or client) buffers, +/// despite this limit. +// TODO: turn this into a CLI option +const MAX_OBJECTS_PER_REQUEST: usize = 100; + +/// Top-level error type for the RPC handler. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Too many mappings were supplied. + #[error("Mapping count {count} exceeded request limit {MAX_OBJECTS_PER_REQUEST}")] + TooManyMappings { + /// The number of supplied mappings. + count: usize, + }, + + /// The object fetcher failed. + #[error(transparent)] + ObjectFetcherError(#[from] object_fetcher::Error), + + /// The returned object data did not match the hash in the mapping. + #[error( + "Invalid object hash, mapping had {mapping_hash:?}, but fetched data had {data_hash:?}" + )] + InvalidObjectHash { + /// The expected hash from the mapping. + mapping_hash: Blake3Hash, + /// The actual hash of the returned object data. + data_hash: Blake3Hash, + }, +} + +impl From for ErrorObjectOwned { + fn from(error: Error) -> Self { + ErrorObject::owned(SUBSPACE_ERROR + 1, format!("{error:?}"), None::<()>) + } +} + +/// Binary data, encoded as hex. +#[derive(Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +#[serde(transparent)] +pub struct HexData { + #[serde(with = "hex")] + pub data: Vec, +} + +impl fmt::Debug for HexData { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "HexData({})", hex::encode(&self.data)) + } +} + +impl fmt::Display for HexData { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", hex::encode(&self.data)) + } +} + +impl From> for HexData { + fn from(data: Vec) -> Self { + Self { data } + } +} + +impl Deref for HexData { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.data + } +} + +impl DerefMut for HexData { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } +} + +/// Provides rpc methods for interacting with a Subspace DSN Gateway. +#[rpc(client, server)] +pub trait SubspaceGatewayRpcApi { + /// Get object data from DSN object mappings. + /// Returns an error if any object fetch was unsuccessful. + #[method(name = "subspace_fetchObject")] + async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result, Error>; +} +/// Subspace Gateway RPC configuration +pub struct SubspaceGatewayRpcConfig { + /// DSN object fetcher instance. + pub object_fetcher: ObjectFetcher, +} + +/// Implements the [`SubspaceGatewayRpcApiServer`] trait for interacting with the Subspace Gateway. +pub struct SubspaceGatewayRpc { + /// DSN object fetcher instance. + object_fetcher: ObjectFetcher, +} + +/// [`SubspaceGatewayRpc`] is used to fetch objects from the DSN. +impl SubspaceGatewayRpc { + /// Creates a new instance of the `SubspaceGatewayRpc` handler. + pub fn new(config: SubspaceGatewayRpcConfig) -> Self { + Self { + object_fetcher: config.object_fetcher, + } + } +} + +#[async_trait] +impl SubspaceGatewayRpcApiServer for SubspaceGatewayRpc { + async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result, Error> { + // TODO: deny unsafe RPC calls + + let count = mappings.objects().len(); + if count > MAX_OBJECTS_PER_REQUEST { + debug!(%count, %MAX_OBJECTS_PER_REQUEST, "Too many mappings in request"); + return Err(Error::TooManyMappings { count }); + } + + let mut objects = Vec::with_capacity(count); + // TODO: fetch concurrently + for mapping in mappings.objects() { + let data = self + .object_fetcher + .fetch_object(mapping.piece_index, mapping.offset) + .await?; + + let data_hash = blake3_hash(&data); + if data_hash != mapping.hash { + debug!(?data_hash, ?mapping.hash, "Retrieved data did not match mapping hash"); + return Err(Error::InvalidObjectHash { + mapping_hash: mapping.hash, + data_hash, + }); + } + + objects.push(data.into()); + } + + Ok(objects) + } +} diff --git a/crates/subspace-gateway/Cargo.toml b/crates/subspace-gateway/Cargo.toml index 739eca1990c..2ee2195d1bd 100644 --- a/crates/subspace-gateway/Cargo.toml +++ b/crates/subspace-gateway/Cargo.toml @@ -20,7 +20,7 @@ targets = ["x86_64-unknown-linux-gnu"] anyhow = "1.0.89" clap = { version = "4.5.18", features = ["derive"] } fdlimit = "0.3.0" -futures = "0.3.30" +futures = "0.3.31" mimalloc = "0.1.43" supports-color = "3.0.1" thiserror = "1.0.64" diff --git a/crates/subspace-malicious-operator/Cargo.toml b/crates/subspace-malicious-operator/Cargo.toml index 98c2f479fc3..d696749f638 100644 --- a/crates/subspace-malicious-operator/Cargo.toml +++ b/crates/subspace-malicious-operator/Cargo.toml @@ -31,7 +31,7 @@ evm-domain-runtime = { version = "0.1.0", path = "../../domains/runtime/evm" } fp-evm = { version = "3.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968" } frame-system = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } frame-system-rpc-runtime-api = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } -futures = "0.3.30" +futures = "0.3.31" hex-literal = "0.4.1" log = "0.4.22" mimalloc = "0.1.43" diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml index d329bec8917..e90e9dac808 100644 --- a/crates/subspace-networking/Cargo.toml +++ b/crates/subspace-networking/Cargo.toml @@ -26,7 +26,7 @@ either = "1.13.0" event-listener-primitives = "2.0.1" # TODO: Switch to fs4 once https://github.com/al8n/fs4-rs/issues/15 is resolved fs2 = "0.4.3" -futures = "0.3.30" +futures = "0.3.31" futures-timer = "3.0.3" hex = "0.4.3" memmap2 = "0.9.5" @@ -51,9 +51,10 @@ unsigned-varint = { version = "0.8.0", features = ["futures", "asynchronous_code void = "1.0.2" [dependencies.libp2p] -# TODO: Replace with upstream once https://github.com/libp2p/rust-libp2p/issues/5626 is resolved +# TODO: Replace with upstream once https://github.com/libp2p/rust-libp2p/issues/5626 and +# https://github.com/libp2p/rust-libp2p/issues/5634 are resolved git = "https://github.com/autonomys/rust-libp2p" -rev = "458d22ef382641a6f42f7baddff99b70f33cdcc0" +rev = "ae7527453146df24aff6afed5f5b9efdffbc15b8" version = "0.54.1" default-features = false features = [ @@ -77,4 +78,4 @@ features = [ [dev-dependencies] rand = "0.8.5" # TODO: Replace with upstream once https://github.com/libp2p/rust-libp2p/issues/5626 is resolved -libp2p-swarm-test = { version = "0.4.0", git = "https://github.com/autonomys/rust-libp2p", rev = "458d22ef382641a6f42f7baddff99b70f33cdcc0" } +libp2p-swarm-test = { version = "0.4.0", git = "https://github.com/autonomys/rust-libp2p", rev = "ae7527453146df24aff6afed5f5b9efdffbc15b8" } diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index f56e64d025b..1ea65d62bc8 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -6,14 +6,14 @@ use crate::constructor; use crate::constructor::temporary_bans::TemporaryBans; use crate::constructor::LocalOnlyRecordStore; use crate::protocols::request_response::request_response_factory::{ - Event as RequestResponseEvent, IfDisconnected, OutboundFailure, RequestFailure, + Event as RequestResponseEvent, IfDisconnected, }; use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared}; use crate::utils::{is_global_address_or_dns, strip_peer_id, SubspaceMetrics}; use async_mutex::Mutex as AsyncMutex; use bytes::Bytes; use event_listener_primitives::HandlerId; -use futures::channel::{mpsc, oneshot}; +use futures::channel::mpsc; use futures::future::Fuse; use futures::{FutureExt, StreamExt}; use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent}; @@ -28,7 +28,7 @@ use libp2p::kad::{ }; use libp2p::metrics::{Metrics, Recorder}; use libp2p::multiaddr::Protocol; -use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; +use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::{DialError, SwarmEvent}; use libp2p::{Multiaddr, PeerId, Swarm, TransportError}; use nohash_hasher::IntMap; @@ -81,13 +81,6 @@ enum BootstrapCommandState { Finished, } -#[derive(Debug)] -struct PendingGenericRequest { - protocol_name: &'static str, - request: Vec, - result_sender: oneshot::Sender, RequestFailure>>, -} - /// Runner for the Node. #[must_use = "Node does not function properly unless its runner is driven forward"] pub struct NodeRunner @@ -133,7 +126,6 @@ where bootstrap_command_state: Arc>, /// Receives an event on peer address removal from the persistent storage. removed_addresses_rx: mpsc::UnboundedReceiver, - requests_pending_connections: HashMap>, /// Optional storage for the [`HandlerId`] of the address removal task. /// We keep to stop the task along with the rest of the networking. _address_removal_task_handler_id: Option, @@ -228,7 +220,6 @@ where bootstrap_addresses, bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())), removed_addresses_rx, - requests_pending_connections: HashMap::new(), _address_removal_task_handler_id: address_removal_task_handler_id, } } @@ -493,18 +484,6 @@ where num_established, .. } => { - if let Some(generic_requests) = self.requests_pending_connections.remove(&peer_id) { - let request_response = &mut self.swarm.behaviour_mut().request_response; - for request in generic_requests { - request_response.send_request( - &peer_id, - request.protocol_name, - request.request, - request.result_sender, - IfDisconnected::ImmediateError, - ); - } - } // Save known addresses that were successfully dialed. if let ConnectedPoint::Dialer { address, .. } = &endpoint { // filter non-global addresses when non-globals addresses are disabled @@ -611,19 +590,6 @@ where }; } SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { - if let Some(peer_id) = &peer_id { - if let Some(generic_requests) = - self.requests_pending_connections.remove(peer_id) - { - for request in generic_requests { - // Do not care if receiver is gone - let _: Result<(), _> = request - .result_sender - .send(Err(RequestFailure::Network(OutboundFailure::DialFailure))); - } - } - } - if let Some(peer_id) = &peer_id { let should_ban_temporarily = self.should_temporary_ban_on_dial_error(peer_id, &error); @@ -1469,38 +1435,14 @@ where request, result_sender, } => { - // TODO: Ideally it'd be much simpler with https://github.com/libp2p/rust-libp2p/issues/5634 - if !addresses.is_empty() - && !self - .swarm - .connected_peers() - .any(|candidate| candidate == &peer_id) - { - self.requests_pending_connections - .entry(peer_id) - .or_default() - .push(PendingGenericRequest { - protocol_name, - request, - result_sender, - }); - if let Err(error) = self.swarm.dial( - DialOpts::peer_id(peer_id) - .addresses(addresses) - .condition(PeerCondition::DisconnectedAndNotDialing) - .build(), - ) { - warn!(%error, "Failed to dial disconnected peer on generic request"); - } - } else { - self.swarm.behaviour_mut().request_response.send_request( - &peer_id, - protocol_name, - request, - result_sender, - IfDisconnected::TryConnect, - ); - } + self.swarm.behaviour_mut().request_response.send_request( + &peer_id, + protocol_name, + request, + result_sender, + IfDisconnected::TryConnect, + addresses, + ); } Command::GetProviders { key, diff --git a/crates/subspace-networking/src/protocols/request_response/request_response_factory.rs b/crates/subspace-networking/src/protocols/request_response/request_response_factory.rs index 5df4d9de1db..9c576158fac 100644 --- a/crates/subspace-networking/src/protocols/request_response/request_response_factory.rs +++ b/crates/subspace-networking/src/protocols/request_response/request_response_factory.rs @@ -379,10 +379,11 @@ impl RequestResponseFactoryBehaviour { request: Vec, pending_response: oneshot::Sender, RequestFailure>>, connect: IfDisconnected, + addresses: Vec, ) { if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) { if protocol.is_connected(target) || connect.should_connect() { - let request_id = protocol.send_request(target, request); + let request_id = protocol.send_request(target, request, addresses); let prev_req_id = self.pending_requests.insert( (protocol_name.to_string().into(), request_id).into(), (Instant::now(), pending_response), diff --git a/crates/subspace-networking/src/protocols/request_response/request_response_factory/tests.rs b/crates/subspace-networking/src/protocols/request_response/request_response_factory/tests.rs index bce255b744c..baf21fd2314 100644 --- a/crates/subspace-networking/src/protocols/request_response/request_response_factory/tests.rs +++ b/crates/subspace-networking/src/protocols/request_response/request_response_factory/tests.rs @@ -132,6 +132,7 @@ async fn basic_request_response_works() { b"this is a request".to_vec(), sender, IfDisconnected::ImmediateError, + Vec::new(), ); // Wait for request to finish loop { @@ -209,6 +210,7 @@ async fn max_response_size_exceeded() { b"this is a request".to_vec(), sender, IfDisconnected::ImmediateError, + Vec::new(), ); // Wait for request to finish loop { @@ -349,6 +351,7 @@ async fn request_id_collision() { b"this is a request 1".to_vec(), sender_1, IfDisconnected::ImmediateError, + Vec::new(), ); swarm_1.behaviour_mut().send_request( &peer_id_2, @@ -356,6 +359,7 @@ async fn request_id_collision() { b"this is a request 2".to_vec(), sender_2, IfDisconnected::ImmediateError, + Vec::new(), ); // Expect both to finish loop { diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index e0cf6d21669..76e134d982b 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -10,6 +10,7 @@ use crate::utils::multihash::ToMultihash; use crate::{Multihash, Node}; use async_trait::async_trait; use futures::channel::mpsc; +use futures::future::FusedFuture; use futures::stream::FuturesUnordered; use futures::task::noop_waker_ref; use futures::{stream, FutureExt, Stream, StreamExt}; @@ -89,22 +90,27 @@ where PieceIndices: IntoIterator + 'a, { let (tx, mut rx) = mpsc::unbounded(); - let mut fut = Box::pin(get_from_cache_inner( + let fut = get_from_cache_inner( piece_indices.into_iter(), &self.node, &self.piece_validator, tx, - )); + ); + let mut fut = Box::pin(fut.fuse()); // Drive above future and stream back any pieces that were downloaded so far stream::poll_fn(move |cx| { - let end_result = fut.poll_unpin(cx); + if !fut.is_terminated() { + // Result doesn't matter, we'll need to poll stream below anyway + let _ = fut.poll_unpin(cx); + } - if let Ok(maybe_result) = rx.try_next() { + if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) { return Poll::Ready(maybe_result); } - end_result.map(|()| None) + // Exit will be done by the stream above + Poll::Pending }) } @@ -529,6 +535,10 @@ where break; }; + if connected_peers.is_empty() || pieces_to_download.is_empty() { + break; + } + let num_pieces = pieces_to_download.len(); let step = num_pieces / connected_peers.len().min(num_pieces); diff --git a/crates/subspace-node/Cargo.toml b/crates/subspace-node/Cargo.toml index 9d389366592..8b197a10ab1 100644 --- a/crates/subspace-node/Cargo.toml +++ b/crates/subspace-node/Cargo.toml @@ -35,7 +35,7 @@ fp-evm = { version = "3.0.0-dev", git = "https://github.com/autonomys/frontier", frame-benchmarking = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42", default-features = false } frame-benchmarking-cli = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42", default-features = false } frame-support = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } -futures = "0.3.30" +futures = "0.3.31" hex = "0.4.3" hex-literal = "0.4.1" mimalloc = "0.1.43" diff --git a/crates/subspace-service/Cargo.toml b/crates/subspace-service/Cargo.toml index 30479a35c1b..ec9668805ef 100644 --- a/crates/subspace-service/Cargo.toml +++ b/crates/subspace-service/Cargo.toml @@ -22,7 +22,7 @@ async-trait = "0.1.83" cross-domain-message-gossip = { version = "0.1.0", path = "../../domains/client/cross-domain-message-gossip" } domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitives/runtime" } frame-benchmarking = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42", optional = true } -futures = "0.3.30" +futures = "0.3.31" hex = "0.4.3" jsonrpsee = { version = "0.24.5", features = ["server"] } mmr-gadget = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/domains/client/cross-domain-message-gossip/Cargo.toml b/domains/client/cross-domain-message-gossip/Cargo.toml index 59a73f8148b..59b75828748 100644 --- a/domains/client/cross-domain-message-gossip/Cargo.toml +++ b/domains/client/cross-domain-message-gossip/Cargo.toml @@ -14,7 +14,7 @@ include = [ [dependencies] domain-block-preprocessor = { version = "0.1.0", path = "../../client/block-preprocessor" } fp-account = { version = "1.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968" } -futures = "0.3.30" +futures = "0.3.31" parity-scale-codec = { version = "3.6.12", features = ["derive"] } parking_lot = "0.12.2" sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/domains/client/domain-operator/Cargo.toml b/domains/client/domain-operator/Cargo.toml index ac9544d3b16..6ccd3742f6c 100644 --- a/domains/client/domain-operator/Cargo.toml +++ b/domains/client/domain-operator/Cargo.toml @@ -9,7 +9,7 @@ codec = { package = "parity-scale-codec", version = "3.6.12", features = ["deriv domain-block-builder = { version = "0.1.0", path = "../block-builder" } domain-block-preprocessor = { version = "0.1.0", path = "../block-preprocessor" } domain-runtime-primitives = { version = "0.1.0", path = "../../primitives/runtime" } -futures = "0.3.30" +futures = "0.3.31" futures-timer = "3.0.3" parking_lot = "0.12.2" sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/domains/client/eth-service/Cargo.toml b/domains/client/eth-service/Cargo.toml index 046fae7ac3b..263548ba419 100644 --- a/domains/client/eth-service/Cargo.toml +++ b/domains/client/eth-service/Cargo.toml @@ -22,7 +22,7 @@ fc-rpc = { version = "2.0.0-dev", git = "https://github.com/autonomys/frontier", fc-rpc-core = { version = "1.1.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968" } fc-storage = { version = "1.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968" } fp-rpc = { version = "3.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968", features = ['default'] } -futures = "0.3.30" +futures = "0.3.31" jsonrpsee = { version = "0.24.5", features = ["server"] } pallet-transaction-payment-rpc = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } parity-scale-codec = "3.6.12" diff --git a/domains/client/relayer/Cargo.toml b/domains/client/relayer/Cargo.toml index 4214a845fc3..4b4bcb7106a 100644 --- a/domains/client/relayer/Cargo.toml +++ b/domains/client/relayer/Cargo.toml @@ -14,7 +14,7 @@ include = [ [dependencies] async-channel = "1.9.0" cross-domain-message-gossip = { path = "../../client/cross-domain-message-gossip" } -futures = "0.3.30" +futures = "0.3.31" parity-scale-codec = { version = "3.6.12", features = ["derive"] } sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } sc-state-db = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/domains/service/Cargo.toml b/domains/service/Cargo.toml index 536a1e6e9fc..8196fa3e31f 100644 --- a/domains/service/Cargo.toml +++ b/domains/service/Cargo.toml @@ -21,7 +21,7 @@ domain-client-message-relayer = { version = "0.1.0", path = "../client/relayer" domain-client-operator = { version = "0.1.0", path = "../client/domain-operator" } domain-runtime-primitives = { version = "0.1.0", path = "../primitives/runtime" } frame-benchmarking = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42", optional = true } -futures = "0.3.30" +futures = "0.3.31" jsonrpsee = { version = "0.24.5", features = ["server"] } log = "0.4.22" pallet-transaction-payment-rpc = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/shared/subspace-data-retrieval/Cargo.toml b/shared/subspace-data-retrieval/Cargo.toml index 6fa44cb679a..6c5ec9e6f99 100644 --- a/shared/subspace-data-retrieval/Cargo.toml +++ b/shared/subspace-data-retrieval/Cargo.toml @@ -14,7 +14,7 @@ include = [ [dependencies] async-lock = "3.4.0" async-trait = "0.1.83" -futures = "0.3.30" +futures = "0.3.31" parity-scale-codec = { version = "3.6.12", features = ["derive"] } subspace-archiving = { version = "0.1.0", path = "../../crates/subspace-archiving" } subspace-core-primitives = { version = "0.1.0", path = "../../crates/subspace-core-primitives" } diff --git a/test/subspace-test-client/Cargo.toml b/test/subspace-test-client/Cargo.toml index aea3ba2d823..901fe1beebc 100644 --- a/test/subspace-test-client/Cargo.toml +++ b/test/subspace-test-client/Cargo.toml @@ -20,7 +20,7 @@ codec = { package = "parity-scale-codec", version = "3.6.12", features = ["deriv domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitives/runtime" } evm-domain-test-runtime = { version = "0.1.0", path = "../../domains/test/runtime/evm" } fp-evm = { version = "3.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968" } -futures = "0.3.30" +futures = "0.3.31" schnorrkel = "0.11.4" sc-chain-spec = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/test/subspace-test-service/Cargo.toml b/test/subspace-test-service/Cargo.toml index 2fffe71ca5b..77fd9ca44df 100644 --- a/test/subspace-test-service/Cargo.toml +++ b/test/subspace-test-service/Cargo.toml @@ -21,7 +21,7 @@ codec = { package = "parity-scale-codec", version = "3.6.12", features = ["deriv domain-client-message-relayer = { version = "0.1.0", path = "../../domains/client/relayer" } domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitives/runtime" } frame-system = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } -futures = "0.3.30" +futures = "0.3.31" jsonrpsee = { version = "0.24.5", features = ["server"] } pallet-transaction-payment = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } mmr-gadget = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }