diff --git a/.github/workflows/build-api.yml b/.github/workflows/build-api.yml index 15684355a..f8e7fd198 100644 --- a/.github/workflows/build-api.yml +++ b/.github/workflows/build-api.yml @@ -12,10 +12,10 @@ on: workflow_dispatch: push: branches: - - 'main' + - "main" env: - CARGO_TERM_COLOR: always + CARGO_TERM_COLOR: always jobs: build-api: @@ -33,15 +33,15 @@ jobs: - name: set build cache uses: actions/cache@v3 with: - path: | - ~/.cargo/bin/ - ~/.cargo/registry/index/ - ~/.cargo/registry/cache/ - ~/.cargo/git/db/ - digital-asset-rpc-infrastructure/target/ - key: ${{ matrix.os }}_digital-asset-rpc-infrastructure_${{ hashFiles('digital-asset-rpc-infrastructure/Cargo.lock') }} - restore-keys: | - ${{ matrix.os }}_digital-asset-rpc-infrastructure + path: | + ~/.cargo/bin/ + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + digital-asset-rpc-infrastructure/target/ + key: ${{ matrix.os }}_digital-asset-rpc-infrastructure_${{ hashFiles('digital-asset-rpc-infrastructure/Cargo.lock') }} + restore-keys: | + ${{ matrix.os }}_digital-asset-rpc-infrastructure - name: build digital asset rpc infra run: cargo build --verbose --release @@ -54,7 +54,7 @@ jobs: mv target/release/migration target/release/migration22 mv target/release/das_api target/release/das_api22 - # This steps can be omited to save space, are mostly in place to validate binaries (manually) and path to them + # This steps can be omited to save space, are mostly in place to validate binaries (manually) and path to them # Omiting this will save on storage consumption on the account - name: Publish artifact if: matrix.os == 'ubuntu-22.04' diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4ced2cd37..60f1b69ab 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,11 +8,11 @@ on: pull_request: push: branches: - - 'main' + - "main" workflow_dispatch: env: - CARGO_TERM_COLOR: always + CARGO_TERM_COLOR: always jobs: test: @@ -25,20 +25,20 @@ jobs: - name: set build cache uses: actions/cache@v3 with: - path: | - ~/.cargo/bin/ - ~/.cargo/registry/index/ - ~/.cargo/registry/cache/ - ~/.cargo/git/db/ - digital-asset-rpc-infrastructure/target/ - key: cargo-${{ hashFiles('**/Cargo.lock') }}-0001 + path: | + ~/.cargo/bin/ + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + digital-asset-rpc-infrastructure/target/ + key: cargo-${{ hashFiles('**/Cargo.lock') }}-0001 # Cargo.lock - name: Check lock file run: | cargo tree git checkout Cargo.lock - cargo tree --frozen + cargo tree # fmt - name: Check fmt diff --git a/Cargo.lock b/Cargo.lock index 0f7e9aea0..3fe72e831 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -840,6 +840,60 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" +[[package]] +name = "autotools" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef941527c41b0fc0dd48511a8154cd5fc7e29200a0ff8b7203c5d777dbc795cf" +dependencies = [ + "cc", +] + +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backon" version = "0.4.4" @@ -1327,6 +1381,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "cargo-lock" +version = "9.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e11c675378efb449ed3ce8de78d75d0d80542fc98487c26aba28eb3b82feac72" +dependencies = [ + "semver", + "serde", + "toml 0.7.8", + "url", +] + [[package]] name = "cc" version = "1.0.94" @@ -1756,6 +1822,46 @@ dependencies = [ "tokio", ] +[[package]] +name = "das-grpc-ingest" +version = "0.7.2" +dependencies = [ + "anyhow", + "async-stream", + "atty", + "cargo-lock", + "chrono", + "clap 4.5.4", + "digital_asset_types", + "futures", + "git-version", + "hyper", + "json5", + "lazy_static", + "opentelemetry", + "opentelemetry-jaeger", + "opentelemetry_sdk", + "program_transformers", + "prometheus", + "redis 0.25.3", + "reqwest", + "rust-crypto", + "sea-orm", + "serde", + "serde_json", + "serde_yaml", + "solana-sdk", + "sqlx", + "tokio", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", + "vergen", + "yellowstone-grpc-client", + "yellowstone-grpc-proto", + "yellowstone-grpc-tools", +] + [[package]] name = "das-metadata-json" version = "0.7.2" @@ -2326,6 +2432,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flatbuffers" version = "23.5.26" @@ -2590,6 +2702,32 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "git-version" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad568aa3db0fcbc81f2f116137f263d7304f512a1209b35b85150d3ef88ad19" +dependencies = [ + "git-version-macro", +] + +[[package]] +name = "git-version-macro" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53010ccb100b96a67bc32c0175f0ed1426b31b655d562898e57325f81c023ac0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.59", +] + +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "globset" version = "0.4.14" @@ -2874,6 +3012,18 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -3005,6 +3155,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "integration_tests" version = "0.1.0" @@ -3109,6 +3265,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json5" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + [[package]] name = "jsonpath_lib" version = "0.3.0" @@ -3393,6 +3560,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -3579,6 +3752,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "native-tls" version = "0.2.11" @@ -3839,6 +4018,15 @@ dependencies = [ "syn 2.0.59", ] +[[package]] +name = "num_threads" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9" +dependencies = [ + "libc", +] + [[package]] name = "number_prefix" version = "0.4.0" @@ -3943,6 +4131,87 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" +dependencies = [ + "futures-core", + "futures-sink", + "indexmap 2.2.6", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry-jaeger" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e617c66fd588e40e0dbbd66932fdc87393095b125d4459b1a3a10feb1712f8a1" +dependencies = [ + "async-trait", + "futures-core", + "futures-util", + "opentelemetry", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "thrift", + "tokio", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" +dependencies = [ + "opentelemetry", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f16aec8a98a457a52664d69e0091bac3a0abd18ead9b641cb00202ba4e0efe4" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "ordered-float 4.2.0", + "percent-encoding", + "rand 0.8.5", + "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + +[[package]] +name = "ordered-float" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" +dependencies = [ + "num-traits", +] + [[package]] name = "os_str_bytes" version = "6.6.1" @@ -4103,6 +4372,61 @@ dependencies = [ "num", ] +[[package]] +name = "pest" +version = "2.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "311fb059dee1a7b802f036316d790138c613a4e8b180c822e3925a662e9f0c95" +dependencies = [ + "memchr", + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73541b156d32197eecda1a4014d7f868fd2bcb3c550d5386087cfba442bf69c" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c35eeed0a3fab112f75165fdc026b3913f4183133f19b49be773ac9ea966e8bd" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.59", +] + +[[package]] +name = "pest_meta" +version = "2.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2adbf29bb9776f28caece835398781ab24435585fe0d4dc1374a61db5accedca" +dependencies = [ + "once_cell", + "pest", + "sha2 0.10.8", +] + +[[package]] +name = "petgraph" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" +dependencies = [ + "fixedbitset", + "indexmap 2.2.6", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -4273,6 +4597,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ac2cf0f2e4f42b49f5ffd07dae8d746508ef7526c13940e5f524012ae6c6550" +dependencies = [ + "proc-macro2", + "syn 2.0.59", +] + [[package]] name = "proc-macro-crate" version = "0.1.5" @@ -4361,6 +4695,7 @@ dependencies = [ "mpl-bubblegum", "num-traits", "sea-orm", + "serde", "serde_json", "solana-sdk", "solana-transaction-status", @@ -4387,12 +4722,74 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.59", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.59", +] + +[[package]] +name = "prost-types" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3235c33eb02c1f1e212abdbe34c78b264b038fb58ca612664343271e36e55ffe" +dependencies = [ + "prost", +] + [[package]] name = "protobuf" version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "protobuf-src" +version = "1.1.0+21.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7ac8852baeb3cc6fb83b93646fb93c0ffe5d14bf138c945ceb4b9948ee0e3c1" +dependencies = [ + "autotools", +] + [[package]] name = "proxy-wasm" version = "0.2.1" @@ -5364,6 +5761,9 @@ name = "semver" version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -5709,9 +6109,9 @@ dependencies = [ [[package]] name = "solana-account-decoder" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4e29f060cabd0e1bd90a63f8e1517ddd3365d3dc2eaa05f9a9fa542f4adeaaa" +checksum = "8d76c43ef61f527d719b5c6bfa5a62ebba60839739125da9e8a00fb82349afd2" dependencies = [ "Inflector", "base64 0.21.7", @@ -5734,9 +6134,9 @@ dependencies = [ [[package]] name = "solana-clap-utils" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62e5cdc0ae0c8ae79c39a4a362066d0d61764bc7ea7e033961fd7510fd24da2a" +checksum = "eb19b9bbd92eee2d8f637026559a9fb48bd98aba534caedf070498a50c91fce8" dependencies = [ "chrono", "clap 2.34.0", @@ -5751,9 +6151,9 @@ dependencies = [ [[package]] name = "solana-client" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1e2301c2af7e5a1dba0855f710329a2bb993829ed9fdf8f6207d02ee6fc54a4" +checksum = "9538e3db584a8b1e70060f1f24222b8e0429f18b607f531fb45eb826f4917265" dependencies = [ "async-trait", "bincode", @@ -5784,9 +6184,9 @@ dependencies = [ [[package]] name = "solana-config-program" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "595118948b966b110aad3f9d8d8464958abe379ecfa7a813b4fc82659c8259bc" +checksum = "e3afd4e309d304e296765cab716fb1fd66c66ec300465c8b26f8cce763275132" dependencies = [ "bincode", "chrono", @@ -5798,9 +6198,9 @@ dependencies = [ [[package]] name = "solana-connection-cache" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d363d6bb43e618b6010b47c2eb0579777ce4ed388ca15b84a610a738edf0b97e" +checksum = "92716758e8c0e1c0bc2a5ac2eb3df443a0337fd3991cd38a3b02b12c3fbd18ce" dependencies = [ "async-trait", "bincode", @@ -5820,9 +6220,9 @@ dependencies = [ [[package]] name = "solana-frozen-abi" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96734b05823c8b515f8e3cc02641a27aee2c9760b1a43c74cb20f2a1ab0ab76c" +checksum = "fb1b8230474ae9f7c841060c299999124582e8d2a0448d7847720792e98cc64e" dependencies = [ "ahash 0.8.5", "blake3", @@ -5850,9 +6250,9 @@ dependencies = [ [[package]] name = "solana-frozen-abi-macro" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a0f1291a464fd046135d019d57a81be165ee3d23aa7df880b47dac683a0582a" +checksum = "793910ab733b113b80c357f8f492dda2fabd5671c4ea03db3aa4e46b938fdbe3" dependencies = [ "proc-macro2", "quote", @@ -5862,9 +6262,9 @@ dependencies = [ [[package]] name = "solana-logger" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5977c8f24b83cf50e7139ffdb25d70bad6a177f18ccc79ca2293d6a987fa81c" +checksum = "6d3f819af39632dc538a566c937253bf46256e4c0e60f621c6db448bc7c76294" dependencies = [ "env_logger 0.9.3", "lazy_static", @@ -5873,9 +6273,9 @@ dependencies = [ [[package]] name = "solana-measure" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a39ef01b2c65552d05013b2642ffd73258f2c80e3a59e44c499762047df9456" +checksum = "cb045f0235b16f7d926f6e0338db822747d61559a1368c3cb017ba6e02c516d0" dependencies = [ "log", "solana-sdk", @@ -5883,9 +6283,9 @@ dependencies = [ [[package]] name = "solana-metrics" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ad30ff3775412f2929d440446aef8b070676920bc5df495ea6398a8f28ce91f" +checksum = "1af84362ad5804dc64ca88b1ca5c35bd41321e12d42c798ac06a6fbb60dd0e70" dependencies = [ "crossbeam-channel", "gethostname", @@ -5898,9 +6298,9 @@ dependencies = [ [[package]] name = "solana-net-utils" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eafd5178a38a039e12c14780f1b6a74f1e672d62357343e0aee6d0fc7e5bd18" +checksum = "f8e640a95d317cad1322015c5a2b6a71697fd8dabebcb8dd33ed7f5a22869d12" dependencies = [ "bincode", "clap 3.2.25", @@ -5920,9 +6320,9 @@ dependencies = [ [[package]] name = "solana-perf" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10d6293cddcc98ae092d00f43f741405da30aa083acb96666606130810b064f3" +checksum = "4266c4bd46620a925b8d508c26578d5559e97fcff6735fd22e39f369c3996ee1" dependencies = [ "ahash 0.8.5", "bincode", @@ -5949,9 +6349,9 @@ dependencies = [ [[package]] name = "solana-program" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6412447793f8a3ef7526655906728325093b472e481791ac5c584e8d272166dc" +checksum = "581f38a870bffbe623d900c68579984671f8dfa35bbfb3309d7134de22ce8652" dependencies = [ "ark-bn254", "ark-ec", @@ -6003,9 +6403,9 @@ dependencies = [ [[package]] name = "solana-program-runtime" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1977e741a6793fca27413507457d797df0f41bc0ae634247d112bc77ab2b0325" +checksum = "490b6f65aced077e0c5e57c20f151a134458fc350905c20d7dcf3f2162eaa6f6" dependencies = [ "base64 0.21.7", "bincode", @@ -6031,9 +6431,9 @@ dependencies = [ [[package]] name = "solana-pubsub-client" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad21dd5d6fe09116dbc29aec279b7cf08d250b564899dc87437bd780ed26290" +checksum = "c0dc2b26a7a9860f180ce11f69b0ff2a8bea0d4b9e97daee741b1e76565b3c82" dependencies = [ "crossbeam-channel", "futures-util", @@ -6056,9 +6456,9 @@ dependencies = [ [[package]] name = "solana-quic-client" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6201869768fe133ce9b8088e4f718f53ff164b8e5df3d0d46a6563a22545924f" +checksum = "727474945d51be37ffe03e7b1d6c9630da41228c7b298a8f45098c203a78ac89" dependencies = [ "async-mutex", "async-trait", @@ -6083,9 +6483,9 @@ dependencies = [ [[package]] name = "solana-rayon-threadlimit" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f100d0c3214d67bb847a1eefc7079f6bb755534266423f4c994ad3b40c685ed" +checksum = "853794cccf3bd1984419a594040dfed19666e5a9ad33b0906d4174bc394b22af" dependencies = [ "lazy_static", "num_cpus", @@ -6093,9 +6493,9 @@ dependencies = [ [[package]] name = "solana-remote-wallet" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3328c891079086b408a04e701470a346d517c9c51c0a96f2f166f616a3e1c3c8" +checksum = "b368f270526a5f92ec47c45a6b74ac304b62b08c169b45cf91e0d2f1703889bd" dependencies = [ "console", "dialoguer", @@ -6112,9 +6512,9 @@ dependencies = [ [[package]] name = "solana-rpc-client" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfacf1163a375d98c29779a03ba278b2ef43494f77e33826a33f9460563c0887" +checksum = "71b766876b0c56950ab530d8495ef7eeaeb79e162f03dadaffc0d6852de9e844" dependencies = [ "async-trait", "base64 0.21.7", @@ -6138,9 +6538,9 @@ dependencies = [ [[package]] name = "solana-rpc-client-api" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fab293a88113511e66607d76bd027edfe0b1372b467fd76bbb5af03448539a2" +checksum = "876b2e410cc2403ea3216893f05034b02a180431100eb831d0b67b14fca4d29f" dependencies = [ "base64 0.21.7", "bs58 0.4.0", @@ -6160,9 +6560,9 @@ dependencies = [ [[package]] name = "solana-rpc-client-nonce-utils" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e43cb51374a6ec8fd401b3387334ef93e04f6d8ae87bbb29892aff42aeb1061" +checksum = "ebdb3f02fb3cce3c967f718bc77b79433c24aa801b63dc70f374e8759b2424e4" dependencies = [ "clap 2.34.0", "solana-clap-utils", @@ -6173,9 +6573,9 @@ dependencies = [ [[package]] name = "solana-sdk" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de1ce8848de4198f9bc7e4574252be02b1ed86ecbc2fff506780d5f8d6e4c4a8" +checksum = "0d70ab837cc79ed67df6fdb145f1ffd544f1eaa60b0757b750f4864b90498bad" dependencies = [ "assert_matches", "base64 0.21.7", @@ -6227,9 +6627,9 @@ dependencies = [ [[package]] name = "solana-sdk-macro" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc5cc46bbda0a5472d8d0a4c846b22941436ac45c31456d3e885a387a5f264f7" +checksum = "5f9d0433c4084a3260a32ec67f6b4272c4232d15e732be542cd5dfdf0ae1e784" dependencies = [ "bs58 0.4.0", "proc-macro2", @@ -6246,9 +6646,9 @@ checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183" [[package]] name = "solana-streamer" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46f02b475fc20c55ebbcfa5638ff93f9b780414cc6185e3a6d0992bca0ae81ee" +checksum = "d70eda40efb5bc57ad50b1ac8452485065c1adae0e701a0348b397db054e2ab5" dependencies = [ "async-channel 1.9.0", "bytes", @@ -6268,7 +6668,6 @@ dependencies = [ "rand 0.8.5", "rcgen", "rustls 0.21.10", - "smallvec", "solana-metrics", "solana-perf", "solana-sdk", @@ -6279,9 +6678,9 @@ dependencies = [ [[package]] name = "solana-thin-client" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b6ce2304764b8bb699db734fde9cd19ace038d3895d828a557ea0ec2a9e0ecd" +checksum = "ca3c510144695c3d1ee1f84dd9975af7f7d35c168447c484bbd35c21e903c515" dependencies = [ "bincode", "log", @@ -6294,9 +6693,9 @@ dependencies = [ [[package]] name = "solana-tpu-client" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa3e2351625e26f55e5e08f8e5aadaa2380fd0649f25641d6ba3f3848dbe5c9a" +checksum = "44f27c8fec609179a7dfc287060df2a926c8cd89329235c4b8d78bd019a72462" dependencies = [ "async-trait", "bincode", @@ -6318,9 +6717,9 @@ dependencies = [ [[package]] name = "solana-transaction-status" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0841bbd1845c87043e4184961e45cc7c08b36d96d0d146256b26ea5c74630a0f" +checksum = "29f58f2f864d900eddf2e21a99ebe445b6be525d597e44952f075d8237035b8e" dependencies = [ "Inflector", "base64 0.21.7", @@ -6343,9 +6742,9 @@ dependencies = [ [[package]] name = "solana-udp-client" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bae54a100f0b0b5be065f5d05f2259f6d4a7b39f5866d579927f3ca35a01773b" +checksum = "27ead118c5d549e4345dc59cbc5d9b282164f3e5334707f186e3aa10d40e3b30" dependencies = [ "async-trait", "solana-connection-cache", @@ -6358,9 +6757,9 @@ dependencies = [ [[package]] name = "solana-version" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f69945e38d7440221e2fac0aaa57a9d72adb329b0de705ca5bd9ba981aedc16" +checksum = "532f5d631562587facc5fe88abd2e31c0d1f29012b6766c664db9f05a39fb05b" dependencies = [ "log", "rustc_version", @@ -6374,9 +6773,9 @@ dependencies = [ [[package]] name = "solana-vote-program" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e574aafc3c5adc7106ab4605d8ad378c9a12f2cf1dec2e8ba1aa6fd97a5d5490" +checksum = "c684430058b0a2e733936a8851c8843a3a6316ccd5c969d39411a479d6489642" dependencies = [ "bincode", "log", @@ -6396,9 +6795,9 @@ dependencies = [ [[package]] name = "solana-zk-token-sdk" -version = "1.17.31" +version = "1.17.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597dddc8ab46852dea7fc3d22e031fa4ffdb1b2291ac24d960605424a510a5f5" +checksum = "5aef1b48d9fdb2619349d2d15942d83c99aabe995ff945d9b418176373aa823c" dependencies = [ "aes-gcm-siv", "base64 0.21.7", @@ -7022,6 +7421,28 @@ dependencies = [ "once_cell", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "log", + "ordered-float 2.10.1", + "threadpool", +] + [[package]] name = "time" version = "0.1.45" @@ -7041,7 +7462,9 @@ checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", "itoa", + "libc", "num-conv", + "num_threads", "powerfmt", "serde", "time-core", @@ -7118,6 +7541,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -7210,6 +7643,18 @@ dependencies = [ "serde", ] +[[package]] +name = "toml" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd79e69d3b627db300ff956027cc6c3798cef26d22526befdfcd12feeb6d2257" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit 0.19.15", +] + [[package]] name = "toml" version = "0.8.12" @@ -7238,6 +7683,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ "indexmap 2.2.6", + "serde", + "serde_spanned", "toml_datetime", "winnow 0.5.40", ] @@ -7266,12 +7713,79 @@ dependencies = [ "winnow 0.6.6", ] +[[package]] +name = "tonic" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "flate2", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "rustls 0.21.10", + "rustls-native-certs", + "rustls-pemfile", + "tokio", + "tokio-rustls 0.24.1", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.59", +] + +[[package]] +name = "tonic-health" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f80db390246dfb46553481f6024f0082ba00178ea495dbb99e70ba9a4fafb5e1" +dependencies = [ + "async-stream", + "prost", + "tokio", + "tokio-stream", + "tonic", +] + [[package]] name = "tower" version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -7351,6 +7865,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c67ac25c5407e7b961fafc6f7e9aa5958fd297aada2d20fa2ae1737357e55596" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -7479,6 +8011,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" + [[package]] name = "uncased" version = "0.9.10" @@ -7600,6 +8138,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf-8" version = "0.7.6" @@ -7646,6 +8190,18 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +[[package]] +name = "vergen" +version = "8.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e27d6bdd219887a9eadd19e1c34f32e47fa332301184935c6d9bca26f3cca525" +dependencies = [ + "anyhow", + "rustc_version", + "rustversion", + "time 0.3.36", +] + [[package]] name = "version_check" version = "0.9.4" @@ -7779,6 +8335,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa30049b1c872b72c89866d458eae9f20380ab280ffd1b1e18df2d3e2d98cfe0" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki" version = "0.22.4" @@ -8090,6 +8656,65 @@ dependencies = [ "time 0.3.36", ] +[[package]] +name = "yellowstone-grpc-client" +version = "1.15.0+solana.1.17.28" +source = "git+https://github.com/rpcpool/yellowstone-grpc.git?tag=v1.14.1+solana.1.17.28#8fb667501cdcee0f50d8edbf5c382c97f6b4bf07" +dependencies = [ + "bytes", + "futures", + "thiserror", + "tonic", + "tonic-health", + "yellowstone-grpc-proto", +] + +[[package]] +name = "yellowstone-grpc-proto" +version = "1.14.0+solana.1.17.28" +source = "git+https://github.com/rpcpool/yellowstone-grpc.git?tag=v1.14.1+solana.1.17.28#8fb667501cdcee0f50d8edbf5c382c97f6b4bf07" +dependencies = [ + "anyhow", + "bincode", + "prost", + "protobuf-src", + "solana-account-decoder", + "solana-sdk", + "solana-transaction-status", + "tonic", + "tonic-build", +] + +[[package]] +name = "yellowstone-grpc-tools" +version = "1.0.0-rc.11+solana.1.17.28" +source = "git+https://github.com/rpcpool/yellowstone-grpc.git?tag=v1.14.1+solana.1.17.28#8fb667501cdcee0f50d8edbf5c382c97f6b4bf07" +dependencies = [ + "anyhow", + "async-trait", + "atty", + "cargo-lock", + "clap 4.5.4", + "futures", + "git-version", + "hyper", + "json5", + "lazy_static", + "prometheus", + "serde", + "serde_json", + "serde_yaml", + "tokio", + "tokio-stream", + "tonic", + "tonic-health", + "tracing", + "tracing-subscriber", + "vergen", + "yellowstone-grpc-client", + "yellowstone-grpc-proto", +] + [[package]] name = "zerocopy" version = "0.7.32" diff --git a/Cargo.toml b/Cargo.toml index 2b9a8e5e6..8bad57b15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "migration", "nft_ingester", "ops", + "grpc-ingest", "program_transformers", "tools/acc_forwarder", "tools/bgtask_creator", @@ -26,18 +27,23 @@ repository = "https://github.com/metaplex-foundation/digital-asset-rpc-infrastru version = "0.7.2" [workspace.dependencies] + +backon = "0.4.1" +heck = "0.5.0" anchor-client = "0.29.0" anchor-lang = "0.29.0" anyhow = "1.0.75" async-std = "1.0.0" +async-stream = "0.3.5" async-trait = "0.1.60" -backon = "0.4.1" blockbuster = "2.2.1" +atty = "0.2.14" borsh = "~0.10.3" borsh-derive = "~0.10.3" bs58 = "0.4.0" cadence = "0.29.0" cadence-macros = "0.29.0" +cargo-lock = "9.0.0" chrono = "0.4.19" clap = "4.2.2" das_api = { path = "das_api" } @@ -52,13 +58,14 @@ figment = "0.10.8" flatbuffers = "23.1.21" function_name = "0.3.0" futures = "0.3.28" -heck = "0.5.0" hex = "0.4.3" hyper = "0.14.23" indexmap = "1.9.3" indicatif = "0.17.5" insta = "1.34.0" itertools = "0.10.1" +git-version = "0.3.5" +json5 = "0.4.1" jsonpath_lib = "0.3.0" jsonrpsee = "0.16.2" jsonrpsee-core = "0.16.2" @@ -75,12 +82,16 @@ num-traits = "0.2.15" once_cell = "1.19.0" open-rpc-derive = "0.0.4" open-rpc-schema = "0.0.4" +opentelemetry = "0.21.0" +opentelemetry-jaeger = "0.20.0" +opentelemetry_sdk = "0.21.1" plerkle_messenger = "1.6.0" plerkle_serialization = "1.8.0" program_transformers = { path = "program_transformers" } prometheus = "0.13.3" proxy-wasm = "0.2.0" rand = "0.8.5" +redis = "0.25.3" regex = "1.6.0" reqwest = "0.11.13" rust-crypto = "0.2.36" @@ -91,6 +102,7 @@ sea-orm-migration = "0.10.6" sea-query = "0.28.1" serde = "1.0.137" serde_json = "1.0.81" +serde_yaml = "0.9.34" serial_test = "2.0.0" solana-account-decoder = "~1.17" solana-client = "~1.17" @@ -110,11 +122,16 @@ tokio-stream = "0.1.14" tower = "0.4.13" tower-http = "0.3.5" tracing = "0.1.35" +tracing-opentelemetry = "0.22.0" tracing-subscriber = "0.3.16" txn_forwarder = { path = "tools/txn_forwarder" } url = "2.3.1" wasi = "0.7.0" wasm-bindgen = "0.2.83" +vergen = "8.2.1" +yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.14.1+solana.1.17.28" } # tag is geyser plugin +yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.14.1+solana.1.17.28" } # tag is geyser plugin +yellowstone-grpc-tools = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.14.1+solana.1.17.28", default-features = false } # tag is geyser plugin [workspace.lints.clippy] clone_on_ref_ptr = "deny" diff --git a/grpc-ingest/Cargo.toml b/grpc-ingest/Cargo.toml new file mode 100644 index 000000000..958bca6b4 --- /dev/null +++ b/grpc-ingest/Cargo.toml @@ -0,0 +1,61 @@ +[package] +name = "das-grpc-ingest" +version = { workspace = true } +edition = { workspace = true } +repository = { workspace = true } +publish = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +async-stream = { workspace = true } +atty = { workspace = true } +sqlx = { workspace = true, features = [ + "macros", + "runtime-tokio-rustls", + "postgres", + "uuid", + "offline", + "json", +] } +chrono = { workspace = true } +clap = { workspace = true, features = ["cargo", "derive"] } +digital_asset_types = { workspace = true } +futures = { workspace = true } +hyper = { workspace = true, features = ["server"] } +json5 = { workspace = true } +lazy_static = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry-jaeger = { workspace = true, features = ["rt-tokio"] } +opentelemetry_sdk = { workspace = true, features = ["trace"] } +program_transformers = { workspace = true } +prometheus = { workspace = true } +redis = { workspace = true, features = ["tokio-comp", "tokio-native-tls-comp"] } +reqwest = { workspace = true } +rust-crypto = { workspace = true } +sea-orm = { workspace = true, features = ["sqlx-postgres"] } +serde = { workspace = true } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +solana-sdk = { workspace = true } # only prom rn +tokio = { workspace = true, features = [ + "rt-multi-thread", + "macros", + "time", + "fs", + "tracing", +] } +tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } +yellowstone-grpc-client = { workspace = true } +yellowstone-grpc-proto = { workspace = true } +yellowstone-grpc-tools = { workspace = true } + +[build-dependencies] +anyhow = { workspace = true } +cargo-lock = { workspace = true } +git-version = { workspace = true } +vergen = { workspace = true, features = ["build", "rustc"] } + +[lints] +workspace = true diff --git a/grpc-ingest/README.md b/grpc-ingest/README.md new file mode 100644 index 000000000..d721d863b --- /dev/null +++ b/grpc-ingest/README.md @@ -0,0 +1,33 @@ +## Dev process + +### geyser gRPC source + +Use [Triton One](https://triton.one/) provided endpoint or run own node with geyser plugin: https://github.com/rpcpool/yellowstone-grpc + +### Redis server + +``` +redis-server +``` + +### PostgreSQL server + +Run: + +``` +docker run -it --rm -e POSTGRES_PASSWORD=solana -e POSTGRES_USER=solana -e POSTGRES_DB=solana -p 5432:5432 postgres +``` + +Schema: + +> Also note: The migration `m20230224_093722_performance_improvements` needs to be commented out of the migration lib.rs in order for the Sea ORM `Relations` to generate correctly. + +``` +DATABASE_URL=postgres://solana:solana@localhost/solana INIT_FILE_PATH=init.sql cargo run -p migration --bin migration -- up +``` + +psql: + +``` +PGPASSWORD=solana psql -h localhost -U solana -d solana +``` \ No newline at end of file diff --git a/grpc-ingest/build.rs b/grpc-ingest/build.rs new file mode 100644 index 000000000..92e1f4c7c --- /dev/null +++ b/grpc-ingest/build.rs @@ -0,0 +1,38 @@ +use {cargo_lock::Lockfile, std::collections::HashSet}; + +fn main() -> anyhow::Result<()> { + let mut envs = vergen::EmitBuilder::builder(); + envs.all_build().all_rustc(); + envs.emit()?; + + // vergen git version does not looks cool + println!( + "cargo:rustc-env=GIT_VERSION={}", + git_version::git_version!() + ); + + // Extract packages version + let lockfile = Lockfile::load("../Cargo.lock")?; + println!( + "cargo:rustc-env=SOLANA_SDK_VERSION={}", + get_pkg_version(&lockfile, "solana-sdk") + ); + println!( + "cargo:rustc-env=YELLOWSTONE_GRPC_PROTO_VERSION={}", + get_pkg_version(&lockfile, "yellowstone-grpc-proto") + ); + + Ok(()) +} + +fn get_pkg_version(lockfile: &Lockfile, pkg_name: &str) -> String { + lockfile + .packages + .iter() + .filter(|pkg| pkg.name.as_str() == pkg_name) + .map(|pkg| pkg.version.to_string()) + .collect::>() + .into_iter() + .collect::>() + .join(",") +} diff --git a/grpc-ingest/config-download-metadata.yml b/grpc-ingest/config-download-metadata.yml new file mode 100644 index 000000000..c42307be4 --- /dev/null +++ b/grpc-ingest/config-download-metadata.yml @@ -0,0 +1,12 @@ +# Important: only ONE `download-metadata` instance is supported right now! +prometheus: 127.0.0.1:8875 +postgres: + url: postgres://solana:solana@localhost/solana + min_connections: 10 + max_connections: 50 +download_metadata: + max_in_process: 50 # maximum tasks in process (downloading metadata) + prefetch_queue_size: 100 + limit_to_fetch: 200 # maximum number of tasks fetched from database + wait_tasks_max_idle_ms: 100 # if we do not have pending tasks, wait max ms + download_timeout_ms: 5_000 diff --git a/grpc-ingest/config-grpc2redis.yml b/grpc-ingest/config-grpc2redis.yml new file mode 100644 index 000000000..a7e79d1dc --- /dev/null +++ b/grpc-ingest/config-grpc2redis.yml @@ -0,0 +1,28 @@ +prometheus: 127.0.0.1:8873 +endpoint: http://127.0.0.1:10000 +x_token: null +commitment: finalized +accounts: + stream: ACCOUNTS + stream_maxlen: 100_000_000 + stream_data_key: data + filter: + owner: + - "metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s" + - "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb" + - "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" + - "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL" + - "BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY" + - "CoREENxT6tW1HoK8ypY1SxRMZTcVPm7R94rH4PZNhX7d" +transactions: + stream: TRANSACTIONS + stream_maxlen: 10_000_000 + stream_data_key: data + filter: + account_include: + - BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY +redis: + url: redis://localhost:6379 + pipeline_max_size: 10 + pipeline_max_idle_ms: 10 + max_xadd_in_process: 100 diff --git a/grpc-ingest/config-ingester.yml b/grpc-ingest/config-ingester.yml new file mode 100644 index 000000000..bbb377a81 --- /dev/null +++ b/grpc-ingest/config-ingester.yml @@ -0,0 +1,28 @@ +prometheus: 127.0.0.1:8874 +redis: + url: redis://localhost:6379 + group: ingester + consumer: consumer # every new ingester instance should have uniq name + streams: + - type: account # possible values: `account` and `transaction`, required for message decoding + stream: ACCOUNTS + data_key: data + - type: transaction + stream: TRANSACTIONS + data_key: data + xack_batch_max_size: 100 + xack_batch_max_idle_ms: 10 + xack_max_in_process: 100 + prefetch_queue_size: 1_000 # max number of messages available in the read queue for processing + xpending_max: 250 # used for reading pending messages + xpending_only: false # exit once all pending messages consumed (should be applied if you want downscale number of ingesters) + xreadgroup_max: 1_000 # applied per each stream in one request +postgres: + url: postgres://solana:solana@localhost/solana + min_connections: 10 + max_connections: 50 # `max_connection` should be bigger than `program_transformer.max_tasks_in_process` otherwise unresolved lock is possible +program_transformer: + transactions_cl_audits: false + max_tasks_in_process: 40 +download_metadata: + max_attempts: 3 diff --git a/grpc-ingest/src/config.rs b/grpc-ingest/src/config.rs new file mode 100644 index 000000000..90b7c005c --- /dev/null +++ b/grpc-ingest/src/config.rs @@ -0,0 +1,415 @@ +use { + anyhow::Context, + serde::{de, Deserialize}, + std::{net::SocketAddr, path::Path, time::Duration}, + tokio::fs, + tracing::warn, + yellowstone_grpc_tools::config::{ + deserialize_usize_str, ConfigGrpcRequestAccounts, ConfigGrpcRequestCommitment, + ConfigGrpcRequestTransactions, + }, +}; + +pub const REDIS_STREAM_ACCOUNTS: &str = "ACCOUNTS"; +pub const REDIS_STREAM_TRANSACTIONS: &str = "TRANSACTIONS"; +pub const REDIS_STREAM_DATA_KEY: &str = "data"; + +pub async fn load(path: impl AsRef + Copy) -> anyhow::Result +where + T: de::DeserializeOwned, +{ + let text = fs::read_to_string(path) + .await + .context("failed to read config from file")?; + + match path.as_ref().extension().and_then(|e| e.to_str()) { + Some("yaml") | Some("yml") => { + serde_yaml::from_str(&text).context("failed to parse config from YAML file") + } + Some("json") => json5::from_str(&text).context("failed to parse config from JSON file"), + value => anyhow::bail!("unknown config extension: {value:?}"), + } +} + +#[derive(Debug, Default, Deserialize)] +#[serde(default)] +pub struct ConfigPrometheus { + pub prometheus: Option, +} + +#[derive(Debug, Deserialize)] +pub struct ConfigGrpc { + pub endpoint: String, + pub x_token: Option, + + pub commitment: ConfigGrpcRequestCommitment, + pub accounts: ConfigGrpcAccounts, + pub transactions: ConfigGrpcTransactions, + + pub redis: ConfigGrpcRedis, +} + +#[derive(Debug, Deserialize)] +pub struct ConfigGrpcAccounts { + #[serde(default = "ConfigGrpcAccounts::default_stream")] + pub stream: String, + #[serde( + default = "ConfigGrpcAccounts::default_stream_maxlen", + deserialize_with = "deserialize_usize_str" + )] + pub stream_maxlen: usize, + #[serde(default = "ConfigGrpcAccounts::default_stream_data_key")] + pub stream_data_key: String, + + pub filter: ConfigGrpcRequestAccounts, +} + +impl ConfigGrpcAccounts { + pub fn default_stream() -> String { + REDIS_STREAM_ACCOUNTS.to_owned() + } + + pub const fn default_stream_maxlen() -> usize { + 100_000_000 + } + + pub fn default_stream_data_key() -> String { + REDIS_STREAM_DATA_KEY.to_owned() + } +} + +#[derive(Debug, Deserialize)] +pub struct ConfigGrpcTransactions { + #[serde(default = "ConfigGrpcTransactions::default_stream")] + pub stream: String, + #[serde( + default = "ConfigGrpcTransactions::default_stream_maxlen", + deserialize_with = "deserialize_usize_str" + )] + pub stream_maxlen: usize, + #[serde(default = "ConfigGrpcTransactions::default_stream_data_key")] + pub stream_data_key: String, + + pub filter: ConfigGrpcRequestTransactions, +} + +impl ConfigGrpcTransactions { + pub fn default_stream() -> String { + REDIS_STREAM_TRANSACTIONS.to_owned() + } + + pub const fn default_stream_maxlen() -> usize { + 10_000_000 + } + + pub fn default_stream_data_key() -> String { + REDIS_STREAM_DATA_KEY.to_owned() + } +} + +#[derive(Debug, Deserialize)] +pub struct ConfigGrpcRedis { + pub url: String, + #[serde( + default = "ConfigGrpcRedis::default_pipeline_max_size", + deserialize_with = "deserialize_usize_str" + )] + pub pipeline_max_size: usize, + #[serde( + default = "ConfigGrpcRedis::default_pipeline_max_idle", + deserialize_with = "deserialize_duration_str", + rename = "pipeline_max_idle_ms" + )] + pub pipeline_max_idle: Duration, + #[serde( + default = "ConfigGrpcRedis::default_max_xadd_in_process", + deserialize_with = "deserialize_usize_str" + )] + pub max_xadd_in_process: usize, +} + +impl ConfigGrpcRedis { + pub const fn default_pipeline_max_size() -> usize { + 10 + } + + pub const fn default_pipeline_max_idle() -> Duration { + Duration::from_millis(10) + } + + pub const fn default_max_xadd_in_process() -> usize { + 100 + } +} + +pub fn deserialize_duration_str<'de, D>(deserializer: D) -> Result +where + D: de::Deserializer<'de>, +{ + let ms = deserialize_usize_str(deserializer)?; + Ok(Duration::from_millis(ms as u64)) +} + +#[derive(Debug, Deserialize)] +pub struct ConfigIngester { + pub redis: ConfigIngesterRedis, + pub postgres: ConfigIngesterPostgres, + pub program_transformer: ConfigIngesterProgramTransformer, + pub download_metadata: ConfigIngesterDownloadMetadata, +} + +impl ConfigIngester { + pub fn check(&self) { + if self.postgres.max_connections < self.program_transformer.max_tasks_in_process { + warn!("`postgres.max_connections` should be bigger than `program_transformer.max_tasks_in_process` otherwise unresolved lock is possible"); + } + } +} + +#[derive(Debug, Deserialize)] +pub struct ConfigIngesterRedis { + pub url: String, + #[serde(default = "ConfigIngesterRedis::default_group")] + pub group: String, + #[serde(default = "ConfigIngesterRedis::default_consumer")] + pub consumer: String, + pub streams: Vec, + #[serde( + default = "ConfigIngesterRedis::default_prefetch_queue_size", + deserialize_with = "deserialize_usize_str" + )] + pub prefetch_queue_size: usize, + #[serde( + default = "ConfigIngesterRedis::default_xpending_max", + deserialize_with = "deserialize_usize_str" + )] + pub xpending_max: usize, + #[serde(default = "ConfigIngesterRedis::default_xpending_only")] + pub xpending_only: bool, + #[serde( + default = "ConfigIngesterRedis::default_xreadgroup_max", + deserialize_with = "deserialize_usize_str" + )] + pub xreadgroup_max: usize, +} + +impl ConfigIngesterRedis { + pub fn default_group() -> String { + "ingester".to_owned() + } + + pub fn default_consumer() -> String { + "consumer".to_owned() + } + + pub const fn default_prefetch_queue_size() -> usize { + 1_000 + } + + pub const fn default_xpending_max() -> usize { + 100 + } + + pub const fn default_xpending_only() -> bool { + false + } + + pub const fn default_xreadgroup_max() -> usize { + 1_000 + } +} + +#[derive(Debug, Clone)] +pub struct ConfigIngesterRedisStream { + pub stream_type: ConfigIngesterRedisStreamType, + pub stream: String, + pub data_key: String, + pub xack_batch_max_size: usize, + pub xack_batch_max_idle: Duration, + pub xack_max_in_process: usize, +} + +impl<'de> Deserialize<'de> for ConfigIngesterRedisStream { + fn deserialize(deserializer: D) -> Result + where + D: de::Deserializer<'de>, + { + #[derive(Debug, Deserialize)] + struct Raw { + #[serde(rename = "type")] + pub stream_type: ConfigIngesterRedisStreamType, + pub stream: Option, + pub data_key: Option, + #[serde( + default = "default_xack_batch_max_size", + deserialize_with = "deserialize_usize_str" + )] + pub xack_batch_max_size: usize, + #[serde( + default = "default_xack_batch_max_idle", + deserialize_with = "deserialize_duration_str", + rename = "xack_batch_max_idle_ms" + )] + pub xack_batch_max_idle: Duration, + #[serde( + default = "default_xack_max_in_process", + deserialize_with = "deserialize_usize_str" + )] + pub xack_max_in_process: usize, + } + + const fn default_xack_batch_max_size() -> usize { + 100 + } + + const fn default_xack_batch_max_idle() -> Duration { + Duration::from_millis(10) + } + + const fn default_xack_max_in_process() -> usize { + 100 + } + + let raw = Raw::deserialize(deserializer)?; + Ok(Self { + stream_type: raw.stream_type, + stream: raw.stream.unwrap_or_else(|| match raw.stream_type { + ConfigIngesterRedisStreamType::Account => REDIS_STREAM_ACCOUNTS.to_owned(), + ConfigIngesterRedisStreamType::Transaction => REDIS_STREAM_TRANSACTIONS.to_owned(), + }), + data_key: raw + .data_key + .unwrap_or_else(|| REDIS_STREAM_DATA_KEY.to_owned()), + xack_batch_max_size: raw.xack_batch_max_size, + xack_batch_max_idle: raw.xack_batch_max_idle, + xack_max_in_process: raw.xack_max_in_process, + }) + } +} + +#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum ConfigIngesterRedisStreamType { + Account, + Transaction, +} + +#[derive(Debug, Deserialize)] +pub struct ConfigIngesterPostgres { + pub url: String, + #[serde( + default = "ConfigIngesterPostgres::default_min_connections", + deserialize_with = "deserialize_usize_str" + )] + pub min_connections: usize, + #[serde( + default = "ConfigIngesterPostgres::default_max_connections", + deserialize_with = "deserialize_usize_str" + )] + pub max_connections: usize, +} + +impl ConfigIngesterPostgres { + pub const fn default_min_connections() -> usize { + 10 + } + + pub const fn default_max_connections() -> usize { + 50 + } +} + +#[derive(Debug, Deserialize)] +pub struct ConfigIngesterProgramTransformer { + #[serde(default = "ConfigIngesterProgramTransformer::default_transactions_cl_audits")] + pub transactions_cl_audits: bool, + #[serde( + default = "ConfigIngesterProgramTransformer::default_max_tasks_in_process", + deserialize_with = "deserialize_usize_str" + )] + pub max_tasks_in_process: usize, +} + +impl ConfigIngesterProgramTransformer { + pub const fn default_transactions_cl_audits() -> bool { + false + } + + pub const fn default_max_tasks_in_process() -> usize { + 40 + } +} + +#[derive(Debug, Clone, Copy, Deserialize)] +pub struct ConfigIngesterDownloadMetadata { + #[serde( + default = "ConfigIngesterDownloadMetadata::default_max_attempts", + deserialize_with = "deserialize_usize_str" + )] + pub max_attempts: usize, +} + +impl ConfigIngesterDownloadMetadata { + pub const fn default_max_attempts() -> usize { + 3 + } +} + +#[derive(Debug, Deserialize)] +pub struct ConfigDownloadMetadata { + pub postgres: ConfigIngesterPostgres, + pub download_metadata: ConfigDownloadMetadataOpts, +} + +#[derive(Debug, Clone, Copy, Deserialize)] +pub struct ConfigDownloadMetadataOpts { + #[serde( + default = "ConfigDownloadMetadataOpts::default_max_in_process", + deserialize_with = "deserialize_usize_str" + )] + pub max_in_process: usize, + #[serde( + default = "ConfigDownloadMetadataOpts::default_prefetch_queue_size", + deserialize_with = "deserialize_usize_str" + )] + pub prefetch_queue_size: usize, + #[serde( + default = "ConfigDownloadMetadataOpts::default_limit_to_fetch", + deserialize_with = "deserialize_usize_str" + )] + pub limit_to_fetch: usize, + #[serde( + default = "ConfigDownloadMetadataOpts::default_wait_tasks_max_idle", + deserialize_with = "deserialize_duration_str", + rename = "wait_tasks_max_idle_ms" + )] + pub wait_tasks_max_idle: Duration, + #[serde( + default = "ConfigDownloadMetadataOpts::default_download_timeout", + deserialize_with = "deserialize_duration_str", + rename = "download_timeout_ms" + )] + pub download_timeout: Duration, +} + +impl ConfigDownloadMetadataOpts { + pub const fn default_max_in_process() -> usize { + 50 + } + + pub const fn default_prefetch_queue_size() -> usize { + 100 + } + + pub const fn default_limit_to_fetch() -> usize { + 200 + } + + pub const fn default_wait_tasks_max_idle() -> Duration { + Duration::from_millis(100) + } + + pub const fn default_download_timeout() -> Duration { + Duration::from_millis(5_000) + } +} diff --git a/grpc-ingest/src/download_metadata.rs b/grpc-ingest/src/download_metadata.rs new file mode 100644 index 000000000..4392710b2 --- /dev/null +++ b/grpc-ingest/src/download_metadata.rs @@ -0,0 +1,236 @@ +use { + crate::{ + config::{ConfigDownloadMetadata, ConfigDownloadMetadataOpts}, + postgres::{create_pool as pg_create_pool, metrics_pgpool}, + util::create_shutdown, + }, + digital_asset_types::dao::{asset_data, sea_orm_active_enums::TaskStatus, tasks}, + futures::{ + future::{pending, FutureExt}, + stream::StreamExt, + }, + program_transformers::DownloadMetadataInfo, + reqwest::{ClientBuilder, StatusCode}, + sea_orm::{ + entity::{ActiveValue, ColumnTrait, EntityTrait}, + query::{Condition, Order, QueryFilter, QueryOrder, QuerySelect}, + sea_query::expr::Expr, + SqlxPostgresConnector, TransactionTrait, + }, + sqlx::PgPool, + std::{sync::Arc, time::Duration}, + tokio::{ + sync::{mpsc, Notify}, + task::JoinSet, + time::sleep, + }, + tracing::{info, warn}, +}; + +pub const TASK_TYPE: &str = "DownloadMetadata"; + +pub async fn run(config: ConfigDownloadMetadata) -> anyhow::Result<()> { + let mut shutdown = create_shutdown()?; + + // open connection to postgres + let pool = pg_create_pool(config.postgres).await?; + tokio::spawn({ + let pool = pool.clone(); + async move { metrics_pgpool(pool).await } + }); + + // reset previously runned tasks + tokio::select! { + result = reset_pending_tasks(pool.clone()) => { + let updated = result?; + info!("Reset {updated} tasks to Pending status"); + }, + Some(signal) = shutdown.next() => { + warn!("{signal} received, waiting spawned tasks..."); + return Ok(()) + }, + } + + // prefetch queue + let (tasks_tx, mut tasks_rx) = mpsc::channel(config.download_metadata.prefetch_queue_size); + let prefetch_shutdown = Arc::new(Notify::new()); + let prefetch_jh = { + let pool = pool.clone(); + let download_metadata = config.download_metadata; + let shutdown = Arc::clone(&prefetch_shutdown); + async move { + tokio::select! { + result = get_pending_tasks(pool, tasks_tx, download_metadata) => result, + _ = shutdown.notified() => Ok(()) + } + } + }; + tokio::pin!(prefetch_jh); + + // process tasks + let mut tasks = JoinSet::new(); + loop { + let pending_task_fut = if tasks.len() >= config.download_metadata.max_in_process { + pending().boxed() + } else { + tasks_rx.recv().boxed() + }; + + let tasks_fut = if tasks.is_empty() { + pending().boxed() + } else { + tasks.join_next().boxed() + }; + + tokio::select! { + Some(signal) = shutdown.next() => { + warn!("{signal} received, waiting spawned tasks..."); + break Ok(()); + }, + result = &mut prefetch_jh => break result, + Some(result) = tasks_fut => { + result??; + }, + Some(pending_task) = pending_task_fut => { + tasks.spawn(execute_task(pool.clone(), pending_task, config.download_metadata.download_timeout)); + } + }; + }?; + + tokio::select! { + Some(signal) = shutdown.next() => { + anyhow::bail!("{signal} received, force shutdown..."); + } + result = async move { + // shutdown `prefetch` channel + prefetch_shutdown.notify_one(); + // wait all spawned tasks + while let Some(result) = tasks.join_next().await { + result??; + } + // shutdown database connection + pool.close().await; + Ok::<(), anyhow::Error>(()) + } => result, + } +} + +// On startup reset tasks status +async fn reset_pending_tasks(pool: PgPool) -> anyhow::Result { + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); + tasks::Entity::update_many() + .set(tasks::ActiveModel { + status: ActiveValue::Set(TaskStatus::Pending), + ..Default::default() + }) + .filter( + Condition::all() + .add(tasks::Column::Status.eq(TaskStatus::Running)) + .add(tasks::Column::TaskType.eq(TASK_TYPE)), + ) + .exec(&conn) + .await + .map(|result| result.rows_affected) + .map_err(Into::into) +} + +// Select Pending tasks, update status to Running and send to prefetch queue +async fn get_pending_tasks( + pool: PgPool, + tasks_tx: mpsc::Sender, + config: ConfigDownloadMetadataOpts, +) -> anyhow::Result<()> { + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); + loop { + let pending_tasks = tasks::Entity::find() + .filter( + Condition::all() + .add(tasks::Column::Status.eq(TaskStatus::Pending)) + .add( + Expr::col(tasks::Column::Attempts) + .less_or_equal(Expr::col(tasks::Column::MaxAttempts)), + ), + ) + .order_by(tasks::Column::Attempts, Order::Asc) + .order_by(tasks::Column::CreatedAt, Order::Desc) + .limit(config.limit_to_fetch as u64) + .all(&conn) + .await?; + + if pending_tasks.is_empty() { + sleep(config.wait_tasks_max_idle).await; + } else { + tasks::Entity::update_many() + .set(tasks::ActiveModel { + status: ActiveValue::Set(TaskStatus::Running), + ..Default::default() + }) + .filter(tasks::Column::Id.is_in(pending_tasks.iter().map(|v| v.id.clone()))) + .exec(&conn) + .await?; + + for task in pending_tasks { + tasks_tx + .send(task) + .await + .map_err(|_error| anyhow::anyhow!("failed to send task to prefetch queue"))?; + } + } + } +} + +// Try to download metadata and remove task with asset_data update or update tasks to Pending/Failed +async fn execute_task(pool: PgPool, task: tasks::Model, timeout: Duration) -> anyhow::Result<()> { + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); + match download_metadata(task.data, timeout).await { + Ok((asset_data_id, metadata)) => { + // Remove task and set metadata in transacstion + let txn = conn.begin().await?; + tasks::Entity::delete_by_id(task.id).exec(&txn).await?; + asset_data::Entity::update(asset_data::ActiveModel { + id: ActiveValue::Unchanged(asset_data_id), + metadata: ActiveValue::Set(metadata), + reindex: ActiveValue::Set(Some(false)), + ..Default::default() + }) + .exec(&txn) + .await?; + txn.commit().await?; + } + Err(error) => { + let status = if task.attempts + 1 == task.max_attempts { + TaskStatus::Failed + } else { + TaskStatus::Pending + }; + tasks::Entity::update(tasks::ActiveModel { + id: ActiveValue::Unchanged(task.id), + status: ActiveValue::Set(status), + attempts: ActiveValue::Set(task.attempts + 1), + errors: ActiveValue::Set(Some(error.to_string())), + ..Default::default() + }) + .exec(&conn) + .await?; + } + } + Ok(()) +} + +async fn download_metadata( + data: serde_json::Value, + timeout: Duration, +) -> anyhow::Result<(Vec, serde_json::Value)> { + let (id, uri) = serde_json::from_value::(data)?.into_inner(); + + // Need to check for malicious sites ? + let client = ClientBuilder::new().timeout(timeout).build()?; + let response = client.get(uri).send().await?; + + anyhow::ensure!( + response.status() == StatusCode::OK, + "HttpError status_code: {}", + response.status().as_str() + ); + Ok((id, response.json().await?)) +} diff --git a/grpc-ingest/src/grpc.rs b/grpc-ingest/src/grpc.rs new file mode 100644 index 000000000..47cf9c718 --- /dev/null +++ b/grpc-ingest/src/grpc.rs @@ -0,0 +1,166 @@ +use { + crate::{ + config::ConfigGrpc, prom::redis_xadd_status_inc, redis::metrics_xlen, util::create_shutdown, + }, + anyhow::Context, + futures::stream::StreamExt, + redis::{streams::StreamMaxlen, RedisResult, Value as RedisValue}, + std::collections::HashMap, + std::{sync::Arc, time::Duration}, + tokio::{ + task::JoinSet, + time::{sleep, Instant}, + }, + tracing::warn, + yellowstone_grpc_client::GeyserGrpcClient, + yellowstone_grpc_proto::{ + geyser::SubscribeRequest, prelude::subscribe_update::UpdateOneof, prost::Message, + }, + yellowstone_grpc_tools::config::GrpcRequestToProto, +}; + +pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { + let config = Arc::new(config); + + // Connect to Redis + let client = redis::Client::open(config.redis.url.clone())?; + let connection = client.get_multiplexed_tokio_connection().await?; + + // Check stream length for the metrics + let jh_metrics_xlen = tokio::spawn({ + let connection = connection.clone(); + let streams = vec![ + config.accounts.stream.clone(), + config.transactions.stream.clone(), + ]; + async move { metrics_xlen(connection, &streams).await } + }); + tokio::pin!(jh_metrics_xlen); + + // Create gRPC client, subscribe and handle messages + let mut client = GeyserGrpcClient::build_from_shared(config.endpoint.clone())? + .x_token(config.x_token.clone())? + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(10)) + .connect() + .await + .context("failed to connect go gRPC")?; + + let mut accounts = HashMap::with_capacity(1); + let mut transactions = HashMap::with_capacity(1); + + accounts.insert("das".to_string(), config.accounts.filter.clone().to_proto()); + transactions.insert( + "das".to_string(), + config.transactions.filter.clone().to_proto(), + ); + + let request = SubscribeRequest { + accounts, + transactions, + ..Default::default() + }; + let (mut _subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?; + + // recv-send loop + let mut shutdown = create_shutdown()?; + let mut pipe = redis::pipe(); + let mut pipe_accounts = 0; + let mut pipe_transactions = 0; + let deadline = sleep(config.redis.pipeline_max_idle); + tokio::pin!(deadline); + let mut tasks = JoinSet::new(); + + let result = loop { + tokio::select! { + result = &mut jh_metrics_xlen => match result { + Ok(Ok(_)) => unreachable!(), + Ok(Err(error)) => break Err(error), + Err(error) => break Err(error.into()), + }, + Some(signal) = shutdown.next() => { + warn!("{signal} received, waiting spawned tasks..."); + break Ok(()); + }, + msg = stream.next() => { + match msg { + Some(Ok(msg)) => match msg.update_oneof { + Some(UpdateOneof::Account(account)) => { + pipe.xadd_maxlen( + &config.accounts.stream, + StreamMaxlen::Approx(config.accounts.stream_maxlen), + "*", + &[(&config.accounts.stream_data_key, account.encode_to_vec())], + ); + pipe_accounts += 1; + } + Some(UpdateOneof::Slot(_)) => continue, + Some(UpdateOneof::Transaction(transaction)) => { + pipe.xadd_maxlen( + &config.transactions.stream, + StreamMaxlen::Approx(config.transactions.stream_maxlen), + "*", + &[(&config.transactions.stream_data_key, transaction.encode_to_vec())] + ); + pipe_transactions += 1; + } + Some(UpdateOneof::Block(_)) => continue, + Some(UpdateOneof::Ping(_)) => continue, + Some(UpdateOneof::Pong(_)) => continue, + Some(UpdateOneof::BlockMeta(_)) => continue, + Some(UpdateOneof::Entry(_)) => continue, + Some(UpdateOneof::TransactionStatus(_)) => continue, + None => break Err(anyhow::anyhow!("received invalid update gRPC message")), + }, + Some(Err(error)) => break Err(error.into()), + None => break Err(anyhow::anyhow!("geyser gRPC request is finished")), + }; + if pipe_accounts + pipe_transactions < config.redis.pipeline_max_size { + continue; + } + }, + _ = &mut deadline => {}, + }; + + let mut pipe = std::mem::replace(&mut pipe, redis::pipe()); + let pipe_accounts = std::mem::replace(&mut pipe_accounts, 0); + let pipe_transactions = std::mem::replace(&mut pipe_transactions, 0); + deadline + .as_mut() + .reset(Instant::now() + config.redis.pipeline_max_idle); + + tasks.spawn({ + let mut connection = connection.clone(); + let config = Arc::clone(&config); + async move { + let result: RedisResult = + pipe.atomic().query_async(&mut connection).await; + + let status = result.map(|_| ()).map_err(|_| ()); + redis_xadd_status_inc(&config.accounts.stream, status, pipe_accounts); + redis_xadd_status_inc(&config.transactions.stream, status, pipe_transactions); + + Ok::<(), anyhow::Error>(()) + } + }); + while tasks.len() >= config.redis.max_xadd_in_process { + if let Some(result) = tasks.join_next().await { + result??; + } + } + }; + + tokio::select! { + Some(signal) = shutdown.next() => { + anyhow::bail!("{signal} received, force shutdown..."); + } + result = async move { + while let Some(result) = tasks.join_next().await { + result??; + } + Ok::<(), anyhow::Error>(()) + } => result?, + }; + + result +} diff --git a/grpc-ingest/src/ingester.rs b/grpc-ingest/src/ingester.rs new file mode 100644 index 000000000..82cdc3cb5 --- /dev/null +++ b/grpc-ingest/src/ingester.rs @@ -0,0 +1,269 @@ +use { + crate::{ + config::{ConfigIngester, ConfigIngesterDownloadMetadata}, + download_metadata::TASK_TYPE, + postgres::{create_pool as pg_create_pool, metrics_pgpool}, + prom::{ + download_metadata_inserted_total_inc, program_transformer_task_status_inc, + program_transformer_tasks_total_set, ProgramTransformerTaskStatusKind, + }, + redis::{metrics_xlen, ProgramTransformerInfo, RedisStream}, + util::create_shutdown, + }, + chrono::Utc, + crypto::{digest::Digest, sha2::Sha256}, + digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks}, + futures::{ + future::{pending, BoxFuture, FusedFuture, FutureExt}, + stream::StreamExt, + }, + program_transformers::{ + error::ProgramTransformerError, DownloadMetadataInfo, DownloadMetadataNotifier, + ProgramTransformer, + }, + sea_orm::{ + entity::{ActiveModelTrait, ActiveValue}, + error::{DbErr, RuntimeErr}, + SqlxPostgresConnector, + }, + sqlx::{Error as SqlxError, PgPool}, + std::{ + borrow::Cow, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + }, + tokio::{ + task::JoinSet, + time::{sleep, Duration}, + }, + tracing::warn, +}; + +pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { + // connect to Redis + let client = redis::Client::open(config.redis.url.clone())?; + let connection = client.get_multiplexed_tokio_connection().await?; + + // check stream length for the metrics in spawned task + let jh_metrics_xlen = tokio::spawn({ + let connection = connection.clone(); + let streams = config + .redis + .streams + .iter() + .map(|config| config.stream.clone()) + .collect::>(); + async move { metrics_xlen(connection, &streams).await } + }); + tokio::pin!(jh_metrics_xlen); + + // open connection to postgres + let pgpool = pg_create_pool(config.postgres).await?; + tokio::spawn({ + let pgpool = pgpool.clone(); + async move { metrics_pgpool(pgpool).await } + }); + + // create redis stream reader + let (mut redis_messages, redis_tasks_fut) = RedisStream::new(config.redis, connection).await?; + tokio::pin!(redis_tasks_fut); + + // program transforms related + let pt_accounts = Arc::new(ProgramTransformer::new( + pgpool.clone(), + create_download_metadata_notifier(pgpool.clone(), config.download_metadata)?, + false, + )); + let pt_transactions = Arc::new(ProgramTransformer::new( + pgpool.clone(), + create_download_metadata_notifier(pgpool.clone(), config.download_metadata)?, + config.program_transformer.transactions_cl_audits, + )); + let pt_max_tasks_in_process = config.program_transformer.max_tasks_in_process; + let mut pt_tasks = JoinSet::new(); + let pt_tasks_len = Arc::new(AtomicUsize::new(0)); + tokio::spawn({ + let pt_tasks_len = Arc::clone(&pt_tasks_len); + async move { + loop { + program_transformer_tasks_total_set(pt_tasks_len.load(Ordering::Relaxed)); + sleep(Duration::from_millis(100)).await; + } + } + }); + + // read and process messages in the loop + let mut shutdown = create_shutdown()?; + loop { + pt_tasks_len.store(pt_tasks.len(), Ordering::Relaxed); + + let redis_messages_recv = if pt_tasks.len() == pt_max_tasks_in_process { + pending().boxed() + } else { + redis_messages.recv().boxed() + }; + let pt_tasks_next = if pt_tasks.is_empty() { + pending().boxed() + } else { + pt_tasks.join_next().boxed() + }; + + let msg = tokio::select! { + result = &mut jh_metrics_xlen => match result { + Ok(Ok(_)) => unreachable!(), + Ok(Err(error)) => break Err(error), + Err(error) => break Err(error.into()), + }, + Some(signal) = shutdown.next() => { + warn!("{signal} received, waiting spawned tasks..."); + break Ok(()); + }, + result = &mut redis_tasks_fut => break result, + msg = redis_messages_recv => match msg { + Some(msg) => msg, + None => break Ok(()), + }, + result = pt_tasks_next => { + if let Some(result) = result { + result??; + } + continue; + } + }; + + pt_tasks.spawn({ + let pt_accounts = Arc::clone(&pt_accounts); + let pt_transactions = Arc::clone(&pt_transactions); + async move { + let result = match &msg.get_data() { + ProgramTransformerInfo::Account(account) => { + pt_accounts.handle_account_update(account).await + } + ProgramTransformerInfo::Transaction(transaction) => { + pt_transactions.handle_transaction(transaction).await + } + }; + + macro_rules! log_or_bail { + ($action:path, $msg:expr, $error:ident) => { + match msg.get_data() { + ProgramTransformerInfo::Account(account) => { + $action!("{} account {}: {:?}", $msg, account.pubkey, $error) + } + ProgramTransformerInfo::Transaction(transaction) => { + $action!( + "{} transaction {}: {:?}", + $msg, + transaction.signature, + $error + ) + } + } + }; + } + + match result { + Ok(()) => program_transformer_task_status_inc( + ProgramTransformerTaskStatusKind::Success, + ), + Err(ProgramTransformerError::NotImplemented) => { + program_transformer_task_status_inc( + ProgramTransformerTaskStatusKind::NotImplemented, + ) + } + Err(ProgramTransformerError::DeserializationError(error)) => { + program_transformer_task_status_inc( + ProgramTransformerTaskStatusKind::DeserializationError, + ); + log_or_bail!(warn, "failed to deserialize", error) + } + Err(ProgramTransformerError::ParsingError(error)) => { + program_transformer_task_status_inc( + ProgramTransformerTaskStatusKind::ParsingError, + ); + log_or_bail!(warn, "failed to parse", error) + } + Err(ProgramTransformerError::DatabaseError(error)) => { + log_or_bail!(anyhow::bail, "database error for", error) + } + Err(ProgramTransformerError::AssetIndexError(error)) => { + log_or_bail!(anyhow::bail, "indexing error for ", error) + } + Err(error) => { + log_or_bail!(anyhow::bail, "failed to handle", error) + } + } + + msg.ack() + } + }); + }?; + + tokio::select! { + Some(signal) = shutdown.next() => { + anyhow::bail!("{signal} received, force shutdown..."); + } + result = async move { + // shutdown `prefetch` channel (but not Receiver) + redis_messages.shutdown(); + // wait all `program_transformer` spawned tasks + while let Some(result) = pt_tasks.join_next().await { + result??; + } + // wait all `ack` spawned tasks + if !redis_tasks_fut.is_terminated() { + redis_tasks_fut.await?; + } + // shutdown database connection + pgpool.close().await; + Ok::<(), anyhow::Error>(()) + } => result, + } +} + +fn create_download_metadata_notifier( + pgpool: PgPool, + config: ConfigIngesterDownloadMetadata, +) -> anyhow::Result { + let max_attempts = config.max_attempts.try_into()?; + Ok(Box::new(move |info: DownloadMetadataInfo| -> BoxFuture< + 'static, + Result<(), Box>, + > { + let pgpool = pgpool.clone(); + Box::pin(async move { + let data = serde_json::to_value(info)?; + + let mut hasher = Sha256::new(); + hasher.input(TASK_TYPE.as_bytes()); + hasher.input(serde_json::to_vec(&data)?.as_slice()); + let hash = hasher.result_str(); + + let model = tasks::ActiveModel { + id: ActiveValue::Set(hash), + task_type: ActiveValue::Set(TASK_TYPE.to_owned()), + data: ActiveValue::Set(data), + status: ActiveValue::Set(TaskStatus::Pending), + created_at: ActiveValue::Set(Utc::now().naive_utc()), + locked_until: ActiveValue::Set(None), + locked_by: ActiveValue::Set(None), + max_attempts: ActiveValue::Set(max_attempts), + attempts: ActiveValue::Set(0), + duration: ActiveValue::Set(None), + errors: ActiveValue::Set(None), + }; + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pgpool); + + match model.insert(&conn).await.map(|_mode| ()) { + // skip unique_violation error + Err(DbErr::Query(RuntimeErr::SqlxError(SqlxError::Database(dberr)))) if dberr.code() == Some(Cow::Borrowed("23505")) => {}, + value => value?, + }; + download_metadata_inserted_total_inc(); + + Ok(()) + }) + })) +} diff --git a/grpc-ingest/src/main.rs b/grpc-ingest/src/main.rs new file mode 100644 index 000000000..9ca97d111 --- /dev/null +++ b/grpc-ingest/src/main.rs @@ -0,0 +1,90 @@ +use { + crate::{ + config::{ + load as config_load, ConfigDownloadMetadata, ConfigGrpc, ConfigIngester, + ConfigPrometheus, + }, + prom::run_server as prometheus_run_server, + tracing::init as tracing_init, + }, + anyhow::Context, + clap::{Parser, Subcommand}, + std::net::SocketAddr, +}; + +mod config; +mod download_metadata; +mod grpc; +mod ingester; +mod postgres; +mod prom; +mod redis; +mod tracing; +mod util; +mod version; + +#[derive(Debug, Parser)] +#[clap(author, version)] +struct Args { + /// Path to config file + #[clap(short, long)] + config: String, + + /// Prometheus listen address + #[clap(long)] + prometheus: Option, + + #[command(subcommand)] + action: ArgsAction, +} + +#[derive(Debug, Clone, Subcommand)] +enum ArgsAction { + /// Subscribe on Geyser events using gRPC and send them to Redis + #[command(name = "grpc2redis")] + Grpc, + /// Run ingester process (process events from Redis) + #[command(name = "ingester")] + Ingester, + /// Run metadata downloader + #[command(name = "download-metadata")] + DownloadMetadata, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_init()?; + + let args = Args::parse(); + + // Run prometheus server + let config = config_load::(&args.config) + .await + .with_context(|| format!("failed to parse prometheus config from: {}", args.config))?; + if let Some(address) = args.prometheus.or(config.prometheus) { + prometheus_run_server(address)?; + } + + // Run grpc / ingester / download-metadata + match args.action { + ArgsAction::Grpc => { + let config = config_load::(&args.config) + .await + .with_context(|| format!("failed to parse config from: {}", args.config))?; + grpc::run(config).await + } + ArgsAction::Ingester => { + let config = config_load::(&args.config) + .await + .with_context(|| format!("failed to parse config from: {}", args.config))?; + config.check(); + ingester::run(config).await + } + ArgsAction::DownloadMetadata => { + let config = config_load::(&args.config) + .await + .with_context(|| format!("failed to parse config from: {}", args.config))?; + download_metadata::run(config).await + } + } +} diff --git a/grpc-ingest/src/postgres.rs b/grpc-ingest/src/postgres.rs new file mode 100644 index 000000000..05e535b4d --- /dev/null +++ b/grpc-ingest/src/postgres.rs @@ -0,0 +1,29 @@ +use { + crate::{ + config::ConfigIngesterPostgres, + prom::{pgpool_connections_set, PgpoolConnectionsKind}, + }, + sqlx::{ + postgres::{PgConnectOptions, PgPoolOptions}, + PgPool, + }, + tokio::time::{sleep, Duration}, +}; + +pub async fn create_pool(config: ConfigIngesterPostgres) -> anyhow::Result { + let options: PgConnectOptions = config.url.parse()?; + PgPoolOptions::new() + .min_connections(config.min_connections.try_into()?) + .max_connections(config.max_connections.try_into()?) + .connect_with(options) + .await + .map_err(Into::into) +} + +pub async fn metrics_pgpool(pgpool: PgPool) { + loop { + pgpool_connections_set(PgpoolConnectionsKind::Total, pgpool.size() as usize); + pgpool_connections_set(PgpoolConnectionsKind::Idle, pgpool.num_idle()); + sleep(Duration::from_millis(100)).await; + } +} diff --git a/grpc-ingest/src/prom.rs b/grpc-ingest/src/prom.rs new file mode 100644 index 000000000..e81de4255 --- /dev/null +++ b/grpc-ingest/src/prom.rs @@ -0,0 +1,182 @@ +use { + crate::version::VERSION as VERSION_INFO, + hyper::{ + server::conn::AddrStream, + service::{make_service_fn, service_fn}, + Body, Request, Response, Server, StatusCode, + }, + prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder}, + std::{net::SocketAddr, sync::Once}, + tracing::{error, info}, +}; + +lazy_static::lazy_static! { + static ref REGISTRY: Registry = Registry::new(); + + static ref VERSION: IntCounterVec = IntCounterVec::new( + Opts::new("version", "Plugin version info"), + &["buildts", "git", "package", "proto", "rustc", "solana", "version"] + ).unwrap(); + + static ref REDIS_XLEN_TOTAL: IntGaugeVec = IntGaugeVec::new( + Opts::new("redis_xlen_total", "Length of stream in Redis"), + &["stream"] + ).unwrap(); + + static ref REDIS_XADD_STATUS: IntCounterVec = IntCounterVec::new( + Opts::new("redis_xadd_status", "Status of messages sent to Redis stream"), + &["stream", "status"] + ).unwrap(); + + static ref REDIS_XACK_TOTAL: IntCounterVec = IntCounterVec::new( + Opts::new("redis_xack_total", "Total number of processed messages"), + &["stream"] + ).unwrap(); + + static ref PGPOOL_CONNECTIONS_TOTAL: IntGaugeVec = IntGaugeVec::new( + Opts::new("pgpool_connections_total", "Total number of connections in Postgres Pool"), + &["kind"] + ).unwrap(); + + static ref PROGRAM_TRANSFORMER_TASKS_TOTAL: IntGauge = IntGauge::new( + "program_transformer_tasks_total", "Number of tasks spawned for program transform" + ).unwrap(); + + static ref PROGRAM_TRANSFORMER_TASK_STATUS: IntCounterVec = IntCounterVec::new( + Opts::new("program_transformer_task_status", "Status of processed messages"), + &["status"], + ).unwrap(); + + static ref DOWNLOAD_METADATA_INSERTED_TOTAL: IntCounter = IntCounter::new( + "download_metadata_inserted_total", "Total number of inserted tasks for download metadata" + ).unwrap(); +} + +pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { + static REGISTER: Once = Once::new(); + REGISTER.call_once(|| { + macro_rules! register { + ($collector:ident) => { + REGISTRY + .register(Box::new($collector.clone())) + .expect("collector can't be registered"); + }; + } + register!(VERSION); + register!(REDIS_XLEN_TOTAL); + register!(REDIS_XADD_STATUS); + register!(REDIS_XACK_TOTAL); + register!(PGPOOL_CONNECTIONS_TOTAL); + register!(PROGRAM_TRANSFORMER_TASKS_TOTAL); + register!(PROGRAM_TRANSFORMER_TASK_STATUS); + register!(DOWNLOAD_METADATA_INSERTED_TOTAL); + + VERSION + .with_label_values(&[ + VERSION_INFO.buildts, + VERSION_INFO.git, + VERSION_INFO.package, + VERSION_INFO.proto, + VERSION_INFO.rustc, + VERSION_INFO.solana, + VERSION_INFO.version, + ]) + .inc(); + }); + + let make_service = make_service_fn(move |_: &AddrStream| async move { + Ok::<_, hyper::Error>(service_fn(move |req: Request| async move { + let response = match req.uri().path() { + "/metrics" => metrics_handler(), + _ => not_found_handler(), + }; + Ok::<_, hyper::Error>(response) + })) + }); + let server = Server::try_bind(&address)?.serve(make_service); + info!("prometheus server started: {address:?}"); + tokio::spawn(async move { + if let Err(error) = server.await { + error!("prometheus server failed: {error:?}"); + } + }); + + Ok(()) +} + +fn metrics_handler() -> Response { + let metrics = TextEncoder::new() + .encode_to_string(®ISTRY.gather()) + .unwrap_or_else(|error| { + error!("could not encode custom metrics: {}", error); + String::new() + }); + Response::builder().body(Body::from(metrics)).unwrap() +} + +fn not_found_handler() -> Response { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap() +} + +pub fn redis_xlen_set(stream: &str, len: usize) { + REDIS_XLEN_TOTAL + .with_label_values(&[stream]) + .set(len as i64); +} + +pub fn redis_xadd_status_inc(stream: &str, status: Result<(), ()>, delta: usize) { + REDIS_XADD_STATUS + .with_label_values(&[stream, if status.is_ok() { "success" } else { "failed" }]) + .inc_by(delta as u64); +} + +pub fn redis_xack_inc(stream: &str, delta: usize) { + REDIS_XACK_TOTAL + .with_label_values(&[stream]) + .inc_by(delta as u64) +} + +#[derive(Debug, Clone, Copy)] +pub enum PgpoolConnectionsKind { + Total, + Idle, +} + +pub fn pgpool_connections_set(kind: PgpoolConnectionsKind, size: usize) { + PGPOOL_CONNECTIONS_TOTAL + .with_label_values(&[match kind { + PgpoolConnectionsKind::Total => "total", + PgpoolConnectionsKind::Idle => "idle", + }]) + .set(size as i64) +} + +pub fn program_transformer_tasks_total_set(size: usize) { + PROGRAM_TRANSFORMER_TASKS_TOTAL.set(size as i64) +} + +#[derive(Debug, Clone, Copy)] +pub enum ProgramTransformerTaskStatusKind { + Success, + NotImplemented, + DeserializationError, + ParsingError, +} + +pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKind) { + PROGRAM_TRANSFORMER_TASK_STATUS + .with_label_values(&[match kind { + ProgramTransformerTaskStatusKind::Success => "success", + ProgramTransformerTaskStatusKind::NotImplemented => "not_implemented", + ProgramTransformerTaskStatusKind::DeserializationError => "deserialization_error", + ProgramTransformerTaskStatusKind::ParsingError => "parsing_error", + }]) + .inc() +} + +pub fn download_metadata_inserted_total_inc() { + DOWNLOAD_METADATA_INSERTED_TOTAL.inc() +} diff --git a/grpc-ingest/src/redis.rs b/grpc-ingest/src/redis.rs new file mode 100644 index 000000000..1baec8799 --- /dev/null +++ b/grpc-ingest/src/redis.rs @@ -0,0 +1,454 @@ +use { + crate::{ + config::{ConfigIngesterRedis, ConfigIngesterRedisStreamType}, + prom::{redis_xack_inc, redis_xlen_set}, + }, + futures::future::{BoxFuture, Fuse, FutureExt}, + program_transformers::{AccountInfo, TransactionInfo}, + redis::{ + aio::MultiplexedConnection, + streams::{ + StreamClaimReply, StreamId, StreamKey, StreamPendingCountReply, StreamReadOptions, + StreamReadReply, + }, + AsyncCommands, ErrorKind as RedisErrorKind, RedisResult, Value as RedisValue, + }, + solana_sdk::{pubkey::Pubkey, signature::Signature}, + std::{ + collections::HashMap, + convert::Infallible, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + }, + tokio::{ + sync::mpsc, + task::JoinSet, + time::{sleep, Duration, Instant}, + }, + yellowstone_grpc_proto::{ + convert_from::{ + create_message_instructions, create_meta_inner_instructions, create_pubkey_vec, + }, + prelude::{SubscribeUpdateAccount, SubscribeUpdateTransaction}, + prost::Message, + }, +}; + +pub async fn metrics_xlen( + mut connection: C, + streams: &[String], +) -> anyhow::Result { + loop { + let mut pipe = redis::pipe(); + for stream in streams { + pipe.xlen(stream); + } + let xlens: Vec = pipe.query_async(&mut connection).await?; + + for (stream, xlen) in streams.iter().zip(xlens.into_iter()) { + redis_xlen_set(stream, xlen); + } + + sleep(Duration::from_millis(100)).await; + } +} + +pub async fn xgroup_create( + connection: &mut C, + name: &str, + group: &str, + consumer: &str, +) -> anyhow::Result<()> { + let result: RedisResult = connection.xgroup_create_mkstream(name, group, "0").await; + if let Err(error) = result { + if !(error.kind() == RedisErrorKind::ExtensionError + && error.detail() == Some("Consumer Group name already exists") + && error.code() == Some("BUSYGROUP")) + { + return Err(error.into()); + } + } + + // XGROUP CREATECONSUMER key group consumer + redis::cmd("XGROUP") + .arg("CREATECONSUMER") + .arg(name) + .arg(group) + .arg(consumer) + .query_async(connection) + .await?; + + Ok(()) +} + +#[derive(Debug)] +struct RedisStreamInfo { + group: String, + consumer: String, + stream_name: String, + stream_type: ConfigIngesterRedisStreamType, + stream_data_key: String, + xack_batch_max_size: usize, + xack_batch_max_idle: Duration, + xack_max_in_process: usize, +} + +#[derive(Debug)] +pub enum ProgramTransformerInfo { + Account(AccountInfo), + Transaction(TransactionInfo), +} + +#[derive(Debug)] +pub struct RedisStreamMessageInfo { + id: String, + data: ProgramTransformerInfo, + ack_tx: mpsc::UnboundedSender, +} + +impl RedisStreamMessageInfo { + fn parse( + stream: &RedisStreamInfo, + StreamId { id, map }: StreamId, + ack_tx: mpsc::UnboundedSender, + ) -> anyhow::Result { + let to_anyhow = |error: String| anyhow::anyhow!(error); + + let data = match map.get(&stream.stream_data_key) { + Some(RedisValue::Data(vec)) => match stream.stream_type { + ConfigIngesterRedisStreamType::Account => { + let SubscribeUpdateAccount { account, slot, .. } = + Message::decode(vec.as_ref())?; + + let account = account.ok_or_else(|| { + anyhow::anyhow!("received invalid SubscribeUpdateAccount") + })?; + + ProgramTransformerInfo::Account(AccountInfo { + slot, + pubkey: Pubkey::try_from(account.pubkey.as_slice())?, + owner: Pubkey::try_from(account.owner.as_slice())?, + data: account.data, + }) + } + ConfigIngesterRedisStreamType::Transaction => { + let SubscribeUpdateTransaction { transaction, slot } = + Message::decode(vec.as_ref())?; + + let transaction = transaction.ok_or_else(|| { + anyhow::anyhow!("received invalid SubscribeUpdateTransaction") + })?; + let tx = transaction.transaction.ok_or_else(|| { + anyhow::anyhow!( + "received invalid transaction in SubscribeUpdateTransaction" + ) + })?; + let message = tx.message.ok_or_else(|| { + anyhow::anyhow!("received invalid message in SubscribeUpdateTransaction") + })?; + let meta = transaction.meta.ok_or_else(|| { + anyhow::anyhow!("received invalid meta in SubscribeUpdateTransaction") + })?; + + let mut account_keys = + create_pubkey_vec(message.account_keys).map_err(to_anyhow)?; + for pubkey in + create_pubkey_vec(meta.loaded_writable_addresses).map_err(to_anyhow)? + { + account_keys.push(pubkey); + } + for pubkey in + create_pubkey_vec(meta.loaded_readonly_addresses).map_err(to_anyhow)? + { + account_keys.push(pubkey); + } + + ProgramTransformerInfo::Transaction(TransactionInfo { + slot, + signature: Signature::try_from(transaction.signature.as_slice())?, + account_keys, + message_instructions: create_message_instructions(message.instructions) + .map_err(to_anyhow)?, + meta_inner_instructions: create_meta_inner_instructions( + meta.inner_instructions, + ) + .map_err(to_anyhow)?, + }) + } + }, + Some(_) => anyhow::bail!( + "invalid data (key: {:?}) from stream {:?}", + stream.stream_data_key, + stream.stream_name + ), + None => anyhow::bail!( + "failed to get data (key: {:?}) from stream {:?}", + stream.stream_data_key, + stream.stream_name + ), + }; + Ok(Self { id, data, ack_tx }) + } + + pub const fn get_data(&self) -> &ProgramTransformerInfo { + &self.data + } + + pub fn ack(self) -> anyhow::Result<()> { + self.ack_tx + .send(self.id) + .map_err(|_error| anyhow::anyhow!("failed to send message to ack channel")) + } +} + +#[derive(Debug)] +pub struct RedisStream { + shutdown: Arc, + messages_rx: mpsc::Receiver, +} + +impl RedisStream { + pub async fn new( + config: ConfigIngesterRedis, + mut connection: MultiplexedConnection, + ) -> anyhow::Result<(Self, Fuse>>)> { + // create group with consumer per stream + for stream in config.streams.iter() { + xgroup_create( + &mut connection, + &stream.stream, + &config.group, + &config.consumer, + ) + .await?; + } + + // shutdown flag + let shutdown = Arc::new(AtomicBool::new(false)); + + // create stream info wrapped by Arc + let mut ack_tasks = vec![]; + let streams = config + .streams + .iter() + .map(|stream| { + let (ack_tx, ack_rx) = mpsc::unbounded_channel(); + let info = Arc::new(RedisStreamInfo { + group: config.group.clone(), + consumer: config.consumer.clone(), + stream_name: stream.stream.clone(), + stream_type: stream.stream_type, + stream_data_key: stream.data_key.clone(), + xack_batch_max_size: stream.xack_batch_max_size, + xack_batch_max_idle: stream.xack_batch_max_idle, + xack_max_in_process: stream.xack_max_in_process, + }); + ack_tasks.push((Arc::clone(&info), ack_rx)); + (stream.stream.clone(), (ack_tx, info)) + }) + .collect::>(); + + // spawn xack tasks + let ack_jh_vec = ack_tasks + .into_iter() + .map(|(stream, ack_rx)| { + let connection = connection.clone(); + tokio::spawn(async move { Self::run_ack(stream, connection, ack_rx).await }) + }) + .collect::>(); + + // spawn prefetch task + let (messages_tx, messages_rx) = mpsc::channel(config.prefetch_queue_size); + let jh_prefetch = tokio::spawn({ + let shutdown = Arc::clone(&shutdown); + async move { Self::run_prefetch(config, streams, connection, messages_tx, shutdown).await } + }); + + // merge spawned xack / prefetch tasks + let spawned_tasks = async move { + jh_prefetch.await??; + for jh in ack_jh_vec.into_iter() { + jh.await??; + } + Ok::<(), anyhow::Error>(()) + }; + + Ok(( + Self { + shutdown, + messages_rx, + }, + spawned_tasks.boxed().fuse(), + )) + } + + pub async fn recv(&mut self) -> Option { + self.messages_rx.recv().await + } + + pub fn shutdown(mut self) { + self.shutdown.store(true, Ordering::Relaxed); + tokio::spawn(async move { while self.messages_rx.recv().await.is_some() {} }); + } + + async fn run_prefetch( + config: ConfigIngesterRedis, + streams: HashMap, Arc)>, + mut connection: MultiplexedConnection, + messages_tx: mpsc::Sender, + shutdown: Arc, + ) -> anyhow::Result<()> { + // read pending first + for (ack_tx, stream) in streams.values() { + let mut start = "-".to_owned(); + while !shutdown.load(Ordering::Relaxed) { + let StreamPendingCountReply { ids: pending_ids } = redis::cmd("XPENDING") + .arg(&stream.stream_name) + .arg(&stream.group) + .arg(&start) + .arg("+") + .arg(config.xpending_max) + .arg(&stream.consumer) // we can't use `xpending_count` because it doesn't support `consumer` filter + .query_async(&mut connection) + .await?; + + // drop first item if we do not start from the beginning + let used_ids = if start == "-" { 0.. } else { 1.. }; + let ids_str = pending_ids[used_ids] + .iter() + .map(|pending| pending.id.as_str()) + .collect::>(); + + // check that we fetched all pendings and update start + match pending_ids.last() { + Some(id) => { + if id.id == start { + break; + } else { + start = id.id.clone(); + } + } + None => break, + } + + // read pending keys + let StreamClaimReply { ids: pendings } = connection + .xclaim( + &stream.stream_name, + &stream.group, + &stream.consumer, + 0, + &ids_str, + ) + .await?; + for pending in pendings { + let item = RedisStreamMessageInfo::parse(stream, pending, ack_tx.clone())?; + messages_tx.send(item).await.map_err(|_error| { + anyhow::anyhow!("failed to send item to prefetch channel") + })?; + } + } + } + + // exit if need to handle only pending + if config.xpending_only { + return Ok(()); + } + + let streams_keys = streams.keys().map(|name| name.as_str()).collect::>(); + let streams_ids = (0..streams_keys.len()).map(|_| ">").collect::>(); + while !shutdown.load(Ordering::Relaxed) { + let opts = StreamReadOptions::default() + .count(config.xreadgroup_max) + .group(&config.group, &config.consumer); + let results: StreamReadReply = connection + .xread_options(&streams_keys, &streams_ids, &opts) + .await?; + if results.keys.is_empty() { + sleep(Duration::from_millis(5)).await; + continue; + } + + for StreamKey { key, ids } in results.keys { + let (ack_tx, stream) = match streams.get(&key) { + Some(value) => value, + None => anyhow::bail!("unknown stream: {:?}", key), + }; + + for id in ids { + let item = RedisStreamMessageInfo::parse(stream, id, ack_tx.clone())?; + messages_tx.send(item).await.map_err(|_error| { + anyhow::anyhow!("failed to send item to prefetch channel") + })?; + } + } + } + + Ok(()) + } + + async fn run_ack( + stream: Arc, + connection: MultiplexedConnection, + mut ack_rx: mpsc::UnboundedReceiver, + ) -> anyhow::Result<()> { + let mut ids = vec![]; + let deadline = sleep(stream.xack_batch_max_idle); + tokio::pin!(deadline); + let mut tasks = JoinSet::new(); + + let result = loop { + let terminated = tokio::select! { + msg = ack_rx.recv() => match msg { + Some(msg) => { + ids.push(msg); + if ids.len() < stream.xack_batch_max_size { + continue; + } + false + } + None => true, + }, + _ = &mut deadline => false, + }; + + let ids = std::mem::take(&mut ids); + deadline + .as_mut() + .reset(Instant::now() + stream.xack_batch_max_idle); + if !ids.is_empty() { + tasks.spawn({ + let stream = Arc::clone(&stream); + let mut connection = connection.clone(); + async move { + redis::pipe() + .atomic() + .xack(&stream.stream_name, &stream.group, &ids) + .xdel(&stream.stream_name, &ids) + .query_async(&mut connection) + .await?; + redis_xack_inc(&stream.stream_name, ids.len()); + Ok::<(), anyhow::Error>(()) + } + }); + while tasks.len() >= stream.xack_max_in_process { + if let Some(result) = tasks.join_next().await { + result??; + } + } + } + + if terminated { + break Ok(()); + } + }; + + while let Some(result) = tasks.join_next().await { + result??; + } + + result + } +} diff --git a/grpc-ingest/src/tracing.rs b/grpc-ingest/src/tracing.rs new file mode 100644 index 000000000..2d50f785c --- /dev/null +++ b/grpc-ingest/src/tracing.rs @@ -0,0 +1,33 @@ +use { + opentelemetry_sdk::trace::{self, Sampler}, + std::env, + tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}, +}; + +pub fn init() -> anyhow::Result<()> { + let open_tracer = opentelemetry_jaeger::new_agent_pipeline() + .with_service_name(env::var("CARGO_PKG_NAME")?) + .with_auto_split_batch(true) + .with_trace_config(trace::config().with_sampler(Sampler::TraceIdRatioBased(0.25))) + .install_batch(opentelemetry_sdk::runtime::Tokio)?; + let jeager_layer = tracing_opentelemetry::layer().with_tracer(open_tracer); + + let env_filter = EnvFilter::builder() + .parse(env::var(EnvFilter::DEFAULT_ENV).unwrap_or_else(|_| "info,sqlx=warn".to_owned()))?; + + let is_atty = atty::is(atty::Stream::Stdout) && atty::is(atty::Stream::Stderr); + let io_layer = tracing_subscriber::fmt::layer().with_ansi(is_atty); + + let registry = tracing_subscriber::registry() + .with(jeager_layer) + .with(env_filter) + .with(io_layer); + + if env::var_os("RUST_LOG_JSON").is_some() { + let json_layer = tracing_subscriber::fmt::layer().json().flatten_event(true); + registry.with(json_layer).try_init() + } else { + registry.try_init() + } + .map_err(Into::into) +} diff --git a/grpc-ingest/src/util.rs b/grpc-ingest/src/util.rs new file mode 100644 index 000000000..0a7800a12 --- /dev/null +++ b/grpc-ingest/src/util.rs @@ -0,0 +1,19 @@ +use { + async_stream::stream, + futures::stream::{BoxStream, StreamExt}, + tokio::signal::unix::{signal, SignalKind}, +}; + +pub fn create_shutdown() -> anyhow::Result> { + let mut sigint = signal(SignalKind::interrupt())?; + let mut sigterm = signal(SignalKind::terminate())?; + Ok(stream! { + loop { + yield tokio::select! { + _ = sigint.recv() => "SIGINT", + _ = sigterm.recv() => "SIGTERM", + }; + } + } + .boxed()) +} diff --git a/grpc-ingest/src/version.rs b/grpc-ingest/src/version.rs new file mode 100644 index 000000000..b9da62845 --- /dev/null +++ b/grpc-ingest/src/version.rs @@ -0,0 +1,22 @@ +use {serde::Serialize, std::env}; + +#[derive(Debug, Serialize)] +pub struct Version { + pub package: &'static str, + pub version: &'static str, + pub proto: &'static str, + pub solana: &'static str, + pub git: &'static str, + pub rustc: &'static str, + pub buildts: &'static str, +} + +pub const VERSION: Version = Version { + package: env!("CARGO_PKG_NAME"), + version: env!("CARGO_PKG_VERSION"), + proto: env!("YELLOWSTONE_GRPC_PROTO_VERSION"), + solana: env!("SOLANA_SDK_VERSION"), + git: env!("GIT_VERSION"), + rustc: env!("VERGEN_RUSTC_SEMVER"), + buildts: env!("VERGEN_BUILD_TIMESTAMP"), +}; diff --git a/migration/Cargo.toml b/migration/Cargo.toml index c0202ffdd..6ce0612be 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -9,7 +9,10 @@ publish = { workspace = true } async-std = { workspace = true, features = ["attributes", "tokio1"] } enum-iterator = { workspace = true } enum-iterator-derive = { workspace = true } -sea-orm-migration = { workspace = true, features = ["runtime-tokio-rustls", "sqlx-postgres"] } +sea-orm-migration = { workspace = true, features = [ + "runtime-tokio-rustls", + "sqlx-postgres", +] } [lints] workspace = true diff --git a/nft_ingester/src/tasks/mod.rs b/nft_ingester/src/tasks/mod.rs index 50d9a0c97..039ffca9b 100644 --- a/nft_ingester/src/tasks/mod.rs +++ b/nft_ingester/src/tasks/mod.rs @@ -318,6 +318,17 @@ impl TaskManager { .map_err(|e| e.into()) } + async fn save_task( + txn: &A, + task: tasks::ActiveModel, + ) -> Result + where + A: ConnectionTrait, + { + let act: tasks::ActiveModel = task; + act.save(txn).await.map_err(|e| e.into()) + } + pub fn start_listener(&mut self, process_on_receive: bool) -> JoinHandle<()> { let (producer, mut receiver) = mpsc::unbounded_channel::(); self.producer = Some(producer); @@ -328,20 +339,17 @@ impl TaskManager { tokio::task::spawn(async move { while let Some(task) = receiver.recv().await { - let instance_name = instance_name.clone(); let task_name = task.name; - let sender_pool = sender_pool.clone(); if let Some(task_created_time) = task.created_at { - let bus_time = - Utc::now().timestamp_millis() - task_created_time.timestamp_millis(); + let bus_time = Utc::now().timestamp_millis() + - task_created_time.and_utc().timestamp_millis(); metric! { statsd_histogram!("ingester.bgtask.bus_time", bus_time as u64, "type" => task.name); } } - if task_name == "DownloadMetadata" { - if let Some(sender_pool) = sender_pool { + if let Some(sender_pool) = sender_pool.clone() { let download_metadata_task = DownloadMetadata::from_task_data(task); if let Ok(download_metadata_task) = download_metadata_task { @@ -374,41 +382,27 @@ impl TaskManager { .filter(tasks::Column::Status.ne(TaskStatus::Pending)) .one(&conn) .await; - if let Ok(Some(e)) = task_entry { metric! { statsd_count!("ingester.bgtask.identical", 1, "type" => &e.task_type); } continue; } - + metric! { + statsd_count!("ingester.bgtask.new", 1, "type" => task.name); + } TaskManager::new_task_handler( pool.clone(), - instance_name, + instance_name.clone(), task, Arc::clone(&task_map), process_on_receive, ); - - metric! { - statsd_count!("ingester.bgtask.new", 1, "type" => task_name); - } } } }) } - async fn save_task( - txn: &A, - task: tasks::ActiveModel, - ) -> Result - where - A: ConnectionTrait, - { - let act: tasks::ActiveModel = task; - act.save(txn).await.map_err(|e| e.into()) - } - pub fn start_runner(&self, config: Option) -> JoinHandle<()> { let task_map = Arc::clone(&self.registered_task_types); let instance_name = self.instance_name.clone(); diff --git a/program_transformers/Cargo.toml b/program_transformers/Cargo.toml index 35bab7a19..aa96212e7 100644 --- a/program_transformers/Cargo.toml +++ b/program_transformers/Cargo.toml @@ -10,12 +10,16 @@ blockbuster = { workspace = true } bs58 = { workspace = true } cadence = { workspace = true } cadence-macros = { workspace = true } -digital_asset_types = { workspace = true, features = ["json_types", "sql_types"] } +digital_asset_types = { workspace = true, features = [ + "json_types", + "sql_types", +] } futures = { workspace = true } heck = { workspace = true } mpl-bubblegum = { workspace = true } num-traits = { workspace = true } sea-orm = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } solana-sdk = { workspace = true } solana-transaction-status = { workspace = true } diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs index edde20ca3..79c407bd4 100644 --- a/program_transformers/src/lib.rs +++ b/program_transformers/src/lib.rs @@ -20,6 +20,7 @@ use { entity::EntityTrait, query::Select, ConnectionTrait, DatabaseConnection, DbErr, SqlxPostgresConnector, TransactionTrait, }, + serde::{Deserialize, Serialize}, solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature}, solana_transaction_status::InnerInstructions, sqlx::PgPool, @@ -52,7 +53,7 @@ pub struct TransactionInfo { pub meta_inner_instructions: Vec, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DownloadMetadataInfo { asset_data_id: Vec, uri: String,