Skip to content

Commit

Permalink
Multistream specs (#15603)
Browse files Browse the repository at this point in the history
* Multistream specs

* .

* w

* Fix lint

* Lint

* Add concurrency stress tests

* Fix lint

* Lint

* Add benchmark

* Benchmark on datasource.Observe

* Fix lint
  • Loading branch information
samsondav authored Jan 9, 2025
1 parent bfe91d4 commit c7759e1
Show file tree
Hide file tree
Showing 18 changed files with 1,103 additions and 287 deletions.
5 changes: 5 additions & 0 deletions .changeset/rotten-books-cross.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Support multiple streamIDs in stream specs #added
3 changes: 2 additions & 1 deletion core/internal/testutils/httptest/httptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
// NewTestHTTPClient returns a real HTTP client that may only make requests to
// localhost
func NewTestLocalOnlyHTTPClient() *http.Client {
tr := http.DefaultTransport.(*http.Transport).Clone()
// Don't use the default transport, we want zero limits and zero timeouts
tr := &http.Transport{}
tr.DialContext = testDialContext
tr.DisableCompression = true
return &http.Client{Transport: tr}
Expand Down
95 changes: 13 additions & 82 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package llo

import (
"context"
"errors"
"fmt"
"slices"
"sort"
"strconv"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/shopspring/decimal"
"golang.org/x/exp/maps"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand All @@ -19,7 +20,6 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
Expand All @@ -42,7 +42,7 @@ var (
)

type Registry interface {
Get(streamID streams.StreamID) (strm streams.Stream, exists bool)
Get(streamID streams.StreamID) (p streams.Pipeline, exists bool)
}

type ErrObservationFailed struct {
Expand Down Expand Up @@ -109,43 +109,25 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues))
var errs []ErrObservationFailed

// oc only lives for the duration of this Observe call
oc := NewObservationContext(d.registry, d.t)

for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
defer wg.Done()

var val llo.StreamValue

stream, exists := d.registry.Get(streamID)
if !exists {
mu.Lock()
errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)})
mu.Unlock()
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
run, trrs, err := stream.Run(ctx)
if err != nil {
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"})
mu.Unlock()
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, nil, err)
return
}
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
val, err = ExtractStreamValue(trrs)
val, err := oc.Observe(ctx, streamID, opts)
if err != nil {
strmIDStr := strconv.FormatUint(uint64(streamID), 10)
if errors.As(err, &MissingStreamError{}) {
promMissingStreamCount.WithLabelValues(strmIDStr).Inc()
}
promObservationErrorCount.WithLabelValues(strmIDStr).Inc()
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"})
errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"})
mu.Unlock()
return
}

d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil)

mu.Lock()
defer mu.Unlock()

Expand Down Expand Up @@ -186,54 +168,3 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,

return nil
}

// ExtractStreamValue extracts a StreamValue from a TaskRunResults
func ExtractStreamValue(trrs pipeline.TaskRunResults) (llo.StreamValue, error) {
// pipeline.TaskRunResults comes ordered asc by index, this is guaranteed
// by the pipeline executor
finaltrrs := trrs.Terminals()

// HACK: Right now we rely on the number of outputs to determine whether
// its a Decimal or a Quote.
// This isn't very robust or future-proof but is sufficient to support v0.3
// compat.
// There are a number of different possible ways to solve this in future.
// See: https://smartcontract-it.atlassian.net/browse/MERC-5934
switch len(finaltrrs) {
case 1:
res := finaltrrs[0].Result
if res.Error != nil {
return nil, res.Error
}
val, err := toDecimal(res.Value)
if err != nil {
return nil, fmt.Errorf("failed to parse BenchmarkPrice: %w", err)
}
return llo.ToDecimal(val), nil
case 3:
// Expect ordering of Benchmark, Bid, Ask
results := make([]decimal.Decimal, 3)
for i, trr := range finaltrrs {
res := trr.Result
if res.Error != nil {
return nil, fmt.Errorf("failed to parse stream output into Quote (task index: %d): %w", i, res.Error)
}
val, err := toDecimal(res.Value)
if err != nil {
return nil, fmt.Errorf("failed to parse decimal: %w", err)
}
results[i] = val
}
return &llo.Quote{
Benchmark: results[0],
Bid: results[1],
Ask: results[2],
}, nil
default:
return nil, fmt.Errorf("invalid number of results, expected: 1 or 3, got: %d", len(finaltrrs))
}
}

func toDecimal(val interface{}) (decimal.Decimal, error) {
return utils.ToDecimal(val)
}
Loading

0 comments on commit c7759e1

Please sign in to comment.