-
Notifications
You must be signed in to change notification settings - Fork 105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add WithContext methods to the Fluent struct including handling of context deadlines #86
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package fluent | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
|
@@ -78,17 +79,24 @@ func NewErrUnknownNetwork(network string) error { | |
} | ||
|
||
type msgToSend struct { | ||
ctx context.Context | ||
data []byte | ||
ack string | ||
} | ||
|
||
type bufferInput struct { | ||
msg *msgToSend | ||
result chan<- error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be |
||
} | ||
|
||
type Fluent struct { | ||
Config | ||
|
||
dialer dialer | ||
stopRunning chan bool | ||
pending chan *msgToSend | ||
pending chan bufferInput | ||
wg sync.WaitGroup | ||
resultPool sync.Pool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Want a different name for this too, like |
||
|
||
muconn sync.Mutex | ||
conn net.Conn | ||
|
@@ -108,6 +116,10 @@ type dialer interface { | |
Dial(string, string) (net.Conn, error) | ||
} | ||
|
||
type dialerWithContext interface { | ||
DialContext(context.Context, string, string) (net.Conn, error) | ||
} | ||
|
||
func newWithDialer(config Config, d dialer) (f *Fluent, err error) { | ||
if config.FluentNetwork == "" { | ||
config.FluentNetwork = defaultNetwork | ||
|
@@ -140,22 +152,24 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { | |
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead") | ||
config.Async = config.Async || config.AsyncConnect | ||
} | ||
|
||
if config.Async { | ||
f = &Fluent{ | ||
Config: config, | ||
dialer: d, | ||
pending: make(chan *msgToSend, config.BufferLimit), | ||
} | ||
f.wg.Add(1) | ||
go f.run() | ||
} else { | ||
f = &Fluent{ | ||
Config: config, | ||
dialer: d, | ||
f = &Fluent{ | ||
Config: config, | ||
dialer: d, | ||
pending: make(chan bufferInput, config.BufferLimit), | ||
resultPool: sync.Pool{ | ||
New: func() interface{} { | ||
return make(chan error, 1) | ||
}, | ||
}, | ||
} | ||
if !config.Async { | ||
if err = f.connect(context.Background()); err != nil { | ||
return | ||
} | ||
err = f.connect() | ||
} | ||
|
||
f.wg.Add(1) | ||
go f.run() | ||
Comment on lines
+171
to
+172
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those should be only for Async, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
return | ||
} | ||
|
||
|
@@ -185,17 +199,25 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { | |
// f.Post("tag_name", structData) | ||
// | ||
func (f *Fluent) Post(tag string, message interface{}) error { | ||
return f.PostWithContext(context.Background(), tag, message) | ||
} | ||
|
||
func (f *Fluent) PostWithContext(ctx context.Context, tag string, message interface{}) error { | ||
timeNow := time.Now() | ||
return f.PostWithTime(tag, timeNow, message) | ||
return f.PostWithTimeAndContext(ctx, tag, timeNow, message) | ||
} | ||
|
||
func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) error { | ||
return f.PostWithTimeAndContext(context.Background(), tag, tm, message) | ||
} | ||
|
||
func (f *Fluent) PostWithTimeAndContext(ctx context.Context, tag string, tm time.Time, message interface{}) error { | ||
if len(f.TagPrefix) > 0 { | ||
tag = f.TagPrefix + "." + tag | ||
} | ||
|
||
if m, ok := message.(msgp.Marshaler); ok { | ||
return f.EncodeAndPostData(tag, tm, m) | ||
return f.EncodeAndPostDataWithContext(ctx, tag, tm, m) | ||
} | ||
|
||
msg := reflect.ValueOf(message) | ||
|
@@ -215,7 +237,7 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err | |
} | ||
kv[name] = msg.FieldByIndex(field.Index).Interface() | ||
} | ||
return f.EncodeAndPostData(tag, tm, kv) | ||
return f.EncodeAndPostDataWithContext(ctx, tag, tm, kv) | ||
} | ||
|
||
if msgtype.Kind() != reflect.Map { | ||
|
@@ -229,13 +251,17 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err | |
kv[k.String()] = msg.MapIndex(k).Interface() | ||
} | ||
|
||
return f.EncodeAndPostData(tag, tm, kv) | ||
return f.EncodeAndPostDataWithContext(ctx, tag, tm, kv) | ||
} | ||
|
||
func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error { | ||
return f.EncodeAndPostDataWithContext(context.Background(), tag, tm, message) | ||
} | ||
|
||
func (f *Fluent) EncodeAndPostDataWithContext(ctx context.Context, tag string, tm time.Time, message interface{}) error { | ||
var msg *msgToSend | ||
var err error | ||
if msg, err = f.EncodeData(tag, tm, message); err != nil { | ||
if msg, err = f.EncodeDataWithContext(ctx, tag, tm, message); err != nil { | ||
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err) | ||
} | ||
return f.postRawData(msg) | ||
|
@@ -251,7 +277,7 @@ func (f *Fluent) postRawData(msg *msgToSend) error { | |
return f.appendBuffer(msg) | ||
} | ||
// Synchronous write | ||
return f.write(msg) | ||
return f.appendBufferBlocking(msg) | ||
} | ||
|
||
// For sending forward protocol adopted JSON | ||
|
@@ -296,8 +322,12 @@ func getUniqueID(timeUnix int64) (string, error) { | |
} | ||
|
||
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) { | ||
return f.EncodeDataWithContext(context.Background(), tag, tm, message) | ||
} | ||
|
||
func (f *Fluent) EncodeDataWithContext(ctx context.Context, tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) { | ||
option := make(map[string]string) | ||
msg = &msgToSend{} | ||
msg = &msgToSend{ctx: ctx} | ||
timeUnix := tm.Unix() | ||
if f.Config.RequestAck { | ||
var err error | ||
|
@@ -338,13 +368,37 @@ func (f *Fluent) Close() (err error) { | |
// appendBuffer appends data to buffer with lock. | ||
func (f *Fluent) appendBuffer(msg *msgToSend) error { | ||
select { | ||
case f.pending <- msg: | ||
case f.pending <- bufferInput{msg: msg}: | ||
default: | ||
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit) | ||
} | ||
return nil | ||
} | ||
|
||
// appendBufferWithFeedback appends data to buffer and waits for the result | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did, will change the comment :) |
||
func (f *Fluent) appendBufferBlocking(msg *msgToSend) error { | ||
result := f.resultPool.Get().(chan error) | ||
// write the data to the buffer and block if the buffer is full | ||
select { | ||
case f.pending <- bufferInput{msg: msg, result: result}: | ||
// don't do anything | ||
case <-msg.ctx.Done(): | ||
// because the result channel is not used, it can safely be returned to the sync pool. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I love this kind of comment 👍 |
||
f.resultPool.Put(result) | ||
return msg.ctx.Err() | ||
} | ||
|
||
select { | ||
case err := <-result: | ||
f.resultPool.Put(result) | ||
return err | ||
case <-msg.ctx.Done(): | ||
// the context deadline has exceeded, but there is no result yet. So the result channel cannot be returned to | ||
// the pool, as it might be written later. | ||
return msg.ctx.Err() | ||
} | ||
} | ||
|
||
// close closes the connection. | ||
func (f *Fluent) close(c net.Conn) { | ||
f.muconn.Lock() | ||
|
@@ -356,19 +410,23 @@ func (f *Fluent) close(c net.Conn) { | |
} | ||
|
||
// connect establishes a new connection using the specified transport. | ||
func (f *Fluent) connect() (err error) { | ||
func (f *Fluent) connect(ctx context.Context) (err error) { | ||
var address string | ||
switch f.Config.FluentNetwork { | ||
case "tcp": | ||
f.conn, err = f.dialer.Dial( | ||
f.Config.FluentNetwork, | ||
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort)) | ||
address = f.Config.FluentHost + ":" + strconv.Itoa(f.Config.FluentPort) | ||
case "unix": | ||
f.conn, err = f.dialer.Dial( | ||
f.Config.FluentNetwork, | ||
f.Config.FluentSocketPath) | ||
address = f.Config.FluentSocketPath | ||
default: | ||
err = NewErrUnknownNetwork(f.Config.FluentNetwork) | ||
return | ||
} | ||
if d, ok := f.dialer.(dialerWithContext); ok { | ||
f.conn, err = d.DialContext(ctx, f.Config.FluentNetwork, address) | ||
} else { | ||
f.conn, err = f.dialer.Dial(f.Config.FluentNetwork, address) | ||
} | ||
|
||
return err | ||
} | ||
|
||
|
@@ -386,7 +444,11 @@ func (f *Fluent) run() { | |
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) }) | ||
continue | ||
} | ||
err := f.write(entry) | ||
err := f.write(entry.msg) | ||
if entry.result != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any cases to get There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, so in this change the sync flow and the async flow get very similar. Both write to the In case of the async flow no feedback is required, so there will be no result channel in the entry. As you can see in In case of the sync flow, a result channel is added to the entry, so the feedback can be returned to the caller. And as you can see |
||
entry.result <- err | ||
continue | ||
} | ||
if err != nil { | ||
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339)) | ||
} | ||
|
@@ -413,7 +475,7 @@ func (f *Fluent) write(msg *msgToSend) error { | |
if c == nil { | ||
f.muconn.Lock() | ||
if f.conn == nil { | ||
err := f.connect() | ||
err := f.connect(msg.ctx) | ||
if err != nil { | ||
f.muconn.Unlock() | ||
|
||
|
@@ -425,7 +487,13 @@ func (f *Fluent) write(msg *msgToSend) error { | |
if waitTime > f.Config.MaxRetryWait { | ||
waitTime = f.Config.MaxRetryWait | ||
} | ||
time.Sleep(time.Duration(waitTime) * time.Millisecond) | ||
waitDuration := time.Duration(waitTime) * time.Millisecond | ||
if deadline, hasDeadLine := msg.ctx.Deadline(); hasDeadLine && deadline.Before(time.Now().Add(waitDuration)) { | ||
// the context deadline is within the wait time, so after the sleep the deadline will have been | ||
// exceeded. It is a waste of time to wait on that. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't it wait until the deadline? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If |
||
return context.DeadlineExceeded | ||
} | ||
time.Sleep(waitDuration) | ||
continue | ||
} | ||
} | ||
|
@@ -435,11 +503,14 @@ func (f *Fluent) write(msg *msgToSend) error { | |
|
||
// We're connected, write msg | ||
t := f.Config.WriteTimeout | ||
var deadline time.Time | ||
if time.Duration(0) < t { | ||
c.SetWriteDeadline(time.Now().Add(t)) | ||
} else { | ||
c.SetWriteDeadline(time.Time{}) | ||
deadline = time.Now().Add(t) | ||
} | ||
if ctxDeadline, hasDeadline := msg.ctx.Deadline(); hasDeadline && (deadline.IsZero() || ctxDeadline.Before(deadline)) { | ||
deadline = ctxDeadline | ||
} | ||
c.SetWriteDeadline(deadline) | ||
_, err := c.Write(msg.data) | ||
if err != nil { | ||
f.close(c) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package fluent | |
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"io/ioutil" | ||
|
@@ -449,6 +450,59 @@ func TestPostWithTime(t *testing.T) { | |
} | ||
} | ||
|
||
func TestPostWithTimeAndContext(t *testing.T) { | ||
testcases := map[string]Config{ | ||
"with Async": { | ||
Async: true, | ||
MarshalAsJSON: true, | ||
TagPrefix: "acme", | ||
}, | ||
"without Async": { | ||
Async: false, | ||
MarshalAsJSON: true, | ||
TagPrefix: "acme", | ||
}, | ||
} | ||
|
||
for tcname := range testcases { | ||
t.Run(tcname, func(t *testing.T) { | ||
tc := testcases[tcname] | ||
t.Parallel() | ||
|
||
d := newTestDialer() | ||
var f *Fluent | ||
defer func() { | ||
if f != nil { | ||
f.Close() | ||
} | ||
}() | ||
deadline := time.Now().Add(1 * time.Second) | ||
|
||
go func() { | ||
var err error | ||
if f, err = newWithDialer(tc, d); err != nil { | ||
t.Errorf("Unexpected error: %v", err) | ||
} | ||
ctx, cancelFunc := context.WithDeadline(context.Background(), deadline) | ||
defer cancelFunc() | ||
|
||
_ = f.PostWithTimeAndContext(ctx, "tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) | ||
_ = f.PostWithTimeAndContext(ctx, "tag_name", time.Unix(1482493050, 0), map[string]string{"fluentd": "is awesome"}) | ||
}() | ||
|
||
conn := d.waitForNextDialing(true) | ||
assertReceived(t, | ||
conn.waitForNextWrite(true, ""), | ||
"[\"acme.tag_name\",1482493046,{\"foo\":\"bar\"},{}]") | ||
|
||
assertReceived(t, | ||
conn.waitForNextWrite(true, ""), | ||
"[\"acme.tag_name\",1482493050,{\"fluentd\":\"is awesome\"},{}]") | ||
assert.Equal(t, conn.writeDeadline, deadline) | ||
}) | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add another test case for the context timeout, via sync and async logger? |
||
func TestReconnectAndResendAfterTransientFailure(t *testing.T) { | ||
testcases := map[string]Config{ | ||
"with Async": { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This
ctx
is referred only inf.write()
, and its argument is only themsg
. Putting contexts intomsgToSend
only for passing it tof.write()
looks too much to me (because msgToSend is a message, not any other internal structure!)How about containing
ctx
inbufferInput
? (And add an argumentctx
tof.write()
?)It's a structure of
msg
and internal objects, so it seems a better struct to have the context.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do so,
EncodeDataWithContext
is not needed anymore, right?That function looks only for putting contexts to
msgToSend
structure.