From e1158d3ba9a2c450ddf6be154e26706a09beb0c6 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Wed, 14 Feb 2024 16:20:35 +0100 Subject: [PATCH] RUBY-3370 Add CSOT to server selection and connection checkout (#2833) --- lib/mongo/client.rb | 8 ++ lib/mongo/cluster.rb | 9 ++- lib/mongo/collection.rb | 24 +++++- lib/mongo/collection/view.rb | 3 +- lib/mongo/database.rb | 4 + lib/mongo/operation/context.rb | 37 ++++++++- .../operation/shared/op_msg_executable.rb | 5 +- lib/mongo/operation/shared/write.rb | 5 +- lib/mongo/retryable.rb | 10 ++- lib/mongo/retryable/write_worker.rb | 31 ++++++-- lib/mongo/server.rb | 8 +- lib/mongo/server/connection.rb | 15 ++-- lib/mongo/server/connection_pool.rb | 35 +++++--- lib/mongo/server_selector/base.rb | 25 ++++-- lib/mongo/socket.rb | 2 +- spec/mongo/operation/context_spec.rb | 79 +++++++++++++++++++ 16 files changed, 257 insertions(+), 43 deletions(-) create mode 100644 spec/mongo/operation/context_spec.rb diff --git a/lib/mongo/client.rb b/lib/mongo/client.rb index e06231d4ad..57c43871ca 100644 --- a/lib/mongo/client.rb +++ b/lib/mongo/client.rb @@ -111,6 +111,7 @@ class Client :ssl_verify_certificate, :ssl_verify_hostname, :ssl_verify_ocsp_endpoint, + :timeout_ms, :truncate_logs, :user, :wait_queue_timeout, @@ -413,6 +414,8 @@ def hash # @option options [ true, false ] :ssl_verify_hostname Whether to perform peer hostname # validation. This setting overrides :ssl_verify with respect to whether hostname validation # is performed. + # @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds. + # Must be a positive integer. The default value is unset, which means infinite. # @option options [ true, false ] :truncate_logs Whether to truncate the # logs at the default 250 characters. # @option options [ String ] :user The user name. 
@@ -1185,6 +1188,11 @@ def encrypted_fields_map @encrypted_fields_map ||= @options.fetch(:auto_encryption_options, {})[:encrypted_fields_map] end + # @api private + def timeout_ms + @options[:timeout_ms] + end + private # Create a new encrypter object using the client's auto encryption options diff --git a/lib/mongo/cluster.rb b/lib/mongo/cluster.rb index eac6d6229d..a85f3e6ff2 100644 --- a/lib/mongo/cluster.rb +++ b/lib/mongo/cluster.rb @@ -783,8 +783,13 @@ def has_writable_server? # @return [ Mongo::Server ] A primary server. # # @since 2.0.0 - def next_primary(ping = nil, session = nil) - ServerSelector.primary.select_server(self, nil, session) + def next_primary(ping = nil, session = nil, remaining_timeout_ms: nil) + ServerSelector.primary.select_server( + self, + nil, + session, + remaining_timeout_ms: remaining_timeout_ms + ) end # Get the connection pool for the server. diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index a2b2076b7d..8ac7bbef59 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -401,7 +401,10 @@ def create(opts = {}) self.write_concern end - context = Operation::Context.new(client: client, session: session) + context = Operation::Context.new( + client: client, + session: session + ) maybe_create_qe_collections(opts[:encrypted_fields], client, session) do |encrypted_fields| Operation::Create.new( selector: operation, @@ -413,7 +416,10 @@ def create(opts = {}) collation: options[:collation] || options['collation'], encrypted_fields: encrypted_fields, validator: options[:validator], - ).execute(next_primary(nil, session), context: context) + ).execute( + next_primary(nil, session), + context: context + ) end end end @@ -801,7 +807,11 @@ def insert_one(document, opts = {}) raise ArgumentError, "Document to be inserted cannot be nil" end - context = Operation::Context.new(client: client, session: session) + context = Operation::Context.new( + client: client, + session: session, + timeout_ms: timeout_ms(opts) + ) 
write_with_retry(write_concern, context: context) do |connection, txn_num, context| Operation::Insert.new( :documents => [ document ], @@ -1152,5 +1162,13 @@ def namespace def system_collection? name.start_with?('system.') end + + def timeout_ms(opts = {}) + if opts.key?(:timeout_ms) + opts.delete(:timeout_ms) + else + options.fetch(:timeout_ms) { database.timeout_ms } + end + end end end diff --git a/lib/mongo/collection/view.rb b/lib/mongo/collection/view.rb index dcc0f89752..923f5079c7 100644 --- a/lib/mongo/collection/view.rb +++ b/lib/mongo/collection/view.rb @@ -63,10 +63,11 @@ class View :client, :cluster, :database, + :nro_write_with_retry, :read_with_retry, :read_with_retry_cursor, + :timeout_ms, :write_with_retry, - :nro_write_with_retry, :write_concern_with_session # Delegate to the cluster for the next primary. diff --git a/lib/mongo/database.rb b/lib/mongo/database.rb index 9b656c5b99..af40b1fe36 100644 --- a/lib/mongo/database.rb +++ b/lib/mongo/database.rb @@ -497,5 +497,9 @@ def self.create(client) database = Database.new(client, client.options[:database], client.options) client.instance_variable_set(:@database, database) end + + def timeout_ms + options.fetch(:timeout_ms) { client.timeout_ms } + end end end diff --git a/lib/mongo/operation/context.rb b/lib/mongo/operation/context.rb index b7cae91d5a..2222fc4028 100644 --- a/lib/mongo/operation/context.rb +++ b/lib/mongo/operation/context.rb @@ -35,7 +35,13 @@ module Operation # # @api private class Context - def initialize(client: nil, session: nil, connection_global_id: nil, options: nil) + def initialize( + client: nil, + session: nil, + connection_global_id: nil, + timeout_ms: nil, + options: nil + ) if options if client raise ArgumentError, 'Client and options cannot both be specified' @@ -50,14 +56,25 @@ def initialize(client: nil, session: nil, connection_global_id: nil, options: ni raise ArgumentError, 'Trying to pin context to a connection when the session is already pinned to a 
connection.' end + if timeout_ms && timeout_ms < 0 + raise ArgumentError, 'timeout_ms must be a positive integer' + end + @client = client @session = session @connection_global_id = connection_global_id + @timeout_ms = timeout_ms + @deadline = if @timeout_ms && @timeout_ms > 0 + Utils.monotonic_time + (@timeout_ms / 1_000.0) + else + nil + end @options = options end attr_reader :client attr_reader :session + attr_reader :deadline attr_reader :options def connection_global_id @@ -133,6 +150,24 @@ def encrypter raise Error::InternalDriverError, 'Encrypter should only be accessed when encryption is to be performed' end end + + def remaining_timeout_sec + return nil if @timeout_ms.nil? || @timeout_ms == 0 + + remaining_seconds = deadline - Utils.monotonic_time + if remaining_seconds <= 0 + 0 + else + remaining_seconds + end + end + + def remaining_timeout_ms + seconds = remaining_timeout_sec + return nil if seconds.nil? + + (seconds * 1_000).to_i + end end end end diff --git a/lib/mongo/operation/shared/op_msg_executable.rb b/lib/mongo/operation/shared/op_msg_executable.rb index a97b8fd48d..99d890de9b 100644 --- a/lib/mongo/operation/shared/op_msg_executable.rb +++ b/lib/mongo/operation/shared/op_msg_executable.rb @@ -32,7 +32,10 @@ module OpMsgExecutable # # @return [ Mongo::Operation::Result ] The operation result. 
def execute(server, context:, options: {}) - server.with_connection(connection_global_id: context.connection_global_id) do |connection| + server.with_connection( + connection_global_id: context.connection_global_id, + context: context + ) do |connection| execute_with_connection(connection, context: context, options: options) end end diff --git a/lib/mongo/operation/shared/write.rb b/lib/mongo/operation/shared/write.rb index 68214fac36..2f9f50cd47 100644 --- a/lib/mongo/operation/shared/write.rb +++ b/lib/mongo/operation/shared/write.rb @@ -35,7 +35,10 @@ module Write # # @since 2.5.2 def execute(server, context:) - server.with_connection(connection_global_id: context.connection_global_id) do |connection| + server.with_connection( + connection_global_id: context.connection_global_id, + context: context + ) do |connection| execute_with_connection(connection, context: context) end end diff --git a/lib/mongo/retryable.rb b/lib/mongo/retryable.rb index 2508e13efd..efde6fdadf 100644 --- a/lib/mongo/retryable.rb +++ b/lib/mongo/retryable.rb @@ -46,8 +46,14 @@ module Retryable # @api private # # @return [ Mongo::Server ] A server matching the server preference. - def select_server(cluster, server_selector, session, failed_server = nil) - server_selector.select_server(cluster, nil, session, deprioritized: [failed_server].compact) + def select_server(cluster, server_selector, session, failed_server = nil, remaining_timeout_ms: nil) + server_selector.select_server( + cluster, + nil, + session, + deprioritized: [failed_server].compact, + remaining_timeout_ms: remaining_timeout_ms + ) end # Returns the read worker for handling retryable reads. 
diff --git a/lib/mongo/retryable/write_worker.rb b/lib/mongo/retryable/write_worker.rb index 339a28b2f1..569e201532 100644 --- a/lib/mongo/retryable/write_worker.rb +++ b/lib/mongo/retryable/write_worker.rb @@ -74,7 +74,11 @@ def write_with_retry(write_concern, ending_transaction: false, context:, &block) # If we are here, session is not nil. A session being nil would have # failed retry_write_allowed? check. - server = select_server(cluster, ServerSelector.primary, session) + server = select_server( + cluster, ServerSelector.primary, + session, + remaining_timeout_ms: context.remaining_timeout_ms + ) unless ending_transaction || server.retry_writes? return legacy_write_with_retry(server, context: context, &block) @@ -177,8 +181,16 @@ def legacy_write_with_retry(server = nil, context:) attempt = 0 begin attempt += 1 - server ||= select_server(cluster, ServerSelector.primary, session) - server.with_connection(connection_global_id: context.connection_global_id) do |connection| + server ||= select_server( + cluster, + ServerSelector.primary, + session, + remaining_timeout_ms: context.remaining_timeout_ms + ) + server.with_connection( + connection_global_id: context.connection_global_id, + context: context + ) do |connection| # Legacy retries do not use txn_num yield connection, nil, context.dup end @@ -220,7 +232,10 @@ def modern_write_with_retry(session, server, context, &block) txn_num = nil connection_succeeded = false - server.with_connection(connection_global_id: context.connection_global_id) do |connection| + server.with_connection( + connection_global_id: context.connection_global_id, + context: context + ) do |connection| connection_succeeded = true session.materialize_if_needed @@ -263,7 +278,13 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block) # server description and/or topology as necessary (specifically, # a socket error or a not master error should have marked the respective # server unknown). 
Here we just need to wait for server selection. - server = select_server(cluster, ServerSelector.primary, session, failed_server) + server = select_server( + cluster, + ServerSelector.primary, + session, + failed_server, + remaining_timeout_ms: context.remaining_timeout_ms + ) unless server.retry_writes? # Do not need to add "modern retry" here, it should already be on diff --git a/lib/mongo/server.rb b/lib/mongo/server.rb index d87e70a4f5..9c82bb0ba7 100644 --- a/lib/mongo/server.rb +++ b/lib/mongo/server.rb @@ -490,8 +490,12 @@ def reconnect! # @return [ Object ] The result of the block execution. # # @since 2.3.0 - def with_connection(connection_global_id: nil, &block) - pool.with_connection(connection_global_id: connection_global_id, &block) + def with_connection(connection_global_id: nil, context: nil, &block) + pool.with_connection( + connection_global_id: connection_global_id, + context: context, + &block + ) end # Handle handshake failure. diff --git a/lib/mongo/server/connection.rb b/lib/mongo/server/connection.rb index 349948b61d..f32e6abb04 100644 --- a/lib/mongo/server/connection.rb +++ b/lib/mongo/server/connection.rb @@ -226,11 +226,11 @@ def unpin # @return [ true ] If the connection succeeded. # # @since 2.0.0 - def connect! + def connect!(context = nil) raise_if_closed! unless @socket - @socket = create_socket + @socket = create_socket(context) @description, @compressor = do_connect if server.load_balancer? @@ -256,10 +256,15 @@ def connect! # # # @return [ Socket ] The created socket. 
- private def create_socket + private def create_socket(context = nil) add_server_diagnostics do - address.socket(socket_timeout, ssl_options.merge( - connection_address: address, connection_generation: generation, pipe: options[:pipe])) + opts = ssl_options.merge( + connection_address: address, + connection_generation: generation, + pipe: options[:pipe], + connect_timeout: context&.remaining_timeout_sec + ) + address.socket(socket_timeout, opts) end end diff --git a/lib/mongo/server/connection_pool.rb b/lib/mongo/server/connection_pool.rb index dfbc718522..b65a2e7547 100644 --- a/lib/mongo/server/connection_pool.rb +++ b/lib/mongo/server/connection_pool.rb @@ -208,8 +208,12 @@ def min_size # @return [ Float ] The queue wait timeout. # # @since 2.9.0 - def wait_timeout - @wait_timeout ||= options[:wait_timeout] || DEFAULT_WAIT_TIMEOUT + def wait_timeout(remaining_timeout_sec = nil) + if remaining_timeout_sec.nil? + options[:wait_timeout] || DEFAULT_WAIT_TIMEOUT + else + remaining_timeout_sec + end end # The maximum seconds a socket can remain idle since it has been @@ -352,7 +356,7 @@ def summary # and remains so for longer than the wait timeout. # # @since 2.9.0 - def check_out(connection_global_id: nil) + def check_out(connection_global_id: nil, context: nil) check_invariants publish_cmap_event( @@ -362,7 +366,9 @@ def check_out(connection_global_id: nil) raise_if_pool_closed! raise_if_pool_paused_locked! - connection = retrieve_and_connect_connection(connection_global_id) + connection = retrieve_and_connect_connection( + connection_global_id, context + ) publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckedOut.new(@server.address, connection.id, self), @@ -698,10 +704,13 @@ def inspect # @return [ Object ] The result of the block. # # @since 2.0.0 - def with_connection(connection_global_id: nil) + def with_connection(connection_global_id: nil, context: nil) raise_if_closed! 
- connection = check_out(connection_global_id: connection_global_id) + connection = check_out( + connection_global_id: connection_global_id, + context: context + ) yield(connection) rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished => e maybe_raise_pool_cleared!(connection, e) @@ -975,9 +984,9 @@ def maybe_raise_pool_cleared!(connection, e) # Attempts to connect (handshake and auth) the connection. If an error is # encountered, closes the connection and raises the error. - def connect_connection(connection) + def connect_connection(connection, context = nil) begin - connection.connect! + connection.connect!(context) rescue Exception connection.disconnect!(reason: :error) raise @@ -1250,8 +1259,8 @@ def get_connection(pid, connection_global_id) # @raise [ Error::PoolClosedError ] If the pool has been closed. # @raise [ Timeout::Error ] If the connection pool is at maximum size # and remains so for longer than the wait timeout. - def retrieve_and_connect_connection(connection_global_id) - deadline = Utils.monotonic_time + wait_timeout + def retrieve_and_connect_connection(connection_global_id, context = nil) + deadline = Utils.monotonic_time + wait_timeout(context&.remaining_timeout_sec) connection = nil @lock.synchronize do @@ -1267,7 +1276,7 @@ def retrieve_and_connect_connection(connection_global_id) connection = wait_for_connection(connection_global_id, deadline) end - connect_or_raise(connection) unless connection.connected? + connect_or_raise(connection, context) unless connection.connected? @lock.synchronize do @checked_out_connections << connection @@ -1327,8 +1336,8 @@ def wait_for_connection(connection_global_id, deadline) # cannot be connected. # This method also publish corresponding event and ensures that counters # and condition variables are updated. 
- def connect_or_raise(connection) - connect_connection(connection) + def connect_or_raise(connection, context) + connect_connection(connection, context) rescue Exception # Handshake or authentication failed @lock.synchronize do diff --git a/lib/mongo/server_selector/base.rb b/lib/mongo/server_selector/base.rb index 10eb478449..640b7996f5 100644 --- a/lib/mongo/server_selector/base.rb +++ b/lib/mongo/server_selector/base.rb @@ -33,11 +33,11 @@ class Base # # @option options [ Integer ] :local_threshold The local threshold boundary for # nearest selection in seconds. - # @option options [ Integer ] max_staleness The maximum replication lag, + # @option options [ Integer ] :max_staleness The maximum replication lag, # in seconds, that a secondary can suffer and still be eligible for a read. # A value of -1 is treated identically to nil, which is to not # have a maximum staleness. - # @option options [ Hash | nil ] hedge A Hash specifying whether to enable hedged + # @option options [ Hash | nil ] :hedge A Hash specifying whether to enable hedged # reads on the server. Hedged reads are not enabled by default. When # specifying this option, it must be in the format: { enabled: true }, # where the value of the :enabled key is a boolean value. @@ -178,8 +178,15 @@ def ==(other) # lint mode is enabled. # # @since 2.0.0 - def select_server(cluster, ping = nil, session = nil, write_aggregation: false, deprioritized: []) - select_server_impl(cluster, ping, session, write_aggregation, deprioritized).tap do |server| + def select_server( + cluster, + ping = nil, + session = nil, + write_aggregation: false, + deprioritized: [], + remaining_timeout_ms: nil + ) + select_server_impl(cluster, ping, session, write_aggregation, deprioritized, remaining_timeout_ms).tap do |server| if Lint.enabled? && !server.pool.ready? 
raise Error::LintError, 'Server selector returning a server with a pool which is not ready' end @@ -187,12 +194,18 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false, end # Parameters and return values are the same as for select_server. - private def select_server_impl(cluster, ping, session, write_aggregation, deprioritized) + private def select_server_impl(cluster, ping, session, write_aggregation, deprioritized, remaining_timeout_ms) if cluster.topology.is_a?(Cluster::Topology::LoadBalanced) return cluster.servers.first end - server_selection_timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT + timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT + + server_selection_timeout = if remaining_timeout_ms + [timeout, remaining_timeout_ms].min + else + timeout + end # Special handling for zero timeout: if we have to select a server, # and the timeout is zero, fail immediately (since server selection diff --git a/lib/mongo/socket.rb b/lib/mongo/socket.rb index 62e3ef464e..b61b09f601 100644 --- a/lib/mongo/socket.rb +++ b/lib/mongo/socket.rb @@ -254,7 +254,7 @@ def eof? true end - # For backwards compatibilty only, do not use. + # For backwards compatibility only, do not use. # # @return [ true ] Always true. 
# diff --git a/spec/mongo/operation/context_spec.rb b/spec/mongo/operation/context_spec.rb new file mode 100644 index 0000000000..3f4f714caf --- /dev/null +++ b/spec/mongo/operation/context_spec.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +require 'lite_spec_helper' + +describe Mongo::Operation::Context do + describe '#initialize' do + context 'when timeout_ms is negative' do + it 'raises an error' do + expect do + described_class.new(timeout_ms: -1) + end.to raise_error ArgumentError, /must be a positive integer/ + end + end + end + + describe '#deadline' do + let(:context) { described_class.new(timeout_ms: timeout_ms) } + + context 'when timeout_ms is nil' do + let(:timeout_ms) { nil } + + it 'returns nil' do + expect(context.deadline).to be_nil + end + end + + context 'when timeout_ms is zero' do + let(:timeout_ms) { 0 } + + it 'returns nil' do + expect(context.deadline).to be_nil + end + end + + context 'when timeout_ms is positive' do + before do + allow(Mongo::Utils).to receive(:monotonic_time).and_return(100.0) + end + + let(:timeout_ms) { 10_000 } + + it 'calculates the deadline' do + expect(context.deadline).to eq(110) + end + end + end + + describe '#remaining_timeout_ms' do + let(:context) { described_class.new(timeout_ms: timeout_ms) } + + context 'when timeout_ms is nil' do + let(:timeout_ms) { nil } + + it 'returns nil' do + expect(context.remaining_timeout_ms).to be_nil + end + end + + context 'when timeout_ms is zero' do + let(:timeout_ms) { 0 } + + it 'returns nil' do + expect(context.remaining_timeout_ms).to be_nil + end + end + + context 'when timeout_ms is positive' do + before do + allow(Mongo::Utils).to receive(:monotonic_time).ordered.and_return(100.0, 105.0) + end + + let(:timeout_ms) { 10_000 } + + it 'calculates the remaining time' do + expect(context.remaining_timeout_ms).to eq(5_000) + end + end + end +end