diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 54832dbe..508fc9f6 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -24,7 +24,9 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" clientgocorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" @@ -161,11 +163,44 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) { if err != nil { return fmt.Errorf("failed to create event recorder: %v", err) } + // Data gatherers are loaded depending on what the Kubernetes API supports. + // First, let's do a /api discovery to see what the API supports. + discoveryClient, err := k8s.NewDiscoveryClient("") + if err != nil { + return fmt.Errorf("failed to create a discovery client: %v", err) + } + _, resources, err := discoveryClient.ServerGroupsAndResources() + if err != nil { + return fmt.Errorf("failed to get server resources: %v", err) + } + supported := sets.NewString() + for _, r := range resources { + gv, err := schema.ParseGroupVersion(r.GroupVersion) + if err != nil { + log.Error(err, "Failed to parse group version") + } + for _, r2 := range r.APIResources { + supported.Insert(gv.WithResource(r2.Name).String()) + } + } + log.V(logs.Debug).Info("Supported", "resources", supported.List()) dataGatherers := map[string]datagatherer.DataGatherer{} // load datagatherer config and boot each one for _, dgConfig := range config.DataGatherers { + if c, ok := dgConfig.Config.(*k8s.ConfigDynamic); ok { + gvr := c.GroupVersionResource + if !supported.Has(gvr.String()) { + log.Info( + "Skipping DataGatherer", + "name", dgConfig.Name, + "reason", "GroupVersionResource not installed", + "groupVersionResource", gvr, + ) + continue + } + } kind := dgConfig.Kind if dgConfig.DataPath != "" { kind = "local" @@ -213,17 +248,16 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) { // too, it will backoff and retry of its own accord. Initial boot // will only be delayed by a max of 5 seconds. bootCtx, bootCancel := context.WithTimeout(gctx, 5*time.Second) - defer bootCancel() - for _, dgConfig := range config.DataGatherers { - dg := dataGatherers[dgConfig.Name] + for dgName, dg := range dataGatherers { // wait for the informer to complete an initial sync, we do this to // attempt to have an initial set of data for the first upload of // the run. if err := dg.WaitForCacheSync(bootCtx.Done()); err != nil { // log sync failure, this might recover in future - log.Error(err, "Failed to complete initial sync of DataGatherer", "kind", dgConfig.Kind, "name", dgConfig.Name) + log.Error(err, "Failed to complete initial sync of DataGatherer", "name", dgName) } } + bootCancel() // begin the datagathering loop, periodically sending data to the // configured output using data in datagatherer caches or refreshing from