fixup! [FLINK-36701][cdc-runtime] Add steps to get and emit schemaManager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
jzjsnow authored and 姜卓君 committed Dec 30, 2024
1 parent 798ab3b commit 596fa99
Showing 4 changed files with 528 additions and 9 deletions.
@@ -108,7 +108,7 @@ public void testEvolveSchema() throws Exception {
 
         Assertions.assertThat(
                 ListUtils.union(
-                        Collections.singletonList(new FlushEvent(tableId)),
+                        Collections.singletonList(new FlushEvent(tableId, true)),
                         createAndInsertDataEvents))
                 .isEqualTo(
                         harness.getOutputRecords().stream()
@@ -369,7 +369,7 @@ public void testTryEvolveSchema() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -626,7 +626,7 @@ public void testExceptionEvolveSchema() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -735,7 +735,7 @@ public void testIgnoreEvolveSchema() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1021,7 +1021,7 @@ tableId, buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)),
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1108,7 +1108,7 @@ public void testTryEvolveSchemaWithFailure() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1420,7 +1420,7 @@ public void testFineGrainedSchemaEvolves() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1720,7 +1720,7 @@ public void testLenientSchemaEvolves() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -2100,7 +2100,7 @@ public void testLenientEvolveTweaks() throws Exception {
                         .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(tableId, true)),
                                 createAndInsertDataEvents));
 
         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
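The only change in these tests is the new second constructor argument on FlushEvent. Reconstructed from its usages in this commit (the boolean passed above and read via getIsForCreateTableEvent() in the operator below), the event plausibly takes the following shape; everything beyond those two usages is an assumption, not the actual Flink CDC source:

package org.apache.flink.cdc.common.event;

import java.util.Objects;

/**
 * Sketch of FlushEvent as implied by this commit's usages; NOT the actual
 * source. equals/hashCode and serialization details are omitted here.
 */
public class FlushEvent implements Event {

    private final TableId tableId;

    /**
     * Assumed semantics of the new flag: true when the flush was triggered on
     * behalf of a CreateTableEvent, in which case the sink already holds the
     * schema and the operator can skip re-requesting it (see handleFlushEvent
     * in the operator below).
     */
    private final boolean isForCreateTableEvent;

    public FlushEvent(TableId tableId, boolean isForCreateTableEvent) {
        this.tableId = Objects.requireNonNull(tableId);
        this.isForCreateTableEvent = isForCreateTableEvent;
    }

    public TableId getTableId() {
        return tableId;
    }

    public boolean getIsForCreateTableEvent() {
        return isForCreateTableEvent;
    }
}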
@@ -0,0 +1,140 @@ (new file: DataSinkOperatorAdapter.java)
package org.apache.flink.cdc.runtime.operators.sink;

import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/**
 * The DataSinkOperatorAdapter class acts as an adapter for testing the core schema evolution
 * process in both {@link DataSinkWriterOperator} and {@link DataSinkFunctionOperator}.
 */
public class DataSinkOperatorAdapter extends AbstractStreamOperator<Event>
        implements OneInputStreamOperator<Event, Event>, BoundedOneInput {

    private SchemaEvolutionClient schemaEvolutionClient;

    private final OperatorID schemaOperatorID;

    /** The set of {@link TableId}s whose {@link CreateTableEvent} has already been processed. */
    private final Set<TableId> processedTableIds;

    public DataSinkOperatorAdapter() {
        this.schemaOperatorID = new OperatorID();
        this.processedTableIds = new HashSet<>();
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
        schemaEvolutionClient =
                new SchemaEvolutionClient(
                        containingTask.getEnvironment().getOperatorCoordinatorEventGateway(),
                        schemaOperatorID);
    }

    @Override
    public void open() throws Exception {}

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void snapshotState(StateSnapshotContext context) {}

    @Override
    public void processWatermark(Watermark mark) {}

    @Override
    public void processWatermarkStatus(WatermarkStatus watermarkStatus) {}

    @Override
    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = element.getValue();

        // A FlushEvent triggers a flush.
        if (event instanceof FlushEvent) {
            handleFlushEvent((FlushEvent) event);
            return;
        }

        // A CreateTableEvent marks the table as processed directly.
        if (event instanceof CreateTableEvent) {
            processedTableIds.add(((CreateTableEvent) event).tableId());
            // Stand in for FlinkWriterOperator/StreamSink and emit the event for testing.
            output.collect(element);
            return;
        }

        // Check whether the table has been processed before emitting any other event, because
        // the sink must have a view of the full schema before it processes any change events,
        // including schema changes.
        ChangeEvent changeEvent = (ChangeEvent) event;
        if (!processedTableIds.contains(changeEvent.tableId())) {
            emitLatestSchema(changeEvent.tableId());
        }
        processedTableIds.add(changeEvent.tableId());
        output.collect(element);
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) {}

    @Override
    public void close() throws Exception {}

    @Override
    public void endInput() {}

    // ----------------------------- Helper functions -------------------------------

    private void handleFlushEvent(FlushEvent event) throws Exception {
        // The copySinkWriter/userFunction flush is omitted from testing.
        if (!processedTableIds.contains(event.getTableId()) && !event.getIsForCreateTableEvent()) {
            LOG.info("Table {} has not been processed", event.getTableId());
            emitLatestSchema(event.getTableId());
            processedTableIds.add(event.getTableId());
        }
        schemaEvolutionClient.notifyFlushSuccess(
                getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());
    }

    private void emitLatestSchema(TableId tableId) throws Exception {
        Optional<Schema> schema = schemaEvolutionClient.getLatestEvolvedSchema(tableId);
        if (schema.isPresent()) {
            // Request and process a CreateTableEvent because the SinkWriter needs to retrieve
            // the Schema to deserialize RecordData after the job resumes.
            output.collect(new StreamRecord<>(new CreateTableEvent(tableId, schema.get())));
            processedTableIds.add(tableId);
        } else {
            throw new RuntimeException(
                    "Could not find schema message from SchemaRegistry for " + tableId);
        }
    }
}
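To see the flush path the adapter implements, here is a hypothetical driver sequence. It assumes a one-input operator test harness (such as Flink's OneInputStreamOperatorTestHarness) wrapping a DataSinkOperatorAdapter, with a reachable schema registry coordinator behind it; the harness, schema, and table-ID variables are illustrative and do not appear in this commit:

// Illustrative only: `harness`, `schema`, and the table IDs are assumed setup.
TableId tableId = TableId.tableId("mydb", "orders");

// 1. A CreateTableEvent marks the table as processed and is re-emitted as-is.
harness.processElement(new StreamRecord<>(new CreateTableEvent(tableId, schema)));

// 2. A FlushEvent with isForCreateTableEvent = true skips the schema lookup;
//    the adapter only sends notifyFlushSuccess to the schema operator.
harness.processElement(new StreamRecord<>(new FlushEvent(tableId, true)));

// 3. A FlushEvent for a table this subtask has never seen, with the flag set
//    to false, makes the adapter first fetch and emit the latest evolved
//    schema as a CreateTableEvent, then acknowledge the flush.
harness.processElement(new StreamRecord<>(new FlushEvent(otherTableId, false)));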