From cf57b8b72a65bac720b074b5fb633c35eb4fb0ff Mon Sep 17 00:00:00 2001 From: Siddharth Agrawal Date: Fri, 8 Mar 2024 19:18:12 -0800 Subject: [PATCH] fix: also shutdown the stream connection in case the timeout exception is triggered. --- .../bigquery/storage/v1/ConnectionWorker.java | 5 +- .../it/ITBigQueryWriteManualClientTest.java | 69 +++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 4608dc942a..785a65cf3d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -353,7 +353,7 @@ public void run() { } finally { lock.unlock(); } - cleanupInflightRequests(); + cleanup(); }); this.appendThread.start(); } @@ -812,7 +812,10 @@ private void appendLoop() { this.streamConnection.send(originalRequestBuilder.build()); } } + cleanup(); + } + private void cleanup() { log.info( "Cleanup starts. Stream: " + streamName diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 496b406cda..8ab48d108b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -32,10 +32,12 @@ import com.google.cloud.bigquery.storage.v1.*; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; +import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException; import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists; import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange; import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException; import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException; +import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; @@ -49,6 +51,7 @@ import java.math.BigDecimal; import java.sql.Timestamp; import java.text.ParseException; +import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.time.temporal.ChronoUnit; @@ -1612,4 +1615,70 @@ public void testLargeRequest() throws IOException, InterruptedException, Executi assertEquals("50", queryIter.next().get(0).getStringValue()); } } + + @Test + public void testTimeoutException() throws IOException, InterruptedException, ExecutionException { + try { + JsonStreamWriter.setMaxRequestCallbackWaitTime( + Duration.ofMillis(10)); // induce timeout exception + String tableName = "TestTimeoutExceptionTable"; + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, tableName), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder( + "foo", StandardSQLTypeName.STRING) + .build()))) + .build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + StreamWriter streamWriter = + StreamWriter.newBuilder(parent.toString() + "/_default") + .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) + .build(); + + // Create initial set of requests; these will go to the inflight queue + List> futureResponses = new ArrayList<>(); + final int MAX_INITIAL_REQUESTS = 10; + for (int i = 0; i < MAX_INITIAL_REQUESTS; i++) { + futureResponses.add(streamWriter.append(CreateProtoRows(new String[] {"aaa"}), -1)); + } + Thread.sleep( + 50); // Delay enough time for background thread to schedule existing requests, but not + // enough for all the responses to arrive + + // Now generate one more request; once this is scheduled the timeout exception will be + // triggered + futureResponses.add(streamWriter.append(CreateProtoRows(new String[] {"ddd"}), -1)); + + boolean gotFirstException = + false; // We don't know how many responses will have arrived by the time the last request + // is scheduled; however, we expect not all responses will have arrived. + boolean gotSubsequentException = false; + for (int i = 0; i < MAX_INITIAL_REQUESTS + 1; i++) { + try { + AppendRowsResponse actualResponse = futureResponses.get(i).get(); + } catch (Throwable t) { + assertTrue(t instanceof ExecutionException); + t = t.getCause(); + if (!gotFirstException) { + gotFirstException = true; + assertTrue(t instanceof MaximumRequestCallbackWaitTimeExceededException); + } else { + gotSubsequentException = true; + assertTrue(t instanceof StreamWriterClosedException); + assertEquals(Code.ABORTED, Status.fromThrowable(t).getCode()); + } + } + } + assertTrue(gotFirstException); + assertTrue(gotSubsequentException); + } finally { + JsonStreamWriter.setMaxRequestCallbackWaitTime( + Duration.ofMinutes( + 5)); // restore timeout exception as this is a static setting and will affect other + // tests + } + } }