Skip to content

Commit

Permalink
RUBY-3370 Add CSOT to server selection and connection checkout (#2833)
Browse files Browse the repository at this point in the history
  • Loading branch information
comandeo-mongo authored Feb 14, 2024
1 parent 44c607f commit e1158d3
Show file tree
Hide file tree
Showing 16 changed files with 257 additions and 43 deletions.
8 changes: 8 additions & 0 deletions lib/mongo/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class Client
:ssl_verify_certificate,
:ssl_verify_hostname,
:ssl_verify_ocsp_endpoint,
:timeout_ms,
:truncate_logs,
:user,
:wait_queue_timeout,
Expand Down Expand Up @@ -413,6 +414,8 @@ def hash
# @option options [ true, false ] :ssl_verify_hostname Whether to perform peer hostname
# validation. This setting overrides :ssl_verify with respect to whether hostname validation
# is performed.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ true, false ] :truncate_logs Whether to truncate the
# logs at the default 250 characters.
# @option options [ String ] :user The user name.
Expand Down Expand Up @@ -1185,6 +1188,11 @@ def encrypted_fields_map
@encrypted_fields_map ||= @options.fetch(:auto_encryption_options, {})[:encrypted_fields_map]
end

# @api private
def timeout_ms
@options[:timeout_ms]
end

private

# Create a new encrypter object using the client's auto encryption options
Expand Down
9 changes: 7 additions & 2 deletions lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -783,8 +783,13 @@ def has_writable_server?
# @return [ Mongo::Server ] A primary server.
#
# @since 2.0.0
def next_primary(ping = nil, session = nil)
ServerSelector.primary.select_server(self, nil, session)
def next_primary(ping = nil, session = nil, remaining_timeout_ms: nil)
ServerSelector.primary.select_server(
self,
nil,
session,
remaining_timeout_ms: remaining_timeout_ms
)
end

# Get the connection pool for the server.
Expand Down
24 changes: 21 additions & 3 deletions lib/mongo/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,10 @@ def create(opts = {})
self.write_concern
end

context = Operation::Context.new(client: client, session: session)
context = Operation::Context.new(
client: client,
session: session
)
maybe_create_qe_collections(opts[:encrypted_fields], client, session) do |encrypted_fields|
Operation::Create.new(
selector: operation,
Expand All @@ -413,7 +416,10 @@ def create(opts = {})
collation: options[:collation] || options['collation'],
encrypted_fields: encrypted_fields,
validator: options[:validator],
).execute(next_primary(nil, session), context: context)
).execute(
next_primary(nil, session),
context: context
)
end
end
end
Expand Down Expand Up @@ -801,7 +807,11 @@ def insert_one(document, opts = {})
raise ArgumentError, "Document to be inserted cannot be nil"
end

context = Operation::Context.new(client: client, session: session)
context = Operation::Context.new(
client: client,
session: session,
timeout_ms: timeout_ms(opts)
)
write_with_retry(write_concern, context: context) do |connection, txn_num, context|
Operation::Insert.new(
:documents => [ document ],
Expand Down Expand Up @@ -1152,5 +1162,13 @@ def namespace
def system_collection?
name.start_with?('system.')
end

def timeout_ms(opts = {})
if opts.key?(:timeout_ms)
opts.delete(:timeout_ms)
else
options.fetch(:timeout_ms) { database.timeout_ms }
end
end
end
end
3 changes: 2 additions & 1 deletion lib/mongo/collection/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ class View
:client,
:cluster,
:database,
:nro_write_with_retry,
:read_with_retry,
:read_with_retry_cursor,
:timeout_ms,
:write_with_retry,
:nro_write_with_retry,
:write_concern_with_session

# Delegate to the cluster for the next primary.
Expand Down
4 changes: 4 additions & 0 deletions lib/mongo/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -497,5 +497,9 @@ def self.create(client)
database = Database.new(client, client.options[:database], client.options)
client.instance_variable_set(:@database, database)
end

def timeout_ms
options.fetch(:timeout_ms) { client.timeout_ms }
end
end
end
37 changes: 36 additions & 1 deletion lib/mongo/operation/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ module Operation
#
# @api private
class Context
def initialize(client: nil, session: nil, connection_global_id: nil, options: nil)
def initialize(
client: nil,
session: nil,
connection_global_id: nil,
timeout_ms: nil,
options: nil
)
if options
if client
raise ArgumentError, 'Client and options cannot both be specified'
Expand All @@ -50,14 +56,25 @@ def initialize(client: nil, session: nil, connection_global_id: nil, options: ni
raise ArgumentError, 'Trying to pin context to a connection when the session is already pinned to a connection.'
end

if timeout_ms && timeout_ms < 0
raise ArgumentError, 'timeout_ms must be a positive integer'
end

@client = client
@session = session
@connection_global_id = connection_global_id
@timeout_ms = timeout_ms
@deadline = if @timeout_ms && @timeout_ms > 0
Utils.monotonic_time + (@timeout_ms / 1_000.0)
else
nil
end
@options = options
end

attr_reader :client
attr_reader :session
attr_reader :deadline
attr_reader :options

def connection_global_id
Expand Down Expand Up @@ -133,6 +150,24 @@ def encrypter
raise Error::InternalDriverError, 'Encrypter should only be accessed when encryption is to be performed'
end
end

def remaining_timeout_sec
return nil if @timeout_ms.nil? || @timeout_ms == 0

remaining_seconds = deadline - Utils.monotonic_time
if remaining_seconds <= 0
0
else
remaining_seconds
end
end

def remaining_timeout_ms
seconds = remaining_timeout_sec
return nil if seconds.nil?

(seconds * 1_000).to_i
end
end
end
end
5 changes: 4 additions & 1 deletion lib/mongo/operation/shared/op_msg_executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ module OpMsgExecutable
#
# @return [ Mongo::Operation::Result ] The operation result.
def execute(server, context:, options: {})
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
server.with_connection(
connection_global_id: context.connection_global_id,
context: context
) do |connection|
execute_with_connection(connection, context: context, options: options)
end
end
Expand Down
5 changes: 4 additions & 1 deletion lib/mongo/operation/shared/write.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ module Write
#
# @since 2.5.2
def execute(server, context:)
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
server.with_connection(
connection_global_id: context.connection_global_id,
context: context
) do |connection|
execute_with_connection(connection, context: context)
end
end
Expand Down
10 changes: 8 additions & 2 deletions lib/mongo/retryable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ module Retryable
# @api private
#
# @return [ Mongo::Server ] A server matching the server preference.
def select_server(cluster, server_selector, session, failed_server = nil)
server_selector.select_server(cluster, nil, session, deprioritized: [failed_server].compact)
def select_server(cluster, server_selector, session, failed_server = nil, remaining_timeout_ms: nil)
server_selector.select_server(
cluster,
nil,
session,
deprioritized: [failed_server].compact,
remaining_timeout_ms: remaining_timeout_ms
)
end

# Returns the read worker for handling retryable reads.
Expand Down
31 changes: 26 additions & 5 deletions lib/mongo/retryable/write_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ def write_with_retry(write_concern, ending_transaction: false, context:, &block)
# If we are here, session is not nil. A session being nil would have
# failed retry_write_allowed? check.

server = select_server(cluster, ServerSelector.primary, session)
server = select_server(
cluster, ServerSelector.primary,
session,
remaining_timeout_ms: context.remaining_timeout_ms
)

unless ending_transaction || server.retry_writes?
return legacy_write_with_retry(server, context: context, &block)
Expand Down Expand Up @@ -177,8 +181,16 @@ def legacy_write_with_retry(server = nil, context:)
attempt = 0
begin
attempt += 1
server ||= select_server(cluster, ServerSelector.primary, session)
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
server ||= select_server(
cluster,
ServerSelector.primary,
session,
remaining_timeout_ms: context.remaining_timeout_ms
)
server.with_connection(
connection_global_id: context.connection_global_id,
context: context
) do |connection|
# Legacy retries do not use txn_num
yield connection, nil, context.dup
end
Expand Down Expand Up @@ -220,7 +232,10 @@ def modern_write_with_retry(session, server, context, &block)
txn_num = nil
connection_succeeded = false

server.with_connection(connection_global_id: context.connection_global_id) do |connection|
server.with_connection(
connection_global_id: context.connection_global_id,
context: context
) do |connection|
connection_succeeded = true

session.materialize_if_needed
Expand Down Expand Up @@ -263,7 +278,13 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
# server description and/or topology as necessary (specifically,
# a socket error or a not master error should have marked the respective
# server unknown). Here we just need to wait for server selection.
server = select_server(cluster, ServerSelector.primary, session, failed_server)
server = select_server(
cluster,
ServerSelector.primary,
session,
failed_server,
remaining_timeout_ms: context.remaining_timeout_ms
)

unless server.retry_writes?
# Do not need to add "modern retry" here, it should already be on
Expand Down
8 changes: 6 additions & 2 deletions lib/mongo/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,12 @@ def reconnect!
# @return [ Object ] The result of the block execution.
#
# @since 2.3.0
def with_connection(connection_global_id: nil, &block)
pool.with_connection(connection_global_id: connection_global_id, &block)
def with_connection(connection_global_id: nil, context: nil, &block)
pool.with_connection(
connection_global_id: connection_global_id,
context: context,
&block
)
end

# Handle handshake failure.
Expand Down
15 changes: 10 additions & 5 deletions lib/mongo/server/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ def unpin
# @return [ true ] If the connection succeeded.
#
# @since 2.0.0
def connect!
def connect!(context = nil)
raise_if_closed!

unless @socket
@socket = create_socket
@socket = create_socket(context)
@description, @compressor = do_connect

if server.load_balancer?
Expand All @@ -256,10 +256,15 @@ def connect!
#
#
# @return [ Socket ] The created socket.
private def create_socket
private def create_socket(context = nil)
add_server_diagnostics do
address.socket(socket_timeout, ssl_options.merge(
connection_address: address, connection_generation: generation, pipe: options[:pipe]))
opts = ssl_options.merge(
connection_address: address,
connection_generation: generation,
pipe: options[:pipe],
connect_timeout: context&.remaining_timeout_sec
)
address.socket(socket_timeout, opts)
end
end

Expand Down
Loading

0 comments on commit e1158d3

Please sign in to comment.