Skip to content

Commit

Permalink
more tests & refactorings
Browse files Browse the repository at this point in the history
  • Loading branch information
Kleonikos Kyriakis committed Apr 26, 2024
1 parent fb62021 commit a8adc10
Show file tree
Hide file tree
Showing 11 changed files with 432 additions and 23 deletions.
6 changes: 3 additions & 3 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (a *App) Run(ctx context.Context) error {
serviceRegistry := messaging.NewServiceRegistry(a.logger)
// start rpc client if host is provided, otherwise bot serves as a distributor bot (rpc server)
if a.cfg.PartnerPluginConfig.Host != "" {
a.startRPCClient(gCtx, g, *serviceRegistry)
a.startRPCClient(gCtx, g, serviceRegistry)
} else {
a.logger.Infof("No host for partner plugin provided, bot will serve as a distributor bot.")
serviceRegistry.RegisterServices(a.cfg.SupportedRequestTypes, nil)
Expand Down Expand Up @@ -122,7 +122,7 @@ func (a *App) startMessenger(ctx context.Context, g *errgroup.Group) (messaging.
return messenger, userIDUpdatedChan
}

func (a *App) startRPCServer(ctx context.Context, msgProcessor messaging.Processor, serviceRegistry *messaging.ServiceRegistry, g *errgroup.Group) {
func (a *App) startRPCServer(ctx context.Context, msgProcessor messaging.Processor, serviceRegistry messaging.ServiceRegistry, g *errgroup.Group) {
rpcServer := server.NewServer(&a.cfg.RPCServerConfig, a.logger, msgProcessor, serviceRegistry)
g.Go(func() error {
a.logger.Info("Starting gRPC server...")
Expand All @@ -135,7 +135,7 @@ func (a *App) startRPCServer(ctx context.Context, msgProcessor messaging.Process
})
}

func (a *App) startMessageProcessor(ctx context.Context, messenger messaging.Messenger, serviceRegistry *messaging.ServiceRegistry, responseHandler messaging.ResponseHandler, g *errgroup.Group, userIDUpdated chan string) messaging.Processor {
func (a *App) startMessageProcessor(ctx context.Context, messenger messaging.Messenger, serviceRegistry messaging.ServiceRegistry, responseHandler messaging.ResponseHandler, g *errgroup.Group, userIDUpdated chan string) messaging.Processor {
msgProcessor := messaging.NewProcessor(messenger, a.logger, a.cfg.ProcessorConfig, serviceRegistry, responseHandler)
g.Go(func() error {
// Wait for userID to be passed
Expand Down
62 changes: 62 additions & 0 deletions internal/messaging/mock_list_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

111 changes: 111 additions & 0 deletions internal/messaging/mock_messenger.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions internal/messaging/mock_service_registry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/messaging/noop_response_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ var _ ResponseHandler = (*NoopResponseHandler)(nil)

type NoopResponseHandler struct{}

func (n NoopResponseHandler) HandleResponse(context.Context, MessageType, *RequestContent, *ResponseContent) {
func (NoopResponseHandler) HandleResponse(context.Context, MessageType, *RequestContent, *ResponseContent) {
}
12 changes: 4 additions & 8 deletions internal/messaging/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ import (
grpc_metadata "google.golang.org/grpc/metadata"
)

type InvalidMessageError struct {
error
}

var (
_ Processor = (*processor)(nil)

Expand Down Expand Up @@ -51,7 +47,7 @@ type processor struct {

mu sync.Mutex
responseChannels map[string]chan *Message
serviceRegistry *ServiceRegistry
serviceRegistry ServiceRegistry
responseHandler ResponseHandler
}

Expand All @@ -63,7 +59,7 @@ func (p *processor) Checkpoint() string {
return "processor"
}

func NewProcessor(messenger Messenger, logger *zap.SugaredLogger, cfg config.ProcessorConfig, registry *ServiceRegistry, responseHandler ResponseHandler) Processor {
func NewProcessor(messenger Messenger, logger *zap.SugaredLogger, cfg config.ProcessorConfig, registry ServiceRegistry, responseHandler ResponseHandler) Processor {
return &processor{
cfg: cfg,
messenger: messenger,
Expand Down Expand Up @@ -105,7 +101,7 @@ func (p *processor) ProcessInbound(msg *Message) error {
p.Forward(msg)
return nil
default:
return InvalidMessageError{ErrUnknownMessageCategory}
return ErrUnknownMessageCategory
}
} else {
return nil // ignore own outbound messages
Expand Down Expand Up @@ -198,10 +194,10 @@ func (p *processor) Respond(msg *Message) error {
func (p *processor) Forward(msg *Message) {
p.logger.Debugf("Forwarding outbound response message: %s", msg.Metadata.RequestID)
p.mu.Lock()
defer p.mu.Unlock()
responseChan, ok := p.responseChannels[msg.Metadata.RequestID]
if ok {
responseChan <- msg
close(responseChan)
}
p.mu.Unlock()
}
Loading

0 comments on commit a8adc10

Please sign in to comment.