From a4db562db8460d6372f361f22543409d88599a5a Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 24 Dec 2024 13:59:33 +0800 Subject: [PATCH] [FLINK-36956][transform] Append NOT NULL attribute to PK columns Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> # Conflicts: # flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java # flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java --- .../content.zh/docs/core-concept/transform.md | 2 + docs/content/docs/core-concept/transform.md | 2 + .../flink/FlinkPipelineComposerITCase.java | 12 +- .../FlinkPipelineComposerLenientITCase.java | 12 +- .../flink/FlinkPipelineTransformITCase.java | 189 +++++++++++++----- .../flink/FlinkPipelineUdfITCase.java | 24 +-- .../transform/PostTransformOperator.java | 2 +- .../TransformProjectionProcessor.java | 15 +- .../transform/PostTransformOperatorTest.java | 26 +-- ...TransformOperatorWithSchemaEvolveTest.java | 12 +- .../UnifiedTransformOperatorTest.java | 35 ++-- 11 files changed, 225 insertions(+), 106 deletions(-) diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index 950b0183389..892243628ff 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -263,6 +263,8 @@ transform: description: reassign composite primary keys example ``` +Notice that primary key columns will be attributed as NOT NULL in the downstream table, so you should ensure that no NULL value will be assigned to these columns. + ## Reassign partition key We can reassign the partition key in transform rules. For example, given a table web_order in the database mydb, we may define a transform rule as follows: diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index 9a8cf5c6106..b4979beca51 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -263,6 +263,8 @@ transform: description: reassign composite primary keys example ``` +Notice that primary key columns will be attributed as NOT NULL in the downstream table, so you should ensure that no NULL value will be assigned to these columns. + ## Reassign partition key We can reassign the partition key in transform rules. For example, given a table web_order in the database mydb, we may define a transform rule as follows: diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 380baaa5df3..c48f9440296 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -350,7 +350,7 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception { String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", @@ -413,7 +413,7 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL,`opts` BIGINT NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL,`opts` BIGINT NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I, 1], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I, 2], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", @@ -485,7 +485,7 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", @@ -1024,7 +1024,7 @@ void testTransformMergingWithRoute() throws Exception { assertThat(mergedTableSchema) .isEqualTo( Schema.newBuilder() - .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("id", DataTypes.BIGINT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("last_name", DataTypes.STRING()) @@ -1035,12 +1035,12 @@ void testTransformMergingWithRoute() throws Exception { String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}", diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java index 358094aa6cb..ad541f447fd 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java @@ -394,7 +394,7 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception { String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", @@ -455,7 +455,7 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", @@ -525,7 +525,7 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", @@ -1057,7 +1057,7 @@ void testTransformMergingWithRoute() throws Exception { assertThat(mergedTableSchema) .isEqualTo( Schema.newBuilder() - .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("id", DataTypes.BIGINT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("last_name", DataTypes.STRING()) @@ -1068,12 +1068,12 @@ void testTransformMergingWithRoute() throws Exception { String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}", diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 6a775e74ae5..5610972867d 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -141,11 +141,11 @@ void testCalculatedColumns(ValuesDataSink.SinkApi sinkApi) throws Exception { null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`uid` STRING,`double_age` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`uid` STRING,`double_age` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, 1Alice, 36], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, 2Bob, 40], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, 2Bob, 40], after=[2, Bob, 30, 2Bob, 60], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`uid` STRING,`double_age` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`uid` STRING,`double_age` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, 3Carol, 30], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, 4Derrida, 50], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, 4Derrida, 50], after=[], op=DELETE, meta=()}")); @@ -169,11 +169,11 @@ void testMultipleReferencedColumnsInProjection(ValuesDataSink.SinkApi sinkApi) null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`cubic_age` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`cubic_age` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, 5832], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, 8000], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, 8000], after=[2, Bob, 30, 27000], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`cubic_age` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`cubic_age` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, 3375], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, 15625], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, 15625], after=[], op=DELETE, meta=()}")); @@ -255,11 +255,11 @@ void testMultipleDispatchTransform(ValuesDataSink.SinkApi sinkApi) throws Except null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`category` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`category` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, YOUNG], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, OLD], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, OLD], after=[2, Bob, 30, OLD], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`category` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`category` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, YOUNG], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, OLD], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, OLD], after=[], op=DELETE, meta=()}")); @@ -290,11 +290,11 @@ void testMultipleTransformWithDiffRefColumn(ValuesDataSink.SinkApi sinkApi) thro null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`age` INT,`roleName` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`age` INT,`roleName` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, 18, Alice], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, 20, Bob], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 20, Bob], after=[2, 30, Bob], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`age` TINYINT,`roleName` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`age` TINYINT,`roleName` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, 15, Juvenile], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, 25, Derrida], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 25, Derrida], after=[], op=DELETE, meta=()}")); @@ -329,7 +329,7 @@ void testMultiTransformWithAsterisk(ValuesDataSink.SinkApi sinkApi) throws Excep "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`roleName` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`roleName` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Juvenile], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Derrida], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Derrida], after=[], op=DELETE, meta=()}")); @@ -364,7 +364,7 @@ void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi) throws "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, DERRIDA, 25, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}")); @@ -387,11 +387,11 @@ void testMetadataInfo(ValuesDataSink.SinkApi sinkApi) throws Exception { "Just a Transform Block", null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING NOT NULL,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}")); @@ -444,11 +444,11 @@ void testMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, default_namespace, default_schema, mytable1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, default_namespace, default_schema, mytable1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, default_namespace, default_schema, mytable1], after=[2, Bob, 30, default_namespace, default_schema, mytable1], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, default_namespace, default_schema, mytable2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, default_namespace, default_schema, mytable2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, default_namespace, default_schema, mytable2], after=[], op=DELETE, meta=()}")); @@ -471,11 +471,11 @@ void testMetadataColumnWithWildcard(ValuesDataSink.SinkApi sinkApi) throws Excep null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, default_namespace, default_schema, mytable1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, default_namespace, default_schema, mytable1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, default_namespace, default_schema, mytable1], after=[2, Bob, 30, default_namespace, default_schema, mytable1], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, default_namespace, default_schema, mytable2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, default_namespace, default_schema, mytable2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, default_namespace, default_schema, mytable2], after=[], op=DELETE, meta=()}")); @@ -502,11 +502,11 @@ void testUsingMetadataColumnLiteralWithWildcard(ValuesDataSink.SinkApi sinkApi) null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`string_literal` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`string_literal` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, __namespace_name____schema_name____table_name__], after=[2, Bob, 30, __namespace_name____schema_name____table_name__], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`string_literal` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`string_literal` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, __namespace_name____schema_name____table_name__], after=[], op=DELETE, meta=()}")); @@ -529,11 +529,11 @@ void testConvertDeleteAsInsert(ValuesDataSink.SinkApi sinkApi) throws Exception null, "SOFT_DELETE")), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL,`__data_event_type__` STRING NOT NULL}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL,`__data_event_type__` STRING NOT NULL}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, default_namespace, default_schema, mytable1, +I], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, default_namespace, default_schema, mytable1, +I], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, default_namespace, default_schema, mytable1, -U], after=[2, Bob, 30, default_namespace, default_schema, mytable1, +U], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL,`__data_event_type__` STRING NOT NULL}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL,`__data_event_type__` STRING NOT NULL}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, default_namespace, default_schema, mytable2, +I], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, default_namespace, default_schema, mytable2, +I], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, default_namespace, default_schema, mytable2, -D], op=INSERT, meta=()}")); @@ -562,11 +562,11 @@ void testBuiltinComparisonFunctions(ValuesDataSink.SinkApi sinkApi) throws Excep null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, false, true, false, false, true, true, false, true, true, true, true, false, true, false], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, true, true, false, true, true, true, false, true, true, false, false, false, false, true], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, true, true, false, true, true, true, false, true, true, false, false, false, false, true], after=[2, Bob, 30, true, true, false, true, true, true, false, true, true, false, false, false, false, true], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, false, true, true, true, false, true, false, true, true, false, false, true, true, false], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, false, true, true, true, false, true, false, true, false, false, false, false, false, true], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, false, true, true, true, false, true, false, true, false, false, false, false, false, true], after=[], op=DELETE, meta=()}")); @@ -595,11 +595,11 @@ void testBuiltinLogicalFunctions(ValuesDataSink.SinkApi sinkApi) throws Exceptio null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, true, true, true, false, false, true, false, true, false], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, true, true, false, false, true, true, false, true, false], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, true, true, false, false, true, true, false, true, false], after=[2, Bob, 30, true, true, false, false, true, true, false, true, false], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, true, true, false, false, true, true, false, false, false], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, true, true, false, false, true, true, true, true, false], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, true, true, false, false, true, true, true, true, false], after=[], op=DELETE, meta=()}")); @@ -629,11 +629,11 @@ void testBuiltinArithmeticFunctions(ValuesDataSink.SinkApi sinkApi) throws Excep null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` INT,`col2` INT,`col3` INT,`col4` DOUBLE,`col5` INT,`col6` INT,`col7` DOUBLE,`col8` DOUBLE,`col9` DOUBLE,`col10` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col1` INT,`col2` INT,`col3` INT,`col4` DOUBLE,`col5` INT,`col6` INT,`col7` DOUBLE,`col8` DOUBLE,`col9` DOUBLE,`col10` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, 18, -16, 17, 0.5882352941176471, 1, 16, 1.0, 0.0, 1.0, 36], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], after=[2, Bob, 30, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BIGINT,`col2` BIGINT,`col3` BIGINT,`col4` DOUBLE,`col5` INT,`col6` BIGINT,`col7` DOUBLE,`col8` DOUBLE,`col9` DOUBLE,`col10` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BIGINT,`col2` BIGINT,`col3` BIGINT,`col4` DOUBLE,`col5` INT,`col6` BIGINT,`col7` DOUBLE,`col8` DOUBLE,`col9` DOUBLE,`col10` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, 20, -14, 51, 1.7647058823529411, 0, 14, 2.0, 1.0, 2.0, 36], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, 21, -13, 68, 2.3529411764705883, 1, 13, 3.0, 2.0, 2.0, 36], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, 21, -13, 68, 2.3529411764705883, 1, 13, 3.0, 2.0, 2.0, 36], after=[], op=DELETE, meta=()}")); @@ -666,11 +666,11 @@ void testBuiltinStringFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, Dear Alice, 5, ALICE, alice, Alice, **ice, A, l, ice, Alice - 1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], after=[2, Bob, 30, Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Dear Carol, 5, CAROL, carol, Carol, Carol, C, a, rol, Carol - 3], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, e, rrida, Derrida - 4], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, e, rrida, Derrida - 4], after=[], op=DELETE, meta=()}")); @@ -1078,7 +1078,7 @@ void testWildcardTransformWithSchemaEvolution() throws Exception { assertThat(outputEvents) .containsExactly( // Initial stage - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}", @@ -1172,7 +1172,7 @@ void testExplicitTransformWithSchemaEvolution() throws Exception { assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`extend_id` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`extend_id` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 1 -> Alice], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 2 -> Barcarolle], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 3 -> Cecily], op=INSERT, meta=()}", @@ -1256,7 +1256,7 @@ void testPreAsteriskWithSchemaEvolution() throws Exception { assertThat(outputEvents) .containsExactly( // Initial stage - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`extend_id` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`extend_id` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21, 1 -> Alice], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22, 2 -> Barcarolle], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23, 3 -> Cecily], op=INSERT, meta=()}", @@ -1351,7 +1351,7 @@ void testPostAsteriskWithSchemaEvolution() throws Exception { assertThat(outputEvents) .containsExactly( // Initial stage - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`extend_id` STRING,`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`extend_id` STRING,`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1 -> Alice, 1, Alice, 21], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2 -> Barcarolle, 2, Barcarolle, 22], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3 -> Cecily, 3, Cecily, 23], op=INSERT, meta=()}", @@ -1582,6 +1582,101 @@ void testTransformUnmatchedSchemaEvolution() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}"); } + @Test + void testExplicitPrimaryKeyWithNullable() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1"); + List events = generateSchemaEvolutionEvents(tableId); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.singletonList( + new TransformDef( + "default_namespace.default_schema.mytable1", + null, + null, + "name", + "id,name", + null, + null, + null)), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + + assertThat(outputEvents) + .containsExactly( + // Initial stage + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=name, partitionKeys=id;name, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22], after=[], op=DELETE, meta=()}", + + // Add column stage + "AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", + + // Alter column type stage + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}", + + // Rename column stage + "RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}", + + // Drop column stage + "DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}"); + } + @Test void testTransformWithCommentsAndDefaultExpr() throws Exception { FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); @@ -1660,11 +1755,11 @@ void testTransformWithCommentsAndDefaultExpr() throws Exception { Assertions.assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT 'id column' 'AUTO_INCREMENT()',`name` STRING 'name column' 'Jane Doe',`age` INT 'age column' '17',`new_name` STRING 'name column' 'Jane Doe',`new_age` INT,`extras` STRING}, primaryKeys=id, partitionKeys=id, age, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL 'id column' 'AUTO_INCREMENT()',`name` STRING 'name column' 'Jane Doe',`age` INT 'age column' '17',`new_name` STRING 'name column' 'Jane Doe',`new_age` INT,`extras` STRING}, primaryKeys=id, partitionKeys=id, age, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, Alice, 19, extras], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, Bob, 21, extras], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, Bob, 21, extras], after=[2, Bob, 30, Bob, 31, extras], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT 'column for id' 'AUTO_DECREMENT()',`name` VARCHAR(255) 'column for name' 'John Smith',`age` TINYINT 'column for age' '91',`description` STRING 'column for descriptions' 'not important',`new_name` VARCHAR(255) 'column for name' 'John Smith',`new_age` INT,`extras` STRING}, primaryKeys=id, partitionKeys=id, name, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL 'column for id' 'AUTO_DECREMENT()',`name` VARCHAR(255) 'column for name' 'John Smith',`age` TINYINT 'column for age' '91',`description` STRING 'column for descriptions' 'not important',`new_name` VARCHAR(255) 'column for name' 'John Smith',`new_age` INT,`extras` STRING}, primaryKeys=id, partitionKeys=id, name, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Carol, 16, extras], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Derrida, 26, extras], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Derrida, 26, extras], after=[], op=DELETE, meta=()}"); @@ -1749,7 +1844,7 @@ private static String generateCastTo(String type) { void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith("*")) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` SMALLINT,`int_c` INT,`bigint_c` BIGINT,`float_c` FLOAT,`double_c` DOUBLE,`decimal_c` DECIMAL(10, 2),`valid_char_c` VARCHAR(17),`invalid_char_c` VARCHAR(17)}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` TINYINT,`small_c` SMALLINT,`int_c` INT,`bigint_c` BIGINT,`float_c` FLOAT,`double_c` DOUBLE,`decimal_c` DECIMAL(10, 2),`valid_char_c` VARCHAR(17),`invalid_char_c` VARCHAR(17)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6.7, -8.9, -10.11, -12.13, foo], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0.0, 0.0, 0.00, 0, bar], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6.7, 8.9, 10.11, 12.13, baz], op=INSERT, meta=()}", @@ -1757,7 +1852,7 @@ void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith(generateCastTo("BOOLEAN"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BOOLEAN,`small_c` BOOLEAN,`int_c` BOOLEAN,`bigint_c` BOOLEAN,`float_c` BOOLEAN,`double_c` BOOLEAN,`decimal_c` BOOLEAN,`valid_char_c` BOOLEAN,`invalid_char_c` BOOLEAN}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` BOOLEAN,`small_c` BOOLEAN,`int_c` BOOLEAN,`bigint_c` BOOLEAN,`float_c` BOOLEAN,`double_c` BOOLEAN,`decimal_c` BOOLEAN,`valid_char_c` BOOLEAN,`invalid_char_c` BOOLEAN}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, true, true, true, true, true, true, true, false, false], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, false, false, false, false, false, false, false, false, false], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, true, true, true, true, true, true, true, false, false], op=INSERT, meta=()}", @@ -1765,7 +1860,7 @@ void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith(generateCastTo("TINYINT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` TINYINT,`int_c` TINYINT,`bigint_c` TINYINT,`float_c` TINYINT,`double_c` TINYINT,`decimal_c` TINYINT,`valid_char_c` TINYINT,`invalid_char_c` TINYINT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` TINYINT,`small_c` TINYINT,`int_c` TINYINT,`bigint_c` TINYINT,`float_c` TINYINT,`double_c` TINYINT,`decimal_c` TINYINT,`valid_char_c` TINYINT,`invalid_char_c` TINYINT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", @@ -1773,7 +1868,7 @@ void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith(generateCastTo("SMALLINT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` SMALLINT,`small_c` SMALLINT,`int_c` SMALLINT,`bigint_c` SMALLINT,`float_c` SMALLINT,`double_c` SMALLINT,`decimal_c` SMALLINT,`valid_char_c` SMALLINT,`invalid_char_c` SMALLINT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` SMALLINT,`small_c` SMALLINT,`int_c` SMALLINT,`bigint_c` SMALLINT,`float_c` SMALLINT,`double_c` SMALLINT,`decimal_c` SMALLINT,`valid_char_c` SMALLINT,`invalid_char_c` SMALLINT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", @@ -1781,7 +1876,7 @@ void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith(generateCastTo("INT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` INT,`small_c` INT,`int_c` INT,`bigint_c` INT,`float_c` INT,`double_c` INT,`decimal_c` INT,`valid_char_c` INT,`invalid_char_c` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` INT,`small_c` INT,`int_c` INT,`bigint_c` INT,`float_c` INT,`double_c` INT,`decimal_c` INT,`valid_char_c` INT,`invalid_char_c` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", @@ -1789,7 +1884,7 @@ void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith(generateCastTo("BIGINT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BIGINT,`small_c` BIGINT,`int_c` BIGINT,`bigint_c` BIGINT,`float_c` BIGINT,`double_c` BIGINT,`decimal_c` BIGINT,`valid_char_c` BIGINT,`invalid_char_c` BIGINT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` BIGINT,`small_c` BIGINT,`int_c` BIGINT,`bigint_c` BIGINT,`float_c` BIGINT,`double_c` BIGINT,`decimal_c` BIGINT,`valid_char_c` BIGINT,`invalid_char_c` BIGINT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", @@ -1797,7 +1892,7 @@ void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith(generateCastTo("FLOAT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` FLOAT,`small_c` FLOAT,`int_c` FLOAT,`bigint_c` FLOAT,`float_c` FLOAT,`double_c` FLOAT,`decimal_c` FLOAT,`valid_char_c` FLOAT,`invalid_char_c` FLOAT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` FLOAT,`small_c` FLOAT,`int_c` FLOAT,`bigint_c` FLOAT,`float_c` FLOAT,`double_c` FLOAT,`decimal_c` FLOAT,`valid_char_c` FLOAT,`invalid_char_c` FLOAT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.11, -12.13, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.11, 12.13, null], op=INSERT, meta=()}", @@ -1805,7 +1900,7 @@ void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith(generateCastTo("DOUBLE"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DOUBLE,`small_c` DOUBLE,`int_c` DOUBLE,`bigint_c` DOUBLE,`float_c` DOUBLE,`double_c` DOUBLE,`decimal_c` DOUBLE,`valid_char_c` DOUBLE,`invalid_char_c` DOUBLE}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DOUBLE,`small_c` DOUBLE,`int_c` DOUBLE,`bigint_c` DOUBLE,`float_c` DOUBLE,`double_c` DOUBLE,`decimal_c` DOUBLE,`valid_char_c` DOUBLE,`invalid_char_c` DOUBLE}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.699999809265137, -8.9, -10.11, -12.13, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.699999809265137, 8.9, 10.11, 12.13, null], op=INSERT, meta=()}", @@ -1813,7 +1908,7 @@ void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith(generateCastTo("DECIMAL(1, 0)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(1, 0),`small_c` DECIMAL(1, 0),`int_c` DECIMAL(1, 0),`bigint_c` DECIMAL(1, 0),`float_c` DECIMAL(1, 0),`double_c` DECIMAL(1, 0),`decimal_c` DECIMAL(1, 0),`valid_char_c` DECIMAL(1, 0),`invalid_char_c` DECIMAL(1, 0)}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(1, 0),`small_c` DECIMAL(1, 0),`int_c` DECIMAL(1, 0),`bigint_c` DECIMAL(1, 0),`float_c` DECIMAL(1, 0),`double_c` DECIMAL(1, 0),`decimal_c` DECIMAL(1, 0),`valid_char_c` DECIMAL(1, 0),`invalid_char_c` DECIMAL(1, 0)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, null, null, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, null, null, null], op=INSERT, meta=()}", @@ -1821,7 +1916,7 @@ void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith(generateCastTo("DECIMAL(2, 0)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(2, 0),`small_c` DECIMAL(2, 0),`int_c` DECIMAL(2, 0),`bigint_c` DECIMAL(2, 0),`float_c` DECIMAL(2, 0),`double_c` DECIMAL(2, 0),`decimal_c` DECIMAL(2, 0),`valid_char_c` DECIMAL(2, 0),`invalid_char_c` DECIMAL(2, 0)}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(2, 0),`small_c` DECIMAL(2, 0),`int_c` DECIMAL(2, 0),`bigint_c` DECIMAL(2, 0),`float_c` DECIMAL(2, 0),`double_c` DECIMAL(2, 0),`decimal_c` DECIMAL(2, 0),`valid_char_c` DECIMAL(2, 0),`invalid_char_c` DECIMAL(2, 0)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, -10, -12, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, 10, 12, null], op=INSERT, meta=()}", @@ -1829,7 +1924,7 @@ void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith(generateCastTo("DECIMAL(3, 1)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(3, 1),`small_c` DECIMAL(3, 1),`int_c` DECIMAL(3, 1),`bigint_c` DECIMAL(3, 1),`float_c` DECIMAL(3, 1),`double_c` DECIMAL(3, 1),`decimal_c` DECIMAL(3, 1),`valid_char_c` DECIMAL(3, 1),`invalid_char_c` DECIMAL(3, 1)}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(3, 1),`small_c` DECIMAL(3, 1),`int_c` DECIMAL(3, 1),`bigint_c` DECIMAL(3, 1),`float_c` DECIMAL(3, 1),`double_c` DECIMAL(3, 1),`decimal_c` DECIMAL(3, 1),`valid_char_c` DECIMAL(3, 1),`invalid_char_c` DECIMAL(3, 1)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.1, -12.1, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.1, 12.1, null], op=INSERT, meta=()}", @@ -1837,7 +1932,7 @@ void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith(generateCastTo("DECIMAL(19, 10)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(19, 10),`small_c` DECIMAL(19, 10),`int_c` DECIMAL(19, 10),`bigint_c` DECIMAL(19, 10),`float_c` DECIMAL(19, 10),`double_c` DECIMAL(19, 10),`decimal_c` DECIMAL(19, 10),`valid_char_c` DECIMAL(19, 10),`invalid_char_c` DECIMAL(19, 10)}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(19, 10),`small_c` DECIMAL(19, 10),`int_c` DECIMAL(19, 10),`bigint_c` DECIMAL(19, 10),`float_c` DECIMAL(19, 10),`double_c` DECIMAL(19, 10),`decimal_c` DECIMAL(19, 10),`valid_char_c` DECIMAL(19, 10),`invalid_char_c` DECIMAL(19, 10)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0000000000, -3.0000000000, -4.0000000000, -5.0000000000, -6.7000000000, -8.9000000000, -10.1100000000, -12.1300000000, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 8.9000000000, 10.1100000000, 12.1300000000, null], op=INSERT, meta=()}", @@ -1904,7 +1999,7 @@ void testTransformWithLargeLiterals() throws Exception { assertThat(outputEvents) .containsExactly( // Initial stage - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`int_max` INT,`greater_than_int_max` BIGINT,`int_min` INT,`less_than_int_min` BIGINT,`really_big_decimal` DECIMAL(19, 0)}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`int_max` INT,`greater_than_int_max` BIGINT,`int_min` INT,`less_than_int_min` BIGINT,`really_big_decimal` DECIMAL(19, 0)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[3, Colin, 24, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}", diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java index b97b4b6de7f..5cf424cd379 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java @@ -165,7 +165,7 @@ void testTransformWithUdf(ValuesDataSink.SinkApi sinkApi, String language) throw String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, from 1 to z is lie], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, from 2 to z is lie], op=INSERT, meta=({op_ts=2})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, from 3 to z is lie], op=INSERT, meta=({op_ts=3})}", @@ -236,7 +236,7 @@ void testFilterWithUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`collen` STRING}, primaryKeys=col1, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`collen` STRING}, primaryKeys=col1, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 3], op=INSERT, meta=({op_ts=2})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 4], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", @@ -305,7 +305,7 @@ void testOverloadedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`tob` STRING,`toi` STRING,`tof` STRING,`tos` STRING}, primaryKeys=col1, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`tob` STRING,`toi` STRING,`tof` STRING,`tos` STRING}, primaryKeys=col1, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=2})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=3})}", @@ -377,7 +377,7 @@ void testUdfLifecycle(ValuesDataSink.SinkApi sinkApi, String language) throws Ex assertThat(outputEvents) .contains("[ LifecycleFunction ] opened.") .contains( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`stt` STRING}, primaryKeys=col1, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`stt` STRING}, primaryKeys=col1, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, #0], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, #1], op=INSERT, meta=({op_ts=2})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, #2], op=INSERT, meta=({op_ts=3})}", @@ -449,7 +449,7 @@ void testTypeHintedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`ans` STRING}, primaryKeys=col1, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`ans` STRING}, primaryKeys=col1, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, Forty-two], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, Forty-two], op=INSERT, meta=({op_ts=2})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, Forty-two], op=INSERT, meta=({op_ts=3})}", @@ -534,7 +534,7 @@ void testComplicatedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws assertThat(outputEvents) .contains("[ LifecycleFunction ] opened.") .contains( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`inccol` STRING,`typ` STRING,`fmt` STRING,`stt` STRING}, primaryKeys=col1, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`inccol` STRING,`typ` STRING,`fmt` STRING,`stt` STRING}, primaryKeys=col1, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 3, Integer: 42, 1-42, #0], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 4, Integer: 42, 2-42, #1], op=INSERT, meta=({op_ts=2})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 5, Integer: 42, 3-42, #2], op=INSERT, meta=({op_ts=3})}", @@ -609,7 +609,7 @@ void testTransformWithFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, from 1 to z is lie], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, from 2 to z is lie], op=INSERT, meta=({op_ts=2})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, from 3 to z is lie], op=INSERT, meta=({op_ts=3})}", @@ -679,7 +679,7 @@ void testFilterWithFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) thr String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`collen` STRING}, primaryKeys=col1, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`collen` STRING}, primaryKeys=col1, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 3], op=INSERT, meta=({op_ts=2})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 4], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", @@ -747,7 +747,7 @@ void testOverloadedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) thr String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`tob` STRING,`toi` STRING,`tof` STRING,`tos` STRING}, primaryKeys=col1, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`tob` STRING,`toi` STRING,`tof` STRING,`tos` STRING}, primaryKeys=col1, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=2})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=3})}", @@ -825,8 +825,8 @@ void testComplicatedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) th // Check the order and content of all received events String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) - .contains( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`inccol` STRING,`typ` STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}", + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`inccol` STRING,`typ` STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 3, Integer: 42, 1-42], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 4, Integer: 42, 2-42], op=INSERT, meta=({op_ts=2})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 5, Integer: 42, 3-42], op=INSERT, meta=({op_ts=3})}", @@ -902,7 +902,7 @@ void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception { String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .contains( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`emb` STRING}, primaryKeys=col1, options=({key1=value1})}") + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`emb` STRING}, primaryKeys=col1, options=({key1=value1})}") // The result of transform by model is not fixed. .hasSize(9); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java index d6072102737..c565b561047 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java @@ -373,7 +373,7 @@ private Schema transformSchema(TableId tableId, Schema schema) { Tuple2.of(tableId, transformProjection)); // update the columns of projection and add the column of projection into Schema newSchemas.add( - postTransformProcessor.processSchemaChangeEvent( + postTransformProcessor.processSchema( schema, transform.getSupportedMetadataColumns())); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java index abedd912083..dc8c099d60c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java @@ -30,9 +30,11 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; /** @@ -124,8 +126,7 @@ public static TransformProjectionProcessor of( supportedMetadataColumns); } - public Schema processSchemaChangeEvent( - Schema schema, SupportedMetadataColumn[] supportedMetadataColumns) { + public Schema processSchema(Schema schema, SupportedMetadataColumn[] supportedMetadataColumns) { List projectionColumns = TransformParser.generateProjectionColumns( transformProjection.getProjection(), @@ -133,12 +134,22 @@ public Schema processSchemaChangeEvent( udfDescriptors, supportedMetadataColumns); transformProjection.setProjectionColumns(projectionColumns); + Set primaryKeys = new HashSet<>(schema.primaryKeys()); return schema.copy( projectionColumns.stream() .map(ProjectionColumn::getColumn) + .map(column -> setPkNonNull(primaryKeys, column)) .collect(Collectors.toList())); } + private static Column setPkNonNull(Set primaryKeys, Column column) { + if (primaryKeys.contains(column.getName())) { + return column.copy(column.getType().notNull()); + } else { + return column; + } + } + public BinaryRecordData processData( BinaryRecordData payload, long epochTime, String opType, Map meta) { List valueList = new ArrayList<>(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index 4d873384561..a7dc61ac4b6 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -45,7 +45,7 @@ public class PostTransformOperatorTest { TableId.tableId("my_company", "my_branch", "customers"); private static final Schema CUSTOMERS_SCHEMA = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING().notNull()) .physicalColumn("col2", DataTypes.STRING()) .physicalColumn("col12", DataTypes.STRING()) .primaryKey("col1") @@ -55,7 +55,7 @@ public class PostTransformOperatorTest { TableId.tableId("my_company", "my_branch", "data_types"); private static final Schema DATATYPE_SCHEMA = Schema.newBuilder() - .physicalColumn("colString", DataTypes.STRING()) + .physicalColumn("colString", DataTypes.STRING().notNull()) .physicalColumn("colBoolean", DataTypes.BOOLEAN()) .physicalColumn("colTinyint", DataTypes.TINYINT()) .physicalColumn("colSmallint", DataTypes.SMALLINT()) @@ -74,12 +74,12 @@ public class PostTransformOperatorTest { TableId.tableId("my_company", "my_branch", "metadata_table"); private static final Schema METADATA_SCHEMA = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING().notNull()) .primaryKey("col1") .build(); private static final Schema EXPECTED_METADATA_SCHEMA = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING().notNull()) .physicalColumn("identifier_name", DataTypes.STRING()) .physicalColumn("__namespace_name__", DataTypes.STRING().notNull()) .physicalColumn("__schema_name__", DataTypes.STRING().notNull()) @@ -91,7 +91,7 @@ public class PostTransformOperatorTest { TableId.tableId("my_company", "my_branch", "metadata_as_table"); private static final Schema METADATA_AS_SCHEMA = Schema.newBuilder() - .physicalColumn("sid", DataTypes.INT()) + .physicalColumn("sid", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("name_upper", DataTypes.STRING()) .physicalColumn("tbname", DataTypes.STRING().notNull()) @@ -102,7 +102,7 @@ public class PostTransformOperatorTest { TableId.tableId("my_company", "my_branch", "timestamp_table"); private static final Schema TIMESTAMP_SCHEMA = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING().notNull()) .physicalColumn("time_equal", DataTypes.INT()) .physicalColumn("timestamp_equal", DataTypes.INT()) .physicalColumn("date_equal", DataTypes.INT()) @@ -113,7 +113,7 @@ public class PostTransformOperatorTest { TableId.tableId("my_company", "my_branch", "timestampdiff_table"); private static final Schema TIMESTAMPDIFF_SCHEMA = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING().notNull()) .physicalColumn("second_diff", DataTypes.INT()) .physicalColumn("minute_diff", DataTypes.INT()) .physicalColumn("hour_diff", DataTypes.INT()) @@ -125,7 +125,7 @@ public class PostTransformOperatorTest { TableId.tableId("my_company", "my_branch", "data_null"); private static final Schema NULL_SCHEMA = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING().notNull()) .physicalColumn("colString", DataTypes.STRING()) .physicalColumn("nullInt", DataTypes.INT()) .physicalColumn("nullBoolean", DataTypes.BOOLEAN()) @@ -145,7 +145,7 @@ public class PostTransformOperatorTest { TableId.tableId("my_company", "my_branch", "data_cast"); private static final Schema CAST_SCHEMA = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING().notNull()) .physicalColumn("castInt", DataTypes.INT()) .physicalColumn("castBoolean", DataTypes.BOOLEAN()) .physicalColumn("castTinyint", DataTypes.TINYINT()) @@ -164,7 +164,7 @@ public class PostTransformOperatorTest { TableId.tableId("my_company", "my_branch", "timezone_table"); private static final Schema TIMEZONE_SCHEMA = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING().notNull()) .physicalColumn("datetime", DataTypes.STRING()) .primaryKey("col1") .build(); @@ -173,7 +173,7 @@ public class PostTransformOperatorTest { TableId.tableId("my_company", "my_branch", "condition_table"); private static final Schema CONDITION_SCHEMA = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING().notNull()) .physicalColumn("condition_result", DataTypes.BOOLEAN()) .primaryKey("col1") .build(); @@ -232,7 +232,7 @@ public class PostTransformOperatorTest { TableId.tableId("my_company", "my_branch", "column_square"); private static final Schema COLUMN_SQUARE_SCHEMA = Schema.newBuilder() - .physicalColumn("col1", DataTypes.INT()) + .physicalColumn("col1", DataTypes.INT().notNull()) .physicalColumn("col2", DataTypes.INT()) .physicalColumn("square_col2", DataTypes.INT()) .primaryKey("col1") @@ -604,7 +604,7 @@ void testSoftDeleteTransform() throws Exception { RegularEventOperatorTestHarness.with(transform, 1); Schema expectedSchema = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING().notNull()) .physicalColumn("identifier_name", DataTypes.STRING()) .physicalColumn("__namespace_name__", DataTypes.STRING().notNull()) .physicalColumn("__schema_name__", DataTypes.STRING().notNull()) diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java index ffc6da850f2..50521e2a5b3 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java @@ -245,7 +245,7 @@ public void testIrrelevantSchemaChangeInExplicitTransformRules() throws Exceptio .primaryKey("id") .build(), Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("computed", DataTypes.INT()) .primaryKey("id") @@ -302,7 +302,7 @@ public void testSemiRelevantSchemaChangeInExplicitTransformRules() throws Except .primaryKey("id") .build(), Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("computed", DataTypes.INT()) .primaryKey("id") @@ -362,7 +362,7 @@ public void testRelevantColumnSchemaInExplicitTransformRules() throws Exception .primaryKey("id") .build(), Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("computed", DataTypes.INT()) @@ -412,7 +412,7 @@ public void testSchemaChangeWithPreWildcard() throws Exception { .primaryKey("id") .build(), Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("computed", DataTypes.INT()) @@ -547,7 +547,7 @@ public void testSchemaChangeWithMidWildcard() throws Exception { .build(), Schema.newBuilder() .physicalColumn("computed1", DataTypes.INT()) - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("computed2", DataTypes.INT()) @@ -682,7 +682,7 @@ public void testSchemaChangeWithPostWildcard() throws Exception { .build(), Schema.newBuilder() .physicalColumn("computed", DataTypes.INT()) - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .primaryKey("id") diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java index 7a048270686..59abfa06a7b 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java @@ -350,7 +350,7 @@ public void testDataChangeEventTransform() throws Exception { .primaryKey("id") .build(), Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("computed", DataTypes.INT()) .primaryKey("id") @@ -396,7 +396,7 @@ public void testSchemaNullabilityTransform() throws Exception { .primaryKey("id") .build(), Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING().notNull()) .physicalColumn("age", DataTypes.INT().notNull()) .physicalColumn("computed", DataTypes.INT()) @@ -515,7 +515,7 @@ public void testWildcardTransform() throws Exception { .primaryKey("id") .build(), Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("computed", DataTypes.INT()) @@ -562,7 +562,7 @@ public void testWildcardTransform() throws Exception { .build(), Schema.newBuilder() .physicalColumn("computed", DataTypes.INT()) - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .primaryKey("id") @@ -611,7 +611,7 @@ public void testMetadataTransform() throws Exception { .primaryKey("id") .build(), Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("__namespace_name__", DataTypes.STRING().notNull()) @@ -665,7 +665,7 @@ public void testCalculatedMetadataTransform() throws Exception { .primaryKey("id") .build(), Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("identifier_name", DataTypes.STRING()) @@ -714,7 +714,7 @@ public void testCalculatedMetadataTransform() throws Exception { .build(), Schema.newBuilder() .physicalColumn("identifier_name", DataTypes.STRING()) - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .primaryKey("id") @@ -765,7 +765,7 @@ public void testMetadataAndCalculatedTransform() throws Exception { .primaryKey("id") .build(), Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("identifier_name", DataTypes.STRING()) @@ -855,7 +855,7 @@ public void testMetadataAndCalculatedTransform() throws Exception { .physicalColumn("__namespace_name__", DataTypes.STRING().notNull()) .physicalColumn("__schema_name__", DataTypes.STRING().notNull()) .physicalColumn("__table_name__", DataTypes.STRING().notNull()) - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .primaryKey("id") @@ -942,7 +942,7 @@ public void testMetadataTransformIncludeMetaColumnString() throws Exception { .primaryKey("id") .build(), Schema.newBuilder() - .physicalColumn("id", DataTypes.INT()) + .physicalColumn("id", DataTypes.INT().notNull()) .physicalColumn("name", DataTypes.STRING().notNull()) .physicalColumn("age", DataTypes.INT().notNull()) .physicalColumn("computed", DataTypes.INT()) @@ -1065,7 +1065,10 @@ public void testTransformWithCommentsAndExpressions() throws Exception { .build(), Schema.newBuilder() .physicalColumn( - "id", DataTypes.INT(), "id column", "AUTO_INCREMENT()") + "id", + DataTypes.INT().notNull(), + "id column", + "AUTO_INCREMENT()") .physicalColumn( "name", DataTypes.STRING(), "name column", "John Smith") .physicalColumn("age", DataTypes.INT(), "age column", "17") @@ -1098,7 +1101,10 @@ public void testTransformWithCommentsAndExpressions() throws Exception { .build(), Schema.newBuilder() .physicalColumn( - "id", DataTypes.INT(), "id column", "AUTO_INCREMENT()") + "id", + DataTypes.INT().notNull(), + "id column", + "AUTO_INCREMENT()") .physicalColumn("age", DataTypes.INT(), "age column", "17") .physicalColumn("computed", DataTypes.INT()) .primaryKey("id") @@ -1145,7 +1151,10 @@ public void testTransformWithCommentsAndExpressions() throws Exception { .physicalColumn( "name", DataTypes.STRING(), "name column", "John Smith") .physicalColumn( - "id", DataTypes.INT(), "id column", "AUTO_INCREMENT()") + "id", + DataTypes.INT().notNull(), + "id column", + "AUTO_INCREMENT()") .primaryKey("id") .build()) .initializeHarness()