From f7d9368a27ea701207091a240f2265f2c040b40f Mon Sep 17 00:00:00 2001 From: Restu Muzakir Date: Wed, 24 Nov 2021 21:40:14 +0700 Subject: [PATCH] refactor: improve publisher initialization --- amqp/amqp.go | 27 ++++--------------- amqp/amqp_test.go | 8 ++++-- publisher/publisher.go | 61 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 24 deletions(-) diff --git a/amqp/amqp.go b/amqp/amqp.go index 4f31a04..b275d1f 100644 --- a/amqp/amqp.go +++ b/amqp/amqp.go @@ -4,7 +4,6 @@ import ( "fmt" "log" - "github.com/Gate2Up/rabbitmq-go/publisher" "github.com/Gate2Up/rabbitmq-go/subscriber" "github.com/streadway/amqp" ) @@ -38,28 +37,12 @@ func NewClient(config Config) (*AmqpClient, error) { return &AmqpClient{Connection: amqpConn, ServiceName: config.ServiceName}, nil } -func (a *AmqpClient) AddPublisher(publisher *publisher.PublisherConfig) { - channel, err := a.Connection.Channel() - if err != nil { - log.Println(err.Error()) - return - } - - err = channel.ExchangeDeclare( - publisher.TopicName, - amqp.ExchangeTopic, - true, - false, - false, - false, - nil, - ) - - if err != nil { - log.Println(`Create exchange failed: `, err.Error()) - } +type Publisher interface { + Build(client *AmqpClient) +} - log.Println(fmt.Sprintf(`Exchange: %s created`, publisher.TopicName)) +func (a *AmqpClient) AddPublisher(publisher Publisher) { + publisher.Build(a) } func (a *AmqpClient) AddSubscriber(subscriber *subscriber.SubscriberConfig) { diff --git a/amqp/amqp_test.go b/amqp/amqp_test.go index 7898a53..0fa0bfd 100644 --- a/amqp/amqp_test.go +++ b/amqp/amqp_test.go @@ -35,11 +35,15 @@ func TestNewClient(t *testing.T) { func TestAddPublisher(t *testing.T) { - publisherConfig := publisher.NewPublisher("TEST_TOPIC", nil) + publisher := publisher.NewPublisher("TEST_TOPIC", nil) conn, _ := amqp.NewClient(config) // if no error this case is passed - void - conn.AddPublisher(publisherConfig) + conn.AddPublisher(publisher) + status, err := publisher.Publish([]byte(`Hello World`)) + + assert.Equal(t, status, true) + assert.Equal(t, err, nil) } func TestAddSubscriber(t *testing.T) { diff --git a/publisher/publisher.go b/publisher/publisher.go index 5eb84fa..0e065d9 100644 --- a/publisher/publisher.go +++ b/publisher/publisher.go @@ -1,8 +1,17 @@ package publisher +import ( + "fmt" + "log" + + "github.com/Gate2Up/rabbitmq-go/amqp" + amqpLegacy "github.com/streadway/amqp" +) + type PublisherConfig struct { TopicName string Schema interface{} + Client *amqp.AmqpClient } type schemaType interface{} @@ -11,7 +20,59 @@ func NewPublisher(topicName string, schema schemaType) *PublisherConfig { publisherConfig := PublisherConfig{ TopicName: topicName, Schema: schema, + Client: nil, } return &publisherConfig } + +func (p *PublisherConfig) Build(client *amqp.AmqpClient) { + + if client == nil { + log.Fatalln(`amqp client is nil`) + } + + p.Client = client + + channel, err := client.Connection.Channel() + if err != nil { + log.Println(err.Error()) + return + } + + err = channel.ExchangeDeclare( + p.TopicName, + amqpLegacy.ExchangeTopic, + true, + false, + false, + false, + nil, + ) + + if err != nil { + log.Println(`Create exchange failed: `, err.Error()) + } + + log.Println(fmt.Sprintf(`Exchange: %s created`, p.TopicName)) +} + +func (p *PublisherConfig) Publish(data []byte) (bool, error) { + channel, err := p.Client.Connection.Channel() + if err != nil { + return false, err + } + + content := amqpLegacy.Publishing{ + ContentType: "text/plain", + Body: data, + } + + err = channel.Publish(p.TopicName, "*", true, true, content) + + if err != nil { + return false, err + } + + return true, nil +}