summaryrefslogtreecommitdiff
path: root/test/dialect/test_mxodbc.py
blob: 938d457fbb8b63ebdc95745ec7dfe3a98b5c8465 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from sqlalchemy import *
from sqlalchemy.test.testing import eq_, TestBase
from sqlalchemy.test import engines

# TODO: we should probably build mock bases for
# these to share with test_reconnect, test_parseconnect
class MockDBAPI(object):
    paramstyle = 'qmark'
    def __init__(self):
        self.log = []
    def connect(self, *args, **kwargs):
        return MockConnection(self)
    
class MockConnection(object):
    def __init__(self, parent):
        self.parent = parent
    def cursor(self):
        return MockCursor(self)
    def close(self):
        pass
    def rollback(self):
        pass
    def commit(self):
        pass

class MockCursor(object):
    description = None
    rowcount = None
    def __init__(self, parent):
        self.parent = parent
    def execute(self, *args, **kwargs):
        self.parent.parent.log.append('execute')
    def executedirect(self, *args, **kwargs):
        self.parent.parent.log.append('executedirect')
    def close(self):
        pass


class MxODBCTest(TestBase):
    def test_native_odbc_execute(self):
        t1 = Table('t1', MetaData(), Column('c1', Integer))

        dbapi = MockDBAPI()
        engine = engines.testing_engine(
                                'mssql+mxodbc://localhost', 
                                options={'module':dbapi, 
                                        '_initialize':False}
                            )
        conn = engine.connect()
        
        # crud: uses execute
        conn.execute(t1.insert().values(c1='foo'))
        conn.execute(t1.delete().where(t1.c.c1=='foo'))
        conn.execute(t1.update().where(t1.c.c1=='foo').values(c1='bar'))
        
        # select: uses executedirect
        conn.execute(t1.select())
        
        # manual flagging
        conn.execution_options(native_odbc_execute=True).execute(t1.select())
        conn.execution_options(native_odbc_execute=False).execute(t1.insert().values(c1='foo'))
        
        eq_(
            dbapi.log,
            ['execute', 'execute', 'execute', 
                'executedirect', 'execute', 'executedirect']
        )