Skip to content

Commit

Permalink
feat(logfwd): add log labels (#312)
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 authored Nov 10, 2023
1 parent b36325c commit ae3351f
Show file tree
Hide file tree
Showing 8 changed files with 639 additions and 54 deletions.
48 changes: 45 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ $ pebble run --verbose
...
```
#### Log forwarding
### Log forwarding
Pebble supports forwarding its services' logs to a remote Loki server. In the `log-targets` section of the plan, you can specify destinations for log forwarding, for example:
```yaml
Expand All @@ -422,6 +422,8 @@ log-targets:
services: [svc1, svc2]
```

#### Specifying services

For each log target, use the `services` key to specify a list of services to collect logs from. In the above example, the `production-logs` target will collect logs from `svc1` and `svc2`.

Use the special keyword `all` to match all services, including services that might be added in future layers. In the above example, `staging-logs` will collect logs from all services.
Expand Down Expand Up @@ -453,6 +455,38 @@ my-target:
```
would remove all services and then add `svc1`, so `my-target` would receive logs from only `svc1`.

#### Labels

In the `labels` section, you can specify custom labels to be added to any outgoing logs. These labels may contain `$ENVIRONMENT_VARIABLES` - these will be interpreted in the environment of the corresponding service. Pebble may also add its own default labels (depending on the protocol). For example, given the following plan:
```yaml
services:
svc1:
environment:
OWNER: 'alice'
svc2:
environment:
OWNER: 'bob'
log-targets:
tgt1:
type: loki
labels:
product: 'juju'
owner: 'user-$OWNER'
```
the logs from `svc1` will be sent with the following labels:
```yaml
product: juju
owner: user-alice # env var $OWNER substituted
pebble_service: svc1 # default label for Loki
```
and for svc2, the labels will be
```yaml
product: juju
owner: user-bob # env var $OWNER substituted
pebble_service: svc2 # default label for Loki
```


## Container usage

Expand Down Expand Up @@ -733,8 +767,10 @@ log-targets:
override: merge | replace
# (Required) The type of log target, which determines the format in
# which logs will be sent. Currently, the only supported type is 'loki',
# but more protocols may be added in the future.
# which logs will be sent. The supported types are:
#
# - loki: Use the Grafana Loki protocol. A "pebble_service" label is
# added automatically, with the name of the Pebble service as its value.
type: loki
# (Required) The URL of the remote log target.
Expand All @@ -749,6 +785,12 @@ log-targets:
# service name with a minus (e.g. '-svc1') to remove a previously added
# service. '-all' will remove all services.
services: [<service names>]
# (Optional) A list of key/value pairs defining labels which should be set
# on the outgoing logs. The label values may contain $ENV_VARS, which will
# be substituted using the environment for the corresponding service.
labels:
<label name>: <label value>
```

## API and clients
Expand Down
82 changes: 66 additions & 16 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package logstate
import (
"context"
"fmt"
"os"
"time"

"gopkg.in/tomb.v2"
Expand Down Expand Up @@ -70,6 +71,9 @@ type logGatherer struct {
// ensure the client is not blocking subsequent teardown steps.
clientCancel context.CancelFunc

// Channel used to notify the main loop to set the client's labels
setLabels chan svcWithLabels

pullers *pullerGroup
// All pullers send logs on this channel, received by main loop
entryCh chan servicelog.Entry
Expand All @@ -78,9 +82,10 @@ type logGatherer struct {
// logGathererOptions allows overriding the newLogClient method and time values
// in testing.
type logGathererOptions struct {
bufferTimeout time.Duration
maxBufferedEntries int
timeoutFinalFlush time.Duration
bufferTimeout time.Duration
maxBufferedEntries int
timeoutCurrentFlush time.Duration
timeoutFinalFlush time.Duration
// method to get a new client
newClient func(*plan.LogTarget) (logClient, error)
}
Expand All @@ -104,6 +109,7 @@ func newLogGathererInternal(target *plan.LogTarget, options *logGathererOptions)

targetName: target.Name,
client: client,
setLabels: make(chan svcWithLabels),
entryCh: make(chan servicelog.Entry),
pullers: newPullerGroup(target.Name),
}
Expand All @@ -121,6 +127,9 @@ func fillDefaultOptions(options *logGathererOptions) *logGathererOptions {
if options.maxBufferedEntries == 0 {
options.maxBufferedEntries = maxBufferedEntries
}
if options.timeoutCurrentFlush == 0 {
options.timeoutCurrentFlush = timeoutCurrentFlush
}
if options.timeoutFinalFlush == 0 {
options.timeoutFinalFlush = timeoutFinalFlush
}
Expand All @@ -133,35 +142,46 @@ func fillDefaultOptions(options *logGathererOptions) *logGathererOptions {
// PlanChanged is called by the LogManager when the plan is changed, if this
// gatherer's target exists in the new plan.
func (g *logGatherer) PlanChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) {
target := pl.LogTargets[g.targetName]

// Remove old pullers
for _, svcName := range g.pullers.Services() {
svc, svcExists := pl.Services[svcName]
if !svcExists {
g.pullers.Remove(svcName)
if svcExists && svc.LogsTo(target) {
// We're still collecting logs from this service, so don't remove it.
continue
}

tgt := pl.LogTargets[g.targetName]
if !svc.LogsTo(tgt) {
g.pullers.Remove(svcName)
// Service no longer forwarding to this log target (or it was removed from
// the plan). Remove it from the gatherer.
g.pullers.Remove(svcName)
select {
case g.setLabels <- svcWithLabels{svcName, nil}:
case <-g.tomb.Dying():
return
}
}

// Add new pullers
for _, service := range pl.Services {
target := pl.LogTargets[g.targetName]
if !service.LogsTo(target) {
continue
}

buffer, bufferExists := buffers[service.Name]
if !bufferExists {
// We don't yet have a reference to the service's ring buffer
// Need to wait until ServiceStarted
continue
labels := evaluateLabels(target.Labels, service.Environment)
select {
case g.setLabels <- svcWithLabels{service.Name, labels}:
case <-g.tomb.Dying():
return
}

g.pullers.Add(service.Name, buffer, g.entryCh)
// If the service was just added, it may not be started yet. In this case,
// we need to wait until the buffer is created, and then we can update the
// pullers inside ServiceStarted.
buffer, svcStarted := buffers[service.Name]
if svcStarted {
g.pullers.Add(service.Name, buffer, g.entryCh)
}
}
}

Expand All @@ -171,6 +191,21 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R
g.pullers.Add(service.Name, buffer, g.entryCh)
}

// evaluateLabels interprets the labels defined in the plan, substituting any
// $env_vars with the corresponding value in the service's environment.
func evaluateLabels(rawLabels, env map[string]string) map[string]string {
substitute := func(k string) string {
// Undefined variables default to "", just like Bash
return env[k]
}

labels := make(map[string]string, len(rawLabels))
for key, rawLabel := range rawLabels {
labels[key] = os.Expand(rawLabel, substitute)
}
return labels
}

// The main control loop for the logGatherer. loop receives logs from the
// pullers on entryCh, and writes them to the client. It also flushes the
// client periodically, and exits when the gatherer's tomb is killed.
Expand Down Expand Up @@ -199,6 +234,12 @@ mainLoop:
case <-flushTimer.Expired():
flushClient(g.clientCtx)

case args := <-g.setLabels:
// Before we change the labels, flush any logs currently in the buffer,
// so that these logs are sent with the correct (old) labels.
flushClient(g.clientCtx)
g.client.SetLabels(args.service, args.labels)

case entry := <-g.entryCh:
err := g.client.Add(entry)
if err != nil {
Expand Down Expand Up @@ -236,7 +277,7 @@ mainLoop:
// - Flush out any final logs buffered in the client.
func (g *logGatherer) Stop() {
// Wait up to timeoutCurrentFlush for the current flush to complete (if any)
time.AfterFunc(timeoutCurrentFlush, g.clientCancel)
time.AfterFunc(g.timeoutCurrentFlush, g.clientCancel)

// Wait up to timeoutPullers for the pullers to pull the final logs from the
// iterator and send to the main loop.
Expand Down Expand Up @@ -264,6 +305,11 @@ func (g *logGatherer) Stop() {
}
}

type svcWithLabels struct {
service string
labels map[string]string
}

// timer wraps time.Timer and provides a better API.
type timer struct {
timer *time.Timer
Expand Down Expand Up @@ -312,6 +358,10 @@ type logClient interface {

// Flush sends buffered logs (if any) to the remote target.
Flush(context.Context) error

// SetLabels sets the log labels for the given service, or releases
// previously allocated label resources if the labels parameter is nil.
SetLabels(serviceName string, labels map[string]string)
}

func newLogClient(target *plan.LogTarget) (logClient, error) {
Expand Down
Loading

0 comments on commit ae3351f

Please sign in to comment.