Skip to content

Commit

Permalink
feat: Add integration tests with RetrySettings enabled. (#2275)
Browse files Browse the repository at this point in the history
* Add integration tests with RetrySettings enabled.

Initially, these tests are not run automatically as the running Service Account requires permissions on special GCP projects that inject instream errors into streams to test retries.  There will be a following nightly build that will run these tests.

---------

Co-authored-by: Evan Greco <egreco@google.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored and PhongChuong committed Nov 23, 2023
1 parent 1388610 commit a00e8f3
Show file tree
Hide file tree
Showing 7 changed files with 398 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .kokoro/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ integration)
-DtrimStackTrace=false \
-Dclirr.skip=true \
-Denforcer.skip=true \
-Dit.test=!ITBigQueryWrite*RetryTest \
-Dsurefire.failIfNoSpecifiedTests=false \
-Dfailsafe.failIfNoSpecifiedTests=false \
-fae \
verify
RETURN_CODE=$?
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.26.0')
implementation platform('com.google.cloud:libraries-bom:26.27.0')
implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,14 +985,14 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r
Long offset =
requestWrapper.message.hasOffset() ? requestWrapper.message.getOffset().getValue() : -1;
if (isDefaultStreamName(streamName) || offset == -1) {
log.fine(
log.info(
String.format(
"Retrying default stream message in stream %s for in-stream error: %s, retry count:"
+ " %s",
streamName, errorCode, requestWrapper.retryCount));
addMessageToFrontOfWaitingQueue(requestWrapper);
} else {
log.fine(
log.info(
String.format(
"Retrying exclusive message in stream %s at offset %d for in-stream error: %s, retry"
+ " count: %s",
Expand Down Expand Up @@ -1089,6 +1089,7 @@ private void requestCallback(AppendRowsResponse response) {
// Retries need to happen on the same thread as queue locking may occur
if (response.hasError()) {
if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) {
log.info("Attempting to retry on error: " + response.getError().toString());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,15 @@ public void testCloseExternalClient()
// Create some stream writers.
List<StreamWriter> streamWriterList = new ArrayList<>();
for (int i = 0; i < 4; i++) {
StreamWriter sw =
streamWriterList.add(
StreamWriter.newBuilder(
String.format("projects/p1/datasets/d1/tables/t%s/streams/_default", i),
externalClient)
.setEnableConnectionPool(true)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setEnableConnectionPool(true)
.build();
streamWriterList.add(sw);
.setLocation("us")
.build());
}

for (long i = 0; i < appendCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery.storage.v1.it;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/** Integration tests for BigQuery Write API. */
public class ITBigQueryWriteNonQuotaRetryTest {
private static final Logger LOG = Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName());
private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
private static final String TABLE = "testtable";
private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";
// This project is configured on the server to inject INTERNAL in-stream errors every
// 10 messages. This is done to verify in-stream message retries.
private static final String NON_QUOTA_RETRY_PROJECT_ID = "bq-write-api-java-retry-test";
private static BigQueryWriteClient client;
private static BigQuery bigquery;

@BeforeClass
public static void beforeClass() throws IOException {
client = BigQueryWriteClient.create();

RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
bigquery = bigqueryHelper.getOptions().getService();
DatasetInfo datasetInfo =
DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build();
bigquery.create(datasetInfo);
LOG.info("Created test dataset: " + DATASET);
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, TABLE),
StandardTableDefinition.of(
Schema.of(
Field.newBuilder("foo", LegacySQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build())))
.build();
bigquery.create(tableInfo);
}

@AfterClass
public static void afterClass() {
if (client != null) {
client.close();
}

if (bigquery != null) {
RemoteBigQueryHelper.forceDelete(bigquery, DATASET);
LOG.info("Deleted test dataset: " + DATASET);
}
}

@Test
public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
WriteRetryTestUtil.runExclusiveRetryTest(
bigquery,
client,
DATASET,
NON_QUOTA_RETRY_PROJECT_ID,
WriteStream.Type.COMMITTED,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}

@Test
public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
WriteRetryTestUtil.runDefaultRetryTest(
bigquery,
client,
DATASET,
NON_QUOTA_RETRY_PROJECT_ID,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery.storage.v1.it;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/** Integration tests for BigQuery Write API. */
public class ITBigQueryWriteQuotaRetryTest {
private static final Logger LOG = Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName());
private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
private static final String TABLE = "testtable";
private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";
// This project is configured on the server to inject RESOURCE_EXHAUSTED in-stream errors every
// 10 messages. This is done to verify in-stream message retries.
private static final String QUOTA_RETRY_PROJECT_ID = "bq-writeapi-java-quota-retry";
private static BigQueryWriteClient client;
private static BigQuery bigquery;

@BeforeClass
public static void beforeClass() throws IOException {
client = BigQueryWriteClient.create();

RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
bigquery = bigqueryHelper.getOptions().getService();
DatasetInfo datasetInfo =
DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build();
bigquery.create(datasetInfo);
LOG.info("Created test dataset: " + DATASET);
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, TABLE),
StandardTableDefinition.of(
Schema.of(
Field.newBuilder("foo", LegacySQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build())))
.build();
bigquery.create(tableInfo);
}

@AfterClass
public static void afterClass() {
if (client != null) {
client.close();
}

if (bigquery != null) {
RemoteBigQueryHelper.forceDelete(bigquery, DATASET);
LOG.info("Deleted test dataset: " + DATASET);
}
}

@Test
public void testJsonStreamWriterCommittedStreamWithQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
WriteRetryTestUtil.runExclusiveRetryTest(
bigquery,
client,
DATASET,
QUOTA_RETRY_PROJECT_ID,
WriteStream.Type.COMMITTED,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}

@Test
public void testJsonStreamWriterDefaultStreamWithQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
WriteRetryTestUtil.runDefaultRetryTest(
bigquery,
client,
DATASET,
QUOTA_RETRY_PROJECT_ID,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}
}
Loading

0 comments on commit a00e8f3

Please sign in to comment.