Skip to content

Commit

Permalink
Merge pull request #349 from MishaConway/feature-support-plugging-in-…
Browse files Browse the repository at this point in the history
…custom-load-balancers

add support for being able to plug in custom load balancer
  • Loading branch information
sobrinho committed Feb 13, 2016
2 parents f557628 + 58df2cb commit bfce8ce
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 7 deletions.
8 changes: 8 additions & 0 deletions lib/octopus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ def self.config
end
end

def self.load_balancer=(balancer)
@load_balancer = balancer
end

def self.load_balancer
@load_balancer ||= Octopus::LoadBalancing::RoundRobin
end

def self.master_shard
((config && config[:master_shard]) || :master).to_sym
end
Expand Down
2 changes: 1 addition & 1 deletion lib/octopus/load_balancing/round_robin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def initialize(slaves_list)
end

# Returns the next available slave in the pool
def next
def next(options)
@slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length]
end
end
Expand Down
21 changes: 18 additions & 3 deletions lib/octopus/proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class Proxy
CURRENT_SHARD_KEY = 'octopus.current_shard'.freeze
CURRENT_GROUP_KEY = 'octopus.current_group'.freeze
CURRENT_SLAVE_GROUP_KEY = 'octopus.current_slave_group'.freeze
CURRENT_LOAD_BALANCE_OPTIONS_KEY = 'octopus.current_load_balance_options'.freeze
BLOCK_KEY = 'octopus.block'.freeze
LAST_CURRENT_SHARD_KEY = 'octopus.last_current_shard'.freeze
FULLY_REPLICATED_KEY = 'octopus.fully_replicated'.freeze
Expand Down Expand Up @@ -94,7 +95,7 @@ def initialize_replication(config)

@slaves_list = @shards.keys.map(&:to_s).sort
@slaves_list.delete('master')
@slaves_load_balancer = Octopus::LoadBalancing::RoundRobin.new(@slaves_list)
@slaves_load_balancer = Octopus.load_balancer.new(@slaves_list)
end

def current_model
Expand All @@ -117,6 +118,7 @@ def current_shard=(shard_symbol)
hash = shard_symbol
shard_symbol = hash[:shard]
slave_group_symbol = hash[:slave_group]
load_balance_options = hash[:load_balance_options]

if shard_symbol.nil? && slave_group_symbol.nil?
fail 'Neither shard or slave group must be specified'
Expand All @@ -133,6 +135,7 @@ def current_shard=(shard_symbol)
end
end
self.current_slave_group = slave_group_symbol
self.current_load_balance_options = load_balance_options
else
fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil?
end
Expand All @@ -159,6 +162,15 @@ def current_slave_group

def current_slave_group=(slave_group_symbol)
Thread.current[CURRENT_SLAVE_GROUP_KEY] = slave_group_symbol
Thread.current[CURRENT_LOAD_BALANCE_OPTIONS_KEY] = nil if slave_group_symbol.nil?
end

def current_load_balance_options
Thread.current[CURRENT_LOAD_BALANCE_OPTIONS_KEY]
end

def current_load_balance_options=(options)
Thread.current[CURRENT_LOAD_BALANCE_OPTIONS_KEY] = options
end

def block
Expand Down Expand Up @@ -429,7 +441,7 @@ def current_model_replicated?

def send_queries_to_selected_slave(method, *args, &block)
if current_model.replicated || fully_replicated?
selected_slave = @slaves_load_balancer.next
selected_slave = @slaves_load_balancer.next current_load_balance_options
else
selected_slave = Octopus.master_shard
end
Expand Down Expand Up @@ -457,7 +469,7 @@ def slaves_grouped?
# Temporarily switch `current_shard` to the next slave in a slave group and send queries to it
# while preserving `current_shard`
def send_queries_to_balancer(balancer, method, *args, &block)
send_queries_to_slave(balancer.next, method, *args, &block)
send_queries_to_slave(balancer.next(current_load_balance_options), method, *args, &block)
end

# Temporarily switch `current_shard` to the specified slave and send queries to it
Expand Down Expand Up @@ -487,6 +499,8 @@ def keeping_connection_proxy(shard, &_block)
def using_shard(shard, &_block)
older_shard = current_shard
older_slave_group = current_slave_group
older_load_balance_options = current_load_balance_options


begin
unless current_model && !current_model.allowed_shard?(shard)
Expand All @@ -496,6 +510,7 @@ def using_shard(shard, &_block)
ensure
self.current_shard = older_shard
self.current_slave_group = older_slave_group
self.current_load_balance_options = older_load_balance_options
end
end

Expand Down
6 changes: 3 additions & 3 deletions lib/octopus/slave_group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ class SlaveGroup
def initialize(slaves)
slaves = HashWithIndifferentAccess.new(slaves)
slaves_list = slaves.values
@load_balancer = Octopus::LoadBalancing::RoundRobin.new(slaves_list)
@load_balancer = Octopus.load_balancer.new(slaves_list)
end

def next
@load_balancer.next
def next(options)
@load_balancer.next options
end
end
end

0 comments on commit bfce8ce

Please sign in to comment.