From 7da3818a3a0dfa7e86e2d4d205a617f1b34f09e2 Mon Sep 17 00:00:00 2001 From: Marwan Sulaiman Date: Tue, 11 Aug 2020 12:43:06 -0400 Subject: [PATCH] Add Event Hooks To Athens --- cmd/proxy/actions/app_proxy.go | 27 +++++++- config.dev.toml | 9 +++ pkg/config/config.go | 1 + pkg/events/client.go | 80 ++++++++++++++++++++++ pkg/events/events.go | 50 ++++++++++++++ pkg/events/events_test.go | 121 +++++++++++++++++++++++++++++++++ pkg/events/server.go | 45 ++++++++++++ pkg/events/type_string.go | 25 +++++++ pkg/stash/with_event.go | 34 +++++++++ 9 files changed, 391 insertions(+), 1 deletion(-) create mode 100644 pkg/events/client.go create mode 100644 pkg/events/events.go create mode 100644 pkg/events/events_test.go create mode 100644 pkg/events/server.go create mode 100644 pkg/events/type_string.go create mode 100644 pkg/stash/with_event.go diff --git a/cmd/proxy/actions/app_proxy.go b/cmd/proxy/actions/app_proxy.go index b3dd3450d..5f2d76d4c 100644 --- a/cmd/proxy/actions/app_proxy.go +++ b/cmd/proxy/actions/app_proxy.go @@ -1,16 +1,20 @@ package actions import ( + "context" "fmt" "net/http" "net/url" "path" "strings" + "time" "github.com/gomods/athens/pkg/config" "github.com/gomods/athens/pkg/download" "github.com/gomods/athens/pkg/download/addons" "github.com/gomods/athens/pkg/download/mode" + "github.com/gomods/athens/pkg/errors" + "github.com/gomods/athens/pkg/events" "github.com/gomods/athens/pkg/index" "github.com/gomods/athens/pkg/index/mem" "github.com/gomods/athens/pkg/index/mysql" @@ -107,7 +111,11 @@ func addProxyRoutes( if err != nil { return err } - st := stash.New(mf, s, indexer, stash.WithPool(c.GoGetWorkers), withSingleFlight) + withEventsHook, err := getEventHook(c) + if err != nil { + return err + } + st := stash.New(mf, s, indexer, stash.WithPool(c.GoGetWorkers), withSingleFlight, withEventsHook) df, err := mode.NewFile(c.DownloadMode, c.DownloadURL) if err != nil { @@ -129,6 +137,23 @@ func addProxyRoutes( return nil } +func getEventHook(c *config.Config) (stash.Wrapper, error) { + const op errors.Op = "actions.getEventHook" + if c.EventsHook == "" { + return func(s stash.Stasher) stash.Stasher { + return s + }, nil + } + eh := events.NewClient(c.EventsHook, nil) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + err := eh.Ping(ctx) + if err != nil { + return nil, errors.E(op, err) + } + return stash.WithEventsHook(eh), nil +} + func getSingleFlight(c *config.Config, checker storage.Checker) (stash.Wrapper, error) { switch c.SingleFlightType { case "", "memory": diff --git a/config.dev.toml b/config.dev.toml index ba34b3d5f..2afaba657 100755 --- a/config.dev.toml +++ b/config.dev.toml @@ -199,6 +199,15 @@ ForceSSL = false # Env override: ATHENS_PROXY_VALIDATOR ValidatorHook = "" +# EventsHook specifies a URL that will receive POST request events +# throughout an Athens request lifecycle. +# +# To see what type of events you will get and what the payload looks like +# check the pkg/event in this repository. +# +# Env override: ATHENS_EVENTS_HOOK +EventsHook = "" + # PathPrefix specifies whether the Proxy # should have a basepath. Certain proxies and services # are distinguished based on subdomain, while others are based diff --git a/pkg/config/config.go b/pkg/config/config.go index 7f05cb58e..a58fbd36c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -44,6 +44,7 @@ type Config struct { PropagateAuthHost string `envconfig:"ATHENS_PROPAGATE_AUTH_HOST"` ForceSSL bool `envconfig:"PROXY_FORCE_SSL"` ValidatorHook string `envconfig:"ATHENS_PROXY_VALIDATOR"` + EventsHook string `envconfig:"ATHENS_EVENTS_HOOK"` PathPrefix string `envconfig:"ATHENS_PATH_PREFIX"` NETRCPath string `envconfig:"ATHENS_NETRC_PATH"` GithubToken string `envconfig:"ATHENS_GITHUB_TOKEN"` diff --git a/pkg/events/client.go b/pkg/events/client.go new file mode 100644 index 000000000..aece7e5e7 --- /dev/null +++ b/pkg/events/client.go @@ -0,0 +1,80 @@ +package events + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + "github.com/gomods/athens/pkg/build" + "github.com/gomods/athens/pkg/errors" + "github.com/gomods/athens/pkg/requestid" +) + +// NewClient returns a new http service +func NewClient(url string, c *http.Client) Hook { + if c == nil { + c = http.DefaultClient + } + return &service{url, c} +} + +type service struct { + url string + c *http.Client +} + +func (s *service) Ping(ctx context.Context) error { + const op errors.Op = "events.Ping" + return s.sendEvent(ctx, op, Ping, PingEvent{BaseEvent: BaseEvent{ + Event: Ping.String(), + Version: build.Data().Version, + }}) +} + +func (s *service) Stashed(ctx context.Context, mod, ver string) error { + const op errors.Op = "events.Stashed" + return s.sendEvent(ctx, op, Stashed, StashedEvent{ + BaseEvent: BaseEvent{ + Event: Stashed.String(), + Version: build.Data().Version, + }, + Module: mod, + Version: ver, + }) +} + +func (s *service) sendEvent(ctx context.Context, op errors.Op, event Type, payload interface{}) error { + req, err := s.getRequest(ctx, event, payload) + if err != nil { + return errors.E(op, err) + } + resp, err := s.c.Do(req) + if err != nil { + return errors.E(op, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + return errors.E(op, fmt.Errorf("event backend returned non-200 code: %d - body: %s", resp.StatusCode, body)) + } + return nil +} + +func (s *service) getRequest(ctx context.Context, event Type, payload interface{}) (*http.Request, error) { + const op errors.Op = "events.getRequest" + var buf bytes.Buffer + err := json.NewEncoder(&buf).Encode(payload) + if err != nil { + return nil, errors.E(op, err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url, &buf) + if err != nil { + return nil, errors.E(op, err) + } + req.Header.Set(HeaderKey, event.String()) + req.Header.Set(requestid.HeaderKey, requestid.FromContext(ctx)) + return req, nil +} diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 000000000..bdd9c69c5 --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,50 @@ +package events + +import ( + "context" +) + +//go:generate stringer -type=Type + +// Type describe various event types +type Type int + +// HeaderKey is the HTTP Header that Athens will send +// along every event. This helps you know which JSON shape +// to use when parsing a request body +const HeaderKey = "Athens-Event" + +// Event types +const ( + Ping Type = iota + 1 + Stashed +) + +// BaseEvent is the common data that all +// event payloads are composed of. +type BaseEvent struct { + Event string + Version string +} + +// PingEvent describes the payload for a Ping event +type PingEvent struct { + BaseEvent +} + +// StashedEvent describes the payload for the Stashed event +type StashedEvent struct { + BaseEvent + Module, Version string +} + +// Hook describes a service that can be used to send events to +type Hook interface { + // Ping pings the underlying server to ensure that + // the event hook url is ready to receive requests + Ping(ctx context.Context) error + + // Stashed is called whenever a new module is succesfully persisted + // to the storage Backend + Stashed(ctx context.Context, mod, ver string) error +} diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go new file mode 100644 index 000000000..e758df550 --- /dev/null +++ b/pkg/events/events_test.go @@ -0,0 +1,121 @@ +package events + +import ( + "context" + "fmt" + "net/http/httptest" + "testing" + + "github.com/gomods/athens/pkg/requestid" + "github.com/stretchr/testify/require" + "github.com/technosophos/moniker" +) + +var pingTests = []struct { + name string + err error +}{ + { + name: "ping", + }, + { + name: "ping_err", + err: fmt.Errorf("could not ping"), + }, +} + +func TestClientServerPing(t *testing.T) { + for _, tc := range pingTests { + t.Run(tc.name, func(t *testing.T) { + hook := &mockHook{err: tc.err} + srv := httptest.NewServer(NewServer(hook)) + t.Cleanup(srv.Close) + client := NewClient(srv.URL, nil) + err := client.Ping(context.Background()) + checkErr(t, tc.err != nil, err) + }) + } +} + +var stashedTests = []struct { + name string + mod string + ver string + err error +}{ + { + name: "happy path", + mod: "github.com/gomods/athens", + ver: "v0.10.0", + }, + { + name: "stashed error", + mod: "mod", + ver: "ver", + err: fmt.Errorf("server error"), + }, +} + +func TestClientServerStashed(t *testing.T) { + for _, tc := range stashedTests { + t.Run(tc.name, func(t *testing.T) { + hook := &mockHook{err: tc.err} + srv := httptest.NewServer(NewServer(hook)) + t.Cleanup(srv.Close) + client := NewClient(srv.URL, nil) + err := client.Stashed(context.Background(), "github.com/gomods/athens", "v0.10.0") + if checkErr(t, tc.err != nil, err) { + return + } + if tc.mod != hook.mod { + t.Fatalf("expected module to be %q but got %q", tc.mod, hook.mod) + } + if tc.ver != hook.ver { + t.Fatalf("expected version to be %q but got %q", tc.ver, hook.ver) + } + }) + } +} + +func TestRequestIDPropagation(t *testing.T) { + hook := &mockHook{} + srv := httptest.NewServer(NewServer(hook)) + t.Cleanup(srv.Close) + client := NewClient(srv.URL, nil) + reqID := moniker.New().Name() + ctx := requestid.SetInContext(context.Background(), reqID) + err := client.Ping(ctx) + if err != nil { + t.Fatal(err) + } + if reqID != hook.reqid { + t.Fatalf("expected request id to be %q but got %q", reqID, hook.reqid) + } +} + +type mockHook struct { + mod, ver string + reqid string + err error +} + +func (mh *mockHook) Ping(ctx context.Context) error { + mh.reqid = requestid.FromContext(ctx) + return mh.err +} + +func (mh *mockHook) Stashed(ctx context.Context, mod, ver string) error { + mh.mod, mh.ver = mod, ver + return mh.err +} + +func checkErr(t *testing.T, wantErr bool, err error) bool { + if wantErr { + if err == nil { + t.Fatal("expected an error but got nil") + } + return true + } + require.NoError(t, err) + return false +} diff --git a/pkg/events/server.go b/pkg/events/server.go new file mode 100644 index 000000000..a144a2557 --- /dev/null +++ b/pkg/events/server.go @@ -0,0 +1,45 @@ +package events + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/gomods/athens/pkg/requestid" +) + +// NewServer returns an http.Handler that parses +func NewServer(h Hook) http.Handler { + return &server{h} +} + +type server struct { + h Hook +} + +func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.NotFound(w, r) + return + } + ctx := r.Context() + ctx = requestid.SetInContext(ctx, r.Header.Get(requestid.HeaderKey)) + var err error + switch event := r.Header.Get(HeaderKey); event { + case Ping.String(): + err = s.h.Ping(ctx) + case Stashed.String(): + var body StashedEvent + err = json.NewDecoder(r.Body).Decode(&body) + if err != nil { + break + } + err = s.h.Stashed(ctx, body.Module, body.Version) + default: + err = fmt.Errorf("unknown event: %q", event) + } + if err != nil { + http.Error(w, err.Error(), 500) + return + } +} diff --git a/pkg/events/type_string.go b/pkg/events/type_string.go new file mode 100644 index 000000000..8ad649f9f --- /dev/null +++ b/pkg/events/type_string.go @@ -0,0 +1,25 @@ +// Code generated by "stringer -type=Type"; DO NOT EDIT. + +package events + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[Ping-1] + _ = x[Stashed-2] +} + +const _Type_name = "PingStashed" + +var _Type_index = [...]uint8{0, 4, 11} + +func (i Type) String() string { + i -= 1 + if i < 0 || i >= Type(len(_Type_index)-1) { + return "Type(" + strconv.FormatInt(int64(i+1), 10) + ")" + } + return _Type_name[_Type_index[i]:_Type_index[i+1]] +} diff --git a/pkg/stash/with_event.go b/pkg/stash/with_event.go new file mode 100644 index 000000000..e6152be3d --- /dev/null +++ b/pkg/stash/with_event.go @@ -0,0 +1,34 @@ +package stash + +import ( + "context" + + "github.com/gomods/athens/pkg/errors" + "github.com/gomods/athens/pkg/events" +) + +// WithEventsHook returns a stasher that can send out Stashed events +// to the given implementation +func WithEventsHook(e events.Hook) Wrapper { + return func(s Stasher) Stasher { + return &withEvent{s, e} + } +} + +type withEvent struct { + s Stasher + e events.Hook +} + +func (we *withEvent) Stash(ctx context.Context, mod string, ver string) (string, error) { + const op errors.Op = "stash.withEvent" + resolvedVer, err := we.s.Stash(ctx, mod, ver) + if err != nil { + return "", errors.E(op, err) + } + err = we.e.Stashed(ctx, mod, resolvedVer) + if err != nil { + return "", errors.E(op, err) + } + return resolvedVer, nil +}