Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make unchained great again #908

Merged
merged 7 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -609,12 +609,26 @@ aliases:
api-memory-limit: 1Gi
api-memory-request: 500Mi
stateful-service-replicas: 2
service-name-1: indexer
service-image-1: shapeshiftdao/unchained-blockbook:559cfbc
service-cpu-limit-1: "4"
service-cpu-request-1: "2"
service-memory-limit-1: 24Gi
service-storage-size-1: 750Gi
service-name-1: daemon
service-image-1: 0xpolygon/bor:1.2.0
service-cpu-limit-1: "8"
service-cpu-request-1: "4"
service-memory-limit-1: 48Gi
service-storage-size-1: 4000Gi
service-storage-iops-1: "6000"
service-storage-throughput-1: "300"
service-name-2: heimdall
service-image-2: 0xpolygon/heimdall:1.0.3
service-cpu-limit-2: "2"
service-cpu-request-2: "1"
service-memory-limit-2: 1Gi
service-storage-size-2: 400Gi
service-name-3: indexer
service-image-3: shapeshiftdao/unchained-blockbook:559cfbc
service-cpu-limit-3: "4"
service-cpu-request-3: "2"
service-memory-limit-3: 24Gi
service-storage-size-3: 750Gi

- &polygon-dev
<<: *polygon
Expand Down
2 changes: 1 addition & 1 deletion node/coinstacks/arbitrum-nova/api/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ const server = app.listen(PORT, () => logger.info('Server started'))
const wsServer = new Server({ server })

wsServer.on('connection', (connection) => {
ConnectionHandler.start(connection, registry, prometheus)
ConnectionHandler.start(connection, registry, prometheus, logger)
})

new WebsocketClient(INDEXER_WS_URL, {
Expand Down
2 changes: 1 addition & 1 deletion node/coinstacks/arbitrum-nova/api/src/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const logger = new Logger({
level: process.env.LOG_LEVEL,
})

const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL })
const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL, logger })
const provider = new ethers.providers.JsonRpcProvider(RPC_URL)
export const gasOracle = new GasOracle({ logger, provider, coinstack: 'arbitrum-nova' })

Expand Down
2 changes: 1 addition & 1 deletion node/coinstacks/arbitrum/api/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const server = app.listen(PORT, () => logger.info('Server started'))
const wsServer = new Server({ server })

wsServer.on('connection', (connection) => {
ConnectionHandler.start(connection, registry, prometheus)
ConnectionHandler.start(connection, registry, prometheus, logger)
})

new WebsocketClient(INDEXER_WS_URL, {
Expand Down
2 changes: 1 addition & 1 deletion node/coinstacks/arbitrum/api/src/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const logger = new Logger({
level: process.env.LOG_LEVEL,
})

const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL })
const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL, logger })
const provider = new ethers.providers.JsonRpcProvider(RPC_URL)
export const gasOracle = new GasOracle({ logger, provider, coinstack: 'arbitrum' })

Expand Down
2 changes: 1 addition & 1 deletion node/coinstacks/avalanche/api/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const server = app.listen(PORT, () => logger.info('Server started'))
const wsServer = new Server({ server })

wsServer.on('connection', (connection) => {
ConnectionHandler.start(connection, registry, prometheus)
ConnectionHandler.start(connection, registry, prometheus, logger)
})

new WebsocketClient(INDEXER_WS_URL, {
Expand Down
2 changes: 1 addition & 1 deletion node/coinstacks/avalanche/api/src/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const logger = new Logger({
level: process.env.LOG_LEVEL,
})

const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL })
const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL, logger })
const provider = new ethers.providers.JsonRpcProvider(RPC_URL)
export const gasOracle = new GasOracle({ logger, provider, coinstack: 'avalanche' })

Expand Down
2 changes: 1 addition & 1 deletion node/coinstacks/bitcoin/api/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ const server = app.listen(PORT, () => logger.info('Server started'))
const wsServer = new Server({ server })

wsServer.on('connection', (connection) => {
ConnectionHandler.start(connection, registry, prometheus)
ConnectionHandler.start(connection, registry, prometheus, logger)
})

new WebsocketClient(INDEXER_WS_URL, {
Expand Down
8 changes: 7 additions & 1 deletion node/coinstacks/bitcoin/api/src/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { bech32 } from 'bech32'
import { Blockbook } from '@shapeshiftoss/blockbook'
import { Service } from '../../../common/api/src/utxo/service'
import { UTXO } from '../../../common/api/src/utxo/controller'
import { Logger } from '@shapeshiftoss/logger'

const INDEXER_URL = process.env.INDEXER_URL
const INDEXER_WS_URL = process.env.INDEXER_WS_URL
Expand All @@ -11,7 +12,12 @@ if (!INDEXER_URL) throw new Error('INDEXER_URL env var not set')
if (!INDEXER_WS_URL) throw new Error('INDEXER_WS_URL env var not set')
if (!RPC_URL) throw new Error('RPC_URL env var not set')

const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL })
export const logger = new Logger({
namespace: ['unchained', 'coinstacks', 'bitcoin', 'api'],
level: process.env.LOG_LEVEL,
})

const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL, logger })

const isXpub = (pubkey: string): boolean => {
return pubkey.startsWith('xpub') || pubkey.startsWith('ypub') || pubkey.startsWith('zpub')
Expand Down
2 changes: 1 addition & 1 deletion node/coinstacks/bitcoincash/api/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const server = app.listen(PORT, () => logger.info('Server started'))
const wsServer = new Server({ server })

wsServer.on('connection', (connection) => {
ConnectionHandler.start(connection, registry, prometheus)
ConnectionHandler.start(connection, registry, prometheus, logger)
})

new WebsocketClient(INDEXER_WS_URL, {
Expand Down
8 changes: 7 additions & 1 deletion node/coinstacks/bitcoincash/api/src/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { bech32 } from 'bech32'
import { Blockbook } from '@shapeshiftoss/blockbook'
import { Service } from '../../../common/api/src/utxo/service'
import { UTXO } from '../../../common/api/src/utxo/controller'
import { Logger } from '@shapeshiftoss/logger'

const INDEXER_URL = process.env.INDEXER_URL
const INDEXER_WS_URL = process.env.INDEXER_WS_URL
Expand All @@ -11,7 +12,12 @@ if (!INDEXER_URL) throw new Error('INDEXER_URL env var not set')
if (!INDEXER_WS_URL) throw new Error('INDEXER_WS_URL env var not set')
if (!RPC_URL) throw new Error('RPC_URL env var not set')

const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL })
export const logger = new Logger({
namespace: ['unchained', 'coinstacks', 'bitcoincash', 'api'],
level: process.env.LOG_LEVEL,
})

const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL, logger })

const isXpub = (pubkey: string): boolean => {
return pubkey.startsWith('xpub') || pubkey.startsWith('ypub') || pubkey.startsWith('zpub')
Expand Down
2 changes: 1 addition & 1 deletion node/coinstacks/bnbsmartchain/api/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ const server = app.listen(PORT, () => logger.info('Server started'))
const wsServer = new Server({ server })

wsServer.on('connection', (connection) => {
ConnectionHandler.start(connection, registry, prometheus)
ConnectionHandler.start(connection, registry, prometheus, logger)
})

new WebsocketClient(INDEXER_WS_URL, {
Expand Down
2 changes: 1 addition & 1 deletion node/coinstacks/bnbsmartchain/api/src/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const logger = new Logger({
level: process.env.LOG_LEVEL,
})

const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL })
const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL, logger })
const provider = new ethers.providers.JsonRpcProvider(RPC_URL)
export const gasOracle = new GasOracle({ logger, provider, coinstack: 'bnbsmartchain' })

Expand Down
38 changes: 10 additions & 28 deletions node/coinstacks/common/api/src/evm/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import {
} from './models'
import {
Cursor,
NodeBlock,
DebugCallStack,
ExplorerApiResponse,
TraceCall,
Expand Down Expand Up @@ -323,34 +322,17 @@ export class Service implements Omit<BaseAPI, 'getInfo'>, API {
}
}

async handleBlock(hash: string, retryCount = 0): Promise<Array<BlockbookTx>> {
const request: RPCRequest = {
jsonrpc: '2.0',
id: `eth_getBlockByHash-${hash}`,
method: 'eth_getBlockByHash',
params: [hash, false],
}

const { data } = await axios.post<RPCResponse>(this.rpcUrl, request)

if (data.error) throw new Error(`failed to get block: ${hash}: ${data.error.message}`)

// retry if no results are returned, this typically means we queried a node that hasn't indexed the data yet
if (!data.result) {
if (retryCount >= 5) throw new Error(`failed to get block: ${hash}: ${JSON.stringify(data)}`)
retryCount++
await exponentialDelay(retryCount)
return this.handleBlock(hash, retryCount)
async handleBlock(hash: string): Promise<Array<BlockbookTx>> {
try {
const { txs = [], totalPages = 1 } = await this.blockbook.getBlock(hash)
for (let page = 1; page < totalPages; ++page) {
const data = await this.blockbook.getBlock(hash, page)
data.txs && txs.push(...data.txs)
}
return txs
} catch (err) {
throw handleError(err)
}

const block = data.result as NodeBlock

// make best effort to fetch all transactions, but don't fail handling block if a single transaction fails
const txs = await Promise.allSettled(block.transactions.map((hash) => this.blockbook.getTransaction(hash)))

return txs
.filter((tx): tx is PromiseFulfilledResult<BlockbookTx> => tx.status === 'fulfilled')
.map((tx) => tx.value)
}

handleTransaction(tx: BlockbookTx): Tx {
Expand Down
33 changes: 10 additions & 23 deletions node/coinstacks/common/api/src/utxo/service.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import axios from 'axios'
import axiosRetry from 'axios-retry'
import { ApiError as BlockbookApiError, Blockbook, Tx as BlockbookTx } from '@shapeshiftoss/blockbook'
import { AddressFormatter, ApiError, BadRequestError, BaseAPI, Cursor, RPCRequest, RPCResponse, SendTxBody } from '../'
import { AddressFormatter, ApiError, BadRequestError, BaseAPI, Cursor, SendTxBody } from '../'
import { Account, Address, API, NetworkFee, NetworkFees, RawTx, Tx, TxHistory, Utxo } from './models'
import { NodeBlock } from './types'
import { validatePageSize } from '../utils'

axiosRetry(axios, { retries: 5, retryDelay: axiosRetry.exponentialDelay })
Expand Down Expand Up @@ -31,12 +30,10 @@ export class Service implements Omit<BaseAPI, 'getInfo'>, API {
readonly isXpub: (pubkey: string) => boolean

private readonly blockbook: Blockbook
private readonly rpcUrl: string
private formatAddress: AddressFormatter = (address: string) => address.toLowerCase()

constructor(args: ServiceArgs) {
this.blockbook = args.blockbook
this.rpcUrl = args.rpcUrl
this.isXpub = args.isXpub

if (args.addressFormatter) this.formatAddress = args.addressFormatter
Expand Down Expand Up @@ -182,26 +179,16 @@ export class Service implements Omit<BaseAPI, 'getInfo'>, API {
}

async handleBlock(hash: string): Promise<Array<BlockbookTx>> {
const request: RPCRequest = {
jsonrpc: '2.0',
id: `getblock-${hash}`,
method: 'getblock',
params: [hash],
try {
const { txs = [], totalPages = 1 } = await this.blockbook.getBlock(hash)
for (let page = 1; page < totalPages; ++page) {
const data = await this.blockbook.getBlock(hash, page)
data.txs && txs.push(...data.txs)
}
return txs
} catch (err) {
throw handleError(err)
}

const { data } = await axios.post<RPCResponse>(this.rpcUrl, request)

if (data.error) throw new Error(`failed to get block: ${hash}: ${data.error.message}`)
if (!data.result) throw new Error(`failed to get block: ${hash}: ${JSON.stringify(data)}`)

const block = data.result as NodeBlock

// make best effort to fetch all transactions, but don't fail handling block if a single transaction fails
const txs = await Promise.allSettled(block.tx.map((hash) => this.blockbook.getTransaction(hash)))

return txs
.filter((tx): tx is PromiseFulfilledResult<BlockbookTx> => tx.status === 'fulfilled')
.map((tx) => tx.value)
}

handleTransaction(tx: BlockbookTx): Tx {
Expand Down
35 changes: 18 additions & 17 deletions node/coinstacks/common/api/src/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,49 +42,50 @@ export class ConnectionHandler {
private readonly websocket: WebSocket
private readonly registry: Registry
private readonly prometheus: Prometheus
private readonly logger: Logger
private readonly routes: Record<Topics, Methods>
private readonly pingInterval = 10000
private readonly pingIntervalMs = 10000

private pingTimeout?: NodeJS.Timeout
private subscriptionIds = new Map<string, void>()

private logger = new Logger({ namespace: ['unchained', 'coinstacks', 'common', 'api'], level: process.env.LOG_LEVEL })

private constructor(websocket: WebSocket, registry: Registry, prometheus: Prometheus) {
private constructor(websocket: WebSocket, registry: Registry, prometheus: Prometheus, logger: Logger) {
this.clientId = v4()
this.registry = registry
this.prometheus = prometheus
this.logger = logger.child({ namespace: ['websocket'] })
this.routes = {
txs: {
subscribe: (subscriptionId: string, data?: TxsTopicData) => this.handleSubscribeTxs(subscriptionId, data),
unsubscribe: (subscriptionId: string, data?: TxsTopicData) => this.handleUnsubscribeTxs(subscriptionId, data),
},
}

const interval = setInterval(() => {
this.websocket.ping()
}, this.pingInterval)
this.pingTimeout = undefined
this.prometheus.metrics.websocketCount.inc()
this.websocket = websocket
this.websocket.ping()

this.heartbeat()
const pingInterval = setInterval(() => {
this.websocket.ping()
}, this.pingIntervalMs)

this.websocket = websocket
this.websocket.onerror = (error) => {
this.logger.error({ clientId: this.clientId, error, fn: 'ws.onerror' }, 'websocket error')
this.close(interval)
this.close(pingInterval)
}
this.websocket.onclose = ({ code, reason }) => {
this.prometheus.metrics.websocketCount.dec()
this.logger.debug({ clientId: this.clientId, code, reason, fn: 'ws.close' }, 'websocket closed')
this.close(interval)
this.close(pingInterval)
}
this.websocket.on('pong', () => this.heartbeat())
this.websocket.on('ping', () => this.websocket.pong())
this.websocket.onmessage = (event) => this.onMessage(event)
}

static start(websocket: WebSocket, registry: Registry, prometheus: Prometheus): void {
prometheus.metrics.websocketCount.inc()
new ConnectionHandler(websocket, registry, prometheus)
static start(websocket: WebSocket, registry: Registry, prometheus: Prometheus, logger: Logger): void {
new ConnectionHandler(websocket, registry, prometheus, logger)
}

private heartbeat(): void {
Expand All @@ -93,9 +94,9 @@ export class ConnectionHandler {
}

this.pingTimeout = setTimeout(() => {
this.logger.debug({ fn: 'pingTimeout' }, 'heartbeat failed')
this.logger.debug({ clientId: this.clientId, fn: 'pingTimeout' }, 'heartbeat failed')
this.websocket.terminate()
}, this.pingInterval + 1000)
}, this.pingIntervalMs + 1000)
}

private sendError(message: string, subscriptionId: string): void {
Expand Down Expand Up @@ -130,7 +131,7 @@ export class ConnectionHandler {
}
}
} catch (err) {
this.logger.error(err, { fn: 'onMessage', event }, 'Error processing message')
this.logger.error(err, { clientId: this.clientId, fn: 'onMessage', event }, 'Error processing message')
}
}

Expand Down
2 changes: 1 addition & 1 deletion node/coinstacks/dogecoin/api/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const server = app.listen(PORT, () => logger.info('Server started'))
const wsServer = new Server({ server })

wsServer.on('connection', (connection) => {
ConnectionHandler.start(connection, registry, prometheus)
ConnectionHandler.start(connection, registry, prometheus, logger)
})

new WebsocketClient(INDEXER_WS_URL, {
Expand Down
8 changes: 7 additions & 1 deletion node/coinstacks/dogecoin/api/src/controller.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Blockbook } from '@shapeshiftoss/blockbook'
import { Service } from '../../../common/api/src/utxo/service'
import { UTXO } from '../../../common/api/src/utxo/controller'
import { Logger } from '@shapeshiftoss/logger'

const INDEXER_URL = process.env.INDEXER_URL
const INDEXER_WS_URL = process.env.INDEXER_WS_URL
Expand All @@ -10,7 +11,12 @@ if (!INDEXER_URL) throw new Error('INDEXER_URL env var not set')
if (!INDEXER_WS_URL) throw new Error('INDEXER_WS_URL env var not set')
if (!RPC_URL) throw new Error('RPC_URL env var not set')

const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL })
export const logger = new Logger({
namespace: ['unchained', 'coinstacks', 'dogecoin', 'api'],
level: process.env.LOG_LEVEL,
})

const blockbook = new Blockbook({ httpURL: INDEXER_URL, wsURL: INDEXER_WS_URL, logger })

const isXpub = (pubkey: string): boolean => {
return pubkey.startsWith('dgub')
Expand Down
Loading