Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUBY-3370 Add CSOT to server selection and connection checkout #2833

Merged
merged 10 commits into from
Feb 14, 2024
8 changes: 8 additions & 0 deletions lib/mongo/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class Client
:ssl_verify_certificate,
:ssl_verify_hostname,
:ssl_verify_ocsp_endpoint,
:timeout_ms,
:truncate_logs,
:user,
:wait_queue_timeout,
Expand Down Expand Up @@ -413,6 +414,8 @@ def hash
# @option options [ true, false ] :ssl_verify_hostname Whether to perform peer hostname
# validation. This setting overrides :ssl_verify with respect to whether hostname validation
# is performed.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ true, false ] :truncate_logs Whether to truncate the
# logs at the default 250 characters.
# @option options [ String ] :user The user name.
Expand Down Expand Up @@ -1185,6 +1188,11 @@ def encrypted_fields_map
@encrypted_fields_map ||= @options.fetch(:auto_encryption_options, {})[:encrypted_fields_map]
end

# @api private
def timeout_ms
@options[:timeout_ms]
end

private

# Create a new encrypter object using the client's auto encryption options
Expand Down
9 changes: 7 additions & 2 deletions lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -783,8 +783,13 @@ def has_writable_server?
# @return [ Mongo::Server ] A primary server.
#
# @since 2.0.0
def next_primary(ping = nil, session = nil)
ServerSelector.primary.select_server(self, nil, session)
def next_primary(ping = nil, session = nil, remaining_timeout_ms: nil)
ServerSelector.primary.select_server(
self,
nil,
session,
remaining_timeout_ms: remaining_timeout_ms
)
end

# Get the connection pool for the server.
Expand Down
24 changes: 21 additions & 3 deletions lib/mongo/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,10 @@ def create(opts = {})
self.write_concern
end

context = Operation::Context.new(client: client, session: session)
context = Operation::Context.new(
client: client,
session: session
)
maybe_create_qe_collections(opts[:encrypted_fields], client, session) do |encrypted_fields|
Operation::Create.new(
selector: operation,
Expand All @@ -413,7 +416,10 @@ def create(opts = {})
collation: options[:collation] || options['collation'],
encrypted_fields: encrypted_fields,
validator: options[:validator],
).execute(next_primary(nil, session), context: context)
).execute(
next_primary(nil, session),
context: context
)
end
end
end
Expand Down Expand Up @@ -801,7 +807,11 @@ def insert_one(document, opts = {})
raise ArgumentError, "Document to be inserted cannot be nil"
end

context = Operation::Context.new(client: client, session: session)
context = Operation::Context.new(
client: client,
session: session,
timeout_ms: timeout_ms(opts)
)
write_with_retry(write_concern, context: context) do |connection, txn_num, context|
Operation::Insert.new(
:documents => [ document ],
Expand Down Expand Up @@ -1152,5 +1162,13 @@ def namespace
def system_collection?
name.start_with?('system.')
end

def timeout_ms(opts = {})
if opts.key?(:timeout_ms)
opts.delete(:timeout_ms)
else
options.fetch(:timeout_ms) { database.timeout_ms }
end
end
end
end
3 changes: 2 additions & 1 deletion lib/mongo/collection/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ class View
:client,
:cluster,
:database,
:nro_write_with_retry,
:read_with_retry,
:read_with_retry_cursor,
:timeout_ms,
:write_with_retry,
:nro_write_with_retry,
:write_concern_with_session

# Delegate to the cluster for the next primary.
Expand Down
4 changes: 4 additions & 0 deletions lib/mongo/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -497,5 +497,9 @@ def self.create(client)
database = Database.new(client, client.options[:database], client.options)
client.instance_variable_set(:@database, database)
end

def timeout_ms
options.fetch(:timeout_ms) { client.timeout_ms }
end
end
end
37 changes: 36 additions & 1 deletion lib/mongo/operation/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ module Operation
#
# @api private
class Context
def initialize(client: nil, session: nil, connection_global_id: nil, options: nil)
def initialize(
client: nil,
session: nil,
connection_global_id: nil,
timeout_ms: nil,
options: nil
)
if options
if client
raise ArgumentError, 'Client and options cannot both be specified'
Expand All @@ -50,14 +56,25 @@ def initialize(client: nil, session: nil, connection_global_id: nil, options: ni
raise ArgumentError, 'Trying to pin context to a connection when the session is already pinned to a connection.'
end

if timeout_ms && timeout_ms < 0
raise ArgumentError, 'timeout_ms must be a positive integer'
end

@client = client
@session = session
@connection_global_id = connection_global_id
@timeout_ms = timeout_ms
@deadline = if @timeout_ms && @timeout_ms > 0
Utils.monotonic_time + (@timeout_ms / 1_000.0)
else
nil
end
@options = options
end

attr_reader :client
attr_reader :session
attr_reader :deadline
attr_reader :options

def connection_global_id
Expand Down Expand Up @@ -133,6 +150,24 @@ def encrypter
raise Error::InternalDriverError, 'Encrypter should only be accessed when encryption is to be performed'
end
end

def remaining_timeout_sec
return nil if @timeout_ms.nil? || @timeout_ms == 0

remaining_seconds = deadline - Utils.monotonic_time
if remaining_seconds <= 0
0
else
remaining_seconds
end
end

def remaining_timeout_ms
seconds = remaining_timeout_sec
return nil if seconds.nil?

(seconds * 1_000).to_i
end
end
end
end
5 changes: 4 additions & 1 deletion lib/mongo/operation/shared/op_msg_executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ module OpMsgExecutable
#
# @return [ Mongo::Operation::Result ] The operation result.
def execute(server, context:, options: {})
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
server.with_connection(
connection_global_id: context.connection_global_id,
context: context
) do |connection|
execute_with_connection(connection, context: context, options: options)
end
end
Expand Down
5 changes: 4 additions & 1 deletion lib/mongo/operation/shared/write.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ module Write
#
# @since 2.5.2
def execute(server, context:)
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
server.with_connection(
connection_global_id: context.connection_global_id,
context: context
) do |connection|
execute_with_connection(connection, context: context)
end
end
Expand Down
10 changes: 8 additions & 2 deletions lib/mongo/retryable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ module Retryable
# @api private
#
# @return [ Mongo::Server ] A server matching the server preference.
def select_server(cluster, server_selector, session, failed_server = nil)
server_selector.select_server(cluster, nil, session, deprioritized: [failed_server].compact)
def select_server(cluster, server_selector, session, failed_server = nil, remaining_timeout_ms: nil)
server_selector.select_server(
cluster,
nil,
session,
deprioritized: [failed_server].compact,
remaining_timeout_ms: remaining_timeout_ms
)
end

# Returns the read worker for handling retryable reads.
Expand Down
31 changes: 26 additions & 5 deletions lib/mongo/retryable/write_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ def write_with_retry(write_concern, ending_transaction: false, context:, &block)
# If we are here, session is not nil. A session being nil would have
# failed retry_write_allowed? check.

server = select_server(cluster, ServerSelector.primary, session)
server = select_server(
cluster, ServerSelector.primary,
session,
remaining_timeout_ms: context.remaining_timeout_ms
)

unless ending_transaction || server.retry_writes?
return legacy_write_with_retry(server, context: context, &block)
Expand Down Expand Up @@ -177,8 +181,16 @@ def legacy_write_with_retry(server = nil, context:)
attempt = 0
begin
attempt += 1
server ||= select_server(cluster, ServerSelector.primary, session)
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
server ||= select_server(
cluster,
ServerSelector.primary,
session,
remaining_timeout_ms: context.remaining_timeout_ms
)
server.with_connection(
connection_global_id: context.connection_global_id,
context: context
) do |connection|
# Legacy retries do not use txn_num
yield connection, nil, context.dup
end
Expand Down Expand Up @@ -220,7 +232,10 @@ def modern_write_with_retry(session, server, context, &block)
txn_num = nil
connection_succeeded = false

server.with_connection(connection_global_id: context.connection_global_id) do |connection|
server.with_connection(
connection_global_id: context.connection_global_id,
context: context
) do |connection|
connection_succeeded = true

session.materialize_if_needed
Expand Down Expand Up @@ -263,7 +278,13 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
# server description and/or topology as necessary (specifically,
# a socket error or a not master error should have marked the respective
# server unknown). Here we just need to wait for server selection.
server = select_server(cluster, ServerSelector.primary, session, failed_server)
server = select_server(
cluster,
ServerSelector.primary,
session,
failed_server,
remaining_timeout_ms: context.remaining_timeout_ms
)

unless server.retry_writes?
# Do not need to add "modern retry" here, it should already be on
Expand Down
8 changes: 6 additions & 2 deletions lib/mongo/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,12 @@ def reconnect!
# @return [ Object ] The result of the block execution.
#
# @since 2.3.0
def with_connection(connection_global_id: nil, &block)
pool.with_connection(connection_global_id: connection_global_id, &block)
def with_connection(connection_global_id: nil, context: nil, &block)
pool.with_connection(
connection_global_id: connection_global_id,
context: context,
&block
)
end

# Handle handshake failure.
Expand Down
15 changes: 10 additions & 5 deletions lib/mongo/server/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ def unpin
# @return [ true ] If the connection succeeded.
#
# @since 2.0.0
def connect!
def connect!(context = nil)
raise_if_closed!

unless @socket
@socket = create_socket
@socket = create_socket(context)
@description, @compressor = do_connect

if server.load_balancer?
Expand All @@ -256,10 +256,15 @@ def connect!
#
#
# @return [ Socket ] The created socket.
private def create_socket
private def create_socket(context = nil)
add_server_diagnostics do
address.socket(socket_timeout, ssl_options.merge(
connection_address: address, connection_generation: generation, pipe: options[:pipe]))
opts = ssl_options.merge(
connection_address: address,
connection_generation: generation,
pipe: options[:pipe],
connect_timeout: context&.remaining_timeout_sec
)
address.socket(socket_timeout, opts)
end
end

Expand Down
Loading
Loading