summaryrefslogtreecommitdiff
path: root/vendor/Twisted-10.0.0/twisted/test/test_twistd.py
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/Twisted-10.0.0/twisted/test/test_twistd.py')
-rw-r--r--vendor/Twisted-10.0.0/twisted/test/test_twistd.py1378
1 files changed, 1378 insertions, 0 deletions
diff --git a/vendor/Twisted-10.0.0/twisted/test/test_twistd.py b/vendor/Twisted-10.0.0/twisted/test/test_twistd.py
new file mode 100644
index 0000000000..84b0cc4901
--- /dev/null
+++ b/vendor/Twisted-10.0.0/twisted/test/test_twistd.py
@@ -0,0 +1,1378 @@
+# Copyright (c) 2007-2009 Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Tests for L{twisted.application.app} and L{twisted.scripts.twistd}.
+"""
+
+import signal, inspect, errno
+
+import os, sys, cPickle, StringIO
+try:
+ import pwd, grp
+except ImportError:
+ pwd = grp = None
+
+from zope.interface import implements
+
+from twisted.trial import unittest
+
+from twisted.application import service, app
+from twisted.scripts import twistd
+from twisted.python import log
+from twisted.python.usage import UsageError
+from twisted.python.log import ILogObserver
+from twisted.python.versions import Version
+from twisted.python.components import Componentized
+from twisted.internet.defer import Deferred
+from twisted.python.fakepwd import UserDatabase
+
+try:
+ from twisted.python import syslog
+except ImportError:
+ syslog = None
+
+try:
+ from twisted.scripts import _twistd_unix
+except ImportError:
+ _twistd_unix = None
+else:
+ from twisted.scripts._twistd_unix import UnixApplicationRunner
+ from twisted.scripts._twistd_unix import UnixAppLogger
+
+try:
+ import profile
+except ImportError:
+ profile = None
+
+try:
+ import hotshot
+ import hotshot.stats
+except (ImportError, SystemExit):
+ # For some reasons, hotshot.stats seems to raise SystemExit on some
+ # distributions, probably when considered non-free. See the import of
+ # this module in twisted.application.app for more details.
+ hotshot = None
+
+try:
+ import pstats
+ import cProfile
+except ImportError:
+ cProfile = None
+
+
+
+def patchUserDatabase(patch, user, uid, group, gid):
+ """
+ Patch L{pwd.getpwnam} so that it behaves as though only one user exists
+ and patch L{grp.getgrnam} so that it behaves as though only one group
+ exists.
+
+ @param patch: A function like L{TestCase.patch} which will be used to
+ install the fake implementations.
+
+ @type user: C{str}
+ @param user: The name of the single user which will exist.
+
+ @type uid: C{int}
+ @param uid: The UID of the single user which will exist.
+
+ @type group: C{str}
+ @param group: The name of the single user which will exist.
+
+ @type gid: C{int}
+ @param gid: The GID of the single group which will exist.
+ """
+ # Try not to be an unverified fake, but try not to depend on quirks of
+ # the system either (eg, run as a process with a uid and gid which
+ # equal each other, and so doesn't reliably test that uid is used where
+ # uid should be used and gid is used where gid should be used). -exarkun
+ pwent = pwd.getpwuid(os.getuid())
+ grent = grp.getgrgid(os.getgid())
+
+ database = UserDatabase()
+ database.addUser(
+ user, pwent.pw_passwd, uid, pwent.pw_gid,
+ pwent.pw_gecos, pwent.pw_dir, pwent.pw_shell)
+
+ def getgrnam(name):
+ result = list(grent)
+ result[result.index(grent.gr_name)] = group
+ result[result.index(grent.gr_gid)] = gid
+ result = tuple(result)
+ return {group: result}[name]
+
+ patch(pwd, "getpwnam", database.getpwnam)
+ patch(grp, "getgrnam", getgrnam)
+
+
+
+class MockServiceMaker(object):
+ """
+ A non-implementation of L{twisted.application.service.IServiceMaker}.
+ """
+ tapname = 'ueoa'
+
+ def makeService(self, options):
+ """
+ Take a L{usage.Options} instance and return a
+ L{service.IService} provider.
+ """
+ self.options = options
+ self.service = service.Service()
+ return self.service
+
+
+
+class CrippledAppLogger(app.AppLogger):
+ """
+ @see: CrippledApplicationRunner.
+ """
+
+ def start(self, application):
+ pass
+
+
+
+class CrippledApplicationRunner(twistd._SomeApplicationRunner):
+ """
+ An application runner that cripples the platform-specific runner and
+ nasty side-effect-having code so that we can use it without actually
+ running any environment-affecting code.
+ """
+ loggerFactory = CrippledAppLogger
+
+ def preApplication(self):
+ pass
+
+
+ def postApplication(self):
+ pass
+
+
+
+class ServerOptionsTest(unittest.TestCase):
+ """
+ Non-platform-specific tests for the pltaform-specific ServerOptions class.
+ """
+
+ def test_postOptionsSubCommandCausesNoSave(self):
+ """
+ postOptions should set no_save to True when a subcommand is used.
+ """
+ config = twistd.ServerOptions()
+ config.subCommand = 'ueoa'
+ config.postOptions()
+ self.assertEquals(config['no_save'], True)
+
+
+ def test_postOptionsNoSubCommandSavesAsUsual(self):
+ """
+ If no sub command is used, postOptions should not touch no_save.
+ """
+ config = twistd.ServerOptions()
+ config.postOptions()
+ self.assertEquals(config['no_save'], False)
+
+
+ def test_reportProfileDeprecation(self):
+ """
+ Check that the --report-profile option prints a C{DeprecationWarning}.
+ """
+ config = twistd.ServerOptions()
+ self.assertWarns(
+ DeprecationWarning, "--report-profile option is deprecated and "
+ "a no-op since Twisted 8.0.", app.__file__,
+ config.parseOptions, ["--report-profile", "foo"])
+
+
+ def test_listAllProfilers(self):
+ """
+ All the profilers that can be used in L{app.AppProfiler} are listed in
+ the help output.
+ """
+ config = twistd.ServerOptions()
+ helpOutput = str(config)
+ for profiler in app.AppProfiler.profilers:
+ self.assertIn(profiler, helpOutput)
+
+
+ def test_defaultUmask(self):
+ """
+ The default value for the C{umask} option is C{None}.
+ """
+ config = twistd.ServerOptions()
+ self.assertEqual(config['umask'], None)
+
+
+ def test_umask(self):
+ """
+ The value given for the C{umask} option is parsed as an octal integer
+ literal.
+ """
+ config = twistd.ServerOptions()
+ config.parseOptions(['--umask', '123'])
+ self.assertEqual(config['umask'], 83)
+ config.parseOptions(['--umask', '0123'])
+ self.assertEqual(config['umask'], 83)
+
+
+ def test_invalidUmask(self):
+ """
+ If a value is given for the C{umask} option which cannot be parsed as
+ an integer, L{UsageError} is raised by L{ServerOptions.parseOptions}.
+ """
+ config = twistd.ServerOptions()
+ self.assertRaises(UsageError, config.parseOptions, ['--umask', 'abcdef'])
+
+ if _twistd_unix is None:
+ msg = "twistd unix not available"
+ test_defaultUmask.skip = test_umask.skip = test_invalidUmask.skip = msg
+
+
+
+class TapFileTest(unittest.TestCase):
+ """
+ Test twistd-related functionality that requires a tap file on disk.
+ """
+
+ def setUp(self):
+ """
+ Create a trivial Application and put it in a tap file on disk.
+ """
+ self.tapfile = self.mktemp()
+ f = file(self.tapfile, 'wb')
+ cPickle.dump(service.Application("Hi!"), f)
+ f.close()
+
+
+ def test_createOrGetApplicationWithTapFile(self):
+ """
+ Ensure that the createOrGetApplication call that 'twistd -f foo.tap'
+ makes will load the Application out of foo.tap.
+ """
+ config = twistd.ServerOptions()
+ config.parseOptions(['-f', self.tapfile])
+ application = CrippledApplicationRunner(config).createOrGetApplication()
+ self.assertEquals(service.IService(application).name, 'Hi!')
+
+
+
+class TestLoggerFactory(object):
+ """
+ A logger factory for L{TestApplicationRunner}.
+ """
+
+ def __init__(self, runner):
+ self.runner = runner
+
+
+ def start(self, application):
+ """
+ Save the logging start on the C{runner} instance.
+ """
+ self.runner.order.append("log")
+ self.runner.hadApplicationLogObserver = hasattr(self.runner,
+ 'application')
+
+
+ def stop(self):
+ """
+ Don't log anything.
+ """
+
+
+
+class TestApplicationRunner(app.ApplicationRunner):
+ """
+ An ApplicationRunner which tracks the environment in which its methods are
+ called.
+ """
+
+ def __init__(self, options):
+ app.ApplicationRunner.__init__(self, options)
+ self.order = []
+ self.logger = TestLoggerFactory(self)
+
+
+ def preApplication(self):
+ self.order.append("pre")
+ self.hadApplicationPreApplication = hasattr(self, 'application')
+
+
+ def postApplication(self):
+ self.order.append("post")
+ self.hadApplicationPostApplication = hasattr(self, 'application')
+
+
+
+class ApplicationRunnerTest(unittest.TestCase):
+ """
+ Non-platform-specific tests for the platform-specific ApplicationRunner.
+ """
+ def setUp(self):
+ config = twistd.ServerOptions()
+ self.serviceMaker = MockServiceMaker()
+ # Set up a config object like it's been parsed with a subcommand
+ config.loadedPlugins = {'test_command': self.serviceMaker}
+ config.subOptions = object()
+ config.subCommand = 'test_command'
+ self.config = config
+
+
+ def test_applicationRunnerGetsCorrectApplication(self):
+ """
+ Ensure that a twistd plugin gets used in appropriate ways: it
+ is passed its Options instance, and the service it returns is
+ added to the application.
+ """
+ arunner = CrippledApplicationRunner(self.config)
+ arunner.run()
+
+ self.assertIdentical(
+ self.serviceMaker.options, self.config.subOptions,
+ "ServiceMaker.makeService needs to be passed the correct "
+ "sub Command object.")
+ self.assertIdentical(
+ self.serviceMaker.service,
+ service.IService(arunner.application).services[0],
+ "ServiceMaker.makeService's result needs to be set as a child "
+ "of the Application.")
+
+
+ def test_preAndPostApplication(self):
+ """
+ Test thet preApplication and postApplication methods are
+ called by ApplicationRunner.run() when appropriate.
+ """
+ s = TestApplicationRunner(self.config)
+ s.run()
+ self.assertFalse(s.hadApplicationPreApplication)
+ self.assertTrue(s.hadApplicationPostApplication)
+ self.assertTrue(s.hadApplicationLogObserver)
+ self.assertEquals(s.order, ["pre", "log", "post"])
+
+
+ def _applicationStartsWithConfiguredID(self, argv, uid, gid):
+ """
+ Assert that given a particular command line, an application is started
+ as a particular UID/GID.
+
+ @param argv: A list of strings giving the options to parse.
+ @param uid: An integer giving the expected UID.
+ @param gid: An integer giving the expected GID.
+ """
+ self.config.parseOptions(argv)
+
+ events = []
+ class FakeUnixApplicationRunner(twistd._SomeApplicationRunner):
+ def setupEnvironment(self, chroot, rundir, nodaemon, umask,
+ pidfile):
+ events.append('environment')
+
+ def shedPrivileges(self, euid, uid, gid):
+ events.append(('privileges', euid, uid, gid))
+
+ def startReactor(self, reactor, oldstdout, oldstderr):
+ events.append('reactor')
+
+ def removePID(self, pidfile):
+ pass
+
+
+ class FakeService(object):
+ implements(service.IService, service.IProcess)
+
+ processName = None
+
+ def privilegedStartService(self):
+ events.append('privilegedStartService')
+
+ def startService(self):
+ events.append('startService')
+
+ def stopService(self):
+ pass
+
+ runner = FakeUnixApplicationRunner(self.config)
+ runner.preApplication()
+ runner.application = FakeService()
+ runner.postApplication()
+
+ self.assertEqual(
+ events,
+ ['environment', 'privilegedStartService',
+ ('privileges', False, uid, gid), 'startService', 'reactor'])
+
+
+ def test_applicationStartsWithConfiguredNumericIDs(self):
+ """
+ L{postApplication} should change the UID and GID to the values
+ specified as numeric strings by the configuration after running
+ L{service.IService.privilegedStartService} and before running
+ L{service.IService.startService}.
+ """
+ uid = 1234
+ gid = 4321
+ self._applicationStartsWithConfiguredID(
+ ["--uid", str(uid), "--gid", str(gid)], uid, gid)
+
+
+ def test_applicationStartsWithConfiguredNameIDs(self):
+ """
+ L{postApplication} should change the UID and GID to the values
+ specified as user and group names by the configuration after running
+ L{service.IService.privilegedStartService} and before running
+ L{service.IService.startService}.
+ """
+ user = "foo"
+ uid = 1234
+ group = "bar"
+ gid = 4321
+ patchUserDatabase(self.patch, user, uid, group, gid)
+ self._applicationStartsWithConfiguredID(
+ ["--uid", user, "--gid", group], uid, gid)
+
+ if getattr(os, 'setuid', None) is None:
+ msg = "Platform does not support --uid/--gid twistd options."
+ test_applicationStartsWithConfiguredNameIDs.skip = msg
+ test_applicationStartsWithConfiguredNumericIDs.skip = msg
+ del msg
+
+
+ def test_startReactorRunsTheReactor(self):
+ """
+ L{startReactor} calls L{reactor.run}.
+ """
+ reactor = DummyReactor()
+ runner = app.ApplicationRunner({
+ "profile": False,
+ "profiler": "profile",
+ "debug": False})
+ runner.startReactor(reactor, None, None)
+ self.assertTrue(
+ reactor.called, "startReactor did not call reactor.run()")
+
+
+ def test_legacyApplicationRunnerGetLogObserver(self):
+ """
+ L{app.ApplicationRunner} subclasses can have a getLogObserver that used
+ to return a log observer. This test is there to ensure that it's
+ supported but it raises a warning when used.
+ """
+ observer = []
+ self.addCleanup(log.removeObserver, observer.append)
+ class GetLogObserverRunner(app.ApplicationRunner):
+ def getLogObserver(self):
+ return observer.append
+
+ def startLogging(self, observer):
+ """
+ Override C{startLogging} to call L{log.addObserver} instead of
+ L{log.startLoggingWithObserver}.
+ """
+ log.addObserver(observer)
+ self.logger._initialLog()
+
+ def preApplication(self):
+ pass
+
+ def postApplication(self):
+ pass
+
+ def createOrGetApplication(self):
+ pass
+
+ conf = twistd.ServerOptions()
+ runner = GetLogObserverRunner(conf)
+ self.assertWarns(DeprecationWarning,
+ "Specifying a log observer with getLogObserver is "
+ "deprecated. Please use a loggerFactory instead.",
+ app.__file__, runner.run)
+ self.assertEquals(len(observer), 3)
+
+
+
+class UnixApplicationRunnerSetupEnvironmentTests(unittest.TestCase):
+ """
+ Tests for L{UnixApplicationRunner.setupEnvironment}.
+
+ @ivar root: The root of the filesystem, or C{unset} if none has been
+ specified with a call to L{os.chroot} (patched for this TestCase with
+ L{UnixApplicationRunnerSetupEnvironmentTests.chroot ).
+
+ @ivar cwd: The current working directory of the process, or C{unset} if
+ none has been specified with a call to L{os.chdir} (patched for this
+ TestCase with L{UnixApplicationRunnerSetupEnvironmentTests.chdir).
+
+ @ivar mask: The current file creation mask of the process, or C{unset} if
+ none has been specified with a call to L{os.umask} (patched for this
+ TestCase with L{UnixApplicationRunnerSetupEnvironmentTests.umask).
+
+ @ivar daemon: A boolean indicating whether daemonization has been performed
+ by a call to L{_twistd_unix.daemonize} (patched for this TestCase with
+ L{UnixApplicationRunnerSetupEnvironmentTests.
+ """
+ if _twistd_unix is None:
+ skip = "twistd unix not available"
+
+ unset = object()
+
+ def setUp(self):
+ self.root = self.unset
+ self.cwd = self.unset
+ self.mask = self.unset
+ self.daemon = False
+ self.pid = os.getpid()
+ self.patch(os, 'chroot', lambda path: setattr(self, 'root', path))
+ self.patch(os, 'chdir', lambda path: setattr(self, 'cwd', path))
+ self.patch(os, 'umask', lambda mask: setattr(self, 'mask', mask))
+ self.patch(_twistd_unix, "daemonize", self.daemonize)
+ self.runner = UnixApplicationRunner({})
+
+
+ def daemonize(self):
+ """
+ Indicate that daemonization has happened and change the PID so that the
+ value written to the pidfile can be tested in the daemonization case.
+ """
+ self.daemon = True
+ self.patch(os, 'getpid', lambda: self.pid + 1)
+
+
+ def test_chroot(self):
+ """
+ L{UnixApplicationRunner.setupEnvironment} changes the root of the
+ filesystem if passed a non-C{None} value for the C{chroot} parameter.
+ """
+ self.runner.setupEnvironment("/foo/bar", ".", True, None, None)
+ self.assertEqual(self.root, "/foo/bar")
+
+
+ def test_noChroot(self):
+ """
+ L{UnixApplicationRunner.setupEnvironment} does not change the root of
+ the filesystem if passed C{None} for the C{chroot} parameter.
+ """
+ self.runner.setupEnvironment(None, ".", True, None, None)
+ self.assertIdentical(self.root, self.unset)
+
+
+ def test_changeWorkingDirectory(self):
+ """
+ L{UnixApplicationRunner.setupEnvironment} changes the working directory
+ of the process to the path given for the C{rundir} parameter.
+ """
+ self.runner.setupEnvironment(None, "/foo/bar", True, None, None)
+ self.assertEqual(self.cwd, "/foo/bar")
+
+
+ def test_daemonize(self):
+ """
+ L{UnixApplicationRunner.setupEnvironment} daemonizes the process if
+ C{False} is passed for the C{nodaemon} parameter.
+ """
+ self.runner.setupEnvironment(None, ".", False, None, None)
+ self.assertTrue(self.daemon)
+
+
+ def test_noDaemonize(self):
+ """
+ L{UnixApplicationRunner.setupEnvironment} does not daemonize the
+ process if C{True} is passed for the C{nodaemon} parameter.
+ """
+ self.runner.setupEnvironment(None, ".", True, None, None)
+ self.assertFalse(self.daemon)
+
+
+ def test_nonDaemonPIDFile(self):
+ """
+ L{UnixApplicationRunner.setupEnvironment} writes the process's PID to
+ the file specified by the C{pidfile} parameter.
+ """
+ pidfile = self.mktemp()
+ self.runner.setupEnvironment(None, ".", True, None, pidfile)
+ fObj = file(pidfile)
+ pid = int(fObj.read())
+ fObj.close()
+ self.assertEqual(pid, self.pid)
+
+
+ def test_daemonPIDFile(self):
+ """
+ L{UnixApplicationRunner.setupEnvironment} writes the daemonized
+ process's PID to the file specified by the C{pidfile} parameter if
+ C{nodaemon} is C{False}.
+ """
+ pidfile = self.mktemp()
+ self.runner.setupEnvironment(None, ".", False, None, pidfile)
+ fObj = file(pidfile)
+ pid = int(fObj.read())
+ fObj.close()
+ self.assertEqual(pid, self.pid + 1)
+
+
+ def test_umask(self):
+ """
+ L{UnixApplicationRunner.setupEnvironment} changes the process umask to
+ the value specified by the C{umask} parameter.
+ """
+ self.runner.setupEnvironment(None, ".", False, 123, None)
+ self.assertEqual(self.mask, 123)
+
+
+ def test_noDaemonizeNoUmask(self):
+ """
+ L{UnixApplicationRunner.setupEnvironment} doesn't change the process
+ umask if C{None} is passed for the C{umask} parameter and C{True} is
+ passed for the C{nodaemon} parameter.
+ """
+ self.runner.setupEnvironment(None, ".", True, None, None)
+ self.assertIdentical(self.mask, self.unset)
+
+
+ def test_daemonizedNoUmask(self):
+ """
+ L{UnixApplicationRunner.setupEnvironment} changes the process umask to
+ C{0077} if C{None} is passed for the C{umask} parameter and C{False} is
+ passed for the C{nodaemon} parameter.
+ """
+ self.runner.setupEnvironment(None, ".", False, None, None)
+ self.assertEqual(self.mask, 0077)
+
+
+
+class UnixApplicationRunnerStartApplicationTests(unittest.TestCase):
+ """
+ Tests for L{UnixApplicationRunner.startApplication}.
+ """
+ if _twistd_unix is None:
+ skip = "twistd unix not available"
+
+ def test_setupEnvironment(self):
+ """
+ L{UnixApplicationRunner.startApplication} calls
+ L{UnixApplicationRunner.setupEnvironment} with the chroot, rundir,
+ nodaemon, umask, and pidfile parameters from the configuration it is
+ constructed with.
+ """
+ options = twistd.ServerOptions()
+ options.parseOptions([
+ '--nodaemon',
+ '--umask', '0070',
+ '--chroot', '/foo/chroot',
+ '--rundir', '/foo/rundir',
+ '--pidfile', '/foo/pidfile'])
+ application = service.Application("test_setupEnvironment")
+ self.runner = UnixApplicationRunner(options)
+
+ args = []
+ def fakeSetupEnvironment(self, chroot, rundir, nodaemon, umask, pidfile):
+ args.extend((chroot, rundir, nodaemon, umask, pidfile))
+
+ # Sanity check
+ self.assertEqual(
+ inspect.getargspec(self.runner.setupEnvironment),
+ inspect.getargspec(fakeSetupEnvironment))
+
+ self.patch(UnixApplicationRunner, 'setupEnvironment', fakeSetupEnvironment)
+ self.patch(UnixApplicationRunner, 'shedPrivileges', lambda *a, **kw: None)
+ self.patch(app, 'startApplication', lambda *a, **kw: None)
+ self.runner.startApplication(application)
+
+ self.assertEqual(
+ args,
+ ['/foo/chroot', '/foo/rundir', True, 56, '/foo/pidfile'])
+
+
+
+class UnixApplicationRunnerRemovePID(unittest.TestCase):
+ """
+ Tests for L{UnixApplicationRunner.removePID}.
+ """
+ if _twistd_unix is None:
+ skip = "twistd unix not available"
+
+
+ def test_removePID(self):
+ """
+ L{UnixApplicationRunner.removePID} deletes the file the name of
+ which is passed to it.
+ """
+ runner = UnixApplicationRunner({})
+ path = self.mktemp()
+ os.makedirs(path)
+ pidfile = os.path.join(path, "foo.pid")
+ file(pidfile, "w").close()
+ runner.removePID(pidfile)
+ self.assertFalse(os.path.exists(pidfile))
+
+
+ def test_removePIDErrors(self):
+ """
+ Calling L{UnixApplicationRunner.removePID} with a non-existent filename logs
+ an OSError.
+ """
+ runner = UnixApplicationRunner({})
+ runner.removePID("fakepid")
+ errors = self.flushLoggedErrors(OSError)
+ self.assertEquals(len(errors), 1)
+ self.assertEquals(errors[0].value.errno, errno.ENOENT)
+
+
+
+class DummyReactor(object):
+ """
+ A dummy reactor, only providing a C{run} method and checking that it
+ has been called.
+
+ @ivar called: if C{run} has been called or not.
+ @type called: C{bool}
+ """
+ called = False
+
+ def run(self):
+ """
+ A fake run method, checking that it's been called one and only time.
+ """
+ if self.called:
+ raise RuntimeError("Already called")
+ self.called = True
+
+
+
+class AppProfilingTestCase(unittest.TestCase):
+ """
+ Tests for L{app.AppProfiler}.
+ """
+
+ def test_profile(self):
+ """
+ L{app.ProfileRunner.run} should call the C{run} method of the reactor
+ and save profile data in the specified file.
+ """
+ config = twistd.ServerOptions()
+ config["profile"] = self.mktemp()
+ config["profiler"] = "profile"
+ profiler = app.AppProfiler(config)
+ reactor = DummyReactor()
+
+ profiler.run(reactor)
+
+ self.assertTrue(reactor.called)
+ data = file(config["profile"]).read()
+ self.assertIn("DummyReactor.run", data)
+ self.assertIn("function calls", data)
+
+ if profile is None:
+ test_profile.skip = "profile module not available"
+
+
+ def _testStats(self, statsClass, profile):
+ out = StringIO.StringIO()
+
+ # Patch before creating the pstats, because pstats binds self.stream to
+ # sys.stdout early in 2.5 and newer.
+ stdout = self.patch(sys, 'stdout', out)
+
+ # If pstats.Stats can load the data and then reformat it, then the
+ # right thing probably happened.
+ stats = statsClass(profile)
+ stats.print_stats()
+ stdout.restore()
+
+ data = out.getvalue()
+ self.assertIn("function calls", data)
+ self.assertIn("(run)", data)
+
+
+ def test_profileSaveStats(self):
+ """
+ With the C{savestats} option specified, L{app.ProfileRunner.run}
+ should save the raw stats object instead of a summary output.
+ """
+ config = twistd.ServerOptions()
+ config["profile"] = self.mktemp()
+ config["profiler"] = "profile"
+ config["savestats"] = True
+ profiler = app.AppProfiler(config)
+ reactor = DummyReactor()
+
+ profiler.run(reactor)
+
+ self.assertTrue(reactor.called)
+ self._testStats(pstats.Stats, config['profile'])
+
+ if profile is None:
+ test_profileSaveStats.skip = "profile module not available"
+
+
+ def test_withoutProfile(self):
+ """
+ When the C{profile} module is not present, L{app.ProfilerRunner.run}
+ should raise a C{SystemExit} exception.
+ """
+ savedModules = sys.modules.copy()
+
+ config = twistd.ServerOptions()
+ config["profiler"] = "profile"
+ profiler = app.AppProfiler(config)
+
+ sys.modules["profile"] = None
+ try:
+ self.assertRaises(SystemExit, profiler.run, None)
+ finally:
+ sys.modules.clear()
+ sys.modules.update(savedModules)
+
+
+ def test_profilePrintStatsError(self):
+ """
+ When an error happens during the print of the stats, C{sys.stdout}
+ should be restored to its initial value.
+ """
+ class ErroneousProfile(profile.Profile):
+ def print_stats(self):
+ raise RuntimeError("Boom")
+ self.patch(profile, "Profile", ErroneousProfile)
+
+ config = twistd.ServerOptions()
+ config["profile"] = self.mktemp()
+ config["profiler"] = "profile"
+ profiler = app.AppProfiler(config)
+ reactor = DummyReactor()
+
+ oldStdout = sys.stdout
+ self.assertRaises(RuntimeError, profiler.run, reactor)
+ self.assertIdentical(sys.stdout, oldStdout)
+
+ if profile is None:
+ test_profilePrintStatsError.skip = "profile module not available"
+
+
+ def test_hotshot(self):
+ """
+ L{app.HotshotRunner.run} should call the C{run} method of the reactor
+ and save profile data in the specified file.
+ """
+ config = twistd.ServerOptions()
+ config["profile"] = self.mktemp()
+ config["profiler"] = "hotshot"
+ profiler = app.AppProfiler(config)
+ reactor = DummyReactor()
+
+ profiler.run(reactor)
+
+ self.assertTrue(reactor.called)
+ data = file(config["profile"]).read()
+ self.assertIn("run", data)
+ self.assertIn("function calls", data)
+
+ if hotshot is None:
+ test_hotshot.skip = "hotshot module not available"
+
+
+ def test_hotshotSaveStats(self):
+ """
+ With the C{savestats} option specified, L{app.HotshotRunner.run} should
+ save the raw stats object instead of a summary output.
+ """
+ config = twistd.ServerOptions()
+ config["profile"] = self.mktemp()
+ config["profiler"] = "hotshot"
+ config["savestats"] = True
+ profiler = app.AppProfiler(config)
+ reactor = DummyReactor()
+
+ profiler.run(reactor)
+
+ self.assertTrue(reactor.called)
+ self._testStats(hotshot.stats.load, config['profile'])
+
+ if hotshot is None:
+ test_hotshotSaveStats.skip = "hotshot module not available"
+
+
+ def test_withoutHotshot(self):
+ """
+ When the C{hotshot} module is not present, L{app.HotshotRunner.run}
+ should raise a C{SystemExit} exception and log the C{ImportError}.
+ """
+ savedModules = sys.modules.copy()
+ sys.modules["hotshot"] = None
+
+ config = twistd.ServerOptions()
+ config["profiler"] = "hotshot"
+ profiler = app.AppProfiler(config)
+ try:
+ self.assertRaises(SystemExit, profiler.run, None)
+ finally:
+ sys.modules.clear()
+ sys.modules.update(savedModules)
+
+
+ def test_nothotshotDeprecation(self):
+ """
+ Check that switching on the C{nothotshot} option produces a warning and
+ sets the profiler to B{profile}.
+ """
+ config = twistd.ServerOptions()
+ config['nothotshot'] = True
+ profiler = self.assertWarns(DeprecationWarning,
+ "The --nothotshot option is deprecated. Please specify the "
+ "profiler name using the --profiler option",
+ app.__file__, app.AppProfiler, config)
+ self.assertEquals(profiler.profiler, "profile")
+
+
+ def test_hotshotPrintStatsError(self):
+ """
+ When an error happens while printing the stats, C{sys.stdout}
+ should be restored to its initial value.
+ """
+ class ErroneousStats(pstats.Stats):
+ def print_stats(self):
+ raise RuntimeError("Boom")
+ self.patch(pstats, "Stats", ErroneousStats)
+
+ config = twistd.ServerOptions()
+ config["profile"] = self.mktemp()
+ config["profiler"] = "hotshot"
+ profiler = app.AppProfiler(config)
+ reactor = DummyReactor()
+
+ oldStdout = sys.stdout
+ self.assertRaises(RuntimeError, profiler.run, reactor)
+ self.assertIdentical(sys.stdout, oldStdout)
+
+ if hotshot is None:
+ test_hotshotPrintStatsError.skip = "hotshot module not available"
+
+
+ def test_cProfile(self):
+ """
+ L{app.CProfileRunner.run} should call the C{run} method of the
+ reactor and save profile data in the specified file.
+ """
+ config = twistd.ServerOptions()
+ config["profile"] = self.mktemp()
+ config["profiler"] = "cProfile"
+ profiler = app.AppProfiler(config)
+ reactor = DummyReactor()
+
+ profiler.run(reactor)
+
+ self.assertTrue(reactor.called)
+ data = file(config["profile"]).read()
+ self.assertIn("run", data)
+ self.assertIn("function calls", data)
+
+ if cProfile is None:
+ test_cProfile.skip = "cProfile module not available"
+
+
+ def test_cProfileSaveStats(self):
+ """
+ With the C{savestats} option specified,
+ L{app.CProfileRunner.run} should save the raw stats object
+ instead of a summary output.
+ """
+ config = twistd.ServerOptions()
+ config["profile"] = self.mktemp()
+ config["profiler"] = "cProfile"
+ config["savestats"] = True
+ profiler = app.AppProfiler(config)
+ reactor = DummyReactor()
+
+ profiler.run(reactor)
+
+ self.assertTrue(reactor.called)
+ self._testStats(pstats.Stats, config['profile'])
+
+ if cProfile is None:
+ test_cProfileSaveStats.skip = "cProfile module not available"
+
+
+ def test_withoutCProfile(self):
+ """
+ When the C{cProfile} module is not present,
+ L{app.CProfileRunner.run} should raise a C{SystemExit}
+ exception and log the C{ImportError}.
+ """
+ savedModules = sys.modules.copy()
+ sys.modules["cProfile"] = None
+
+ config = twistd.ServerOptions()
+ config["profiler"] = "cProfile"
+ profiler = app.AppProfiler(config)
+ try:
+ self.assertRaises(SystemExit, profiler.run, None)
+ finally:
+ sys.modules.clear()
+ sys.modules.update(savedModules)
+
+
+ def test_unknownProfiler(self):
+ """
+ Check that L{app.AppProfiler} raises L{SystemExit} when given an
+ unknown profiler name.
+ """
+ config = twistd.ServerOptions()
+ config["profile"] = self.mktemp()
+ config["profiler"] = "foobar"
+
+ error = self.assertRaises(SystemExit, app.AppProfiler, config)
+ self.assertEquals(str(error), "Unsupported profiler name: foobar")
+
+
+ def test_defaultProfiler(self):
+ """
+ L{app.Profiler} defaults to the hotshot profiler if not specified.
+ """
+ profiler = app.AppProfiler({})
+ self.assertEquals(profiler.profiler, "hotshot")
+
+
+ def test_profilerNameCaseInsentive(self):
+ """
+ The case of the profiler name passed to L{app.AppProfiler} is not
+ relevant.
+ """
+ profiler = app.AppProfiler({"profiler": "HotShot"})
+ self.assertEquals(profiler.profiler, "hotshot")
+
+
+ def test_oldRunWithProfiler(self):
+ """
+ L{app.runWithProfiler} should print a C{DeprecationWarning} pointing
+ at L{AppProfiler}.
+ """
+ class DummyProfiler(object):
+ called = False
+ def run(self, reactor):
+ self.called = True
+ profiler = DummyProfiler()
+ self.patch(app, "AppProfiler", lambda conf: profiler)
+
+ def runWithProfiler():
+ return app.runWithProfiler(DummyReactor(), {})
+
+ self.assertWarns(DeprecationWarning,
+ "runWithProfiler is deprecated since Twisted 8.0. "
+ "Use ProfileRunner instead.", __file__,
+ runWithProfiler)
+ self.assertTrue(profiler.called)
+
+
+ def test_oldRunWithHotshot(self):
+ """
+ L{app.runWithHotshot} should print a C{DeprecationWarning} pointing
+ at L{AppProfiler}.
+ """
+ class DummyProfiler(object):
+ called = False
+ def run(self, reactor):
+ self.called = True
+ profiler = DummyProfiler()
+ self.patch(app, "AppProfiler", lambda conf: profiler)
+
+ def runWithHotshot():
+ return app.runWithHotshot(DummyReactor(), {})
+
+ self.assertWarns(DeprecationWarning,
+ "runWithHotshot is deprecated since Twisted 8.0. "
+ "Use HotshotRunner instead.", __file__,
+ runWithHotshot)
+ self.assertTrue(profiler.called)
+
+
+
+def _patchFileLogObserver(patch):
+ """
+ Patch L{log.FileLogObserver} to record every call and keep a reference to
+ the passed log file for tests.
+
+ @param patch: a callback for patching (usually L{unittest.TestCase.patch}).
+
+ @return: the list that keeps track of the log files.
+ @rtype: C{list}
+ """
+ logFiles = []
+ oldFileLobObserver = log.FileLogObserver
+ def FileLogObserver(logFile):
+ logFiles.append(logFile)
+ return oldFileLobObserver(logFile)
+ patch(log, 'FileLogObserver', FileLogObserver)
+ return logFiles
+
+
+
+class AppLoggerTestCase(unittest.TestCase):
+ """
+ Tests for L{app.AppLogger}.
+
+ @ivar observers: list of observers installed during the tests.
+ @type observers: C{list}
+ """
+
+ def setUp(self):
+ """
+ Override L{log.addObserver} so that we can trace the observers
+ installed in C{self.observers}.
+ """
+ self.observers = []
+ def startLoggingWithObserver(observer):
+ self.observers.append(observer)
+ log.addObserver(observer)
+ self.patch(log, 'startLoggingWithObserver', startLoggingWithObserver)
+
+
+ def tearDown(self):
+ """
+ Remove all installed observers.
+ """
+ for observer in self.observers:
+ log.removeObserver(observer)
+
+
+ def _checkObserver(self, logs):
+ """
+ Ensure that initial C{twistd} logs are written to the given list.
+
+ @type logs: C{list}
+ @param logs: The list whose C{append} method was specified as the
+ initial log observer.
+ """
+ self.assertEquals(self.observers, [logs.append])
+ self.assertIn("starting up", logs[0]["message"][0])
+ self.assertIn("reactor class", logs[1]["message"][0])
+
+
+ def test_start(self):
+ """
+ L{app.AppLogger.start} calls L{log.addObserver}, and then writes some
+ messages about twistd and the reactor.
+ """
+ logger = app.AppLogger({})
+ observer = []
+ logger._getLogObserver = lambda: observer.append
+ logger.start(Componentized())
+ self._checkObserver(observer)
+
+
+ def test_startUsesApplicationLogObserver(self):
+ """
+ When the L{ILogObserver} component is available on the application,
+ that object will be used as the log observer instead of constructing a
+ new one.
+ """
+ application = Componentized()
+ logs = []
+ application.setComponent(ILogObserver, logs.append)
+ logger = app.AppLogger({})
+ logger.start(application)
+ self._checkObserver(logs)
+
+
+ def test_getLogObserverStdout(self):
+ """
+ When logfile is empty or set to C{-}, L{app.AppLogger._getLogObserver}
+ returns a log observer pointing at C{sys.stdout}.
+ """
+ logger = app.AppLogger({"logfile": "-"})
+ logFiles = _patchFileLogObserver(self.patch)
+
+ observer = logger._getLogObserver()
+
+ self.assertEquals(len(logFiles), 1)
+ self.assertIdentical(logFiles[0], sys.stdout)
+
+ logger = app.AppLogger({"logfile": ""})
+ observer = logger._getLogObserver()
+
+ self.assertEquals(len(logFiles), 2)
+ self.assertIdentical(logFiles[1], sys.stdout)
+
+
+ def test_getLogObserverFile(self):
+ """
+ When passing the C{logfile} option, L{app.AppLogger._getLogObserver}
+ returns a log observer pointing at the specified path.
+ """
+ logFiles = _patchFileLogObserver(self.patch)
+ filename = self.mktemp()
+ logger = app.AppLogger({"logfile": filename})
+
+ observer = logger._getLogObserver()
+
+ self.assertEquals(len(logFiles), 1)
+ self.assertEquals(logFiles[0].path,
+ os.path.abspath(filename))
+
+
+ def test_stop(self):
+ """
+ L{app.AppLogger.stop} removes the observer created in C{start}, and
+ reinitialize its C{_observer} so that if C{stop} is called several
+ times it doesn't break.
+ """
+ removed = []
+ observer = object()
+ def remove(observer):
+ removed.append(observer)
+ self.patch(log, 'removeObserver', remove)
+ logger = app.AppLogger({})
+ logger._observer = observer
+ logger.stop()
+ self.assertEquals(removed, [observer])
+ logger.stop()
+ self.assertEquals(removed, [observer])
+ self.assertIdentical(logger._observer, None)
+
+
+
+class UnixAppLoggerTestCase(unittest.TestCase):
+ """
+ Tests for L{UnixAppLogger}.
+
+ @ivar signals: list of signal handlers installed.
+ @type signals: C{list}
+ """
+ if _twistd_unix is None:
+ skip = "twistd unix not available"
+
+ def setUp(self):
+ """
+ Fake C{signal.signal} for not installing the handlers but saving them
+ in C{self.signals}.
+ """
+ self.signals = []
+ def fakeSignal(sig, f):
+ self.signals.append((sig, f))
+ self.patch(signal, "signal", fakeSignal)
+
+
+ def test_getLogObserverStdout(self):
+ """
+ When non-daemonized and C{logfile} is empty or set to C{-},
+ L{UnixAppLogger._getLogObserver} returns a log observer pointing at
+ C{sys.stdout}.
+ """
+ logFiles = _patchFileLogObserver(self.patch)
+
+ logger = UnixAppLogger({"logfile": "-", "nodaemon": True})
+ observer = logger._getLogObserver()
+ self.assertEquals(len(logFiles), 1)
+ self.assertIdentical(logFiles[0], sys.stdout)
+
+ logger = UnixAppLogger({"logfile": "", "nodaemon": True})
+ observer = logger._getLogObserver()
+ self.assertEquals(len(logFiles), 2)
+ self.assertIdentical(logFiles[1], sys.stdout)
+
+
+ def test_getLogObserverStdoutDaemon(self):
+ """
+ When daemonized and C{logfile} is set to C{-},
+ L{UnixAppLogger._getLogObserver} raises C{SystemExit}.
+ """
+ logger = UnixAppLogger({"logfile": "-", "nodaemon": False})
+ error = self.assertRaises(SystemExit, logger._getLogObserver)
+ self.assertEquals(str(error), "Daemons cannot log to stdout, exiting!")
+
+
+ def test_getLogObserverFile(self):
+ """
+ When C{logfile} contains a file name, L{app.AppLogger._getLogObserver}
+ returns a log observer pointing at the specified path, and a signal
+ handler rotating the log is installed.
+ """
+ logFiles = _patchFileLogObserver(self.patch)
+ filename = self.mktemp()
+ logger = UnixAppLogger({"logfile": filename})
+ observer = logger._getLogObserver()
+
+ self.assertEquals(len(logFiles), 1)
+ self.assertEquals(logFiles[0].path,
+ os.path.abspath(filename))
+
+ self.assertEquals(len(self.signals), 1)
+ self.assertEquals(self.signals[0][0], signal.SIGUSR1)
+
+ d = Deferred()
+ def rotate():
+ d.callback(None)
+ logFiles[0].rotate = rotate
+
+ rotateLog = self.signals[0][1]
+ rotateLog(None, None)
+ return d
+
+
+ def test_getLogObserverDontOverrideSignalHandler(self):
+ """
+ If a signal handler is already installed,
+ L{UnixAppLogger._getLogObserver} doesn't override it.
+ """
+ def fakeGetSignal(sig):
+ self.assertEquals(sig, signal.SIGUSR1)
+ return object()
+ self.patch(signal, "getsignal", fakeGetSignal)
+ filename = self.mktemp()
+ logger = UnixAppLogger({"logfile": filename})
+ observer = logger._getLogObserver()
+
+ self.assertEquals(self.signals, [])
+
+
+ def test_getLogObserverDefaultFile(self):
+ """
+ When daemonized and C{logfile} is empty, the observer returned by
+ L{UnixAppLogger._getLogObserver} points at C{twistd.log} in the current
+ directory.
+ """
+ logFiles = _patchFileLogObserver(self.patch)
+ logger = UnixAppLogger({"logfile": "", "nodaemon": False})
+ observer = logger._getLogObserver()
+
+ self.assertEquals(len(logFiles), 1)
+ self.assertEquals(logFiles[0].path,
+ os.path.abspath("twistd.log"))
+
+
+ def test_getLogObserverSyslog(self):
+ """
+ If C{syslog} is set to C{True}, L{UnixAppLogger._getLogObserver} starts
+ a L{syslog.SyslogObserver} with given C{prefix}.
+ """
+ class fakesyslogobserver(object):
+ def __init__(self, prefix):
+ fakesyslogobserver.prefix = prefix
+ def emit(self, eventDict):
+ pass
+ self.patch(syslog, "SyslogObserver", fakesyslogobserver)
+ logger = UnixAppLogger({"syslog": True, "prefix": "test-prefix"})
+ observer = logger._getLogObserver()
+ self.assertEquals(fakesyslogobserver.prefix, "test-prefix")
+
+ if syslog is None:
+ test_getLogObserverSyslog.skip = "Syslog not available"
+
+
+
+class DeprecationTests(unittest.TestCase):
+ """
+ Tests for deprecated features.
+ """
+
+ def test_initialLog(self):
+ """
+ L{app.initialLog} is deprecated.
+ """
+ logs = []
+ log.addObserver(logs.append)
+ self.addCleanup(log.removeObserver, logs.append)
+ self.callDeprecated(Version("Twisted", 8, 2, 0), app.initialLog)
+ self.assertEquals(len(logs), 2)
+ self.assertIn("starting up", logs[0]["message"][0])
+ self.assertIn("reactor class", logs[1]["message"][0])