summaryrefslogtreecommitdiff
path: root/test/query.py
blob: 946180618f9c7437bb0d825e27b4cdaaa2a1c664 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from testbase import PersistTest
import testbase
import unittest, sys, datetime

import sqlalchemy.databases.sqlite as sqllite

import tables
db = testbase.db
#db.echo='debug'
from sqlalchemy import *
from sqlalchemy.engine import ResultProxy, RowProxy

class QueryTest(PersistTest):
    
    def setUpAll(self):
        global users
        users = Table('query_users', db,
            Column('user_id', INT, primary_key = True),
            Column('user_name', VARCHAR(20)),
            redefine = True
        )
        users.create()
    
    def setUp(self):
        self.users = users
    def tearDown(self):
        self.users.delete().execute()
    
    def tearDownAll(self):
        global users
        users.drop()
        
    def testinsert(self):
        self.users.insert().execute(user_id = 7, user_name = 'jack')
        print repr(self.users.select().execute().fetchall())
        
    def testupdate(self):

        self.users.insert().execute(user_id = 7, user_name = 'jack')
        print repr(self.users.select().execute().fetchall())

        self.users.update(self.users.c.user_id == 7).execute(user_name = 'fred')
        print repr(self.users.select().execute().fetchall())

    def testrowiteration(self):
        self.users.insert().execute(user_id = 7, user_name = 'jack')
        self.users.insert().execute(user_id = 8, user_name = 'ed')
        self.users.insert().execute(user_id = 9, user_name = 'fred')
        r = self.users.select().execute()
        l = []
        for row in r:
            l.append(row)
        self.assert_(len(l) == 3)
        
    def testpassiveoverride(self):
        """primarily for postgres, tests that when we get a primary key column back 
        from reflecting a table which has a default value on it, we pre-execute
        that PassiveDefault upon insert, even though PassiveDefault says 
        "let the database execute this", because in postgres we must have all the primary
        key values in memory before insert; otherwise we cant locate the just inserted row."""
        if db.engine.name != 'postgres':
            return
        try:
            db.execute("""
             CREATE TABLE speedy_users
             (
                 speedy_user_id   SERIAL     PRIMARY KEY,
            
                 user_name        VARCHAR    NOT NULL,
                 user_password    VARCHAR    NOT NULL
             );
            """, None)
            
            t = Table("speedy_users", db, autoload=True)
            t.insert().execute(user_name='user', user_password='lala')
            l = t.select().execute().fetchall()
            print l
            self.assert_(l == [(1, 'user', 'lala')])
        finally:
            db.execute("drop table speedy_users", None)

    def testschema(self):
        if not db.engine.__module__.endswith('postgres'):
            return 
            
        test_table = Table('my_table', db,
                    Column('id', Integer, primary_key=True),
                    Column('data', String(20), nullable=False),
                    schema='alt_schema'
                 )
        test_table.create()
        try:
            # plain insert
            test_table.insert().execute(data='test')

            # try with a PassiveDefault
            test_table.deregister()
            test_table = Table('my_table', db, autoload=True, redefine=True, schema='alt_schema')
            test_table.insert().execute(data='test')

        finally:
            test_table.drop()

        
    def testdelete(self):
        c = db.connection()

        self.users.insert().execute(user_id = 7, user_name = 'jack')
        self.users.insert().execute(user_id = 8, user_name = 'fred')
        print repr(self.users.select().execute().fetchall())

        self.users.delete(self.users.c.user_name == 'fred').execute()
        
        print repr(self.users.select().execute().fetchall())
        
    def testtransaction(self):
        def dostuff():
            self.users.insert().execute(user_id = 7, user_name = 'john')
            self.users.insert().execute(user_id = 8, user_name = 'jack')
        
        db.transaction(dostuff)
        print repr(self.users.select().execute().fetchall())    

    def testselectlimit(self):
        self.users.insert().execute(user_id=1, user_name='john')
        self.users.insert().execute(user_id=2, user_name='jack')
        self.users.insert().execute(user_id=3, user_name='ed')
        self.users.insert().execute(user_id=4, user_name='wendy')
        self.users.insert().execute(user_id=5, user_name='laura')
        self.users.insert().execute(user_id=6, user_name='ralph')
        self.users.insert().execute(user_id=7, user_name='fido')
        r = self.users.select(limit=3, order_by=[self.users.c.user_id]).execute().fetchall()
        self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
        r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
        self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
        r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall()
        self.assert_(r==[(6, 'ralph'), (7, 'fido')])
      
  
    def test_column_accessor(self):
        self.users.insert().execute(user_id=1, user_name='john')
        self.users.insert().execute(user_id=2, user_name='jack')
        r = self.users.select(self.users.c.user_id==2).execute().fetchone()
        self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
        self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')

    def test_keys(self):
        self.users.insert().execute(user_id=1, user_name='foo')
        r = self.users.select().execute().fetchone()
        self.assertEqual(r.keys(), ['user_id', 'user_name'])

    def test_items(self):
        self.users.insert().execute(user_id=1, user_name='foo')
        r = self.users.select().execute().fetchone()
        self.assertEqual(r.items(), [('user_id', 1), ('user_name', 'foo')])

    def test_len(self):
        self.users.insert().execute(user_id=1, user_name='foo')
        r = self.users.select().execute().fetchone()
        self.assertEqual(len(r), 2)
        r = db.execute('select user_name, user_id from query_users', {}).fetchone()
        self.assertEqual(len(r), 2)
        r = db.execute('select user_name from query_users', {}).fetchone()
        self.assertEqual(len(r), 1)
        
    def test_column_order_with_simple_query(self):
        # should return values in column definition order
        self.users.insert().execute(user_id=1, user_name='foo')
        r = self.users.select(self.users.c.user_id==1).execute().fetchone()
        self.assertEqual(r[0], 1)
        self.assertEqual(r[1], 'foo')
        self.assertEqual(r.keys(), ['user_id', 'user_name'])
        self.assertEqual(r.values(), [1, 'foo'])
        
    def test_column_order_with_text_query(self):
        # should return values in query order
        self.users.insert().execute(user_id=1, user_name='foo')
        r = db.execute('select user_name, user_id from query_users', {}).fetchone()
        self.assertEqual(r[0], 'foo')
        self.assertEqual(r[1], 1)
        self.assertEqual(r.keys(), ['user_name', 'user_id'])
        self.assertEqual(r.values(), ['foo', 1])
        
    def test_column_accessor_shadow(self):
        if db.engine.__module__.endswith('oracle'):
            return

        shadowed = Table('test_shadowed', db,
                         Column('shadow_id', INT, primary_key = True),
                         Column('shadow_name', VARCHAR(20)),
                         Column('parent', VARCHAR(20)),
                         Column('row', VARCHAR(40)),
                         Column('__parent', VARCHAR(20)),
                         Column('__row', VARCHAR(20)),
            redefine = True
        )
        shadowed.create()
        try:
            shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row')
            r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone()
            self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
            self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
            self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
            self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow')
            self.assert_(r['__parent'] == 'Hidden parent')
            self.assert_(r['__row'] == 'Hidden row')
            try:
                print r.__parent, r.__row
                self.fail('Should not allow access to private attributes')
            except AttributeError:
                pass # expected
        finally:
            shadowed.drop()
        
if __name__ == "__main__":
    testbase.main()