diff options
author | Alina Banerjee <alina@glitchgirl.us> | 2021-07-19 03:59:12 +0000 |
---|---|---|
committer | Alina Banerjee <alina@glitchgirl.us> | 2021-07-19 22:38:29 +0000 |
commit | c708b969bafb403d482565601f8d0ed963e54a3c (patch) | |
tree | 81ea1c9f198a34af8807b2765311d54293b4648b /utils/benchmarks/events | |
parent | de9fedc380d22ff6db3e4c7540af07b99d26fbd9 (diff) | |
download | haskell-wip/fix-8045.tar.gz |
Move event benchmarks to utils/benchmarks/events/wip/fix-8045
Diffstat (limited to 'utils/benchmarks/events')
25 files changed, 1976 insertions, 0 deletions
diff --git a/utils/benchmarks/events/Args.hs b/utils/benchmarks/events/Args.hs new file mode 100644 index 0000000000..26ba2e6d58 --- /dev/null +++ b/utils/benchmarks/events/Args.hs @@ -0,0 +1,74 @@ +module Args + ( + theLast + , ljust + , nonNegative + , parseArgs + , positive + , printUsage + ) where + +import Data.Monoid (Last(..)) +import System.Console.GetOpt (OptDescr, ArgOrder(Permute), + getOpt, usageInfo) +import System.Environment (getProgName) +import System.Exit (ExitCode(..), exitWith) +import System.IO (hPutStrLn, stderr) + +-- | Deconstructor for 'Last' values. +theLast :: (cfg -> Last a) -- ^ Field to access. + -> cfg + -> a +theLast f cfg = case f cfg of + Last Nothing -> error "some horrible config sin has occurred" + Last (Just a) -> a + +-- | Parse command line options. +parseArgs :: Monoid cfg => cfg -> [OptDescr (IO cfg)] -> [String] + -> IO (cfg, [String]) +parseArgs defCfg options args = + case getOpt Permute options args of + (_, _, err:_) -> parseError err + (opts, rest, _) -> do + cfg <- (mappend defCfg . mconcat) `fmap` sequence opts + return (cfg, rest) + +-- | Constructor for 'Last' values. +ljust :: a -> Last a +ljust = Last . Just + +-- | Parse a positive number. +nonNegative :: (Num a, Ord a, Read a) => + String -> (Last a -> cfg) -> String -> IO cfg +nonNegative q f s = + case reads s of + [(n,"")] | n >= 0 -> return . f $ ljust n + | otherwise -> parseError $ q ++ " must be non negative" + _ -> parseError $ "invalid " ++ q ++ " provided" + +-- | Parse a positive number. +positive :: (Num a, Ord a, Read a) => + String -> (Last a -> cfg) -> String -> IO cfg +positive q f s = + case reads s of + [(n,"")] | n > 0 -> return . f $ ljust n + | otherwise -> parseError $ q ++ " must be positive" + _ -> parseError $ "invalid " ++ q ++ " provided" + +-- | Display an error message from a command line parsing failure, and +-- exit. +parseError :: String -> IO a +parseError msg = do + progName <- getProgName + hPutStrLn stderr $ "Error: " ++ msg + hPutStrLn stderr $ "Run \"" ++ progName ++ " --help\" for usage information\n" + exitWith (ExitFailure 64) + +printUsage :: [OptDescr b] -> ExitCode -> IO a +printUsage options exitCode = do + p <- getProgName + putStr (usageInfo ("Usage: " ++ p ++ " [OPTIONS] [ARGS]") options) + mapM_ putStrLn [ + "" + ] + exitWith exitCode diff --git a/utils/benchmarks/events/DeadConn.hs b/utils/benchmarks/events/DeadConn.hs new file mode 100644 index 0000000000..904c118ae8 --- /dev/null +++ b/utils/benchmarks/events/DeadConn.hs @@ -0,0 +1,133 @@ +{-# LANGUAGE CPP, OverloadedStrings #-} + +-- A simple tool that creates a number of "dead" connections to a +-- server. A dead connection is a connection that doesn't transmit +-- any data but stays connected. This tool is useful to simulate a +-- number of slow/idle connections to a server. + +import Args (ljust, nonNegative, parseArgs, positive, theLast) +import EventSocket (connect, recv, sendAll) +import Control.Concurrent (forkIO) +import Control.Monad (forM_, forever) +import qualified Data.ByteString.Char8 as S +import Data.Monoid (Last(..)) +import Network.Socket (AddrInfo(..), SocketType(..), + defaultHints, getAddrInfo, + socket, close, withSocketsDo) +import System.Console.GetOpt (ArgDescr(ReqArg), OptDescr(..)) +import System.Environment (getArgs) +import GHC.Event (ensureIOManagerIsRunning, threadDelay) +import System.Posix.Resource (ResourceLimit(..), + ResourceLimits(..), + Resource(..), setResourceLimit) +import qualified Data.Semigroup as Sem + +main = withSocketsDo $ do + (cfg, _) <- parseArgs defaultConfig defaultOptions =<< getArgs + let numConns = theLast cfgNumConns cfg + host = theLast cfgHost cfg + port = theLast cfgPort cfg + delay = theLast cfgDelay cfg * 1000 + lim = ResourceLimit $ fromIntegral numConns + 50 + myHints = defaultHints { addrSocketType = Stream } + + ensureIOManagerIsRunning + setResourceLimit ResourceOpenFiles + ResourceLimits { softLimit = lim, hardLimit = lim } + + addrinfos <- getAddrInfo (Just myHints) (Just host) (Just $ show port) + let addr = head addrinfos + + putStrLn $ "Running " ++ show numConns ++ " threads to clobber " ++ + host ++ ":" ++ show port ++ "..." + forM_ [0..numConns-1] $ \n -> forkIO . forever $ do + let myDelay = delay + n * 1037 + sock <- socket (addrFamily addr) (addrSocketType addr) + (addrProtocol addr) + connect sock (addrAddress addr) + let sendLoop s + | S.null s = recvLoop + | otherwise = do + threadDelay myDelay + let len = (n `mod` (S.length request - 1)) + 1 + let (h,t) = S.splitAt len s + sendAll sock h + sendLoop t + recvLoop = do + threadDelay myDelay + s <- recv sock 256 + if S.null s + then close sock + else recvLoop + sendLoop request + putStrLn $ show numConns ++ " threads looping" + + -- Block process forever. + --threadDelay maxBound + +request = "GET / HTTP/1.1\r\nHost: www.test.com\r\n\r\n" + +------------------------------------------------------------------------ +-- Configuration + +data Config = Config { + cfgNumConns :: Last Int + , cfgDelay :: Last Int + , cfgHost :: Last String + , cfgPort :: Last Int + } + +defaultConfig :: Config +defaultConfig = Config + { cfgNumConns = ljust 50 + , cfgDelay = ljust 100 + , cfgHost = ljust "localhost" + , cfgPort = ljust 3000 + } + +instance Sem.Semigroup Config where + Config { + cfgNumConns = a + , cfgDelay = b + , cfgHost = c + , cfgPort = d + } + <> Config { cfgNumConns = e + , cfgDelay = f + , cfgHost = g + , cfgPort = h + } = + Config { cfgNumConns = a <> e + , cfgDelay = b <> f + , cfgHost = c <> g + , cfgPort = d <> h + } + +instance Monoid Config where + mempty = Config + { cfgNumConns = mempty + , cfgDelay = mempty + , cfgHost = mempty + , cfgPort = mempty + } + + mappend = (<>) + +defaultOptions :: [OptDescr (IO Config)] +defaultOptions = [ + Option ['n'] ["connections"] + (ReqArg (nonNegative "number of connections" $ \n -> + mempty { cfgNumConns = n }) "N") + "number of connections" + , Option ['d'] ["delay"] + (ReqArg (nonNegative "delay between chunks" $ \d -> + mempty { cfgDelay = d }) "N") + "delay between chunks (ms)" + , Option ['h'] ["host"] + (ReqArg (\s -> return $ mempty { cfgHost = ljust s }) "HOST") + "server address" + , Option ['p'] ["port"] + (ReqArg (positive "server port" $ \n -> + mempty { cfgPort = n }) "N") + "server port" + ] diff --git a/utils/benchmarks/events/EventFile.hs b/utils/benchmarks/events/EventFile.hs new file mode 100644 index 0000000000..1edad86a89 --- /dev/null +++ b/utils/benchmarks/events/EventFile.hs @@ -0,0 +1,49 @@ +{-# LANGUAGE CPP #-} + +-- | File functions using System.Event instead of GHC's I/O manager. +module EventFile + ( + read + ) where + +import Data.ByteString (ByteString) +import Data.ByteString.Internal (createAndTrim) +import Data.Word (Word8) +import Foreign.Ptr (Ptr, castPtr) +import Foreign.C.Error (eINTR, getErrno, throwErrno) +#if __GLASGOW_HASKELL__ < 612 +import GHC.IOBase (IOErrorType(..)) +#else +import GHC.IO.Exception (IOErrorType(..)) +#endif +#if defined(USE_GHC_IO_MANAGER) +import GHC.Conc (threadWaitRead) +#else +import GHC.Event (threadWaitRead) +#endif +import System.IO.Error (ioeSetErrorString, mkIOError) +import System.Posix.Internals (c_read) +import System.Posix.Types (Fd) +import Prelude hiding (read) +import EventUtil + +read :: Fd -> Int -> IO ByteString +read fd nbytes + | nbytes <= 0 = ioError (mkInvalidReadArgError "read") + | otherwise = createAndTrim nbytes $ readInner fd nbytes + +readInner :: Fd -> Int -> Ptr Word8 -> IO Int +readInner fd nbytes ptr = do + len <- throwErrnoIfMinus1Retry_repeatOnBlock "read" + (threadWaitRead (fromIntegral fd)) $ + c_read (fromIntegral fd) (castPtr ptr) (fromIntegral nbytes) + case fromIntegral len of + (-1) -> do errno <- getErrno + if errno == eINTR + then readInner fd nbytes ptr + else throwErrno "read" + n -> return n + +mkInvalidReadArgError :: String -> IOError +mkInvalidReadArgError loc = ioeSetErrorString + (mkIOError InvalidArgument loc Nothing Nothing) "non-positive length" diff --git a/utils/benchmarks/events/EventHttp.hs b/utils/benchmarks/events/EventHttp.hs new file mode 100644 index 0000000000..ea74314b81 --- /dev/null +++ b/utils/benchmarks/events/EventHttp.hs @@ -0,0 +1,130 @@ +{-# LANGUAGE ForeignFunctionInterface, OverloadedStrings, BangPatterns #-} + +import Control.Concurrent +import Control.Exception (finally) +import Control.Monad +import GHC.Conc hiding (ensureIOManagerIsRunning) +import GHC.Event (ensureIOManagerIsRunning) +import GHC.Event.Manager as M +import Foreign.C.Error +import Foreign.C.Types +import Foreign.Marshal.Alloc +import Foreign.Marshal.Utils +import Foreign.ForeignPtr +import Foreign.Ptr +import System.Posix.Types +import Network.Socket hiding (accept) +import qualified Network.Socket.Address as A +import EventSocket (recv, sendAll, c_recv, c_send) +import EventUtil (setNonBlocking) +import Data.ByteString.Char8 as B hiding (zip) +import Data.ByteString.Internal as B + +main = do + ensureIOManagerIsRunning + let port = "5002" + myHints = defaultHints { addrFlags = [AI_PASSIVE] + , addrSocketType = Stream } + (ai:_) <- getAddrInfo (Just myHints) Nothing (Just port) + sock <- socket (addrFamily ai) (addrSocketType ai) (addrProtocol ai) + setSocketOption sock ReuseAddr 1 + bind sock (addrAddress ai) + listen sock 1024 + mgrs <- replicateM numCapabilities M.new + done <- newEmptyMVar + forM_ (zip [0..] mgrs) $ \(cpu,mgr) -> do + forkOn cpu $ do + accept mgr sock clinet + M.loop mgr + putMVar done () + takeMVar done + +repeatOnIntr :: IO (Either Errno a) -> IO (Either Errno a) +repeatOnIntr act = do + ret <- act + case ret of + l@(Left err) -> if err == eINTR + then repeatOnIntr act + else return l + r -> return r + +blocking :: EventManager + -> Either (Fd,Event) FdKey + -> IO (Either Errno a) + -> (Either Fd FdKey -> a -> IO ()) + -> IO () +blocking mgr efdk act on_success = do + ret <- repeatOnIntr act + case ret of + Left err + | err /= eWOULDBLOCK && err /= eAGAIN -> + ioError (errnoToIOError "accept" err Nothing Nothing) + | otherwise -> + case efdk of + Left (fd,evts) -> void (registerFd mgr retry_evt fd evts OneShot) + Right _ -> return () + Right a -> case efdk of + Left (fd,_evts) -> on_success (Left fd) a + Right fdk -> on_success (Right fdk) a + where retry_evt fdk _ = blocking mgr (Right fdk) act on_success + +accept :: EventManager -> Socket + -> (EventManager -> Socket -> SockAddr -> IO ()) + -> IO () +accept mgr sock serve = + withFdSocket sock $ \fd -> do + sk <- getSocketName sock + let sz = A.sizeOfSocketAddress sk + act :: IO (Either Errno (CInt, SockAddr)) + act = allocaBytes sz $ \sockaddr -> do + n <- with (fromIntegral sz) $ c_accept (fromIntegral fd) sockaddr + if n == -1 + then Left `fmap` getErrno + else do + sa <- peekSockAddr sockaddr + return $! Right (n, sa) + blocking mgr (Left (fromIntegral fd,evtRead)) act $ \_efdk (nfd,addr) -> do + setNonBlocking (fromIntegral nfd) + nsock <- MkSocket nfd family stype proto `fmap` newMVar Connected + serve mgr nsock addr + +clinet :: EventManager -> Socket -> SockAddr -> IO () +clinet mgr sock _ = withFdSocket sock $ \fd -> do + let bufSize = 4096 + act = do + fp <- B.mallocByteString bufSize + withForeignPtr fp $ \ptr -> do + ret <- c_recv fd ptr (fromIntegral bufSize) 0 + if ret == -1 + then Left `fmap` getErrno + else if ret == 0 + then return $! Right empty + else do + let !bs = PS (castForeignPtr fp) 0 (fromIntegral ret) + return $! Right bs + blocking mgr (Left (fromIntegral fd,evtRead)) act $ \efdk bs -> do + fd <- case efdk of + Left fd -> return fd + Right fdk -> unregisterFd_ mgr fdk >> return (keyFd fdk) + let (PS fp off len) = "HTTP/1.0 200 OK\r\nConnection: Close\r\nContent-Length: 5\r\n\r\nPong!" + withForeignPtr fp $ \s -> + c_send (fromIntegral fd) (s `plusPtr` off) (fromIntegral len) 0 + close sock + +client :: EventManager -> Socket -> SockAddr -> IO () +client _mgr sock _addr = loop' `finally` close sock + where + loop' = do + req <- recvRequest "" + sendAll sock msg + when ("Connection: Keep-Alive" `isInfixOf` req) loop' + msg = "HTTP/1.0 200 OK\r\nConnection: Close\r\nContent-Length: 5\r\n\r\nPong!" + recvRequest r = do + s <- recv sock 4096 + let t = B.append r s + if B.null s || "\r\n\r\n" `B.isInfixOf` t + then return t + else recvRequest t + +foreign import ccall unsafe "sys/socket.h accept" + c_accept :: CInt -> Ptr SockAddr -> Ptr CInt{-CSockLen???-} -> IO CInt diff --git a/utils/benchmarks/events/EventSocket.hs b/utils/benchmarks/events/EventSocket.hs new file mode 100644 index 0000000000..66fc84a3d1 --- /dev/null +++ b/utils/benchmarks/events/EventSocket.hs @@ -0,0 +1,162 @@ +{-# LANGUAGE CPP, ForeignFunctionInterface #-} + +-- | Socket functions using GHC.Event instead of GHC's I/O manager. +module EventSocket + ( + accept + , connect + , recv + , send + , sendAll + , c_recv + , c_send + ) where + +import Control.Monad (when) +import Data.ByteString (ByteString) +import Data.Word (Word8) +import qualified Data.ByteString as B +import qualified Data.ByteString.Internal as B +import Data.ByteString.Unsafe (unsafeUseAsCStringLen) +import Foreign.C.Types (CChar(..), CInt(..), CSize(..)) +import Foreign.Marshal.Alloc (allocaBytes) +import Foreign.Marshal.Utils (with) +import Foreign.ForeignPtr (withForeignPtr) +import Foreign.Ptr (Ptr, castPtr) +import Foreign.C.Error (Errno(..), eINPROGRESS, eINTR, + errnoToIOError, getErrno, throwErrno) +#if __GLASGOW_HASKELL__ < 612 +import GHC.IOBase (IOErrorType(..)) +#else +import GHC.IO.Exception (IOErrorType(..)) +#endif +import Network.Socket hiding (accept, connect, bind) +import qualified Network.Socket as NS (connect) +import Network.Socket.Address (pokeSocketAddress, sizeOfSocketAddress, peekSocketAddress) +import Network.Socket.Internal +import Prelude hiding (repeat) +import qualified GHC.Event as T +import System.IO.Error (ioeSetErrorString, mkIOError) +import EventUtil + +connect :: Socket -- Unconnected Socket + -> SockAddr -- Socket address stuff + -> IO () +connect sock addr = do + let sz = sizeOfSocketAddress addr in + allocaBytes sz $ \p_sock_addr -> do + pokeSocketAddress p_sock_addr addr + withFdSocket sock $ \s -> do + let connectLoop = do + r <- c_connect s (castPtr p_sock_addr) (CInt (fromIntegral sz)) + if r == -1 + then do + err <- getErrno + case () of + _ | err == eINTR -> connectLoop + _ | err == eINPROGRESS -> connectBlocked s sock + _ -> throwSocketError "connect" + else return r + + _ <- connectLoop + return () + + where + connectBlocked s sk = do + T.threadWaitWrite (fromIntegral s) + err <- getSocketOption sk SoError + if err == 0 + then return 0 + else ioError (errnoToIOError "connect" + (Errno (fromIntegral err)) + Nothing Nothing) + +foreign import ccall unsafe "connect" + c_connect :: CInt -> Ptr SockAddr -> CInt{-CSockLen?? -} -> IO CInt + +------------------------------------------------------------------------ +-- Receiving + +recv :: Socket -> Int -> IO ByteString +recv sock nbytes + | nbytes <= 0 = ioError (mkInvalidRecvArgError "Network.Socket.ByteString.recv") + | otherwise = withFdSocket sock $ \s -> do + fp <- B.mallocByteString nbytes + n <- withForeignPtr fp $ recvInner s nbytes + if n <= 0 + then return B.empty + else return $! B.PS fp 0 n + +recvInner :: CInt -> Int -> Ptr Word8 -> IO Int +recvInner s nbytes ptr = do + len <- throwErrnoIfMinus1Retry_repeatOnBlock "recv" + (T.threadWaitRead (fromIntegral s)) $ + c_recv s (castPtr ptr) (fromIntegral nbytes) 0{-flags-} + case fromIntegral len of + (-1) -> do errno <- getErrno + if errno == eINTR + then recvInner s nbytes ptr + else throwErrno "Network.Socket.ByteString.recv" + n -> return n + +------------------------------------------------------------------------ +-- Sending + +-- | Send data to the socket. The socket must be connected to a +-- remote socket. Returns the number of bytes sent. Applications are +-- responsible for ensuring that all data has been sent. +send :: Socket -- ^ Connected socket + -> ByteString -- ^ Data to send + -> IO Int -- ^ Number of bytes sent +send sock xs = + unsafeUseAsCStringLen xs $ \(str, len) -> do + withFdSocket sock $ \s -> do + fmap fromIntegral $ + throwSocketErrorIfMinus1RetryMayBlock "send" + (T.threadWaitWrite (fromIntegral s)) $ + c_send s str (fromIntegral len) 0 + +-- | Send data to the socket. The socket must be connected to a +-- remote socket. Unlike 'send', this function continues to send data +-- until either all data has been sent or an error occurs. On error, +-- an exception is raised, and there is no way to determine how much +-- data, if any, was successfully sent. +sendAll :: Socket -- ^ Connected socket + -> ByteString -- ^ Data to send + -> IO () +sendAll sock bs = do + sent <- send sock bs + when (sent < B.length bs) $ sendAll sock (B.drop sent bs) + +------------------------------------------------------------------------ +-- Accepting + +accept :: Socket -> IO (Socket, SockAddr) +accept sock = + withFdSocket sock $ \s -> do + sockaddr <- getSocketName sock + let sz = sizeOfSocketAddress sockaddr + allocaBytes sz $ \ sock_addr_ptr -> do + with sz $ \ ptr_len -> do + new_sock_fd <- throwSocketErrorIfMinus1RetryMayBlock "accept" + (T.threadWaitRead (fromIntegral s)) $ + c_accept s sock_addr_ptr (castPtr ptr_len) + setNonBlocking (fromIntegral new_sock_fd) + addr <- peekSocketAddress sock_addr_ptr + new_sock <- mkSocket (fromIntegral new_sock_fd) + NS.connect new_sock addr + return (new_sock, addr) + +mkInvalidRecvArgError :: String -> IOError +mkInvalidRecvArgError loc = ioeSetErrorString (mkIOError InvalidArgument + loc Nothing Nothing) + "non-positive length" + +foreign import ccall unsafe "sys/socket.h accept" + c_accept :: CInt -> Ptr SockAddr -> Ptr CInt{-CSockLen?? -} -> IO CInt + +foreign import ccall unsafe "sys/socket.h send" + c_send :: CInt -> Ptr a -> CSize -> CInt -> IO CInt + +foreign import ccall unsafe "sys/socket.h recv" + c_recv :: CInt -> Ptr CChar -> CSize -> CInt -> IO CInt diff --git a/utils/benchmarks/events/EventUtil.hs b/utils/benchmarks/events/EventUtil.hs new file mode 100644 index 0000000000..0fbdb9280a --- /dev/null +++ b/utils/benchmarks/events/EventUtil.hs @@ -0,0 +1,45 @@ +{-# LANGUAGE CPP #-} + +module EventUtil + ( + setNonBlocking + , throwErrnoIfMinus1Retry_mayBlock + , throwErrnoIfMinus1Retry_repeatOnBlock + ) where + +import Foreign.C.Error (eINTR, eWOULDBLOCK, eAGAIN, getErrno, throwErrno) +import Foreign.C.Types (CInt) +import Prelude hiding (repeat) +import System.Posix.Internals (setNonBlockingFD) +import System.Posix.Types (Fd) + +{-# SPECIALISE + throwErrnoIfMinus1Retry_mayBlock + :: String -> IO CInt -> IO CInt -> IO CInt #-} +throwErrnoIfMinus1Retry_mayBlock :: (Eq a, Num a) => String -> + IO a -> IO a -> IO a +throwErrnoIfMinus1Retry_mayBlock name on_block act = do + res <- act + if res == -1 + then do + err <- getErrno + if err == eINTR + then throwErrnoIfMinus1Retry_mayBlock name on_block act + else if err == eWOULDBLOCK || err == eAGAIN + then on_block + else throwErrno name + else return res + +throwErrnoIfMinus1Retry_repeatOnBlock :: (Eq a, Num a) => String -> + IO b -> IO a -> IO a +throwErrnoIfMinus1Retry_repeatOnBlock name on_block act = + throwErrnoIfMinus1Retry_mayBlock name (on_block >> repeat) act + where repeat = throwErrnoIfMinus1Retry_repeatOnBlock name on_block act + +setNonBlocking :: Fd -> IO () +setNonBlocking fd = +#if __GLASGOW_HASKELL__ > 611 + setNonBlockingFD (fromIntegral fd) True +#else + setNonBlockingFD (fromIntegral fd) +#endif diff --git a/utils/benchmarks/events/HttpClient.hs b/utils/benchmarks/events/HttpClient.hs new file mode 100644 index 0000000000..cf9f81fc34 --- /dev/null +++ b/utils/benchmarks/events/HttpClient.hs @@ -0,0 +1,155 @@ +{-# LANGUAGE BangPatterns, FlexibleContexts, OverloadedStrings #-} + +import Text.Printf +import qualified Data.Attoparsec.ByteString as A (feed, parseWith, IResult(..), parse) +import qualified Data.Attoparsec.ByteString.Char8 as C +import RFC2616 +import Control.Exception +import Control.Concurrent.QSemN +import Control.Monad +import Network.Socket hiding (connect) +import System.Console.GetOpt +import Data.Monoid +import GHC.Conc (numCapabilities) +import Args (ljust, parseArgs, positive, theLast) +import Control.Concurrent (forkIO) +import System.Environment (getArgs) +import qualified Data.ByteString.Char8 as B +import Text.Parsec +import Text.Parsec.String +import Data.Char (isSpace) +import GHC.Event (ensureIOManagerIsRunning) +import EventSocket (connect, recv, sendAll) +import Data.Time.Clock (diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds) +import qualified Data.Semigroup as S + +type URL = (String, String, String) + +url :: Parser URL +url = + (,,) <$> (string "http://" *> (many . satisfy $ \c -> c /= ':' && c /= '/')) + <*> ((char ':' *> many digit) <|> pure "80") + <*> ((many1 . satisfy $ not . isSpace) <|> pure "/") + +urlConnector :: String -> IO (IO (Socket, B.ByteString)) +urlConnector urlStr = do + let (host, port, uri) = case parse url "<cmdline>" urlStr of + Left err -> error (show err) + Right req -> req + myHints = defaultHints { addrSocketType = Stream } + (ai:_) <- getAddrInfo (Just myHints) (Just host) (Just port) + return $ do + sock <- socket (addrFamily ai) (addrSocketType ai) (addrProtocol ai) + let req = B.concat ["GET ", B.pack uri, " HTTP/1.1\r\n" + ,"Host: ", B.pack host, ":", B.pack port, "\r\n"] + connect sock (addrAddress ai) + return (sock, req) + +client ctors reqs = do + forM_ ctors $ \connector -> do + let loop slop !reqno sock reqStart = do + let refill = recv sock 65536 + req = B.concat [reqStart, "\r\n"] + sendAll sock req + resp <- (if B.null slop then refill else return slop) >>= + A.parseWith refill RFC2616.response + case resp of + err@(A.Partial _) -> print err + A.Fail bs _ msg -> print (msg, B.take 10 bs) + A.Done bs (_, chdrs) -> do + let hdrs = map lowerHeader chdrs + closeHeader = Header "connection" ["close"] + contentLength = case A.parse C.decimal (B.concat + (lookupHeader "content-length" hdrs)) `A.feed` "" of + A.Done _ n -> n + _ -> error (show chdrs) + let slurp !n s = do + let len = B.length s + if len == 0 || len >= n + then return $! B.drop n s + else slurp (n-len) =<< recv sock 65536 + if B.length bs >= contentLength + then if reqno >= reqs || closeHeader `elem` hdrs + then return () + else loop (B.drop contentLength bs) (reqno+1) sock reqStart + else slurp contentLength bs >>= \s -> + if reqno >= reqs || closeHeader `elem` hdrs + then return () + else loop s (reqno+1) sock reqStart + bracket connector (close . fst) . uncurry $ loop "" 1 + + +main = do + (cfg, urls) <- parseArgs defaultConfig defaultOptions =<< getArgs + when (null urls) $ error "no URLs" + ensureIOManagerIsRunning + ctors <- mapM urlConnector urls + let clients = theLast cfgClients cfg + conns = theLast cfgConnections cfg + requests = theLast cfgRequests cfg + total = clients * conns * requests + putStrLn $ "issuing " ++ show total ++ " requests" + sem <- newQSemN 0 + start <- getCurrentTime + replicateM_ clients $ do + _ <- forkIO $ client (take conns (cycle ctors)) requests `finally` signalQSemN sem 1 + return () + waitQSemN sem clients + end <- getCurrentTime + let elapsed = realToFrac (nominalDiffTimeToSeconds $ diffUTCTime end start) :: Double + rate = realToFrac (fromIntegral total / elapsed) :: Double + printf "%.6g reqs/sec in %.6g secs\n" rate elapsed + +------------------------------------------------------------------------ +-- Configuration + +data Config = Config { + cfgClients :: Last Int + , cfgConnections :: Last Int + , cfgRequests :: Last Int + } + +defaultConfig :: Config +defaultConfig = Config { + cfgClients = ljust numCapabilities + , cfgConnections = ljust numCapabilities + , cfgRequests = ljust 1 + } + +instance Monoid Config where + mempty = Config { + cfgClients = mempty + , cfgConnections = mempty + , cfgRequests = mempty + } + + mappend = (<>) + +instance S.Semigroup Config where + Config { + cfgClients = a + , cfgConnections = b + , cfgRequests = c + } <> Config { cfgClients = d + , cfgConnections = e + , cfgRequests = f + } = + Config { cfgClients = a <> d + , cfgConnections = b <> e + , cfgRequests = c <> f } + +defaultOptions :: [OptDescr (IO Config)] +defaultOptions = [ + Option ['c'] ["clients"] + (ReqArg (positive "number of concurrent clients" $ \n -> + mempty { cfgClients = n }) "N") + "number of concurrent clients" + , Option ['n'] ["connections"] + (ReqArg (positive "number of connections" $ \n -> + mempty { cfgConnections = n }) "N") + "number of connections" + , Option ['r'] ["requests"] + (ReqArg (positive "number of requests per connection" $ \n -> + mempty { cfgRequests = n }) "N") + "number of requests" + ] diff --git a/utils/benchmarks/events/IntMap.hs b/utils/benchmarks/events/IntMap.hs new file mode 100644 index 0000000000..0313de4e87 --- /dev/null +++ b/utils/benchmarks/events/IntMap.hs @@ -0,0 +1,24 @@ +{-# LANGUAGE BangPatterns #-} +module Main where + +import Criterion.Main +import Data.IntMap.Strict (IntMap) +import qualified Data.IntMap.Strict as IM + +main = defaultMain + [ bench "insert10k" $ whnf ascFrom n + ] + where + -- Number of elements + n = 10000 + +-- | Create an integer map with keys in ascending order starting at 0 +-- and ending at @max@ (exclusive.) +ascFrom :: Int -> IntMap Int +ascFrom max = go 0 IM.empty + where + go :: Int -> IntMap Int -> IntMap Int + go n !mp + | n >= max = mp + | otherwise = let !mp' = IM.insertWith const n n mp + in go (n + 1) mp' diff --git a/utils/benchmarks/events/LICENSE b/utils/benchmarks/events/LICENSE new file mode 100644 index 0000000000..d248edeb52 --- /dev/null +++ b/utils/benchmarks/events/LICENSE @@ -0,0 +1,30 @@ +Copyright (c) 2020, XXX + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of XXX nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/utils/benchmarks/events/Makefile b/utils/benchmarks/events/Makefile new file mode 100644 index 0000000000..ad3a0bb290 --- /dev/null +++ b/utils/benchmarks/events/Makefile @@ -0,0 +1,105 @@ +# To enable profiling you must pass the --enable-library-profiling +# flag to "cabal configure". +ghc-prof-flags := +ifdef ENABLE_PROFILING + ghc-prof-flags += -prof -hisuf p_hi -osuf p_o -fprof-auto + lib-suffix := _p +else + lib-suffix := +endif + +# To enable event logging you must pass the --ghc-option=-eventlog +# flag to "cabal configure". +ifdef ENABLE_EVENTLOG + ghc-prof-flags += -eventlog +endif + +INPLACE := ./../../../inplace +ghc := $(INPLACE)/bin/ghc-stage2 +ghc-opt-flags := -O2 +ghc-base-flags := -funbox-strict-fields -fno-ignore-asserts +ghc-base-flags += -Wall -fno-warn-orphans -fno-warn-missing-signatures +ghc-flags := $(ghc-base-flags) $(ghc-prof-flags) \ + -package network-3.1.2.1 \ + -package unix +ghc-hpc-flags := $(ghc-base-flags) -fhpc -fno-ignore-asserts + +cabal := $(shell which cabal 2>/dev/null) + +%.o: %.hs + $(ghc) $(ghc-flags) $(ghc-opt-flags) -c $< -o $@ + +%.hs: %.hsc + hsc2hs $< + +# package network-bytestring is deprecated into network +# and bytestring-show is redundant +ghc-bench-flags := -package mtl \ + -package attoparsec \ + +ifdef USE_GHC_IO_MANAGER + ghc-bench-flags += -DUSE_GHC_IO_MANAGER +endif + +programs := dead-conn deadconn pong-server signal simple static-http \ + thread-delay timers http-client + +.PHONY: bench + +bench: all clean +all: $(programs) + +cc = gcc +cc-opt-flags = -O2 +%.o: %.c + $(cc) -c $(cc-opt-flags) $< -o $@ + +dead-conn: ghc-flags += $(ghc-bench-flags) +dead-conn: Args.o EventUtil.o EventSocket.o DeadConn.o + $(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) + +# A C version of the above tool. +deadconn: deadconn.o + $(cc) $(cc-opt-flags) -o $@ $(filter %.o,$^) + +pong-server: ghc-flags += $(ghc-bench-flags) +pong-server: Args.o EventUtil.o EventSocket.o PongServer.o + $(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) + +event-http: ghc-flags += $(ghc-bench-flags) +event-http: EventUtil.o EventFile.o EventSocket.o NoPush.o RFC2616.o EventHttp.o + $(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) + +static-http: ghc-flags += $(ghc-bench-flags) +static-http: EventUtil.o EventFile.o EventSocket.o NoPush.o RFC2616.o StaticHttp.o + $(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) + +http-client: ghc-flags += $(ghc-bench-flags) -package parsec-3.1.14.0 +http-client: Args.o EventUtil.o EventSocket.o RFC2616.o HttpClient.o + $(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) + +signal: Signal.o + $(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) + +simple: Args.o Simple.o + $(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) + +thread-delay: ghc-flags += -package stm +thread-delay: Args.o ThreadDelay.o + $(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) + +timers: Args.o Timers.o + $(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) + + +.PHONY: cleanall + +cleanall: clean data_clean + -find . \( -name 'events' -o -name 'threads' -o -name 'new-thread-delay' -o -name 'old-thread-delay' \) -exec rm {} \; + -rm -f $(programs) + +clean: + -find . \( -name '*.o' -o -name '*.dyn_o' -o -name '*.hi' \) -exec rm {} \; + +data_clean: + -find . \( -name '*.dat' -o -name '*.png' \) -exec rm {} \; diff --git a/utils/benchmarks/events/NoPush.hsc b/utils/benchmarks/events/NoPush.hsc new file mode 100644 index 0000000000..6f52d71404 --- /dev/null +++ b/utils/benchmarks/events/NoPush.hsc @@ -0,0 +1,35 @@ +{-# LANGUAGE ForeignFunctionInterface #-} + +module NoPush (setNoPush) where + +#include <sys/socket.h> +#include <netinet/tcp.h> +#include <netinet/in.h> + +import Foreign.C.Error (throwErrnoIfMinus1_) +import Foreign.C.Types (CInt(..)) +import Foreign.Marshal.Utils (with) +import Foreign.Ptr (Ptr) +import Foreign.Storable (sizeOf) +import Network.Socket (Socket, withFdSocket) + +noPush :: CInt +#if defined(TCP_NOPUSH) +noPush = #const TCP_NOPUSH +#elif defined(TCP_CORK) +noPush = #const TCP_CORK +#else +noPush = 0 +#endif + +setNoPush :: Socket -> Bool -> IO () +setNoPush _ _ | noPush == 0 = return () +setNoPush sock onOff = + withFdSocket sock $ \fd -> do + let v = if onOff then 1 else 0 + with v $ \ptr -> + throwErrnoIfMinus1_ "setNoPush" $ + c_setsockopt fd (#const IPPROTO_TCP) noPush ptr (fromIntegral (sizeOf v)) + +foreign import ccall unsafe "setsockopt" + c_setsockopt :: CInt -> CInt -> CInt -> Ptr CInt -> CInt -> IO CInt diff --git a/utils/benchmarks/events/PSQ.hs b/utils/benchmarks/events/PSQ.hs new file mode 100644 index 0000000000..e318207c61 --- /dev/null +++ b/utils/benchmarks/events/PSQ.hs @@ -0,0 +1,54 @@ +{-# LANGUAGE BangPatterns #-} + +module Main where + +import Criterion.Main +import GHC.Event.PSQ (PSQ) +import qualified GHC.Event.PSQ as Q + +main = defaultMain + [ bench "atMost1k/length" $ whnf (atMostLength 1000) q + , bench "insert10k/min" $ whnf (Q.findMin . ascFrom) n + , bench "delete1k/min" $ whnf (Q.findMin . deleteEveryN (n `div` 1000) n) q + , bench "adjust1k/min" $ whnf (Q.findMin . adjustEveryN (n `div` 1000) n) q + ] + where + -- Number of elements + n = 10000 + + -- Priority queue with 'n' elements + q = ascFrom n + +-- | Return the number of elements with priority at most @pt@ +atMostLength :: Q.Prio -> PSQ Int -> Int +atMostLength pt q = length . fst . Q.atMost pt $ q + +-- | Create a priority queue with keys and priorities in ascending +-- order starting at 0 and ending at @max@ (exclusive.) +ascFrom :: Int -> PSQ Int +ascFrom max = go 0 Q.empty + where + go :: Int -> PSQ Int -> PSQ Int + go n !q + | n >= max = q + | otherwise = go (n + 1) $ + Q.insert (fromIntegral n) (fromIntegral n) n q + +-- | Delete all keys that are multiples of @step@ but less than @max@. +deleteEveryN :: Int -> Int -> PSQ a -> PSQ a +deleteEveryN step max q0 = go 0 q0 + where + go :: Int -> PSQ a -> PSQ a + go n !q + | n >= max = q + | otherwise = go (n + step) $ Q.delete (fromIntegral n) q + +-- | Adjust the priority of all keys that are multiples of @step@ but +-- less than @max@. +adjustEveryN :: Int -> Int -> PSQ a -> PSQ a +adjustEveryN step max q0 = go 0 q0 + where + go :: Int -> PSQ a -> PSQ a + go n !q + | n >= max = q + | otherwise = go (n + step) $ Q.adjust (+ 1) (fromIntegral n) q diff --git a/utils/benchmarks/events/PongServer.hs b/utils/benchmarks/events/PongServer.hs new file mode 100644 index 0000000000..53501bef45 --- /dev/null +++ b/utils/benchmarks/events/PongServer.hs @@ -0,0 +1,122 @@ +{-# LANGUAGE CPP, OverloadedStrings #-} + +-- Requires the network-bytestring library. +-- +-- Start server and run +-- httperf --server=localhost --port=5002 --uri=/ --num-conns=10000 +-- or +-- ab -n 10000 -c 100 http://localhost:5002/ + +import Args (ljust, parseArgs, positive, theLast) +import Control.Concurrent (forkIO, runInUnboundThread) +import Data.ByteString.Char8 () +import Data.Monoid (Last(..)) +import Network.Socket hiding (accept) +import qualified Data.ByteString as S +import qualified Data.ByteString.Char8 as C () +#if defined(USE_GHC_IO_MANAGER) +import Network.Socket (accept) +import Network.Socket.ByteString (recv, sendAll) +#else +import EventSocket (accept, recv, sendAll) +import GHC.Event (ensureIOManagerIsRunning) +#endif +import System.Console.GetOpt (ArgDescr(ReqArg), OptDescr(..)) +import System.Environment (getArgs) +import System.Posix.Resource (ResourceLimit(..), ResourceLimits(..), + Resource(..), setResourceLimit) +import qualified Data.Semigroup as Sem + +main = do + (cfg, _) <- parseArgs defaultConfig defaultOptions =<< getArgs + let listenBacklog = theLast cfgListenBacklog cfg + port = theLast cfgPort cfg + lim = ResourceLimit . fromIntegral . theLast cfgMaxFds $ cfg + myHints = defaultHints { addrFlags = [AI_PASSIVE] + , addrSocketType = Stream } +#if !defined(USE_GHC_IO_MANAGER) + ensureIOManagerIsRunning +#endif + setResourceLimit ResourceOpenFiles + ResourceLimits { softLimit = lim, hardLimit = lim } + (ai:_) <- getAddrInfo (Just myHints) Nothing (Just port) + sock <- socket (addrFamily ai) (addrSocketType ai) (addrProtocol ai) + setSocketOption sock ReuseAddr 1 + bind sock (addrAddress ai) + listen sock listenBacklog + runInUnboundThread $ acceptConnections sock + +acceptConnections :: Socket -> IO () +acceptConnections sock = loop + where + loop = do + (c,_) <- accept sock + _ <- forkIO $ client c + loop + +client :: Socket -> IO () +client sock = do + recvRequest "" + sendAll sock msg + close sock + where + msg = "HTTP/1.0 200 OK\r\nConnection: Close\r\nContent-Length: 5\r\n\r\nPong!" + recvRequest r = do + s <- recv sock 4096 + let t = S.append r s + if S.null s || "\r\n\r\n" `S.isInfixOf` t + then return () + else recvRequest t + +------------------------------------------------------------------------ +-- Configuration + +data Config = Config { + cfgListenBacklog :: Last Int + , cfgMaxFds :: Last Int + , cfgPort :: Last String + } + +defaultConfig :: Config +defaultConfig = Config { + cfgListenBacklog = ljust 1024 + , cfgMaxFds = ljust 256 + , cfgPort = ljust "5002" + } + +instance Sem.Semigroup Config where + Config { + cfgListenBacklog = a + , cfgMaxFds = b + , cfgPort = c + } <> Config { cfgListenBacklog = d + , cfgMaxFds = e + , cfgPort = f + } = + Config {cfgListenBacklog = a <> d, + cfgMaxFds = b <> e, + cfgPort = c <> f} + +instance Monoid Config where + mempty = Config { + cfgListenBacklog = mempty + , cfgMaxFds = mempty + , cfgPort = mempty + } + + mappend = (<>) + +defaultOptions :: [OptDescr (IO Config)] +defaultOptions = [ + Option ['p'] ["port"] + (ReqArg (\s -> return mempty { cfgPort = ljust s }) "N") + "server port" + , Option ['m'] ["max-fds"] + (ReqArg (positive "maximum number of file descriptors" $ \n -> + mempty { cfgMaxFds = n }) "N") + "maximum number of file descriptors" + , Option [] ["listen-backlog"] + (ReqArg (positive "maximum number of pending connections" $ \n -> + mempty { cfgListenBacklog = n }) "N") + "maximum number of pending connections" + ] diff --git a/utils/benchmarks/events/RFC2616.hs b/utils/benchmarks/events/RFC2616.hs new file mode 100644 index 0000000000..4c24ce7a61 --- /dev/null +++ b/utils/benchmarks/events/RFC2616.hs @@ -0,0 +1,89 @@ +{-# LANGUAGE OverloadedStrings #-} + +module RFC2616 + ( + Header(..) + , Request(..) + , Response(..) + , isToken + , messageHeader + , request + , requestLine + , response + , responseLine + , lowerHeader + , lookupHeader + ) where + +import Data.Attoparsec.ByteString as P +import qualified Data.Attoparsec.ByteString.Char8 as P8 +import Data.Attoparsec.ByteString.Char8 (char8, endOfLine, isDigit_w8) +import Data.Word (Word8) +import qualified Data.ByteString.Char8 as B hiding (map) +import qualified Data.ByteString as B (map) + +isToken :: Word8 -> Bool +isToken w = w <= 127 && notInClass "\0-\31()<>@,;:\\\"/[]?={} \t" w + +skipSpaces :: Parser () +skipSpaces = satisfy P8.isHorizontalSpace *> skipWhile P8.isHorizontalSpace + +data Request = Request { + requestMethod :: !B.ByteString + , requestUri :: !B.ByteString + , requestVersion :: !B.ByteString + } deriving (Eq, Ord, Show) + +httpVersion :: Parser B.ByteString +httpVersion = string "HTTP/" *> P.takeWhile (\c -> isDigit_w8 c || c == 46) + +requestLine :: Parser Request +requestLine = do + method <- P.takeWhile1 isToken <* char8 ' ' + uri <- P.takeWhile1 (/=32) <* char8 ' ' + version <- httpVersion <* endOfLine + return $! Request method uri version + +data Header = Header { + headerName :: !B.ByteString + , headerValue :: [B.ByteString] + } deriving (Eq, Ord, Show) + +messageHeader :: Parser Header +messageHeader = do + header <- P.takeWhile isToken <* char8 ':' <* P.skipWhile P8.isHorizontalSpace + body <- P.takeTill P8.isEndOfLine <* endOfLine + bodies <- P.many' $ skipSpaces *> P.takeTill P8.isEndOfLine <* endOfLine + return $! Header header (body:bodies) + +request :: Parser (Request, [Header]) +request = (,) <$> requestLine <*> P.many' messageHeader <* endOfLine + +data Response = Response { + responseVersion :: !B.ByteString + , responseCode :: !B.ByteString + , responseMsg :: !B.ByteString + } deriving (Eq, Ord, Show) + +responseLine :: Parser Response +responseLine = do + version <- httpVersion <* char8 ' ' + code <- P.takeWhile isDigit_w8 <* char8 ' ' + msg <- P.takeTill P8.isEndOfLine <* endOfLine + return $! Response version code msg + +response :: Parser (Response, [Header]) +response = (,) <$> responseLine <*> P.many' messageHeader <* endOfLine + +lowerHeader :: Header -> Header +lowerHeader (Header n v) = Header (B.map toLower n) (map (B.map toLower) v) + where toLower w | w >= 65 && w <= 90 = w + 32 + | otherwise = w + +lookupHeader :: B.ByteString -> [Header] -> [B.ByteString] +lookupHeader k = go + where + go (Header n v:hs) + | k == n = v + | otherwise = go hs + go _ = [] diff --git a/utils/benchmarks/events/Setup.hs b/utils/benchmarks/events/Setup.hs new file mode 100644 index 0000000000..9a994af677 --- /dev/null +++ b/utils/benchmarks/events/Setup.hs @@ -0,0 +1,2 @@ +import Distribution.Simple +main = defaultMain diff --git a/utils/benchmarks/events/Signal.hs b/utils/benchmarks/events/Signal.hs new file mode 100644 index 0000000000..b4aef9ca7e --- /dev/null +++ b/utils/benchmarks/events/Signal.hs @@ -0,0 +1,26 @@ +{-# LANGUAGE FlexibleContexts #-} + +import Control.Concurrent +import System.Posix.Signals (Handler(Catch), sigINT, + sigUSR1, sigUSR2, + blockSignals, + fullSignalSet, installHandler) +import qualified GHC.Event as EM (new, emState, loop, EventManager) +import GHC.IORef (readIORef) + +handler :: EM.EventManager -> Handler +handler em = do + Catch (readIORef (EM.emState em) >>= print) + +main :: IO () +main = do + mgr <- EM.new + blockSignals fullSignalSet + _ <- installHandler sigINT (handler mgr) (Just fullSignalSet) + putStrLn "INT handler installed" + _ <- forkIO $ do + threadDelay 1000000 + _ <- installHandler sigUSR1 (handler mgr) (Just fullSignalSet) + _ <- installHandler sigUSR2 (handler mgr) (Just fullSignalSet) + putStrLn "USR1 and USR2 handlers installed" + EM.loop mgr diff --git a/utils/benchmarks/events/Simple.hs b/utils/benchmarks/events/Simple.hs new file mode 100644 index 0000000000..3bba995d24 --- /dev/null +++ b/utils/benchmarks/events/Simple.hs @@ -0,0 +1,126 @@ +{-# LANGUAGE BangPatterns #-} +-- Flow: +-- +-- 1. Create N pipes. +-- +-- Modelled after: +-- http://levent.svn.sourceforge.net/viewvc/levent/trunk/libevent/test/bench.c + +import Args (ljust, parseArgs, positive, theLast) +import Control.Concurrent (MVar, forkIO, takeMVar, newEmptyMVar, putMVar) +import Control.Monad (forM_, replicateM, when, void) +import Data.IORef (IORef, atomicModifyIORef, newIORef) +import Data.Monoid (Last(..)) +import Foreign.C.Error (throwErrnoIfMinus1Retry, throwErrnoIfMinus1Retry_) +import Foreign.Marshal.Alloc (alloca) +import System.Console.GetOpt (ArgDescr(ReqArg), OptDescr(..)) +import System.Environment (getArgs) +import GHC.Event (Event, Lifetime(OneShot)) +import qualified GHC.Event as ET (TimerManager, newWith, newDefaultBackend, + registerTimeout) +import qualified GHC.Event as EM (FdKey, loop, keyFd, new, registerFd, + evtRead, evtWrite) +import Data.Semigroup as Sem hiding (Last, Option) +import System.Posix.IO (createPipe) +import System.Posix.Resource (ResourceLimit(..), ResourceLimits(..), + Resource(..), setResourceLimit) +import System.Posix.Internals (c_close, c_read, c_write) +import System.Posix.Types (Fd(..)) + +data Config = Config { + cfgDelay :: Last Int + , cfgNumPipes :: Last Int + , cfgNumMessages :: Last Int + } + +defaultConfig :: Config +defaultConfig = Config { + cfgDelay = ljust 0 + , cfgNumPipes = ljust 448 + , cfgNumMessages = ljust 1024 + } + +instance Sem.Semigroup Config where + (Config a b c) <> + (Config d e f) = + Config + (a <> d) + (b <> e) + (c <> f) + +instance Monoid Config where + mempty = Config mempty mempty mempty + mappend = (<>) + +defaultOptions :: [OptDescr (IO Config)] +defaultOptions = [ + Option ['d'] ["delay"] + (ReqArg (positive "delay in ms before read" $ \n -> mempty { cfgDelay = n }) "N") + "number of pipes to use" + ,Option ['p'] ["pipes"] + (ReqArg (positive "number of pipes" $ \n -> mempty { cfgNumPipes = n }) "N") + "number of pipes to use" + ,Option ['m'] ["messages"] + (ReqArg (positive "number of messages" $ \n -> mempty { cfgNumMessages = n }) "N") + "number of messages to send" + ] + +readCallback :: Config -> ET.TimerManager -> MVar () -> IORef Int + -> EM.FdKey -> Event -> IO () +readCallback cfg mgr done ref reg _ = do + let numMessages = theLast cfgNumMessages cfg + delay = theLast cfgDelay cfg + fd = EM.keyFd reg + a <- atomicModifyIORef ref (\a -> let !b = a+1 in (b,b)) + case undefined of + _ | a > numMessages -> close fd >> putMVar done () + | delay == 0 -> readByte fd + | otherwise -> void (ET.registerTimeout mgr delay (readByte fd)) + +writeCallback :: Config -> IORef Int -> EM.FdKey -> Event -> IO () +writeCallback cfg ref reg _ = do + let numMessages = theLast cfgNumMessages cfg + fd = EM.keyFd reg + a <- atomicModifyIORef ref (\a -> let !b = a+1 in (b,b)) + if a > numMessages + then close fd + else writeByte fd + +main :: IO () +main = do + (cfg, _args) <- parseArgs defaultConfig defaultOptions =<< getArgs + let numPipes = theLast cfgNumPipes cfg + lim = ResourceLimit $ fromIntegral numPipes * 2 + 50 + setResourceLimit ResourceOpenFiles + ResourceLimits { softLimit = lim, hardLimit = lim } + + putStrLn "creating pipes" + pipePairs <- replicateM numPipes createPipe + + mgr <- EM.new + mgr_timer <- ET.newWith =<< ET.newDefaultBackend + _ <- forkIO $ EM.loop mgr + rref <- newIORef 0 + wref <- newIORef 0 + done <- newEmptyMVar + putStrLn "registering readers" + forM_ pipePairs $ \(r,_) -> + EM.registerFd mgr (readCallback cfg mgr_timer done rref) r EM.evtRead OneShot + putStrLn "registering writers" + forM_ pipePairs $ \(_,w) -> + EM.registerFd mgr (writeCallback cfg wref) w EM.evtWrite OneShot + putStrLn "waiting until done" + takeMVar done + +readByte :: Fd -> IO () +readByte (Fd fd) = + alloca $ \p -> throwErrnoIfMinus1Retry_ "readByte" $ c_read fd p 1 + +writeByte :: Fd -> IO () +writeByte (Fd fd) = + alloca $ \p -> do + n <- throwErrnoIfMinus1Retry "writeByte" $ c_write fd p 1 + when (n /= 1) . error $ "writeByte returned " ++ show n + +close :: Fd -> IO () +close (Fd fd) = void (c_close fd) diff --git a/utils/benchmarks/events/StaticHttp.hs b/utils/benchmarks/events/StaticHttp.hs new file mode 100644 index 0000000000..096faf4fe6 --- /dev/null +++ b/utils/benchmarks/events/StaticHttp.hs @@ -0,0 +1,97 @@ +{-# LANGUAGE CPP, OverloadedStrings #-} + +import Control.Concurrent (forkIO, runInUnboundThread) +import Control.Exception (bracket, finally) +import Control.Monad (unless, when) +import Control.Monad.Fix (fix) +import qualified Data.Attoparsec.ByteString as A +import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy as L +import Network.Socket hiding (accept) +#if defined(USE_GHC_IO_MANAGER) +import Network.Socket (accept) +import Network.Socket.ByteString (recv, sendAll) +#else +import EventSocket (accept, recv, sendAll) +import GHC.Event (ensureIOManagerIsRunning) +#endif +import qualified EventFile as F +import System.Posix.Files +import System.Posix.IO +import NoPush +import RFC2616 + +strict :: L.ByteString -> B.ByteString +strict = B.concat . L.toChunks + +main = do + let port = "5002" + myHints = defaultHints { addrFlags = [AI_PASSIVE] + , addrSocketType = Stream } + (ai:_) <- getAddrInfo (Just myHints) Nothing (Just port) +#if !defined(USE_GHC_IO_MANAGER) + ensureIOManagerIsRunning +#endif + sock <- socket (addrFamily ai) (addrSocketType ai) (addrProtocol ai) + setSocketOption sock ReuseAddr 1 + bind sock (addrAddress ai) + listen sock 1024 + runInUnboundThread $ acceptConnections sock + +acceptConnections :: Socket -> IO () +acceptConnections sock = loop + where + loop = do + (c,_) <- accept sock + _ <- forkIO $ client c + loop + +parseM :: Monad m => m B.ByteString -> A.Parser a -> m (B.ByteString, Either String a) +parseM refill p = (step . A.parse p) =<< refill + where step (A.Fail bs _stk msg) = return (bs, Left msg) + step (A.Partial k) = (step . k) =<< refill + step (A.Done bs r) = return (bs, Right r) + +asInt :: Integral a => a -> Int +asInt = fromIntegral + +withNoPush :: Socket -> IO a -> IO a +withNoPush sock act = setNoPush sock True >> act `finally` setNoPush sock False + +client :: Socket -> IO () +client sock = (`finally` close sock) loop + where + loop = do + (bs, ereq) <- parseM (recv sock 4096) request + case ereq of + Right (req,hdrs) | requestMethod req == "GET" -> do + let http10 = requestVersion req == "1.0" + connection = lookupHeader "Connection" hdrs + keepAlive = (http10 && connection == ["Keep-Alive"]) || + (not http10 && connection /= ["Close"]) + bracket (openFd (B.unpack (requestUri req)) ReadOnly Nothing + defaultFileFlags{nonBlock=True}) closeFd $ \fd -> do + st <- getFdStatus fd + let fixedHeaders + | http10 && keepAlive = + B.intercalate "\r\n" [ + "HTTP/1.0 200 OK" + , "Content-type: application/octet-stream" + , "Connection: Keep-Alive" + ] + | otherwise = + B.intercalate "\r\n" [ + "HTTP/1.1 200 OK" + , "Content-type: application/octet-stream" + ] + withNoPush sock $ do + sendAll sock $! (`B.append` "\r\n\r\n") $ B.intercalate "\r\n" [ + fixedHeaders + , B.append "Content-length: " . strict . L.singleton . toEnum . asInt . fileSize $ st + ] + fix $ \sendLoop -> do + s <- F.read fd 16384 + unless (B.null s) $ sendAll sock s >> sendLoop + when keepAlive loop + err | B.null bs -> return () + | otherwise -> print err >> sendAll sock "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n" diff --git a/utils/benchmarks/events/ThreadDelay.hs b/utils/benchmarks/events/ThreadDelay.hs new file mode 100644 index 0000000000..484f266a3d --- /dev/null +++ b/utils/benchmarks/events/ThreadDelay.hs @@ -0,0 +1,73 @@ +{-# LANGUAGE CPP, BangPatterns #-} + +-- Benchmark 'threadDelay' by forking N threads which sleep for a +-- number of milliseconds and wait for them all to finish. + +import Args (ljust, parseArgs, positive, theLast) +import Control.Concurrent (forkIO, runInUnboundThread) +import Control.Monad (unless, when) +import qualified Data.Semigroup as Sem +import Data.Monoid (Last(..)) +import System.Console.GetOpt (ArgDescr(ReqArg), OptDescr(..)) +import System.Environment (getArgs) +import GHC.Event (ensureIOManagerIsRunning) +import Control.Concurrent.STM +#if defined(USE_GHC_IO_MANAGER) +import Control.Concurrent (threadDelay) +#else +import GHC.Event (threadDelay) +#endif + +main = do + (cfg, _) <- parseArgs defaultConfig defaultOptions =<< getArgs + let numThreads = theLast cfgNumThreads cfg + + ensureIOManagerIsRunning + done <- newTVarIO False + ref <- newTVarIO 0 + let loop :: Int -> IO () + loop i = do + when (i < numThreads) $ do + _ <- forkIO $ do + threadDelay 1000 + atomically $ do + a <- readTVar ref + let !b = a+1 + writeTVar ref b + when (b == numThreads) $ writeTVar done True + loop (i + 1) + runInUnboundThread $ do + loop 0 + atomically $ do + b <- readTVar done + unless b retry + +------------------------------------------------------------------------ +-- Configuration + +data Config = Config { + cfgNumThreads :: Last Int + } + +defaultConfig :: Config +defaultConfig = Config { + cfgNumThreads = ljust 1000 + } + +instance Sem.Semigroup Config where + Config a <> Config b = + Config (a <> b) + +instance Monoid Config where + mempty = Config { + cfgNumThreads = mempty + } + mappend = (<>) + +defaultOptions :: [OptDescr (IO Config)] +defaultOptions = [ + Option ['n'] ["threads"] + (ReqArg (positive "number of threads" $ \n -> + mempty { cfgNumThreads = n }) "N") + "number of threads" + ] diff --git a/utils/benchmarks/events/Timers.hs b/utils/benchmarks/events/Timers.hs new file mode 100644 index 0000000000..3c1be4f2e9 --- /dev/null +++ b/utils/benchmarks/events/Timers.hs @@ -0,0 +1,73 @@ +{-# LANGUAGE BangPatterns #-} +-- Benchmark that registers N timeouts, adjusts them a number of time +-- and finally waits for them to expire. + +import Args (ljust, parseArgs, nonNegative, positive, theLast) +import Control.Concurrent (MVar, forkIO, takeMVar, newEmptyMVar, putMVar) +import Control.Monad (forM_, replicateM, when) +import Data.IORef (IORef, atomicModifyIORef, newIORef) +import Data.Semigroup as Sem hiding (Last, Option) +import Data.Monoid (Last(..)) +import GHC.Event as ET (timeLoop, newWith, newDefaultBackend, + registerTimeout, updateTimeout) +import System.Console.GetOpt (ArgDescr(ReqArg), OptDescr(..)) +import System.Environment (getArgs) + +data Config = Config + { cfgNumTimeouts :: Last Int + , cfgNumAdjusts :: Last Int + } + +defaultConfig :: Config +defaultConfig = Config + { cfgNumTimeouts = ljust 1000 + , cfgNumAdjusts = ljust 3 + } + +instance Sem.Semigroup Config where + (Config cfgNumTimeouts_1 cfgNumAdjusts_1) <> + (Config cfgNumTimeouts_2 cfgNumAdjusts_2) = + Config (cfgNumTimeouts_1 <> cfgNumTimeouts_2) (cfgNumAdjusts_1 <> cfgNumAdjusts_2) + +instance Monoid Config where + mappend = (<>) + mempty = Config mempty mempty + +defaultOptions :: [OptDescr (IO Config)] +defaultOptions = [ + Option ['n'] ["timeouts"] + (ReqArg (positive "number of timeouts" $ \n -> + mempty { cfgNumTimeouts = n }) "N") + "number of timeouts to use" + , Option ['a'] ["adjustments"] + (ReqArg (nonNegative "number of adjustments" $ \n -> + mempty { cfgNumAdjusts = n }) "N") + "number of adjustments to use for each timeout" + ] + +callback :: MVar () -> IORef Int -> Config -> IO () +callback done nref cfg = do + a <- atomicModifyIORef nref (\a -> let !b = a+1 in (b,b)) + when (a >= numTimeouts) $ putMVar done () + where + numTimeouts = theLast cfgNumTimeouts cfg + +main :: IO () +main = do + (cfg, _) <- parseArgs defaultConfig defaultOptions =<< getArgs + let numTimeouts = theLast cfgNumTimeouts cfg + numAdjusts = theLast cfgNumAdjusts cfg + + mgr <- ET.newWith =<< ET.newDefaultBackend + _ <- forkIO $ ET.timeLoop mgr + nref <- newIORef 0 + done <- newEmptyMVar + let finalTimeout = 1 -- ms + tenSecs = 10 * 1000 -- ms + timeouts = replicate numAdjusts tenSecs ++ [finalTimeout] + firstTimeout = head timeouts + keys <- replicateM numTimeouts $ registerTimeout mgr firstTimeout + (callback done nref cfg) + forM_ (tail timeouts) $ \t -> + forM_ keys $ \key -> updateTimeout mgr key t + takeMVar done diff --git a/utils/benchmarks/events/bench-thread-delay.sh b/utils/benchmarks/events/bench-thread-delay.sh new file mode 100755 index 0000000000..55b45bbd1f --- /dev/null +++ b/utils/benchmarks/events/bench-thread-delay.sh @@ -0,0 +1,46 @@ +#!/bin/bash + +set -e +set -u + +THREAD_LIST="1000 2000 4000 6000 8000 10000 15000 20000 25000 30000 40000" + +NEW_BIN="./new-thread-delay" +OLD_BIN="./old-thread-delay" + +NEW_DAT="new.dat" +OLD_DAT="old.dat" + +make data_clean + +make clean +make thread-delay +mv thread-delay $NEW_BIN + +make clean +make thread-delay USE_GHC_IO_MANAGER=1 +mv thread-delay $OLD_BIN + +# Format: threads,time(s) + +echo -n "Benchmarking old I/O manager..." +for n in $THREAD_LIST +do + echo -en "$n\t" >> $OLD_DAT + (time -p $OLD_BIN -n $n) 2>&1 | awk '$1 == "user" {print $2}' | cat >> $OLD_DAT +done +echo "done" + +echo -n "Benchmarking new I/O manager..." +for n in $THREAD_LIST +do + echo -en "$n\t" >> $NEW_DAT + (time -p $NEW_BIN -n $n) 2>&1 | awk '$1 == "user" {print $2}' | cat >> $NEW_DAT +done +echo "done" + +echo -n "Generating plot file..." +gnuplot -e "set terminal png size 1300,600 font 50; set output 'thread-delay.png'; +plot \"$OLD_DAT\" using 1:2 title 'Thread Delay (Old/with GHC I/O manager)', \"$NEW_DAT\" using 1:2 +title 'Thread Delay (New/without GHC I/O manager)'" +echo -n "finished" diff --git a/utils/benchmarks/events/bench-timers.sh b/utils/benchmarks/events/bench-timers.sh new file mode 100755 index 0000000000..4b4b8852c5 --- /dev/null +++ b/utils/benchmarks/events/bench-timers.sh @@ -0,0 +1,45 @@ +#!/bin/bash + +set -e +set -u + +THREAD_LIST="1000 2000 4000 6000 8000 10000 15000 20000 25000 30000 40000" + +EVENTS_BIN="./events" +THREADS_BIN="./threads" + +EVENTS_DAT="events.dat" +THREADS_DAT="threads.dat" + +make data_clean +make clean + +make timers +mv timers $EVENTS_BIN + +make thread-delay +mv thread-delay $THREADS_BIN + +# Format: threads,time(s) + +echo -n "Benchmarking events..." +for n in $THREAD_LIST +do + echo -en "$n\t" >> $EVENTS_DAT + (time -p $EVENTS_BIN -n $n) 2>&1 | awk '$1 == "user" {print $2}' | cat >> $EVENTS_DAT +done +echo "done" + +echo -n "Benchmarking threads..." +for n in $THREAD_LIST +do + echo -en "$n\t" >> $THREADS_DAT + (time -p $THREADS_BIN -n $n) 2>&1 | awk '$1 == "user" {print $2}' | cat >> $THREADS_DAT +done +echo "done" + +echo -n "Generating plot file..." +gnuplot -e "set terminal png size 1300,600 font 50; set output 'timers.png'; +plot \"$EVENTS_DAT\" using 1:2 title 'Events', \"$THREADS_DAT\" using 1:2 +title 'Threads'" +echo -n "finished" diff --git a/utils/benchmarks/events/benchmark_shell.py b/utils/benchmarks/events/benchmark_shell.py new file mode 100755 index 0000000000..46c812c8a0 --- /dev/null +++ b/utils/benchmarks/events/benchmark_shell.py @@ -0,0 +1,93 @@ +#!/usr/bin/python +""" +This program is intended to be run via sudo, and does the following +things: + +1. Sets up a high-performance networking environment by tweaking + various sysctl settings. +2. Runs either an interactive shell or a command line program. +3. Resets the environment back to what it was. +""" + +import os +import sys +import tempfile + +SYSCTLS = dict( + Darwin={ + 'kern.ipc.somaxconn': 1024, + 'kern.maxfiles': 22528, + 'kern.maxfilesperproc': 20480, + 'net.inet.ip.portrange.first': 1024, + 'net.inet.ip.portrange.hifirst': 1024, + }, + Linux={ + 'net.core.somaxconn': 1024, + 'net.core.rmem_max': 16777216, + 'net.core.wmem_max': 16777216, + 'net.ipv4.ip_local_port_range': '1024 65535', + 'net.ipv4.tcp_fin_timeout': 15, + 'net.ipv4.tcp_max_syn_backlog': 16384, + 'net.ipv4.tcp_rmem': '4096 87380 16777216', + 'net.ipv4.tcp_tw_recycle': 1, + 'net.ipv4.tcp_tw_reuse': 1, + 'net.ipv4.tcp_wmem': '4096 65536 16777216', + }, + ) + +ULIMITS = dict( + Darwin={ + '-n': 20480, + }, + Linux={ + '-n': 131072, + }, + ) + +if os.access('/sbin/sysctl', os.X_OK): + SYSCTL = '/sbin/sysctl' +elif os.access('/usr/sbin/sysctl', os.X_OK): + SYSCTL = '/usr/sbin/sysctl' +else: + print >> sys.stderr, 'where is sysctl!?' + sys.exit(1) + +CHANGED_SYSCTLS = {} + +def change_sysctl(name, newval): + """ Set kernel parameters using sysctl for the appropriate platform """ + oldval = os.popen('%s -n %s' % (SYSCTL, name), 'r').read().strip().replace('\t', ' ') + if not oldval: + print('could not get value of ' + name, file=sys.stderr) + return + if oldval == str(newval): + return + ret = os.system('%s -w %s=%r 2>/dev/null' % (SYSCTL, name, newval)) + if ret != 0: + print('could not change %s from %s to %r' % (name, oldval, newval), + file=sys.stderr) + return + CHANGED_SYSCTLS[name] = oldval + + +PLATFORM = os.uname()[0] + +for (n, v) in SYSCTLS[PLATFORM].items(): + change_sysctl(n, v) + +FD, PATH = tempfile.mkstemp('.sh') +FP = os.fdopen(FD, 'w') + +for (n, v) in ULIMITS[PLATFORM].items(): + print('ulimit %s %s' % (n, v), file=FP) + if len(sys.argv) > 1: + print('exec ' + ' '.join(sys.argv[1:]), file=FP) + else: + print('exec %s -l' % os.environ.get('SHELL', '/bin/bash'), file=FP) + +FP.close() +os.system('exec /bin/sh ' + PATH) +os.unlink(PATH) + +for (n, v) in CHANGED_SYSCTLS.items(): + change_sysctl(n, v) diff --git a/utils/benchmarks/events/benchmarks.cabal b/utils/benchmarks/events/benchmarks.cabal new file mode 100644 index 0000000000..6d4adea702 --- /dev/null +++ b/utils/benchmarks/events/benchmarks.cabal @@ -0,0 +1,99 @@ +cabal-version: 3.0 +name: benchmarks +version: 0.1.0.0 +description: These programs are from the Event package (before it was + made an internal GHC module) and have been modified to be + use as simple executables to benchmark GHC's I/O manager. +license: BSD-3-Clause +license-file: LICENSE +synopsis: Utilities to benchmark GHC's I/O manager +author: Bryan O'Sullivan <bos@serpentine.com> + Johan Tibell <johan.tibell@gmail.com> +maintainer: TBD +category: Testing +build-type: Simple + +common shared-deps + ghc-options: -threaded -Wall -fno-warn-orphans + -fno-warn-missing-signatures + -fwarn-tabs + -funbox-strict-fields -fno-ignore-asserts + build-depends: base == 4.16.0.0, + network == 3.1.2.1, + unix, + mtl, + attoparsec + default-language: Haskell2010 + +executable dead-conn + import: shared-deps + main-is: DeadConn.hs + other-modules: Args + EventSocket + EventUtil + other-extensions: CPP, + OverloadedStrings + build-depends: bytestring + +executable pong-server + import: shared-deps + main-is: PongServer.hs + other-modules: Args, + EventSocket, + EventUtil + other-extensions: CPP, + OverloadedStrings + build-depends: bytestring + +executable static-http + import: shared-deps + main-is: StaticHttp.hs + other-modules: EventFile, + EventSocket, + EventUtil, + NoPush, + RFC2616 + other-extensions: CPP, + OverloadedStrings, + build-depends: bytestring + + +executable http-client + import: shared-deps + main-is: HttpClient.hs + other-modules: Args, + EventSocket, + EventUtil, + RFC2616 + other-extensions: BangPatterns, + FlexibleContexts, + OverloadedStrings + build-depends: bytestring, + time == 1.11.1.1, + parsec + +executable signal + import: shared-deps + main-is: Signal.hs + other-extensions: BangPatterns + +executable simple + import: shared-deps + main-is: Simple.hs + other-modules: Args + other-extensions: BangPatterns + +executable thread-delay + import: shared-deps + main-is: ThreadDelay.hs + other-modules: Args + other-extensions: CPP, + BangPatterns + build-depends: bytestring, + stm + +executable timers + import: shared-deps + main-is: Timers.hs + other-modules: Args + other-extensions: BangPatterns diff --git a/utils/benchmarks/events/deadconn.c b/utils/benchmarks/events/deadconn.c new file mode 100644 index 0000000000..364371a1e9 --- /dev/null +++ b/utils/benchmarks/events/deadconn.c @@ -0,0 +1,89 @@ +/* + * deadconn - a tool for dead http connections creation + * + * 07-01-2001 by Davide Libenzi <davidel@xmailserver.org> + * + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <sys/time.h> +#include <time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/file.h> +#include <sys/socket.h> +#include <netdb.h> +#include <signal.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <sys/wait.h> +#include <fcntl.h> +#include <errno.h> + +#define MAX_CONNECT_ERRORS 4 + +int tconnect(struct in_addr const *paddr, int port) { + int sfd; + struct sockaddr_in sin; + if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + perror("socket"); + return -1; + } + memset(&sin, 0, sizeof(sin)); + memcpy(&sin.sin_addr, &paddr->s_addr, 4); + sin.sin_port = htons((short int) port); + sin.sin_family = AF_INET; + if (connect(sfd, (struct sockaddr *) &sin, sizeof(sin)) == 0) { + return sfd; + } + perror("connect"); + close(sfd); + return -1; +} + +int main(int argc, char *argv[]) { + int ii, sfd, errors; + char *server; + int port; + int nconns, ccreat = 0; + struct hostent * he; + struct in_addr inadr; + struct sockaddr_in sin; + if (argc < 4) { + printf("use: %s server port numconns\n", argv[0]); + return 1; + } + server = argv[1]; + port = atoi(argv[2]); + nconns = atoi(argv[3]); + if (inet_aton(server, &inadr) == 0) { + if ((he = gethostbyname(server)) == NULL) { + fprintf(stderr, "unable to resolve: %s\n", server); + return -1; + } + memcpy(&inadr.s_addr, he->h_addr_list[0], he->h_length); + } + for (ii = 0; ii < nconns; ii++) { + errors = 0; + retry: + if ((sfd = tconnect(&inadr, port)) != -1) { + char const *req = "GET / HTTP/1.0\r\n"; + write(sfd, req, strlen(req)); + ++ccreat; + errors = 0; + printf("%d\n", ccreat); + } else { + sleep(1); + if (++errors < MAX_CONNECT_ERRORS) + goto retry; + break; + } + } + printf("%d connections created ...\n", ccreat); + while (1) + sleep(10); + return 0; +} |