diff options
Diffstat (limited to 'Lib/test/test_nntplib.py')
| -rw-r--r-- | Lib/test/test_nntplib.py | 1091 | 
1 files changed, 1091 insertions, 0 deletions
| diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py new file mode 100644 index 0000000000..512bcd585c --- /dev/null +++ b/Lib/test/test_nntplib.py @@ -0,0 +1,1091 @@ +import io +import datetime +import textwrap +import unittest +import contextlib +from test import support +from nntplib import NNTP, GroupInfo +import nntplib + +TIMEOUT = 30 + +# TODO: +# - test the `file` arg to more commands +# - test error conditions + + +class NetworkedNNTPTestsMixin: + +    def test_welcome(self): +        welcome = self.server.getwelcome() +        self.assertEqual(str, type(welcome)) + +    def test_help(self): +        resp, list = self.server.help() +        self.assertTrue(resp.startswith("100 "), resp) +        for line in list: +            self.assertEqual(str, type(line)) + +    def test_list(self): +        resp, list = self.server.list() +        if len(list) > 0: +            self.assertEqual(GroupInfo, type(list[0])) +            self.assertEqual(str, type(list[0].group)) + +    def test_unknown_command(self): +        with self.assertRaises(nntplib.NNTPPermanentError) as cm: +            self.server._shortcmd("XYZZY") +        resp = cm.exception.response +        self.assertTrue(resp.startswith("500 "), resp) + +    def test_newgroups(self): +        # gmane gets a constant influx of new groups.  In order not to stress +        # the server too much, we choose a recent date in the past. +        dt = datetime.date.today() - datetime.timedelta(days=7) +        resp, groups = self.server.newgroups(dt) +        if len(groups) > 0: +            self.assertIsInstance(groups[0], GroupInfo) +            self.assertIsInstance(groups[0].group, str) + +    def test_description(self): +        def _check_desc(desc): +            # Sanity checks +            self.assertIsInstance(desc, str) +            self.assertNotIn(self.GROUP_NAME, desc) +        desc = self.server.description(self.GROUP_NAME) +        _check_desc(desc) +        # Another sanity check +        self.assertIn("Python", desc) +        # With a pattern +        desc = self.server.description(self.GROUP_PAT) +        _check_desc(desc) +        # Shouldn't exist +        desc = self.server.description("zk.brrtt.baz") +        self.assertEqual(desc, '') + +    def test_descriptions(self): +        resp, descs = self.server.descriptions(self.GROUP_PAT) +        # 215 for LIST NEWSGROUPS, 282 for XGTITLE +        self.assertTrue( +            resp.startswith("215 ") or resp.startswith("282 "), resp) +        self.assertIsInstance(descs, dict) +        desc = descs[self.GROUP_NAME] +        self.assertEqual(desc, self.server.description(self.GROUP_NAME)) + +    def test_group(self): +        result = self.server.group(self.GROUP_NAME) +        self.assertEqual(5, len(result)) +        resp, count, first, last, group = result +        self.assertEqual(group, self.GROUP_NAME) +        self.assertIsInstance(count, int) +        self.assertIsInstance(first, int) +        self.assertIsInstance(last, int) +        self.assertLessEqual(first, last) +        self.assertTrue(resp.startswith("211 "), resp) + +    def test_date(self): +        resp, date = self.server.date() +        self.assertIsInstance(date, datetime.datetime) +        # Sanity check +        self.assertGreaterEqual(date.year, 1995) +        self.assertLessEqual(date.year, 2030) + +    def _check_art_dict(self, art_dict): +        # Some sanity checks for a field dictionary returned by OVER / XOVER +        self.assertIsInstance(art_dict, dict) +        # NNTP has 7 mandatory fields +        self.assertGreaterEqual(art_dict.keys(), +            {"subject", "from", "date", "message-id", +             "references", ":bytes", ":lines"} +            ) +        for v in art_dict.values(): +            self.assertIsInstance(v, str) + +    def test_xover(self): +        resp, count, first, last, name = self.server.group(self.GROUP_NAME) +        resp, lines = self.server.xover(last, last) +        art_num, art_dict = lines[0] +        self.assertEqual(art_num, last) +        self._check_art_dict(art_dict) + +    def test_over(self): +        resp, count, first, last, name = self.server.group(self.GROUP_NAME) +        start = last - 10 +        # The "start-" article range form +        resp, lines = self.server.over((start, None)) +        art_num, art_dict = lines[0] +        self._check_art_dict(art_dict) +        # The "start-end" article range form +        resp, lines = self.server.over((start, last)) +        art_num, art_dict = lines[-1] +        self.assertEqual(art_num, last) +        self._check_art_dict(art_dict) +        # XXX The "message_id" form is unsupported by gmane +        # 503 Overview by message-ID unsupported + +    def test_xhdr(self): +        resp, count, first, last, name = self.server.group(self.GROUP_NAME) +        resp, lines = self.server.xhdr('subject', last) +        for line in lines: +            self.assertEqual(str, type(line[1])) + +    def check_article_resp(self, resp, article, art_num=None): +        self.assertIsInstance(article, nntplib.ArticleInfo) +        if art_num is not None: +            self.assertEqual(article.number, art_num) +        for line in article.lines: +            self.assertIsInstance(line, bytes) +        # XXX this could exceptionally happen... +        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n")) + +    def test_article_head_body(self): +        resp, count, first, last, name = self.server.group(self.GROUP_NAME) +        resp, head = self.server.head(last) +        self.assertTrue(resp.startswith("221 "), resp) +        self.check_article_resp(resp, head, last) +        resp, body = self.server.body(last) +        self.assertTrue(resp.startswith("222 "), resp) +        self.check_article_resp(resp, body, last) +        resp, article = self.server.article(last) +        self.assertTrue(resp.startswith("220 "), resp) +        self.check_article_resp(resp, article, last) +        self.assertEqual(article.lines, head.lines + [b''] + body.lines) + +    def test_quit(self): +        self.server.quit() +        self.server = None + + +class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): +    NNTP_HOST = 'news.gmane.org' +    GROUP_NAME = 'gmane.comp.python.devel' +    GROUP_PAT = 'gmane.comp.python.d*' + +    def setUp(self): +        support.requires("network") +        with support.transient_internet(self.NNTP_HOST): +            self.server = NNTP(self.NNTP_HOST, timeout=TIMEOUT) + +    def tearDown(self): +        if self.server is not None: +            self.server.quit() + +    # Disabled with gmane as it produces too much data +    test_list = None + +    def test_capabilities(self): +        # As of this writing, gmane implements NNTP version 2 and has a +        # couple of well-known capabilities. Just sanity check that we +        # got them. +        def _check_caps(caps): +            caps_list = caps['LIST'] +            self.assertIsInstance(caps_list, (list, tuple)) +            self.assertIn('OVERVIEW.FMT', caps_list) +        self.assertGreaterEqual(self.server.nntp_version, 2) +        _check_caps(self.server.getcapabilities()) +        # This re-emits the command +        resp, caps = self.server.capabilities() +        _check_caps(caps) + + +# +# Non-networked tests using a local server (or something mocking it). +# + +class _NNTPServerIO(io.RawIOBase): +    """A raw IO object allowing NNTP commands to be received and processed +    by a handler.  The handler can push responses which can then be read +    from the IO object.""" + +    def __init__(self, handler): +        io.RawIOBase.__init__(self) +        # The channel from the client +        self.c2s = io.BytesIO() +        # The channel to the client +        self.s2c = io.BytesIO() +        self.handler = handler +        self.handler.start(self.c2s.readline, self.push_data) + +    def readable(self): +        return True + +    def writable(self): +        return True + +    def push_data(self, data): +        """Push (buffer) some data to send to the client.""" +        pos = self.s2c.tell() +        self.s2c.seek(0, 2) +        self.s2c.write(data) +        self.s2c.seek(pos) + +    def write(self, b): +        """The client sends us some data""" +        pos = self.c2s.tell() +        self.c2s.write(b) +        self.c2s.seek(pos) +        self.handler.process_pending() +        return len(b) + +    def readinto(self, buf): +        """The client wants to read a response""" +        self.handler.process_pending() +        b = self.s2c.read(len(buf)) +        n = len(b) +        buf[:n] = b +        return n + + +class MockedNNTPTestsMixin: +    # Override in derived classes +    handler_class = None + +    def setUp(self): +        super().setUp() +        self.make_server() + +    def tearDown(self): +        super().tearDown() +        del self.server + +    def make_server(self, *args, **kwargs): +        self.handler = self.handler_class() +        self.sio = _NNTPServerIO(self.handler) +        # Using BufferedRWPair instead of BufferedRandom ensures the file +        # isn't seekable. +        file = io.BufferedRWPair(self.sio, self.sio) +        self.server = nntplib._NNTPBase(file, *args, **kwargs) +        return self.server + + +class NNTPv1Handler: +    """A handler for RFC 977""" + +    welcome = "200 NNTP mock server" + +    def start(self, readline, push_data): +        self.in_body = False +        self.allow_posting = True +        self._readline = readline +        self._push_data = push_data +        # Our welcome +        self.handle_welcome() + +    def _decode(self, data): +        return str(data, "utf-8", "surrogateescape") + +    def process_pending(self): +        if self.in_body: +            while True: +                line = self._readline() +                if not line: +                    return +                self.body.append(line) +                if line == b".\r\n": +                    break +            try: +                meth, tokens = self.body_callback +                meth(*tokens, body=self.body) +            finally: +                self.body_callback = None +                self.body = None +                self.in_body = False +        while True: +            line = self._decode(self._readline()) +            if not line: +                return +            if not line.endswith("\r\n"): +                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line)) +            line = line[:-2] +            cmd, *tokens = line.split() +            #meth = getattr(self.handler, "handle_" + cmd.upper(), None) +            meth = getattr(self, "handle_" + cmd.upper(), None) +            if meth is None: +                self.handle_unknown() +            else: +                try: +                    meth(*tokens) +                except Exception as e: +                    raise ValueError("command failed: {!r}".format(line)) from e +                else: +                    if self.in_body: +                        self.body_callback = meth, tokens +                        self.body = [] + +    def expect_body(self): +        """Flag that the client is expected to post a request body""" +        self.in_body = True + +    def push_data(self, data): +        """Push some binary data""" +        self._push_data(data) + +    def push_lit(self, lit): +        """Push a string literal""" +        lit = textwrap.dedent(lit) +        lit = "\r\n".join(lit.splitlines()) + "\r\n" +        lit = lit.encode('utf-8') +        self.push_data(lit) + +    def handle_unknown(self): +        self.push_lit("500 What?") + +    def handle_welcome(self): +        self.push_lit(self.welcome) + +    def handle_QUIT(self): +        self.push_lit("205 Bye!") + +    def handle_DATE(self): +        self.push_lit("111 20100914001155") + +    def handle_GROUP(self, group): +        if group == "fr.comp.lang.python": +            self.push_lit("211 486 761 1265 fr.comp.lang.python") +        else: +            self.push_lit("411 No such group {}".format(group)) + +    def handle_HELP(self): +        self.push_lit("""\ +            100 Legal commands +              authinfo user Name|pass Password|generic <prog> <args> +              date +              help +            Report problems to <root@example.org> +            .""") + +    def handle_STAT(self, message_spec=None): +        if message_spec is None: +            self.push_lit("412 No newsgroup selected") +        elif message_spec == "3000234": +            self.push_lit("223 3000234 <45223423@example.com>") +        elif message_spec == "<45223423@example.com>": +            self.push_lit("223 0 <45223423@example.com>") +        else: +            self.push_lit("430 No Such Article Found") + +    def handle_NEXT(self): +        self.push_lit("223 3000237 <668929@example.org> retrieved") + +    def handle_LAST(self): +        self.push_lit("223 3000234 <45223423@example.com> retrieved") + +    def handle_LIST(self, action=None, param=None): +        if action is None: +            self.push_lit("""\ +                215 Newsgroups in form "group high low flags". +                comp.lang.python 0000052340 0000002828 y +                comp.lang.python.announce 0000001153 0000000993 m +                free.it.comp.lang.python 0000000002 0000000002 y +                fr.comp.lang.python 0000001254 0000000760 y +                free.it.comp.lang.python.learner 0000000000 0000000001 y +                tw.bbs.comp.lang.python 0000000304 0000000304 y +                .""") +        elif action == "OVERVIEW.FMT": +            self.push_lit("""\ +                215 Order of fields in overview database. +                Subject: +                From: +                Date: +                Message-ID: +                References: +                Bytes: +                Lines: +                Xref:full +                .""") +        elif action == "NEWSGROUPS": +            assert param is not None +            if param == "comp.lang.python": +                self.push_lit("""\ +                    215 Descriptions in form "group description". +                    comp.lang.python\tThe Python computer language. +                    .""") +            elif param == "comp.lang.python*": +                self.push_lit("""\ +                    215 Descriptions in form "group description". +                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated) +                    comp.lang.python\tThe Python computer language. +                    .""") +            else: +                self.push_lit("""\ +                    215 Descriptions in form "group description". +                    .""") +        else: +            self.push_lit('501 Unknown LIST keyword') + +    def handle_NEWNEWS(self, group, date_str, time_str): +        # We hard code different return messages depending on passed +        # argument and date syntax. +        if (group == "comp.lang.python" and date_str == "20100913" +            and time_str == "082004"): +            # Date was passed in RFC 3977 format (NNTP "v2") +            self.push_lit("""\ +                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows +                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com> +                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com> +                .""") +        elif (group == "comp.lang.python" and date_str == "100913" +            and time_str == "082004"): +            # Date was passed in RFC 977 format (NNTP "v1") +            self.push_lit("""\ +                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows +                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com> +                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com> +                .""") +        else: +            self.push_lit("""\ +                230 An empty list of newsarticles follows +                .""") +        # (Note for experiments: many servers disable NEWNEWS. +        #  As of this writing, sicinfo3.epfl.ch doesn't.) + +    def handle_XOVER(self, message_spec): +        if message_spec == "57-59": +            self.push_lit( +                "224 Overview information for 57-58 follows\n" +                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout" +                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" +                    "\tSat, 19 Jun 2010 18:04:08 -0400" +                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>" +                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16" +                    "\tXref: news.gmane.org gmane.comp.python.authors:57" +                    "\n" +                "58\tLooking for a few good bloggers" +                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" +                    "\tThu, 22 Jul 2010 09:14:14 -0400" +                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>" +                    "\t\t6683\t16" +                    "\tXref: news.gmane.org gmane.comp.python.authors:58" +                    "\n" +                # An UTF-8 overview line from fr.comp.lang.python +                "59\tRe: Message d'erreur incompréhensible (par moi)" +                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>" +                    "\tWed, 15 Sep 2010 18:09:15 +0200" +                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>" +                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27" +                    "\tXref: saria.nerim.net fr.comp.lang.python:1265" +                    "\n" +                ".\n") +        else: +            self.push_lit("""\ +                224 No articles +                .""") + +    def handle_POST(self, *, body=None): +        if body is None: +            if self.allow_posting: +                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>") +                self.expect_body() +            else: +                self.push_lit("440 Posting not permitted") +        else: +            assert self.allow_posting +            self.push_lit("240 Article received OK") +            self.posted_body = body + +    def handle_IHAVE(self, message_id, *, body=None): +        if body is None: +            if (self.allow_posting and +                message_id == "<i.am.an.article.you.will.want@example.com>"): +                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>") +                self.expect_body() +            else: +                self.push_lit("435 Article not wanted") +        else: +            assert self.allow_posting +            self.push_lit("235 Article transferred OK") +            self.posted_body = body + +    sample_head = """\ +        From: "Demo User" <nobody@example.net> +        Subject: I am just a test article +        Content-Type: text/plain; charset=UTF-8; format=flowed +        Message-ID: <i.am.an.article.you.will.want@example.com>""" + +    sample_body = """\ +        This is just a test article. +        ..Here is a dot-starting line. + +        -- Signed by Andr\xe9.""" + +    sample_article = sample_head + "\n\n" + sample_body + +    def handle_ARTICLE(self, message_spec=None): +        if message_spec is None: +            self.push_lit("220 3000237 <45223423@example.com>") +        elif message_spec == "<45223423@example.com>": +            self.push_lit("220 0 <45223423@example.com>") +        elif message_spec == "3000234": +            self.push_lit("220 3000234 <45223423@example.com>") +        else: +            self.push_lit("430 No Such Article Found") +            return +        self.push_lit(self.sample_article) +        self.push_lit(".") + +    def handle_HEAD(self, message_spec=None): +        if message_spec is None: +            self.push_lit("221 3000237 <45223423@example.com>") +        elif message_spec == "<45223423@example.com>": +            self.push_lit("221 0 <45223423@example.com>") +        elif message_spec == "3000234": +            self.push_lit("221 3000234 <45223423@example.com>") +        else: +            self.push_lit("430 No Such Article Found") +            return +        self.push_lit(self.sample_head) +        self.push_lit(".") + +    def handle_BODY(self, message_spec=None): +        if message_spec is None: +            self.push_lit("222 3000237 <45223423@example.com>") +        elif message_spec == "<45223423@example.com>": +            self.push_lit("222 0 <45223423@example.com>") +        elif message_spec == "3000234": +            self.push_lit("222 3000234 <45223423@example.com>") +        else: +            self.push_lit("430 No Such Article Found") +            return +        self.push_lit(self.sample_body) +        self.push_lit(".") + + +class NNTPv2Handler(NNTPv1Handler): +    """A handler for RFC 3977 (NNTP "v2")""" + +    def handle_CAPABILITIES(self): +        self.push_lit("""\ +            101 Capability list: +            VERSION 2 +            IMPLEMENTATION INN 2.5.1 +            AUTHINFO USER +            HDR +            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT +            OVER +            POST +            READER +            .""") + +    def handle_OVER(self, message_spec=None): +        return self.handle_XOVER(message_spec) + + +class NNTPv1v2TestsMixin: + +    def setUp(self): +        super().setUp() + +    def test_welcome(self): +        self.assertEqual(self.server.welcome, self.handler.welcome) + +    def test_date(self): +        resp, date = self.server.date() +        self.assertEqual(resp, "111 20100914001155") +        self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55)) + +    def test_quit(self): +        self.assertFalse(self.sio.closed) +        resp = self.server.quit() +        self.assertEqual(resp, "205 Bye!") +        self.assertTrue(self.sio.closed) + +    def test_help(self): +        resp, help = self.server.help() +        self.assertEqual(resp, "100 Legal commands") +        self.assertEqual(help, [ +            '  authinfo user Name|pass Password|generic <prog> <args>', +            '  date', +            '  help', +            'Report problems to <root@example.org>', +        ]) + +    def test_list(self): +        resp, groups = self.server.list() +        self.assertEqual(len(groups), 6) +        g = groups[1] +        self.assertEqual(g, +            GroupInfo("comp.lang.python.announce", "0000001153", +                      "0000000993", "m")) + +    def test_stat(self): +        resp, art_num, message_id = self.server.stat(3000234) +        self.assertEqual(resp, "223 3000234 <45223423@example.com>") +        self.assertEqual(art_num, 3000234) +        self.assertEqual(message_id, "<45223423@example.com>") +        resp, art_num, message_id = self.server.stat("<45223423@example.com>") +        self.assertEqual(resp, "223 0 <45223423@example.com>") +        self.assertEqual(art_num, 0) +        self.assertEqual(message_id, "<45223423@example.com>") +        with self.assertRaises(nntplib.NNTPTemporaryError) as cm: +            self.server.stat("<non.existent.id>") +        self.assertEqual(cm.exception.response, "430 No Such Article Found") +        with self.assertRaises(nntplib.NNTPTemporaryError) as cm: +            self.server.stat() +        self.assertEqual(cm.exception.response, "412 No newsgroup selected") + +    def test_next(self): +        resp, art_num, message_id = self.server.next() +        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved") +        self.assertEqual(art_num, 3000237) +        self.assertEqual(message_id, "<668929@example.org>") + +    def test_last(self): +        resp, art_num, message_id = self.server.last() +        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved") +        self.assertEqual(art_num, 3000234) +        self.assertEqual(message_id, "<45223423@example.com>") + +    def test_description(self): +        desc = self.server.description("comp.lang.python") +        self.assertEqual(desc, "The Python computer language.") +        desc = self.server.description("comp.lang.pythonx") +        self.assertEqual(desc, "") + +    def test_descriptions(self): +        resp, groups = self.server.descriptions("comp.lang.python") +        self.assertEqual(resp, '215 Descriptions in form "group description".') +        self.assertEqual(groups, { +            "comp.lang.python": "The Python computer language.", +            }) +        resp, groups = self.server.descriptions("comp.lang.python*") +        self.assertEqual(groups, { +            "comp.lang.python": "The Python computer language.", +            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)", +            }) +        resp, groups = self.server.descriptions("comp.lang.pythonx") +        self.assertEqual(groups, {}) + +    def test_group(self): +        resp, count, first, last, group = self.server.group("fr.comp.lang.python") +        self.assertTrue(resp.startswith("211 "), resp) +        self.assertEqual(first, 761) +        self.assertEqual(last, 1265) +        self.assertEqual(count, 486) +        self.assertEqual(group, "fr.comp.lang.python") +        with self.assertRaises(nntplib.NNTPTemporaryError) as cm: +            self.server.group("comp.lang.python.devel") +        exc = cm.exception +        self.assertTrue(exc.response.startswith("411 No such group"), +                        exc.response) + +    def test_newnews(self): +        # NEWNEWS comp.lang.python [20]100913 082004 +        dt = datetime.datetime(2010, 9, 13, 8, 20, 4) +        resp, ids = self.server.newnews("comp.lang.python", dt) +        expected = ( +            "230 list of newsarticles (NNTP v{0}) " +            "created after Mon Sep 13 08:20:04 2010 follows" +            ).format(self.nntp_version) +        self.assertEqual(resp, expected) +        self.assertEqual(ids, [ +            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>", +            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>", +            ]) +        # NEWNEWS fr.comp.lang.python [20]100913 082004 +        dt = datetime.datetime(2010, 9, 13, 8, 20, 4) +        resp, ids = self.server.newnews("fr.comp.lang.python", dt) +        self.assertEqual(resp, "230 An empty list of newsarticles follows") +        self.assertEqual(ids, []) + +    def _check_article_body(self, lines): +        self.assertEqual(len(lines), 4) +        self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.") +        self.assertEqual(lines[-2], b"") +        self.assertEqual(lines[-3], b".Here is a dot-starting line.") +        self.assertEqual(lines[-4], b"This is just a test article.") + +    def _check_article_head(self, lines): +        self.assertEqual(len(lines), 4) +        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>') +        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>") + +    def _check_article_data(self, lines): +        self.assertEqual(len(lines), 9) +        self._check_article_head(lines[:4]) +        self._check_article_body(lines[-4:]) +        self.assertEqual(lines[4], b"") + +    def test_article(self): +        # ARTICLE +        resp, info = self.server.article() +        self.assertEqual(resp, "220 3000237 <45223423@example.com>") +        art_num, message_id, lines = info +        self.assertEqual(art_num, 3000237) +        self.assertEqual(message_id, "<45223423@example.com>") +        self._check_article_data(lines) +        # ARTICLE num +        resp, info = self.server.article(3000234) +        self.assertEqual(resp, "220 3000234 <45223423@example.com>") +        art_num, message_id, lines = info +        self.assertEqual(art_num, 3000234) +        self.assertEqual(message_id, "<45223423@example.com>") +        self._check_article_data(lines) +        # ARTICLE id +        resp, info = self.server.article("<45223423@example.com>") +        self.assertEqual(resp, "220 0 <45223423@example.com>") +        art_num, message_id, lines = info +        self.assertEqual(art_num, 0) +        self.assertEqual(message_id, "<45223423@example.com>") +        self._check_article_data(lines) +        # Non-existent id +        with self.assertRaises(nntplib.NNTPTemporaryError) as cm: +            self.server.article("<non-existent@example.com>") +        self.assertEqual(cm.exception.response, "430 No Such Article Found") + +    def test_article_file(self): +        # With a "file" argument +        f = io.BytesIO() +        resp, info = self.server.article(file=f) +        self.assertEqual(resp, "220 3000237 <45223423@example.com>") +        art_num, message_id, lines = info +        self.assertEqual(art_num, 3000237) +        self.assertEqual(message_id, "<45223423@example.com>") +        self.assertEqual(lines, []) +        data = f.getvalue() +        self.assertTrue(data.startswith( +            b'From: "Demo User" <nobody@example.net>\r\n' +            b'Subject: I am just a test article\r\n' +            ), ascii(data)) +        self.assertTrue(data.endswith( +            b'This is just a test article.\r\n' +            b'.Here is a dot-starting line.\r\n' +            b'\r\n' +            b'-- Signed by Andr\xc3\xa9.\r\n' +            ), ascii(data)) + +    def test_head(self): +        # HEAD +        resp, info = self.server.head() +        self.assertEqual(resp, "221 3000237 <45223423@example.com>") +        art_num, message_id, lines = info +        self.assertEqual(art_num, 3000237) +        self.assertEqual(message_id, "<45223423@example.com>") +        self._check_article_head(lines) +        # HEAD num +        resp, info = self.server.head(3000234) +        self.assertEqual(resp, "221 3000234 <45223423@example.com>") +        art_num, message_id, lines = info +        self.assertEqual(art_num, 3000234) +        self.assertEqual(message_id, "<45223423@example.com>") +        self._check_article_head(lines) +        # HEAD id +        resp, info = self.server.head("<45223423@example.com>") +        self.assertEqual(resp, "221 0 <45223423@example.com>") +        art_num, message_id, lines = info +        self.assertEqual(art_num, 0) +        self.assertEqual(message_id, "<45223423@example.com>") +        self._check_article_head(lines) +        # Non-existent id +        with self.assertRaises(nntplib.NNTPTemporaryError) as cm: +            self.server.head("<non-existent@example.com>") +        self.assertEqual(cm.exception.response, "430 No Such Article Found") + +    def test_body(self): +        # BODY +        resp, info = self.server.body() +        self.assertEqual(resp, "222 3000237 <45223423@example.com>") +        art_num, message_id, lines = info +        self.assertEqual(art_num, 3000237) +        self.assertEqual(message_id, "<45223423@example.com>") +        self._check_article_body(lines) +        # BODY num +        resp, info = self.server.body(3000234) +        self.assertEqual(resp, "222 3000234 <45223423@example.com>") +        art_num, message_id, lines = info +        self.assertEqual(art_num, 3000234) +        self.assertEqual(message_id, "<45223423@example.com>") +        self._check_article_body(lines) +        # BODY id +        resp, info = self.server.body("<45223423@example.com>") +        self.assertEqual(resp, "222 0 <45223423@example.com>") +        art_num, message_id, lines = info +        self.assertEqual(art_num, 0) +        self.assertEqual(message_id, "<45223423@example.com>") +        self._check_article_body(lines) +        # Non-existent id +        with self.assertRaises(nntplib.NNTPTemporaryError) as cm: +            self.server.body("<non-existent@example.com>") +        self.assertEqual(cm.exception.response, "430 No Such Article Found") + +    def check_over_xover_resp(self, resp, overviews): +        self.assertTrue(resp.startswith("224 "), resp) +        self.assertEqual(len(overviews), 3) +        art_num, over = overviews[0] +        self.assertEqual(art_num, 57) +        self.assertEqual(over, { +            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>", +            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout", +            "date": "Sat, 19 Jun 2010 18:04:08 -0400", +            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>", +            "references": "<hvalf7$ort$1@dough.gmane.org>", +            ":bytes": "7103", +            ":lines": "16", +            "xref": "news.gmane.org gmane.comp.python.authors:57" +            }) +        art_num, over = overviews[2] +        self.assertEqual(over["subject"], +                         "Re: Message d'erreur incompréhensible (par moi)") + +    def test_xover(self): +        resp, overviews = self.server.xover(57, 59) +        self.check_over_xover_resp(resp, overviews) + +    def test_over(self): +        # In NNTP "v1", this will fallback on XOVER +        resp, overviews = self.server.over((57, 59)) +        self.check_over_xover_resp(resp, overviews) + +    sample_post = ( +        b'From: "Demo User" <nobody@example.net>\r\n' +        b'Subject: I am just a test article\r\n' +        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n' +        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n' +        b'\r\n' +        b'This is just a test article.\r\n' +        b'.Here is a dot-starting line.\r\n' +        b'\r\n' +        b'-- Signed by Andr\xc3\xa9.\r\n' +    ) + +    def _check_posted_body(self): +        # Check the raw body as received by the server +        lines = self.handler.posted_body +        # One additional line for the "." terminator +        self.assertEqual(len(lines), 10) +        self.assertEqual(lines[-1], b'.\r\n') +        self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n') +        self.assertEqual(lines[-3], b'\r\n') +        self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n') +        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n') + +    def _check_post_ihave_sub(self, func, *args, file_factory): +        # First the prepared post with CRLF endings +        post = self.sample_post +        func_args = args + (file_factory(post),) +        self.handler.posted_body = None +        resp = func(*func_args) +        self._check_posted_body() +        # Then the same post with "normal" line endings - they should be +        # converted by NNTP.post and NNTP.ihave. +        post = self.sample_post.replace(b"\r\n", b"\n") +        func_args = args + (file_factory(post),) +        self.handler.posted_body = None +        resp = func(*func_args) +        self._check_posted_body() +        return resp + +    def check_post_ihave(self, func, success_resp, *args): +        # With a bytes object +        resp = self._check_post_ihave_sub(func, *args, file_factory=bytes) +        self.assertEqual(resp, success_resp) +        # With a bytearray object +        resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray) +        self.assertEqual(resp, success_resp) +        # With a file object +        resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO) +        self.assertEqual(resp, success_resp) +        # With an iterable of terminated lines +        def iterlines(b): +            return iter(b.splitlines(True)) +        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) +        self.assertEqual(resp, success_resp) +        # With an iterable of non-terminated lines +        def iterlines(b): +            return iter(b.splitlines(False)) +        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) +        self.assertEqual(resp, success_resp) + +    def test_post(self): +        self.check_post_ihave(self.server.post, "240 Article received OK") +        self.handler.allow_posting = False +        with self.assertRaises(nntplib.NNTPTemporaryError) as cm: +            self.server.post(self.sample_post) +        self.assertEqual(cm.exception.response, +                         "440 Posting not permitted") + +    def test_ihave(self): +        self.check_post_ihave(self.server.ihave, "235 Article transferred OK", +                              "<i.am.an.article.you.will.want@example.com>") +        with self.assertRaises(nntplib.NNTPTemporaryError) as cm: +            self.server.ihave("<another.message.id>", self.sample_post) +        self.assertEqual(cm.exception.response, +                         "435 Article not wanted") + + +class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): +    """Tests an NNTP v1 server (no capabilities).""" + +    nntp_version = 1 +    handler_class = NNTPv1Handler + +    def test_caps(self): +        caps = self.server.getcapabilities() +        self.assertEqual(caps, {}) +        self.assertEqual(self.server.nntp_version, 1) + + +class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): +    """Tests an NNTP v2 server (with capabilities).""" + +    nntp_version = 2 +    handler_class = NNTPv2Handler + +    def test_caps(self): +        caps = self.server.getcapabilities() +        self.assertEqual(caps, { +            'VERSION': ['2'], +            'IMPLEMENTATION': ['INN', '2.5.1'], +            'AUTHINFO': ['USER'], +            'HDR': [], +            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS', +                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'], +            'OVER': [], +            'POST': [], +            'READER': [], +            }) +        self.assertEqual(self.server.nntp_version, 2) + + +class MiscTests(unittest.TestCase): + +    def test_decode_header(self): +        def gives(a, b): +            self.assertEqual(nntplib.decode_header(a), b) +        gives("" , "") +        gives("a plain header", "a plain header") +        gives(" with extra  spaces ", " with extra  spaces ") +        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python") +        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?=" +              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=", +              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées") +        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=", +              "Re: problème de matrice") +        # A natively utf-8 header (found in the real world!) +        gives("Re: Message d'erreur incompréhensible (par moi)", +              "Re: Message d'erreur incompréhensible (par moi)") + +    def test_parse_overview_fmt(self): +        # The minimal (default) response +        lines = ["Subject:", "From:", "Date:", "Message-ID:", +                 "References:", ":bytes", ":lines"] +        self.assertEqual(nntplib._parse_overview_fmt(lines), +            ["subject", "from", "date", "message-id", "references", +             ":bytes", ":lines"]) +        # The minimal response using alternative names +        lines = ["Subject:", "From:", "Date:", "Message-ID:", +                 "References:", "Bytes:", "Lines:"] +        self.assertEqual(nntplib._parse_overview_fmt(lines), +            ["subject", "from", "date", "message-id", "references", +             ":bytes", ":lines"]) +        # Variations in casing +        lines = ["subject:", "FROM:", "DaTe:", "message-ID:", +                 "References:", "BYTES:", "Lines:"] +        self.assertEqual(nntplib._parse_overview_fmt(lines), +            ["subject", "from", "date", "message-id", "references", +             ":bytes", ":lines"]) +        # First example from RFC 3977 +        lines = ["Subject:", "From:", "Date:", "Message-ID:", +                 "References:", ":bytes", ":lines", "Xref:full", +                 "Distribution:full"] +        self.assertEqual(nntplib._parse_overview_fmt(lines), +            ["subject", "from", "date", "message-id", "references", +             ":bytes", ":lines", "xref", "distribution"]) +        # Second example from RFC 3977 +        lines = ["Subject:", "From:", "Date:", "Message-ID:", +                 "References:", "Bytes:", "Lines:", "Xref:FULL", +                 "Distribution:FULL"] +        self.assertEqual(nntplib._parse_overview_fmt(lines), +            ["subject", "from", "date", "message-id", "references", +             ":bytes", ":lines", "xref", "distribution"]) +        # A classic response from INN +        lines = ["Subject:", "From:", "Date:", "Message-ID:", +                 "References:", "Bytes:", "Lines:", "Xref:full"] +        self.assertEqual(nntplib._parse_overview_fmt(lines), +            ["subject", "from", "date", "message-id", "references", +             ":bytes", ":lines", "xref"]) + +    def test_parse_overview(self): +        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"] +        # First example from RFC 3977 +        lines = [ +            '3000234\tI am just a test article\t"Demo User" ' +            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' +            '<45223423@example.com>\t<45454@example.net>\t1234\t' +            '17\tXref: news.example.com misc.test:3000363', +        ] +        overview = nntplib._parse_overview(lines, fmt) +        (art_num, fields), = overview +        self.assertEqual(art_num, 3000234) +        self.assertEqual(fields, { +            'subject': 'I am just a test article', +            'from': '"Demo User" <nobody@example.com>', +            'date': '6 Oct 1998 04:38:40 -0500', +            'message-id': '<45223423@example.com>', +            'references': '<45454@example.net>', +            ':bytes': '1234', +            ':lines': '17', +            'xref': 'news.example.com misc.test:3000363', +        }) + +    def test_parse_datetime(self): +        def gives(a, b, *c): +            self.assertEqual(nntplib._parse_datetime(a, b), +                             datetime.datetime(*c)) +        # Output of DATE command +        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24) +        # Variations +        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24) +        gives("990623", "135624", 1999, 6, 23, 13, 56, 24) +        gives("090623", "135624", 2009, 6, 23, 13, 56, 24) + +    def test_unparse_datetime(self): +        # Test non-legacy mode +        # 1) with a datetime +        def gives(y, M, d, h, m, s, date_str, time_str): +            dt = datetime.datetime(y, M, d, h, m, s) +            self.assertEqual(nntplib._unparse_datetime(dt), +                             (date_str, time_str)) +            self.assertEqual(nntplib._unparse_datetime(dt, False), +                             (date_str, time_str)) +        gives(1999, 6, 23, 13, 56, 24, "19990623", "135624") +        gives(2000, 6, 23, 13, 56, 24, "20000623", "135624") +        gives(2010, 6, 5, 1, 2, 3, "20100605", "010203") +        # 2) with a date +        def gives(y, M, d, date_str, time_str): +            dt = datetime.date(y, M, d) +            self.assertEqual(nntplib._unparse_datetime(dt), +                             (date_str, time_str)) +            self.assertEqual(nntplib._unparse_datetime(dt, False), +                             (date_str, time_str)) +        gives(1999, 6, 23, "19990623", "000000") +        gives(2000, 6, 23, "20000623", "000000") +        gives(2010, 6, 5, "20100605", "000000") + +    def test_unparse_datetime_legacy(self): +        # Test legacy mode (RFC 977) +        # 1) with a datetime +        def gives(y, M, d, h, m, s, date_str, time_str): +            dt = datetime.datetime(y, M, d, h, m, s) +            self.assertEqual(nntplib._unparse_datetime(dt, True), +                             (date_str, time_str)) +        gives(1999, 6, 23, 13, 56, 24, "990623", "135624") +        gives(2000, 6, 23, 13, 56, 24, "000623", "135624") +        gives(2010, 6, 5, 1, 2, 3, "100605", "010203") +        # 2) with a date +        def gives(y, M, d, date_str, time_str): +            dt = datetime.date(y, M, d) +            self.assertEqual(nntplib._unparse_datetime(dt, True), +                             (date_str, time_str)) +        gives(1999, 6, 23, "990623", "000000") +        gives(2000, 6, 23, "000623", "000000") +        gives(2010, 6, 5, "100605", "000000") + + +def test_main(): +    support.run_unittest(MiscTests, NNTPv1Tests, NNTPv2Tests, +                         NetworkedNNTPTests +                         ) + + +if __name__ == "__main__": +    test_main() | 
