Skip to content

Commit

Permalink
feat: Adding CDC Sample
Browse files Browse the repository at this point in the history
  • Loading branch information
farhan0102 committed Oct 17, 2023
1 parent 5df548a commit 80a03bd
Show file tree
Hide file tree
Showing 3 changed files with 313 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.bigquerystorage;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import org.json.JSONArray;
import org.json.JSONObject;

class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
private static final Object lock = new Object();
private static int batchCount = 0;

public void onSuccess(AppendRowsResponse response) {
synchronized (lock) {
if (response.hasError()) {
System.out.format("Error: %s\n", response.getError());
} else {
++batchCount;
System.out.format("Wrote batch %d\n", batchCount);
}
}
}

public void onFailure(Throwable throwable) {
System.out.format("Error: %s\n", throwable.toString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Class is copied from java-bigquerystorage/samples snippet, as a temporary workaround
* to the fact there is no built-in converter between the REST object
* {@see com.google.cloud.bigquery.Schema}
* and the gRPC/Proto based {@see com.google.cloud.bigquery.storage.v1.TableSchema}.
* https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java
*/

package com.example.bigquerystorage;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.common.collect.ImmutableMap;

/** Converts structure from BigQuery client to BigQueryStorage client */
public class BqToBqStorageSchemaConverter {
private static ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
ImmutableMap.of(
Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);

private static ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
.put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
.put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
.put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
.put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
.put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
.put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
.put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
.put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
.put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
.put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
.put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
.put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
// Below this comment: mappings manually added by the Datastream team
.put(StandardSQLTypeName.BIGNUMERIC, TableFieldSchema.Type.BIGNUMERIC)
.put(StandardSQLTypeName.JSON, TableFieldSchema.Type.JSON)
.put(StandardSQLTypeName.INTERVAL, TableFieldSchema.Type.INTERVAL)
.build();

/**
* Converts from BigQuery client Table Schema to bigquery storage API Table Schema.
*
* @param schema the BigQuery client Table Schema
* @return the bigquery storage API Table Schema
*/
public static TableSchema convertTableSchema(Schema schema) {
TableSchema.Builder result = TableSchema.newBuilder();
for (int i = 0; i < schema.getFields().size(); i++) {
result.addFields(i, convertFieldSchema(schema.getFields().get(i)));
}
return result.build();
}

/**
* Converts from bigquery v2 Field Schema to bigquery storage API Field Schema.
*
* @param field the BigQuery client Field Schema
* @return the bigquery storage API Field Schema
*/
public static TableFieldSchema convertFieldSchema(Field field) {
TableFieldSchema.Builder result = TableFieldSchema.newBuilder();
if (field.getMode() == null) {
field = field.toBuilder().setMode(Field.Mode.NULLABLE).build();
}
result.setMode(BQTableSchemaModeMap.get(field.getMode()));
result.setName(field.getName());
result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType()));
if (field.getDescription() != null) {
result.setDescription(field.getDescription());
}
if (field.getSubFields() != null) {
for (int i = 0; i < field.getSubFields().size(); i++) {
result.addFields(i, convertFieldSchema(field.getSubFields().get(i)));
}
}
return result.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.bigquerystorage;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import org.json.JSONArray;
import org.json.JSONObject;

public class JsonWriterStreamCdc {

private static final String CHANGE_TYPE_PSEUDO_COLUMN = "_change_type";

public static void main(String[] args) throws Exception {
if (args.length < 4) {
System.out.println("Arguments: project, dataset, table, source_file");
return;
}

String projectId = args[0];
String datasetName = args[1];
String tableName = args[2];
String dataFile = args[3];
createDestinationTable(projectId, datasetName, tableName);
writeToDefaultStream(projectId, datasetName, tableName, dataFile);
}

public static void createDestinationTable(
String projectId, String datasetName, String tableName) {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
// Create a schema that matches the source data.
Schema schema =
Schema.of(
Field.of("commit", StandardSQLTypeName.STRING),
Field.newBuilder("parent", StandardSQLTypeName.STRING)
.setMode(Field.Mode.REPEATED)
.build(),
Field.of("author", StandardSQLTypeName.STRING),
Field.of("committer", StandardSQLTypeName.STRING),
Field.of("commit_date", StandardSQLTypeName.DATETIME),
Field.of(
"commit_msg",
StandardSQLTypeName.STRUCT,
FieldList.of(
Field.of("subject", StandardSQLTypeName.STRING),
Field.of("message", StandardSQLTypeName.STRING))),
Field.of("repo_name", StandardSQLTypeName.STRING));

// Create a table that uses this schema.
TableId tableId = TableId.of(projectId, datasetName, tableName);
Table table = bigquery.getTable(tableId);
if (table == null) {
TableInfo tableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build();
bigquery.create(tableInfo);
}
}

// writeToDefaultStream: Writes records from the source file to the destination table.
public static void writeToDefaultStream(
String projectId, String datasetName, String tableName, String dataFile)
throws DescriptorValidationException, InterruptedException, IOException {

BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

// Get the schema of the destination table and convert to the equivalent BigQueryStorage type.
Table table = bigquery.getTable(datasetName, tableName);
Schema schema = table.getDefinition().getSchema();
TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);

// Use the JSON stream writer to send records in JSON format.
TableName parentTable = TableName.of(projectId, datasetName, tableName);
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(parentTable.toString(),
addPseudoColumnsIfNeeded(tableSchema))
.build()) {
// Read JSON data from the source file and send it to the Write API.
BufferedReader reader = new BufferedReader(new FileReader(dataFile));
String line = reader.readLine();
while (line != null) {
// As a best practice, send batches of records, instead of single records at a time.
JSONArray jsonArr = new JSONArray();
for (int i = 0; i < 100; i++) {
JSONObject record = new JSONObject(line);
jsonArr.put(record);
line = reader.readLine();
if (line == null) {
break;
}
} // batch
ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
// The append method is asynchronous. Rather than waiting for the method to complete,
// which can hurt performance, register a completion callback and continue streaming.
ApiFutures.addCallback(
future, new AppendCompleteCallback(), MoreExecutors.directExecutor());
}
}
}

private static TableSchema addPseudoColumnsIfNeeded(TableSchema tableSchema) {
return tableSchema
.toBuilder()
.addFields(
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.build())
.addFields(
TableFieldSchema.newBuilder()
.setName(CHANGE_TYPE_PSEUDO_COLUMN)
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.build())
.build();
}
}

0 comments on commit 80a03bd

Please sign in to comment.