Skip to content

Commit

Permalink
more tests & refactorings
Browse files Browse the repository at this point in the history
  • Loading branch information
Kleonikos Kyriakis committed Apr 26, 2024
1 parent a8adc10 commit cbcb30e
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 14 deletions.
23 changes: 11 additions & 12 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ func (a *App) Run(ctx context.Context) error {
messenger, userIDUpdatedChan := a.startMessenger(gCtx, g)

// initiate tvm client
responseHandler := a.initTVMClient()
tvmClient, err := tvm.NewClient(a.cfg.TvmConfig) // TODO make client init conditional based on provided config
if err != nil {
a.logger.Warn(err)
}

// create response handler
responseHandler := a.newResponseHandler(tvmClient)

// start msg processor
msgProcessor := a.startMessageProcessor(ctx, messenger, serviceRegistry, responseHandler, g, userIDUpdatedChan)
Expand All @@ -72,18 +78,11 @@ func (a *App) Run(ctx context.Context) error {
return nil
}

func (a *App) initTVMClient() messaging.ResponseHandler {
var responseHandler messaging.ResponseHandler
// TODO make client init conditional based on provided config
tvmClient, err := tvm.NewClient(a.cfg.TvmConfig)
if err != nil {
// do no return error here, let the bot continue
a.logger.Warnf("Failed to create tvm client: %v", err)
responseHandler = messaging.NoopResponseHandler{}
} else {
responseHandler = messaging.NewResponseHandler(tvmClient, a.logger)
func (a *App) newResponseHandler(tvmClient *tvm.Client) messaging.ResponseHandler {
if tvmClient != nil {
return messaging.NewResponseHandler(tvmClient, a.logger)
}
return responseHandler
return messaging.NoopResponseHandler{}
}

func (a *App) startRPCClient(ctx context.Context, g *errgroup.Group, serviceRegistry messaging.ServiceRegistry) {
Expand Down
4 changes: 2 additions & 2 deletions internal/messaging/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (
ErrOnlyRequestMessagesAllowed = errors.New("only request messages allowed")
ErrUnsupportedRequestType = errors.New("unsupported request type")
ErrMissingRecipient = errors.New("missing recipient")
ErrExceededResponseTimeout = errors.New("response exceeded configured timeout")
)

type MsgHandler interface {
Expand Down Expand Up @@ -148,9 +149,8 @@ func (p *processor) Request(ctx context.Context, msg *Message) (*Message, error)
p.responseHandler.HandleResponse(ctx, msg.Type, &msg.Content.RequestContent, &response.Content.ResponseContent)
return response, nil
}
// p.logger.Debugf("Ignoring response message with request id: %s, expecting: %s", response.Metadata.RequestID, msg.Metadata.RequestID)
case <-ctx.Done():
return nil, fmt.Errorf("response exceeded configured timeout of %v seconds for request: %s", p.timeout, msg.Metadata.RequestID)
return nil, fmt.Errorf("%w of %v seconds for request: %s", ErrExceededResponseTimeout, p.timeout, msg.Metadata.RequestID)
}
}
}
Expand Down
135 changes: 135 additions & 0 deletions internal/messaging/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package messaging

import (
"context"
"errors"
"testing"

Expand Down Expand Up @@ -162,3 +163,137 @@ func TestProcessInbound(t *testing.T) {
})
}
}

func TestProcessOutbound(t *testing.T) {
requestID := "requestID"
userID := "userID"
anotherUserID := "anotherUserID"
someError := errors.New("some error")
productListResponse := &Message{Type: ActivityProductListResponse, Metadata: metadata.Metadata{RequestID: requestID}}

mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockServiceRegistry := NewMockServiceRegistry(mockCtrl)
mockMessenger := NewMockMessenger(mockCtrl)

type fields struct {
cfg config.ProcessorConfig
messenger Messenger
serviceRegistry ServiceRegistry
responseHandler ResponseHandler
}
type args struct {
msg *Message
}
tests := map[string]struct {
fields fields
args args
want *Message
err error
prepare func(p *processor)
writeResponseToChannel func(p *processor)
}{
"err: non-request outbound message": {
fields: fields{
cfg: config.ProcessorConfig{},
serviceRegistry: mockServiceRegistry,
responseHandler: NoopResponseHandler{},
messenger: mockMessenger,
},
args: args{
msg: &Message{Type: ActivityProductListResponse},
},
err: ErrOnlyRequestMessagesAllowed,
},
"err: missing recipient": {
fields: fields{
cfg: config.ProcessorConfig{},
serviceRegistry: mockServiceRegistry,
responseHandler: NoopResponseHandler{},
messenger: mockMessenger,
},
args: args{
msg: &Message{Type: ActivityProductListRequest},
},
prepare: func(p *processor) {
p.SetUserID(userID)
},
err: ErrMissingRecipient,
},
"err: awaiting-response-timeout exceeded": {
fields: fields{
cfg: config.ProcessorConfig{Timeout: 10}, // 10ms
serviceRegistry: mockServiceRegistry,
responseHandler: NoopResponseHandler{},
messenger: mockMessenger,
},
args: args{
msg: &Message{Type: ActivityProductListRequest, Metadata: metadata.Metadata{Recipient: anotherUserID}},
},
prepare: func(p *processor) {
p.SetUserID(userID)
mockMessenger.EXPECT().SendAsync(gomock.Any(), gomock.Any()).Times(1).Return(nil)
},
err: ErrExceededResponseTimeout,
},
"err: while sending request": {
fields: fields{
cfg: config.ProcessorConfig{Timeout: 100}, // 10ms
serviceRegistry: mockServiceRegistry,
responseHandler: NoopResponseHandler{},
messenger: mockMessenger,
},
args: args{
msg: &Message{Type: ActivityProductListRequest, Metadata: metadata.Metadata{Recipient: anotherUserID}},
},
prepare: func(p *processor) {
p.SetUserID(userID)
mockMessenger.EXPECT().SendAsync(gomock.Any(), gomock.Any()).Times(1).Return(someError)
},
err: someError,
},
"success: response before timeout": {
fields: fields{
cfg: config.ProcessorConfig{Timeout: 500}, // long enough timeout for response to be received
serviceRegistry: mockServiceRegistry,
responseHandler: NoopResponseHandler{},
messenger: mockMessenger,
},
args: args{
msg: &Message{Type: ActivityProductListRequest, Metadata: metadata.Metadata{Recipient: anotherUserID, RequestID: requestID}},
},
prepare: func(p *processor) {
p.SetUserID(userID)
mockMessenger.EXPECT().SendAsync(gomock.Any(), gomock.Any()).Times(1).Return(nil)
},
writeResponseToChannel: func(p *processor) {
for {
// wait until the response channel is created
if _, ok := p.responseChannels[requestID]; ok {
p.mu.Lock()
p.responseChannels[requestID] <- productListResponse
p.mu.Unlock()
break
}
}
},
want: productListResponse,
},
}

for tc, tt := range tests {
t.Run(tc, func(t *testing.T) {
p := NewProcessor(tt.fields.messenger, zap.NewNop().Sugar(), tt.fields.cfg, tt.fields.serviceRegistry, tt.fields.responseHandler)
if tt.prepare != nil {
tt.prepare(p.(*processor))
}
if tt.writeResponseToChannel != nil {
go tt.writeResponseToChannel(p.(*processor))
}
got, err := p.ProcessOutbound(context.Background(), tt.args.msg)

require.ErrorIs(t, err, tt.err)
require.Equal(t, tt.want, got)
})
}
}

0 comments on commit cbcb30e

Please sign in to comment.