From f40ac9551cd05113afa0dd6fbaf34e59e0df7473 Mon Sep 17 00:00:00 2001 From: Craig R Webster Date: Fri, 29 Jul 2016 17:55:59 +0100 Subject: [PATCH 1/2] Add ActiveJob support. ActiveJob wraps the job classes so the `is_a` checks don't work properly. --- .../resque_worker_extension.rb | 37 ++++++++++++------- resque-concurrent-restriction.gemspec | 2 +- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb b/lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb index 6fb4235..2145590 100644 --- a/lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb +++ b/lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb @@ -26,22 +26,34 @@ def reserve_with_restriction # This needs to be a instance method def done_working_with_restriction begin - job_class = @job_in_progress.payload_class - job_class.release_restriction(@job_in_progress) if job_class.is_a?(ConcurrentRestriction) + job_class = Helper.payload_class @job_in_progress + job_class.release_restriction(@job_in_progress) if Helper.restrict_concurrency? job_class ensure return done_working_without_restriction end end - end - module Job + module Helper + # ActiveJobs are an ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper + # Check the payload for the class we'd like them to be when they're run. + def self.payload_class(job) + return job.payload_class if job.payload_class.is_a? ConcurrentRestriction + class_name = job.args[0]['job_class'] + Object.const_get class_name + end + + def self.restrict_concurrency?(job_class) + job_class.is_a? ConcurrentRestriction + end + end + module Job def self.extended(receiver) - class << receiver - alias reserve_without_restriction reserve - alias reserve reserve_with_restriction - end + class << receiver + alias reserve_without_restriction reserve + alias reserve reserve_with_restriction + end end # Wrap reserve so we can move a job to restriction queue if it is restricted @@ -74,16 +86,13 @@ def get_queued_job(queue) # Short-curcuit if a job was not found return if resque_job.nil? - # If there is a job on regular queues, then only run it if its not restricted - job_class = resque_job.payload_class - job_args = resque_job.args - # Return to work on job if not a restricted job - return resque_job unless job_class.is_a?(ConcurrentRestriction) + job_class = Helper.payload_class(resque_job) + return resque_job unless Helper.restrict_concurrency? job_class # Keep trying if job is restricted. If job is runnable, we keep the lock until # done_working - return resque_job unless job_class.stash_if_restricted(resque_job) + return resque_job unless job_class.stash_if_restricted resque_job end # Safety net, here in case we hit the upper bound and there are still queued items diff --git a/resque-concurrent-restriction.gemspec b/resque-concurrent-restriction.gemspec index 5b086a4..e7d3200 100644 --- a/resque-concurrent-restriction.gemspec +++ b/resque-concurrent-restriction.gemspec @@ -20,7 +20,7 @@ Gem::Specification.new do |s| s.require_paths = ["lib"] s.add_dependency("resque", '~> 1.25') - s.add_dependency("activesupport", '~> 3.2') + s.add_dependency("activesupport", '~> 4') s.add_development_dependency('rspec', '~> 2.5') s.add_development_dependency('awesome_print') From 126797114df5845478c88c3fd5750abbb10dfc7a Mon Sep 17 00:00:00 2001 From: Craig R Webster Date: Mon, 1 Aug 2016 13:39:04 +0100 Subject: [PATCH 2/2] Pass the expected args array when ActiveJob --- .../concurrent_restriction_job.rb | 4 +-- .../resque_worker_extension.rb | 27 ++++++++++++------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb b/lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb index 3e5608d..adf2f20 100644 --- a/lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb +++ b/lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb @@ -383,7 +383,7 @@ def run_atomically(lock_key, tries=ConcurrentRestriction.lock_tries) # to release_restriction when job completes def stash_if_restricted(job) restricted = nil - tracking_key = tracking_key(*job.args) + tracking_key = tracking_key(*Helper.job_args(job)) lock_key = lock_key(tracking_key) did_run = run_atomically(lock_key) do @@ -434,7 +434,7 @@ def next_runnable_job(queue) # Decrements the running_count - to be called at end of job def release_restriction(job) - tracking_key = tracking_key(*job.args) + tracking_key = tracking_key(*Helper.job_args(args)) lock_key = lock_key(tracking_key) run_atomically(lock_key) do diff --git a/lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb b/lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb index 2145590..fcc9041 100644 --- a/lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb +++ b/lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb @@ -27,7 +27,7 @@ def reserve_with_restriction def done_working_with_restriction begin job_class = Helper.payload_class @job_in_progress - job_class.release_restriction(@job_in_progress) if Helper.restrict_concurrency? job_class + job_class.release_restriction(@job_in_progress) if Helper.restrict_concurrency? @job_in_progress ensure return done_working_without_restriction end @@ -35,16 +35,24 @@ def done_working_with_restriction end module Helper - # ActiveJobs are an ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper - # Check the payload for the class we'd like them to be when they're run. def self.payload_class(job) - return job.payload_class if job.payload_class.is_a? ConcurrentRestriction - class_name = job.args[0]['job_class'] - Object.const_get class_name + return job.payload_class unless active_job? job + args = job_args job + Object.const_get args['job_class'] + end + + # ActiveJobs are an ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper + def self.active_job?(job) + job.payload_class.is_a? ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper + end + + def self.job_args(job) + return job.args unless active_job? job + job.args[0]['arguments'] end - def self.restrict_concurrency?(job_class) - job_class.is_a? ConcurrentRestriction + def self.restrict_concurrency?(job) + payload_class(job).is_a? ConcurrentRestriction end end @@ -87,8 +95,7 @@ def get_queued_job(queue) return if resque_job.nil? # Return to work on job if not a restricted job - job_class = Helper.payload_class(resque_job) - return resque_job unless Helper.restrict_concurrency? job_class + return resque_job unless Helper.restrict_concurrency? resque_job # Keep trying if job is restricted. If job is runnable, we keep the lock until # done_working