From cbcb30e9777282a34f1a03129f4d6d154e7e12de Mon Sep 17 00:00:00 2001 From: Kleonikos Kyriakis Date: Fri, 26 Apr 2024 11:43:50 +0300 Subject: [PATCH] more tests & refactorings --- internal/app/app.go | 23 +++-- internal/messaging/processor.go | 4 +- internal/messaging/processor_test.go | 135 +++++++++++++++++++++++++++ 3 files changed, 148 insertions(+), 14 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 8fded156..4a6b3107 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -58,7 +58,13 @@ func (a *App) Run(ctx context.Context) error { messenger, userIDUpdatedChan := a.startMessenger(gCtx, g) // initiate tvm client - responseHandler := a.initTVMClient() + tvmClient, err := tvm.NewClient(a.cfg.TvmConfig) // TODO make client init conditional based on provided config + if err != nil { + a.logger.Warn(err) + } + + // create response handler + responseHandler := a.newResponseHandler(tvmClient) // start msg processor msgProcessor := a.startMessageProcessor(ctx, messenger, serviceRegistry, responseHandler, g, userIDUpdatedChan) @@ -72,18 +78,11 @@ func (a *App) Run(ctx context.Context) error { return nil } -func (a *App) initTVMClient() messaging.ResponseHandler { - var responseHandler messaging.ResponseHandler - // TODO make client init conditional based on provided config - tvmClient, err := tvm.NewClient(a.cfg.TvmConfig) - if err != nil { - // do no return error here, let the bot continue - a.logger.Warnf("Failed to create tvm client: %v", err) - responseHandler = messaging.NoopResponseHandler{} - } else { - responseHandler = messaging.NewResponseHandler(tvmClient, a.logger) +func (a *App) newResponseHandler(tvmClient *tvm.Client) messaging.ResponseHandler { + if tvmClient != nil { + return messaging.NewResponseHandler(tvmClient, a.logger) } - return responseHandler + return messaging.NoopResponseHandler{} } func (a *App) startRPCClient(ctx context.Context, g *errgroup.Group, serviceRegistry messaging.ServiceRegistry) { diff --git a/internal/messaging/processor.go b/internal/messaging/processor.go index 0df25b74..d51bae46 100644 --- a/internal/messaging/processor.go +++ b/internal/messaging/processor.go @@ -22,6 +22,7 @@ var ( ErrOnlyRequestMessagesAllowed = errors.New("only request messages allowed") ErrUnsupportedRequestType = errors.New("unsupported request type") ErrMissingRecipient = errors.New("missing recipient") + ErrExceededResponseTimeout = errors.New("response exceeded configured timeout") ) type MsgHandler interface { @@ -148,9 +149,8 @@ func (p *processor) Request(ctx context.Context, msg *Message) (*Message, error) p.responseHandler.HandleResponse(ctx, msg.Type, &msg.Content.RequestContent, &response.Content.ResponseContent) return response, nil } - // p.logger.Debugf("Ignoring response message with request id: %s, expecting: %s", response.Metadata.RequestID, msg.Metadata.RequestID) case <-ctx.Done(): - return nil, fmt.Errorf("response exceeded configured timeout of %v seconds for request: %s", p.timeout, msg.Metadata.RequestID) + return nil, fmt.Errorf("%w of %v seconds for request: %s", ErrExceededResponseTimeout, p.timeout, msg.Metadata.RequestID) } } } diff --git a/internal/messaging/processor_test.go b/internal/messaging/processor_test.go index 0d161ffa..83fc389f 100644 --- a/internal/messaging/processor_test.go +++ b/internal/messaging/processor_test.go @@ -6,6 +6,7 @@ package messaging import ( + "context" "errors" "testing" @@ -162,3 +163,137 @@ func TestProcessInbound(t *testing.T) { }) } } + +func TestProcessOutbound(t *testing.T) { + requestID := "requestID" + userID := "userID" + anotherUserID := "anotherUserID" + someError := errors.New("some error") + productListResponse := &Message{Type: ActivityProductListResponse, Metadata: metadata.Metadata{RequestID: requestID}} + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + mockServiceRegistry := NewMockServiceRegistry(mockCtrl) + mockMessenger := NewMockMessenger(mockCtrl) + + type fields struct { + cfg config.ProcessorConfig + messenger Messenger + serviceRegistry ServiceRegistry + responseHandler ResponseHandler + } + type args struct { + msg *Message + } + tests := map[string]struct { + fields fields + args args + want *Message + err error + prepare func(p *processor) + writeResponseToChannel func(p *processor) + }{ + "err: non-request outbound message": { + fields: fields{ + cfg: config.ProcessorConfig{}, + serviceRegistry: mockServiceRegistry, + responseHandler: NoopResponseHandler{}, + messenger: mockMessenger, + }, + args: args{ + msg: &Message{Type: ActivityProductListResponse}, + }, + err: ErrOnlyRequestMessagesAllowed, + }, + "err: missing recipient": { + fields: fields{ + cfg: config.ProcessorConfig{}, + serviceRegistry: mockServiceRegistry, + responseHandler: NoopResponseHandler{}, + messenger: mockMessenger, + }, + args: args{ + msg: &Message{Type: ActivityProductListRequest}, + }, + prepare: func(p *processor) { + p.SetUserID(userID) + }, + err: ErrMissingRecipient, + }, + "err: awaiting-response-timeout exceeded": { + fields: fields{ + cfg: config.ProcessorConfig{Timeout: 10}, // 10ms + serviceRegistry: mockServiceRegistry, + responseHandler: NoopResponseHandler{}, + messenger: mockMessenger, + }, + args: args{ + msg: &Message{Type: ActivityProductListRequest, Metadata: metadata.Metadata{Recipient: anotherUserID}}, + }, + prepare: func(p *processor) { + p.SetUserID(userID) + mockMessenger.EXPECT().SendAsync(gomock.Any(), gomock.Any()).Times(1).Return(nil) + }, + err: ErrExceededResponseTimeout, + }, + "err: while sending request": { + fields: fields{ + cfg: config.ProcessorConfig{Timeout: 100}, // 10ms + serviceRegistry: mockServiceRegistry, + responseHandler: NoopResponseHandler{}, + messenger: mockMessenger, + }, + args: args{ + msg: &Message{Type: ActivityProductListRequest, Metadata: metadata.Metadata{Recipient: anotherUserID}}, + }, + prepare: func(p *processor) { + p.SetUserID(userID) + mockMessenger.EXPECT().SendAsync(gomock.Any(), gomock.Any()).Times(1).Return(someError) + }, + err: someError, + }, + "success: response before timeout": { + fields: fields{ + cfg: config.ProcessorConfig{Timeout: 500}, // long enough timeout for response to be received + serviceRegistry: mockServiceRegistry, + responseHandler: NoopResponseHandler{}, + messenger: mockMessenger, + }, + args: args{ + msg: &Message{Type: ActivityProductListRequest, Metadata: metadata.Metadata{Recipient: anotherUserID, RequestID: requestID}}, + }, + prepare: func(p *processor) { + p.SetUserID(userID) + mockMessenger.EXPECT().SendAsync(gomock.Any(), gomock.Any()).Times(1).Return(nil) + }, + writeResponseToChannel: func(p *processor) { + for { + // wait until the response channel is created + if _, ok := p.responseChannels[requestID]; ok { + p.mu.Lock() + p.responseChannels[requestID] <- productListResponse + p.mu.Unlock() + break + } + } + }, + want: productListResponse, + }, + } + + for tc, tt := range tests { + t.Run(tc, func(t *testing.T) { + p := NewProcessor(tt.fields.messenger, zap.NewNop().Sugar(), tt.fields.cfg, tt.fields.serviceRegistry, tt.fields.responseHandler) + if tt.prepare != nil { + tt.prepare(p.(*processor)) + } + if tt.writeResponseToChannel != nil { + go tt.writeResponseToChannel(p.(*processor)) + } + got, err := p.ProcessOutbound(context.Background(), tt.args.msg) + + require.ErrorIs(t, err, tt.err) + require.Equal(t, tt.want, got) + }) + } +}