summaryrefslogtreecommitdiff
path: root/tests/functional-tests/store/test_sqlite_batch_misused.py
blob: 6f6cfd0a26416dc5f858984f217d522625eb8044 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/python
#
# Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
#
"""
Test the query while running BatchSparqlUpdate at the same time. This was raising
some SQLITE_MISUSED errors before.
"""
import os
import dbus
from gi.repository import GObject
from dbus.mainloop.glib import DBusGMainLoop

from common.utils import configuration as cfg
import unittest as ut
from common.utils.storetest import CommonTrackerStoreTest as CommonTrackerStoreTest

# Number of instances per batch
BATCH_SIZE = 3000


class TestSqliteBatchMisused (CommonTrackerStoreTest):

    """
    Send big batchSparqlUpdates and run queries at the same time
    Don't run this script directly, use the bash script "force-sqlite-misused.sh" instead
    to configure properly the environment
    """

    def setUp(self):
        self.main_loop = GObject.MainLoop()
        self.batch_counter = 0

    def test_queries_while_batch_insert(self):
        self.assertTrue(os.path.exists('ttl'))

        for root, dirs, files in os.walk('ttl'):
            for ttl_file in filter(lambda f: f.endswith(".ttl"), files):
                full_path = os.path.abspath(os.path.join(root, ttl_file))
                print full_path

                counter = 0
                current_batch = ""
                for line in open(full_path):
                    if (line.startswith("@prefix")):
                        continue
                    current_batch += line
                    if len(line) > 1 and line[:-1].endswith('.'):
                        counter += 1

                    if counter == BATCH_SIZE:
                        query = "INSERT {" + current_batch + "}"
                        self.tracker.get_tracker_iface(
                        ).BatchSparqlUpdate(query,
                                            timeout=20000,
                                            reply_handler=self.batch_success_cb,
                                            error_handler=self.batch_failed_cb)
                        self.run_a_query()
                        counter = 0
                        current_batch = ""
                        self.batch_counter += 1

        GObject.timeout_add_seconds(2, self.run_a_query)
        # Safeguard of 60 seconds. The last reply should quit the loop
        GObject.timeout_add_seconds(60, self.timeout_cb)
        self.main_loop.run()

    def run_a_query(self):
        QUERY = "SELECT ?u ?title WHERE { ?u a nie:InformationElement; nie:title ?title. }"
        self.tracker.get_tracker_iface().SparqlQuery(QUERY, timeout=20000,
                                                     reply_handler=self.reply_cb,
                                                     error_handler=self.error_handler)
        return True

    def reply_cb(self, results):
        print "Query replied correctly"

    def error_handler(self, error_msg):
        print "Query failed", error_msg

    def batch_success_cb(self):
        self.batch_counter -= 1
        if (self.batch_counter == 0):
            print "Last batch was success"
            self.timeout_cb()
        print "Success processing a batch"

    def batch_failed_cb(self, error):
        print "Failed processing a batch"

    def timeout_cb(self):
        print "Forced timeout after 60 sec."
        self.main_loop.quit()
        return False

if __name__ == "__main__":
    ut.main()