From 4b1ddc36e14778730f56da8f6277a97ea86f232f Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 26 Sep 2024 16:21:41 +0530 Subject: [PATCH] Do not use buffering in pipe file handles Allow buffer size specification in byte and char APIs. --- CHANGELOG.md | 4 + src/DocTestProcess.hs | 4 +- src/Streamly/Internal/System/Process.hs | 121 ++++++++++++++++++------ src/Streamly/System/Process.hs | 13 ++- 4 files changed, 113 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c88449c..30aca87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +* Remove buffering from the pipe chunked APIs. + ## 0.3.1 (Dec 2023) * Allow streamly-0.10.0 and streamly-core-0.2.0 diff --git a/src/DocTestProcess.hs b/src/DocTestProcess.hs index d882a18..e230dcf 100644 --- a/src/DocTestProcess.hs +++ b/src/DocTestProcess.hs @@ -2,6 +2,7 @@ >>> :set -XFlexibleContexts >>> :set -XScopedTypeVariables +>>> :set -Wno-deprecations >>> import Data.Char (toUpper) >>> import Data.Function ((&)) >>> import qualified Streamly.Console.Stdio as Stdio @@ -13,7 +14,8 @@ For APIs that have not been released yet. ->>> import qualified Streamly.Internal.Console.Stdio as Stdio (putChars, putChunks) +>>> import Streamly.Internal.System.IO (defaultChunkSize) +>>> import qualified Streamly.Internal.Console.Stdio as Stdio (putChars, putChunks, readChunks) >>> import qualified Streamly.Internal.FileSystem.Dir as Dir (readFiles) >>> import qualified Streamly.Internal.System.Process as Process >>> import qualified Streamly.Internal.Unicode.Stream as Unicode (lines) diff --git a/src/Streamly/Internal/System/Process.hs b/src/Streamly/Internal/System/Process.hs index 6cb6ed1..a3b1693 100644 --- a/src/Streamly/Internal/System/Process.hs +++ b/src/Streamly/Internal/System/Process.hs @@ -8,6 +8,7 @@ -- {-# LANGUAGE CPP #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} -- TODO: @@ -42,7 +43,6 @@ -- -- - Replace FilePath with a typed path. -- -{-# LANGUAGE FlexibleContexts #-} module Streamly.Internal.System.Process ( @@ -138,7 +138,7 @@ import Streamly.Data.Array (Array) import Streamly.Data.Fold (Fold) import Streamly.Data.Stream.Prelude (MonadAsync, Stream) import System.Exit (ExitCode(..)) -import System.IO (hClose, Handle) +import System.IO (hClose, Handle, hSetBuffering, BufferMode(..)) #if !defined(mingw32_HOST_OS) import System.Posix.Types (CUid (..), CGid (..)) #endif @@ -485,7 +485,7 @@ parallel s1 s2 = Stream.parList (Stream.eager True) [s1, s2] ------------------------------------------------------------------------------- -- Transformation ------------------------------------------------------------------------------- --- + -- | On normal cleanup we do not need to close the pipe handles as they are -- already guaranteed to be closed (we can assert that) by the time we reach -- here. We should not kill the process, rather wait for it to terminate @@ -574,9 +574,16 @@ createProc' modCfg path args = do -- XXX Read the exception channel and reap the process if it failed before -- exec. parent + hSetBuffering inp NoBuffering + hSetBuffering out NoBuffering + hSetBuffering err NoBuffering return (Just inp, Just out, err, proc) #else - createProcess cfg + r@(inp, out, err, _) <- createProcess cfg + mapM_ (`hSetBuffering` NoBuffering) inp + mapM_ (`hSetBuffering` NoBuffering) out + mapM_ (`hSetBuffering` NoBuffering) err + return r #endif where @@ -634,6 +641,11 @@ pipeChunksEitherWith modifier path args input = -- | Like 'pipeChunks' but also includes stderr as 'Left' stream in the -- 'Either' output. +-- +-- Definition: +-- +-- >>> pipeChunksEither = pipeChunksEitherWith id +-- {-# INLINE pipeChunksEither #-} pipeChunksEither :: (MonadCatch m, MonadAsync m) @@ -643,10 +655,13 @@ pipeChunksEither :: -> Stream m (Either (Array Word8) (Array Word8)) -- ^ Output stream pipeChunksEither = pipeChunksEitherWith id --- | @pipeBytesEither path args input@ runs the executable at @path@ using @args@ --- as arguments and @input@ stream as its standard input. The error stream of --- the executable is presented as 'Left' values in the resulting stream and --- output stream as 'Right' values. +-- | @pipeBytesEither path args input@ runs the executable at @path@ using +-- @args@ as arguments and @input@ stream as its standard input. The error +-- stream of the executable is presented as 'Left' values in the resulting +-- stream and output stream as 'Right' values. The input to the pipe is +-- buffered with a buffer size of 'defaultChunkSize'. +-- +-- For control over the buffer use your own chunking and chunk based APIs. -- -- Raises 'ProcessFailure' exception in case of failure. -- @@ -697,7 +712,8 @@ pipeChunksWith modifier path args input = -- | @pipeChunks file args input@ runs the executable @file@ specified by -- its name or path using @args@ as arguments and @input@ stream as its --- standard input. Returns the standard output of the executable as a stream. +-- standard input. Returns the standard output of the process as a stream +-- of chunks of bytes (Array Word8). -- -- If only the name of an executable file is specified instead of its path then -- the file name is searched in the directories specified by the PATH @@ -719,6 +735,10 @@ pipeChunksWith modifier path args input = -- :} --HELLO WORLD -- +-- Definition: +-- +-- >>> pipeChunks = pipeChunksWith id +-- -- /pre-release/ {-# INLINE pipeChunks #-} pipeChunks :: @@ -739,8 +759,12 @@ processChunks :: -> Stream m (Array Word8) -- ^ Output stream processChunks = pipeChunks --- | Like 'pipeChunks' except that it works on a stream of bytes instead of --- a stream of chunks. +-- | Like 'pipeChunks' except that its input and output is stream of bytes +-- instead of a stream of chunks. The input to the pipe is buffered with a +-- buffer size of 'defaultChunkSize'. +-- +-- For control over the input buffer use your own chunking and chunk based +-- APIs. -- -- We can write the example in 'pipeChunks' as follows. -- @@ -774,8 +798,12 @@ processBytes :: -> Stream m Word8 -- ^ Output Stream processBytes = pipeBytes --- | Like 'pipeChunks' except that it works on a stream of chars instead of --- a stream of chunks. +-- | Like 'pipeChunks' except that its input and output is stream of chars +-- instead of a stream of chunks. The input to the pipe is buffered with a +-- buffer size of 'defaultChunkSize'. +-- +-- For control over the input buffer use your own chunking and chunk based +-- APIs. -- -- >>> :{ -- Process.toChars "echo" ["hello world"] @@ -847,8 +875,9 @@ toChunksWith modifier path args = run _ = error "toChunksWith: Not reachable" -- | @toBytesEither path args@ runs the executable at @path@ using @args@ as --- arguments and returns a stream of 'Either' bytes. The 'Left' values are from --- @stderr@ and the 'Right' values are from @stdout@ of the executable. +-- arguments and returns the output of the process as a stream of 'Either' +-- bytes. The 'Left' values are from @stderr@ and the 'Right' values are from +-- @stdout@ of the executable. -- -- Raises 'ProcessFailure' exception in case of failure. -- @@ -877,8 +906,12 @@ toBytesEither path args = rightRdr = fmap Right Array.reader in Stream.unfoldMany (Unfold.either leftRdr rightRdr) output --- | The following code is equivalent to the shell command @echo "hello --- world"@: +-- | @toBytes path args@ runs the executable specified by @path@ using @args@ +-- as arguments and returns the output of the process as a stream of bytes. +-- +-- Raises 'ProcessFailure' exception in case of failure. +-- +-- The following code is equivalent to the shell command @echo "hello world"@: -- -- >>> :{ -- Process.toBytes "echo" ["hello world"] @@ -921,8 +954,13 @@ toChunksEither :: -> Stream m (Either (Array Word8) (Array Word8)) -- ^ Output Stream toChunksEither = toChunksEitherWith id --- | The following code is equivalent to the shell command @echo "hello --- world"@: +-- | @toChunks path args@ runs the executable specified by @path@ using @args@ +-- as arguments and returns the output of the process as a stream of chunks of +-- bytes (Array Word8). +-- +-- Raises 'ProcessFailure' exception in case of failure. +-- +-- The following code is equivalent to the shell command @echo "hello world"@: -- -- >>> :{ -- Process.toChunks "echo" ["hello world"] @@ -941,7 +979,13 @@ toChunks :: -> Stream m (Array Word8) -- ^ Output Stream toChunks = toChunksWith id --- | +-- | @toChars path args@ runs the executable specified by @path@ using @args@ +-- as arguments and returns the output of the process as a stream of chars. +-- +-- Raises 'ProcessFailure' exception in case of failure. +-- +-- Definition: +-- -- >>> toChars path args = toBytes path args & Unicode.decodeUtf8 -- {-# INLINE toChars #-} @@ -952,8 +996,19 @@ toChars :: -> Stream m Char -- ^ Output Stream toChars path args = toBytes path args & Unicode.decodeUtf8 --- | --- >>> toLines path args f = toChars path args & Unicode.lines f +-- | @toLines f path args@ runs the executable specified by @path@ using @args@ +-- as arguments and folds the output of the process at line breaks, using the +-- fold @f@, to return a stream of folded lines. +-- +-- Raises 'ProcessFailure' exception in case of failure. +-- +-- To return a stream of lines as strings: +-- +-- >>> toStrings = toLines Fold.toList +-- +-- Definition: +-- +-- >>> toLines f path args = toChars path args & Unicode.lines f -- {-# INLINE toLines #-} toLines :: @@ -964,8 +1019,12 @@ toLines :: -> Stream m a -- ^ Output Stream toLines f path args = toChars path args & Unicode.lines f --- | --- >>> toString path args = toChars path args & Stream.fold Fold.toList +-- | @toString path args@ runs the executable specified by @path@ using @args@ +-- as arguments and folds the entire output of the process as a single string. +-- +-- Definition: +-- +-- >>> toString path args = toChars path args & Stream.toList -- {-# INLINE toString #-} toString :: @@ -973,9 +1032,13 @@ toString :: => FilePath -- ^ Executable name or path -> [String] -- ^ Arguments -> m String -toString path args = toChars path args & Stream.fold Fold.toList +toString path args = toChars path args & Stream.toList --- | +-- | @toStdout path args@ runs the executable specified by @path@ using @args@ +-- as arguments and returns the output of the process on stdout. +-- +-- Definition: +-- -- >>> toStdout path args = toChunks path args & Stdio.putChunks -- {-# INLINE toStdout #-} @@ -992,7 +1055,11 @@ toStdout path args = do return () -} --- | +-- | @toNull path args@ runs the executable specified by @path@ using @args@ +-- as arguments and discards the output of the process. +-- +-- Definition: +-- -- >>> toNull path args = toChunks path args & Stream.fold Fold.drain -- {-# INLINE toNull #-} diff --git a/src/Streamly/System/Process.hs b/src/Streamly/System/Process.hs index 9def8d8..8e0e2bb 100644 --- a/src/Streamly/System/Process.hs +++ b/src/Streamly/System/Process.hs @@ -38,7 +38,9 @@ -- -- >>> :{ -- Process.toBytes "echo" ["hello world"] --- & Unicode.decodeLatin1 & fmap toUpper & Unicode.encodeLatin1 +-- & Unicode.decodeLatin1 +-- & fmap toUpper +-- & Unicode.encodeLatin1 -- & Stream.fold Stdio.write -- :} -- HELLO WORLD @@ -74,6 +76,15 @@ -- & Stream.fold Stdio.writeChunks -- :} -- +-- = Running Interactive Programs (e.g. ghci) +-- +-- >>> :{ +-- ghci = +-- Stdio.readChunks +-- & Process.pipeChunks "ghci" [] +-- & Stdio.putChunks +-- :} +-- -- = Experimental APIs -- -- See "Streamly.Internal.System.Process" for unreleased functions.