diff --git a/core/services/gateway/handlers/capabilities/handler.go b/core/services/gateway/handlers/capabilities/handler.go index 904a64c8896..90bc2065edd 100644 --- a/core/services/gateway/handlers/capabilities/handler.go +++ b/core/services/gateway/handlers/capabilities/handler.go @@ -20,9 +20,10 @@ import ( const ( // NOTE: more methods will go here. HTTP trigger/action/target; etc. - MethodWebAPITarget = "web_api_target" - MethodWebAPITrigger = "web_api_trigger" - MethodComputeAction = "compute_action" + MethodWebAPITarget = "web_api_target" + MethodWebAPITrigger = "web_api_trigger" + MethodComputeAction = "compute_action" + MethodWorkflowSyncer = "workflow_syncer" ) type handler struct { diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go new file mode 100644 index 00000000000..ed815a240ba --- /dev/null +++ b/core/services/workflows/syncer/fetcher.go @@ -0,0 +1,43 @@ +package syncer + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" + "github.com/smartcontractkit/chainlink/v2/core/logger" + ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" +) + +func NewFetcherFunc( + ctx context.Context, + lggr logger.Logger, + och *webapi.OutgoingConnectorHandler) FetcherFunc { + return func(ctx context.Context, url string) ([]byte, error) { + payloadBytes, err := json.Marshal(ghcapabilities.Request{ + URL: url, + Method: http.MethodGet, + }) + if err != nil { + return nil, fmt.Errorf("failed to marshal fetch request: %w", err) + } + + messageID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, url}, "/") + resp, err := och.HandleSingleNodeRequest(ctx, messageID, payloadBytes) + if err != nil { + return nil, err + } + + lggr.Debugw("received gateway response", "resp", resp) + var payload ghcapabilities.Response + err = json.Unmarshal(resp.Body.Payload, &payload) + if err != nil { + return nil, err + } + + return payload.Body, nil + } +} diff --git a/core/services/workflows/syncer/fetcher_test.go b/core/services/workflows/syncer/fetcher_test.go new file mode 100644 index 00000000000..846a9186b5a --- /dev/null +++ b/core/services/workflows/syncer/fetcher_test.go @@ -0,0 +1,76 @@ +package syncer + +import ( + "context" + "encoding/json" + "strings" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks" + ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" +) + +func TestNewFetcherFunc(t *testing.T) { + ctx := context.Background() + lggr := logger.TestLogger(t) + + config := webapi.ServiceConfig{ + RateLimiter: common.RateLimiterConfig{ + GlobalRPS: 100.0, + GlobalBurst: 100, + PerSenderRPS: 100.0, + PerSenderBurst: 100, + }, + } + + connector := gcmocks.NewGatewayConnector(t) + och, err := webapi.NewOutgoingConnectorHandler(connector, config, ghcapabilities.MethodComputeAction, lggr) + require.NoError(t, err) + + url := "http://example.com" + + msgID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, url}, "/") + + t.Run("OK-valid_request", func(t *testing.T) { + gatewayResp := gatewayResponse(t, msgID) + connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) { + och.HandleGatewayMessage(ctx, "gateway1", gatewayResp) + }).Return(nil).Times(1) + connector.EXPECT().DonID().Return("don-id") + connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"}) + + fetcher := NewFetcherFunc(ctx, lggr, och) + + payload, err := fetcher(ctx, url) + require.NoError(t, err) + + expectedPayload := []byte("response body") + require.Equal(t, expectedPayload, payload) + }) +} + +func gatewayResponse(t *testing.T, msgID string) *api.Message { + headers := map[string]string{"Content-Type": "application/json"} + body := []byte("response body") + responsePayload, err := json.Marshal(ghcapabilities.Response{ + StatusCode: 200, + Headers: headers, + Body: body, + ExecutionError: false, + }) + require.NoError(t, err) + return &api.Message{ + Body: api.MessageBody{ + MessageId: msgID, + Method: ghcapabilities.MethodWebAPITarget, + Payload: responsePayload, + }, + } +}