From e5a19e4de35535bea4262c97ecdd8fa9d1f1c448 Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Mon, 13 Jan 2025 21:10:00 +0200 Subject: [PATCH] improve delete performance for clickhouse --- internal/storage/clickhouse.go | 194 ++++++++++++++++++++++++++++----- 1 file changed, 168 insertions(+), 26 deletions(-) diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 5a62dbf..8a8ba34 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -64,8 +64,9 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) { Settings: func() clickhouse.Settings { if cfg.AsyncInsert { return clickhouse.Settings{ - "async_insert": "1", - "wait_for_async_insert": "1", + "async_insert": "1", + "wait_for_async_insert": "1", + "lightweight_deletes_sync": "0", } } return clickhouse.Settings{} @@ -954,68 +955,209 @@ func (c *ClickHouseConnector) LookbackBlockHeaders(chainId *big.Int, limit int, } func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error { - var saveErr error - var saveErrMutex sync.Mutex + var deleteErr error + var deleteErrMutex sync.Mutex var wg sync.WaitGroup wg.Add(4) go func() { defer wg.Done() - if err := c.deleteBatch(chainId, blockNumbers, "blocks", "number"); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting blocks: %v", err) - saveErrMutex.Unlock() + if err := c.deleteBlocksByNumbers(chainId, blockNumbers); err != nil { + deleteErrMutex.Lock() + deleteErr = fmt.Errorf("error deleting blocks: %v", err) + deleteErrMutex.Unlock() } }() go func() { defer wg.Done() - if err := c.deleteBatch(chainId, blockNumbers, "logs", "block_number"); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting logs: %v", err) - saveErrMutex.Unlock() + if err := c.deleteLogsByNumbers(chainId, blockNumbers); err != nil { + deleteErrMutex.Lock() + deleteErr = fmt.Errorf("error deleting logs: %v", err) + deleteErrMutex.Unlock() } }() go func() { defer wg.Done() - if err := c.deleteBatch(chainId, blockNumbers, "transactions", "block_number"); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting transactions: %v", err) - saveErrMutex.Unlock() + if err := c.deleteTransactionsByNumbers(chainId, blockNumbers); err != nil { + deleteErrMutex.Lock() + deleteErr = fmt.Errorf("error deleting transactions: %v", err) + deleteErrMutex.Unlock() } }() go func() { defer wg.Done() - if err := c.deleteBatch(chainId, blockNumbers, "traces", "block_number"); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting traces: %v", err) - saveErrMutex.Unlock() + if err := c.deleteTracesByNumbers(chainId, blockNumbers); err != nil { + deleteErrMutex.Lock() + deleteErr = fmt.Errorf("error deleting traces: %v", err) + deleteErrMutex.Unlock() } }() wg.Wait() - if saveErr != nil { - return saveErr + if deleteErr != nil { + return deleteErr } return nil } -func (c *ClickHouseConnector) deleteBatch(chainId *big.Int, blockNumbers []*big.Int, table string, blockNumberColumn string) error { - query := fmt.Sprintf("DELETE FROM %s.%s WHERE chain_id = ? AND %s IN (?)", c.cfg.Database, table, blockNumberColumn) +func (c *ClickHouseConnector) deleteBlocksByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { + query := fmt.Sprintf("DELETE FROM %s.blocks WHERE _partition_value.1 = ? AND chain_id = ? AND number IN (?)", c.cfg.Database) blockNumbersStr := make([]string, len(blockNumbers)) for i, bn := range blockNumbers { blockNumbersStr[i] = bn.String() } + err := c.conn.Exec(context.Background(), query, chainId, chainId, blockNumbersStr) + if err != nil { + return fmt.Errorf("error deleting blocks: %w", err) + } + return nil +} + +func (c *ClickHouseConnector) deleteLogsByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { + blockNumbersStr := make([]string, len(blockNumbers)) + for i, bn := range blockNumbers { + blockNumbersStr[i] = bn.String() + } + getQuery := fmt.Sprintf("SELECT block_number, transaction_hash, log_index FROM %s.logs WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String()) + + rows, getErr := c.conn.Query(context.Background(), getQuery) + if getErr != nil { + return getErr + } + defer rows.Close() + + logsToDelete := make([]common.Log, 0) + for rows.Next() { + var logToDelete common.Log + err := rows.ScanStruct(&logToDelete) + if err != nil { + return err + } + logsToDelete = append(logsToDelete, logToDelete) + } + + deleteQuery := fmt.Sprintf("DELETE FROM %s.logs WHERE _partition_value.1 = ? AND chain_id = ? AND block_number = ? AND transaction_hash = ? AND log_index = ?", c.cfg.Database) - err := c.conn.Exec(context.Background(), query, chainId, blockNumbersStr) + batch, err := c.conn.PrepareBatch(context.Background(), deleteQuery) if err != nil { - return fmt.Errorf("error deleting from %s: %w", table, err) + return fmt.Errorf("error preparing batch for deleting logs: %w", err) } + for _, log := range logsToDelete { + err := batch.Append( + chainId, + chainId, + log.BlockNumber, + log.TransactionHash, + log.LogIndex, + ) + if err != nil { + return fmt.Errorf("error appending log to delete batch: %w", err) + } + } + if err := batch.Send(); err != nil { + return fmt.Errorf("error deleting logs: %w", err) + } + return nil +} + +func (c *ClickHouseConnector) deleteTransactionsByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { + blockNumbersStr := make([]string, len(blockNumbers)) + for i, bn := range blockNumbers { + blockNumbersStr[i] = bn.String() + } + getQuery := fmt.Sprintf("SELECT block_number, hash FROM %s.transactions WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String()) + + rows, getErr := c.conn.Query(context.Background(), getQuery) + if getErr != nil { + return getErr + } + defer rows.Close() + + txsToDelete := make([]common.Transaction, 0) + for rows.Next() { + var txToDelete common.Transaction + err := rows.ScanStruct(&txToDelete) + if err != nil { + return err + } + txsToDelete = append(txsToDelete, txToDelete) + } + + deleteQuery := fmt.Sprintf("DELETE FROM %s.transactions WHERE _partition_value.1 = ? AND chain_id = ? AND block_number = ? AND hash = ?", c.cfg.Database) + + batch, err := c.conn.PrepareBatch(context.Background(), deleteQuery) + if err != nil { + return fmt.Errorf("error preparing batch for deleting transactions: %w", err) + } + + for _, tx := range txsToDelete { + err := batch.Append( + chainId, + chainId, + tx.BlockNumber, + tx.Hash, + ) + if err != nil { + return fmt.Errorf("error appending transaction to delete batch: %w", err) + } + } + if err := batch.Send(); err != nil { + return fmt.Errorf("error deleting transactions: %w", err) + } + return nil +} + +func (c *ClickHouseConnector) deleteTracesByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { + blockNumbersStr := make([]string, len(blockNumbers)) + for i, bn := range blockNumbers { + blockNumbersStr[i] = bn.String() + } + getQuery := fmt.Sprintf("SELECT block_number, transaction_hash, trace_address FROM %s.traces WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String()) + + rows, getErr := c.conn.Query(context.Background(), getQuery) + if getErr != nil { + return getErr + } + defer rows.Close() + + tracesToDelete := make([]common.Trace, 0) + for rows.Next() { + var traceToDelete common.Trace + err := rows.ScanStruct(&traceToDelete) + if err != nil { + return err + } + tracesToDelete = append(tracesToDelete, traceToDelete) + } + + deleteQuery := fmt.Sprintf("DELETE FROM %s.traces WHERE _partition_value.1 = ? AND chain_id = ? AND block_number = ? AND transaction_hash = ? AND trace_address = ?", c.cfg.Database) + + batch, err := c.conn.PrepareBatch(context.Background(), deleteQuery) + if err != nil { + return fmt.Errorf("error preparing batch for deleting traces: %w", err) + } + + for _, trace := range tracesToDelete { + err := batch.Append( + chainId, + chainId, + trace.BlockNumber, + trace.TransactionHash, + trace.TraceAddress, + ) + if err != nil { + return fmt.Errorf("error appending trace to delete batch: %w", err) + } + } + if err := batch.Send(); err != nil { + return fmt.Errorf("error deleting traces: %w", err) + } return nil }