''' Created on 13/02/2012 @author: piranna ''' from unittest import main, TestCase from sqlparse.filters import IncludeStatement, Tokens2Unicode from sqlparse.lexer import tokenize from sqlparse.filters import compact from sqlparse.functions import getcolumns, getlimit, IsType from tests.utils import FILES_DIR class Test_IncludeStatement(TestCase): sql = """-- type: script -- return: integer INCLUDE "_Make_DirEntry.sql"; INSERT INTO directories(inode) VALUES(:inode) LIMIT 1""" def test_includeStatement(self): stream = tokenize(self.sql) includeStatement = IncludeStatement(FILES_DIR, raiseexceptions=True) stream = includeStatement.process(None, stream) stream = compact(stream) result = Tokens2Unicode(stream) self.assertEqual( result, ( 'INSERT INTO dir_entries(type)VALUES(:type);INSERT INTO ' 'directories(inode)VALUES(:inode)LIMIT 1')) class Test_SQL(TestCase): sql = """-- type: script -- return: integer INSERT INTO directories(inode) VALUES(:inode) LIMIT 1""" sql2 = """SELECT child_entry,asdf AS inode, creation FROM links WHERE parent_dir == :parent_dir AND name == :name LIMIT 1""" sql3 = """SELECT 0 AS st_dev, 0 AS st_uid, 0 AS st_gid, dir_entries.type AS st_mode, dir_entries.inode AS st_ino, COUNT(links.child_entry) AS st_nlink, :creation AS st_ctime, dir_entries.access AS st_atime, dir_entries.modification AS st_mtime, -- :creation AS st_ctime, -- CAST(STRFTIME('%s',dir_entries.access) AS INTEGER) AS st_atime, -- CAST(STRFTIME('%s',dir_entries.modification) AS INTEGER) AS st_mtime, COALESCE(files.size,0) AS st_size, -- Python-FUSE COALESCE(files.size,0) AS size -- PyFilesystem FROM dir_entries LEFT JOIN files ON dir_entries.inode == files.inode LEFT JOIN links ON dir_entries.inode == links.child_entry WHERE dir_entries.inode == :inode GROUP BY dir_entries.inode LIMIT 1""" class Test_Compact(Test_SQL): def test_compact1(self): stream = compact(tokenize(self.sql)) result = Tokens2Unicode(stream) self.assertEqual(result, 'INSERT INTO directories(inode)VALUES(:inode)LIMIT 1') def test_compact2(self): stream = tokenize(self.sql2) result = compact(stream) self.assertEqual( Tokens2Unicode(result), 'SELECT child_entry,asdf AS inode,creation FROM links WHERE ' 'parent_dir==:parent_dir AND name==:name LIMIT 1') def test_compact3(self): stream = tokenize(self.sql3) result = compact(stream) self.assertEqual( Tokens2Unicode(result), 'SELECT 0 AS st_dev,0 AS st_uid,0 AS st_gid,dir_entries.type AS ' 'st_mode,dir_entries.inode AS st_ino,COUNT(links.child_entry)AS ' 'st_nlink,:creation AS st_ctime,dir_entries.access AS st_atime,' 'dir_entries.modification AS st_mtime,COALESCE(files.size,0)AS ' 'st_size,COALESCE(files.size,0)AS size FROM dir_entries LEFT JOIN' ' files ON dir_entries.inode==files.inode LEFT JOIN links ON ' 'dir_entries.inode==links.child_entry WHERE dir_entries.inode==' ':inode GROUP BY dir_entries.inode LIMIT 1') class Test_GetColumns(Test_SQL): def test_getcolumns1(self): columns = getcolumns(tokenize(self.sql)) self.assertEqual(columns, []) def test_getcolumns2(self): columns = getcolumns(tokenize(self.sql2)) self.assertEqual(columns, ['child_entry', 'inode', 'creation']) def test_getcolumns3(self): columns = getcolumns(tokenize(self.sql3)) self.assertEqual(columns, ['st_dev', 'st_uid', 'st_gid', 'st_mode', 'st_ino', 'st_nlink', 'st_ctime', 'st_atime', 'st_mtime', 'st_size', 'size']) class Test_GetLimit(Test_SQL): def test_getlimit1(self): limit = getlimit(tokenize(self.sql)) self.assertEqual(limit, 1) def test_getlimit2(self): limit = getlimit(tokenize(self.sql2)) self.assertEqual(limit, 1) def test_getlimit3(self): limit = getlimit(tokenize(self.sql3)) self.assertEqual(limit, 1) class Test_IsType(Test_SQL): def test_istype2(self): stream = tokenize(self.sql2) self.assertTrue(IsType('SELECT')(stream)) stream = tokenize(self.sql2) self.assertFalse(IsType('INSERT')(stream)) if __name__ == "__main__": main()