From 3e4d60bae4c117178b90a3adb66d2d140ce873a9 Mon Sep 17 00:00:00 2001
From: Vlad
Date: Tue, 10 Oct 2023 16:33:28 +0400
Subject: [PATCH 1/3] fix 3 races in tests infra

---
 share/eds/cache/accessor_cache_test.go | 13 ++++++++++++-
 share/p2p/peers/pool.go                |  5 ++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/share/eds/cache/accessor_cache_test.go b/share/eds/cache/accessor_cache_test.go
index 4f12301dee..347b251a88 100644
--- a/share/eds/cache/accessor_cache_test.go
+++ b/share/eds/cache/accessor_cache_test.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"errors"
 	"io"
+	"sync"
 	"testing"
 	"time"
 
@@ -210,7 +211,8 @@ func TestAccessorCache(t *testing.T) {
 		// initialize close
 		done := make(chan struct{})
 		go func() {
-			err = cache.Remove(key)
+			err := cache.Remove(key)
+			require.NoError(t, err)
 			close(done)
 		}()
 
@@ -284,16 +286,21 @@ func TestAccessorCache(t *testing.T) {
 }
 
 type mockAccessor struct {
+	m          sync.Mutex
 	data       []byte
 	isClosed   bool
 	returnedBs int
 }
 
 func (m *mockAccessor) Reader() io.Reader {
+	m.m.Lock()
+	defer m.m.Unlock()
 	return bytes.NewBuffer(m.data)
 }
 
 func (m *mockAccessor) Blockstore() (dagstore.ReadBlockstore, error) {
+	m.m.Lock()
+	defer m.m.Unlock()
 	if m.returnedBs > 0 {
 		return nil, errors.New("blockstore already returned")
 	}
@@ -302,6 +309,8 @@ func (m *mockAccessor) Close() error {
+	m.m.Lock()
+	defer m.m.Unlock()
 	if m.isClosed {
 		return errors.New("already closed")
 	}
@@ -312,6 +321,8 @@ func (m *mockAccessor) checkClosed(t *testing.T, expected bool) {
 	// item will be removed in background, so give it some time to settle
 	time.Sleep(time.Millisecond * 100)
+	m.m.Lock()
+	defer m.m.Unlock()
 	require.Equal(t, expected, m.isClosed)
 }
diff --git a/share/p2p/peers/pool.go b/share/p2p/peers/pool.go
index 609d68e0b3..d0cc45ac44 100644
--- a/share/p2p/peers/pool.go
+++ b/share/p2p/peers/pool.go
@@ -89,8 +89,11 @@ func (p *pool) next(ctx context.Context) <-chan peer.ID {
 				return
 			}
 
+			p.m.RLock()
+			hasPeerCh := p.hasPeerCh
+			p.m.RUnlock()
 			select {
-			case <-p.hasPeerCh:
+			case <-hasPeerCh:
 			case <-ctx.Done():
 				return
 			}

From eed718025f22bc7c30d0d4ae9d88e06408a66365 Mon Sep 17 00:00:00 2001
From: Vlad
Date: Tue, 10 Oct 2023 16:38:49 +0400
Subject: [PATCH 2/3] extract peer-manager race fix

---
 share/p2p/peers/pool.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/share/p2p/peers/pool.go b/share/p2p/peers/pool.go
index d0cc45ac44..609d68e0b3 100644
--- a/share/p2p/peers/pool.go
+++ b/share/p2p/peers/pool.go
@@ -89,11 +89,8 @@ func (p *pool) next(ctx context.Context) <-chan peer.ID {
 				return
 			}
 
-			p.m.RLock()
-			hasPeerCh := p.hasPeerCh
-			p.m.RUnlock()
 			select {
-			case <-hasPeerCh:
+			case <-p.hasPeerCh:
 			case <-ctx.Done():
 				return
 			}

From 27024ee18ecb35820c24dea714f99346d97554fe Mon Sep 17 00:00:00 2001
From: Wondertan
Date: Tue, 10 Oct 2023 14:57:12 +0200
Subject: [PATCH 3/3] fix concurrent cache access in test with atomic

---
 share/eds/blockstore.go |  6 +++---
 share/eds/metrics.go    |  2 +-
 share/eds/store.go      | 12 ++++++------
 share/eds/store_test.go | 22 +++++++++++-----------
 4 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/share/eds/blockstore.go b/share/eds/blockstore.go
index 9cbb3f4e8a..e44601870e 100644
--- a/share/eds/blockstore.go
+++ b/share/eds/blockstore.go
@@ -6,7 +6,7 @@ import (
 	"fmt"
 
 	bstore "github.com/ipfs/boxo/blockstore"
-	dshelp "github.com/ipfs/boxo/datastore/dshelp"
+	"github.com/ipfs/boxo/datastore/dshelp"
 	blocks "github.com/ipfs/go-block-format"
 	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-datastore"
@@ -154,13 +154,13 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (*
 
 	// check if either cache contains an accessor
 	shardKey := keys[0]
-	accessor, err := bs.store.cache.Get(shardKey)
+	accessor, err := bs.store.cache.Load().Get(shardKey)
 	if err == nil {
 		return blockstoreCloser(accessor)
 	}
 
 	// load accessor to the blockstore cache and use it as blockstoreCloser
-	accessor, err = bs.store.cache.Second().GetOrLoad(ctx, shardKey, bs.store.getAccessor)
+	accessor, err = bs.store.cache.Load().Second().GetOrLoad(ctx, shardKey, bs.store.getAccessor)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err)
 	}
diff --git a/share/eds/metrics.go b/share/eds/metrics.go
index 8f87643f17..cbebf8321a 100644
--- a/share/eds/metrics.go
+++ b/share/eds/metrics.go
@@ -124,7 +124,7 @@ func (s *Store) WithMetrics() error {
 		return err
 	}
 
-	if err = s.cache.EnableMetrics(); err != nil {
+	if err = s.cache.Load().EnableMetrics(); err != nil {
 		return err
 	}
 
diff --git a/share/eds/store.go b/share/eds/store.go
index 9fc90046fc..816065909e 100644
--- a/share/eds/store.go
+++ b/share/eds/store.go
@@ -49,7 +49,7 @@ type Store struct {
 	mounts *mount.Registry
 
 	bs    *blockstore
-	cache *cache.DoubleCache
+	cache atomic.Pointer[cache.DoubleCache]
 
 	carIdx      index.FullIndexRepo
 	invertedIdx *simpleInvertedIndex
@@ -129,9 +129,9 @@ func NewStore(params *Parameters, basePath string, ds datastore.Batching) (*Stor
 		gcInterval:    params.GCInterval,
 		mounts:        r,
 		shardFailures: failureChan,
-		cache:         cache.NewDoubleCache(recentBlocksCache, blockstoreCache),
 	}
 	store.bs = newBlockstore(store, ds)
+	store.cache.Store(cache.NewDoubleCache(recentBlocksCache, blockstoreCache))
 	return store, nil
 }
@@ -286,7 +286,7 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
 	go func() {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
 		defer cancel()
-		ac, err := s.cache.First().GetOrLoad(ctx, result.Key, s.getAccessor)
+		ac, err := s.cache.Load().First().GetOrLoad(ctx, result.Key, s.getAccessor)
 		if err != nil {
 			log.Warnw("unable to put accessor to recent blocks accessors cache", "err", err)
 			return
 		}
@@ -347,7 +347,7 @@ func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.ReadCloser,
 
 func (s *Store) getCAR(ctx context.Context, root share.DataHash) (io.ReadCloser, error) {
 	key := shard.KeyFromString(root.String())
-	accessor, err := s.cache.Get(key)
+	accessor, err := s.cache.Load().Get(key)
 	if err == nil {
 		return newReadCloser(accessor), nil
 	}
@@ -391,7 +391,7 @@ func (s *Store) carBlockstore(
 	root share.DataHash,
 ) (*BlockstoreCloser, error) {
 	key := shard.KeyFromString(root.String())
-	accessor, err := s.cache.Get(key)
+	accessor, err := s.cache.Load().Get(key)
 	if err == nil {
 		return blockstoreCloser(accessor)
 	}
@@ -482,7 +482,7 @@ func (s *Store) Remove(ctx context.Context, root share.DataHash) error {
 func (s *Store) remove(ctx context.Context, root share.DataHash) (err error) {
 	key := shard.KeyFromString(root.String())
 	// remove open links to accessor from cache
-	if err := s.cache.Remove(key); err != nil {
+	if err := s.cache.Load().Remove(key); err != nil {
 		log.Warnw("remove accessor from cache", "err", err)
 	}
 	ch := make(chan dagstore.ShardResult, 1)
diff --git a/share/eds/store_test.go b/share/eds/store_test.go
index 7052533555..09357347d0 100644
--- a/share/eds/store_test.go
+++ b/share/eds/store_test.go
@@ -158,7 +158,7 @@ func TestEDSStore(t *testing.T) {
 		time.Sleep(time.Millisecond * 100)
 
 		// remove non-failed accessor from cache
-		err = edsStore.cache.Remove(shard.KeyFromString(dah.String()))
+		err = edsStore.cache.Load().Remove(shard.KeyFromString(dah.String()))
 		assert.NoError(t, err)
 
 		_, err = edsStore.GetCAR(ctx, dah.Hash())
@@ -205,7 +205,7 @@ func TestEDSStore(t *testing.T) {
 
 		// check, that the key is in the cache after put
 		shardKey := shard.KeyFromString(dah.String())
-		_, err = edsStore.cache.Get(shardKey)
+		_, err = edsStore.cache.Load().Get(shardKey)
 		assert.NoError(t, err)
 	})
@@ -276,7 +276,7 @@ func TestEDSStore_GC(t *testing.T) {
 	// remove links to the shard from cache
 	time.Sleep(time.Millisecond * 100)
 	key := shard.KeyFromString(share.DataHash(dah.Hash()).String())
-	err = edsStore.cache.Remove(key)
+	err = edsStore.cache.Load().Remove(key)
 	require.NoError(t, err)
 
 	// doesn't exist yet
@@ -305,8 +305,8 @@ func Test_BlockstoreCache(t *testing.T) {
 	require.NoError(t, err)
 
 	// store eds to the store with noopCache to allow clean cache after put
-	swap := edsStore.cache
-	edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{})
+	swap := edsStore.cache.Load()
+	edsStore.cache.Store(cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}))
 	eds, dah := randomEDS(t)
 	err = edsStore.Put(ctx, dah.Hash(), eds)
 	require.NoError(t, err)
@@ -327,11 +327,11 @@ func Test_BlockstoreCache(t *testing.T) {
 	}
 
 	// swap back original cache
-	edsStore.cache = swap
+	edsStore.cache.Store(swap)
 
 	// key shouldn't be in cache yet, check for returned errCacheMiss
 	shardKey := shard.KeyFromString(dah.String())
-	_, err = edsStore.cache.Get(shardKey)
+	_, err = edsStore.cache.Load().Get(shardKey)
 	require.Error(t, err)
 
 	// now get it from blockstore, to trigger storing to cache
@@ -339,7 +339,7 @@ func Test_BlockstoreCache(t *testing.T) {
 	require.NoError(t, err)
 
 	// should be no errCacheMiss anymore
-	_, err = edsStore.cache.Get(shardKey)
+	_, err = edsStore.cache.Load().Get(shardKey)
 	require.NoError(t, err)
 }
@@ -362,7 +362,7 @@ func Test_CachedAccessor(t *testing.T) {
 	time.Sleep(time.Millisecond * 100)
 
 	// accessor should be in cache
-	_, err = edsStore.cache.Get(shard.KeyFromString(dah.String()))
+	_, err = edsStore.cache.Load().Get(shard.KeyFromString(dah.String()))
 	require.NoError(t, err)
 
 	// first read from cached accessor
@@ -393,7 +393,7 @@ func Test_NotCachedAccessor(t *testing.T) {
 	err = edsStore.Start(ctx)
 	require.NoError(t, err)
 	// replace cache with noopCache to
-	edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{})
+	edsStore.cache.Store(cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}))
 	eds, dah := randomEDS(t)
 	err = edsStore.Put(ctx, dah.Hash(), eds)
@@ -403,7 +403,7 @@ func Test_NotCachedAccessor(t *testing.T) {
 	time.Sleep(time.Millisecond * 100)
 
 	// accessor should not be in cache
-	_, err = edsStore.cache.Get(shard.KeyFromString(dah.String()))
+	_, err = edsStore.cache.Load().Get(shard.KeyFromString(dah.String()))
 	require.Error(t, err)
 
 	// first read from direct accessor (not from cache)
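
The core of PATCH 3/3 is easier to see outside the diff noise: Test_BlockstoreCache swaps the store's cache for a noop implementation mid-test while other goroutines may still be reading it, so the patch publishes the cache through sync/atomic's atomic.Pointer and every reader takes a snapshot with Load(). The following is a minimal, self-contained sketch of that pattern; the store and doubleCache types here are simplified stand-ins invented for illustration, not the real eds.Store or cache.DoubleCache API.

// atomic_swap_sketch.go — minimal sketch of the cache-swap pattern from
// PATCH 3/3. The types below are illustrative stand-ins, not celestia-node's.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// doubleCache stands in for cache.DoubleCache.
type doubleCache struct{ name string }

func (c *doubleCache) Get(key string) string { return c.name + ":" + key }

// store stands in for eds.Store: the cache is published through an
// atomic.Pointer so tests can replace it while readers are active.
type store struct {
	cache atomic.Pointer[doubleCache]
}

// get snapshots the current cache exactly once with Load(), mirroring the
// s.cache.Load().Get(...) shape used throughout the patch.
func (s *store) get(key string) string {
	return s.cache.Load().Get(key)
}

func main() {
	s := &store{}
	s.cache.Store(&doubleCache{name: "real"})

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = s.get(fmt.Sprintf("key-%d", i)) // concurrent readers
		}(i)
	}

	// The test-side swap: stash the current cache, install a noop one, then
	// restore the original — the same dance Test_BlockstoreCache performs
	// with Load()/Store(). A plain field assignment here would race with the
	// readers above; the atomic pointer makes both sides well-defined.
	swap := s.cache.Load()
	s.cache.Store(&doubleCache{name: "noop"})
	s.cache.Store(swap)

	wg.Wait()
	fmt.Println(s.get("done"))
}

The same reasoning explains why PATCH 1/3 uses a plain sync.Mutex in mockAccessor instead: there the individual fields are mutated in place rather than the whole value being replaced, so guarding each method body is the simpler fit, while an atomic pointer suits the wholesale replace-and-restore in the store tests.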