-
Notifications
You must be signed in to change notification settings - Fork 105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add WithContext methods to the Fluent struct including handling of context deadlines #86
base: master
Are you sure you want to change the base?
Conversation
…ntext deadlines Signed-off-by: Arno Geurts <arno.geurts@sqills.com>
2b2330c
to
5e1b54c
Compare
Ah, I need time to review this change. I'll take time for this later, but please ping me if there will be no further comments in weeks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've just checked only the diff lines. Will check the entire code (not only diff) later.
default: | ||
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit) | ||
} | ||
return nil | ||
} | ||
|
||
// appendBufferWithFeedback appends data to buffer and waits for the result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
appendBufferWithFeedback
did you update the name after writing this comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did, will change the comment :)
case f.pending <- bufferInput{msg: msg, result: result}: | ||
// don't do anything | ||
case <-msg.ctx.Done(): | ||
// because the result channel is not used, it can safely be returned to the sync pool. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I love this kind of comment 👍
@@ -386,7 +444,11 @@ func (f *Fluent) run() { | |||
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) }) | |||
continue | |||
} | |||
err := f.write(entry) | |||
err := f.write(entry.msg) | |||
if entry.result != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any cases to get nil
on entry.result
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, so in this change the sync flow and the async flow get very similar. Both write to the pending
channel and have a separate go-routine taking entries of that channel and writing the data to the TCP connection.
In case of the async flow no feedback is required, so there will be no result channel in the entry. As you can see in appendBuffer
an entry is written to the channel without a result channel.
In case of the sync flow, a result channel is added to the entry, so the feedback can be returned to the caller. And as you can see appendBufferBlocking
puts the result channel in the entry and waits on the feedback (hence the "blocking").
f.wg.Add(1) | ||
go f.run() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those should be only for Async, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I couldn't take the time for further review. Let me try in this week or the next week... |
waitDuration := time.Duration(waitTime) * time.Millisecond | ||
if deadline, hasDeadLine := msg.ctx.Deadline(); hasDeadLine && deadline.Before(time.Now().Add(waitDuration)) { | ||
// the context deadline is within the wait time, so after the sleep the deadline will have been | ||
// exceeded. It is a waste of time to wait on that. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't it wait until the deadline?
This code is to return DeadlineExceeded
when now + waitDuration > deadline
but the DeadlineExceeded should occur when the current time is after the deadline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If deadline.Before(time.Now())
, it should return DeadlineExceeded
immediately.
If deadline.Before(time.Now().Add(waitDuration)
, it should sleep until the deadline.
MaxRetryWait
may be configured as minutes or hours, so the current code could cause unexpectedly early return of errors.
data []byte | ||
ack string | ||
} | ||
|
||
type bufferInput struct { | ||
msg *msgToSend | ||
result chan<- error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be syncWriteResult
or a similar one to show it's only about sync-write.
wg sync.WaitGroup | ||
resultPool sync.Pool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Want a different name for this too, like result
above. The current name looks so confusing to me.
@@ -78,17 +79,24 @@ func NewErrUnknownNetwork(network string) error { | |||
} | |||
|
|||
type msgToSend struct { | |||
ctx context.Context |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This ctx
is referred only in f.write()
, and its argument is only the msg
. Putting contexts into msgToSend
only for passing it to f.write()
looks too much to me (because msgToSend is a message, not any other internal structure!)
How about containing ctx
in bufferInput
? (And add an argument ctx
to f.write()
?)
It's a structure of msg
and internal objects, so it seems a better struct to have the context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do so, EncodeDataWithContext
is not needed anymore, right?
That function looks only for putting contexts to msgToSend
structure.
}) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add another test case for the context timeout, via sync and async logger?
Fixes #85
This pull requests adds PostWithContext, PostWithTimeAndContext, EncodeAndPostDataWithContext, EncodeDataWithContext to the Fluent struct. The context deadlines and cancellations are handled at different levels in the code to make sure no unnecessary work is done.