From c46c1185cdc28be8d9a51dd17fa2b5853a32c406 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Thu, 19 Oct 2023 10:01:08 +0200 Subject: [PATCH] RUBY-3332 Fix tailable cursors --- lib/mongo/collection/view.rb | 1 + lib/mongo/collection/view/iterable.rb | 15 ++ spec/integration/find_options_spec.rb | 227 ++++++++++++++++++++++++++ 3 files changed, 243 insertions(+) create mode 100644 spec/integration/find_options_spec.rb diff --git a/lib/mongo/collection/view.rb b/lib/mongo/collection/view.rb index 794e04e5c7..dcc0f89752 100644 --- a/lib/mongo/collection/view.rb +++ b/lib/mongo/collection/view.rb @@ -127,6 +127,7 @@ def hash # return in each response from MongoDB. # @option options [ Hash ] :collation The collation to use. # @option options [ String ] :comment Associate a comment with the query. + # @option options [ :tailable, :tailable_await ] :cursor_type The type of cursor to use. # @option options [ Hash ] :explain Execute an explain with the provided # explain options (known options are :verbose and :verbosity) rather # than a find. diff --git a/lib/mongo/collection/view/iterable.rb b/lib/mongo/collection/view/iterable.rb index f9d7b1603a..f16569efac 100644 --- a/lib/mongo/collection/view/iterable.rb +++ b/lib/mongo/collection/view/iterable.rb @@ -185,6 +185,8 @@ def initial_query_op(session) collection.client.log_warn("The :oplog_replay option is deprecated and ignored by MongoDB 4.4 and later") end + maybe_set_tailable_options(spec) + if explained? spec[:explain] = options[:explain] Operation::Explain.new(spec) @@ -200,6 +202,19 @@ def send_initial_query(server, session = nil) def use_query_cache? QueryCache.enabled? && !collection.system_collection? end + + # Add tailable cusror options to the command specifiction if needed. + # + # @param [ Hash ] spec The command specification. + def maybe_set_tailable_options(spec) + case cursor_type + when :tailable + spec[:tailable] = true + when :tailable_await + spec[:tailable] = true + spec[:await_data] = true + end + end end end end diff --git a/spec/integration/find_options_spec.rb b/spec/integration/find_options_spec.rb new file mode 100644 index 0000000000..519a164b70 --- /dev/null +++ b/spec/integration/find_options_spec.rb @@ -0,0 +1,227 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe 'Find operation options' do + require_mri + require_no_auth + min_server_fcv '4.4' + + let(:subscriber) { Mrss::EventSubscriber.new } + + let(:seeds) do + [ SpecConfig.instance.addresses.first ] + end + + let(:client_options) do + {} + end + + let(:collection_options) do + {} + end + + let(:client) do + ClientRegistry.instance.new_local_client( + seeds, + SpecConfig.instance.test_options + .merge(database: SpecConfig.instance.test_db) + .merge(client_options) + ).tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end + end + + let(:collection) do + client['find_options', collection_options] + end + + let(:find_command) do + subscriber.started_events.find { |cmd| cmd.command_name == 'find' } + end + + let(:should_create_collection) { true } + + before do + client['find_options'].drop + collection.create if should_create_collection + collection.insert_many([ { a: 1 }, { a: 2 }, { a: 3 } ]) + end + + describe 'collation' do + let(:client_options) do + {} + end + + let(:collation) do + { 'locale' => 'en_US' } + end + + context 'when defined on the collection' do + let(:collection_options) do + { collation: collation } + end + + it 'uses the collation defined on the collection' do + collection.find.to_a + expect(find_command.command['collation']).to be_nil + end + end + + context 'when defined on the operation' do + let(:collection_options) do + {} + end + + it 'uses the collation defined on the collection' do + collection.find({}, collation: collation).to_a + expect(find_command.command['collation']).to eq(collation) + end + end + + context 'when defined on both collection and operation' do + let(:collection_options) do + { 'locale' => 'de_AT' } + end + + let(:should_create_collection) { false } + + it 'uses the collation defined on the collection' do + collection.find({}, collation: collation).to_a + expect(find_command.command['collation']).to eq(collation) + end + end + end + + describe 'read concern' do + context 'when defined on the client' do + let(:client_options) do + { read_concern: { level: :local } } + end + + let(:collection_options) do + {} + end + + it 'uses the read concern defined on the client' do + collection.find.to_a + expect(find_command.command['readConcern']).to eq('level' => 'local') + end + + context 'when defined on the collection' do + let(:collection_options) do + { read_concern: { level: :majority } } + end + + it 'uses the read concern defined on the collection' do + collection.find.to_a + expect(find_command.command['readConcern']).to eq('level' => 'majority') + end + + context 'when defined on the operation' do + let(:operation_read_concern) do + { level: :available } + end + + it 'uses the read concern defined on the operation' do + collection.find({}, read_concern: operation_read_concern).to_a + expect(find_command.command['readConcern']).to eq('level' => 'available') + end + end + end + + context 'when defined on the operation' do + let(:collection_options) do + {} + end + + let(:operation_read_concern) do + { level: :available } + end + + it 'uses the read concern defined on the operation' do + collection.find({}, read_concern: operation_read_concern).to_a + expect(find_command.command['readConcern']).to eq('level' => 'available') + end + end + end + + context 'when defined on the collection' do + let(:client_options) do + {} + end + + let(:collection_options) do + { read_concern: { level: :majority } } + end + + it 'uses the read concern defined on the collection' do + collection.find.to_a + expect(find_command.command['readConcern']).to eq('level' => 'majority') + end + + context 'when defined on the operation' do + let(:operation_read_concern) do + { level: :available } + end + + it 'uses the read concern defined on the operation' do + collection.find({}, read_concern: operation_read_concern).to_a + expect(find_command.command['readConcern']).to eq('level' => 'available') + end + end + end + end + + describe 'read preference' do + require_topology :replica_set + + context 'when defined on the client' do + let(:client_options) do + { read: { mode: :secondary } } + end + + let(:collection_options) do + {} + end + + it 'uses the read preference defined on the client' do + collection.find.to_a + expect(find_command.command['$readPreference']).to eq('mode' => 'secondary') + end + + context 'when defined on the collection' do + let(:collection_options) do + { read: { mode: :secondary_preferred } } + end + + it 'uses the read concern defined on the collection' do + collection.find.to_a + expect(find_command.command['$readPreference']).to eq('mode' => 'secondaryPreferred') + end + end + end + end + + describe 'cursor type' do + let(:collection_options) do + { capped: true, size: 1000 } + end + + context 'when cursor type is :tailable' do + it 'sets the cursor type to tailable' do + collection.find({}, cursor_type: :tailable).first + expect(find_command.command['tailable']).to be true + expect(find_command.command['awaitData']).to be_falsey + end + end + + context 'when cursor type is :tailable_await' do + it 'sets the cursor type to tailable' do + collection.find({}, cursor_type: :tailable_await).first + expect(find_command.command['tailable']).to be true + expect(find_command.command['awaitData']).to be true + end + end + end +end