Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ActiveJob support #12

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def run_atomically(lock_key, tries=ConcurrentRestriction.lock_tries)
# to release_restriction when job completes
def stash_if_restricted(job)
restricted = nil
tracking_key = tracking_key(*job.args)
tracking_key = tracking_key(*Helper.job_args(job))
lock_key = lock_key(tracking_key)

did_run = run_atomically(lock_key) do
Expand Down Expand Up @@ -434,7 +434,7 @@ def next_runnable_job(queue)

# Decrements the running_count - to be called at end of job
def release_restriction(job)
tracking_key = tracking_key(*job.args)
tracking_key = tracking_key(*Helper.job_args(args))
lock_key = lock_key(tracking_key)

run_atomically(lock_key) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,42 @@ def reserve_with_restriction
# This needs to be a instance method
def done_working_with_restriction
begin
job_class = @job_in_progress.payload_class
job_class.release_restriction(@job_in_progress) if job_class.is_a?(ConcurrentRestriction)
job_class = Helper.payload_class @job_in_progress
job_class.release_restriction(@job_in_progress) if Helper.restrict_concurrency? @job_in_progress
ensure
return done_working_without_restriction
end
end

end

module Job
module Helper
def self.payload_class(job)
return job.payload_class unless active_job? job
args = job_args job
Object.const_get args['job_class']
end

# ActiveJobs are an ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper
def self.active_job?(job)
job.payload_class.is_a? ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper
end

def self.job_args(job)
return job.args unless active_job? job
job.args[0]['arguments']
end

def self.restrict_concurrency?(job)
payload_class(job).is_a? ConcurrentRestriction
end
end

module Job
def self.extended(receiver)
class << receiver
alias reserve_without_restriction reserve
alias reserve reserve_with_restriction
end
class << receiver
alias reserve_without_restriction reserve
alias reserve reserve_with_restriction
end
end

# Wrap reserve so we can move a job to restriction queue if it is restricted
Expand Down Expand Up @@ -74,16 +94,12 @@ def get_queued_job(queue)
# Short-curcuit if a job was not found
return if resque_job.nil?

# If there is a job on regular queues, then only run it if its not restricted
job_class = resque_job.payload_class
job_args = resque_job.args

# Return to work on job if not a restricted job
return resque_job unless job_class.is_a?(ConcurrentRestriction)
return resque_job unless Helper.restrict_concurrency? resque_job

# Keep trying if job is restricted. If job is runnable, we keep the lock until
# done_working
return resque_job unless job_class.stash_if_restricted(resque_job)
return resque_job unless job_class.stash_if_restricted resque_job
end

# Safety net, here in case we hit the upper bound and there are still queued items
Expand Down
2 changes: 1 addition & 1 deletion resque-concurrent-restriction.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Gem::Specification.new do |s|
s.require_paths = ["lib"]

s.add_dependency("resque", '~> 1.25')
s.add_dependency("activesupport", '~> 3.2')
s.add_dependency("activesupport", '~> 4')

s.add_development_dependency('rspec', '~> 2.5')
s.add_development_dependency('awesome_print')
Expand Down