-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36701][cdc-runtime] Add steps to get and emit schemaManager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent #3802
base: master
Are you sure you want to change the base?
[FLINK-36701][cdc-runtime] Add steps to get and emit schemaManager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent #3802
Conversation
...untime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
Outdated
Show resolved
Hide resolved
20a00d4
to
705d763
Compare
Thanks for @Jzjsnow's clear and detailed diagram! Just noticed that we still lack failover tests for pipeline jobs. Could we add some recovering tests like |
596fa99
to
468feec
Compare
@yuxiqian Thanks for the suggestion, I have added some tests to the |
Here it looks like we have once again encountered a |
It's worrying to notice similar OceanBase test case failures here, but seems irrelevant to this PR. Will investigate this. |
@Jzjsnow Should be fixed now... please rebase to master branch. |
…latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
468feec
to
84bf4d0
Compare
@yuxiqian Thanks for the quick fix, now we've rebased to the master branch. |
The Here is the error log:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @Jzjsnow's great work, left some minor comments.
|
||
/** Which subTask ID this FlushEvent was initiated from. */ | ||
private final int sourceSubTaskId; | ||
|
||
/** Flag indicating whether the FlushEvent is sent before a create table event. */ | ||
private final Boolean isForCreateTableEvent; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we may need to trace FlushEvent
more specifically, about what type of schema change event it was caused by.
If so, we may store a SchemaChangeEventType
enum value for extensibility.
If it turns out to be unnecessary, at least we can use boolean
to avoid [un]boxing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this change can be split into an individual commit to keep commit history accurate.
e0a3e30
to
c7de55b
Compare
…ager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
…ager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
Currently, directly after a failover, when the pipeline first handles a schema change event (e.g. AddColumnEvent) and then a DataChangeEvent, it may cause the job to fail again as sink has repeatedly applied that schema change.
The issue is revealed as follows:
I add steps to get and emit schemaManager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent. If the operator doesn't have a local cache of the schema when handling the flushevent, it will request Schema Manager's latest evolved schema. At this point the evolved schema is the same with the sink table's, as the schema change event hasn't been applied to the evolved schema by the master yet.
Now the new process is changed as follows: