Skip to content

Commit

Permalink
Only start datagatherer if its API resource is available
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Wall <richard.wall@venafi.com>
  • Loading branch information
wallrj committed Nov 15, 2024
1 parent 1a567ef commit 34c73e3
Showing 1 changed file with 38 additions and 4 deletions.
42 changes: 38 additions & 4 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
clientgocorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -161,11 +163,44 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
if err != nil {
return fmt.Errorf("failed to create event recorder: %v", err)
}
// Data gatherers are loaded depending on what the Kubernetes API supports.
// First, let's do a /api discovery to see what the API supports.
discoveryClient, err := k8s.NewDiscoveryClient("")
if err != nil {
return fmt.Errorf("failed to create a discovery client: %v", err)
}

_, resources, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
return fmt.Errorf("failed to get server resources: %v", err)
}
supported := sets.NewString()
for _, r := range resources {
gv, err := schema.ParseGroupVersion(r.GroupVersion)
if err != nil {
log.Error(err, "Failed to parse group version")
}
for _, r2 := range r.APIResources {
supported.Insert(gv.WithResource(r2.Name).String())
}
}
log.V(logs.Debug).Info("Supported", "resources", supported.List())
dataGatherers := map[string]datagatherer.DataGatherer{}

// load datagatherer config and boot each one
for _, dgConfig := range config.DataGatherers {
if c, ok := dgConfig.Config.(*k8s.ConfigDynamic); ok {
gvr := c.GroupVersionResource
if !supported.Has(gvr.String()) {
log.Info(
"Skipping DataGatherer",
"name", dgConfig.Name,
"reason", "GroupVersionResource not installed",
"groupVersionResource", gvr,
)
continue
}
}
kind := dgConfig.Kind
if dgConfig.DataPath != "" {
kind = "local"
Expand Down Expand Up @@ -213,17 +248,16 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
// too, it will backoff and retry of its own accord. Initial boot
// will only be delayed by a max of 5 seconds.
bootCtx, bootCancel := context.WithTimeout(gctx, 5*time.Second)
defer bootCancel()
for _, dgConfig := range config.DataGatherers {
dg := dataGatherers[dgConfig.Name]
for dgName, dg := range dataGatherers {
// wait for the informer to complete an initial sync, we do this to
// attempt to have an initial set of data for the first upload of
// the run.
if err := dg.WaitForCacheSync(bootCtx.Done()); err != nil {
// log sync failure, this might recover in future
log.Error(err, "Failed to complete initial sync of DataGatherer", "kind", dgConfig.Kind, "name", dgConfig.Name)
log.Error(err, "Failed to complete initial sync of DataGatherer", "name", dgName)
}
}
bootCancel()

// begin the datagathering loop, periodically sending data to the
// configured output using data in datagatherer caches or refreshing from
Expand Down

0 comments on commit 34c73e3

Please sign in to comment.