diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml index d5500ef442..52eaa54d82 100644 --- a/.github/.OwlBot.lock.yaml +++ b/.github/.OwlBot.lock.yaml @@ -13,5 +13,5 @@ # limitations under the License. docker: image: gcr.io/cloud-devrel-public-resources/owlbot-java:latest - digest: sha256:46d2d262cd285c638656c8bde468011b723dc0c7ffd6a5ecc2650fe639c82e8f -# created: 2023-07-24T14:21:17.707234503Z + digest: sha256:88ba8dcc5c2c7792e1c3511381f4ab329002a1c42c512f66ca87ced572dfbf9f +# created: 2023-09-05T18:54:42.225408832Z diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 01a5ce1602..4fdebf7e0a 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -14,7 +14,7 @@ jobs: matrix: java: [11, 17] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4 - uses: actions/setup-java@v2 with: distribution: temurin @@ -28,7 +28,7 @@ jobs: name: "units (8)" runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4 - uses: actions/setup-java@v3 with: java-version: 8 @@ -48,7 +48,7 @@ jobs: windows: runs-on: windows-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4 - uses: actions/setup-java@v2 with: distribution: temurin @@ -63,7 +63,7 @@ jobs: matrix: java: [17] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4 - uses: actions/setup-java@v2 with: distribution: temurin @@ -73,7 +73,7 @@ jobs: javadoc: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4 - uses: actions/setup-java@v3 with: distribution: temurin @@ -85,7 +85,7 @@ jobs: lint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4 - uses: actions/setup-java@v2 with: distribution: temurin @@ -97,7 +97,7 @@ jobs: clirr: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4 - uses: actions/setup-java@v2 with: distribution: temurin diff --git a/.github/workflows/samples.yaml b/.github/workflows/samples.yaml index 9b1fe1529e..46de89b51a 100644 --- a/.github/workflows/samples.yaml +++ b/.github/workflows/samples.yaml @@ -7,7 +7,7 @@ jobs: checkstyle: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4 - uses: actions/setup-java@v1 with: java-version: 8 diff --git a/.kokoro/requirements.txt b/.kokoro/requirements.txt index 32989051e7..a73256ab80 100644 --- a/.kokoro/requirements.txt +++ b/.kokoro/requirements.txt @@ -12,9 +12,9 @@ cachetools==5.3.1 \ --hash=sha256:95ef631eeaea14ba2e36f06437f36463aac3a096799e876ee55e5cdccb102590 \ --hash=sha256:dce83f2d9b4e1f732a8cd44af8e8fab2dbe46201467fc98b3ef8f269092bf62b # via google-auth -certifi==2023.5.7 \ - --hash=sha256:0f0d56dc5a6ad56fd4ba36484d6cc34451e1c6548c61daad8c320169f91eddc7 \ - --hash=sha256:c6c2e98f5c7869efca1f8916fed228dd91539f9f1b444c314c06eef02980c716 +certifi==2023.7.22 \ + --hash=sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082 \ + --hash=sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9 # via requests cffi==1.15.1 \ --hash=sha256:00a9ed42e88df81ffae7a8ab6d9356b371399b91dbdf0c3cb1e84c03a13aceb5 \ @@ -485,6 +485,5 @@ zipp==3.16.1 \ # via importlib-metadata # WARNING: The following packages were not pinned, but pip requires them to be -# pinned when the requirements file includes hashes and the requirement is not -# satisfied by a package already installed. Consider using the --allow-unsafe flag. +# pinned when the requirements file includes hashes. Consider using the --allow-unsafe flag. # setuptools diff --git a/CHANGELOG.md b/CHANGELOG.md index ef780723db..4c430186ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,59 @@ # Changelog +## [2.44.0](https://github.com/googleapis/java-bigquerystorage/compare/v2.43.0...v2.44.0) (2023-10-10) + + +### Features + +* Add in-stream retry logic for retryable and quota errors ([#2243](https://github.com/googleapis/java-bigquerystorage/issues/2243)) ([6435a04](https://github.com/googleapis/java-bigquerystorage/commit/6435a0491827779b01dd0c3cf184f6578bf33f3e)) + + +### Dependencies + +* Bumping google-cloud-shared-config to v1.5.8 ([#2269](https://github.com/googleapis/java-bigquerystorage/issues/2269)) ([fb6e38d](https://github.com/googleapis/java-bigquerystorage/commit/fb6e38d575800ab4c7c16ae0545fdbd91ea358bd)) +* Update dependency com.google.cloud:google-cloud-bigquery to v2.33.1 ([#2258](https://github.com/googleapis/java-bigquerystorage/issues/2258)) ([f6cbea2](https://github.com/googleapis/java-bigquerystorage/commit/f6cbea204b5a414d8e2932ad2fd194996685ec39)) +* Update dependency com.google.cloud:google-cloud-shared-dependencies to v3.17.0 ([#2266](https://github.com/googleapis/java-bigquerystorage/issues/2266)) ([827aaf7](https://github.com/googleapis/java-bigquerystorage/commit/827aaf70bc19de8d67d0f386085877eb5d2fbced)) + +## [2.43.0](https://github.com/googleapis/java-bigquerystorage/compare/v2.42.0...v2.43.0) (2023-09-28) + + +### Features + +* Add sample code about default missingValueInterpretation ([#2249](https://github.com/googleapis/java-bigquerystorage/issues/2249)) ([ebedcc0](https://github.com/googleapis/java-bigquerystorage/commit/ebedcc07e400429fc6b80a4ad0543fe25eef970d)) + + +### Dependencies + +* Update actions/checkout digest to 8ade135 ([#2251](https://github.com/googleapis/java-bigquerystorage/issues/2251)) ([182e050](https://github.com/googleapis/java-bigquerystorage/commit/182e050d9929f9fb58694e76625b03bb54f67efe)) +* Update dependency com.google.cloud:google-cloud-bigquery to v2.32.0 ([#2246](https://github.com/googleapis/java-bigquerystorage/issues/2246)) ([893fcb9](https://github.com/googleapis/java-bigquerystorage/commit/893fcb9f6c4b2eb9814ea2597fe9aae95367b4cd)) +* Update dependency com.google.cloud:google-cloud-bigquery to v2.33.0 ([#2255](https://github.com/googleapis/java-bigquerystorage/issues/2255)) ([7689dee](https://github.com/googleapis/java-bigquerystorage/commit/7689dee5f70a144efb9eb9be1a058d11d7e3c05d)) +* Update dependency com.google.cloud:google-cloud-shared-dependencies to v3.16.1 ([#2250](https://github.com/googleapis/java-bigquerystorage/issues/2250)) ([43d96d2](https://github.com/googleapis/java-bigquerystorage/commit/43d96d288207a607d168f604f190fb28b5eab132)) +* Update dependency org.apache.arrow:arrow-vector to v13 ([#2236](https://github.com/googleapis/java-bigquerystorage/issues/2236)) ([afde7ce](https://github.com/googleapis/java-bigquerystorage/commit/afde7ce1f48f1c7cdd4b06d4aabdaac9d367aa50)) +* Update dependency org.apache.avro:avro to v1.11.3 ([#2252](https://github.com/googleapis/java-bigquerystorage/issues/2252)) ([10b615b](https://github.com/googleapis/java-bigquerystorage/commit/10b615b49ba8889f7d051ac03d4751ace82b6823)) + +## [2.42.0](https://github.com/googleapis/java-bigquerystorage/compare/v2.41.1...v2.42.0) (2023-09-13) + + +### Features + +* Add default_missing_value_interpretation field; indicate KMS_SERVICE_ERROR is retryable ([#2229](https://github.com/googleapis/java-bigquerystorage/issues/2229)) ([df686d6](https://github.com/googleapis/java-bigquerystorage/commit/df686d6ac51d182b52dbd1f5a69585bb605e9b94)) +* Expose settings to configure default missing value interpretation. ([#2230](https://github.com/googleapis/java-bigquerystorage/issues/2230)) ([dc5ed73](https://github.com/googleapis/java-bigquerystorage/commit/dc5ed73f513a77939286d3c129fc26f039c23d5c)) + + +### Bug Fixes + +* Populate final stauts to initial request during connection shutdown ([#2228](https://github.com/googleapis/java-bigquerystorage/issues/2228)) ([9b9b5c0](https://github.com/googleapis/java-bigquerystorage/commit/9b9b5c09d7bc458493338eced8527a168fff0129)) + + +### Dependencies + +* Update actions/checkout action to v4 ([#2237](https://github.com/googleapis/java-bigquerystorage/issues/2237)) ([d5d739f](https://github.com/googleapis/java-bigquerystorage/commit/d5d739fe7624b74584c1272f13635f728fdf53d2)) +* Update arrow.version to v13 ([#2234](https://github.com/googleapis/java-bigquerystorage/issues/2234)) ([ac45c2a](https://github.com/googleapis/java-bigquerystorage/commit/ac45c2aa189fa0cba05f88486f44d3b1d6f761ca)) +* Update dependency com.google.cloud:google-cloud-bigquery to v2.31.1 ([#2225](https://github.com/googleapis/java-bigquerystorage/issues/2225)) ([5144c5a](https://github.com/googleapis/java-bigquerystorage/commit/5144c5ad1e107f96d2003064cd2823982ac0e360)) +* Update dependency com.google.cloud:google-cloud-bigquery to v2.31.2 ([#2241](https://github.com/googleapis/java-bigquerystorage/issues/2241)) ([91e3730](https://github.com/googleapis/java-bigquerystorage/commit/91e37303f57ec2e211a375652a8eca8b7d39d1e6)) +* Update dependency com.google.cloud:google-cloud-shared-dependencies to v3.15.0 ([#2239](https://github.com/googleapis/java-bigquerystorage/issues/2239)) ([5352a7d](https://github.com/googleapis/java-bigquerystorage/commit/5352a7d7e1d7b01a33936adf7e204c5f49f0c230)) +* Update dependency org.apache.arrow:arrow-memory-netty to v13 ([#2235](https://github.com/googleapis/java-bigquerystorage/issues/2235)) ([7e50bef](https://github.com/googleapis/java-bigquerystorage/commit/7e50bef3fa3c92c94aeefedca0ae87c5132bb1be)) + ## [2.41.1](https://github.com/googleapis/java-bigquerystorage/compare/v2.41.0...v2.41.1) (2023-08-08) diff --git a/README.md b/README.md index bdd35abdea..b66d3e3f2a 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file: com.google.cloud libraries-bom - 26.21.0 + 26.25.0 pom import @@ -42,7 +42,7 @@ If you are using Maven without the BOM, add this to your dependencies: com.google.cloud google-cloud-bigquerystorage - 2.41.0 + 2.44.0 ``` @@ -50,20 +50,20 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.22.0') +implementation platform('com.google.cloud:libraries-bom:26.25.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.41.1' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.44.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.41.1" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.44.0" ``` @@ -220,7 +220,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquerystorage/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquerystorage.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.41.1 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.44.0 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles diff --git a/google-cloud-bigquerystorage-bom/pom.xml b/google-cloud-bigquerystorage-bom/pom.xml index 656e245ae2..e0f3f20122 100644 --- a/google-cloud-bigquerystorage-bom/pom.xml +++ b/google-cloud-bigquerystorage-bom/pom.xml @@ -3,12 +3,12 @@ 4.0.0 com.google.cloud google-cloud-bigquerystorage-bom - 2.41.1 + 2.44.0 pom com.google.cloud google-cloud-shared-config - 1.5.7 + 1.6.0 Google Cloud bigquerystorage BOM @@ -52,37 +52,37 @@ com.google.cloud google-cloud-bigquerystorage - 2.41.1 + 2.44.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.165.1 + 0.168.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.165.1 + 0.168.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 2.41.1 + 2.44.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.165.1 + 0.168.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.165.1 + 0.168.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1 - 2.41.1 + 2.44.0 diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index 35979d19d3..6d99d6a031 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -173,5 +173,20 @@ 1001 com/google/cloud/bigquery/storage/v1/StreamConnection + + 7002 + com/google/cloud/bigquery/storage/v1/StreamWriter$Builder + com.google.cloud.bigquery.storage.v1.StreamWriter$Builder setMaxRetryNumAttempts(int) + + + 7002 + com/google/cloud/bigquery/storage/v1/StreamWriter$Builder + com.google.cloud.bigquery.storage.v1.StreamWriter$Builder setRetryMultiplier(double) + + + 7002 + com/google/cloud/bigquery/storage/v1/StreamWriter$Builder + com.google.cloud.bigquery.storage.v1.StreamWriter$Builder setRetryFirstDelay(org.threeten.bp.Duration) + diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index 9ec694162d..06ea19f2a6 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-bigquerystorage - 2.41.1 + 2.44.0 jar BigQuery Storage https://github.com/googleapis/java-bigquerystorage @@ -11,7 +11,7 @@ com.google.cloud google-cloud-bigquerystorage-parent - 2.41.1 + 2.44.0 google-cloud-bigquerystorage @@ -68,6 +68,14 @@ io.grpc grpc-protobuf + + io.grpc + grpc-util + + + io.grpc + grpc-util + com.google.api api-common @@ -75,7 +83,7 @@ com.google.auto.value auto-value - ${auto-value-annotation.version} + ${auto-value.version} com.google.auto.value @@ -178,7 +186,7 @@ org.apache.avro avro - 1.11.2 + 1.11.3 test @@ -196,10 +204,6 @@ com.google.code.findbugs jsr305 - - io.grpc - grpc-core - com.google.cloud google-cloud-core diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 2d4733c9fe..44673207ba 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -16,8 +16,12 @@ package com.google.cloud.bigquery.storage.v1; import com.google.api.core.ApiFuture; +import com.google.api.core.NanoClock; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.retrying.ExponentialRetryAlgorithm; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation; @@ -82,7 +86,11 @@ class ConnectionWorker implements AutoCloseable { private Lock lock; private Condition hasMessageInWaitingQueue; private Condition inflightReduced; - private static Duration maxRetryDuration = Duration.ofMinutes(5); + /* + * Max retry duration when trying to establish a connection. This does not + * apply to in-stream retries. + */ + private final Duration maxRetryDuration; private ExecutorService threadPool = Executors.newFixedThreadPool(1); /* @@ -227,9 +235,29 @@ class ConnectionWorker implements AutoCloseable { private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null; private long testOnlyAppendLoopSleepTime = 0; + /* + * Tracks the number of responses to ignore in the case of exclusive stream retry + */ + @GuardedBy("lock") + private int responsesToIgnore = 0; + + /* + * Contains settings related to in-stream retries. If retrySettings is null, + * this implies that no retries will occur on retryable in-stream errors. + */ + private final RetrySettings retrySettings; + private static String projectMatching = "projects/[^/]+/"; private static Pattern streamPatternProject = Pattern.compile(projectMatching); + static final Pattern DEFAULT_STREAM_PATTERN = + Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$"); + + public static Boolean isDefaultStreamName(String streamName) { + Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName); + return matcher.matches(); + } + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) @@ -260,7 +288,8 @@ public ConnectionWorker( FlowController.LimitExceededBehavior limitExceededBehavior, String traceId, @Nullable String compressorName, - BigQueryWriteSettings clientSettings) + BigQueryWriteSettings clientSettings, + RetrySettings retrySettings) throws IOException { this.lock = new ReentrantLock(); this.hasMessageInWaitingQueue = lock.newCondition(); @@ -269,7 +298,7 @@ public ConnectionWorker( if (location != null && !location.isEmpty()) { this.location = location; } - this.maxRetryDuration = maxRetryDuration; + this.maxRetryDuration = maxRetryDuration != null ? maxRetryDuration : Duration.ofMinutes(5); if (writerSchema == null) { throw new StatusRuntimeException( Status.fromCode(Code.INVALID_ARGUMENT) @@ -282,6 +311,7 @@ public ConnectionWorker( this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); this.compressorName = compressorName; + this.retrySettings = retrySettings; // Always recreate a client for connection worker. HashMap newHeaders = new HashMap<>(); newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); @@ -356,6 +386,57 @@ public void run(Throwable finalStatus) { log.info("Finish connecting stream: " + streamName + " id: " + writerId); } + @GuardedBy("lock") + private boolean shouldWaitForBackoff(AppendRequestAndResponse requestWrapper) { + if (this.retrySettings != null + && Instant.now().isBefore(requestWrapper.blockMessageSendDeadline)) { + log.fine( + String.format( + "Waiting for wait queue to unblock at %s for retry # %s", + requestWrapper.blockMessageSendDeadline, requestWrapper.retryCount)); + return true; + } + + return false; + } + + private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper) { + lock.lock(); + try { + Condition condition = lock.newCondition(); + while (shouldWaitForBackoff(requestWrapper)) { + condition.await(100, java.util.concurrent.TimeUnit.MILLISECONDS); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } finally { + lock.unlock(); + } + } + + @GuardedBy("lock") + private void addMessageToFrontOfWaitingQueue(AppendRequestAndResponse requestWrapper) { + addMessageToWaitingQueue(requestWrapper, /* addToFront= */ true); + } + + @GuardedBy("lock") + private void addMessageToBackOfWaitingQueue(AppendRequestAndResponse requestWrapper) { + addMessageToWaitingQueue(requestWrapper, /* addToFront= */ false); + } + + @GuardedBy("lock") + private void addMessageToWaitingQueue( + AppendRequestAndResponse requestWrapper, boolean addToFront) { + ++this.inflightRequests; + this.inflightBytes += requestWrapper.messageSize; + hasMessageInWaitingQueue.signal(); + if (addToFront) { + waitingRequestQueue.addFirst(requestWrapper); + } else { + waitingRequestQueue.add(requestWrapper); + } + } + /** Schedules the writing of rows at given offset. */ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) { if (this.location != null && !this.location.equals(streamWriter.getLocation())) { @@ -412,7 +493,8 @@ String getWriteLocation() { private ApiFuture appendInternal( StreamWriter streamWriter, AppendRowsRequest message) { - AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message, streamWriter); + AppendRequestAndResponse requestWrapper = + new AppendRequestAndResponse(message, streamWriter, this.retrySettings); if (requestWrapper.messageSize > getApiMaxRequestBytes()) { requestWrapper.appendResult.setException( new StatusRuntimeException( @@ -630,11 +712,16 @@ private void appendLoop() { while (!inflightRequestQueue.isEmpty()) { waitingRequestQueue.addFirst(inflightRequestQueue.pollLast()); } + + // If any of the inflight messages were meant to be ignored during requestCallback, they + // no longer will be able to make the round trip, so clear responsesToIgnore. + this.responsesToIgnore = 0; } while (!this.waitingRequestQueue.isEmpty()) { AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); + waitForBackoffIfNecessary(requestWrapper); requestWrapper.trySetRequestInsertQueueTime(); - this.inflightRequestQueue.addLast(requestWrapper); + this.inflightRequestQueue.add(requestWrapper); localQueue.addLast(requestWrapper); } } catch (InterruptedException e) { @@ -830,7 +917,7 @@ private void cleanupInflightRequests() { finalStatus = this.connectionFinalStatus; } while (!this.inflightRequestQueue.isEmpty()) { - localQueue.addLast(pollInflightRequestQueue()); + localQueue.addLast(pollFirstInflightRequestQueue()); } this.inflightCleanuped = true; } finally { @@ -866,6 +953,75 @@ private void cleanupInflightRequests() { } } + private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse requestWrapper) { + if (this.retrySettings == null) { + return false; + } + + if (this.retrySettings.getMaxAttempts() == 0) { + return false; + } + + if (!isConnectionErrorRetriable(errorCode) && errorCode != Code.RESOURCE_EXHAUSTED) { + return false; + } + + if (requestWrapper.retryCount < this.retrySettings.getMaxAttempts()) { + lock.lock(); + try { + requestWrapper.retryCount++; + if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) { + // Trigger exponential backoff in append loop when request is resent for quota errors + if (requestWrapper.attemptSettings == null) { + requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createFirstAttempt(); + } else { + requestWrapper.attemptSettings = + requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings); + } + requestWrapper.blockMessageSendDeadline = + Instant.now().plusMillis(requestWrapper.attemptSettings.getRetryDelay().toMillis()); + } + + Long offset = + requestWrapper.message.hasOffset() ? requestWrapper.message.getOffset().getValue() : -1; + if (isDefaultStreamName(streamName) || offset == -1) { + log.fine( + String.format( + "Retrying default stream message in stream %s for in-stream error: %s, retry count:" + + " %s", + streamName, errorCode, requestWrapper.retryCount)); + addMessageToFrontOfWaitingQueue(requestWrapper); + } else { + log.fine( + String.format( + "Retrying exclusive message in stream %s at offset %d for in-stream error: %s, retry" + + " count: %s", + streamName, + requestWrapper.message.getOffset().getValue(), + errorCode, + requestWrapper.retryCount)); + // Send all inflight messages to front of queue + while (!inflightRequestQueue.isEmpty()) { + AppendRequestAndResponse element = pollLastInflightRequestQueue(); + addMessageToFrontOfWaitingQueue(element); + responsesToIgnore++; + } + + addMessageToFrontOfWaitingQueue(requestWrapper); + } + return true; + } finally { + lock.unlock(); + } + } + + log.info( + String.format( + "Max retry count reached for message in stream %s at offset %d. Retry count: %d", + streamName, requestWrapper.message.getOffset().getValue(), requestWrapper.retryCount)); + return false; + } + private void requestCallback(AppendRowsResponse response) { if (response.hasUpdatedSchema()) { AppendRowsResponse responseWithUpdatedSchemaRemoved = @@ -880,11 +1036,28 @@ private void requestCallback(AppendRowsResponse response) { AppendRequestAndResponse requestWrapper; this.lock.lock(); - if (response.hasUpdatedSchema()) { - this.updatedSchema = - TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema()); - } try { + // Ignored response has arrived + if (responsesToIgnore > 0) { + if (response.hasError()) { + log.fine( + String.format("Ignoring response in stream %s at offset %s.", streamName, response)); + } else { + log.warning( + String.format( + "Unexpected successful response in stream %s at offset %s. Due to a previous" + + " retryable error being inflight, this message is being ignored.", + streamName, response.getAppendResult().getOffset())); + } + + responsesToIgnore--; + return; + } + + if (response.hasUpdatedSchema()) { + this.updatedSchema = + TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema()); + } // Had a successful connection with at least one result, reset retries. // conectionRetryCountWithoutCallback is reset so that only multiple retries, without // successful records sent, will cause the stream to fail. @@ -895,7 +1068,7 @@ private void requestCallback(AppendRowsResponse response) { connectionRetryStartTime = 0; } if (!this.inflightRequestQueue.isEmpty()) { - requestWrapper = pollInflightRequestQueue(); + requestWrapper = pollFirstInflightRequestQueue(); } else if (inflightCleanuped) { // It is possible when requestCallback is called, the inflight queue is already drained // because we timed out waiting for done. @@ -913,7 +1086,14 @@ private void requestCallback(AppendRowsResponse response) { this.lock.unlock(); } - // We need a separte thread pool to unblock the next request callback. + // Retries need to happen on the same thread as queue locking may occur + if (response.hasError()) { + if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) { + return; + } + } + + // We need a separate thread pool to unblock the next request callback. // Otherwise user may call append inside request callback, which may be blocked on waiting // on in flight quota, causing deadlock as requests can't be popped out of queue until // the current request callback finishes. @@ -952,13 +1132,12 @@ private void requestCallback(AppendRowsResponse response) { }); } - private boolean isConnectionErrorRetriable(Throwable t) { - Status status = Status.fromThrowable(t); - return status.getCode() == Code.ABORTED - || status.getCode() == Code.UNAVAILABLE - || status.getCode() == Code.CANCELLED - || status.getCode() == Code.INTERNAL - || status.getCode() == Code.DEADLINE_EXCEEDED; + private boolean isConnectionErrorRetriable(Code statusCode) { + return statusCode == Code.ABORTED + || statusCode == Code.UNAVAILABLE + || statusCode == Code.CANCELLED + || statusCode == Code.INTERNAL + || statusCode == Code.DEADLINE_EXCEEDED; } private void doneCallback(Throwable finalStatus) { @@ -977,7 +1156,7 @@ private void doneCallback(Throwable finalStatus) { connectionRetryStartTime = System.currentTimeMillis(); } // If the error can be retried, don't set it here, let it try to retry later on. - if (isConnectionErrorRetriable(finalStatus) + if (isConnectionErrorRetriable(Status.fromThrowable(finalStatus).getCode()) && !userClosed && (maxRetryDuration.toMillis() == 0f || System.currentTimeMillis() - connectionRetryStartTime @@ -1013,14 +1192,25 @@ private void doneCallback(Throwable finalStatus) { } @GuardedBy("lock") - private AppendRequestAndResponse pollInflightRequestQueue() { - AppendRequestAndResponse requestWrapper = this.inflightRequestQueue.pollFirst(); + private AppendRequestAndResponse pollInflightRequestQueue(boolean pollLast) { + AppendRequestAndResponse requestWrapper = + pollLast ? inflightRequestQueue.pollLast() : inflightRequestQueue.poll(); --this.inflightRequests; this.inflightBytes -= requestWrapper.messageSize; this.inflightReduced.signal(); return requestWrapper; } + @GuardedBy("lock") + private AppendRequestAndResponse pollLastInflightRequestQueue() { + return pollInflightRequestQueue(/* pollLast= */ true); + } + + @GuardedBy("lock") + private AppendRequestAndResponse pollFirstInflightRequestQueue() { + return pollInflightRequestQueue(/* pollLast= */ false); + } + /** Thread-safe getter of updated TableSchema */ synchronized TableSchemaAndTimestamp getUpdatedSchema() { return this.updatedSchema; @@ -1032,17 +1222,36 @@ static final class AppendRequestAndResponse { final SettableApiFuture appendResult; final AppendRowsRequest message; final long messageSize; + // Used to determine the point at which appendLoop is able to process messages from the waiting + // queue. This is used to process errors that support exponential backoff retry. + Instant blockMessageSendDeadline; + + Integer retryCount; + ExponentialRetryAlgorithm retryAlgorithm; // The writer that issues the call of the request. final StreamWriter streamWriter; + TimedAttemptSettings attemptSettings; + Instant requestCreationTimeStamp; - AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter) { + AppendRequestAndResponse( + AppendRowsRequest message, StreamWriter streamWriter, RetrySettings retrySettings) { this.appendResult = SettableApiFuture.create(); this.message = message; this.messageSize = message.getProtoRows().getSerializedSize(); this.streamWriter = streamWriter; + this.blockMessageSendDeadline = Instant.now(); + this.retryCount = 0; + // To be set after first retry + this.attemptSettings = null; + if (retrySettings != null) { + this.retryAlgorithm = + new ExponentialRetryAlgorithm(retrySettings, NanoClock.getDefaultClock()); + } else { + this.retryAlgorithm = null; + } } void trySetRequestInsertQueueTime() { diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 1530d48afc..cbf9b8a839 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -18,6 +18,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.retrying.RetrySettings; import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; @@ -65,6 +66,8 @@ public class ConnectionWorkerPool { */ private final java.time.Duration maxRetryDuration; + private RetrySettings retrySettings; + /* * Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block. */ @@ -214,6 +217,8 @@ public abstract static class Builder { this.compressorName = comperssorName; this.clientSettings = clientSettings; this.currentMaxConnectionCount = settings.minConnectionsPerRegion(); + // In-stream retry is not enabled for multiplexing. + this.retrySettings = null; } /** @@ -387,7 +392,8 @@ private ConnectionWorker createConnectionWorker( limitExceededBehavior, traceId, compressorName, - clientSettings); + clientSettings, + retrySettings); connectionWorkerPool.add(connectionWorker); log.info( String.format( diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 6c7a8b89df..548941ae51 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -19,6 +19,7 @@ import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.protobuf.Descriptors; import java.io.IOException; @@ -346,6 +347,17 @@ public Builder setCompressorName(String compressorName) { return this; } + /** + * Sets the RetrySettings to use for in-stream error retry. + * + * @param retrySettings + * @return Builder + */ + public Builder setRetrySettings(RetrySettings retrySettings) { + this.schemaAwareStreamWriterBuilder.setRetrySettings(retrySettings); + return this; + } + /** * Sets the default missing value interpretation value if the column is not presented in the * missing_value_interpretations map. diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java index 8082ae0340..fced8ccd1e 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java @@ -19,6 +19,7 @@ import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; @@ -95,7 +96,8 @@ private SchemaAwareStreamWriter(Builder builder) builder.flowControlSettings, builder.traceIdBase, builder.traceId, - builder.compressorName); + builder.compressorName, + builder.retrySettings); streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool); streamWriterBuilder.setLocation(builder.location); streamWriterBuilder.setDefaultMissingValueInterpretation( @@ -282,7 +284,8 @@ private void setStreamWriterSettings( @Nullable FlowControlSettings flowControlSettings, @Nullable String traceIdBase, @Nullable String traceId, - @Nullable String compressorName) { + @Nullable String compressorName, + @Nullable RetrySettings retrySettings) { if (channelProvider != null) { streamWriterBuilder.setChannelProvider(channelProvider); } @@ -325,6 +328,9 @@ private void setStreamWriterSettings( if (compressorName != null) { streamWriterBuilder.setCompressorName(compressorName); } + if (retrySettings != null) { + streamWriterBuilder.setRetrySettings(retrySettings); + } } /** @@ -435,6 +441,7 @@ public static final class Builder { private boolean enableConnectionPool = false; private String location; private String compressorName; + private RetrySettings retrySettings; private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation = MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; @@ -643,6 +650,17 @@ public Builder setDefaultMissingValueInterpretation( return this; } + /** + * Sets the RetrySettings to use for in-stream error retry. + * + * @param retrySettings + * @return Builder + */ + public Builder setRetrySettings(RetrySettings retrySettings) { + this.retrySettings = retrySettings; + return this; + } + /** * Builds SchemaAwareStreamWriter * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 510f11ceca..538bec4e32 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -19,6 +19,7 @@ import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auto.value.AutoOneOf; import com.google.auto.value.AutoValue; @@ -225,7 +226,8 @@ private StreamWriter(Builder builder) throws IOException { builder.limitExceededBehavior, builder.traceId, builder.compressorName, - clientSettings)); + clientSettings, + builder.retrySettings)); } else { if (!isDefaultStream(streamName)) { log.warning( @@ -235,6 +237,12 @@ private StreamWriter(Builder builder) throws IOException { "Trying to enable connection pool in non-default stream."); } + if (builder.retrySettings != null) { + log.warning("Retry settings is only allowed when connection pool is not enabled."); + throw new IllegalArgumentException( + "Trying to enable connection pool while providing retry settings."); + } + // We need a client to perform some getWriteStream calls. BigQueryWriteClient client = builder.client != null ? builder.client : new BigQueryWriteClient(clientSettings); @@ -433,7 +441,7 @@ public ApiFuture append(ProtoRows rows) { public ApiFuture append(ProtoRows rows, long offset) { if (userClosed.get()) { AppendRequestAndResponse requestWrapper = - new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build(), this); + new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build(), this, null); requestWrapper.appendResult.setException( new Exceptions.StreamWriterClosedException( Status.fromCode(Status.Code.FAILED_PRECONDITION) @@ -619,6 +627,8 @@ public static final class Builder { private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation = MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; + private RetrySettings retrySettings = null; + private Builder(String streamName) { this.streamName = Preconditions.checkNotNull(streamName); this.client = null; @@ -756,6 +766,11 @@ public Builder setDefaultMissingValueInterpretation( return this; } + public Builder setRetrySettings(RetrySettings retrySettings) { + this.retrySettings = retrySettings; + return this; + } + /** Builds the {@code StreamWriterV2}. */ public StreamWriter build() throws IOException { return new StreamWriter(this); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index ba037423fb..3acf7d1349 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -25,6 +25,7 @@ import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.storage.test.Test.ComplicateType; import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.cloud.bigquery.storage.test.Test.InnerType; @@ -51,6 +52,13 @@ public class ConnectionWorkerTest { private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/s1"; private static final String TEST_STREAM_2 = "projects/p2/datasets/d2/tables/t2/streams/s2"; private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; + private static final RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(org.threeten.bp.Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(3) + .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5)) + .build(); private FakeBigQueryWrite testBigQueryWrite; private FakeScheduledExecutorService fakeExecutor; @@ -334,7 +342,8 @@ public void testAppendButInflightQueueFull() throws Exception { FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, null, - client.getSettings()); + client.getSettings(), + retrySettings); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -390,7 +399,8 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception { FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, null, - client.getSettings()); + client.getSettings(), + retrySettings); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -458,7 +468,8 @@ public void testLocationMismatch() throws Exception { FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, null, - client.getSettings()); + client.getSettings(), + retrySettings); StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -489,7 +500,8 @@ public void testStreamNameMismatch() throws Exception { FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, null, - client.getSettings()); + client.getSettings(), + retrySettings); StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -541,7 +553,8 @@ private ConnectionWorker createConnectionWorker( FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, null, - client.getSettings()); + client.getSettings(), + retrySettings); } private ProtoSchema createProtoSchema(String protoName) { @@ -635,7 +648,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, null, - client.getSettings()); + client.getSettings(), + retrySettings); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3)); long appendCount = 10; @@ -696,7 +710,8 @@ public void testLongTimeIdleWontFail() throws Exception { FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, null, - client.getSettings()); + client.getSettings(), + retrySettings); long appendCount = 10; for (int i = 0; i < appendCount * 2; i++) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java index 5f697185f1..a31cc145a6 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java @@ -22,6 +22,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; import org.threeten.bp.Duration; /** @@ -65,11 +66,23 @@ public void addResponse(AbstractMessage response) { } } + /** + * Add a response supplier to end of list. This supplier can be used to simulate retries or other + * forms of behavior. + */ + public void addResponse(Supplier response) { + serviceImpl.addResponse(response); + } + @Override public void addException(Exception exception) { serviceImpl.addConnectionError(exception); } + public void addStatusException(com.google.rpc.Status status) { + serviceImpl.addException(status); + } + @Override public ServerServiceDefinition getServiceDefinition() { return serviceImpl.bindService(); @@ -107,4 +120,12 @@ public void setExecutor(ScheduledExecutorService executor) { public void setFailedStatus(Status failedStatus) { serviceImpl.setFailedStatus(failedStatus); } + + public void setReturnErrorDuringExclusiveStreamRetry(boolean retryOnError) { + serviceImpl.setReturnErrorDuringExclusiveStreamRetry(retryOnError); + } + + public void setVerifyOffset(boolean verifyOffset) { + serviceImpl.setVerifyOffset(verifyOffset); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java index e406fb03b6..16f3feea3c 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java @@ -17,6 +17,7 @@ import com.google.common.base.Optional; import com.google.common.util.concurrent.Uninterruptibles; +import com.google.rpc.Code; import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.util.ArrayList; @@ -29,6 +30,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.logging.Logger; import org.threeten.bp.Duration; @@ -37,13 +39,14 @@ * unit testing. */ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { - private static final Logger LOG = Logger.getLogger(FakeBigQueryWriteImpl.class.getName()); + private static final Logger LOG = Logger.getLogger(FakeBigQueryWriteImpl.class.getName()); + private final List> responses = + Collections.synchronizedList(new ArrayList<>()); private final LinkedBlockingQueue requests = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue writeRequests = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue flushRequests = new LinkedBlockingQueue<>(); - private final List responses = Collections.synchronizedList(new ArrayList<>()); private final LinkedBlockingQueue writeResponses = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue flushResponses = new LinkedBlockingQueue<>(); private final AtomicInteger nextMessageId = new AtomicInteger(1); @@ -58,6 +61,13 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { private long recordCount = 0; private long connectionCount = 0; private long closeForeverAfter = 0; + private int responseIndex = 0; + private long expectedOffset = 0; + private boolean verifyOffset = false; + private boolean returnErrorDuringExclusiveStreamRetry = false; + private boolean returnErrorUntilRetrySuccess = false; + private Response retryResponse; + private long retryingOffset = -1; // Record whether the first record has been seen on a connection. private final Map, Boolean> connectionToFirstRequest = @@ -65,7 +75,8 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { private Status failedStatus = Status.ABORTED; /** Class used to save the state of a possible response. */ - private static class Response { + public static class Response { + Optional appendResponse; Optional error; @@ -143,6 +154,40 @@ public void setFailedStatus(Status failedStatus) { this.failedStatus = failedStatus; } + private Response determineResponse(long offset) { + // The logic here checks to see if a retry is ongoing. The implication is that the + // offset that is being retried (retryingOffset) should lead to returning the same error + // over and over until a request eventually resolves, instead of calling get() on + // suppliers that, in the future, may be expected to trigger full retry loops. + Response response; + // Retry is in progress and the offset isn't the retrying offset; return saved response + if (returnErrorUntilRetrySuccess && offset != retryingOffset) { + response = retryResponse; + } else { + // We received the retryingOffset OR we aren't in retry mode; get response as + // expected. + // In case of connection reset: normally each response will only be sent once. But, if the + // stream is aborted, the last few responses may not be received, and the client will request + // them again. + response = responses.get(Math.toIntExact(offset)).get(); + // If we are in retry mode and don't have an error, clear retry variables + if (returnErrorUntilRetrySuccess && !response.getResponse().hasError()) { + retryingOffset = -1; + retryResponse = null; + } + } + + returnErrorUntilRetrySuccess = + returnErrorDuringExclusiveStreamRetry && response.getResponse().hasError(); + // If this is a new retry cycle, set retry variables + if (retryingOffset == -1 && returnErrorUntilRetrySuccess) { + retryingOffset = offset; + retryResponse = response; + } + + return response; + } + @Override public StreamObserver appendRows( final StreamObserver responseObserver) { @@ -152,15 +197,15 @@ public StreamObserver appendRows( new StreamObserver() { @Override public void onNext(AppendRowsRequest value) { - LOG.fine("Get request:" + value.toString()); - requests.add(value); recordCount++; - int offset = (int) (recordCount - 1); - if (value.hasOffset() && value.getOffset().getValue() != -1) { - offset = (int) value.getOffset().getValue(); + requests.add(value); + long offset = value.getOffset().getValue(); + if (offset == -1 || !value.hasOffset()) { + offset = responseIndex; } + responseIndex++; if (responseSleep.compareTo(Duration.ZERO) > 0) { - LOG.fine("Sleeping before response for " + responseSleep.toString()); + LOG.info("Sleeping before response for " + responseSleep.toString()); Uninterruptibles.sleepUninterruptibly( responseSleep.toMillis(), TimeUnit.MILLISECONDS); } @@ -179,6 +224,7 @@ public void onNext(AppendRowsRequest value) { } connectionToFirstRequest.put(responseObserver, false); if (closeAfter > 0 + && responseIndex % closeAfter == 0 && recordCount % closeAfter == 0 && (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) { LOG.info("Shutting down connection from test..."); @@ -187,7 +233,26 @@ public void onNext(AppendRowsRequest value) { LOG.info("Shutting down connection from test..."); responseObserver.onError(failedStatus.asException()); } else { - final Response response = responses.get(offset); + Response response = determineResponse(offset); + if (verifyOffset + && !response.getResponse().hasError() + && response.getResponse().getAppendResult().getOffset().getValue() > -1) { + // No error and offset is present; verify order + if (response.getResponse().getAppendResult().getOffset().getValue() + != expectedOffset) { + com.google.rpc.Status status = + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL_VALUE).build(); + response = new Response(AppendRowsResponse.newBuilder().setError(status).build()); + } else { + LOG.info( + String.format( + "asserted offset: %s expected: %s", + response.getResponse().getAppendResult().getOffset().getValue(), + expectedOffset)); + LOG.info(String.format("sending response: %s", response.getResponse())); + expectedOffset++; + } + } sendResponse(response, responseObserver); } } @@ -207,7 +272,6 @@ public void onCompleted() { private void sendResponse( Response response, StreamObserver responseObserver) { - LOG.fine("Sending response: " + response.toString()); if (response.isError()) { responseObserver.onError(response.getError()); } else { @@ -227,13 +291,20 @@ public FakeBigQueryWriteImpl setResponseSleep(Duration responseSleep) { return this; } - public FakeBigQueryWriteImpl addResponse(AppendRowsResponse appendRowsResponse) { - responses.add(new Response(appendRowsResponse)); - return this; + /** + * Add a response to end of list. Response can be either an record, or an exception. All repsones + * must be set up before any rows are appended. + */ + public void addResponse(AppendRowsResponse appendRowsResponse) { + responses.add(() -> new Response(appendRowsResponse)); } - public FakeBigQueryWriteImpl addResponse(AppendRowsResponse.Builder appendResponseBuilder) { - return addResponse(appendResponseBuilder.build()); + /** + * Add a response supplier to end of list. This supplier can be used to simulate retries or other + * forms of behavior. + */ + public void addResponse(Supplier response) { + responses.add(response); } public FakeBigQueryWriteImpl addWriteStreamResponse(WriteStream response) { @@ -247,10 +318,34 @@ public FakeBigQueryWriteImpl addFlushRowsResponse(FlushRowsResponse response) { } public FakeBigQueryWriteImpl addConnectionError(Throwable error) { - responses.add(new Response(error)); + responses.add(() -> new Response(error)); return this; } + /** + * Returns the given status, instead of a valid response. This should be treated as an exception + * on the other side. This will not stop processing. + */ + public void addException(com.google.rpc.Status status) { + responses.add(() -> new Response(AppendRowsResponse.newBuilder().setError(status).build())); + } + + /** + * Will abort the connection instead of return a valid response. This should NOT be used to return + * a retriable error (as that will cause an infinite loop.) + */ + public void addNonRetriableError(com.google.rpc.Status status) { + responses.add(() -> new Response(AppendRowsResponse.newBuilder().setError(status).build())); + } + + public void setVerifyOffset(boolean verifyOffset) { + this.verifyOffset = verifyOffset; + } + + public void setReturnErrorDuringExclusiveStreamRetry(boolean retryOnError) { + this.returnErrorDuringExclusiveStreamRetry = retryOnError; + } + public List getCapturedRequests() { return new ArrayList(requests); } @@ -276,6 +371,7 @@ public void reset() { public void setCloseEveryNAppends(long closeAfter) { this.closeAfter = closeAfter; } + /* If setCloseEveryNAppends is greater than 0, then the stream will be aborted every N appends. * setTimesToClose will limit the number of times to do the abort. If it is set to 0, it will * abort every N appends. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index c07b86e17d..ee18e9e68d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -32,6 +32,7 @@ import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.AbortedException; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.InvalidArgumentException; @@ -63,6 +64,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import java.util.logging.Logger; import org.junit.After; import org.junit.Assert; @@ -81,8 +83,16 @@ public class StreamWriterTest { private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default"; private static final String TEST_STREAM_3 = "projects/p/datasets/d3/tables/t3/streams/_default"; private static final String TEST_STREAM_SHORTEN = "projects/p/datasets/d2/tables/t2/_default"; - private static final String EXPLICIT_STEAM = "projects/p/datasets/d1/tables/t1/streams/s1"; + private static final String EXPLICIT_STREAM = "projects/p/datasets/d1/tables/t1/streams/s1"; private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; + private static final int MAX_RETRY_NUM_ATTEMPTS = 3; + private static final RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS) + .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5)) + .build(); private FakeScheduledExecutorService fakeExecutor; private FakeBigQueryWrite testBigQueryWrite; private static MockServiceHelper serviceHelper; @@ -159,6 +169,24 @@ private StreamWriter getTestStreamWriter() throws IOException { .build(); } + private StreamWriter getTestStreamWriterRetryEnabled() throws IOException { + return StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .setMaxRetryDuration(java.time.Duration.ofSeconds(5)) + .setRetrySettings(retrySettings) + .build(); + } + + private StreamWriter getTestStreamWriterExclusiveRetryEnabled() throws IOException { + return StreamWriter.newBuilder(EXPLICIT_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .setMaxRetryDuration(java.time.Duration.ofSeconds(5)) + .setRetrySettings(retrySettings) + .build(); + } + private ProtoSchema createProtoSchema() { return createProtoSchema("foo"); } @@ -276,6 +304,39 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception { writer.close(); } + /* DummyResponseSupplierWillFailThenSucceed is used to mock repeated failures, such as retriable + * in-stream errors. This Supplier will fail up to totalFailCount with status failStatus. Once + * totalFailCount is reached, then the provided Response will be returned instead. + */ + private static class DummyResponseSupplierWillFailThenSucceed + implements Supplier { + + private final int totalFailCount; + private int failCount; + private final com.google.rpc.Status failStatus; + private final FakeBigQueryWriteImpl.Response response; + + DummyResponseSupplierWillFailThenSucceed( + FakeBigQueryWriteImpl.Response response, + int totalFailCount, + com.google.rpc.Status failStatus) { + this.totalFailCount = totalFailCount; + this.response = response; + this.failStatus = failStatus; + this.failCount = 0; + } + + @Override + public FakeBigQueryWriteImpl.Response get() { + if (failCount >= totalFailCount) { + return response; + } + failCount++; + return new FakeBigQueryWriteImpl.Response( + AppendRowsResponse.newBuilder().setError(this.failStatus).build()); + } + } + @Test public void testAppendSuccess() throws Exception { StreamWriter writer = getTestStreamWriter(); @@ -454,7 +515,7 @@ public void testEnableConnectionPoolOnExplicitStream() throws Exception { new ThrowingRunnable() { @Override public void run() throws Throwable { - StreamWriter.newBuilder(EXPLICIT_STEAM, client) + StreamWriter.newBuilder(EXPLICIT_STREAM, client) .setEnableConnectionPool(true) .build(); } @@ -471,6 +532,25 @@ public void testShortenStreamNameAllowed() throws Exception { .build(); } + @Test + public void testNoRetryWhenConnectionPoolEnabled() throws Exception { + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriter.newBuilder(TEST_STREAM_SHORTEN, client) + .setEnableConnectionPool(true) + .setRetrySettings(RetrySettings.newBuilder().build()) + .build(); + } + }); + assertTrue( + ex.getMessage() + .contains("Trying to enable connection pool while providing retry settings.")); + } + @Test public void testAppendSuccessAndConnectionError() throws Exception { StreamWriter writer = @@ -528,7 +608,7 @@ public void testAppendFailedSchemaError() throws Exception { .build(); com.google.rpc.Status statusProto = com.google.rpc.Status.newBuilder() - .setCode(Code.INVALID_ARGUMENT.getHttpStatusCode()) + .setCode(Code.INVALID_ARGUMENT.ordinal()) .addDetails(Any.pack(storageError)) .build(); @@ -1587,4 +1667,388 @@ public void testBuilderExplicitSetting() throws Exception { .getScopesToApply() .size()); } + + @Test + public void testAppendSuccessAndInternalErrorRetrySuccess() throws Exception { + StreamWriter writer = getTestStreamWriterRetryEnabled(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build()); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse(createAppendResponse(0)); + + ApiFuture appendFuture1 = + writer.append(createProtoRows(new String[] {"A"})); + ApiFuture appendFuture2 = + writer.append(createProtoRows(new String[] {"B"})); + ApiFuture appendFuture3 = + writer.append(createProtoRows(new String[] {"C"})); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(0, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertEquals(0, appendFuture3.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + + @Test + public void testAppendSuccessAndInternalQuotaErrorRetrySuccess() throws Exception { + StreamWriter writer = getTestStreamWriterRetryEnabled(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse(createAppendResponse(0)); + + ApiFuture appendFuture1 = + writer.append(createProtoRows(new String[] {"A"})); + ApiFuture appendFuture2 = + writer.append(createProtoRows(new String[] {"B"})); + ApiFuture appendFuture3 = + writer.append(createProtoRows(new String[] {"C"})); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(0, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertEquals(0, appendFuture3.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + + @Test + public void testAppendSuccessAndInternalErrorRetrySuccessExclusive() throws Exception { + // Ensure we return an error from the fake server when a retry is in progress + testBigQueryWrite.setReturnErrorDuringExclusiveStreamRetry(true); + // Ensure messages will be in the inflight queue + testBigQueryWrite.setVerifyOffset(true); + StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse( + new DummyResponseSupplierWillFailThenSucceed( + new FakeBigQueryWriteImpl.Response(createAppendResponse(1)), + /* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS, + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build())); + testBigQueryWrite.addResponse(createAppendResponse(2)); + + ApiFuture appendFuture1 = + writer.append(createProtoRows(new String[] {"A"}), 0); + ApiFuture appendFuture2 = + writer.append(createProtoRows(new String[] {"B"}), 1); + ApiFuture appendFuture3 = + writer.append(createProtoRows(new String[] {"C"}), 2); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertEquals(2, appendFuture3.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + + @Test + public void testAppendSuccessAndInternalErrorRetryNoOffsetSuccessExclusive() throws Exception { + StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = + writer.append(createProtoRows(new String[] {"A"})); + ApiFuture appendFuture2 = + writer.append(createProtoRows(new String[] {"B"})); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + + @Test + public void testAppendSuccessAndQuotaErrorRetryNoOffsetSuccessExclusive() throws Exception { + StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = + writer.append(createProtoRows(new String[] {"A"})); + ApiFuture appendFuture2 = + writer.append(createProtoRows(new String[] {"B"})); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + + @Test + public void testExclusiveAppendSuccessAndInternalErrorRetrySuccess() throws Exception { + // Ensure we return an error from the fake server when a retry is in progress + testBigQueryWrite.setReturnErrorDuringExclusiveStreamRetry(true); + // Ensure messages will be in the inflight queue + testBigQueryWrite.setVerifyOffset(true); + // fakeBigQueryWrite.setResponseSleep(Duration.ofSeconds(3)); + StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled(); + long appendCount = 20; + for (long i = 0; i < appendCount; i++) { + // Add a retriable error every 3 messages + if (i % 3 == 0) { + testBigQueryWrite.addResponse( + new DummyResponseSupplierWillFailThenSucceed( + new FakeBigQueryWriteImpl.Response(createAppendResponse(i)), + /* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS, + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build())); + } else { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + } + + List> futures = new ArrayList<>(); + for (long i = 0; i < appendCount; i++) { + futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i)); + } + + for (int i = 0; i < appendCount; i++) { + assertThat(futures.get(i).get().getAppendResult().getOffset().getValue()).isEqualTo((long) i); + } + } + + @Test + public void testExclusiveAppendSuccessAndQuotaErrorRetrySuccess() throws Exception { + // Ensure we return an error from the fake server when a retry is in progress + testBigQueryWrite.setReturnErrorDuringExclusiveStreamRetry(true); + // Ensure messages will be in the inflight queue + testBigQueryWrite.setVerifyOffset(true); + // fakeBigQueryWrite.setResponseSleep(Duration.ofSeconds(3)); + StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled(); + long appendCount = 20; + for (long i = 0; i < appendCount; i++) { + // Add a retriable error every 3 messages + if (i % 3 == 0) { + testBigQueryWrite.addResponse( + new DummyResponseSupplierWillFailThenSucceed( + new FakeBigQueryWriteImpl.Response(createAppendResponse(i)), + /* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS, + com.google.rpc.Status.newBuilder() + .setCode(Code.RESOURCE_EXHAUSTED.ordinal()) + .build())); + } else { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + } + + List> futures = new ArrayList<>(); + for (long i = 0; i < appendCount; i++) { + futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i)); + } + + for (int i = 0; i < appendCount; i++) { + assertThat(futures.get(i).get().getAppendResult().getOffset().getValue()).isEqualTo((long) i); + } + } + + @Test + public void testAppendSuccessAndQuotaErrorRetrySuccessExclusive() throws Exception { + StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse( + new DummyResponseSupplierWillFailThenSucceed( + new FakeBigQueryWriteImpl.Response(createAppendResponse(1)), + /* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS, + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build())); + + ApiFuture appendFuture1 = + writer.append(createProtoRows(new String[] {"A"}), 0); + ApiFuture appendFuture2 = + writer.append(createProtoRows(new String[] {"B"}), 1); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + + @Test + public void testAppendSuccessAndInternalErrorMaxRetryNumAttempts() throws Exception { + StreamWriter writer = getTestStreamWriterRetryEnabled(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build()); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build()); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build()); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = + writer.append(createProtoRows(new String[] {"A"})); + ApiFuture appendFuture2 = + writer.append(createProtoRows(new String[] {"B"})); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + appendFuture2.get(); + }); + assertEquals( + Status.Code.INTERNAL, ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + } + + @Test + public void testAppendSuccessAndQuotaErrorMaxRetryNumAttempts() throws Exception { + StreamWriter writer = getTestStreamWriterRetryEnabled(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = + writer.append(createProtoRows(new String[] {"A"})); + ApiFuture appendFuture2 = + writer.append(createProtoRows(new String[] {"B"})); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + appendFuture2.get(); + }); + assertEquals( + Status.Code.RESOURCE_EXHAUSTED, + ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + } + + @Test + public void testExclusiveAppendSuccessAndInternalErrorRetryMaxRetry() throws Exception { + testBigQueryWrite.setReturnErrorDuringExclusiveStreamRetry(true); + // Ensure messages will be in the inflight queue + testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1)); + StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled(); + + int appendCount = 10; + for (long i = 0; i < appendCount - 1; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + testBigQueryWrite.addResponse( + new DummyResponseSupplierWillFailThenSucceed( + new FakeBigQueryWriteImpl.Response(createAppendResponse(appendCount)), + /* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1, + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build())); + + List> futures = new ArrayList<>(); + for (long i = 0; i < appendCount; i++) { + futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i)); + } + + for (int i = 0; i < appendCount - 1; i++) { + assertThat(futures.get(i).get().getAppendResult().getOffset().getValue()).isEqualTo((long) i); + } + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + futures.get(appendCount - 1).get(); + }); + assertEquals( + Status.Code.INTERNAL, ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + } + + @Test + public void testExclusiveAppendSuccessAndQuotaErrorRetryMaxRetry() throws Exception { + testBigQueryWrite.setReturnErrorDuringExclusiveStreamRetry(true); + // Ensure messages will be in the inflight queue + testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1)); + StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled(); + + int appendCount = 10; + for (long i = 0; i < appendCount - 1; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + testBigQueryWrite.addResponse( + new DummyResponseSupplierWillFailThenSucceed( + new FakeBigQueryWriteImpl.Response(createAppendResponse(appendCount)), + /* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1, + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build())); + + List> futures = new ArrayList<>(); + for (long i = 0; i < appendCount; i++) { + futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i)); + } + + for (int i = 0; i < appendCount - 1; i++) { + assertThat(futures.get(i).get().getAppendResult().getOffset().getValue()).isEqualTo((long) i); + } + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + futures.get(appendCount - 1).get(); + }); + assertEquals( + Status.Code.RESOURCE_EXHAUSTED, + ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + } + + @Test + public void testAppendSuccessAndNonRetryableError() throws Exception { + StreamWriter writer = getTestStreamWriterRetryEnabled(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.INVALID_ARGUMENT.ordinal()).build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = + writer.append(createProtoRows(new String[] {"A"})); + ApiFuture appendFuture2 = + writer.append(createProtoRows(new String[] {"B"})); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + appendFuture2.get(); + }); + assertEquals( + Status.Code.INVALID_ARGUMENT, + ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + } + + @Test + public void testExclusiveAppendSuccessAndNonRetryableError() throws Exception { + StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.INVALID_ARGUMENT.ordinal()).build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = + writer.append(createProtoRows(new String[] {"A"}), 0); + ApiFuture appendFuture2 = + writer.append(createProtoRows(new String[] {"B"}), 1); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + appendFuture2.get(); + }); + assertEquals( + Status.Code.INVALID_ARGUMENT, + ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 61de4e5a6e..76dbe75774 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -45,9 +45,6 @@ import java.io.IOException; import java.math.BigDecimal; import java.sql.Timestamp; -import java.text.SimpleDateFormat; -import java.time.Instant; -import java.time.ZoneId; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -69,19 +66,16 @@ public class ITBigQueryWriteManualClientTest { private static final String TABLE = "testtable"; private static final String TABLE2 = "complicatedtable"; - private static final String TABLE3 = "tableWithDefaultValueColumn"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; private static BigQueryWriteClient client; private static TableInfo tableInfo; private static TableInfo tableInfo2; - private static TableInfo tableInfo3; private static TableInfo tableInfoEU; private static String tableId; private static String tableId2; - private static String tableId3; private static String tableIdEU; private static BigQuery bigquery; @@ -137,26 +131,8 @@ public static void beforeClass() throws IOException { .build(), innerTypeFieldBuilder.setMode(Field.Mode.NULLABLE).build()))) .build(); - tableInfo3 = - TableInfo.newBuilder( - TableId.of(DATASET, TABLE3), - StandardTableDefinition.of( - Schema.of( - com.google.cloud.bigquery.Field.newBuilder("foo_with_default", LegacySQLTypeName.STRING) - .setDefaultValueExpression("'default_value_for_test'") - .setMode(Field.Mode.NULLABLE) - .build(), - com.google.cloud.bigquery.Field.newBuilder("bar_without_default", LegacySQLTypeName.STRING) - .setMode(Mode.NULLABLE) - .build(), - com.google.cloud.bigquery.Field.newBuilder("date_with_default_to_current", LegacySQLTypeName.DATETIME) - .setDefaultValueExpression("CURRENT_DATE()") - .setMode(Mode.NULLABLE) - .build()))) - .build(); bigquery.create(tableInfo); bigquery.create(tableInfo2); - bigquery.create(tableInfo3); tableId = String.format( "projects/%s/datasets/%s/tables/%s", @@ -165,10 +141,6 @@ public static void beforeClass() throws IOException { String.format( "projects/%s/datasets/%s/tables/%s", ServiceOptions.getDefaultProjectId(), DATASET, TABLE2); - tableId3 = - String.format( - "projects/%s/datasets/%s/tables/%s", - ServiceOptions.getDefaultProjectId(), DATASET, TABLE3); DatasetInfo datasetInfoEU = DatasetInfo.newBuilder(/* datasetId = */ DATASET_EU) .setLocation("EU") @@ -754,50 +726,126 @@ public void testJsonStreamWriterWithDefaultStream() @Test public void testJsonDefaultStreamOnTableWithDefaultValue_SchemaNotGiven() throws IOException, InterruptedException, ExecutionException, - Descriptors.DescriptorValidationException { + Descriptors.DescriptorValidationException { + String tableName = "defaultStreamDefaultValue"; + String defaultTableId = + String.format( + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET, tableName); + tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, tableName), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder( + "foo_with_default", LegacySQLTypeName.STRING) + .setDefaultValueExpression("'default_value_for_test'") + .setMode(Field.Mode.NULLABLE) + .build(), + com.google.cloud.bigquery.Field.newBuilder( + "bar_without_default", LegacySQLTypeName.STRING) + .setMode(Mode.NULLABLE) + .build(), + com.google.cloud.bigquery.Field.newBuilder( + "date_with_default_to_current", LegacySQLTypeName.DATETIME) + .setDefaultValueExpression("CURRENT_DATE()") + .setMode(Mode.NULLABLE) + .build()))) + .build(); + bigquery.create(tableInfo); try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(tableId3, client) + JsonStreamWriter.newBuilder(defaultTableId, client) .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE) .build()) { - // 1. row has both fields set. - JSONArray jsonArr1 = new JSONArray(); - JSONObject row1 = new JSONObject(); - row1.put("foo_with_default", "aaa"); - row1.put("bar_without_default", "a"); - row1.put("date_with_default_to_current", "2022-02-02"); - jsonArr1.put(row1); - // 2. row with the column with default value unset - JSONObject row2 = new JSONObject(); - row2.put("bar_without_default", "a"); - jsonArr1.put(row2); - // 2. both value not set - JSONObject row3 = new JSONObject(); - jsonArr1.put(row3); - ApiFuture response1 = jsonStreamWriter.append(jsonArr1, -1); - response1.get(); - TableResult result = - bigquery.listTableData( - tableInfo3.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); - Iterator iter = result.getValues().iterator(); - FieldValueList currentRow = iter.next(); - assertEquals("aaa", currentRow.get(0).getStringValue()); - assertEquals("a", currentRow.get(1).getStringValue()); - assertEquals("2022-02-02T00:00:00", currentRow.get(2).getStringValue()); - - currentRow = iter.next(); - assertEquals("default_value_for_test", currentRow.get(0).getStringValue()); - assertEquals("a", currentRow.get(1).getStringValue()); - LOG.warning("The string value is " + currentRow.get(2).getStringValue()); - assertFalse(currentRow.get(2).getStringValue().isEmpty()); - - currentRow = iter.next(); - assertEquals("default_value_for_test", currentRow.get(0).getStringValue()); - assertFalse(currentRow.get(2).getStringValue().isEmpty()); - assertEquals(null, currentRow.get(1).getValue()); - assertEquals(false, iter.hasNext()); + testJsonStreamWriterForDefaultValue(jsonStreamWriter); } } + @Test + public void testJsonExclusiveStreamOnTableWithDefaultValue_GiveTableSchema() + throws IOException, InterruptedException, ExecutionException, + Descriptors.DescriptorValidationException { + String tableName = "exclusiveStreamDefaultValue"; + String exclusiveTableId = + String.format( + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET, tableName); + tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, tableName), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder( + "foo_with_default", LegacySQLTypeName.STRING) + .setDefaultValueExpression("'default_value_for_test'") + .setMode(Field.Mode.NULLABLE) + .build(), + com.google.cloud.bigquery.Field.newBuilder( + "bar_without_default", LegacySQLTypeName.STRING) + .setMode(Mode.NULLABLE) + .build(), + com.google.cloud.bigquery.Field.newBuilder( + "date_with_default_to_current", LegacySQLTypeName.DATETIME) + .setDefaultValueExpression("CURRENT_DATE()") + .setMode(Mode.NULLABLE) + .build()))) + .build(); + bigquery.create(tableInfo); + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(exclusiveTableId) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(exclusiveTableId, writeStream.getTableSchema()) + .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE) + .build()) { + testJsonStreamWriterForDefaultValue(jsonStreamWriter); + } + } + + private void testJsonStreamWriterForDefaultValue(JsonStreamWriter jsonStreamWriter) + throws DescriptorValidationException, IOException, ExecutionException, InterruptedException { + // 1. row has both fields set. + JSONArray jsonArr1 = new JSONArray(); + JSONObject row1 = new JSONObject(); + row1.put("foo_with_default", "aaa"); + row1.put("bar_without_default", "a"); + row1.put("date_with_default_to_current", "2022-02-02"); + jsonArr1.put(row1); + // 2. row with the column with default value unset + JSONObject row2 = new JSONObject(); + row2.put("bar_without_default", "a"); + jsonArr1.put(row2); + // 2. both value not set + JSONObject row3 = new JSONObject(); + jsonArr1.put(row3); + ApiFuture response1 = jsonStreamWriter.append(jsonArr1, -1); + response1.get(); + TableResult result = + bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + Iterator iter = result.getValues().iterator(); + FieldValueList currentRow = iter.next(); + assertEquals("aaa", currentRow.get(0).getStringValue()); + assertEquals("a", currentRow.get(1).getStringValue()); + assertEquals("2022-02-02T00:00:00", currentRow.get(2).getStringValue()); + + currentRow = iter.next(); + assertEquals("default_value_for_test", currentRow.get(0).getStringValue()); + assertEquals("a", currentRow.get(1).getStringValue()); + LOG.warning("The string value is " + currentRow.get(2).getStringValue()); + assertFalse(currentRow.get(2).getStringValue().isEmpty()); + + currentRow = iter.next(); + assertEquals("default_value_for_test", currentRow.get(0).getStringValue()); + assertFalse(currentRow.get(2).getStringValue().isEmpty()); + assertEquals(null, currentRow.get(1).getValue()); + + assertEquals(false, iter.hasNext()); + } + // This test runs about 1 min. @Test public void testJsonStreamWriterWithMessagesOver10M() diff --git a/grpc-google-cloud-bigquerystorage-v1/pom.xml b/grpc-google-cloud-bigquerystorage-v1/pom.xml index 7540ce6f19..72f7c4e334 100644 --- a/grpc-google-cloud-bigquerystorage-v1/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 2.41.1 + 2.44.0 grpc-google-cloud-bigquerystorage-v1 GRPC library for grpc-google-cloud-bigquerystorage-v1 com.google.cloud google-cloud-bigquerystorage-parent - 2.41.1 + 2.44.0 diff --git a/grpc-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadGrpc.java b/grpc-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadGrpc.java index 1ea103f612..6ceec843a3 100644 --- a/grpc-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadGrpc.java +++ b/grpc-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadGrpc.java @@ -33,7 +33,8 @@ public final class BigQueryReadGrpc { private BigQueryReadGrpc() {} - public static final String SERVICE_NAME = "google.cloud.bigquery.storage.v1.BigQueryRead"; + public static final java.lang.String SERVICE_NAME = + "google.cloud.bigquery.storage.v1.BigQueryRead"; // Static method descriptors that strictly reflect the proto. private static volatile io.grpc.MethodDescriptor< @@ -688,9 +689,9 @@ private static final class BigQueryReadFileDescriptorSupplier private static final class BigQueryReadMethodDescriptorSupplier extends BigQueryReadBaseDescriptorSupplier implements io.grpc.protobuf.ProtoMethodDescriptorSupplier { - private final String methodName; + private final java.lang.String methodName; - BigQueryReadMethodDescriptorSupplier(String methodName) { + BigQueryReadMethodDescriptorSupplier(java.lang.String methodName) { this.methodName = methodName; } diff --git a/grpc-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryWriteGrpc.java b/grpc-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryWriteGrpc.java index 1553527690..c7cdb491df 100644 --- a/grpc-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryWriteGrpc.java +++ b/grpc-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryWriteGrpc.java @@ -35,7 +35,8 @@ public final class BigQueryWriteGrpc { private BigQueryWriteGrpc() {} - public static final String SERVICE_NAME = "google.cloud.bigquery.storage.v1.BigQueryWrite"; + public static final java.lang.String SERVICE_NAME = + "google.cloud.bigquery.storage.v1.BigQueryWrite"; // Static method descriptors that strictly reflect the proto. private static volatile io.grpc.MethodDescriptor< @@ -1048,9 +1049,9 @@ private static final class BigQueryWriteFileDescriptorSupplier private static final class BigQueryWriteMethodDescriptorSupplier extends BigQueryWriteBaseDescriptorSupplier implements io.grpc.protobuf.ProtoMethodDescriptorSupplier { - private final String methodName; + private final java.lang.String methodName; - BigQueryWriteMethodDescriptorSupplier(String methodName) { + BigQueryWriteMethodDescriptorSupplier(java.lang.String methodName) { this.methodName = methodName; } diff --git a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml index 8dcbb82e2f..5f95aa030f 100644 --- a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.165.1 + 0.168.0 grpc-google-cloud-bigquerystorage-v1beta1 GRPC library for grpc-google-cloud-bigquerystorage-v1beta1 com.google.cloud google-cloud-bigquerystorage-parent - 2.41.1 + 2.44.0 diff --git a/grpc-google-cloud-bigquerystorage-v1beta1/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageGrpc.java b/grpc-google-cloud-bigquerystorage-v1beta1/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageGrpc.java index 6a31e3f930..a155b259c0 100644 --- a/grpc-google-cloud-bigquerystorage-v1beta1/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageGrpc.java +++ b/grpc-google-cloud-bigquerystorage-v1beta1/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageGrpc.java @@ -37,7 +37,8 @@ public final class BigQueryStorageGrpc { private BigQueryStorageGrpc() {} - public static final String SERVICE_NAME = "google.cloud.bigquery.storage.v1beta1.BigQueryStorage"; + public static final java.lang.String SERVICE_NAME = + "google.cloud.bigquery.storage.v1beta1.BigQueryStorage"; // Static method descriptors that strictly reflect the proto. private static volatile io.grpc.MethodDescriptor< @@ -1021,9 +1022,9 @@ private static final class BigQueryStorageFileDescriptorSupplier private static final class BigQueryStorageMethodDescriptorSupplier extends BigQueryStorageBaseDescriptorSupplier implements io.grpc.protobuf.ProtoMethodDescriptorSupplier { - private final String methodName; + private final java.lang.String methodName; - BigQueryStorageMethodDescriptorSupplier(String methodName) { + BigQueryStorageMethodDescriptorSupplier(java.lang.String methodName) { this.methodName = methodName; } diff --git a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml index d5b61b0841..17f6c23837 100644 --- a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.165.1 + 0.168.0 grpc-google-cloud-bigquerystorage-v1beta2 GRPC library for grpc-google-cloud-bigquerystorage-v1beta2 com.google.cloud google-cloud-bigquerystorage-parent - 2.41.1 + 2.44.0 diff --git a/grpc-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadGrpc.java b/grpc-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadGrpc.java index fd1a8ffea4..2fcee009d6 100644 --- a/grpc-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadGrpc.java +++ b/grpc-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadGrpc.java @@ -35,7 +35,8 @@ public final class BigQueryReadGrpc { private BigQueryReadGrpc() {} - public static final String SERVICE_NAME = "google.cloud.bigquery.storage.v1beta2.BigQueryRead"; + public static final java.lang.String SERVICE_NAME = + "google.cloud.bigquery.storage.v1beta2.BigQueryRead"; // Static method descriptors that strictly reflect the proto. private static volatile io.grpc.MethodDescriptor< @@ -704,9 +705,9 @@ private static final class BigQueryReadFileDescriptorSupplier private static final class BigQueryReadMethodDescriptorSupplier extends BigQueryReadBaseDescriptorSupplier implements io.grpc.protobuf.ProtoMethodDescriptorSupplier { - private final String methodName; + private final java.lang.String methodName; - BigQueryReadMethodDescriptorSupplier(String methodName) { + BigQueryReadMethodDescriptorSupplier(java.lang.String methodName) { this.methodName = methodName; } diff --git a/grpc-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryWriteGrpc.java b/grpc-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryWriteGrpc.java index 66f30d21ef..9583e9c365 100644 --- a/grpc-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryWriteGrpc.java +++ b/grpc-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryWriteGrpc.java @@ -36,7 +36,8 @@ public final class BigQueryWriteGrpc { private BigQueryWriteGrpc() {} - public static final String SERVICE_NAME = "google.cloud.bigquery.storage.v1beta2.BigQueryWrite"; + public static final java.lang.String SERVICE_NAME = + "google.cloud.bigquery.storage.v1beta2.BigQueryWrite"; // Static method descriptors that strictly reflect the proto. private static volatile io.grpc.MethodDescriptor< @@ -1044,9 +1045,9 @@ private static final class BigQueryWriteFileDescriptorSupplier private static final class BigQueryWriteMethodDescriptorSupplier extends BigQueryWriteBaseDescriptorSupplier implements io.grpc.protobuf.ProtoMethodDescriptorSupplier { - private final String methodName; + private final java.lang.String methodName; - BigQueryWriteMethodDescriptorSupplier(String methodName) { + BigQueryWriteMethodDescriptorSupplier(java.lang.String methodName) { this.methodName = methodName; } diff --git a/pom.xml b/pom.xml index 6cacb4e277..e5875a2ed4 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.google.cloud google-cloud-bigquerystorage-parent pom - 2.41.1 + 2.44.0 BigQuery Storage Parent https://github.com/googleapis/java-bigquerystorage @@ -14,7 +14,7 @@ com.google.cloud google-cloud-shared-config - 1.5.7 + 1.6.0 @@ -76,49 +76,49 @@ com.google.cloud google-cloud-shared-dependencies - 3.14.0 + 3.17.0 pom import com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.165.1 + 0.168.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.165.1 + 0.168.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1 - 2.41.1 + 2.44.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.165.1 + 0.168.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.165.1 + 0.168.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 2.41.1 + 2.44.0 com.google.cloud google-cloud-bigquerystorage - 2.41.1 + 2.44.0 org.json json - 20230618 + 20231013 @@ -132,7 +132,7 @@ com.google.cloud google-cloud-bigquery - 2.31.0 + 2.33.2 test @@ -216,7 +216,6 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.5.0 html diff --git a/proto-google-cloud-bigquerystorage-v1/pom.xml b/proto-google-cloud-bigquerystorage-v1/pom.xml index 854e211696..7e51bdb8d2 100644 --- a/proto-google-cloud-bigquerystorage-v1/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1 - 2.41.1 + 2.44.0 proto-google-cloud-bigquerystorage-v1 PROTO library for proto-google-cloud-bigquerystorage-v1 com.google.cloud google-cloud-bigquerystorage-parent - 2.41.1 + 2.44.0 diff --git a/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/ProjectName.java b/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/ProjectName.java index 1837ab9531..f6dd2d213d 100644 --- a/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/ProjectName.java +++ b/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/ProjectName.java @@ -127,7 +127,7 @@ public boolean equals(Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { ProjectName that = ((ProjectName) o); return Objects.equals(this.project, that.project); } diff --git a/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/ReadStreamName.java b/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/ReadStreamName.java index 6bebe51e5a..76d7d5cda4 100644 --- a/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/ReadStreamName.java +++ b/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/ReadStreamName.java @@ -174,7 +174,7 @@ public boolean equals(Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { ReadStreamName that = ((ReadStreamName) o); return Objects.equals(this.project, that.project) && Objects.equals(this.location, that.location) diff --git a/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/TableName.java b/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/TableName.java index e98c7301ae..f61a0c8637 100644 --- a/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/TableName.java +++ b/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/TableName.java @@ -148,7 +148,7 @@ public boolean equals(Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { TableName that = ((TableName) o); return Objects.equals(this.project, that.project) && Objects.equals(this.dataset, that.dataset) diff --git a/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/WriteStreamName.java b/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/WriteStreamName.java index 9edbc710a7..0788d62153 100644 --- a/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/WriteStreamName.java +++ b/proto-google-cloud-bigquerystorage-v1/src/main/java/com/google/cloud/bigquery/storage/v1/WriteStreamName.java @@ -174,7 +174,7 @@ public boolean equals(Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { WriteStreamName that = ((WriteStreamName) o); return Objects.equals(this.project, that.project) && Objects.equals(this.dataset, that.dataset) diff --git a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml index b542eeeff3..d19a1f7a4d 100644 --- a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.165.1 + 0.168.0 proto-google-cloud-bigquerystorage-v1beta1 PROTO library for proto-google-cloud-bigquerystorage-v1beta1 com.google.cloud google-cloud-bigquerystorage-parent - 2.41.1 + 2.44.0 diff --git a/proto-google-cloud-bigquerystorage-v1beta1/src/main/java/com/google/cloud/bigquery/storage/v1beta1/ProjectName.java b/proto-google-cloud-bigquerystorage-v1beta1/src/main/java/com/google/cloud/bigquery/storage/v1beta1/ProjectName.java index 658e897596..633c2072ca 100644 --- a/proto-google-cloud-bigquerystorage-v1beta1/src/main/java/com/google/cloud/bigquery/storage/v1beta1/ProjectName.java +++ b/proto-google-cloud-bigquerystorage-v1beta1/src/main/java/com/google/cloud/bigquery/storage/v1beta1/ProjectName.java @@ -127,7 +127,7 @@ public boolean equals(Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { ProjectName that = ((ProjectName) o); return Objects.equals(this.project, that.project); } diff --git a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml index 981c3703d4..725a489ae6 100644 --- a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.165.1 + 0.168.0 proto-google-cloud-bigquerystorage-v1beta2 PROTO library for proto-google-cloud-bigquerystorage-v1beta2 com.google.cloud google-cloud-bigquerystorage-parent - 2.41.1 + 2.44.0 diff --git a/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/ProjectName.java b/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/ProjectName.java index 22a76a6afd..3e5d15646e 100644 --- a/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/ProjectName.java +++ b/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/ProjectName.java @@ -127,7 +127,7 @@ public boolean equals(Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { ProjectName that = ((ProjectName) o); return Objects.equals(this.project, that.project); } diff --git a/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/ReadStreamName.java b/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/ReadStreamName.java index 0caff6ae6d..6e831545c6 100644 --- a/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/ReadStreamName.java +++ b/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/ReadStreamName.java @@ -174,7 +174,7 @@ public boolean equals(Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { ReadStreamName that = ((ReadStreamName) o); return Objects.equals(this.project, that.project) && Objects.equals(this.location, that.location) diff --git a/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/TableName.java b/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/TableName.java index 7a6996c5e8..1d65db7651 100644 --- a/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/TableName.java +++ b/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/TableName.java @@ -148,7 +148,7 @@ public boolean equals(Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { TableName that = ((TableName) o); return Objects.equals(this.project, that.project) && Objects.equals(this.dataset, that.dataset) diff --git a/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/WriteStreamName.java b/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/WriteStreamName.java index d1d33e0639..ab40230f4a 100644 --- a/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/WriteStreamName.java +++ b/proto-google-cloud-bigquerystorage-v1beta2/src/main/java/com/google/cloud/bigquery/storage/v1beta2/WriteStreamName.java @@ -174,7 +174,7 @@ public boolean equals(Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { WriteStreamName that = ((WriteStreamName) o); return Objects.equals(this.project, that.project) && Objects.equals(this.dataset, that.dataset) diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml index 5e275b4148..3189abe3ab 100644 --- a/samples/install-without-bom/pom.xml +++ b/samples/install-without-bom/pom.xml @@ -21,7 +21,7 @@ 1.8 1.8 UTF-8 - 12.0.1 + 13.0.0 @@ -30,19 +30,19 @@ com.google.cloud google-cloud-bigquerystorage - 2.41.0 + 2.44.0 com.google.cloud google-cloud-bigquery - 2.31.0 + 2.33.2 org.apache.avro avro - 1.11.2 + 1.11.3 org.apache.arrow diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index a7e6ebbae6..820d475abd 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -21,7 +21,7 @@ 1.8 1.8 UTF-8 - 12.0.1 + 13.0.0 @@ -29,19 +29,19 @@ com.google.cloud google-cloud-bigquerystorage - 2.41.1 + 2.44.0 com.google.cloud google-cloud-bigquery - 2.31.0 + 2.33.2 org.apache.avro avro - 1.11.2 + 1.11.3 diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index ffc33449dc..10d6fc2ede 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -21,7 +21,7 @@ 1.8 1.8 UTF-8 - 12.0.1 + 13.0.0 @@ -31,7 +31,7 @@ com.google.cloud libraries-bom - 26.21.0 + 26.25.0 pom import @@ -48,12 +48,12 @@ com.google.cloud google-cloud-bigquery - 2.31.0 + 2.33.2 org.apache.avro avro - 1.11.2 + 1.11.3 org.apache.arrow diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 9b831e45df..feccef61f0 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -26,6 +26,7 @@ import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.TableResult; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; @@ -178,6 +179,10 @@ public void initialize(TableName parentTable) .setChannelsPerCpu(2) .build()) .setEnableConnectionPool(true) + // If value is missing in json and there is a default value configured on bigquery + // column, apply the default value to the missing value field. + .setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) .build(); } diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java index 8faa3de58c..b424368c42 100644 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java @@ -67,7 +67,6 @@ public void setUp() { out = new PrintStream(bout); System.setOut(out); - bigquery = BigQueryOptions.getDefaultInstance().getService(); // Create a new dataset and table for each test. diff --git a/tutorials/JsonWriterDefaultStream/pom.xml b/tutorials/JsonWriterDefaultStream/pom.xml index 078b3448bd..4ff05562e9 100644 --- a/tutorials/JsonWriterDefaultStream/pom.xml +++ b/tutorials/JsonWriterDefaultStream/pom.xml @@ -19,27 +19,27 @@ com.google.cloud google-cloud-bigquerystorage - 2.41.0 + 2.44.0 com.google.cloud google-cloud-bigquery - 2.31.0 + 2.33.2 org.apache.avro avro - 1.11.2 + 1.11.3 org.apache.arrow arrow-vector - 12.0.1 + 13.0.0 org.apache.arrow arrow-memory-netty - 12.0.1 + 13.0.0 diff --git a/versions.txt b/versions.txt index 1074250d92..c630f75a23 100644 --- a/versions.txt +++ b/versions.txt @@ -1,10 +1,10 @@ # Format: # module:released-version:current-version -google-cloud-bigquerystorage:2.41.1:2.41.1 -grpc-google-cloud-bigquerystorage-v1beta1:0.165.1:0.165.1 -grpc-google-cloud-bigquerystorage-v1beta2:0.165.1:0.165.1 -grpc-google-cloud-bigquerystorage-v1:2.41.1:2.41.1 -proto-google-cloud-bigquerystorage-v1beta1:0.165.1:0.165.1 -proto-google-cloud-bigquerystorage-v1beta2:0.165.1:0.165.1 -proto-google-cloud-bigquerystorage-v1:2.41.1:2.41.1 +google-cloud-bigquerystorage:2.44.0:2.44.0 +grpc-google-cloud-bigquerystorage-v1beta1:0.168.0:0.168.0 +grpc-google-cloud-bigquerystorage-v1beta2:0.168.0:0.168.0 +grpc-google-cloud-bigquerystorage-v1:2.44.0:2.44.0 +proto-google-cloud-bigquerystorage-v1beta1:0.168.0:0.168.0 +proto-google-cloud-bigquerystorage-v1beta2:0.168.0:0.168.0 +proto-google-cloud-bigquerystorage-v1:2.44.0:2.44.0