Skip to content

Commit

Permalink
Change things around to get closer to the reactive streams spec
Browse files Browse the repository at this point in the history
* Stream#subscribe now takes a Subscriber
* PushStream is renamed Source and has a #<< for publishing
* #subscribe and #unsubscribe are encapsulated in Publisher
* The private #deliver method of Stream has been encapsulated in a class called Processor, which is both a Publisher and a Subscriber, and rename it #call
* The combinators have been rewritten to be Sources
  • Loading branch information
iconara committed Jan 1, 2015
1 parent 9bcb739 commit 9f76e1f
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 159 deletions.
2 changes: 1 addition & 1 deletion lib/ione/io/acceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def initialize(host, port, backlog, unblocker, reactor, socket_impl=nil)
@unblocker = unblocker
@reactor = reactor
@socket_impl = socket_impl || ServerSocket
@accept_stream = Stream::PushStream.new
@accept_stream = Stream::Source.new
@lock = Mutex.new
@state = BINDING_STATE
end
Expand Down
2 changes: 1 addition & 1 deletion lib/ione/io/base_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def initialize(host, port, unblocker)
@lock = Mutex.new
@write_buffer = ByteBuffer.new
@closed_promise = Promise.new
@data_stream = Stream::PushStream.new
@data_stream = Stream::Source.new
end

# Closes the connection
Expand Down
182 changes: 97 additions & 85 deletions lib/ione/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,160 +2,172 @@

module Ione
# @abstract Base class for streams
# @see Ione::Stream::PushStream
# @see Ione::Stream::Publisher
class Stream
# @private
def initialize
@subscribers = []
@lock = Mutex.new
end
class Publisher
# @private
def initialize
@subscribers = {}
@lock = Mutex.new
end

# @yieldparam [Object] element each element that flows through the stream
# @return [self] the stream itself
def subscribe(subscriber=nil, &block)
@lock.lock
subscribers = @subscribers.dup
subscribers << (subscriber || block)
@subscribers = subscribers
self
ensure
@lock.unlock
end
alias_method :each, :subscribe

def unsubscribe(subscriber)
@lock.lock
subscribers = @subscribers.dup
subscribers.delete(subscriber)
@subscribers = subscribers
ensure
@lock.unlock
# @yieldparam [Object] element each element that flows through the stream
# @return [self] the stream itself
def subscribe(subscriber=nil, &block)
subscriber ||= block
unless subscriber.respond_to?(:call)
raise ArgumentError, %(A subscriber must respond to #call)
end
@lock.lock
begin
@subscribers[subscriber] = 1
self
ensure
@lock.unlock
end
subscriber
end
alias_method :each, :subscribe

# @param [Object] subscriber
# @return [self] the stream itself
def unsubscribe(subscriber)
@lock.lock
@subscribers.delete(subscriber)
subscriber
ensure
@lock.unlock
end
end

private

def deliver(element)
@subscribers.each do |subscriber|
subscriber.call(element) rescue nil
module Subscriber
def call(element)
end
self
end

module Combinators
# @yieldparam [Object] element
# @yieldreturn [Object] the transformed element
# @return [Ione::Stream]
def map(&transformer)
TransformedStream.new(self, transformer)
subscribe(TransformedStream.new(transformer))
end

# @yieldparam [Object] element
# @yieldreturn [Boolean] whether or not to pass the element downstream
# @return [Ione::Stream]
def select(&filter)
FilteredStream.new(self, filter)
subscribe(FilteredStream.new(filter))
end

# @param [Object] state
# @yieldparam [Object] element
# @yieldparam [Ione::Stream::PushStream] downstream
# @yieldparam [Ione::Stream::Publisher] downstream
# @yieldparam [Object] state
# @yieldreturn [Object] the next state
# @return [Ione::Stream]
def aggregate(state=nil, &aggregator)
AggregatingStream.new(self, aggregator, state)
subscribe(AggregatingStream.new(aggregator, state))
end

# @param [Integer] n the number of elements to pass downstream before
# unsubscribing
# @return [Ione::Stream]
def take(n)
LimitedStream.new(self, n)
subscribe(LimitedStream.new(self, n))
end

# @param [Integer] n the number of elements to skip before passing
# elements downstream
# @return [Ione::Stream]
def drop(n)
SkippingStream.new(self, n)
subscribe(SkippingStream.new(n))
end
end

include Combinators

class PushStream < Stream
module_eval do
# this crazyness is just to hide these declarations from Yard
alias_method :push, :deliver
public :push
alias_method :<<, :deliver
public :<<
end

# @!parse
# # @param [Object] element
# # @return [self]
# def push(element); end
# alias_method :<<, :push
class Processor < Publisher
include Subscriber
include Combinators
end

# @return [Proc] a Proc that can be used to push elements to this stream
def to_proc
method(:push).to_proc
class Source < Processor
def <<(element)
@subscribers.each_key do |subscriber|
subscriber.call(element) rescue nil
end
element
end
end

# @private
class TransformedStream < Stream
def initialize(upstream, transformer)
class TransformedStream < Source
def initialize(transformer)
super()
upstream.each { |e| deliver(transformer.call(e)) }
@transformer = transformer
end

def call(element)
self << @transformer.call(element)
end
end

# @private
class FilteredStream < Stream
def initialize(upstream, filter)
class FilteredStream < Source
def initialize(filter)
super()
upstream.each { |e| deliver(e) if filter.call(e) }
@filter = filter
end

def call(element)
self << element if @filter.call(element)
end
end

# @private
class AggregatingStream < PushStream
def initialize(upstream, aggregator, state)
class AggregatingStream < Source
def initialize(aggregator, state)
super()
upstream.each { |e| state = aggregator.call(e, self, state) }
@aggregator = aggregator
@state = state
end

def call(element)
@state = @aggregator.call(element, self, @state)
end
end

# @private
class LimitedStream < Stream
class LimitedStream < Source
def initialize(upstream, n)
super()
counter = 0
subscriber = proc do |e|
if counter < n
deliver(e)
else
upstream.unsubscribe(subscriber)
end
counter += 1
@upstream = upstream
@counter = 0
@limit = n
end

def call(element)
if @counter < @limit
self << element
else
@upstream.unsubscribe(self)
end
upstream.subscribe(subscriber)
@counter += 1
end
end

# @private
class SkippingStream < Stream
def initialize(upstream, n)
class SkippingStream < Source
def initialize(n)
super()
counter = 0
upstream.subscribe do |e|
if counter == n
deliver(e)
else
counter += 1
end
@counter = 0
@skips = n
end

def call(element)
if @counter == @skips
self << element
else
@counter += 1
end
end
end
Expand Down
Loading

0 comments on commit 9f76e1f

Please sign in to comment.