diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 4608dc942a..fc99576187 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -353,7 +353,7 @@ public void run() { } finally { lock.unlock(); } - cleanupInflightRequests(); + cleanup(/* waitForDone= */ false); }); this.appendThread.start(); } @@ -812,7 +812,10 @@ private void appendLoop() { this.streamConnection.send(originalRequestBuilder.build()); } } + cleanup(/* waitForDone= */true); + } + private void cleanup(boolean waitForDone) { log.info( "Cleanup starts. Stream: " + streamName @@ -828,7 +831,9 @@ private void appendLoop() { // We can close the stream connection and handle the remaining inflight requests. if (streamConnection != null) { this.streamConnection.close(); - waitForDoneCallback(3, TimeUnit.MINUTES); + if (waitForDone) { + waitForDoneCallback(3, TimeUnit.MINUTES); + } } // At this point, there cannot be more callback. It is safe to clean up all inflight requests. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 71e4d47673..ded12b25d5 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -650,9 +650,9 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E null, client.getSettings(), retrySettings); - testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3)); + testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(2)); - long appendCount = 10; + long appendCount = 2; for (int i = 0; i < appendCount; i++) { testBigQueryWrite.addResponse(createAppendResponse(i)); } @@ -691,6 +691,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E 100) .get()); assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue"); + connectionWorker.close(); + assertTrue(connectionWorker.isUserClosed()); } @Test diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 496b406cda..c50a04a9f5 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -32,10 +32,12 @@ import com.google.cloud.bigquery.storage.v1.*; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; +import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException; import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists; import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange; import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException; import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException; +import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; @@ -49,6 +51,7 @@ import java.math.BigDecimal; import java.sql.Timestamp; import java.text.ParseException; +import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.time.temporal.ChronoUnit;