From 1c92592a18df184e3c716a98e9307a78f7a75512 Mon Sep 17 00:00:00 2001
From: J Bilal rafique <113895287+j-rafique@users.noreply.github.com>
Date: Mon, 5 Aug 2024 17:44:12 +0500
Subject: [PATCH] [PSL-1250] implement a job to sync meta with p2p data (#914)

---
 p2p/kademlia/store/sqlite/meta_worker.go | 118 +++++++++++++++++++++++-
 1 file changed, 115 insertions(+), 3 deletions(-)

diff --git a/p2p/kademlia/store/sqlite/meta_worker.go b/p2p/kademlia/store/sqlite/meta_worker.go
index c30925bc2..1b6b2f96b 100644
--- a/p2p/kademlia/store/sqlite/meta_worker.go
+++ b/p2p/kademlia/store/sqlite/meta_worker.go
@@ -19,8 +19,10 @@ var (
 	migrationMetaDB            = "data001-migration-meta.sqlite3"
 	accessUpdateBufferSize     = 100000
 	commitInsertsInterval      = 90 * time.Second
-	updateChannel              chan UpdateMessage
-	insertChannel              chan UpdateMessage
+	metaSyncBatchSize          = 10000
+
+	updateChannel chan UpdateMessage
+	insertChannel chan UpdateMessage
 )
 
 func init() {
@@ -39,7 +41,8 @@ type UpdateMessage struct {
 
 // MigrationMetaStore manages database operations.
 type MigrationMetaStore struct {
-	db *sqlx.DB
+	db           *sqlx.DB
+	p2pDataStore *sqlx.DB
 
 	updateTicker *time.Ticker
 	insertTicker *time.Ticker
@@ -66,8 +69,14 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string) (*MigrationMetaS
 	db.SetMaxOpenConns(20)
 	db.SetMaxIdleConns(10)
 
+	p2pDataStore, err := connectP2PDataStore(dataDir)
+	if err != nil {
+		log.WithContext(ctx).WithError(err).Error("error connecting p2p store from meta-migration store")
+	}
+
 	handler := &MigrationMetaStore{
 		db:           db,
+		p2pDataStore: p2pDataStore,
 		updateTicker: time.NewTicker(commitLastAccessedInterval),
 		insertTicker: time.NewTicker(commitInsertsInterval),
 	}
@@ -84,6 +93,13 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string) (*MigrationMetaS
 		log.P2P().WithContext(ctx).Errorf("cannot create migration table in sqlite database: %s", err.Error())
 	}
 
+	if handler.isMetaSyncRequired() {
+		err := handler.syncMetaWithData(ctx)
+		if err != nil {
+			log.WithContext(ctx).WithError(err).Error("error syncing meta with p2p data")
+		}
+	}
+
 	go handler.startLastAccessedUpdateWorker(ctx)
 	go handler.startInsertWorker(ctx)
 
@@ -142,6 +158,102 @@ func (d *MigrationMetaStore) migrateMigration() error {
 	return nil
 }
 
+// isMetaSyncRequired reports whether the meta table is empty. A failed query
+// is treated as "no sync required".
+func (d *MigrationMetaStore) isMetaSyncRequired() bool {
+	var exists int
+	query := `SELECT EXISTS(SELECT 1 FROM meta LIMIT 1);`
+
+	err := d.db.QueryRow(query).Scan(&exists)
+	if err != nil {
+		return false
+	}
+
+	return exists == 0
+}
+
+// syncMetaWithData pages through the data table of the p2p data store,
+// metaSyncBatchSize rows at a time, and hands each batch to PostKeysInsert.
+func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
+	// NewMigrationMetaStore only logs a failed connectP2PDataStore call, so the
+	// handle may be nil here.
+	if d.p2pDataStore == nil {
+		return fmt.Errorf("p2p data store is not connected")
+	}
+
+	// NOTE(review): there is no ORDER BY, so paging assumes SQLite returns rows
+	// in the same order for every query - confirm.
+	query := `SELECT key, data, updated_at FROM data LIMIT ? OFFSET ?`
+	var offset int
+
+	for {
+		rows, err := d.p2pDataStore.Queryx(query, metaSyncBatchSize, offset)
+		if err != nil {
+			log.WithContext(ctx).WithError(err).Error("Error querying p2p data store")
+			return err
+		}
+
+		var batchUpdates []UpdateMessage
+		rowsRead := 0
+		for rows.Next() {
+			// Count every row, even one that fails to scan, so the offset always
+			// advances and a bad row cannot stall the loop.
+			rowsRead++
+
+			var r Record
+			if err := rows.Scan(&r.Key, &r.Data, &r.UpdatedAt); err != nil {
+				log.WithContext(ctx).WithError(err).Error("Error scanning row from p2p data store")
+				continue
+			}
+
+			batchUpdates = append(batchUpdates, UpdateMessage{
+				Key:            r.Key,
+				LastAccessTime: r.UpdatedAt,
+				Size:           len(r.Data),
+			})
+		}
+		rows.Close()
+		if err := rows.Err(); err != nil {
+			return fmt.Errorf("iterating p2p data store rows: %w", err)
+		}
+
+		if rowsRead == 0 {
+			break
+		}
+
+		// Send batch for insertion using the existing channel-based mechanism.
+		if len(batchUpdates) > 0 {
+			PostKeysInsert(batchUpdates)
+		}
+
+		offset += rowsRead // Move past every row read, scanned or not.
+	}
+
+	return nil
+}
+
+// connectP2PDataStore opens the sqlite3 database file dbName under dataDir,
+// creating dataDir first if it does not exist.
+func connectP2PDataStore(dataDir string) (*sqlx.DB, error) {
+	if _, err := os.Stat(dataDir); os.IsNotExist(err) {
+		if err := os.MkdirAll(dataDir, 0750); err != nil {
+			return nil, fmt.Errorf("mkdir %q: %w", dataDir, err)
+		}
+	} else if err != nil {
+		return nil, fmt.Errorf("cannot create data folder: %w", err)
+	}
+
+	dbFile := path.Join(dataDir, dbName)
+	dataDb, err := sqlx.Connect("sqlite3", dbFile)
+	if err != nil {
+		return nil, fmt.Errorf("cannot open sqlite database: %w", err)
+	}
+	dataDb.SetMaxOpenConns(200)
+	dataDb.SetMaxIdleConns(10)
+
+	return dataDb, nil
+}
+
 // PostAccessUpdate sends access updates to be handled by the worker.
 func PostAccessUpdate(updates []string) {
 	for _, update := range updates {