Skip to content

Commit

Permalink
Merge pull request #919 from pastelnetwork/PSL-1251_cloudConnectionVe…
Browse files Browse the repository at this point in the history
…rification

[PSL-1251] verify cloud connection & configuration before starting SN
  • Loading branch information
j-rafique authored Aug 12, 2024
2 parents 4666539 + 0cbc0e9 commit 322fafe
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 19 deletions.
49 changes: 46 additions & 3 deletions p2p/kademlia/store/cloud.go/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package cloud
import (
"bytes"
"fmt"

"os"
"os/exec"
"path/filepath"
"sync"

"github.com/google/uuid"
"github.com/pastelnetwork/gonode/common/log"
)

Expand All @@ -23,6 +23,8 @@ type Storage interface {
FetchBatch(keys []string) (map[string][]byte, error)
Upload(key string, data []byte) (string, error)
UploadBatch(keys []string, data [][]byte) ([]string, error)
CheckCloudConnection() error
Delete(key string) error
}

type RcloneStorage struct {
Expand Down Expand Up @@ -51,10 +53,11 @@ func (r *RcloneStorage) Store(key string, data []byte) (string, error) {

// Use rclone to copy the file to the remote
cmd := exec.Command("rclone", "copyto", filePath, remotePath)
cmdOutput := &bytes.Buffer{}
cmd.Stderr = cmdOutput // Capture standard error
if err := cmd.Run(); err != nil {
// Clean up the local file if the upload fails
os.Remove(filePath)
return "", fmt.Errorf("rclone command failed: %w", err)
return "", fmt.Errorf("rclone command failed: %s, error: %w", cmdOutput.String(), err)
}

// Delete the local file after successful upload
Expand Down Expand Up @@ -192,3 +195,43 @@ func (r *RcloneStorage) UploadBatch(keys []string, data [][]byte) ([]string, err

return successfulKeys, nil
}

// Delete - deletes the file from the bucket of the configured spec
func (r *RcloneStorage) Delete(key string) error {
remotePath := fmt.Sprintf("%s:%s/%s", r.specName, r.bucketName, key)

cmd := exec.Command("rclone", "deletefile", remotePath)
var stderr bytes.Buffer
cmd.Stderr = &stderr

// Execute the command
err := cmd.Run()
if err != nil {
return fmt.Errorf("rclone command failed to delete file: %s, error: %w", stderr.String(), err)
}

return nil
}

// CheckCloudConnection verifies the R-clone connection by storing and fetching a test file.
func (r *RcloneStorage) CheckCloudConnection() error {
testKey := uuid.NewString()
testData := []byte("this is test data to verify cloud storage connectivity through r-clone")

_, err := r.Store(testKey, testData)
if err != nil {
return err
}

_, err = r.Fetch(testKey)
if err != nil {
return err
}

err = r.Delete(testKey)
if err != nil {
return err
}

return nil
}
2 changes: 1 addition & 1 deletion p2p/kademlia/store/sqlite/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func retrieveBatchValues(ctx context.Context, db *sqlx.DB, keys []string, getFro
values[idx] = value
keysFound++

if s.isCloudBackupOn() {
if s.IsCloudBackupOn() {
if len(value) == 0 && is_on_cloud {
cloudKeys = append(cloudKeys, key)
}
Expand Down
12 changes: 6 additions & 6 deletions p2p/kademlia/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func NewStore(ctx context.Context, dataDir string, _ time.Duration, _ time.Durat
// Run WAL checkpoint worker every 5 seconds
go s.startCheckpointWorker(ctx)

if s.isCloudBackupOn() {
if s.IsCloudBackupOn() {
_, err = NewMigrationMetaStore(ctx, dataDir, cloud)
if err != nil {
return nil, fmt.Errorf("cannot create meta store: %w", err)
Expand All @@ -174,7 +174,7 @@ func NewStore(ctx context.Context, dataDir string, _ time.Duration, _ time.Durat
return s, nil
}

func (s *Store) isCloudBackupOn() bool {
func (s *Store) IsCloudBackupOn() bool {
return s.cloud != nil
}

Expand Down Expand Up @@ -434,7 +434,7 @@ func (s *Store) Retrieve(_ context.Context, key []byte) ([]byte, error) {
return nil, fmt.Errorf("failed to get record by key %s: %w", hkey, err)
}

if s.isCloudBackupOn() {
if s.IsCloudBackupOn() {
PostAccessUpdate([]string{hkey})
}

Expand All @@ -446,7 +446,7 @@ func (s *Store) Retrieve(_ context.Context, key []byte) ([]byte, error) {
return nil, fmt.Errorf("failed to retrieve data from cloud: data is neither on cloud nor on local - this shouldn't happen")
}

if !s.isCloudBackupOn() {
if !s.IsCloudBackupOn() {
return nil, fmt.Errorf("failed to retrieve data from cloud: data is supposed to be on cloud but backup is not enabled")
}

Expand Down Expand Up @@ -536,7 +536,7 @@ func (s *Store) storeRecord(key []byte, value []byte, typ int, isOriginal bool)
return fmt.Errorf("error storing data: %w", err)
}

if s.isCloudBackupOn() {
if s.IsCloudBackupOn() {
PostKeysInsert([]UpdateMessage{{Key: hkey, LastAccessTime: time.Now(), Size: len(value)}})
}

Expand Down Expand Up @@ -602,7 +602,7 @@ func (s *Store) storeBatchRecord(values [][]byte, typ int, isOriginal bool) erro
return fmt.Errorf("error storing data: %w", err)
}

if s.isCloudBackupOn() {
if s.IsCloudBackupOn() {
PostKeysInsert(hkeys)
}

Expand Down
11 changes: 9 additions & 2 deletions supernode/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ func runApp(ctx context.Context, config *configs.Config) error {
log.WithContext(ctx).Info("Interrupt signal received. Gracefully shutting down...")
})

// entities
pastelClient := pastel.NewClient(config.Pastel, config.Pastel.BurnAddress())

if config.PassPhrase == "" {
Expand Down Expand Up @@ -225,7 +224,15 @@ func runApp(ctx context.Context, config *configs.Config) error {
// p2p service (currently using kademlia)
config.P2P.SetWorkDir(config.WorkDir)
config.P2P.ID = config.PastelID
cloudStorage := cloud.NewRcloneStorage("bucket", "spec")

cloudStorage := cloud.NewRcloneStorage(config.RcloneStorageConfig.BucketName, config.RcloneStorageConfig.SpecName)
if config.RcloneStorageConfig.BucketName != "" && config.RcloneStorageConfig.SpecName != "" {
if err := cloudStorage.CheckCloudConnection(); err != nil {
log.WithContext(ctx).WithError(err).Fatal("error establishing connection with the cloud")
return fmt.Errorf("rclone connection check failed: %w", err)
}
}

p2p, err := p2p.New(ctx, config.P2P, pastelClient, secInfo, rqstore, cloudStorage)
if err != nil {
return errors.Errorf("could not create p2p service, %w", err)
Expand Down
15 changes: 11 additions & 4 deletions supernode/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,17 @@ type Config struct {
P2P *p2p.Config `mapstructure:"p2p" json:"p2p,omitempty"`
//MetaDB *metadb.Config `mapstructure:"metadb" json:"metadb,omitempty"`
//UserDB *database.Config `mapstructure:"userdb" json:"userdb,omitempty"`
DDServer *ddclient.Config `mapstructure:"dd-server" json:"dd-server,omitempty"`
RaptorQ *raptorq.Config `mapstructure:"raptorq" json:"raptorq,omitempty"`
HealthCheck *healthcheck_lib.Config `mapstructure:"health-check" json:"health-check,omitempty"`
DebugService *debug.Config `mapstructure:"debug-service" json:"debug-service,omitempty"`
DDServer *ddclient.Config `mapstructure:"dd-server" json:"dd-server,omitempty"`
RaptorQ *raptorq.Config `mapstructure:"raptorq" json:"raptorq,omitempty"`
HealthCheck *healthcheck_lib.Config `mapstructure:"health-check" json:"health-check,omitempty"`
DebugService *debug.Config `mapstructure:"debug-service" json:"debug-service,omitempty"`
RcloneStorageConfig *RcloneStorageConfig `mapstructure:"storage-rclone" json:"storage-rclone,omitempty"`
}

// RcloneStorageConfig contains settings for RcloneStorage
type RcloneStorageConfig struct {
SpecName string `mapstructure:"spec_name" json:"spec_name,omitempty"`
BucketName string `mapstructure:"bucket_name" json:"bucket_name,omitempty"`
}

// LogConfig contains log configs
Expand Down
3 changes: 2 additions & 1 deletion walletnode/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/json-iterator/go v1.1.12
github.com/pastelnetwork/gonode/common v0.0.0-20240229105633-1f295fe18563
github.com/pastelnetwork/gonode/mixins v0.0.0-00010101000000-000000000000
github.com/pastelnetwork/gonode/p2p v0.0.0-00010101000000-000000000000
github.com/pastelnetwork/gonode/pastel v0.0.0-00010101000000-000000000000
github.com/pastelnetwork/gonode/proto v0.0.0-00010101000000-000000000000
github.com/pastelnetwork/gonode/raptorq v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -55,7 +56,7 @@ require (
github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-sqlite3 v1.14.17 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
4 changes: 2 additions & 2 deletions walletnode/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
Expand Down

0 comments on commit 322fafe

Please sign in to comment.