Skip to content

Commit

Permalink
feat: use the queue for interests created events
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed Aug 1, 2024
1 parent 58d6293 commit c177d3a
Show file tree
Hide file tree
Showing 18 changed files with 563 additions and 41 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ proto:
PATH=${PATH}:~/go/bin protoc --go_out=plugins=grpc:. --go_opt=paths=source_relative \
api/grpc/admin/*.proto \
api/grpc/tgbot/*.proto \
api/grpc/messages/*.proto
api/grpc/messages/*.proto \
api/grpc/queue/*.proto

vet: proto
go vet
Expand Down
33 changes: 33 additions & 0 deletions api/grpc/queue/client_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package queue

import (
"context"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"time"
)

type clientMock struct {
rcvLimit int
rcvDelay time.Duration
}

func newClientMock(rcvLimit int, rcvDelay time.Duration) ServiceClient {
return clientMock{
rcvLimit: rcvLimit,
rcvDelay: rcvDelay,
}
}

func (cm clientMock) SetQueue(ctx context.Context, in *SetQueueRequest, opts ...grpc.CallOption) (resp *emptypb.Empty, err error) {
resp = &emptypb.Empty{}
switch in.Name {
case "fail":
err = ErrInternal
}
return
}

func (cm clientMock) ReceiveMessages(ctx context.Context, opts ...grpc.CallOption) (Service_ReceiveMessagesClient, error) {
return newRcvStreamMock(cm.rcvLimit, cm.rcvDelay), nil
}
45 changes: 45 additions & 0 deletions api/grpc/queue/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package queue

import (
"context"
"fmt"
"github.com/awakari/bot-telegram/util"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"log/slog"
)

type logging struct {
svc Service
log *slog.Logger
}

func NewLoggingMiddleware(svc Service, log *slog.Logger) Service {
return logging{
svc: svc,
log: log,
}
}

func (l logging) SetConsumer(ctx context.Context, name, subj string) (err error) {
err = l.svc.SetConsumer(ctx, name, subj)
ll := l.logLevel(err)
l.log.Log(ctx, ll, fmt.Sprintf("queue.SetConsumer(name=%s, subj=%s): err=%s", name, subj, err))
return
}

func (l logging) ReceiveMessages(ctx context.Context, queue, subj string, batchSize uint32, consume util.ConsumeFunc[[]*pb.CloudEvent]) (err error) {
err = l.svc.ReceiveMessages(ctx, queue, subj, batchSize, consume)
ll := l.logLevel(err)
l.log.Log(ctx, ll, fmt.Sprintf("queue.ReceiveMessages(queue=%s, subj=%s, batchSize=%d): err=%s", queue, subj, batchSize, err))
return
}

func (l logging) logLevel(err error) (ll slog.Level) {
switch err {
case nil:
ll = slog.LevelInfo
default:
ll = slog.LevelError
}
return
}
87 changes: 87 additions & 0 deletions api/grpc/queue/rcv_stream_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package queue

import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"io"
"time"
)

type rcvStreamMock struct {
queue string
limit int
count int
delay time.Duration
}

func newRcvStreamMock(limit int, delay time.Duration) Service_ReceiveMessagesClient {
return &rcvStreamMock{
limit: limit,
delay: delay,
}
}

func (rsm *rcvStreamMock) Recv() (resp *ReceiveMessagesResponse, err error) {
resp = &ReceiveMessagesResponse{}
time.Sleep(rsm.delay)
switch {
case rsm.queue == "fail":
err = status.Errorf(codes.Internal, "internal failure")
case rsm.queue == "missing":
err = status.Error(codes.NotFound, "queue missing")
case rsm.count >= rsm.limit:
err = io.EOF
default:
for i := 0; i < 3 && rsm.count < rsm.limit; i++ {
msg := pb.CloudEvent{
Id: fmt.Sprintf("msg%d", rsm.count),
}
resp.Msgs = append(resp.Msgs, &msg)
rsm.count++
}
}
return
}

func (rsm *rcvStreamMock) Send(req *ReceiveMessagesRequest) error {
start := req.GetStart()
switch {
case start != nil:
rsm.queue = start.Queue
}
return nil
}

func (rsm *rcvStreamMock) Header() (metadata.MD, error) {
//TODO implement me
panic("implement me")
}

func (rsm *rcvStreamMock) Trailer() metadata.MD {
//TODO implement me
panic("implement me")
}

func (rsm *rcvStreamMock) CloseSend() error {
//TODO implement me
panic("implement me")
}

func (rsm *rcvStreamMock) Context() context.Context {
//TODO implement me
panic("implement me")
}

func (rsm *rcvStreamMock) SendMsg(m interface{}) error {
//TODO implement me
panic("implement me")
}

func (rsm *rcvStreamMock) RecvMsg(m interface{}) error {
//TODO implement me
panic("implement me")
}
113 changes: 113 additions & 0 deletions api/grpc/queue/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package queue

import (
"context"
"errors"
"fmt"
"github.com/awakari/bot-telegram/util"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
)

type Service interface {
SetConsumer(ctx context.Context, name, subj string) (err error)
ReceiveMessages(ctx context.Context, queue, subj string, batchSize uint32, consume util.ConsumeFunc[[]*pb.CloudEvent]) (err error)
}

type service struct {
client ServiceClient
}

var ErrInternal = errors.New("queue: internal failure")

var ErrQueueMissing = errors.New("missing queue")

func NewService(client ServiceClient) Service {
return service{
client: client,
}
}

func (svc service) SetConsumer(ctx context.Context, name, subj string) (err error) {
req := SetQueueRequest{
Name: name,
Subj: subj,
}
_, err = svc.client.SetQueue(ctx, &req)
err = decodeError(ctx, err)
return
}

func (svc service) ReceiveMessages(ctx context.Context, queue, subj string, batchSize uint32, consume util.ConsumeFunc[[]*pb.CloudEvent]) (err error) {
var stream Service_ReceiveMessagesClient
stream, err = svc.client.ReceiveMessages(ctx)
if err != nil {
err = decodeError(ctx, err)
}
var req *ReceiveMessagesRequest
if err == nil {
req = &ReceiveMessagesRequest{
Command: &ReceiveMessagesRequest_Start{
Start: &ReceiveMessagesCommandStart{
Queue: queue,
BatchSize: batchSize,
Subj: subj,
},
},
}
err = stream.Send(req)
}
if err == nil {
var resp *ReceiveMessagesResponse
for {
resp, err = stream.Recv()
if err == io.EOF {
err = nil
break
}
if err != nil {
err = decodeError(ctx, err)
break
}
if resp != nil {
err = errors.Join(err, consume(resp.Msgs))
req = &ReceiveMessagesRequest{
Command: &ReceiveMessagesRequest_Ack{
Ack: &ReceiveMessagesCommandAck{
Count: uint32(len(resp.Msgs)),
},
},
}
err = errors.Join(err, stream.Send(req))
}
if err == nil {
select {
case <-ctx.Done():
err = ctx.Err()
default:
continue
}
}
if err != nil {
break
}
}
}
return
}

func decodeError(ctx context.Context, src error) (dst error) {
switch {
case src == io.EOF:
dst = src // return as it is
case status.Code(src) == codes.OK:
dst = nil
case status.Code(src) == codes.NotFound:
dst = ErrQueueMissing
default:
dst = fmt.Errorf("%w: %s", ErrInternal, src)
}
return
}
43 changes: 43 additions & 0 deletions api/grpc/queue/service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
syntax = "proto3";

package awakari.queue;

option go_package = "github.com/awakari/bot-telegram/api/grpc/queue";

import "google/protobuf/empty.proto";
import "api/grpc/cloudevents/cloudevent.proto";

service Service {

// Creates a new queue or updates the existing one's length limit.
rpc SetQueue(SetQueueRequest) returns (google.protobuf.Empty);

// Start receiving a messages for the certain queue.
rpc ReceiveMessages(stream ReceiveMessagesRequest) returns (stream ReceiveMessagesResponse);
}

message SetQueueRequest {
string name = 1;
string subj = 3;
}

message ReceiveMessagesRequest {
oneof command {
ReceiveMessagesCommandStart start = 1;
ReceiveMessagesCommandAck ack = 2;
}
}

message ReceiveMessagesCommandStart {
string queue = 1;
uint32 batchSize = 2;
string subj = 3;
}

message ReceiveMessagesCommandAck {
uint32 count = 1;
}

message ReceiveMessagesResponse {
repeated pb.CloudEvent msgs = 1;
}
54 changes: 54 additions & 0 deletions api/grpc/queue/service_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package queue

import (
"context"
"fmt"
"github.com/awakari/bot-telegram/util"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"time"
)

type serviceMock struct {
msgs []*pb.CloudEvent
}

func NewServiceMock(msgs []*pb.CloudEvent) Service {
return serviceMock{
msgs: msgs,
}
}

func (sm serviceMock) SetConsumer(ctx context.Context, name, subj string) (err error) {
switch name {
case "fail":
err = ErrInternal
}
return
}

func (sm serviceMock) ReceiveMessages(ctx context.Context, queue, subj string, batchSize uint32, consume util.ConsumeFunc[[]*pb.CloudEvent]) (err error) {
switch {
case queue == "fail":
err = ErrInternal
case queue == "queue_missing":
err = ErrQueueMissing
default:
time.Sleep(1 * time.Microsecond)
var msgs []*pb.CloudEvent
for i := 0; i < int(batchSize); i++ {
msg := pb.CloudEvent{
Id: fmt.Sprintf("msg%d", i),
Source: fmt.Sprintf("source%d", i),
SpecVersion: fmt.Sprintf("specversion%d", i),
Type: fmt.Sprintf("type%d", i),
Attributes: map[string]*pb.CloudEventAttributeValue{},
Data: &pb.CloudEvent_TextData{
TextData: "yohoho",
},
}
msgs = append(msgs, &msg)
}
_ = consume(msgs)
}
return
}
Loading

0 comments on commit c177d3a

Please sign in to comment.