Skip to content

Commit

Permalink
[PSL-1250] implement a job to sync meta with p2p data (#914)
Browse files Browse the repository at this point in the history
  • Loading branch information
j-rafique authored Aug 5, 2024
1 parent d9ae328 commit 1c92592
Showing 1 changed file with 95 additions and 3 deletions.
98 changes: 95 additions & 3 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ var (
migrationMetaDB = "data001-migration-meta.sqlite3"
accessUpdateBufferSize = 100000
commitInsertsInterval = 90 * time.Second
updateChannel chan UpdateMessage
insertChannel chan UpdateMessage
metaSyncBatchSize = 10000

updateChannel chan UpdateMessage
insertChannel chan UpdateMessage
)

func init() {
Expand All @@ -39,7 +41,8 @@ type UpdateMessage struct {

// MigrationMetaStore manages database operations.
type MigrationMetaStore struct {
db *sqlx.DB
db *sqlx.DB
p2pDataStore *sqlx.DB

updateTicker *time.Ticker
insertTicker *time.Ticker
Expand All @@ -66,8 +69,14 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string) (*MigrationMetaS
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(10)

p2pDataStore, err := connectP2PDataStore(dataDir)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error connecting p2p store from meta-migration store")
}

handler := &MigrationMetaStore{
db: db,
p2pDataStore: p2pDataStore,
updateTicker: time.NewTicker(commitLastAccessedInterval),
insertTicker: time.NewTicker(commitInsertsInterval),
}
Expand All @@ -84,6 +93,13 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string) (*MigrationMetaS
log.P2P().WithContext(ctx).Errorf("cannot create migration table in sqlite database: %s", err.Error())
}

if handler.isMetaSyncRequired() {
err := handler.syncMetaWithData(ctx)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error syncing meta with p2p data")
}
}

go handler.startLastAccessedUpdateWorker(ctx)
go handler.startInsertWorker(ctx)

Expand Down Expand Up @@ -142,6 +158,82 @@ func (d *MigrationMetaStore) migrateMigration() error {
return nil
}

func (d *MigrationMetaStore) isMetaSyncRequired() bool {
var exists int
query := `SELECT EXISTS(SELECT 1 FROM meta LIMIT 1);`

err := d.db.QueryRow(query).Scan(&exists)
if err != nil {
return false
}

return exists == 0
}

func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
query := `SELECT key, data, updated_at FROM data LIMIT ? OFFSET ?`
var offset int

for {
rows, err := d.p2pDataStore.Queryx(query, metaSyncBatchSize, offset)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Error querying p2p data store")
return err
}

var batchUpdates []UpdateMessage
found := false
for rows.Next() {
found = true
var r Record

if err := rows.Scan(&r.Key, &r.Data, &r.UpdatedAt); err != nil {
log.WithContext(ctx).WithError(err).Error("Error scanning row from p2p data store")
continue
}

dataSize := len(r.Data)
batchUpdates = append(batchUpdates, UpdateMessage{
Key: r.Key,
LastAccessTime: r.UpdatedAt,
Size: dataSize,
})
}
rows.Close()

if !found {
break
}

// Send batch for insertion using the existing channel-based mechanism.
PostKeysInsert(batchUpdates)

offset += len(batchUpdates) // Move the offset forward by the number of items processed.
}

return nil
}

func connectP2PDataStore(dataDir string) (*sqlx.DB, error) {
if _, err := os.Stat(dataDir); os.IsNotExist(err) {
if err := os.MkdirAll(dataDir, 0750); err != nil {
return nil, fmt.Errorf("mkdir %q: %w", dataDir, err)
}
} else if err != nil {
return nil, fmt.Errorf("cannot create data folder: %w", err)
}

dbFile := path.Join(dataDir, dbName)
dataDb, err := sqlx.Connect("sqlite3", dbFile)
if err != nil {
return nil, fmt.Errorf("cannot open sqlite database: %w", err)
}
dataDb.SetMaxOpenConns(200)
dataDb.SetMaxIdleConns(10)

return dataDb, nil
}

// PostAccessUpdate sends access updates to be handled by the worker.
func PostAccessUpdate(updates []string) {
for _, update := range updates {
Expand Down

0 comments on commit 1c92592

Please sign in to comment.