Skip to content

Commit

Permalink
RUBY-3372 Add CSOT for CRUD commands (#2849)
Browse files Browse the repository at this point in the history
  • Loading branch information
comandeo-mongo authored Mar 10, 2024
1 parent 8ddd727 commit e915e84
Show file tree
Hide file tree
Showing 14 changed files with 477 additions and 57 deletions.
1 change: 1 addition & 0 deletions lib/mongo/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,7 @@ def encrypted_fields_map
@encrypted_fields_map ||= @options.fetch(:auto_encryption_options, {})[:encrypted_fields_map]
end

# @return [ Integer | nil ] Value of timeout_ms option if set.
# @api private
def timeout_ms
@options[:timeout_ms]
Expand Down
46 changes: 41 additions & 5 deletions lib/mongo/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ def ==(other)
# and *:nearest*.
# - *:tag_sets* -- an array of hashes.
# - *:local_threshold*.
# @option opts [ Session ] :session The session to use for the operation.
# @option opts [ Integer ] :size The size of the capped collection.
# @option options [ Session ] :session The session to use for the operation.
# @option options [ Integer ] :size The size of the capped collection.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option opts [ Hash ] :time_series Create a time-series collection.
# The hash may have the following items:
# - *:timeField* -- The name of the field which contains the date in each
Expand Down Expand Up @@ -502,6 +504,8 @@ def drop(opts = {})
# @option options [ Integer ] :skip The number of docs to skip before returning results.
# @option options [ Hash ] :sort The key and direction pairs by which the result set
# will be sorted.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
Expand Down Expand Up @@ -539,6 +543,8 @@ def find(filter = nil, options = {})
# as of server version 3.6, aggregations always provide results using a
# cursor and this option is therefore not valid.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @return [ View::Aggregation ] The aggregation object.
#
Expand Down Expand Up @@ -637,6 +643,8 @@ def watch(pipeline = [], options = {})
# @option options [ Session ] :session The session to use.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @return [ Integer ] The document count.
#
Expand Down Expand Up @@ -673,6 +681,8 @@ def count(filter = nil, options = {})
# @option options [ Session ] :session The session to use.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @return [ Integer ] The document count.
#
Expand All @@ -694,6 +704,8 @@ def count_documents(filter = {}, options = {})
# @option options [ Hash ] :read The read preference options.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @return [ Integer ] The document count.
#
Expand All @@ -715,6 +727,8 @@ def estimated_document_count(options = {})
# @option options [ Hash ] :read The read preference options.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @return [ Array<Object> ] The list of distinct values.
#
Expand Down Expand Up @@ -787,6 +801,8 @@ def inspect
# @option opts [ Object ] :comment A user-provided comment to attach to
# this command.
# @option opts [ Session ] :session The session to use for the operation.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option opts [ Hash ] :write_concern The write concern options.
# Can be :w => Integer, :fsync => Boolean, :j => Boolean.
#
Expand Down Expand Up @@ -844,6 +860,8 @@ def insert_one(document, opts = {})
# @option options [ true | false ] :ordered Whether the operations
# should be executed in order.
# @option options [ Session ] :session The session to use for the operation.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ Hash ] :write_concern The write concern options.
# Can be :w => Integer, :fsync => Boolean, :j => Boolean.
#
Expand Down Expand Up @@ -872,6 +890,8 @@ def insert_many(documents, options = {})
# @option options [ true | false ] :bypass_document_validation Whether or
# not to skip document level validation.
# @option options [ Session ] :session The session to use for the set of operations.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
Expand All @@ -894,6 +914,8 @@ def bulk_write(requests, options = {})
# @option options [ Session ] :session The session to use.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
Expand All @@ -916,6 +938,8 @@ def delete_one(filter = nil, options = {})
# @option options [ Session ] :session The session to use.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
Expand Down Expand Up @@ -964,6 +988,8 @@ def parallel_scan(cursor_count, options = {})
# not to skip document level validation.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Hash ] :let Mapping of variables to use in the command.
Expand Down Expand Up @@ -993,6 +1019,8 @@ def replace_one(filter, replacement, options = {})
# @option options [ Array ] :array_filters A set of filters specifying to which array elements
# an update should apply.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Hash ] :let Mapping of variables to use in the command.
Expand Down Expand Up @@ -1022,6 +1050,8 @@ def update_many(filter, update, options = {})
# @option options [ Array ] :array_filters A set of filters specifying to which array elements
# an update should apply.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Hash ] :let Mapping of variables to use in the command.
Expand Down Expand Up @@ -1052,6 +1082,8 @@ def update_one(filter, update, options = {})
# Defaults to the collection's write concern.
# @option options [ Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Hash ] :let Mapping of variables to use in the command.
Expand Down Expand Up @@ -1096,6 +1128,8 @@ def find_one_and_delete(filter, options = {})
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @return [ BSON::Document ] The document.
#
Expand Down Expand Up @@ -1132,6 +1166,8 @@ def find_one_and_update(filter, update, options = {})
# @option options [ Session ] :session The session to use.
# @option options [ Hash | String ] :hint The index to use for this operation.
# May be specified as a Hash (e.g. { _id: 1 }) or a String (e.g. "_id_").
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
# @option options [ Hash ] :let Mapping of variables to use in the command.
# See the server documentation for details.
#
Expand Down Expand Up @@ -1164,10 +1200,10 @@ def system_collection?
end

def timeout_ms(opts = {})
if opts.key?(:timeout_ms)
opts.delete(:timeout_ms)
if opts[:timeout_ms].nil?
options[:timeout_ms] || database.timeout_ms
else
options.fetch(:timeout_ms) { database.timeout_ms }
opts.delete(:timeout_ms)
end
end
end
Expand Down
10 changes: 10 additions & 0 deletions lib/mongo/collection/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ def hash
# document more than once. Deprecated as of MongoDB server version 4.0.
# @option options [ Hash ] :sort The key and direction pairs used to sort
# the results.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @since 2.0.0
def initialize(collection, filter = {}, options = {})
Expand Down Expand Up @@ -214,6 +216,14 @@ def view; self; end
def with_session(opts = {}, &block)
client.send(:with_session, @options.merge(opts), &block)
end

def timeout_ms(opts = {})
if opts[:timeout_ms].nil?
options[:timeout_ms] || database.timeout_ms
else
opts.delete(:timeout_ms)
end
end
end
end
end
17 changes: 16 additions & 1 deletion lib/mongo/collection/view/aggregation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def allow_disk_use(value = nil)
# as of server version 3.6, aggregations always provide results using a
# cursor and this option is therefore not valid.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @since 2.0.0
def initialize(view, pipeline, options = {})
Expand Down Expand Up @@ -181,13 +183,18 @@ def effective_read_preference(connection)
end

def send_initial_query(server, session)
context = Operation::Context.new(
client: client,
session: session,
timeout_ms: timeout_ms
)
server.with_connection do |connection|
initial_query_op(
session,
effective_read_preference(connection)
).execute_with_connection(
connection,
context: Operation::Context.new(client: client, session: session)
context: context
)
end
end
Expand All @@ -206,6 +213,14 @@ def cache_options
multi_collection: true,
}
end

def timeout_ms(opts = {})
if opts[:timeout_ms].nil?
options[:timeout_ms] || database.timeout_ms
else
opts.delete(:timeout_ms)
end
end
end
end
end
Expand Down
7 changes: 6 additions & 1 deletion lib/mongo/collection/view/iterable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,12 @@ def initial_query_op(session)
end

def send_initial_query(server, session = nil)
initial_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session))
context = Operation::Context.new(
client: client,
session: session,
timeout_ms: timeout_ms
)
initial_query_op(session).execute(server, context: context)
end

def use_query_cache?
Expand Down
41 changes: 38 additions & 3 deletions lib/mongo/collection/view/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ module Readable
# as of server version 3.6, aggregations always provide results using a
# cursor and this option is therefore not valid.
# @option options [ Session ] :session The session to use.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @return [ Aggregation ] The aggregation object.
#
Expand Down Expand Up @@ -157,6 +159,8 @@ def comment(comment = nil)
# @option opts [ Mongo::Session ] :session The session to use for the operation.
# @option opts [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @return [ Integer ] The document count.
#
Expand Down Expand Up @@ -193,7 +197,14 @@ def count(opts = {})
# string key. Note that this isn't documented as valid usage.
collation: opts[:collation] || opts['collation'] || collation,
comment: opts[:comment],
).execute(server, context: Operation::Context.new(client: client, session: session))
).execute(
server,
context: Operation::Context.new(
client: client,
session: session,
timeout_ms: timeout_ms(opts)
)
)
end.n.to_i
end
end
Expand All @@ -216,6 +227,8 @@ def count(opts = {})
# @option opts [ Mongo::Session ] :session The session to use for the operation.
# @option ops [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @return [ Integer ] The document count.
#
Expand Down Expand Up @@ -247,6 +260,8 @@ def count_documents(opts = {})
# @option opts [ Hash ] :read The read preference options.
# @option opts [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# @return [ Integer ] The document count.
#
Expand All @@ -268,7 +283,11 @@ def estimated_document_count(opts = {})
selector = ServerSelector.get(read_pref || server_selector)
with_session(opts) do |session|
read_with_retry(session, selector) do |server|
context = Operation::Context.new(client: client, session: session)
context = Operation::Context.new(
client: client,
session: session,
timeout_ms: timeout_ms(opts)
)
cmd = { count: collection.name }
cmd[:maxTimeMS] = opts[:max_time_ms] if opts[:max_time_ms]
if read_concern
Expand Down Expand Up @@ -342,7 +361,14 @@ def distinct(field_name, opts = {})
# For some reason collation was historically accepted as a
# string key. Note that this isn't documented as valid usage.
collation: opts[:collation] || opts['collation'] || collation,
).execute(server, context: Operation::Context.new(client: client, session: session))
).execute(
server,
context: Operation::Context.new(
client: client,
session: session,
timeout_ms: timeout_ms(opts)
)
)
end.first['values']
end
end
Expand Down Expand Up @@ -627,6 +653,15 @@ def cursor_type(type = nil)
configure(:cursor_type, type)
end

# The per-operation timeout in milliseconds. Must a positive integer.
#
# @param [ Integer ] timeout_ms Timeout value.
#
# @return [ Integer, View ] Either the timeout_ms value or a new +View+.
def timeout_ms(timeout_ms = nil)
configure(:timeout_ms, timeout_ms)
end

# @api private
def read_concern
if options[:session] && options[:session].in_transaction?
Expand Down
Loading

0 comments on commit e915e84

Please sign in to comment.