Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use native build flag and some other changes #53

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions Benchmark/System/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import System.IO
, openFile
, hClose
)
import System.Process (proc, createProcess, waitForProcess, callCommand)

import qualified Streamly.Data.Fold as FL
import qualified Streamly.FileSystem.Handle as FH
import qualified Streamly.Prelude as S
import qualified Streamly.System.Process as Proc
import qualified Streamly.Internal.System.Command as Cmd

-- Internal imports
import qualified Streamly.Internal.FileSystem.Handle
Expand Down Expand Up @@ -71,16 +71,12 @@ largeByteFile = "./largeByteFile"
generateByteFile :: IO ()
generateByteFile = do
ddPath <- which "dd"
let procObj = proc ddPath [
"if=" ++ devRandom,
"of=" ++ largeByteFile,
"count=" ++ show ddBlockCount,
"bs=" ++ show ddBlockSize
]

(_, _, _, procHandle) <- createProcess procObj
_ <- waitForProcess procHandle
return ()
Cmd.toStdout
$ ddPath
++ " if=" ++ devRandom
++ " of=" ++ largeByteFile
++ " count=" ++ show ddBlockCount
++ " bs=" ++ show ddBlockSize

-------------------------------------------------------------------------------
-- Create a file filled with ascii chars
Expand Down Expand Up @@ -112,7 +108,7 @@ trToStderrContent =
createExecutable :: IO ()
createExecutable = do
writeFile trToStderr trToStderrContent
callCommand ("chmod +x " ++ trToStderr)
Cmd.toStdout ("chmod +x " ++ trToStderr)

-------------------------------------------------------------------------------
-- Create and delete the temp data/exec files
Expand Down
4 changes: 2 additions & 2 deletions default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ let haskellPackages =
# } {})
(let src = fetchGit {
url = "[email protected]:composewell/streamly.git";
rev = "1ee11e87ec920df66e6bb1299ab000948df90ae5";
rev = "4bb8b7c950ffeee9d5c9c3ca23c65be93ca34f0b";
}; in super.callCabal2nix "streamly" src {})
(old:
{ librarySystemDepends =
Expand All @@ -62,7 +62,7 @@ let haskellPackages =
nixpkgs.haskell.lib.overrideCabal
(let src = fetchGit {
url = "[email protected]:composewell/streamly.git";
rev = "cbccb7777792cb4bf8dd8716929f4e28ea6cf718";
rev = "4bb8b7c950ffeee9d5c9c3ca23c65be93ca34f0b";
}; in super.callCabal2nix "streamly-core" "${src}/core" {})
(old:
{ librarySystemDepends =
Expand Down
2 changes: 1 addition & 1 deletion src/Streamly/Internal/System/Command.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ where
import Control.Monad.Catch (MonadCatch)
import Data.Char (isSpace)
import Data.Word (Word8)
import Streamly.Data.Array.Foreign (Array)
import Streamly.Data.Array.Unboxed (Array)
import Streamly.Data.Fold (Fold)
import Streamly.Internal.Data.Parser (Parser)
import Streamly.Prelude (MonadAsync, SerialT)
Expand Down
42 changes: 32 additions & 10 deletions src/Streamly/Internal/System/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,28 +93,27 @@ module Streamly.Internal.System.Process
)
where

-- #define USE_NATIVE

import Control.Exception (Exception(..), catch, throwIO)
import Control.Monad (void, unless)
import Control.Monad.Catch (MonadCatch, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Concurrent (forkIO, forkOS, runInBoundThread)
import Control.Concurrent.MVar
import Data.Function ((&))
import Data.Word (Word8)
import Streamly.Data.Array.Foreign (Array)
import Foreign.C.Error (Errno(..), ePIPE)
import GHC.IO.Exception (IOException(..), IOErrorType(..))
import Streamly.Data.Array.Unboxed (Array)
import Streamly.Data.Fold (Fold)
import Streamly.Prelude (MonadAsync, parallel, IsStream, adapt, SerialT)
import System.Exit (ExitCode(..))
import System.IO (hClose, Handle)

#ifdef USE_NATIVE
import Control.Exception (Exception(..), catch, throwIO, SomeException)
import Control.Exception (SomeException)
import System.Posix.Process (ProcessStatus(..))
import Streamly.Internal.System.Process.Posix
#else
import Control.Concurrent (forkIO)
import Control.Exception (Exception(..), catch, throwIO)
import Control.Monad (void, unless)
import Foreign.C.Error (Errno(..), ePIPE)
import GHC.IO.Exception (IOException(..), IOErrorType(..))
import System.Process
( ProcessHandle
, CreateProcess(..)
Expand All @@ -126,7 +125,7 @@ import System.Process
)
#endif

import qualified Streamly.Data.Array.Foreign as Array
import qualified Streamly.Data.Array.Unboxed as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Prelude as Stream

Expand Down Expand Up @@ -185,6 +184,13 @@ mkConfig _ _ = Config False

pipeStdErr :: Config -> Config
pipeStdErr (Config _) = Config True

inheritStdin :: Config -> Config
inheritStdin (Config _) = Config True

inheritStdout :: Config -> Config
inheritStdout (Config _) = Config True

#else
newtype Config = Config CreateProcess

Expand Down Expand Up @@ -288,7 +294,11 @@ cleanupException :: MonadIO m =>
(Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> m ()
cleanupException (Just stdinH, Just stdoutH, stderrMaybe, ph) = liftIO $ do
-- Send a SIGTERM to the process
#ifdef USE_NATIVE
terminate ph
#else
terminateProcess ph
#endif

-- Ideally we should be closing the handle without flushing the buffers so
-- that we cannot get a SIGPIPE. But there seems to be no way to do that as
Expand All @@ -298,7 +308,11 @@ cleanupException (Just stdinH, Just stdoutH, stderrMaybe, ph) = liftIO $ do
whenJust hClose stderrMaybe

-- Non-blocking wait for the process to go away
#ifdef USE_NATIVE
void $ forkIO (void $ wait ph)
#else
void $ forkIO (void $ waitForProcess ph)
#endif

where

Expand Down Expand Up @@ -343,6 +357,14 @@ createProc' modCfg path args = do

Config cfg = modCfg $ mkConfig path args

createProc'' ::
(Config -> Config) -- ^ Process attribute modifier
-> FilePath -- ^ Executable path
-> [String] -- ^ Arguments
-> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
createProc'' modCfg path args =
runInBoundThread $ createProc' modCfg path args

{-# INLINE putChunksClose #-}
putChunksClose :: (MonadIO m, IsStream t) =>
Handle -> t m (Array Word8) -> t m a
Expand Down
54 changes: 43 additions & 11 deletions src/Streamly/Internal/System/Process/Posix.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-- {-# LANGUAGE Safe #-}
-- |
-- Module : Streamly.Internal.System.Process.Posix
-- Copyright : (c) 2020 Composewell Technologies
Expand All @@ -17,6 +18,7 @@ module Streamly.Internal.System.Process.Posix
, newProcess
, wait
, getStatus
, terminate

-- * IPC
, mkPipe
Expand All @@ -33,15 +35,19 @@ import Data.Tuple (swap)
import GHC.IO.Device (IODeviceType(..))
import GHC.IO.Encoding (getLocaleEncoding)
import GHC.IO.Handle.FD (mkHandleFromFD)
import System.IO (IOMode(..), Handle)
import System.IO (IOMode(..), Handle, hPutStrLn, stderr)
import System.IO.Error (isDoesNotExistError)
import System.Posix.IO (createPipe, dupTo, closeFd)
import System.Posix.Process (forkProcess, executeFile, ProcessStatus)
import System.Posix.Process (forkProcess, executeFile, ProcessStatus, getProcessID)
import System.Posix.Types (ProcessID, Fd(..), CDev, CIno)
import System.Posix.Signals (signalProcess, sigTERM)
import System.Posix.Internals (fdGetMode)
import qualified Streamly.Internal.FileSystem.Dir as Dir
import qualified Streamly.Prelude as Stream

import qualified GHC.IO.FD as FD
import qualified System.Posix.Process as Posix
import Data.List (intercalate)

-------------------------------------------------------------------------------
-- Utilities to create stdio handles
Expand All @@ -51,13 +57,13 @@ import qualified System.Posix.Process as Posix
-- We have to put the FDs into binary mode on Windows to avoid the newline
-- translation that the CRT IO library does.
setBinaryMode :: FD.FD -> IO ()
#if defined(mingw32_HOST_OS)
setBinaryMode fd = do
_ <- setmode (FD.fdFD fd) True
return ()
#else





setBinaryMode _ = return ()
#endif


-- See Posix.fdToHandle and GHC.IO.Handle.FD.fdToHandle
-- See stdin, stdout, stderr in module GHC.IO.Handle.FD
Expand Down Expand Up @@ -109,9 +115,18 @@ mkPipeDupChild :: Direction -> Fd -> IO (Fd, (IO (), IO (), IO ()))
mkPipeDupChild direction childFd = do
let setDirection = if direction == ParentToChild then id else swap
(child, parent) <- fmap setDirection createPipe
pid <- getProcessID
let parentAction = closeFd child
childAction =
closeFd parent >> void (dupTo child childFd) >> closeFd child
hPutStrLn stderr ("closing parent fd" ++ show parent) >>
closeFd parent >>
hPutStrLn stderr ("closed parent fd" ++ show parent) >>
hPutStrLn stderr ("duplicating child to fd" ++ show (child, childFd)) >>
void (dupTo child childFd) >>
hPutStrLn stderr ("duplicated child to fd" ++ show (child, childFd)) >>
hPutStrLn stderr ("closing child" ++ show child) >>
closeFd child >>
hPutStrLn stderr ("closed child" ++ show child)
failureAction = closeFd child >> closeFd parent
return (parent, (parentAction, childAction, failureAction))

Expand Down Expand Up @@ -141,7 +156,15 @@ mkStdioPipes pipeStdErr = do
-}

let parentAction = inpParent >> outParent >> errParent -- >> excParent
childAction = inpChild >> outChild >> errChild -- >> excChild
childAction =
hPutStrLn stderr "child input action doing"
>> inpChild
>> hPutStrLn stderr "child input action done"
>> hPutStrLn stderr "child output action doing"
>> outChild
>> hPutStrLn stderr "child output action done"
>> errChild -- >> excChild
-- childAction = inpChild >> outChild >> errChild -- >> excChild
failureAction = inpFail >> outFail >> errFail -- >> excFail

inpH <- toHandle WriteMode inp
Expand Down Expand Up @@ -235,6 +258,7 @@ newProcess ::
-> IO Process
newProcess action path args env = do
pid <- forkProcess exec
hPutStrLn stderr ("parent process " ++ show pid)
pidToProcess pid Nothing

where
Expand All @@ -243,7 +267,12 @@ newProcess action path args env = do
-- to the parent and clean up the parent fds. We can send the exceptions
-- via a pipe like we do for threads.
--
exec = action >> executeFile path True args env
exec = do
pid <- getProcessID
hPutStrLn stderr ("child process " ++ show pid)
fds <- Stream.toList . Stream.unfold Dir.readFiles $ ("/proc/" ++ show pid ++ "/fd")
hPutStrLn stderr (intercalate ", " fds)
action >> executeFile path True args env

newtype ProcessDoesNotExist = ProcessDoesNotExist ProcessID deriving Show

Expand Down Expand Up @@ -315,3 +344,6 @@ getStatus proc@(Process pid _ procStatus) = do
if isDoesNotExistError e
then return (Nothing, Nothing)
else throwIO e

terminate :: Process -> IO ()
terminate (Process pid _ _) = signalProcess sigTERM pid
2 changes: 1 addition & 1 deletion src/Streamly/System/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ import Streamly.Internal.System.Process
-- >>> import Data.Char (toUpper)
-- >>> import Data.Function ((&))
-- >>> import qualified Streamly.Console.Stdio as Stdio
-- >>> import qualified Streamly.Data.Array.Foreign as Array
-- >>> import qualified Streamly.Data.Array.Unboxed as Array
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.System.Process as Process
Expand Down
4 changes: 2 additions & 2 deletions stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ packages:
extra-deps:
- unicode-data-0.3.0
- git: https://github.com/composewell/streamly
commit: "1ee11e87ec920df66e6bb1299ab000948df90ae5"
commit: "4bb8b7c950ffeee9d5c9c3ca23c65be93ca34f0b"
- git: https://github.com/composewell/streamly
commit: "1ee11e87ec920df66e6bb1299ab000948df90ae5"
commit: "4bb8b7c950ffeee9d5c9c3ca23c65be93ca34f0b"
subdirs:
- core

Expand Down
41 changes: 26 additions & 15 deletions streamly-process.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,35 @@ flag use-gauge
manual: True
default: False

flag use-native
description: Do not depend on the process package
manual: True
default: False

common compile-options
default-language: Haskell2010
ghc-options:
-Wall
-Wcompat
-Wunrecognised-warning-flags
-Widentities
-Wincomplete-record-updates
-Wincomplete-uni-patterns
-Wredundant-constraints
-Wnoncanonical-monad-instances
-Weverything
-Wno-implicit-prelude
-Wno-missing-deriving-strategies
-Wno-missing-exported-signatures
-Wno-missing-import-lists
-Wno-missing-local-signatures
-Wno-missing-safe-haskell-mode
-Wno-missed-specialisations
-Wno-all-missed-specialisations
-Wno-monomorphism-restriction
-Wno-prepositive-qualified-module
-Wno-unsafe

common optimization-options
ghc-options:
-O2
-fdicts-strict
-fspec-constr-recursive=16
-fmax-worker-args=16
if flag(use-native)
cpp-options: -DUSE_NATIVE

library
import: compile-options, optimization-options
Expand All @@ -74,18 +85,20 @@ library
Streamly.System.Process
Streamly.Internal.System.Process
Streamly.Internal.System.Command
if !os(windows)
if flag (use-native) && !os(windows)
exposed-modules:
Streamly.Internal.System.Process.Posix
build-depends:
base >= 4.8 && < 5
, exceptions >= 0.8 && < 0.11
, process >= 1.0 && < 1.7
-- Uses internal APIs
, streamly == 0.9.0.*
if !os(windows)
build-depends:
unix >= 2.5 && < 2.8
if !flag(use-native)
build-depends: process >= 1.0 && < 1.7
else
if !os(windows)
build-depends:
unix >= 2.5 && < 2.8

-------------------------------------------------------------------------------
-- Benchmarks
Expand All @@ -106,7 +119,6 @@ benchmark Benchmark.System.Process
streamly-process
, base >= 4.8 && < 5
, directory >= 1.2.2 && < 1.4
, process >= 1.0 && < 1.7
-- Uses internal APIs
, streamly == 0.9.0.*

Expand Down Expand Up @@ -136,7 +148,6 @@ test-suite Test.System.Process
, directory >= 1.2.2 && < 1.4
, exceptions >= 0.8 && < 0.11
, hspec >= 2.0 && < 3
, process >= 1.0 && < 1.7
, QuickCheck >= 2.10 && < 2.15
-- Uses internal APIs
, streamly == 0.9.0.*
Loading