Skip to content

Commit

Permalink
feat: Add RetrySettings use to Write API samples. (#2419)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Evan Greco <egreco@google.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 27, 2024
1 parent 4d5eb73 commit 5b000d0
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public Builder setCompressorName(String compressorName) {
* Enable client lib automatic retries on request level errors.
*
* <pre>
* Immeidate Retry code:
* Immediate Retry code:
* ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
* Backoff Retry code:
* RESOURCE_EXHAUSTED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
Expand All @@ -37,6 +38,7 @@
import java.io.IOException;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class JsonWriterStreamCdc {

Expand Down Expand Up @@ -108,6 +110,18 @@ private static void query(String query) {
public static void writeToDefaultStream(
String projectId, String datasetName, String tableName, JSONArray data)
throws DescriptorValidationException, InterruptedException, IOException {
// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
// To use the UPSERT functionality, the table schema needs to be padded with an additional
// column "_change_type".
TableSchema tableSchema =
Expand Down Expand Up @@ -159,7 +173,9 @@ public static void writeToDefaultStream(
// Use the JSON stream writer to send records in JSON format.
TableName parentTable = TableName.of(projectId, datasetName, tableName);
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build()) {
JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema)
.setRetrySettings(retrySettings)
.build()) {

ApiFuture<AppendRowsResponse> future = writer.append(data);
// The append method is asynchronous. Rather than waiting for the method to complete,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
Expand All @@ -36,12 +37,12 @@
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class ParallelWriteCommittedStream {

Expand Down Expand Up @@ -157,13 +158,26 @@ private void writeToStream(
lastMetricsSuccessCount = 0;
lastMetricsFailureCount = 0;
}
// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
Descriptor descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(
writeStream.getTableSchema());
ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
try (StreamWriter writer =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(protoSchema)
.setRetrySettings(retrySettings)
.setTraceId("SAMPLE:parallel_append")
.build()) {
while (System.currentTimeMillis() < deadlineMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

// [START bigquerystorage_jsonstreamwriter_buffered]
import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
Expand All @@ -33,6 +34,7 @@
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WriteBufferedStream {

Expand Down Expand Up @@ -61,11 +63,25 @@ public static void writeBufferedStream(String projectId, String datasetName, Str
.build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();

// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setRetrySettings(retrySettings)
.build()) {
// Write two batches to the stream, each with 10 JSON records.
for (int i = 0; i < 2; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
Expand All @@ -37,6 +38,7 @@
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WriteCommittedStream {

Expand Down Expand Up @@ -113,11 +115,25 @@ void initialize(TableName parentTable, BigQueryWriteClient client)
.build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();

// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
streamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
.setRetrySettings(retrySettings)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package com.example.bigquerystorage;

// [START bigquerystorage_jsonstreamwriter_pending]

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
Expand All @@ -41,6 +41,7 @@
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WritePendingStream {

Expand Down Expand Up @@ -129,6 +130,19 @@ void initialize(TableName parentTable, BigQueryWriteClient client)
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();

// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();

CreateWriteStreamRequest createWriteStreamRequest =
CreateWriteStreamRequest.newBuilder()
.setParent(parentTable.toString())
Expand All @@ -140,7 +154,9 @@ void initialize(TableName parentTable, BigQueryWriteClient client)
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
streamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build();
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setRetrySettings(retrySettings)
.build();
}

public void append(JSONArray data, long offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package com.example.bigquerystorage;

// [START bigquerystorage_jsonstreamwriter_default]

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
Expand All @@ -35,12 +35,9 @@
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executors;
Expand All @@ -49,6 +46,7 @@
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WriteToDefaultStream {

Expand Down Expand Up @@ -97,7 +95,7 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
jsonArr.put(record);
}

writer.append(new AppendContext(jsonArr, 0));
writer.append(new AppendContext(jsonArr));
}

// Final cleanup for the stream during worker teardown.
Expand Down Expand Up @@ -130,26 +128,15 @@ private static void verifyExpectedRowCount(TableName parentTable, int expectedRo
private static class AppendContext {

JSONArray data;
int retryCount = 0;

AppendContext(JSONArray data, int retryCount) {
AppendContext(JSONArray data) {
this.data = data;
this.retryCount = retryCount;
}
}

private static class DataWriter {

private static final int MAX_RETRY_COUNT = 3;
private static final int MAX_RECREATE_COUNT = 3;
private static final ImmutableList<Code> RETRIABLE_ERROR_CODES =
ImmutableList.of(
Code.INTERNAL,
Code.ABORTED,
Code.CANCELLED,
Code.FAILED_PRECONDITION,
Code.DEADLINE_EXCEEDED,
Code.UNAVAILABLE);

// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
Expand All @@ -163,6 +150,19 @@ private static class DataWriter {

public void initialize(TableName parentTable)
throws DescriptorValidationException, IOException, InterruptedException {
// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();

// Use the JSON stream writer to send records in JSON format. Specify the table name to write
// to the default stream.
// For more information about JsonStreamWriter, see:
Expand All @@ -183,6 +183,7 @@ public void initialize(TableName parentTable)
// column, apply the default value to the missing value field.
.setDefaultMissingValueInterpretation(
AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
.setRetrySettings(retrySettings)
.build();
}

Expand Down Expand Up @@ -244,26 +245,6 @@ public void onSuccess(AppendRowsResponse response) {
}

public void onFailure(Throwable throwable) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information,
// see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
Status status = Status.fromThrowable(throwable);
if (appendContext.retryCount < MAX_RETRY_COUNT
&& RETRIABLE_ERROR_CODES.contains(status.getCode())) {
appendContext.retryCount++;
try {
// Since default stream appends are not ordered, we can simply retry the appends.
// Retrying with exclusive streams requires more careful consideration.
this.parent.append(appendContext);
// Mark the existing attempt as done since it's being retried.
done();
return;
} catch (Exception e) {
// Fall through to return error.
System.out.format("Failed to retry append: %s\n", e);
}
}

if (throwable instanceof AppendSerializationError) {
AppendSerializationError ase = (AppendSerializationError) throwable;
Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
Expand All @@ -282,7 +263,7 @@ public void onFailure(Throwable throwable) {
// avoid potentially blocking while we are in a callback.
if (dataNew.length() > 0) {
try {
this.parent.append(new AppendContext(dataNew, 0));
this.parent.append(new AppendContext(dataNew));
} catch (DescriptorValidationException e) {
throw new RuntimeException(e);
} catch (IOException e) {
Expand Down

0 comments on commit 5b000d0

Please sign in to comment.