Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams instead of callbacks #19

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open

Streams instead of callbacks #19

wants to merge 23 commits into from

Conversation

iconara
Copy link
Owner

@iconara iconara commented Nov 9, 2014

Add streams (push streams for the moment, not reactive streams), and replace some of the event listeners with them.

This enables a new style of protocol handlers:

data_chunk_stream = connection.to_stream
line_stream = data_chunk_stream.aggregate(ByteBuffer.new) do |chunk, downstream, buffer|
  buffer << chunk
  while (newline_index = buffer.index("\n"))
    downstream << buffer.read(newline_index + 1)
  end
  buffer
end
request_stream = line_stream.aggregate([]) do |line, downstream, lines|
  # aggregate lines together into requests
end
request_stream.each do |request|
  response =  # process request
  connection.write(response.to_bytes)
end

@coveralls
Copy link

Coverage Status

Coverage increased (+0.01%) when pulling da43818 on streams into de24a4a on master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.01%) when pulling f93509b on streams into fe5688b on master.

A connection is a stream of the data chunks received by the socket
I’m pretty pleased with the state machine approach to parsing the Redis protocol. It’s a few more lines than would be strictly necessary because of all the classes, but it’s always nice with a state machine.
This is the remains of an earlier implementation.
It’s really a naïve Redis client implementation
Because Future::Combinators and not Future::FutureCombinators (any more)
I’m not really sure how to formulate it, but this feels better than the current wording.
* Stream#subscribe now takes a Subscriber
* PushStream is renamed Source and has a #<< for publishing
* #subscribe and #unsubscribe are encapsulated in Publisher
* The private #deliver method of Stream has been encapsulated in a class called Processor, which is both a Publisher and a Subscriber, and rename it #call
* The combinators have been rewritten to be Sources
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants