Skip to content

Commit

Permalink
Multi broadcast txns (#60)
Browse files Browse the repository at this point in the history
* wip: broadcast claim tx to multiple rpcs

* Fix endpoints env var parsing

* Adjust tests to rpc broadcast

* Use the first successful rpc provider result when waiting for the transaction submit results. Use the first one if none are found

* Apply prettier

* rebase multibroadcast onto feat/multiple-rpc

* Refactor submitClaims signature back to return a single array for each ecosystem submit

* Fix cancellation signal and max priority fee

* Revert initial changes to tests

* small bug fix

* Update .env.sample

* Add additional check for the fringe case of getting mixed responses from the rpcs

* Add fallback ENDPOINT env variable

* Apply prettier

---------

Co-authored-by: chase-45 <chasemoran45@gmail.com>
  • Loading branch information
M-Picco and chase-45 authored Apr 2, 2024
1 parent 905d94f commit 6856880
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 36 deletions.
2 changes: 1 addition & 1 deletion frontend/.env.sample
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

ENDPOINT=http://localhost:8899
ENDPOINTS=["http://localhost:8899"]
CLUSTER=localnet

# Postgres connection variables. These default values should work for local development.
Expand Down
196 changes: 169 additions & 27 deletions frontend/claim_sdk/solana.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { SignedMessage } from './ecosystems/signatures'
import { extractChainId } from './ecosystems/cosmos'
import { fetchFundTransaction } from '../utils/api'
import { getClaimPayers } from './treasury'
import { inspect } from 'util'
import { bs58 } from '@coral-xyz/anchor/dist/cjs/utils/bytes'

export const ERROR_SIGNING_TX = 'error: signing transaction'
export const ERROR_FUNDING_TX = 'error: funding transaction'
Expand Down Expand Up @@ -59,23 +59,32 @@ export class TokenDispenserProvider {
tokenDispenserProgram: anchor.Program<TokenDispenser>
configPda: [anchor.web3.PublicKey, bump]
config: IdlAccounts<TokenDispenser>['Config'] | undefined
providers: anchor.Provider[]

constructor(
endpoint: string,
endpoints: string[],
wallet: Wallet,
programId: anchor.web3.PublicKey,
confirmOpts?: anchor.web3.ConfirmOptions
) {
confirmOpts = confirmOpts ?? anchor.AnchorProvider.defaultOptions()
const provider = new anchor.AnchorProvider(
new anchor.web3.Connection(endpoint, confirmOpts.preflightCommitment),
wallet,
confirmOpts

this.providers = endpoints.map(
(endpoint) =>
new anchor.AnchorProvider(
new anchor.web3.Connection(
endpoint,
confirmOpts?.preflightCommitment
),
wallet,
confirmOpts ?? anchor.AnchorProvider.defaultOptions()
)
)

this.tokenDispenserProgram = new Program(
tokenDispenser as Idl,
programId,
provider
this.providers[0]
) as unknown as Program<TokenDispenser>

this.configPda = anchor.web3.PublicKey.findProgramAddressSync(
Expand Down Expand Up @@ -280,7 +289,7 @@ export class TokenDispenserProvider {
}
)

let txsSignedTwice
let txsSignedTwice: VersionedTransaction[]
try {
txsSignedTwice = await fetchFundTransactionFunction(
txsSignedOnceWithPayers
Expand All @@ -291,28 +300,23 @@ export class TokenDispenserProvider {
}

// send the txns. Associated token account will be created if needed.
const sendTxs = txsSignedTwice.map(async (signedTx) => {
try {
const signature = await this.connection.sendTransaction(signedTx, {
skipPreflight: true,
})
const latestBlockHash = await this.connection.getLatestBlockhash()
const result = await this.connection.confirmTransaction(
{
signature,
blockhash: latestBlockHash.blockhash,
lastValidBlockHeight: latestBlockHash.lastValidBlockHeight,
},
'confirmed'
)
const sendTxs = await this.multiBroadcastTransactions(txsSignedTwice)

return result.value.err
} catch {
throw new Error(ERROR_RPC_CONNECTION)
const mapToOutput = sendTxs.map((tx) => {
// if the transaction comes back null this is actually an error
if (tx === null) {
//return as a promise
return Promise.resolve('Transaction failed to broadcast')
}
// if the transaction errored we will also have to handle that
if (tx.err) {
return Promise.resolve(tx.err)
}
//otherwise we are fine, return null
return Promise.resolve(null)
})

return sendTxs
return mapToOutput
}

public async generateClaimTransaction(
Expand Down Expand Up @@ -433,7 +437,7 @@ export class TokenDispenserProvider {
: ataCreationCost + pdaDerivationCosts(claimaintFundBump))
ixs.push(ComputeBudgetProgram.setComputeUnitLimit({ units }))

const microLamports = 1_000_000 //somewhat arbitrary choice
const microLamports = 1_000_000 - 1 //somewhat arbitrary choice
ixs.push(ComputeBudgetProgram.setComputeUnitPrice({ microLamports }))

// 5. build and return the transaction
Expand Down Expand Up @@ -590,8 +594,146 @@ export class TokenDispenserProvider {
)
return { mint, treasury }
}

private async multiBroadcastTransactions(
transactions: VersionedTransaction[]
): Promise<(anchor.web3.SignatureStatus | null)[]> {
const output: (anchor.web3.SignatureStatus | null)[] = []
if (this.providers.length === 0) {
throw new Error('No valid endpoints to broadcast transactions')
}
const redundantBroadcasts = new Map<
VersionedTransaction,
Promise<anchor.web3.SignatureStatus | null>[]
>()

try {
for (const transaction of transactions) {
redundantBroadcasts.set(transaction, [])
//Cancellation token closure
let cancelled = false
const getCancellationSignal = () => cancelled
for (const endpoint of this.providers.map(
(provider) => provider.connection.rpcEndpoint
)) {
redundantBroadcasts.get(transaction)!.push(
this.broadcastTransaction(
transaction,
endpoint,
getCancellationSignal
).then(
//call the cancellation only if the transaction signature is successful
(result) => {
if (
result != null &&
result.confirmations &&
result.confirmations > 0
) {
cancelled = true
}
return result
}
)
)
}
}

for (const transaction of transactions) {
const allSettledPromises = await Promise.allSettled(
redundantBroadcasts.get(transaction)!
)
const successfulResults = allSettledPromises
.filter(
(result) =>
result.status === 'fulfilled' &&
result.value != null &&
result.value.confirmations &&
result.value.confirmations > 0
)
.map((result) =>
result.status === 'fulfilled' ? result.value : null
)

if (successfulResults.length >= 1) {
// rule out that one of the RPCs succeeds and the rest fail for some reason
const successSubmit = successfulResults.find(
(res) => res?.err !== null && res?.err !== undefined
)
output.push(successSubmit || successfulResults[0])
} else {
output.push(null)
}
}

return output
} catch (e) {
//This should never hit
throw new Error('Top level error broadcasting transactions: ', e)
}
}

private async broadcastTransaction(
transaction: VersionedTransaction,
endpoint: string,
getCancellationSignal: () => boolean
): Promise<anchor.web3.SignatureStatus | null> {
//Noting the time at start so that we can't inifinte loop in the event of a halted thread or dead rpc.
const timeStart = Date.now()
// 35 seconds
const maxTimeout = 35 * 1000
const connection = new Connection(endpoint)
let isCancelled = getCancellationSignal()
//TODO check that this is really the txId
const txId = bs58.encode(transaction.signatures[0])

while (!isCancelled) {
//first send the transaction raw
try {
await connection.sendRawTransaction(transaction.serialize(), {
skipPreflight: true,
maxRetries: 0,
})
} catch (e) {
//just swallow it if it fails for now.
}

//Pull the signature status
try {
const status = await connection.getSignatureStatus(txId)
if (
status.value?.confirmationStatus === 'confirmed' ||
status.value?.confirmationStatus === 'finalized'
//Intentionally omitting processed, confirmed means 66% of stake voted on it
) {
//If the transaction is confirmed or finalized we're all done.
return status.value
//Multibroadcast function should invoke the cancel token to cancel the other parallel broadcasts
}
} catch (e) {
//This means the status call actually rejected, which we can't really do anything about
}

//If we're here, we need to check the time and see if we should retry.
const timeEnd = Date.now()
//This should only trigger after we are sufficiently sure the blockhash will have expired.
if (timeEnd - timeStart > maxTimeout) {
return null
}

//wait 1.8 seconds so as to not murder the RPC
await wait(1800)

isCancelled = getCancellationSignal()
}

//This means we got manually cancelled.
return null
}
}

const wait = (milliseconds: number) =>
new Promise((resolve) => setTimeout(resolve, milliseconds))

export async function airdrop(
connection: Connection,
amount: number,
Expand Down
4 changes: 2 additions & 2 deletions frontend/components/wallets/Solana.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { useMemo, ReactElement, ReactNode, useCallback, useEffect } from 'react'
import { clusterApiUrl } from '@solana/web3.js'
import { Adapter, WalletAdapterNetwork } from '@solana/wallet-adapter-base'
import { Wallet, WalletButton, WalletConnectedButton } from './WalletButton'
import config from '../../utils/config'

export const PHANTOM_WALLET_ADAPTER = new PhantomWalletAdapter()
export const BACKPACK_WALLET_ADAPTER = new BackpackWalletAdapter()
Expand Down Expand Up @@ -51,8 +52,7 @@ type SolanaWalletProviderProps = {
export function SolanaWalletProvider({
children,
}: SolanaWalletProviderProps): ReactElement {
const endpoint =
process.env.ENDPOINT || clusterApiUrl(WalletAdapterNetwork.Devnet)
const endpoint = config.ENDPOINTS[0]

const wallets = useSolanaWalletAdapters()

Expand Down
4 changes: 3 additions & 1 deletion frontend/hooks/useTokenDispenserProvider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ import { useMemo } from 'react'
import { useAnchorWallet } from '@solana/wallet-adapter-react'
import { web3 } from '@coral-xyz/anchor'
import { tokenDispenserProgramId } from '../utils/constants'
import config from '../utils/config'

// It will return undefined if no Solana wallet is connected.
export function useTokenDispenserProvider() {
const anchorWallet = useAnchorWallet()
return useMemo(() => {
if (anchorWallet === undefined) return undefined

return new CTokenDispenserProvider(
process.env.ENDPOINT!,
config.ENDPOINTS,
anchorWallet,
new web3.PublicKey(tokenDispenserProgramId)
)
Expand Down
4 changes: 2 additions & 2 deletions frontend/integration/integrationTest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ describe('integration test', () => {
}

const deployerTokenDispenserProvider = new TokenDispenserProvider(
endpoint,
[endpoint],
funderWallet,
tokenDispenserPid,
confirmOpts
)

const tokenDispenserProvider = new TokenDispenserProvider(
endpoint,
[endpoint],
wallet,
tokenDispenserPid,
confirmOpts
Expand Down
2 changes: 1 addition & 1 deletion frontend/next.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module.exports = withBundleAnalyzer({
},
swcMinify: false,
env: {
ENDPOINT: process.env.ENDPOINT,
ENDPOINTS: process.env.ENDPOINTS,
CLUSTER: process.env.CLUSTER,
},
webpack: (config, { isServer }) => {
Expand Down
3 changes: 2 additions & 1 deletion frontend/sections/SignAndClaim.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { SignForEligibleWallets } from './SignForEligibleWallets'
import { StepProps } from './common'
import { PathnameStore } from 'utils/store'
import { BoxTitle } from '@components/BoxTitle'
import { TransactionError } from '@solana/web3.js'

// Following the convention,
// If error is:
Expand Down Expand Up @@ -102,7 +103,7 @@ export const SignAndClaim = ({ onBack, onProceed }: SignAndClaimProps) => {
setEcosystemsClaimState(stateObj)

let totalCoinsClaimed = new BN(0)
let broadcastPromises
let broadcastPromises: Promise<TransactionError | null>[]
try {
broadcastPromises = await tokenDispenser?.submitClaims(
claims.map((claim) => ({
Expand Down
25 changes: 25 additions & 0 deletions frontend/utils/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { clusterApiUrl } from '@solana/web3.js'

const parseEndpoints = () => {
let endpoints
try {
endpoints = JSON.parse(process.env.ENDPOINTS || process.env.ENDPOINT!)
} catch (e) {
// if parse fails, assume it's a single endpoint
endpoints = [
process.env.ENDPOINTS || process.env.ENDPOINT || clusterApiUrl('devnet'),
]
}

if (!Array.isArray(endpoints)) {
throw new Error('ENDPOINTS must be an array')
}

return endpoints
}

const config = {
ENDPOINTS: parseEndpoints(),
}

export default config
3 changes: 2 additions & 1 deletion frontend/utils/isClaimAlreadySubmitted.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { Keypair } from '@solana/web3.js'
import { ClaimInfo } from 'claim_sdk/claim'
import { TokenDispenserProvider } from 'claim_sdk/solana'
import { tokenDispenserProgramId } from './constants'
import config from './config'

// Tokendispenser with randomly generated keypair. Since we don't need a
// specific one to check if claims were already submitted
const tokenDispenser = new TokenDispenserProvider(
process.env.ENDPOINT!,
config.ENDPOINTS,
new NodeWallet(new Keypair()),
new web3.PublicKey(tokenDispenserProgramId)
)
Expand Down

0 comments on commit 6856880

Please sign in to comment.