Skip to content

Commit

Permalink
Pass the context to the Venafi clients
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Wall <richard.wall@venafi.com>
  • Loading branch information
wallrj committed Nov 27, 2024
1 parent 24e02f3 commit 80a3117
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 50 deletions.
6 changes: 4 additions & 2 deletions hack/e2e/values.venafi-kubernetes-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ authentication:
enabled: true

extraArgs:
- --logging-format=json
- --log-level=2
- --logging-format=text
# Show trace logs for the venafi-connection-lib client
# See https://github.com/jetstack/venafi-connection-lib/blob/13c2342fe0140ff084d2aabfd29ae3d10721691b/internal/http_client/metrics_transport.go#L93-L115
- --vmodule=metrics_transport=6
15 changes: 3 additions & 12 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,6 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
// If any of the go routines exit (with nil or error) the main context will
// be cancelled, which will cause this blocking loop to exit
// instead of waiting for the time period.
// TODO(wallrj): Pass a context to gatherAndOutputData, so that we don't
// have to wait for it to finish before exiting the process.
for {
if err := gatherAndOutputData(klog.NewContext(ctx, log), eventf, config, preflightClient, dataGatherers); err != nil {
return err
Expand Down Expand Up @@ -397,9 +395,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client

if config.AuthMode == VenafiCloudKeypair || config.AuthMode == VenafiCloudVenafiConnection {
// orgID and clusterID are not required for Venafi Cloud auth
// TODO(wallrj): Pass the context to PostDataReadingsWithOptions, so
// that its network operations can be cancelled.
err := preflightClient.PostDataReadingsWithOptions(readings, client.Options{
err := preflightClient.PostDataReadingsWithOptions(ctx, readings, client.Options{
ClusterName: config.ClusterID,
ClusterDescription: config.ClusterDescription,
})
Expand Down Expand Up @@ -427,10 +423,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client
if path == "" {
path = "/api/v1/datareadings"
}
// TODO(wallrj): Pass the context to Post, so that its network
// operations can be cancelled.
res, err := preflightClient.Post(path, bytes.NewBuffer(data))

res, err := preflightClient.Post(ctx, path, bytes.NewBuffer(data))
if err != nil {
return fmt.Errorf("failed to post data: %+v", err)
}
Expand All @@ -453,9 +446,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client
return fmt.Errorf("post to server failed: missing clusterID from agent configuration")
}

// TODO(wallrj): Pass the context to PostDataReadings, so
// that its network operations can be cancelled.
err := preflightClient.PostDataReadings(config.OrganizationID, config.ClusterID, readings)
err := preflightClient.PostDataReadings(ctx, config.OrganizationID, config.ClusterID, readings)
if err != nil {
return fmt.Errorf("post to server failed: %+v", err)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"context"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -29,9 +30,9 @@ type (

// The Client interface describes types that perform requests against the Jetstack Secure backend.
Client interface {
PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error
PostDataReadingsWithOptions(readings []*api.DataReading, options Options) error
Post(path string, body io.Reader) (*http.Response, error)
PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error
PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, options Options) error
Post(ctx context.Context, path string, body io.Reader) (*http.Response, error)
}

// The Credentials interface describes methods for credential types to implement for verification.
Expand Down
13 changes: 7 additions & 6 deletions pkg/client/client_api_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -40,13 +41,13 @@ func NewAPITokenClient(agentMetadata *api.AgentMetadata, apiToken, baseURL strin

// PostDataReadingsWithOptions uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
// viewing in the user-interface.
func (c *APITokenClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
return c.PostDataReadings(opts.OrgID, opts.ClusterID, readings)
func (c *APITokenClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
return c.PostDataReadings(ctx, opts.OrgID, opts.ClusterID, readings)
}

// PostDataReadings uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
// viewing in the user-interface.
func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error {
func (c *APITokenClient) PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error {
payload := api.DataReadingsPost{
AgentMetadata: c.agentMetadata,
DataGatherTime: time.Now().UTC(),
Expand All @@ -57,7 +58,7 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a
return err
}

res, err := c.Post(filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
res, err := c.Post(ctx, filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
if err != nil {
return err
}
Expand All @@ -77,8 +78,8 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a
}

// Post performs an HTTP POST request.
func (c *APITokenClient) Post(path string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPost, fullURL(c.baseURL, path), body)
func (c *APITokenClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(c.baseURL, path), body)
if err != nil {
return nil, err
}
Expand Down
26 changes: 14 additions & 12 deletions pkg/client/client_oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -97,13 +98,13 @@ func NewOAuthClient(agentMetadata *api.AgentMetadata, credentials *OAuthCredenti
}, nil
}

func (c *OAuthClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
return c.PostDataReadings(opts.OrgID, opts.ClusterID, readings)
func (c *OAuthClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
return c.PostDataReadings(ctx, opts.OrgID, opts.ClusterID, readings)
}

// PostDataReadings uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
// viewing in the user-interface.
func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error {
func (c *OAuthClient) PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error {
payload := api.DataReadingsPost{
AgentMetadata: c.agentMetadata,
DataGatherTime: time.Now().UTC(),
Expand All @@ -114,7 +115,7 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.
return err
}

res, err := c.Post(filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
res, err := c.Post(ctx, filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
if err != nil {
return err
}
Expand All @@ -134,13 +135,13 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.
}

// Post performs an HTTP POST request.
func (c *OAuthClient) Post(path string, body io.Reader) (*http.Response, error) {
token, err := c.getValidAccessToken()
func (c *OAuthClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
token, err := c.getValidAccessToken(ctx)
if err != nil {
return nil, err
}

req, err := http.NewRequest(http.MethodPost, fullURL(c.baseURL, path), body)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(c.baseURL, path), body)
if err != nil {
return nil, err
}
Expand All @@ -157,9 +158,9 @@ func (c *OAuthClient) Post(path string, body io.Reader) (*http.Response, error)
// getValidAccessToken returns a valid access token. It will fetch a new access
// token from the auth server in case the current access token does not exist
// or it is expired.
func (c *OAuthClient) getValidAccessToken() (*accessToken, error) {
func (c *OAuthClient) getValidAccessToken(ctx context.Context) (*accessToken, error) {
if c.accessToken.needsRenew() {
err := c.renewAccessToken()
err := c.renewAccessToken(ctx)
if err != nil {
return nil, err
}
Expand All @@ -168,7 +169,7 @@ func (c *OAuthClient) getValidAccessToken() (*accessToken, error) {
return c.accessToken, nil
}

func (c *OAuthClient) renewAccessToken() error {
func (c *OAuthClient) renewAccessToken(ctx context.Context) error {
tokenURL := fmt.Sprintf("https://%s/oauth/token", c.credentials.AuthServerDomain)
audience := "https://preflight.jetstack.io/api/v1"
payload := url.Values{}
Expand All @@ -178,7 +179,7 @@ func (c *OAuthClient) renewAccessToken() error {
payload.Set("audience", audience)
payload.Set("username", c.credentials.UserID)
payload.Set("password", c.credentials.UserSecret)
req, err := http.NewRequest("POST", tokenURL, strings.NewReader(payload.Encode()))
req, err := http.NewRequestWithContext(ctx, "POST", tokenURL, strings.NewReader(payload.Encode()))
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -188,7 +189,8 @@ func (c *OAuthClient) renewAccessToken() error {
if err != nil {
return errors.WithStack(err)
}

// TODO(wallrj): This will block. Read the body incrementally and check for
// context cancellation.
body, err := io.ReadAll(res.Body)
if err != nil {
return errors.WithStack(err)
Expand Down
21 changes: 11 additions & 10 deletions pkg/client/client_venafi_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"bytes"
"context"
"crypto"
"crypto/ecdsa"
"crypto/ed25519"
Expand Down Expand Up @@ -168,7 +169,7 @@ func (c *VenafiSvcAccountCredentials) IsClientSet() (ok bool, why string) {

// PostDataReadingsWithOptions uploads the slice of api.DataReading to the Venafi Cloud backend to be processed.
// The Options are then passed as URL params in the request
func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
func (c *VenafiCloudClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
payload := api.DataReadingsPost{
AgentMetadata: c.agentMetadata,
DataGatherTime: time.Now().UTC(),
Expand Down Expand Up @@ -199,7 +200,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead
}
venafiCloudUploadURL.RawQuery = query.Encode()

res, err := c.Post(venafiCloudUploadURL.String(), bytes.NewBuffer(data))
res, err := c.Post(ctx, venafiCloudUploadURL.String(), bytes.NewBuffer(data))
if err != nil {
return err
}
Expand All @@ -219,7 +220,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead

// PostDataReadings uploads the slice of api.DataReading to the Venafi Cloud backend to be processed for later
// viewing in the user-interface.
func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api.DataReading) error {
func (c *VenafiCloudClient) PostDataReadings(ctx context.Context, _ string, _ string, readings []*api.DataReading) error {
// orgID and clusterID are ignored in Venafi Cloud auth

payload := api.DataReadingsPost{
Expand All @@ -235,7 +236,7 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api
if !strings.HasSuffix(c.uploadPath, "/") {
c.uploadPath = fmt.Sprintf("%s/", c.uploadPath)
}
res, err := c.Post(filepath.Join(c.uploadPath, c.uploaderID), bytes.NewBuffer(data))
res, err := c.Post(ctx, filepath.Join(c.uploadPath, c.uploaderID), bytes.NewBuffer(data))
if err != nil {
return err
}
Expand All @@ -254,8 +255,8 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api
}

// Post performs an HTTP POST request.
func (c *VenafiCloudClient) Post(path string, body io.Reader) (*http.Response, error) {
token, err := c.getValidAccessToken()
func (c *VenafiCloudClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
token, err := c.getValidAccessToken(ctx)
if err != nil {
return nil, err
}
Expand All @@ -278,9 +279,9 @@ func (c *VenafiCloudClient) Post(path string, body io.Reader) (*http.Response, e
// getValidAccessToken returns a valid access token. It will fetch a new access
// token from the auth server in case the current access token does not exist
// or it is expired.
func (c *VenafiCloudClient) getValidAccessToken() (*venafiCloudAccessToken, error) {
func (c *VenafiCloudClient) getValidAccessToken(ctx context.Context) (*venafiCloudAccessToken, error) {
if c.accessToken == nil || time.Now().Add(time.Minute).After(c.accessToken.expirationTime) {
err := c.updateAccessToken()
err := c.updateAccessToken(ctx)
if err != nil {
return nil, err
}
Expand All @@ -289,7 +290,7 @@ func (c *VenafiCloudClient) getValidAccessToken() (*venafiCloudAccessToken, erro
return c.accessToken, nil
}

func (c *VenafiCloudClient) updateAccessToken() error {
func (c *VenafiCloudClient) updateAccessToken(ctx context.Context) error {
jwtToken, err := c.generateAndSignJwtToken()
if err != nil {
return err
Expand All @@ -302,7 +303,7 @@ func (c *VenafiCloudClient) updateAccessToken() error {
tokenURL := fullURL(c.baseURL, accessTokenEndpoint)

encoded := values.Encode()
request, err := http.NewRequest(http.MethodPost, tokenURL, strings.NewReader(encoded))
request, err := http.NewRequestWithContext(ctx, http.MethodPost, tokenURL, strings.NewReader(encoded))
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/client/client_venconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ func (c *VenConnClient) Start(ctx context.Context) error {

// `opts.ClusterName` and `opts.ClusterDescription` are the only values used
// from the Options struct. OrgID and ClusterID are not used in Venafi Cloud.
func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
func (c *VenConnClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
if opts.ClusterName == "" {
return fmt.Errorf("programmer mistake: the cluster name (aka `cluster_id` in the config file) cannot be left empty")
}

_, token, err := c.connHandler.Get(context.Background(), c.installNS, auth.Scope{}, types.NamespacedName{Name: c.venConnName, Namespace: c.venConnNS})
_, token, err := c.connHandler.Get(ctx, c.installNS, auth.Scope{}, types.NamespacedName{Name: c.venConnName, Namespace: c.venConnNS})
if err != nil {
return fmt.Errorf("while loading the VenafiConnection %s/%s: %w", c.venConnNS, c.venConnName, err)
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading,
// The path parameter "no" is a dummy parameter to make the Venafi Cloud
// backend happy. This parameter, named `uploaderID` in the backend, is not
// actually used by the backend.
req, err := http.NewRequest(http.MethodPost, fullURL(token.BaseURL, "/v1/tlspk/upload/clusterdata/no"), encodedBody)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(token.BaseURL, "/v1/tlspk/upload/clusterdata/no"), encodedBody)
if err != nil {
return err
}
Expand Down Expand Up @@ -206,13 +206,13 @@ func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading,
// Cloud needs a `clusterName` and `clusterDescription`, but this function can
// only pass `orgID` and `clusterID` which are both useless in Venafi Cloud. Use
// PostDataReadingsWithOptions instead.
func (c *VenConnClient) PostDataReadings(_orgID, _clusterID string, readings []*api.DataReading) error {
func (c *VenConnClient) PostDataReadings(_ context.Context, _orgID, _clusterID string, readings []*api.DataReading) error {
return fmt.Errorf("programmer mistake: PostDataReadings is not implemented for Venafi Cloud")
}

// Post isn't implemented for Venafi Cloud because /v1/tlspk/upload/clusterdata
// requires using the query parameters `name` and `description` which can't be
// set using Post. Use PostDataReadingsWithOptions instead.
func (c *VenConnClient) Post(path string, body io.Reader) (*http.Response, error) {
func (c *VenConnClient) Post(_ context.Context, path string, body io.Reader) (*http.Response, error) {
return nil, fmt.Errorf("programmer mistake: Post is not implemented for Venafi Cloud")
}

0 comments on commit 80a3117

Please sign in to comment.