From 80a03bd34be15ab014678fee7306cbdfe9735740 Mon Sep 17 00:00:00 2001 From: Farhan Ahmed Date: Wed, 4 Oct 2023 17:14:22 -0400 Subject: [PATCH] feat: Adding CDC Sample --- .../AppendCompleteCallback.java | 62 +++++++ .../BqToBqStorageSchemaConverter.java | 100 ++++++++++++ .../bigquerystorage/JsonWriterStreamCdc.java | 151 ++++++++++++++++++ 3 files changed, 313 insertions(+) create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/AppendCompleteCallback.java create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/AppendCompleteCallback.java b/samples/snippets/src/main/java/com/example/bigquerystorage/AppendCompleteCallback.java new file mode 100644 index 0000000000..4556cd66fa --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/AppendCompleteCallback.java @@ -0,0 +1,62 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import org.json.JSONArray; +import org.json.JSONObject; + +class AppendCompleteCallback implements ApiFutureCallback { + private static final Object lock = new Object(); + private static int batchCount = 0; + + public void onSuccess(AppendRowsResponse response) { + synchronized (lock) { + if (response.hasError()) { + System.out.format("Error: %s\n", response.getError()); + } else { + ++batchCount; + System.out.format("Wrote batch %d\n", batchCount); + } + } + } + + public void onFailure(Throwable throwable) { + System.out.format("Error: %s\n", throwable.toString()); + } +} diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java b/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java new file mode 100644 index 0000000000..0c1ac380c0 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java @@ -0,0 +1,100 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Class is copied from java-bigquerystorage/samples snippet, as a temporary workaround + * to the fact there is no built-in converter between the REST object + * {@see com.google.cloud.bigquery.Schema} + * and the gRPC/Proto based {@see com.google.cloud.bigquery.storage.v1.TableSchema}. + * https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java + */ + +package com.example.bigquerystorage; + +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.collect.ImmutableMap; + +/** Converts structure from BigQuery client to BigQueryStorage client */ +public class BqToBqStorageSchemaConverter { + private static ImmutableMap BQTableSchemaModeMap = + ImmutableMap.of( + Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE, + Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED, + Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED); + + private static ImmutableMap BQTableSchemaTypeMap = + new ImmutableMap.Builder() + .put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL) + .put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES) + .put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE) + .put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME) + .put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE) + .put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY) + .put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64) + .put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC) + .put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING) + .put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT) + .put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME) + .put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP) + // Below this comment: mappings manually added by the Datastream team + .put(StandardSQLTypeName.BIGNUMERIC, TableFieldSchema.Type.BIGNUMERIC) + .put(StandardSQLTypeName.JSON, TableFieldSchema.Type.JSON) + .put(StandardSQLTypeName.INTERVAL, TableFieldSchema.Type.INTERVAL) + .build(); + + /** + * Converts from BigQuery client Table Schema to bigquery storage API Table Schema. + * + * @param schema the BigQuery client Table Schema + * @return the bigquery storage API Table Schema + */ + public static TableSchema convertTableSchema(Schema schema) { + TableSchema.Builder result = TableSchema.newBuilder(); + for (int i = 0; i < schema.getFields().size(); i++) { + result.addFields(i, convertFieldSchema(schema.getFields().get(i))); + } + return result.build(); + } + + /** + * Converts from bigquery v2 Field Schema to bigquery storage API Field Schema. + * + * @param field the BigQuery client Field Schema + * @return the bigquery storage API Field Schema + */ + public static TableFieldSchema convertFieldSchema(Field field) { + TableFieldSchema.Builder result = TableFieldSchema.newBuilder(); + if (field.getMode() == null) { + field = field.toBuilder().setMode(Field.Mode.NULLABLE).build(); + } + result.setMode(BQTableSchemaModeMap.get(field.getMode())); + result.setName(field.getName()); + result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType())); + if (field.getDescription() != null) { + result.setDescription(field.getDescription()); + } + if (field.getSubFields() != null) { + for (int i = 0; i < field.getSubFields().size(); i++) { + result.addFields(i, convertFieldSchema(field.getSubFields().get(i))); + } + } + return result.build(); + } +} diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java b/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java new file mode 100644 index 0000000000..46dc09bb5d --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java @@ -0,0 +1,151 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import org.json.JSONArray; +import org.json.JSONObject; + +public class JsonWriterStreamCdc { + + private static final String CHANGE_TYPE_PSEUDO_COLUMN = "_change_type"; + + public static void main(String[] args) throws Exception { + if (args.length < 4) { + System.out.println("Arguments: project, dataset, table, source_file"); + return; + } + + String projectId = args[0]; + String datasetName = args[1]; + String tableName = args[2]; + String dataFile = args[3]; + createDestinationTable(projectId, datasetName, tableName); + writeToDefaultStream(projectId, datasetName, tableName, dataFile); + } + + public static void createDestinationTable( + String projectId, String datasetName, String tableName) { + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + // Create a schema that matches the source data. + Schema schema = + Schema.of( + Field.of("commit", StandardSQLTypeName.STRING), + Field.newBuilder("parent", StandardSQLTypeName.STRING) + .setMode(Field.Mode.REPEATED) + .build(), + Field.of("author", StandardSQLTypeName.STRING), + Field.of("committer", StandardSQLTypeName.STRING), + Field.of("commit_date", StandardSQLTypeName.DATETIME), + Field.of( + "commit_msg", + StandardSQLTypeName.STRUCT, + FieldList.of( + Field.of("subject", StandardSQLTypeName.STRING), + Field.of("message", StandardSQLTypeName.STRING))), + Field.of("repo_name", StandardSQLTypeName.STRING)); + + // Create a table that uses this schema. + TableId tableId = TableId.of(projectId, datasetName, tableName); + Table table = bigquery.getTable(tableId); + if (table == null) { + TableInfo tableInfo = + TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); + bigquery.create(tableInfo); + } + } + + // writeToDefaultStream: Writes records from the source file to the destination table. + public static void writeToDefaultStream( + String projectId, String datasetName, String tableName, String dataFile) + throws DescriptorValidationException, InterruptedException, IOException { + + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Get the schema of the destination table and convert to the equivalent BigQueryStorage type. + Table table = bigquery.getTable(datasetName, tableName); + Schema schema = table.getDefinition().getSchema(); + TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema); + + // Use the JSON stream writer to send records in JSON format. + TableName parentTable = TableName.of(projectId, datasetName, tableName); + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(parentTable.toString(), + addPseudoColumnsIfNeeded(tableSchema)) + .build()) { + // Read JSON data from the source file and send it to the Write API. + BufferedReader reader = new BufferedReader(new FileReader(dataFile)); + String line = reader.readLine(); + while (line != null) { + // As a best practice, send batches of records, instead of single records at a time. + JSONArray jsonArr = new JSONArray(); + for (int i = 0; i < 100; i++) { + JSONObject record = new JSONObject(line); + jsonArr.put(record); + line = reader.readLine(); + if (line == null) { + break; + } + } // batch + ApiFuture future = writer.append(jsonArr); + // The append method is asynchronous. Rather than waiting for the method to complete, + // which can hurt performance, register a completion callback and continue streaming. + ApiFutures.addCallback( + future, new AppendCompleteCallback(), MoreExecutors.directExecutor()); + } + } + } + + private static TableSchema addPseudoColumnsIfNeeded(TableSchema tableSchema) { + return tableSchema + .toBuilder() + .addFields( + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName(CHANGE_TYPE_PSEUDO_COLUMN) + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .build(); + } +}