From d9a6bc8d76dcf4977ba355f528ea8481a269e4df Mon Sep 17 00:00:00 2001 From: Ian Katz Date: Sat, 25 Jan 2020 00:09:29 -0500 Subject: [PATCH 1/3] Add each_line enumerator to IO class --- lib/piperator/io.rb | 7 +++++++ spec/piperator/io_spec.rb | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/lib/piperator/io.rb b/lib/piperator/io.rb index abb8ff1..2bc0429 100644 --- a/lib/piperator/io.rb +++ b/lib/piperator/io.rb @@ -49,6 +49,13 @@ def gets(separator = $INPUT_RECORD_SEPARATOR, _limit = nil) read_with { @buffer.gets(separator) } end + # Returns an enumerator of lines in the stream, without reading the entire stream into memory + # + # @return [Enumerator] + def each_line + Enumerator.new { |y| loop { y << gets&.gsub(/#{$INPUT_RECORD_SEPARATOR}$/, '') } }.lazy.take_while(&:itself).each + end + # Flush internal buffer until the last unread byte def flush if @buffer.pos == @buffer_read_pos diff --git a/spec/piperator/io_spec.rb b/spec/piperator/io_spec.rb index 247a0dc..1770ffc 100644 --- a/spec/piperator/io_spec.rb +++ b/spec/piperator/io_spec.rb @@ -81,6 +81,14 @@ end end + describe '#each_line' do + subject { Piperator::IO.new(["foo\n", "bar\n", "baz\nbmp"].each) } + + it 'return enumerated lines' do + expect(subject.each_line.to_a).to eq(["foo", "bar", "baz", "bmp"]) + end + end + describe '#flush' do subject { Piperator::IO.new(['a' * 16 * KILOBYTE].each) } let(:flush_threshold) { Piperator::IO::FLUSH_THRESHOLD } From 0bf80b89d32e9e8999b13536d680068848ea0e5b Mon Sep 17 00:00:00 2001 From: Ian Katz Date: Mon, 28 Jun 2021 23:07:58 -0400 Subject: [PATCH 2/3] Use pipe implementation for infinite IO --- lib/piperator.rb | 29 +++++++++++++ lib/piperator/io.rb | 7 --- spec/piperator/infinite_io_spec.rb | 69 ++++++++++++++++++++++++++++++ spec/piperator/io_spec.rb | 8 ---- 4 files changed, 98 insertions(+), 15 deletions(-) create mode 100644 spec/piperator/infinite_io_spec.rb diff --git a/lib/piperator.rb b/lib/piperator.rb index 5ea03fc..fe331bc 100644 --- 
a/lib/piperator.rb +++ b/lib/piperator.rb @@ -51,4 +51,33 @@ def self.pipe(enumerable) def self.wrap(value) Pipeline.wrap(value) end + + # Coerce any enumerator to be an IO (via pipe). + # + # Pro: infinite length without using infinite memory. Con: unseekable (as is IO::Pipe). + # + # @param enumerator [Enumerator] source of data; infinite sources are OK + # @yieldparam io_r [IO] readable IO + def self.infinite_io(enumerator) + stop = false + io_r, io_w = ::IO.pipe # not the IO from this library + + # a thread writes all the data to the pipe. the pipe automatically buffers everything for us + thr = Thread.new do + enumerator.each do |chunk| + break if stop + io_w.write(chunk) + end + ensure + io_w.close + end + + yield io_r + ensure + stop = true + io_r.read until io_r.eof? # must drain, or risk closing before writes finish -- broken pipe + io_r.close # must close ??? + thr.join # must ensure that all data desired to be written is actually written + end + end diff --git a/lib/piperator/io.rb b/lib/piperator/io.rb index 2bc0429..abb8ff1 100644 --- a/lib/piperator/io.rb +++ b/lib/piperator/io.rb @@ -49,13 +49,6 @@ def gets(separator = $INPUT_RECORD_SEPARATOR, _limit = nil) read_with { @buffer.gets(separator) } end - # Returns an enumerator of lines in the stream, without reading the entire stream into memory - # - # @return [Enumerator] - def each_line - Enumerator.new { |y| loop { y << gets&.gsub(/#{$INPUT_RECORD_SEPARATOR}$/, '') } }.lazy.take_while(&:itself).each - end - # Flush internal buffer until the last unread byte def flush if @buffer.pos == @buffer_read_pos diff --git a/spec/piperator/infinite_io_spec.rb b/spec/piperator/infinite_io_spec.rb new file mode 100644 index 0000000..6dab0fb --- /dev/null +++ b/spec/piperator/infinite_io_spec.rb @@ -0,0 +1,69 @@ +require 'spec_helper' + + + +RSpec.describe "Piperator.infinite_io" do + repeated_string = "foobar\n" + infinite_foobar = Enumerator.new { |y| loop { y << repeated_string } } + + describe '#read' do + 
subject { proc { |&b| Piperator.infinite_io(infinite_foobar) { |x| b.(x) } } } + + it 'reads specific number of bytes' do + subject.call do |io| + expect(io.read(4)).to eq('foob') + end + end + + it 'buffers rest and returns on next read' do + subject.call do |io| + expect(io.read(2)).to eq('fo') + expect(io.read(2)).to eq('ob') + expect(io.read(2)).to eq('ar') + end + end + + it 'does not try to reach the end before working' do + n = 100000 + subject.call do |io| + expect(io.read(n * repeated_string.length)).to eq(repeated_string * n) + end + end + end + + describe '#gets' do + it 'returns characters until the separator' do + Piperator.infinite_io(infinite_foobar) do |io| + expect(io.gets).to eq(repeated_string) + end + end + + it 'responds to gets with nil when enumerable is exhausted' do + n = 2 + Piperator.infinite_io(([repeated_string] * n).each) do |io| + n.times { expect(io.gets).to eq(repeated_string) } + expect(io.gets).to be_nil + end + end + end + + describe '#eof?' do + it 'returns eof when enumerable is exhausted' do + n = 2 + Piperator.infinite_io(([repeated_string] * n).each) do |io| + expect(io.eof?).to be_falsey + n.times { expect(io.gets).to eq(repeated_string) } + expect(io.eof?).to be_truthy + end + end + end + + describe "#each_line" do + it 'reevaluates line breaks' do + Piperator.infinite_io(["foo\n", "bar\n", "baz\nbmp"].lazy.each) do |io| + expect(io.each_line.map(&:strip)).to eq(["foo", "bar", "baz", "bmp"]) + end + end + end + +end diff --git a/spec/piperator/io_spec.rb b/spec/piperator/io_spec.rb index 1770ffc..247a0dc 100644 --- a/spec/piperator/io_spec.rb +++ b/spec/piperator/io_spec.rb @@ -81,14 +81,6 @@ end end - describe '#each_line' do - subject { Piperator::IO.new(["foo\n", "bar\n", "baz\nbmp"].each) } - - it 'return enumerated lines' do - expect(subject.each_line.to_a).to eq(["foo", "bar", "baz", "bmp"]) - end - end - describe '#flush' do subject { Piperator::IO.new(['a' * 16 * KILOBYTE].each) } let(:flush_threshold) { 
Piperator::IO::FLUSH_THRESHOLD } From 466417e4e6b7000250f02da061d7c05f9e264fb0 Mon Sep 17 00:00:00 2001 From: Ian Katz Date: Mon, 28 Jun 2021 23:14:15 -0400 Subject: [PATCH 3/3] change pattern for ::IO.pipe --- lib/piperator.rb | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/lib/piperator.rb b/lib/piperator.rb index fe331bc..2b0276a 100644 --- a/lib/piperator.rb +++ b/lib/piperator.rb @@ -60,24 +60,23 @@ def self.wrap(value) # @yieldparam io_r [IO] readable IO def self.infinite_io(enumerator) stop = false - io_r, io_w = ::IO.pipe # not the IO from this library + ::IO.pipe do |io_r, io_w| # not the IO from this library - # a thread writes all the data to the pipe. the pipe automatically buffers everything for us - thr = Thread.new do - enumerator.each do |chunk| - break if stop - io_w.write(chunk) + # a thread writes all the data to the pipe. the pipe automatically buffers everything for us + thr = Thread.new do + enumerator.each do |chunk| + break if stop + io_w.write(chunk) + end + ensure + io_w.close # otherwise a read will hang end + + yield io_r ensure - io_w.close + stop = true + thr.join end - - yield io_r - ensure - stop = true - io_r.read until io_r.eof? # must drain, or risk closing before writes finish -- broken pipe - io_r.close # must close ??? - thr.join # must ensure that all data desired to be written is actually written end end