Skip to content

Commit

Permalink
fix: fix channel not shut down properly exception.
Browse files Browse the repository at this point in the history
Client being created has to be properly closed, otherwise during garbage
collection an error will be reported showing channel not shutdown
properly
  • Loading branch information
GaoleMeng committed Mar 7, 2024
1 parent 2dd8efc commit ef89a67
Showing 1 changed file with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.threeten.bp.Duration;

public class WriteToDefaultStream {

public static void runWriteToDefaultStream()
throws DescriptorValidationException, InterruptedException, IOException {
// TODO(developer): Replace these variables before running the sample.
Expand Down Expand Up @@ -138,6 +138,8 @@ private static class DataWriter {

private static final int MAX_RECREATE_COUNT = 3;

private BigQueryWriteClient client;

// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
private final Object lock = new Object();
Expand All @@ -163,12 +165,16 @@ public void initialize(TableName parentTable)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();

// Initialize client without settings, internally within stream writer a new client will be
// created with full settings.
client = BigQueryWriteClient.create();

// Use the JSON stream writer to send records in JSON format. Specify the table name to write
// to the default stream.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
streamWriter =
JsonStreamWriter.newBuilder(parentTable.toString(), BigQueryWriteClient.create())
JsonStreamWriter.newBuilder(parentTable.toString(), client)
.setExecutorProvider(
FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
.setChannelProvider(
Expand All @@ -195,7 +201,7 @@ public void append(AppendContext appendContext)
&& recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
streamWriter =
JsonStreamWriter.newBuilder(
streamWriter.getStreamName(), BigQueryWriteClient.create())
streamWriter.getStreamName(), client)
.build();
this.error = null;
}
Expand All @@ -217,6 +223,7 @@ public void cleanup() {
// Wait for all in-flight requests to complete.
inflightRequestCount.arriveAndAwaitAdvance();

client.close();
// Close the connection to the server.
streamWriter.close();

Expand Down

0 comments on commit ef89a67

Please sign in to comment.