summaryrefslogtreecommitdiff
path: root/tests/test_callbacks.py
blob: c47ad84c56a847e8a6be73af394f6cbc177bb9af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from datetime import timedelta

from rq import Queue, Worker
from rq.job import UNEVALUATED, Job, JobStatus
from rq.worker import SimpleWorker
from tests import RQTestCase
from tests.fixtures import div_by_zero, erroneous_callback, save_exception, save_result, say_hello


class QueueCallbackTestCase(RQTestCase):
    """Callbacks passed to the ``Queue.enqueue*`` helpers are validated and stored on the job."""

    def test_enqueue_with_success_callback(self):
        """enqueue() and enqueue_in() keep the on_success callback on the job"""
        q = Queue(connection=self.testconn)

        # Callbacks must be plain functions or builtins; Job.fetch is rejected
        self.assertRaises(ValueError, q.enqueue, say_hello, on_success=Job.fetch)

        # Immediate enqueue: the callback survives a round trip through Redis
        immediate = q.enqueue(say_hello, on_success=print)
        stored = Job.fetch(id=immediate.id, connection=self.testconn)
        self.assertEqual(stored.success_callback, print)

        # Delayed enqueue: same expectation
        delay = timedelta(seconds=10)
        scheduled = q.enqueue_in(delay, say_hello, on_success=print)
        stored = Job.fetch(id=scheduled.id, connection=self.testconn)
        self.assertEqual(stored.success_callback, print)

    def test_enqueue_with_failure_callback(self):
        """enqueue() and enqueue_in() keep the on_failure callback on the job"""
        q = Queue(connection=self.testconn)

        # Callbacks must be plain functions or builtins; Job.fetch is rejected
        self.assertRaises(ValueError, q.enqueue, say_hello, on_failure=Job.fetch)

        # Immediate enqueue: the callback survives a round trip through Redis
        immediate = q.enqueue(say_hello, on_failure=print)
        stored = Job.fetch(id=immediate.id, connection=self.testconn)
        self.assertEqual(stored.failure_callback, print)

        # Delayed enqueue: same expectation
        delay = timedelta(seconds=10)
        scheduled = q.enqueue_in(delay, say_hello, on_failure=print)
        stored = Job.fetch(id=scheduled.id, connection=self.testconn)
        self.assertEqual(stored.failure_callback, print)


class SyncJobCallback(RQTestCase):
    """Callback behaviour for jobs enqueued on a queue created with ``is_async=False``."""

    def test_success_callback(self):
        """Test success callback is executed only when job is successful"""
        # Pass the connection explicitly, like every other queue in this module,
        # instead of relying on an implicitly resolved one.
        queue = Queue(is_async=False, connection=self.testconn)

        # Successful job: the callback must have stored the job's result
        job = queue.enqueue(say_hello, on_success=save_result)
        self.assertEqual(job.get_status(), JobStatus.FINISHED)
        self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.result)

        # Failing job: the success callback must not have run
        job = queue.enqueue(div_by_zero, on_success=save_result)
        self.assertEqual(job.get_status(), JobStatus.FAILED)
        self.assertFalse(self.testconn.exists('success_callback:%s' % job.id))

    def test_failure_callback(self):
        """Test failure callback is executed only when job fails"""
        # Pass the connection explicitly, like every other queue in this module,
        # instead of relying on an implicitly resolved one.
        queue = Queue(is_async=False, connection=self.testconn)

        # Failing job: the callback must have stored something mentioning div_by_zero
        job = queue.enqueue(div_by_zero, on_failure=save_exception)
        self.assertEqual(job.get_status(), JobStatus.FAILED)
        self.assertIn('div_by_zero', self.testconn.get('failure_callback:%s' % job.id).decode())

        # Failing job without an on_failure callback: no failure key is written
        job = queue.enqueue(div_by_zero, on_success=save_result)
        self.assertEqual(job.get_status(), JobStatus.FAILED)
        self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id))


class WorkerCallbackTestCase(RQTestCase):
    """Callback behaviour for jobs processed by a worker running in burst mode."""

    def test_success_callback(self):
        """Test success callback is executed only when job is successful"""
        queue = Queue(connection=self.testconn)
        worker = SimpleWorker([queue])

        job = queue.enqueue(say_hello, on_success=save_result)

        # Callback is executed when job is successfully executed
        worker.work(burst=True)
        self.assertEqual(job.get_status(), JobStatus.FINISHED)
        self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.return_value())

        # Callback is not executed when job fails
        job = queue.enqueue(div_by_zero, on_success=save_result)
        worker.work(burst=True)
        self.assertEqual(job.get_status(), JobStatus.FAILED)
        self.assertFalse(self.testconn.exists('success_callback:%s' % job.id))

    def test_erroneous_success_callback(self):
        """Test exception handling when executing success callback"""
        queue = Queue(connection=self.testconn)
        worker = Worker([queue])

        # If success_callback raises an error, the job is considered failed
        job = queue.enqueue(say_hello, on_success=erroneous_callback)
        worker.work(burst=True)
        self.assertEqual(job.get_status(), JobStatus.FAILED)

    def test_failure_callback(self):
        """Test failure callback is executed only when a job fails"""
        queue = Queue(connection=self.testconn)
        worker = SimpleWorker([queue])

        job = queue.enqueue(div_by_zero, on_failure=save_exception)

        # Callback is executed when job fails
        worker.work(burst=True)
        self.assertEqual(job.get_status(), JobStatus.FAILED)
        self.assertIn('div_by_zero', self.testconn.get('failure_callback:%s' % job.id).decode())

        # Failing job without an on_failure callback: no failure key is written
        job = queue.enqueue(div_by_zero, on_success=save_result)
        worker.work(burst=True)
        self.assertEqual(job.get_status(), JobStatus.FAILED)
        self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id))

        # TODO: add test case for error while executing failure callback


class JobCallbackTestCase(RQTestCase):
    """Callback attributes on ``Job``: lazy evaluation and persistence across save/fetch."""

    def test_job_creation_with_success_callback(self):
        """Ensure success callbacks are created and persisted properly"""
        job = Job.create(say_hello)
        self.assertIsNone(job._success_callback_name)
        # _success_callback starts as the UNEVALUATED sentinel (compared by identity)
        self.assertIs(job._success_callback, UNEVALUATED)
        self.assertIsNone(job.success_callback)
        # _success_callback becomes `None` after `job.success_callback` is accessed if there's no success callback
        self.assertIsNone(job._success_callback)

        # job.success_callback is assigned properly
        job = Job.create(say_hello, on_success=print)
        self.assertIsNotNone(job._success_callback_name)
        self.assertEqual(job.success_callback, print)
        job.save()

        # The callback is still resolvable after a round trip through Redis
        job = Job.fetch(id=job.id, connection=self.testconn)
        self.assertEqual(job.success_callback, print)

    def test_job_creation_with_failure_callback(self):
        """Ensure failure callbacks are created and persisted properly"""
        job = Job.create(say_hello)
        self.assertIsNone(job._failure_callback_name)
        # _failure_callback starts as the UNEVALUATED sentinel (compared by identity)
        self.assertIs(job._failure_callback, UNEVALUATED)
        self.assertIsNone(job.failure_callback)
        # _failure_callback becomes `None` after `job.failure_callback` is accessed if there's no failure callback
        self.assertIsNone(job._failure_callback)

        # job.failure_callback is assigned properly
        job = Job.create(say_hello, on_failure=print)
        self.assertIsNotNone(job._failure_callback_name)
        self.assertEqual(job.failure_callback, print)
        job.save()

        # The callback is still resolvable after a round trip through Redis
        job = Job.fetch(id=job.id, connection=self.testconn)
        self.assertEqual(job.failure_callback, print)