From 05bc5c72b48bc168d68a24f82044356cbdc465e4 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 23 Aug 2022 16:58:10 -0400 Subject: [PATCH] Do not rely on "topic" to be present in event streams Signed-off-by: Andrew Richardson --- internal/blockchain/ethereum/ethereum_test.go | 18 ++++----- internal/blockchain/ethereum/eventstream.go | 38 +++++++++---------- internal/blockchain/fabric/eventstream.go | 22 +++++++---- internal/blockchain/fabric/fabric_test.go | 14 +++---- 4 files changed, 48 insertions(+), 44 deletions(-) diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 6aeba046f..274303c1a 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -444,7 +444,7 @@ func TestStreamUpdateError(t *testing.T) { defer httpmock.DeactivateAndReset() httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345", httpmock.NewStringResponder(500, `pop`)) @@ -468,13 +468,13 @@ func TestInitAllExistingStreams(t *testing.T) { defer httpmock.DeactivateAndReset() httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin_3078373143373635" /* this is the subname for our combo of instance path and BatchPin */}, })) httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345", - httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}})) + httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", Name: "topic1"})) httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 2)) httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, subscription{})) @@ -508,13 +508,13 @@ func TestInitAllExistingStreamsV1(t *testing.T) { defer httpmock.DeactivateAndReset() httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "BatchPin_3078373143373635" /* this is the subname for our combo of instance path and BatchPin */}, })) httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345", - httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}})) + httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", Name: "topic1"})) httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 1)) httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, subscription{})) @@ -548,13 +548,13 @@ func TestInitAllExistingStreamsOld(t *testing.T) { defer httpmock.DeactivateAndReset() httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "BatchPin"}, })) httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345", - httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}})) + httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", Name: "topic1"})) httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 1)) httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, subscription{})) @@ -588,13 +588,13 @@ func TestInitAllExistingStreamsInvalidName(t *testing.T) { defer httpmock.DeactivateAndReset() httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "BatchPin_3078373143373635" /* this is the subname for our combo of instance path and BatchPin */}, })) httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345", - httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}})) + httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", Name: "topic1"})) httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 2)) httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, subscription{})) diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go index a5ad9ed0b..5979f3eee 100644 --- a/internal/blockchain/ethereum/eventstream.go +++ b/internal/blockchain/ethereum/eventstream.go @@ -78,46 +78,44 @@ func (s *streamManager) getEventStreams(ctx context.Context) (streams []*eventSt return streams, nil } -func (s *streamManager) createEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) { - stream := eventStream{ +func buildEventStream(topic string, batchSize, batchTimeout uint) *eventStream { + return &eventStream{ Name: topic, ErrorHandling: "block", BatchSize: batchSize, BatchTimeoutMS: batchTimeout, Type: "websocket", - WebSocket: eventStreamWebsocket{Topic: topic}, - Timestamps: true, + // Some implementations require a "topic" to be set separately, while others rely only on the name. + // We set them to the same thing for cross compatibility. + WebSocket: eventStreamWebsocket{Topic: topic}, + Timestamps: true, } +} + +func (s *streamManager) createEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) { + stream := buildEventStream(topic, batchSize, batchTimeout) res, err := s.client.R(). SetContext(ctx). - SetBody(&stream). - SetResult(&stream). + SetBody(stream). + SetResult(stream). Post("/eventstreams") if err != nil || !res.IsSuccess() { return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgEthconnectRESTErr) } - return &stream, nil + return stream, nil } func (s *streamManager) updateEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint, eventStreamID string) (*eventStream, error) { - stream := eventStream{ - Name: topic, - ErrorHandling: "block", - BatchSize: batchSize, - BatchTimeoutMS: batchTimeout, - Type: "websocket", - WebSocket: eventStreamWebsocket{Topic: topic}, - Timestamps: true, - } + stream := buildEventStream(topic, batchSize, batchTimeout) res, err := s.client.R(). SetContext(ctx). - SetBody(&stream). - SetResult(&stream). + SetBody(stream). + SetResult(stream). Patch("/eventstreams/" + eventStreamID) if err != nil || !res.IsSuccess() { return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgEthconnectRESTErr) } - return &stream, nil + return stream, nil } func (s *streamManager) ensureEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) { @@ -126,7 +124,7 @@ func (s *streamManager) ensureEventStream(ctx context.Context, topic string, bat return nil, err } for _, stream := range existingStreams { - if stream.WebSocket.Topic == topic { + if stream.Name == topic { stream, err = s.updateEventStream(ctx, topic, batchSize, batchTimeout, stream.ID) if err != nil { return nil, err diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go index f26028fa8..8b883ec40 100644 --- a/internal/blockchain/fabric/eventstream.go +++ b/internal/blockchain/fabric/eventstream.go @@ -83,25 +83,31 @@ func (s *streamManager) getEventStreams(ctx context.Context) (streams []*eventSt return streams, nil } -func (s *streamManager) createEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) { - stream := eventStream{ +func buildEventStream(topic string, batchSize, batchTimeout uint) *eventStream { + return &eventStream{ Name: topic, ErrorHandling: "block", BatchSize: batchSize, BatchTimeoutMS: batchTimeout, Type: "websocket", - WebSocket: eventStreamWebsocket{Topic: topic}, - Timestamps: true, + // Some implementations require a "topic" to be set separately, while others rely only on the name. + // We set them to the same thing for cross compatibility. + WebSocket: eventStreamWebsocket{Topic: topic}, + Timestamps: true, } +} + +func (s *streamManager) createEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) { + stream := buildEventStream(topic, batchSize, batchTimeout) res, err := s.client.R(). SetContext(ctx). - SetBody(&stream). - SetResult(&stream). + SetBody(stream). + SetResult(stream). Post("/eventstreams") if err != nil || !res.IsSuccess() { return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgFabconnectRESTErr) } - return &stream, nil + return stream, nil } func (s *streamManager) ensureEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) { @@ -110,7 +116,7 @@ func (s *streamManager) ensureEventStream(ctx context.Context, topic string, bat return nil, err } for _, stream := range existingStreams { - if stream.WebSocket.Topic == topic { + if stream.Name == topic { return stream, nil } } diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 1e88fda77..a52df957c 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -225,7 +225,7 @@ func TestInitAllExistingStreams(t *testing.T) { defer httpmock.DeactivateAndReset() httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin"}, @@ -264,7 +264,7 @@ func TestInitAllExistingStreamsV1(t *testing.T) { defer httpmock.DeactivateAndReset() httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "BatchPin"}, @@ -300,7 +300,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { resetConf(e) httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(500, "pop")) httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), @@ -334,7 +334,7 @@ func TestAddFireflySubscriptionGetVersionError(t *testing.T) { resetConf(e) httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin"}, @@ -373,7 +373,7 @@ func TestAddAndRemoveFireflySubscriptionDeprecatedSubName(t *testing.T) { defer httpmock.DeactivateAndReset() httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "BatchPin"}, @@ -416,7 +416,7 @@ func TestAddFireflySubscriptionInvalidSubName(t *testing.T) { defer httpmock.DeactivateAndReset() httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "BatchPin"}, @@ -462,7 +462,7 @@ func TestInitNewConfig(t *testing.T) { defer httpmock.DeactivateAndReset() httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", - httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) resetConf(e) utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345")