[FLINK-36701][cdc-runtime] Add steps to get and emit schemaManager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent #3802

Open: Jzjsnow wants to merge 4 commits into master

@Jzjsnow Jzjsnow commented Dec 13, 2024

Currently, if the first events the pipeline handles right after a failover are a schema change event (e.g. AddColumnEvent) followed by a DataChangeEvent, the job may fail again because the sink applies that schema change a second time.

The following diagram shows how the issue arises:
[image: before]

This PR adds steps to get and emit the schemaManager's latest evolvedSchema when SinkDataWriterOperator handles a FlushEvent. If the operator has no locally cached schema when it handles the FlushEvent, it requests the latest evolved schema from the Schema Manager. At this point the evolved schema is still the same as the sink table's schema, because the master has not yet applied the schema change event to the evolved schema.

The new process is as follows:
[image: after]
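
The lookup described above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the code in this PR: `SinkWriterSketch`, `EvolvedSchemaProvider`, and `SchemaSketch` are hypothetical stand-ins for SinkDataWriterOperator, the request to the Schema Manager, and the schema type, and emitted events are recorded as plain strings. Modeling the emitted schema as a create-table record is also an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a table schema: just an ordered list of column names. */
final class SchemaSketch {
    final List<String> columns;

    SchemaSketch(List<String> columns) {
        this.columns = Collections.unmodifiableList(new ArrayList<>(columns));
    }
}

/** Hypothetical stand-in for the request that asks the Schema Manager for its evolved schema. */
interface EvolvedSchemaProvider {
    Optional<SchemaSketch> latestEvolvedSchema(String tableId);
}

/** Simplified model of a sink writer operator; emitted events are recorded as strings. */
final class SinkWriterSketch {
    private final Map<String, SchemaSketch> localSchemaCache = new HashMap<>();
    private final EvolvedSchemaProvider schemaManager;
    final List<String> emitted = new ArrayList<>();

    SinkWriterSketch(EvolvedSchemaProvider schemaManager) {
        this.schemaManager = schemaManager;
    }

    void handleFlushEvent(String tableId) {
        // After a failover the local cache is empty, so ask for the evolved schema once.
        if (!localSchemaCache.containsKey(tableId)) {
            schemaManager.latestEvolvedSchema(tableId).ifPresent(schema -> {
                localSchemaCache.put(tableId, schema);
                // Emit the schema before the flush so the writer learns the table layout first.
                emitted.add("CreateTable(" + tableId + ", " + schema.columns + ")");
            });
        }
        emitted.add("Flush(" + tableId + ")");
    }
}
```

In this model, the first FlushEvent for a table after a restart emits the schema followed by the flush, and later FlushEvents for the same table emit only the flush because the schema is already cached.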

@yuxiqian
Contributor

yuxiqian commented Dec 17, 2024

Thanks for @Jzjsnow's clear and detailed diagram!

I just noticed that we still lack failover tests for pipeline jobs. Could we add some recovery tests like MySqlSourceITCase#testMySqlParallelSource and trigger TM/JM failures in different phases? The flink-cdc-composer unit tests might be a good place for them.

@Jzjsnow Jzjsnow force-pushed the master-Add_steps_to_get_and_emit_schemaManager's_latest_evolvedSchema_when_SinkDataWriterOperator_handles_FlushEvent branch 2 times, most recently from 596fa99 to 468feec Compare January 2, 2025 01:41
@Jzjsnow
Author

Jzjsnow commented Jan 2, 2025

@yuxiqian Thanks for the suggestion. I have added tests to the flink-cdc-runtime unit tests that cover DataSinkOperator's handling of schema change events, which were previously missing. Among them, DataSinkOperatorWithSchemaEvolveTest#testSchemaChangeEventAfterFailover tests the underlying schema change process after failure recovery, which covers the scenario of restarting after a JM/TM failure.

@Jzjsnow
Author

Jzjsnow commented Jan 2, 2025

It looks like we have once again hit an OceanBaseMySQLModeITCase test failure similar to the one in #3712. Any ideas on how to fix this?

@yuxiqian
Contributor

yuxiqian commented Jan 2, 2025

It's worrying to see similar OceanBase test case failures here, but they seem unrelated to this PR. I will investigate.

@yuxiqian
Contributor

yuxiqian commented Jan 3, 2025

@Jzjsnow Should be fixed now... please rebase to master branch.

…latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
@Jzjsnow Jzjsnow force-pushed the master-Add_steps_to_get_and_emit_schemaManager's_latest_evolvedSchema_when_SinkDataWriterOperator_handles_FlushEvent branch from 468feec to 84bf4d0 Compare January 3, 2025 07:02
@Jzjsnow
Author

Jzjsnow commented Jan 3, 2025

@yuxiqian Thanks for the quick fix. We have now rebased onto the master branch.

@Jzjsnow
Author

Jzjsnow commented Jan 3, 2025

The Source E2E Tests#OceanBaseE2eITCase test did not pass in this CI run, but the failure is unrelated to this PR. The 1-minute timeout passed to checkResultWithTimeout (timeout=60000L) seems to be a bit too short: when the check ran, the deletion of the row with id=111 had not yet been synchronized to the sink table.
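
To illustrate why the timeout value matters, here is a minimal sketch of a check that polls until a deadline. It is not the actual JdbcProxy implementation: `PollingCheckSketch`, `awaitRows`, and its parameters are hypothetical names, and the real method may differ in how it fetches and compares rows.

```java
import java.util.Arrays;
import java.util.function.Supplier;

/** Hypothetical polling helper; not the real JdbcProxy#checkResultWithTimeout. */
final class PollingCheckSketch {

    /**
     * Re-reads the rows until they equal {@code expected} or {@code timeoutMs} elapses.
     * Returns true on a match, false on timeout or interruption.
     */
    static boolean awaitRows(
            Supplier<String[]> fetchRows, String[] expected, long timeoutMs, long intervalMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (Arrays.equals(fetchRows.get(), expected)) {
                return true;
            }
            // Compare via subtraction so the check stays correct if nanoTime wraps around.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

With a check shaped like this, a delete that reaches the sink after the deadline shows up as one extra row in the final comparison, which matches the array length mismatch in the log below.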

Here is the error log:

2025-01-03T07:38:53.8553280Z 1254259 [main] ERROR org.apache.flink.cdc.connectors.tests.OceanBaseE2eITCase -
2025-01-03T07:38:53.8553917Z --------------------------------------------------------------------------------
2025-01-03T07:38:53.8554680Z Test testOceanBaseCDCflinkVersion: 1.19.1 failed with:
2025-01-03T07:38:53.8555684Z array lengths differed, expected.length=10 actual.length=11; arrays first differed at element [10]; expected: but was:<111,scooter,Big 2-wheel scooter ,5.18,null,null>
2025-01-03T07:38:53.8556858Z at org.junit.internal.ComparisonCriteria.arrayEquals(ComparisonCriteria.java:89)
2025-01-03T07:38:53.8557558Z at org.junit.internal.ComparisonCriteria.arrayEquals(ComparisonCriteria.java:28)
2025-01-03T07:38:53.8558017Z at org.junit.Assert.internalArrayEquals(Assert.java:534)
2025-01-03T07:38:53.8558492Z at org.junit.Assert.assertArrayEquals(Assert.java:285)
2025-01-03T07:38:53.8558895Z at org.junit.Assert.assertArrayEquals(Assert.java:300)
2025-01-03T07:38:53.8559341Z at org.apache.flink.cdc.common.test.utils.JdbcProxy.checkResult(JdbcProxy.java:70)
2025-01-03T07:38:53.8559996Z at org.apache.flink.cdc.common.test.utils.JdbcProxy.checkResultWithTimeout(JdbcProxy.java:93)
2025-01-03T07:38:53.8560819Z at org.apache.flink.cdc.connectors.tests.OceanBaseE2eITCase.testOceanBaseCDC(OceanBaseE2eITCase.java:179)

Contributor

@yuxiqian yuxiqian left a comment

Thanks for @Jzjsnow's great work. I left some minor comments.


/** Which subTask ID this FlushEvent was initiated from. */
private final int sourceSubTaskId;

/** Flag indicating whether the FlushEvent is sent before a create table event. */
private final Boolean isForCreateTableEvent;
Contributor

I'm wondering whether we need to track FlushEvent more specifically, recording which type of schema change event caused it.

If so, we could store a SchemaChangeEventType enum value for extensibility.

If that turns out to be unnecessary, we can at least use a primitive boolean to avoid [un]boxing.
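
The two options in this comment could look roughly like the sketch below. All names here are hypothetical: `SchemaChangeKindSketch` stands in for SchemaChangeEventType with an assumed subset of values, and neither class is the FlushEvent from this PR.

```java
import java.util.Objects;

/** Hypothetical subset of schema change kinds, standing in for SchemaChangeEventType. */
enum SchemaChangeKindSketch { CREATE_TABLE, ADD_COLUMN, DROP_COLUMN }

/** Option 1: record which kind of schema change caused the flush. */
final class TypedFlushEventSketch {
    private final int sourceSubTaskId;
    private final SchemaChangeKindSketch cause;

    TypedFlushEventSketch(int sourceSubTaskId, SchemaChangeKindSketch cause) {
        this.sourceSubTaskId = sourceSubTaskId;
        this.cause = Objects.requireNonNull(cause, "cause");
    }

    int getSourceSubTaskId() {
        return sourceSubTaskId;
    }

    SchemaChangeKindSketch getCause() {
        return cause;
    }

    /** The create-table check is derived from the cause instead of stored separately. */
    boolean isForCreateTableEvent() {
        return cause == SchemaChangeKindSketch.CREATE_TABLE;
    }
}

/** Option 2: keep a flag, but as a primitive so it can never be null or need unboxing. */
final class FlagFlushEventSketch {
    private final int sourceSubTaskId;
    private final boolean isForCreateTableEvent;

    FlagFlushEventSketch(int sourceSubTaskId, boolean isForCreateTableEvent) {
        this.sourceSubTaskId = sourceSubTaskId;
        this.isForCreateTableEvent = isForCreateTableEvent;
    }

    int getSourceSubTaskId() {
        return sourceSubTaskId;
    }

    boolean isForCreateTableEvent() {
        return isForCreateTableEvent;
    }
}
```

The enum variant keeps room for handling other schema change types differently later, while the primitive flag is the smaller change. A boxed `Boolean` field, by contrast, can hold null and would throw a NullPointerException when unboxed in a condition.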

Contributor

@yuxiqian yuxiqian Jan 7, 2025

Maybe this change could be split into a separate commit to keep the commit history accurate.

@Jzjsnow Jzjsnow force-pushed the master-Add_steps_to_get_and_emit_schemaManager's_latest_evolvedSchema_when_SinkDataWriterOperator_handles_FlushEvent branch from e0a3e30 to c7de55b Compare January 7, 2025 11:55
jzjsnow added 2 commits January 7, 2025 20:05
…ager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
…ager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.