Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 make garbage collection a runnable #231

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 11 additions & 35 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package main

import (
"context"
"flag"
"fmt"
"net/http"
Expand All @@ -31,20 +30,18 @@ import (
"k8s.io/client-go/metadata"
_ "k8s.io/client-go/plugin/pkg/client/auth"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"github.com/go-logr/logr"
"github.com/spf13/pflag"

"github.com/operator-framework/catalogd/internal/garbagecollection"
"github.com/operator-framework/catalogd/internal/source"
"github.com/operator-framework/catalogd/internal/third_party/server"
"github.com/operator-framework/catalogd/internal/version"
Expand Down Expand Up @@ -82,6 +79,7 @@ func main() {
catalogServerAddr string
httpExternalAddr string
cacheDir string
gcInterval time.Duration
)
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -94,6 +92,7 @@ func main() {
flag.StringVar(&httpExternalAddr, "http-external-address", "http://catalogd-catalogserver.catalogd-system.svc", "The external address at which the http server is reachable.")
flag.StringVar(&cacheDir, "cache-dir", "/var/cache/", "The directory in the filesystem that catalogd will use for file based caching")
flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit")
flag.DurationVar(&gcInterval, "gc-interval", 12*time.Hour, "interval in which garbage collection should be run against the catalog content cache")
opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -202,8 +201,14 @@ func main() {
}

ctx := ctrl.SetupSignalHandler()
if err := unpackStartupGarbageCollection(ctx, filepath.Join(cacheDir, source.UnpackCacheDir), setupLog, metaClient); err != nil {
setupLog.Error(err, "running garbage collection")
gc := &garbagecollection.GarbageCollector{
CachePath: filepath.Join(cacheDir, source.UnpackCacheDir),
Logger: ctrl.Log.WithName("garbage-collector"),
MetadataClient: metaClient,
Interval: gcInterval,
}
if err := mgr.Add(gc); err != nil {
setupLog.Error(err, "problem adding garbage collector to manager")
os.Exit(1)
}

Expand All @@ -221,32 +226,3 @@ func podNamespace() string {
}
return string(namespace)
}

func unpackStartupGarbageCollection(ctx context.Context, cachePath string, log logr.Logger, metaClient metadata.Interface) error {
getter := metaClient.Resource(v1alpha1.GroupVersion.WithResource("catalogs"))
metaList, err := getter.List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("error listing catalogs: %w", err)
}

expectedCatalogs := sets.New[string]()
for _, meta := range metaList.Items {
expectedCatalogs.Insert(meta.GetName())
}

cacheDirEntries, err := os.ReadDir(cachePath)
if err != nil {
return fmt.Errorf("error reading cache directory: %w", err)
}
for _, cacheDirEntry := range cacheDirEntries {
if cacheDirEntry.IsDir() && expectedCatalogs.Has(cacheDirEntry.Name()) {
continue
}
if err := os.RemoveAll(filepath.Join(cachePath, cacheDirEntry.Name())); err != nil {
return fmt.Errorf("error removing cache directory entry %q: %w ", cacheDirEntry.Name(), err)
}

log.Info("deleted unexpected cache directory entry", "path", cacheDirEntry.Name(), "isDir", cacheDirEntry.IsDir())
}
return nil
}
94 changes: 94 additions & 0 deletions internal/garbagecollection/garbage_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package garbagecollection

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/metadata"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/operator-framework/catalogd/api/core/v1alpha1"
)

var _ manager.Runnable = (*GarbageCollector)(nil)

// GarbageCollector is an implementation of the manager.Runnable
// interface for running garbage collection on the Catalog content
// cache that is served by the catalogd HTTP server. It runs in a loop
// and will ensure that no cache entries exist for Catalog resources
// that no longer exist. This should only clean up cache entries that
// were missed by the handling of a DELETE event on a Catalog resource.
type GarbageCollector struct {
CachePath string
Logger logr.Logger
MetadataClient metadata.Interface
Interval time.Duration
}

// Start will start the garbage collector. It will always run once on startup
// and loop until context is canceled after an initial garbage collection run.
// Garbage collection will run again every X amount of time, where X is the
// supplied garbage collection interval.
func (gc *GarbageCollector) Start(ctx context.Context) error {
// Run once on startup
removed, err := runGarbageCollection(ctx, gc.CachePath, gc.MetadataClient)
if err != nil {
gc.Logger.Error(err, "running garbage collection")
}
if len(removed) > 0 {
gc.Logger.Info("removed stale cache entries", "removed entries", removed)
}

// Loop until context is canceled, running garbage collection
// at the configured interval
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(gc.Interval):
removed, err := runGarbageCollection(ctx, gc.CachePath, gc.MetadataClient)
if err != nil {
gc.Logger.Error(err, "running garbage collection")
}
if len(removed) > 0 {
gc.Logger.Info("removed stale cache entries", "removed entries", removed)
}
}
}
}

func runGarbageCollection(ctx context.Context, cachePath string, metaClient metadata.Interface) ([]string, error) {
getter := metaClient.Resource(v1alpha1.GroupVersion.WithResource("catalogs"))
metaList, err := getter.List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("error listing catalogs: %w", err)
}

expectedCatalogs := sets.New[string]()
for _, meta := range metaList.Items {
expectedCatalogs.Insert(meta.GetName())
}

cacheDirEntries, err := os.ReadDir(cachePath)
if err != nil {
return nil, fmt.Errorf("error reading cache directory: %w", err)
}
removed := []string{}
for _, cacheDirEntry := range cacheDirEntries {
if cacheDirEntry.IsDir() && expectedCatalogs.Has(cacheDirEntry.Name()) {
continue
}
if err := os.RemoveAll(filepath.Join(cachePath, cacheDirEntry.Name())); err != nil {
return nil, fmt.Errorf("error removing cache directory entry %q: %w ", cacheDirEntry.Name(), err)
}

removed = append(removed, cacheDirEntry.Name())
}
return removed, nil
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package main
package garbagecollection

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,7 +15,7 @@ import (
"github.com/operator-framework/catalogd/api/core/v1alpha1"
)

func TestUnpackStartupGarbageCollection(t *testing.T) {
func TestRunGarbageCollection(t *testing.T) {
for _, tt := range []struct {
name string
existCatalogs []*metav1.PartialObjectMetadata
Expand Down Expand Up @@ -76,7 +75,7 @@ func TestUnpackStartupGarbageCollection(t *testing.T) {

metaClient := fake.NewSimpleMetadataClient(scheme, runtimeObjs...)

err := unpackStartupGarbageCollection(ctx, cachePath, logr.Discard(), metaClient)
_, err := runGarbageCollection(ctx, cachePath, metaClient)
if !tt.wantErr {
assert.NoError(t, err)
entries, err := os.ReadDir(cachePath)
Expand Down
Loading