Skip to content

Commit

Permalink
Reverted to static logging fixes #172 (#173)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantzas committed Aug 13, 2018
1 parent 6806d69 commit f718a16
Show file tree
Hide file tree
Showing 22 changed files with 308 additions and 260 deletions.
29 changes: 8 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,27 +141,22 @@ downstream systems. The tracing information is added to each implementations hea

The log package is designed to be a leveled logger with field support.

The log package defines two interfaces (Logger and Factory) that have to be implemented in order to set up the logging in this framework.
After implementing the two interfaces you can setup logging by doing the following:
The log package defines the logger interface and a factory function type that needs to be implemented in order to set up the logging in this framework.

```go
// instantiate the implemented factory and fields (map[string]interface{})
// instantiate the implemented factory func type and fields (map[string]interface{})
err := log.Setup(factory, fields)
// handle error
```

`If the setup is omitted the package will not setup any logging!`

In order to get a logger use the the following:
From there logging is as simple as

```go
logger := log.Create()
log.Info("Hello world!")
```

which returns a logger with all fields appended along with the source code file of the class where the logger has been created.

`It is advisable to create one logger in every type/package once and use it to get information about the log origin!`

The implementations should support following log levels:

- Debug, which should log the message with debug level
Expand All @@ -175,7 +170,7 @@ The first four (Debug, Info, Warn and Error) give the opportunity to differentia

The package supports fields, which are logged along with the message, to augment the information further to ease querying in the log management system.

The following implementations are provided as sub-package:
The following implementations are provided as sub-package and are by default wired up in the framework:

- zerolog, which supports the excellent [zerolog](https://github.com/rs/zerolog) library and is set up by default

Expand All @@ -185,8 +180,6 @@ The logger interface defines the actual logger.

```go
type Logger interface {
Level() Level
Fields() map[string]interface{}
Fatal(...interface{})
Fatalf(string, ...interface{})
Panic(...interface{})
Expand All @@ -206,14 +199,8 @@ In order to be consistent with the design the implementation of the `Fatal(f)` h

### Factory

The factory interface defines a factory for creating a logger.
The factory function type defines a factory for creating a logger.

```go
type Factory interface {
Create(map[string]interface{}) Logger
}
```

Two methods are supported:

- Create, which creates a logger with the specified fields (or nil)
type FactoryFunc func(map[string]interface{}) Logger
```
8 changes: 3 additions & 5 deletions async/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ type Consumer struct {
cfg amqp.Config
ch *amqp.Channel
conn *amqp.Connection
log log.Logger
}

// New creates a new AMQP consumer with some defaults. Use option to change.
Expand Down Expand Up @@ -109,7 +108,6 @@ func New(name, url, queue, exchange string, oo ...OptionFunc) (*Consumer, error)

// Consume starts of consuming a AMQP queue.
func (c *Consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan error, error) {
c.log = log.Create()
deliveries, err := c.consumer()
if err != nil {
return nil, nil, errors.Wrap(err, "failed initialize consumer")
Expand All @@ -121,10 +119,10 @@ func (c *Consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan er
go func() {
select {
case <-ctx.Done():
c.log.Info("canceling consuming messages requested")
log.Info("canceling consuming messages requested")
return
case d := <-deliveries:
c.log.Debugf("processing message %s", d.MessageId)
log.Debugf("processing message %s", d.MessageId)
go func(d *amqp.Delivery) {
sp, chCtx := trace.ConsumerSpan(ctx, c.name, trace.AMQPConsumerComponent, mapHeader(d.Headers))

Expand Down Expand Up @@ -186,7 +184,7 @@ func (c *Consumer) consumer() (<-chan amqp.Delivery, error) {
c.ch = ch

c.tag = uuid.New().String()
c.log.Infof("consuming messages for tag %s", c.tag)
log.Infof("consuming messages for tag %s", c.tag)

q, err := ch.QueueDeclare(c.queue, true, false, false, false, nil)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions async/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type Component struct {
sync.Mutex
cns Consumer
cnl context.CancelFunc
log log.Logger
}

// New returns a new async component.
Expand All @@ -39,7 +38,6 @@ func (c *Component) Run(ctx context.Context) error {
c.Lock()
ctx, cnl := context.WithCancel(ctx)
c.cnl = cnl
c.log = log.Create()
c.Unlock()

chMsg, chErr, err := c.cns.Consume(ctx)
Expand All @@ -52,11 +50,11 @@ func (c *Component) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
c.log.Info("canceling consuming messages requested")
log.Info("canceling consuming messages requested")
failCh <- nil
return
case msg := <-chMsg:
c.log.Debug("New message from consumer arrived")
log.Debug("New message from consumer arrived")
go func() {
err = c.proc(ctx, msg)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions async/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ type Consumer struct {
contentType string
cnl context.CancelFunc
ms sarama.Consumer
log log.Logger
}

// New creates a ew Kafka consumer with defaults. To override those default you should provide a option.
Expand Down Expand Up @@ -117,7 +116,6 @@ func New(name, ct, topic string, brokers []string, oo ...OptionFunc) (*Consumer,

// Consume starts consuming messages from a Kafka topic.
func (c *Consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan error, error) {
c.log = log.Create()
ctx, cnl := context.WithCancel(ctx)
c.cnl = cnl

Expand All @@ -134,12 +132,12 @@ func (c *Consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan er
for {
select {
case <-ctx.Done():
c.log.Info("canceling consuming messages requested")
log.Info("canceling consuming messages requested")
return
case consumerError := <-consumer.Errors():
chErr <- consumerError
case msg := <-consumer.Messages():
c.log.Debugf("data received from topic %s", msg.Topic)
log.Debugf("data received from topic %s", msg.Topic)
topicPartitionOffsetDiffGaugeSet(msg.Topic, msg.Partition, consumer.HighWaterMarkOffset(), msg.Offset)
go func() {
sp, chCtx := trace.ConsumerSpan(ctx, c.name, trace.KafkaConsumerComponent, mapHeader(msg.Headers))
Expand Down
4 changes: 1 addition & 3 deletions examples/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

type amqpComponent struct {
cmp patron.Component
log log.Logger
}

func newAmqpComponent(name, url, queue, exchange string) (*amqpComponent, error) {
Expand All @@ -33,7 +32,6 @@ func newAmqpComponent(name, url, queue, exchange string) (*amqpComponent, error)
}

func (ac *amqpComponent) Process(ctx context.Context, msg async.Message) error {
ac.log = log.Create()
var ads Audits

err := msg.Decode(&ads)
Expand All @@ -44,7 +42,7 @@ func (ac *amqpComponent) Process(ctx context.Context, msg async.Message) error {
ads.append(Audit{Name: "AMQP consumer", Started: time.Now()})

for _, a := range ads {
ac.log.Infof("%s@ took %s", a.Name, a.Duration)
log.Infof("%s@ took %s", a.Name, a.Duration)
}

return nil
Expand Down
3 changes: 0 additions & 3 deletions examples/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ services:
ports:
- "15672:15672"
- "5672:5672"
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
kafka:
image: spotify/kafka
hostname: "kafka"
Expand Down
4 changes: 1 addition & 3 deletions examples/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
type kafkaComponent struct {
cmp patron.Component
pub amqp.Publisher
log log.Logger
}

func newKafkaComponent(name, broker, topic, amqpURL, amqpExc string) (*kafkaComponent, error) {
Expand All @@ -42,7 +41,6 @@ func newKafkaComponent(name, broker, topic, amqpURL, amqpExc string) (*kafkaComp
}

func (kc *kafkaComponent) Process(ctx context.Context, msg async.Message) error {
kc.log = log.Create()
var ads Audits

err := msg.Decode(&ads)
Expand All @@ -63,7 +61,7 @@ func (kc *kafkaComponent) Process(ctx context.Context, msg async.Message) error
}

for _, a := range ads {
kc.log.Infof("%s@ took %s", a.Name, a.Duration)
log.Infof("%s@ took %s", a.Name, a.Duration)
}

return nil
Expand Down
12 changes: 6 additions & 6 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (a *Audits) append(aud Audit) {
}

const (
amqpURL = "amqp://admin:admin@localhost:5672/"
amqpURL = "amqp://guest:guest@localhost:5672/"
amqpExchange = "patron"
amqpQueue = "patron"
kafkaTopic = "patron-topic"
Expand Down Expand Up @@ -62,17 +62,17 @@ func main() {

amqpCmp, err := newAmqpComponent(name, amqpURL, amqpQueue, amqpExchange)
if err != nil {
log.Create().Fatalf("failed to create processor %v", err)
log.Fatalf("failed to create processor %v", err)
}

kafkaCmp, err := newKafkaComponent(name, kafkaBroker, kafkaTopic, amqpURL, amqpExchange)
if err != nil {
log.Create().Fatalf("failed to create processor %v", err)
log.Fatalf("failed to create processor %v", err)
}

httpCmp, err := newHTTPComponent(kafkaBroker, kafkaTopic)
if err != nil {
log.Create().Fatalf("failed to create processor %v", err)
log.Fatalf("failed to create processor %v", err)
}

// Set up routes
Expand All @@ -82,11 +82,11 @@ func main() {

srv, err := patron.New(name, version, patron.Routes(routes), patron.Components(kafkaCmp.cmp, amqpCmp.cmp))
if err != nil {
log.Create().Fatalf("failed to create service %v", err)
log.Fatalf("failed to create service %v", err)
}

err = srv.Run()
if err != nil {
log.Create().Fatalf("failed to create service %v", err)
log.Fatalf("failed to create service %v", err)
}
}
Loading

0 comments on commit f718a16

Please sign in to comment.