diff options
Diffstat (limited to 'vendor/Twisted-10.0.0/twisted/test/test_application.py')
-rw-r--r-- | vendor/Twisted-10.0.0/twisted/test/test_application.py | 867 |
1 files changed, 867 insertions, 0 deletions
diff --git a/vendor/Twisted-10.0.0/twisted/test/test_application.py b/vendor/Twisted-10.0.0/twisted/test/test_application.py new file mode 100644 index 0000000000..953509e15a --- /dev/null +++ b/vendor/Twisted-10.0.0/twisted/test/test_application.py @@ -0,0 +1,867 @@ +# Copyright (c) 2001-2009 Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Tests for L{twisted.application} and its interaction with +L{twisted.persisted.sob}. +""" + +import sys, copy, os, pickle +from StringIO import StringIO + +from twisted.trial import unittest, util +from twisted.application import service, internet, app +from twisted.persisted import sob +from twisted.python import usage +from twisted.python.runtime import platform +from twisted.internet import interfaces, defer +from twisted.protocols import wire, basic +from twisted.internet import protocol, reactor +from twisted.internet.utils import getProcessOutputAndValue +from twisted.application import reactors +from twisted.test.proto_helpers import MemoryReactor + + +oldAppSuppressions = [util.suppress(message='twisted.internet.app is deprecated', + category=DeprecationWarning)] + +skipWindowsNopywin32 = None +if platform.isWindows(): + try: + import win32process + except ImportError: + skipWindowsNopywin32 = ("On windows, spawnProcess is not available " + "in the absence of win32process.") + +class Dummy: + processName=None + +class TestService(unittest.TestCase): + + def testName(self): + s = service.Service() + s.setName("hello") + self.failUnlessEqual(s.name, "hello") + + def testParent(self): + s = service.Service() + p = service.MultiService() + s.setServiceParent(p) + self.failUnlessEqual(list(p), [s]) + self.failUnlessEqual(s.parent, p) + + def testApplicationAsParent(self): + s = service.Service() + p = service.Application("") + s.setServiceParent(p) + self.failUnlessEqual(list(service.IServiceCollection(p)), [s]) + self.failUnlessEqual(s.parent, service.IServiceCollection(p)) + + def testNamedChild(self): + s = service.Service() + p = service.MultiService() + s.setName("hello") + s.setServiceParent(p) + self.failUnlessEqual(list(p), [s]) + self.failUnlessEqual(s.parent, p) + self.failUnlessEqual(p.getServiceNamed("hello"), s) + + def testDoublyNamedChild(self): + s = service.Service() + p = service.MultiService() + s.setName("hello") + s.setServiceParent(p) + self.failUnlessRaises(RuntimeError, s.setName, "lala") + + def testDuplicateNamedChild(self): + s = service.Service() + p = service.MultiService() + s.setName("hello") + s.setServiceParent(p) + s = service.Service() + s.setName("hello") + self.failUnlessRaises(RuntimeError, s.setServiceParent, p) + + def testDisowning(self): + s = service.Service() + p = service.MultiService() + s.setServiceParent(p) + self.failUnlessEqual(list(p), [s]) + self.failUnlessEqual(s.parent, p) + s.disownServiceParent() + self.failUnlessEqual(list(p), []) + self.failUnlessEqual(s.parent, None) + + def testRunning(self): + s = service.Service() + self.assert_(not s.running) + s.startService() + self.assert_(s.running) + s.stopService() + self.assert_(not s.running) + + def testRunningChildren1(self): + s = service.Service() + p = service.MultiService() + s.setServiceParent(p) + self.assert_(not s.running) + self.assert_(not p.running) + p.startService() + self.assert_(s.running) + self.assert_(p.running) + p.stopService() + self.assert_(not s.running) + self.assert_(not p.running) + + def testRunningChildren2(self): + s = service.Service() + def checkRunning(): + self.assert_(s.running) + t = service.Service() + t.stopService = checkRunning + t.startService = checkRunning + p = service.MultiService() + s.setServiceParent(p) + t.setServiceParent(p) + p.startService() + p.stopService() + + def testAddingIntoRunning(self): + p = service.MultiService() + p.startService() + s = service.Service() + self.assert_(not s.running) + s.setServiceParent(p) + self.assert_(s.running) + s.disownServiceParent() + self.assert_(not s.running) + + def testPrivileged(self): + s = service.Service() + def pss(): + s.privilegedStarted = 1 + s.privilegedStartService = pss + s1 = service.Service() + p = service.MultiService() + s.setServiceParent(p) + s1.setServiceParent(p) + p.privilegedStartService() + self.assert_(s.privilegedStarted) + + def testCopying(self): + s = service.Service() + s.startService() + s1 = copy.copy(s) + self.assert_(not s1.running) + self.assert_(s.running) + + +if hasattr(os, "getuid"): + curuid = os.getuid() + curgid = os.getgid() +else: + curuid = curgid = 0 + + +class TestProcess(unittest.TestCase): + + def testID(self): + p = service.Process(5, 6) + self.assertEqual(p.uid, 5) + self.assertEqual(p.gid, 6) + + def testDefaults(self): + p = service.Process(5) + self.assertEqual(p.uid, 5) + self.assertEqual(p.gid, None) + p = service.Process(gid=5) + self.assertEqual(p.uid, None) + self.assertEqual(p.gid, 5) + p = service.Process() + self.assertEqual(p.uid, None) + self.assertEqual(p.gid, None) + + def testProcessName(self): + p = service.Process() + self.assertEqual(p.processName, None) + p.processName = 'hello' + self.assertEqual(p.processName, 'hello') + + +class TestInterfaces(unittest.TestCase): + + def testService(self): + self.assert_(service.IService.providedBy(service.Service())) + + def testMultiService(self): + self.assert_(service.IService.providedBy(service.MultiService())) + self.assert_(service.IServiceCollection.providedBy(service.MultiService())) + + def testProcess(self): + self.assert_(service.IProcess.providedBy(service.Process())) + + +class TestApplication(unittest.TestCase): + + def testConstructor(self): + service.Application("hello") + service.Application("hello", 5) + service.Application("hello", 5, 6) + + def testProcessComponent(self): + a = service.Application("hello") + self.assertEqual(service.IProcess(a).uid, None) + self.assertEqual(service.IProcess(a).gid, None) + a = service.Application("hello", 5) + self.assertEqual(service.IProcess(a).uid, 5) + self.assertEqual(service.IProcess(a).gid, None) + a = service.Application("hello", 5, 6) + self.assertEqual(service.IProcess(a).uid, 5) + self.assertEqual(service.IProcess(a).gid, 6) + + def testServiceComponent(self): + a = service.Application("hello") + self.assert_(service.IService(a) is service.IServiceCollection(a)) + self.assertEqual(service.IService(a).name, "hello") + self.assertEqual(service.IService(a).parent, None) + + def testPersistableComponent(self): + a = service.Application("hello") + p = sob.IPersistable(a) + self.assertEqual(p.style, 'pickle') + self.assertEqual(p.name, 'hello') + self.assert_(p.original is a) + +class TestLoading(unittest.TestCase): + + def test_simpleStoreAndLoad(self): + a = service.Application("hello") + p = sob.IPersistable(a) + for style in 'source pickle'.split(): + p.setStyle(style) + p.save() + a1 = service.loadApplication("hello.ta"+style[0], style) + self.assertEqual(service.IService(a1).name, "hello") + f = open("hello.tac", 'w') + f.writelines([ + "from twisted.application import service\n", + "application = service.Application('hello')\n", + ]) + f.close() + a1 = service.loadApplication("hello.tac", 'python') + self.assertEqual(service.IService(a1).name, "hello") + + + +class TestAppSupport(unittest.TestCase): + + def testPassphrase(self): + self.assertEqual(app.getPassphrase(0), None) + + def testLoadApplication(self): + """ + Test loading an application file in different dump format. + """ + a = service.Application("hello") + baseconfig = {'file': None, 'source': None, 'python':None} + for style in 'source pickle'.split(): + config = baseconfig.copy() + config[{'pickle': 'file'}.get(style, style)] = 'helloapplication' + sob.IPersistable(a).setStyle(style) + sob.IPersistable(a).save(filename='helloapplication') + a1 = app.getApplication(config, None) + self.assertEqual(service.IService(a1).name, "hello") + config = baseconfig.copy() + config['python'] = 'helloapplication' + f = open("helloapplication", 'w') + f.writelines([ + "from twisted.application import service\n", + "application = service.Application('hello')\n", + ]) + f.close() + a1 = app.getApplication(config, None) + self.assertEqual(service.IService(a1).name, "hello") + + def test_convertStyle(self): + appl = service.Application("lala") + for instyle in 'source pickle'.split(): + for outstyle in 'source pickle'.split(): + sob.IPersistable(appl).setStyle(instyle) + sob.IPersistable(appl).save(filename="converttest") + app.convertStyle("converttest", instyle, None, + "converttest.out", outstyle, 0) + appl2 = service.loadApplication("converttest.out", outstyle) + self.assertEqual(service.IService(appl2).name, "lala") + + def test_getLogFile(self): + """ + Test L{app.getLogFile}, veryfying the LogFile instance it returns. + """ + os.mkdir("logfiledir") + l = app.getLogFile(os.path.join("logfiledir", "lala")) + self.assertEqual(l.path, + os.path.abspath(os.path.join("logfiledir", "lala"))) + self.assertEqual(l.name, "lala") + self.assertEqual(l.directory, os.path.abspath("logfiledir")) + + test_getLogFile.suppress = [ + util.suppress(message="app.getLogFile is deprecated. Use " + "twisted.python.logfile.LogFile.fromFullPath instead", + category=DeprecationWarning)] + + def test_startApplication(self): + appl = service.Application("lala") + app.startApplication(appl, 0) + self.assert_(service.IService(appl).running) + + +class Foo(basic.LineReceiver): + def connectionMade(self): + self.transport.write('lalala\r\n') + def lineReceived(self, line): + self.factory.line = line + self.transport.loseConnection() + def connectionLost(self, reason): + self.factory.d.callback(self.factory.line) + + +class DummyApp: + processName = None + def addService(self, service): + self.services[service.name] = service + def removeService(self, service): + del self.services[service.name] + + +class TimerTarget: + def __init__(self): + self.l = [] + def append(self, what): + self.l.append(what) + +class TestEcho(wire.Echo): + def connectionLost(self, reason): + self.d.callback(True) + +class TestInternet2(unittest.TestCase): + + def testTCP(self): + s = service.MultiService() + s.startService() + factory = protocol.ServerFactory() + factory.protocol = TestEcho + TestEcho.d = defer.Deferred() + t = internet.TCPServer(0, factory) + t.setServiceParent(s) + num = t._port.getHost().port + factory = protocol.ClientFactory() + factory.d = defer.Deferred() + factory.protocol = Foo + factory.line = None + internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s) + factory.d.addCallback(self.assertEqual, 'lalala') + factory.d.addCallback(lambda x : s.stopService()) + factory.d.addCallback(lambda x : TestEcho.d) + return factory.d + + + def test_UDP(self): + """ + Test L{internet.UDPServer} with a random port: starting the service + should give it valid port, and stopService should free it so that we + can start a server on the same port again. + """ + if not interfaces.IReactorUDP(reactor, None): + raise unittest.SkipTest("This reactor does not support UDP sockets") + p = protocol.DatagramProtocol() + t = internet.UDPServer(0, p) + t.startService() + num = t._port.getHost().port + self.assertNotEquals(num, 0) + def onStop(ignored): + t = internet.UDPServer(num, p) + t.startService() + return t.stopService() + return defer.maybeDeferred(t.stopService).addCallback(onStop) + + + def testPrivileged(self): + factory = protocol.ServerFactory() + factory.protocol = TestEcho + TestEcho.d = defer.Deferred() + t = internet.TCPServer(0, factory) + t.privileged = 1 + t.privilegedStartService() + num = t._port.getHost().port + factory = protocol.ClientFactory() + factory.d = defer.Deferred() + factory.protocol = Foo + factory.line = None + c = internet.TCPClient('127.0.0.1', num, factory) + c.startService() + factory.d.addCallback(self.assertEqual, 'lalala') + factory.d.addCallback(lambda x : c.stopService()) + factory.d.addCallback(lambda x : t.stopService()) + factory.d.addCallback(lambda x : TestEcho.d) + return factory.d + + def testConnectionGettingRefused(self): + factory = protocol.ServerFactory() + factory.protocol = wire.Echo + t = internet.TCPServer(0, factory) + t.startService() + num = t._port.getHost().port + t.stopService() + d = defer.Deferred() + factory = protocol.ClientFactory() + factory.clientConnectionFailed = lambda *args: d.callback(None) + c = internet.TCPClient('127.0.0.1', num, factory) + c.startService() + return d + + def testUNIX(self): + # FIXME: This test is far too dense. It needs comments. + # -- spiv, 2004-11-07 + if not interfaces.IReactorUNIX(reactor, None): + raise unittest.SkipTest, "This reactor does not support UNIX domain sockets" + s = service.MultiService() + s.startService() + factory = protocol.ServerFactory() + factory.protocol = TestEcho + TestEcho.d = defer.Deferred() + t = internet.UNIXServer('echo.skt', factory) + t.setServiceParent(s) + factory = protocol.ClientFactory() + factory.protocol = Foo + factory.d = defer.Deferred() + factory.line = None + internet.UNIXClient('echo.skt', factory).setServiceParent(s) + factory.d.addCallback(self.assertEqual, 'lalala') + factory.d.addCallback(lambda x : s.stopService()) + factory.d.addCallback(lambda x : TestEcho.d) + factory.d.addCallback(self._cbTestUnix, factory, s) + return factory.d + + def _cbTestUnix(self, ignored, factory, s): + TestEcho.d = defer.Deferred() + factory.line = None + factory.d = defer.Deferred() + s.startService() + factory.d.addCallback(self.assertEqual, 'lalala') + factory.d.addCallback(lambda x : s.stopService()) + factory.d.addCallback(lambda x : TestEcho.d) + return factory.d + + def testVolatile(self): + if not interfaces.IReactorUNIX(reactor, None): + raise unittest.SkipTest, "This reactor does not support UNIX domain sockets" + factory = protocol.ServerFactory() + factory.protocol = wire.Echo + t = internet.UNIXServer('echo.skt', factory) + t.startService() + self.failIfIdentical(t._port, None) + t1 = copy.copy(t) + self.assertIdentical(t1._port, None) + t.stopService() + self.assertIdentical(t._port, None) + self.failIf(t.running) + + factory = protocol.ClientFactory() + factory.protocol = wire.Echo + t = internet.UNIXClient('echo.skt', factory) + t.startService() + self.failIfIdentical(t._connection, None) + t1 = copy.copy(t) + self.assertIdentical(t1._connection, None) + t.stopService() + self.assertIdentical(t._connection, None) + self.failIf(t.running) + + def testStoppingServer(self): + if not interfaces.IReactorUNIX(reactor, None): + raise unittest.SkipTest, "This reactor does not support UNIX domain sockets" + factory = protocol.ServerFactory() + factory.protocol = wire.Echo + t = internet.UNIXServer('echo.skt', factory) + t.startService() + t.stopService() + self.failIf(t.running) + factory = protocol.ClientFactory() + d = defer.Deferred() + factory.clientConnectionFailed = lambda *args: d.callback(None) + reactor.connectUNIX('echo.skt', factory) + return d + + def testPickledTimer(self): + target = TimerTarget() + t0 = internet.TimerService(1, target.append, "hello") + t0.startService() + s = pickle.dumps(t0) + t0.stopService() + + t = pickle.loads(s) + self.failIf(t.running) + + def testBrokenTimer(self): + d = defer.Deferred() + t = internet.TimerService(1, lambda: 1 / 0) + oldFailed = t._failed + def _failed(why): + oldFailed(why) + d.callback(None) + t._failed = _failed + t.startService() + d.addCallback(lambda x : t.stopService) + d.addCallback(lambda x : self.assertEqual( + [ZeroDivisionError], + [o.value.__class__ for o in self.flushLoggedErrors(ZeroDivisionError)])) + return d + + def testEverythingThere(self): + trans = 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split() + for tran in trans[:]: + if not getattr(interfaces, "IReactor"+tran)(reactor, None): + trans.remove(tran) + if interfaces.IReactorArbitrary(reactor, None) is not None: + trans.insert(0, "Generic") + for tran in trans: + for side in 'Server Client'.split(): + if tran == "Multicast" and side == "Client": + continue + self.assert_(hasattr(internet, tran+side)) + method = getattr(internet, tran+side).method + prefix = {'Server': 'listen', 'Client': 'connect'}[side] + self.assert_(hasattr(reactor, prefix+method) or + (prefix == "connect" and method == "UDP")) + o = getattr(internet, tran+side)() + self.assertEqual(service.IService(o), o) + + + def test_reactorParametrizationInServer(self): + """ + L{internet._AbstractServer} supports a C{reactor} keyword argument + that can be used to parametrize the reactor used to listen for + connections. + """ + reactor = MemoryReactor() + + factory = object() + t = internet.TCPServer(1234, factory, reactor=reactor) + t.startService() + self.assertEquals(reactor.tcpServers.pop()[:2], (1234, factory)) + + + def test_reactorParametrizationInClient(self): + """ + L{internet._AbstractClient} supports a C{reactor} keyword arguments + that can be used to parametrize the reactor used to create new client + connections. + """ + reactor = MemoryReactor() + + factory = object() + t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor) + t.startService() + self.assertEquals( + reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory)) + + + def test_reactorParametrizationInServerMultipleStart(self): + """ + Like L{test_reactorParametrizationInServer}, but stop and restart the + service and check that the given reactor is still used. + """ + reactor = MemoryReactor() + + factory = object() + t = internet.TCPServer(1234, factory, reactor=reactor) + t.startService() + self.assertEquals(reactor.tcpServers.pop()[:2], (1234, factory)) + t.stopService() + t.startService() + self.assertEquals(reactor.tcpServers.pop()[:2], (1234, factory)) + + + def test_reactorParametrizationInClientMultipleStart(self): + """ + Like L{test_reactorParametrizationInClient}, but stop and restart the + service and check that the given reactor is still used. + """ + reactor = MemoryReactor() + + factory = object() + t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor) + t.startService() + self.assertEquals( + reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory)) + t.stopService() + t.startService() + self.assertEquals( + reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory)) + + + +class TestTimerBasic(unittest.TestCase): + + def testTimerRuns(self): + d = defer.Deferred() + self.t = internet.TimerService(1, d.callback, 'hello') + self.t.startService() + d.addCallback(self.assertEqual, 'hello') + d.addCallback(lambda x : self.t.stopService()) + d.addCallback(lambda x : self.failIf(self.t.running)) + return d + + def tearDown(self): + return self.t.stopService() + + def testTimerRestart(self): + # restart the same TimerService + d1 = defer.Deferred() + d2 = defer.Deferred() + work = [(d2, "bar"), (d1, "foo")] + def trigger(): + d, arg = work.pop() + d.callback(arg) + self.t = internet.TimerService(1, trigger) + self.t.startService() + def onFirstResult(result): + self.assertEqual(result, 'foo') + return self.t.stopService() + def onFirstStop(ignored): + self.failIf(self.t.running) + self.t.startService() + return d2 + def onSecondResult(result): + self.assertEqual(result, 'bar') + self.t.stopService() + d1.addCallback(onFirstResult) + d1.addCallback(onFirstStop) + d1.addCallback(onSecondResult) + return d1 + + def testTimerLoops(self): + l = [] + def trigger(data, number, d): + l.append(data) + if len(l) == number: + d.callback(l) + d = defer.Deferred() + self.t = internet.TimerService(0.01, trigger, "hello", 10, d) + self.t.startService() + d.addCallback(self.assertEqual, ['hello'] * 10) + d.addCallback(lambda x : self.t.stopService()) + return d + + +class FakeReactor(reactors.Reactor): + """ + A fake reactor with a hooked install method. + """ + + def __init__(self, install, *args, **kwargs): + """ + @param install: any callable that will be used as install method. + @type install: C{callable} + """ + reactors.Reactor.__init__(self, *args, **kwargs) + self.install = install + + + +class PluggableReactorTestCase(unittest.TestCase): + """ + Tests for the reactor discovery/inspection APIs. + """ + + def setUp(self): + """ + Override the L{reactors.getPlugins} function, normally bound to + L{twisted.plugin.getPlugins}, in order to control which + L{IReactorInstaller} plugins are seen as available. + + C{self.pluginResults} can be customized and will be used as the + result of calls to C{reactors.getPlugins}. + """ + self.pluginCalls = [] + self.pluginResults = [] + self.originalFunction = reactors.getPlugins + reactors.getPlugins = self._getPlugins + + + def tearDown(self): + """ + Restore the original L{reactors.getPlugins}. + """ + reactors.getPlugins = self.originalFunction + + + def _getPlugins(self, interface, package=None): + """ + Stand-in for the real getPlugins method which records its arguments + and returns a fixed result. + """ + self.pluginCalls.append((interface, package)) + return list(self.pluginResults) + + + def test_getPluginReactorTypes(self): + """ + Test that reactor plugins are returned from L{getReactorTypes} + """ + name = 'fakereactortest' + package = __name__ + '.fakereactor' + description = 'description' + self.pluginResults = [reactors.Reactor(name, package, description)] + reactorTypes = reactors.getReactorTypes() + + self.assertEqual( + self.pluginCalls, + [(reactors.IReactorInstaller, None)]) + + for r in reactorTypes: + if r.shortName == name: + self.assertEqual(r.description, description) + break + else: + self.fail("Reactor plugin not present in getReactorTypes() result") + + + def test_reactorInstallation(self): + """ + Test that L{reactors.Reactor.install} loads the correct module and + calls its install attribute. + """ + installed = [] + def install(): + installed.append(True) + installer = FakeReactor(install, + 'fakereactortest', __name__, 'described') + installer.install() + self.assertEqual(installed, [True]) + + + def test_installReactor(self): + """ + Test that the L{reactors.installReactor} function correctly installs + the specified reactor. + """ + installed = [] + def install(): + installed.append(True) + name = 'fakereactortest' + package = __name__ + description = 'description' + self.pluginResults = [FakeReactor(install, name, package, description)] + reactors.installReactor(name) + self.assertEqual(installed, [True]) + + + def test_installNonExistentReactor(self): + """ + Test that L{reactors.installReactor} raises L{reactors.NoSuchReactor} + when asked to install a reactor which it cannot find. + """ + self.pluginResults = [] + self.assertRaises( + reactors.NoSuchReactor, + reactors.installReactor, 'somereactor') + + + def test_installNotAvailableReactor(self): + """ + Test that L{reactors.installReactor} raises an exception when asked to + install a reactor which doesn't work in this environment. + """ + def install(): + raise ImportError("Missing foo bar") + name = 'fakereactortest' + package = __name__ + description = 'description' + self.pluginResults = [FakeReactor(install, name, package, description)] + self.assertRaises(ImportError, reactors.installReactor, name) + + + def test_reactorSelectionMixin(self): + """ + Test that the reactor selected is installed as soon as possible, ie + when the option is parsed. + """ + executed = [] + INSTALL_EVENT = 'reactor installed' + SUBCOMMAND_EVENT = 'subcommands loaded' + + class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin): + def subCommands(self): + executed.append(SUBCOMMAND_EVENT) + return [('subcommand', None, lambda: self, 'test subcommand')] + subCommands = property(subCommands) + + def install(): + executed.append(INSTALL_EVENT) + self.pluginResults = [ + FakeReactor(install, 'fakereactortest', __name__, 'described') + ] + + options = ReactorSelectionOptions() + options.parseOptions(['--reactor', 'fakereactortest', 'subcommand']) + self.assertEqual(executed[0], INSTALL_EVENT) + self.assertEqual(executed.count(INSTALL_EVENT), 1) + + + def test_reactorSelectionMixinNonExistent(self): + """ + Test that the usage mixin exits when trying to use a non existent + reactor (the name not matching to any reactor), giving an error + message. + """ + class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin): + pass + self.pluginResults = [] + + options = ReactorSelectionOptions() + options.messageOutput = StringIO() + e = self.assertRaises(usage.UsageError, options.parseOptions, + ['--reactor', 'fakereactortest', 'subcommand']) + self.assertIn("fakereactortest", e.args[0]) + self.assertIn("help-reactors", e.args[0]) + + + def test_reactorSelectionMixinNotAvailable(self): + """ + Test that the usage mixin exits when trying to use a reactor not + available (the reactor raises an error at installation), giving an + error message. + """ + class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin): + pass + message = "Missing foo bar" + def install(): + raise ImportError(message) + + name = 'fakereactortest' + package = __name__ + description = 'description' + self.pluginResults = [FakeReactor(install, name, package, description)] + + options = ReactorSelectionOptions() + options.messageOutput = StringIO() + e = self.assertRaises(usage.UsageError, options.parseOptions, + ['--reactor', 'fakereactortest', 'subcommand']) + self.assertIn(message, e.args[0]) + self.assertIn("help-reactors", e.args[0]) + + + +class ReportProfileTestCase(unittest.TestCase): + """ + Tests for L{app.reportProfile}. + """ + + def test_deprecation(self): + """ + Check that L{app.reportProfile} prints a warning and does nothing else. + """ + self.assertWarns(DeprecationWarning, + "reportProfile is deprecated and a no-op since Twisted 8.0.", + app.__file__, app.reportProfile, None, None) |