# Source path: tests/test_pipeline.py
# (blob 56eed4a9c12b7ae691456803dcfa68f39d8c8209)
import unittest

from sqlparse.filters import ColumnsSelect
from sqlparse.lexer import tokenize
from sqlparse.pipeline import Pipeline

class Test(unittest.TestCase):
    """Check the column names produced by a tokenize -> ColumnsSelect pipeline.

    Each test feeds one SQL text through ``self.pipe`` and compares the
    returned list with the column names expected for that statement.
    """

    def setUp(self):
        # Assemble a fresh two-stage pipeline for every test: the lexer's
        # ``tokenize`` first, then a new ``ColumnsSelect`` filter instance.
        pipeline = Pipeline()
        pipeline.append(tokenize)
        pipeline.append(ColumnsSelect())
        self.pipe = pipeline

    def test_1(self):
        # A script made of comments, an INCLUDE and an INSERT: the pipeline
        # is expected to return an empty list for it.
        statement = """
        -- type: script
        -- return: integer

        INCLUDE "Direntry.make.sql";

        INSERT INTO directories(inode)
        VALUES(:inode)
        LIMIT 1"""
        expected = []
        found = self.pipe(statement)
        self.assertEqual(expected, found)

    def test_2(self):
        # A short SELECT mixing plain columns with one ``AS`` alias; the
        # aliased column is expected under its alias (``inode``).
        statement = """
        SELECT child_entry,asdf AS inode, creation
        FROM links
        WHERE parent_dir == :parent_dir AND name == :name
        LIMIT 1"""
        expected = [u'child_entry', u'inode', u'creation']
        found = self.pipe(statement)
        self.assertEqual(expected, found)

    def test_3(self):
        # A larger SELECT with literals, qualified names, function calls,
        # a bound parameter, inline ``--`` comments and joins; every output
        # column carries an alias, and the aliases are expected in order.
        statement = """
        SELECT
        0 AS st_dev,
        0 AS st_uid,
        0 AS st_gid,

        dir_entries.type         AS st_mode,
        dir_entries.inode        AS st_ino,
        COUNT(links.child_entry) AS st_nlink,

        :creation                AS st_ctime,
        dir_entries.access       AS st_atime,
        dir_entries.modification AS st_mtime,
        --    :creation                                                AS st_ctime,
        --    CAST(STRFTIME('%s',dir_entries.access)       AS INTEGER) AS st_atime,
        --    CAST(STRFTIME('%s',dir_entries.modification) AS INTEGER) AS st_mtime,

        COALESCE(files.size,0) AS st_size, -- Python-FUSE
        COALESCE(files.size,0) AS size     -- PyFilesystem

        FROM dir_entries
        LEFT JOIN files
        ON dir_entries.inode == files.inode
        LEFT JOIN links
        ON dir_entries.inode == links.child_entry

        WHERE dir_entries.inode == :inode

        GROUP BY dir_entries.inode
        LIMIT 1"""
        expected = [
            u'st_dev', u'st_uid', u'st_gid',
            u'st_mode', u'st_ino', u'st_nlink',
            u'st_ctime', u'st_atime', u'st_mtime',
            u'st_size', u'size',
        ]
        found = self.pipe(statement)
        self.assertEqual(expected, found)