summaryrefslogtreecommitdiff
path: root/t/unit/asynchronous
diff options
context:
space:
mode:
authorThomas Achtemichuk <tom@tomchuk.com>2018-03-19 13:28:43 -0400
committerOmer Katz <omer.drow@gmail.com>2018-03-19 19:28:43 +0200
commit75695205f6e7af8e7e9178e010debc3871b19106 (patch)
treea45acc3ae34c048d756ea42587b6a0c42d0bd5dc /t/unit/asynchronous
parentdba85e2d9515b9ce202bd30e8690131aa055e6bf (diff)
downloadkombu-75695205f6e7af8e7e9178e010debc3871b19106.tar.gz
Rename `async` keyword to `asynchronous` (#839)
* Rename `async` keyword to `asynchronous` * Fixes #742 * Resolves "DeprecationWarning: 'async' and 'await' will become reserved keywords in Python 3.7" * Address PR feedback * Update appveyor config * Rename docs and tests
Diffstat (limited to 't/unit/asynchronous')
-rw-r--r--t/unit/asynchronous/__init__.py0
-rw-r--r--t/unit/asynchronous/aws/__init__.py0
-rw-r--r--t/unit/asynchronous/aws/case.py14
-rw-r--r--t/unit/asynchronous/aws/sqs/__init__.py0
-rw-r--r--t/unit/asynchronous/aws/sqs/test_connection.py324
-rw-r--r--t/unit/asynchronous/aws/sqs/test_queue.py207
-rw-r--r--t/unit/asynchronous/aws/test_aws.py16
-rw-r--r--t/unit/asynchronous/aws/test_connection.py263
-rw-r--r--t/unit/asynchronous/http/__init__.py0
-rw-r--r--t/unit/asynchronous/http/test_curl.py134
-rw-r--r--t/unit/asynchronous/http/test_http.py157
-rw-r--r--t/unit/asynchronous/test_hub.py529
-rw-r--r--t/unit/asynchronous/test_semaphore.py43
-rw-r--r--t/unit/asynchronous/test_timer.py158
14 files changed, 1845 insertions, 0 deletions
diff --git a/t/unit/asynchronous/__init__.py b/t/unit/asynchronous/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/t/unit/asynchronous/__init__.py
diff --git a/t/unit/asynchronous/aws/__init__.py b/t/unit/asynchronous/aws/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/t/unit/asynchronous/aws/__init__.py
diff --git a/t/unit/asynchronous/aws/case.py b/t/unit/asynchronous/aws/case.py
new file mode 100644
index 00000000..70d9565f
--- /dev/null
+++ b/t/unit/asynchronous/aws/case.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+import pytest
+
+from case import skip
+
+
+@skip.if_pypy()
+@skip.unless_module('boto3')
+@skip.unless_module('pycurl')
+@pytest.mark.usefixtures('hub')
+class AWSCase(object):
+ pass
diff --git a/t/unit/asynchronous/aws/sqs/__init__.py b/t/unit/asynchronous/aws/sqs/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/t/unit/asynchronous/aws/sqs/__init__.py
diff --git a/t/unit/asynchronous/aws/sqs/test_connection.py b/t/unit/asynchronous/aws/sqs/test_connection.py
new file mode 100644
index 00000000..ca1a8868
--- /dev/null
+++ b/t/unit/asynchronous/aws/sqs/test_connection.py
@@ -0,0 +1,324 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+from case import Mock, MagicMock
+
+from kombu.asynchronous.aws.sqs.connection import (
+ AsyncSQSConnection
+)
+from kombu.asynchronous.aws.ext import boto3
+from kombu.asynchronous.aws.sqs.message import AsyncMessage
+from kombu.asynchronous.aws.sqs.queue import AsyncQueue
+from kombu.utils.uuid import uuid
+
+from t.mocks import PromiseMock
+
+from ..case import AWSCase
+
+
+class test_AsyncSQSConnection(AWSCase):
+
+ def setup(self):
+ session = boto3.session.Session(
+ aws_access_key_id='AAA',
+ aws_secret_access_key='AAAA',
+ region_name='us-west-2',
+ )
+ sqs_client = session.client('sqs')
+ self.x = AsyncSQSConnection(sqs_client, 'ak', 'sk', http_client=Mock())
+ self.x.get_object = Mock(name='X.get_object')
+ self.x.get_status = Mock(name='X.get_status')
+ self.x.get_list = Mock(name='X.get_list')
+ self.callback = PromiseMock(name='callback')
+
+ sqs_client.get_queue_url = MagicMock(return_value={
+ 'QueueUrl': 'http://aws.com'
+ })
+
+ def test_create_queue(self):
+ self.x.create_queue('foo', callback=self.callback)
+ self.x.get_object.assert_called_with(
+ 'CreateQueue', {'QueueName': 'foo'},
+ callback=self.callback,
+ )
+
+ def test_create_queue__with_visibility_timeout(self):
+ self.x.create_queue(
+ 'foo', visibility_timeout=33, callback=self.callback,
+ )
+ self.x.get_object.assert_called_with(
+ 'CreateQueue', {
+ 'QueueName': 'foo',
+ 'DefaultVisibilityTimeout': '33'
+ },
+ callback=self.callback
+ )
+
+ def test_delete_queue(self):
+ queue = Mock(name='queue')
+ self.x.delete_queue(queue, callback=self.callback)
+ self.x.get_status.assert_called_with(
+ 'DeleteQueue', None, queue.id, callback=self.callback,
+ )
+
+ def test_get_queue_attributes(self):
+ queue = Mock(name='queue')
+ self.x.get_queue_attributes(
+ queue, attribute='QueueSize', callback=self.callback,
+ )
+ self.x.get_object.assert_called_with(
+ 'GetQueueAttributes', {'AttributeName': 'QueueSize'},
+ queue.id, callback=self.callback,
+ )
+
+ def test_set_queue_attribute(self):
+ queue = Mock(name='queue')
+ self.x.set_queue_attribute(
+ queue, 'Expires', '3600', callback=self.callback,
+ )
+ self.x.get_status.assert_called_with(
+ 'SetQueueAttribute', {
+ 'Attribute.Name': 'Expires',
+ 'Attribute.Value': '3600',
+ },
+ queue.id, callback=self.callback,
+ )
+
+ def test_receive_message(self):
+ queue = Mock(name='queue')
+ self.x.receive_message(queue, 4, callback=self.callback)
+ self.x.get_list.assert_called_with(
+ 'ReceiveMessage', {'MaxNumberOfMessages': 4},
+ [('Message', AsyncMessage)],
+ 'http://aws.com', callback=self.callback,
+ parent=queue,
+ )
+
+ def test_receive_message__with_visibility_timeout(self):
+ queue = Mock(name='queue')
+ self.x.receive_message(queue, 4, 3666, callback=self.callback)
+ self.x.get_list.assert_called_with(
+ 'ReceiveMessage', {
+ 'MaxNumberOfMessages': 4,
+ 'VisibilityTimeout': 3666,
+ },
+ [('Message', AsyncMessage)],
+ 'http://aws.com', callback=self.callback,
+ parent=queue,
+ )
+
+ def test_receive_message__with_wait_time_seconds(self):
+ queue = Mock(name='queue')
+ self.x.receive_message(
+ queue, 4, wait_time_seconds=303, callback=self.callback,
+ )
+ self.x.get_list.assert_called_with(
+ 'ReceiveMessage', {
+ 'MaxNumberOfMessages': 4,
+ 'WaitTimeSeconds': 303,
+ },
+ [('Message', AsyncMessage)],
+ 'http://aws.com', callback=self.callback,
+ parent=queue,
+ )
+
+ def test_receive_message__with_attributes(self):
+ queue = Mock(name='queue')
+ self.x.receive_message(
+ queue, 4, attributes=['foo', 'bar'], callback=self.callback,
+ )
+ self.x.get_list.assert_called_with(
+ 'ReceiveMessage', {
+ 'AttributeName.1': 'foo',
+ 'AttributeName.2': 'bar',
+ 'MaxNumberOfMessages': 4,
+ },
+ [('Message', AsyncMessage)],
+ 'http://aws.com', callback=self.callback,
+ parent=queue,
+ )
+
+ def MockMessage(self, id=None, receipt_handle=None, body=None):
+ m = Mock(name='message')
+ m.id = id or uuid()
+ m.receipt_handle = receipt_handle or uuid()
+ m._body = body
+
+ def _get_body():
+ return m._body
+ m.get_body.side_effect = _get_body
+
+ def _set_body(value):
+ m._body = value
+ m.set_body.side_effect = _set_body
+
+ return m
+
+ def test_delete_message(self):
+ queue = Mock(name='queue')
+ message = self.MockMessage()
+ self.x.delete_message(queue, message.receipt_handle,
+ callback=self.callback)
+ self.x.get_status.assert_called_with(
+ 'DeleteMessage', {'ReceiptHandle': message.receipt_handle},
+ queue, callback=self.callback,
+ )
+
+ def test_delete_message_batch(self):
+ queue = Mock(name='queue')
+ messages = [self.MockMessage('1', 'r1'),
+ self.MockMessage('2', 'r2')]
+ self.x.delete_message_batch(queue, messages, callback=self.callback)
+ self.x.get_object.assert_called_with(
+ 'DeleteMessageBatch', {
+ 'DeleteMessageBatchRequestEntry.1.Id': '1',
+ 'DeleteMessageBatchRequestEntry.1.ReceiptHandle': 'r1',
+ 'DeleteMessageBatchRequestEntry.2.Id': '2',
+ 'DeleteMessageBatchRequestEntry.2.ReceiptHandle': 'r2',
+ },
+ queue.id, verb='POST', callback=self.callback,
+ )
+
+ def test_send_message(self):
+ queue = Mock(name='queue')
+ self.x.send_message(queue, 'hello', callback=self.callback)
+ self.x.get_object.assert_called_with(
+ 'SendMessage', {'MessageBody': 'hello'},
+ queue.id, verb='POST', callback=self.callback,
+ )
+
+ def test_send_message__with_delay_seconds(self):
+ queue = Mock(name='queue')
+ self.x.send_message(
+ queue, 'hello', delay_seconds='303', callback=self.callback,
+ )
+ self.x.get_object.assert_called_with(
+ 'SendMessage', {'MessageBody': 'hello', 'DelaySeconds': 303},
+ queue.id, verb='POST', callback=self.callback,
+ )
+
+ def test_send_message_batch(self):
+ queue = Mock(name='queue')
+ messages = [self.MockMessage('1', 'r1', 'A'),
+ self.MockMessage('2', 'r2', 'B')]
+ self.x.send_message_batch(
+ queue, [(m.id, m.get_body(), 303) for m in messages],
+ callback=self.callback
+ )
+ self.x.get_object.assert_called_with(
+ 'SendMessageBatch', {
+ 'SendMessageBatchRequestEntry.1.Id': '1',
+ 'SendMessageBatchRequestEntry.1.MessageBody': 'A',
+ 'SendMessageBatchRequestEntry.1.DelaySeconds': 303,
+ 'SendMessageBatchRequestEntry.2.Id': '2',
+ 'SendMessageBatchRequestEntry.2.MessageBody': 'B',
+ 'SendMessageBatchRequestEntry.2.DelaySeconds': 303,
+ },
+ queue.id, verb='POST', callback=self.callback,
+ )
+
+ def test_change_message_visibility(self):
+ queue = Mock(name='queue')
+ self.x.change_message_visibility(
+ queue, 'rcpt', 33, callback=self.callback,
+ )
+ self.x.get_status.assert_called_with(
+ 'ChangeMessageVisibility', {
+ 'ReceiptHandle': 'rcpt',
+ 'VisibilityTimeout': 33,
+ },
+ queue.id, callback=self.callback,
+ )
+
+ def test_change_message_visibility_batch(self):
+ queue = Mock(name='queue')
+ messages = [
+ (self.MockMessage('1', 'r1'), 303),
+ (self.MockMessage('2', 'r2'), 909),
+ ]
+ self.x.change_message_visibility_batch(
+ queue, messages, callback=self.callback,
+ )
+
+ def preamble(n):
+ return '.'.join(['ChangeMessageVisibilityBatchRequestEntry', n])
+
+ self.x.get_object.assert_called_with(
+ 'ChangeMessageVisibilityBatch', {
+ preamble('1.Id'): '1',
+ preamble('1.ReceiptHandle'): 'r1',
+ preamble('1.VisibilityTimeout'): 303,
+ preamble('2.Id'): '2',
+ preamble('2.ReceiptHandle'): 'r2',
+ preamble('2.VisibilityTimeout'): 909,
+ },
+ queue.id, verb='POST', callback=self.callback,
+ )
+
+ def test_get_all_queues(self):
+ self.x.get_all_queues(callback=self.callback)
+ self.x.get_list.assert_called_with(
+ 'ListQueues', {}, [('QueueUrl', AsyncQueue)],
+ callback=self.callback,
+ )
+
+ def test_get_all_queues__with_prefix(self):
+ self.x.get_all_queues(prefix='kombu.', callback=self.callback)
+ self.x.get_list.assert_called_with(
+ 'ListQueues', {'QueueNamePrefix': 'kombu.'},
+ [('QueueUrl', AsyncQueue)],
+ callback=self.callback,
+ )
+
+ def MockQueue(self, url):
+ q = Mock(name='Queue')
+ q.url = url
+ return q
+
+ def test_get_queue(self):
+ self.x.get_queue('foo', callback=self.callback)
+ self.x.get_list.assert_called()
+ on_ready = self.x.get_list.call_args[1]['callback']
+ queues = [
+ self.MockQueue('/queues/bar'),
+ self.MockQueue('/queues/baz'),
+ self.MockQueue('/queues/foo'),
+ ]
+ on_ready(queues)
+ self.callback.assert_called_with(queues[-1])
+
+ self.x.get_list.assert_called_with(
+ 'ListQueues', {'QueueNamePrefix': 'foo'},
+ [('QueueUrl', AsyncQueue)],
+ callback=on_ready,
+ )
+
+ def test_get_dead_letter_source_queues(self):
+ queue = Mock(name='queue')
+ self.x.get_dead_letter_source_queues(queue, callback=self.callback)
+ self.x.get_list.assert_called_with(
+ 'ListDeadLetterSourceQueues', {'QueueUrl': queue.url},
+ [('QueueUrl', AsyncQueue)], callback=self.callback,
+ )
+
+ def test_add_permission(self):
+ queue = Mock(name='queue')
+ self.x.add_permission(
+ queue, 'label', 'accid', 'action', callback=self.callback,
+ )
+ self.x.get_status.assert_called_with(
+ 'AddPermission', {
+ 'Label': 'label',
+ 'AWSAccountId': 'accid',
+ 'ActionName': 'action',
+ },
+ queue.id, callback=self.callback,
+ )
+
+ def test_remove_permission(self):
+ queue = Mock(name='queue')
+ self.x.remove_permission(queue, 'label', callback=self.callback)
+ self.x.get_status.assert_called_with(
+ 'RemovePermission', {'Label': 'label'}, queue.id,
+ callback=self.callback,
+ )
diff --git a/t/unit/asynchronous/aws/sqs/test_queue.py b/t/unit/asynchronous/aws/sqs/test_queue.py
new file mode 100644
index 00000000..add591df
--- /dev/null
+++ b/t/unit/asynchronous/aws/sqs/test_queue.py
@@ -0,0 +1,207 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+import pytest
+
+from case import Mock
+
+from kombu.asynchronous.aws.sqs.message import AsyncMessage
+from kombu.asynchronous.aws.sqs.queue import AsyncQueue
+
+from t.mocks import PromiseMock
+
+from ..case import AWSCase
+
+
+class test_AsyncQueue(AWSCase):
+
+ def setup(self):
+ self.conn = Mock(name='connection')
+ self.x = AsyncQueue(self.conn, '/url')
+ self.callback = PromiseMock(name='callback')
+
+ def test_message_class(self):
+ assert issubclass(self.x.message_class, AsyncMessage)
+
+ def test_get_attributes(self):
+ self.x.get_attributes(attributes='QueueSize', callback=self.callback)
+ self.x.connection.get_queue_attributes.assert_called_with(
+ self.x, 'QueueSize', self.callback,
+ )
+
+ def test_set_attribute(self):
+ self.x.set_attribute('key', 'value', callback=self.callback)
+ self.x.connection.set_queue_attribute.assert_called_with(
+ self.x, 'key', 'value', self.callback,
+ )
+
+ def test_get_timeout(self):
+ self.x.get_timeout(callback=self.callback)
+ self.x.connection.get_queue_attributes.assert_called()
+ on_ready = self.x.connection.get_queue_attributes.call_args[0][2]
+ self.x.connection.get_queue_attributes.assert_called_with(
+ self.x, 'VisibilityTimeout', on_ready,
+ )
+
+ on_ready({'VisibilityTimeout': '303'})
+ self.callback.assert_called_with(303)
+
+ def test_set_timeout(self):
+ self.x.set_timeout(808, callback=self.callback)
+ self.x.connection.set_queue_attribute.assert_called()
+ on_ready = self.x.connection.set_queue_attribute.call_args[0][3]
+ self.x.connection.set_queue_attribute.assert_called_with(
+ self.x, 'VisibilityTimeout', 808, on_ready,
+ )
+ on_ready(808)
+ self.callback.assert_called_with(808)
+ assert self.x.visibility_timeout == 808
+
+ on_ready(None)
+ assert self.x.visibility_timeout == 808
+
+ def test_add_permission(self):
+ self.x.add_permission(
+ 'label', 'accid', 'action', callback=self.callback,
+ )
+ self.x.connection.add_permission.assert_called_with(
+ self.x, 'label', 'accid', 'action', self.callback,
+ )
+
+ def test_remove_permission(self):
+ self.x.remove_permission('label', callback=self.callback)
+ self.x.connection.remove_permission.assert_called_with(
+ self.x, 'label', self.callback,
+ )
+
+ def test_read(self):
+ self.x.read(visibility_timeout=909, callback=self.callback)
+ self.x.connection.receive_message.assert_called()
+ on_ready = self.x.connection.receive_message.call_args[1]['callback']
+ self.x.connection.receive_message.assert_called_with(
+ self.x, number_messages=1, visibility_timeout=909,
+ attributes=None, wait_time_seconds=None, callback=on_ready,
+ )
+
+ messages = [Mock(name='message1')]
+ on_ready(messages)
+
+ self.callback.assert_called_with(messages[0])
+
+ def MockMessage(self, id, md5):
+ m = Mock(name='Message-{0}'.format(id))
+ m.id = id
+ m.md5 = md5
+ return m
+
+ def test_write(self):
+ message = self.MockMessage('id1', 'digest1')
+ self.x.write(message, delay_seconds=303, callback=self.callback)
+ self.x.connection.send_message.assert_called()
+ on_ready = self.x.connection.send_message.call_args[1]['callback']
+ self.x.connection.send_message.assert_called_with(
+ self.x, message.get_body_encoded(), 303,
+ callback=on_ready,
+ )
+
+ new_message = self.MockMessage('id2', 'digest2')
+ on_ready(new_message)
+ assert message.id == 'id2'
+ assert message.md5 == 'digest2'
+
+ def test_write_batch(self):
+ messages = [('id1', 'A', 0), ('id2', 'B', 303)]
+ self.x.write_batch(messages, callback=self.callback)
+ self.x.connection.send_message_batch.assert_called_with(
+ self.x, messages, callback=self.callback,
+ )
+
+ def test_delete_message(self):
+ message = self.MockMessage('id1', 'digest1')
+ self.x.delete_message(message, callback=self.callback)
+ self.x.connection.delete_message.assert_called_with(
+ self.x, message, self.callback,
+ )
+
+ def test_delete_message_batch(self):
+ messages = [
+ self.MockMessage('id1', 'r1'),
+ self.MockMessage('id2', 'r2'),
+ ]
+ self.x.delete_message_batch(messages, callback=self.callback)
+ self.x.connection.delete_message_batch.assert_called_with(
+ self.x, messages, callback=self.callback,
+ )
+
+ def test_change_message_visibility_batch(self):
+ messages = [
+ (self.MockMessage('id1', 'r1'), 303),
+ (self.MockMessage('id2', 'r2'), 909),
+ ]
+ self.x.change_message_visibility_batch(
+ messages, callback=self.callback,
+ )
+ self.x.connection.change_message_visibility_batch.assert_called_with(
+ self.x, messages, callback=self.callback,
+ )
+
+ def test_delete(self):
+ self.x.delete(callback=self.callback)
+ self.x.connection.delete_queue.assert_called_with(
+ self.x, callback=self.callback,
+ )
+
+ def test_count(self):
+ self.x.count(callback=self.callback)
+ self.x.connection.get_queue_attributes.assert_called()
+ on_ready = self.x.connection.get_queue_attributes.call_args[0][2]
+ self.x.connection.get_queue_attributes.assert_called_with(
+ self.x, 'ApproximateNumberOfMessages', on_ready,
+ )
+
+ on_ready({'ApproximateNumberOfMessages': '909'})
+ self.callback.assert_called_with(909)
+
+ def test_interface__count_slow(self):
+ with pytest.raises(NotImplementedError):
+ self.x.count_slow()
+
+ def test_interface__dump(self):
+ with pytest.raises(NotImplementedError):
+ self.x.dump()
+
+ def test_interface__save_to_file(self):
+ with pytest.raises(NotImplementedError):
+ self.x.save_to_file()
+
+ def test_interface__save_to_filename(self):
+ with pytest.raises(NotImplementedError):
+ self.x.save_to_filename()
+
+ def test_interface__save(self):
+ with pytest.raises(NotImplementedError):
+ self.x.save()
+
+ def test_interface__save_to_s3(self):
+ with pytest.raises(NotImplementedError):
+ self.x.save_to_s3()
+
+ def test_interface__load_from_s3(self):
+ with pytest.raises(NotImplementedError):
+ self.x.load_from_s3()
+
+ def test_interface__load_from_file(self):
+ with pytest.raises(NotImplementedError):
+ self.x.load_from_file()
+
+ def test_interface__load_from_filename(self):
+ with pytest.raises(NotImplementedError):
+ self.x.load_from_filename()
+
+ def test_interface__load(self):
+ with pytest.raises(NotImplementedError):
+ self.x.load()
+
+ def test_interface__clear(self):
+ with pytest.raises(NotImplementedError):
+ self.x.clear()
diff --git a/t/unit/asynchronous/aws/test_aws.py b/t/unit/asynchronous/aws/test_aws.py
new file mode 100644
index 00000000..d0e98b70
--- /dev/null
+++ b/t/unit/asynchronous/aws/test_aws.py
@@ -0,0 +1,16 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+from case import Mock
+
+from kombu.asynchronous.aws import connect_sqs
+
+from .case import AWSCase
+
+
+class test_connect_sqs(AWSCase):
+
+ def test_connection(self):
+ x = connect_sqs('AAKI', 'ASAK', http_client=Mock())
+ assert x
+ assert x.sqs_connection
diff --git a/t/unit/asynchronous/aws/test_connection.py b/t/unit/asynchronous/aws/test_connection.py
new file mode 100644
index 00000000..b378c020
--- /dev/null
+++ b/t/unit/asynchronous/aws/test_connection.py
@@ -0,0 +1,263 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+from contextlib import contextmanager
+
+from case import Mock
+from vine.abstract import Thenable
+
+from kombu.exceptions import HttpError
+from kombu.five import WhateverIO
+
+from kombu.asynchronous import http
+from kombu.asynchronous.aws.connection import (
+ AsyncHTTPSConnection,
+ AsyncHTTPResponse,
+ AsyncConnection,
+ AsyncAWSQueryConnection,
+)
+from kombu.asynchronous.aws.ext import boto3
+
+from .case import AWSCase
+
+from t.mocks import PromiseMock
+
+try:
+ from urllib.parse import urlparse, parse_qs
+except ImportError:
+ from urlparse import urlparse, parse_qs # noqa
+
+# Not currently working
+VALIDATES_CERT = False
+
+
+def passthrough(*args, **kwargs):
+ m = Mock(*args, **kwargs)
+
+ def side_effect(ret):
+ return ret
+ m.side_effect = side_effect
+ return m
+
+
+class test_AsyncHTTPSConnection(AWSCase):
+
+ def test_http_client(self):
+ x = AsyncHTTPSConnection()
+ assert x.http_client is http.get_client()
+ client = Mock(name='http_client')
+ y = AsyncHTTPSConnection(http_client=client)
+ assert y.http_client is client
+
+ def test_args(self):
+ x = AsyncHTTPSConnection(
+ strict=True, timeout=33.3,
+ )
+ assert x.strict
+ assert x.timeout == 33.3
+
+ def test_request(self):
+ x = AsyncHTTPSConnection('aws.vandelay.com')
+ x.request('PUT', '/importer-exporter')
+ assert x.path == '/importer-exporter'
+ assert x.method == 'PUT'
+
+ def test_request_with_body_buffer(self):
+ x = AsyncHTTPSConnection('aws.vandelay.com')
+ body = Mock(name='body')
+ body.read.return_value = 'Vandelay Industries'
+ x.request('PUT', '/importer-exporter', body)
+ assert x.method == 'PUT'
+ assert x.path == '/importer-exporter'
+ assert x.body == 'Vandelay Industries'
+ body.read.assert_called_with()
+
+ def test_request_with_body_text(self):
+ x = AsyncHTTPSConnection('aws.vandelay.com')
+ x.request('PUT', '/importer-exporter', 'Vandelay Industries')
+ assert x.method == 'PUT'
+ assert x.path == '/importer-exporter'
+ assert x.body == 'Vandelay Industries'
+
+ def test_request_with_headers(self):
+ x = AsyncHTTPSConnection()
+ headers = {'Proxy': 'proxy.vandelay.com'}
+ x.request('PUT', '/importer-exporter', None, headers)
+ assert 'Proxy' in dict(x.headers)
+ assert dict(x.headers)['Proxy'] == 'proxy.vandelay.com'
+
+ def assert_request_created_with(self, url, conn):
+ conn.Request.assert_called_with(
+ url, method=conn.method,
+ headers=http.Headers(conn.headers), body=conn.body,
+ connect_timeout=conn.timeout, request_timeout=conn.timeout,
+ validate_cert=VALIDATES_CERT,
+ )
+
+ def test_getresponse(self):
+ client = Mock(name='client')
+ client.add_request = passthrough(name='client.add_request')
+ x = AsyncHTTPSConnection(http_client=client)
+ x.Response = Mock(name='x.Response')
+ request = x.getresponse()
+ x.http_client.add_request.assert_called_with(request)
+ assert isinstance(request, Thenable)
+ assert isinstance(request.on_ready, Thenable)
+
+ response = Mock(name='Response')
+ request.on_ready(response)
+ x.Response.assert_called_with(response)
+
+ def test_getresponse__real_response(self):
+ client = Mock(name='client')
+ client.add_request = passthrough(name='client.add_request')
+ callback = PromiseMock(name='callback')
+ x = AsyncHTTPSConnection(http_client=client)
+ request = x.getresponse(callback)
+ x.http_client.add_request.assert_called_with(request)
+
+ buf = WhateverIO()
+ buf.write('The quick brown fox jumps')
+
+ headers = http.Headers({'X-Foo': 'Hello', 'X-Bar': 'World'})
+
+ response = http.Response(request, 200, headers, buf)
+ request.on_ready(response)
+ callback.assert_called()
+ wresponse = callback.call_args[0][0]
+
+ assert wresponse.read() == 'The quick brown fox jumps'
+ assert wresponse.status == 200
+ assert wresponse.getheader('X-Foo') == 'Hello'
+ headers_dict = wresponse.getheaders()
+ assert dict(headers_dict) == headers
+ assert wresponse.msg
+ assert repr(wresponse)
+
+ def test_repr(self):
+ assert repr(AsyncHTTPSConnection())
+
+ def test_putrequest(self):
+ x = AsyncHTTPSConnection()
+ x.putrequest('UPLOAD', '/new')
+ assert x.method == 'UPLOAD'
+ assert x.path == '/new'
+
+ def test_putheader(self):
+ x = AsyncHTTPSConnection()
+ x.putheader('X-Foo', 'bar')
+ assert x.headers == [('X-Foo', 'bar')]
+ x.putheader('X-Bar', 'baz')
+ assert x.headers == [
+ ('X-Foo', 'bar'),
+ ('X-Bar', 'baz'),
+ ]
+
+ def test_send(self):
+ x = AsyncHTTPSConnection()
+ x.send('foo')
+ assert x.body == 'foo'
+ x.send('bar')
+ assert x.body == 'foobar'
+
+ def test_interface(self):
+ x = AsyncHTTPSConnection()
+ assert x.set_debuglevel(3) is None
+ assert x.connect() is None
+ assert x.close() is None
+ assert x.endheaders() is None
+
+
+class test_AsyncHTTPResponse(AWSCase):
+
+ def test_with_error(self):
+ r = Mock(name='response')
+ r.error = HttpError(404, 'NotFound')
+ x = AsyncHTTPResponse(r)
+ assert x.reason == 'NotFound'
+
+ r.error = None
+ assert not x.reason
+
+
+class test_AsyncConnection(AWSCase):
+
+ def test_client(self):
+ sqs = Mock(name='sqs')
+ x = AsyncConnection(sqs)
+ assert x._httpclient is http.get_client()
+ client = Mock(name='client')
+ y = AsyncConnection(sqs, http_client=client)
+ assert y._httpclient is client
+
+ def test_get_http_connection(self):
+ sqs = Mock(name='sqs')
+ x = AsyncConnection(sqs)
+ assert isinstance(
+ x.get_http_connection(),
+ AsyncHTTPSConnection,
+ )
+ conn = x.get_http_connection()
+ assert conn.http_client is x._httpclient
+
+
+class test_AsyncAWSQueryConnection(AWSCase):
+
+ def setup(self):
+ session = boto3.session.Session(
+ aws_access_key_id='AAA',
+ aws_secret_access_key='AAAA',
+ region_name='us-west-2',
+ )
+ sqs_client = session.client('sqs')
+ self.x = AsyncAWSQueryConnection(sqs_client,
+ http_client=Mock(name='client'))
+
+ def test_make_request(self):
+ _mexe, self.x._mexe = self.x._mexe, Mock(name='_mexe')
+ Conn = self.x.get_http_connection = Mock(name='get_http_connection')
+ callback = PromiseMock(name='callback')
+ self.x.make_request(
+ 'action', {'foo': 1}, 'https://foo.com/', 'GET', callback=callback,
+ )
+ self.x._mexe.assert_called()
+ request = self.x._mexe.call_args[0][0]
+ parsed = urlparse(request.url)
+ params = parse_qs(parsed.query)
+ assert params['Action'][0] == 'action'
+
+ ret = _mexe(request, callback=callback)
+ assert ret is callback
+ Conn.return_value.request.assert_called()
+ Conn.return_value.getresponse.assert_called_with(
+ callback=callback,
+ )
+
+ def test_make_request__no_action(self):
+ self.x._mexe = Mock(name='_mexe')
+ self.x.get_http_connection = Mock(name='get_http_connection')
+ callback = PromiseMock(name='callback')
+ self.x.make_request(
+ None, {'foo': 1}, 'http://foo.com/', 'GET', callback=callback,
+ )
+ self.x._mexe.assert_called()
+ request = self.x._mexe.call_args[0][0]
+ parsed = urlparse(request.url)
+ params = parse_qs(parsed.query)
+ assert 'Action' not in params
+
+ def Response(self, status, body):
+ r = Mock(name='response')
+ r.status = status
+ r.read.return_value = body
+ return r
+
+ @contextmanager
+ def mock_make_request(self):
+ self.x.make_request = Mock(name='make_request')
+ callback = PromiseMock(name='callback')
+ yield callback
+
+ def assert_make_request_called(self):
+ self.x.make_request.assert_called()
+ return self.x.make_request.call_args[1]['callback']
diff --git a/t/unit/asynchronous/http/__init__.py b/t/unit/asynchronous/http/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/t/unit/asynchronous/http/__init__.py
diff --git a/t/unit/asynchronous/http/test_curl.py b/t/unit/asynchronous/http/test_curl.py
new file mode 100644
index 00000000..553b0ea7
--- /dev/null
+++ b/t/unit/asynchronous/http/test_curl.py
@@ -0,0 +1,134 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+import pytest
+
+from case import Mock, call, patch, skip
+
+from kombu.asynchronous.http.curl import READ, WRITE, CurlClient
+
+
+@skip.if_pypy()
+@skip.unless_module('pycurl')
+@pytest.mark.usefixtures('hub')
+class test_CurlClient:
+
+ class Client(CurlClient):
+ Curl = Mock(name='Curl')
+
+ def test_when_pycurl_missing(self, patching):
+ patching('kombu.asynchronous.http.curl.pycurl', None)
+ with pytest.raises(ImportError):
+ self.Client()
+
+ def test_max_clients_set(self):
+ x = self.Client(max_clients=303)
+ assert x.max_clients == 303
+
+ def test_init(self):
+ with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
+ x = self.Client()
+ assert x._multi is not None
+ assert x._pending is not None
+ assert x._free_list is not None
+ assert x._fds is not None
+ assert x._socket_action == x._multi.socket_action
+ assert len(x._curls) == x.max_clients
+ assert x._timeout_check_tref
+
+ x._multi.setopt.assert_has_calls([
+ call(_pycurl.M_TIMERFUNCTION, x._set_timeout),
+ call(_pycurl.M_SOCKETFUNCTION, x._handle_socket),
+ ])
+
+ def test_close(self):
+ with patch('kombu.asynchronous.http.curl.pycurl'):
+ x = self.Client()
+ x._timeout_check_tref = Mock(name='timeout_check_tref')
+ x.close()
+ x._timeout_check_tref.cancel.assert_called_with()
+ for _curl in x._curls:
+ _curl.close.assert_called_with()
+ x._multi.close.assert_called_with()
+
+ def test_add_request(self):
+ with patch('kombu.asynchronous.http.curl.pycurl'):
+ x = self.Client()
+ x._process_queue = Mock(name='_process_queue')
+ x._set_timeout = Mock(name='_set_timeout')
+ request = Mock(name='request')
+ x.add_request(request)
+ assert request in x._pending
+ x._process_queue.assert_called_with()
+ x._set_timeout.assert_called_with(0)
+
+ def test_handle_socket(self):
+ with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
+ hub = Mock(name='hub')
+ x = self.Client(hub)
+ fd = Mock(name='fd1')
+
+ # POLL_REMOVE
+ x._fds[fd] = fd
+ x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl)
+ hub.remove.assert_called_with(fd)
+ assert fd not in x._fds
+ x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl)
+
+ # POLL_IN
+ hub = x.hub = Mock(name='hub')
+ fds = [fd, Mock(name='fd2'), Mock(name='fd3')]
+ x._fds = {f: f for f in fds}
+ x._handle_socket(_pycurl.POLL_IN, fd, x._multi, None, _pycurl)
+ hub.remove.assert_has_calls([call(fd)])
+ hub.add_reader.assert_called_with(fd, x.on_readable, fd)
+ assert x._fds[fd] == READ
+
+ # POLL_OUT
+ hub = x.hub = Mock(name='hub')
+ x._handle_socket(_pycurl.POLL_OUT, fd, x._multi, None, _pycurl)
+ hub.add_writer.assert_called_with(fd, x.on_writable, fd)
+ assert x._fds[fd] == WRITE
+
+ # POLL_INOUT
+ hub = x.hub = Mock(name='hub')
+ x._handle_socket(_pycurl.POLL_INOUT, fd, x._multi, None, _pycurl)
+ hub.add_reader.assert_called_with(fd, x.on_readable, fd)
+ hub.add_writer.assert_called_with(fd, x.on_writable, fd)
+ assert x._fds[fd] == READ | WRITE
+
+ # UNKNOWN EVENT
+ hub = x.hub = Mock(name='hub')
+ x._handle_socket(0xff3f, fd, x._multi, None, _pycurl)
+
+ # FD NOT IN FDS
+ hub = x.hub = Mock(name='hub')
+ x._fds.clear()
+ x._handle_socket(0xff3f, fd, x._multi, None, _pycurl)
+ hub.remove.assert_not_called()
+
+ def test_set_timeout(self):
+ x = self.Client()
+ x._set_timeout(100)
+
+ def test_timeout_check(self):
+ with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
+ x = self.Client()
+ x._process_pending_requests = Mock(name='process_pending')
+ x._multi.socket_all.return_value = 333, 1
+ _pycurl.error = KeyError
+ x._timeout_check(_pycurl=_pycurl)
+
+ x._multi.socket_all.return_value = None
+ x._multi.socket_all.side_effect = _pycurl.error(333)
+ x._timeout_check(_pycurl=_pycurl)
+
+ def test_on_readable_on_writeable(self):
+ with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
+ x = self.Client()
+ x._on_event = Mock(name='on_event')
+ fd = Mock(name='fd')
+ x.on_readable(fd, _pycurl=_pycurl)
+ x._on_event.assert_called_with(fd, _pycurl.CSELECT_IN)
+ x.on_writable(fd, _pycurl=_pycurl)
+ x._on_event.assert_called_with(fd, _pycurl.CSELECT_OUT)
diff --git a/t/unit/asynchronous/http/test_http.py b/t/unit/asynchronous/http/test_http.py
new file mode 100644
index 00000000..21c66b97
--- /dev/null
+++ b/t/unit/asynchronous/http/test_http.py
@@ -0,0 +1,157 @@
+from __future__ import absolute_import, unicode_literals
+
+import pytest
+
+from io import BytesIO
+
+from vine import promise
+
+from case import Mock, skip
+
+from kombu.asynchronous import http
+from kombu.asynchronous.http.base import BaseClient, normalize_header
+from kombu.exceptions import HttpError
+
+from t.mocks import PromiseMock
+
+
+class test_Headers:
+
+ def test_normalize(self):
+ assert normalize_header('accept-encoding') == 'Accept-Encoding'
+
+
+@pytest.mark.usefixtures('hub')
+class test_Request:
+
+ def test_init(self):
+ x = http.Request('http://foo', method='POST')
+ assert x.url == 'http://foo'
+ assert x.method == 'POST'
+
+ x = http.Request('x', max_redirects=100)
+ assert x.max_redirects == 100
+
+ assert isinstance(x.headers, http.Headers)
+ h = http.Headers()
+ x = http.Request('x', headers=h)
+ assert x.headers is h
+ assert isinstance(x.on_ready, promise)
+
+ def test_then(self):
+ callback = PromiseMock(name='callback')
+ x = http.Request('http://foo')
+ x.then(callback)
+
+ x.on_ready(1)
+ callback.assert_called_with(1)
+
+
+@pytest.mark.usefixtures('hub')
+class test_Response:
+
+ def test_init(self):
+ req = http.Request('http://foo')
+ r = http.Response(req, 200)
+
+ assert r.status == 'OK'
+ assert r.effective_url == 'http://foo'
+ r.raise_for_error()
+
+ def test_raise_for_error(self):
+ req = http.Request('http://foo')
+ r = http.Response(req, 404)
+ assert r.status == 'Not Found'
+ assert r.error
+
+ with pytest.raises(HttpError):
+ r.raise_for_error()
+
+ def test_get_body(self):
+ req = http.Request('http://foo')
+ req.buffer = BytesIO()
+ req.buffer.write(b'hello')
+
+ rn = http.Response(req, 200, buffer=None)
+ assert rn.body is None
+
+ r = http.Response(req, 200, buffer=req.buffer)
+ assert r._body is None
+ assert r.body == b'hello'
+ assert r._body == b'hello'
+ assert r.body == b'hello'
+
+
+class test_BaseClient:
+
+ @pytest.fixture(autouse=True)
+ def setup_hub(self, hub):
+ self.hub = hub
+
+ def test_init(self):
+ c = BaseClient(Mock(name='hub'))
+ assert c.hub
+ assert c._header_parser
+
+ def test_perform(self):
+ c = BaseClient(Mock(name='hub'))
+ c.add_request = Mock(name='add_request')
+
+ c.perform('http://foo')
+ c.add_request.assert_called()
+ assert isinstance(c.add_request.call_args[0][0], http.Request)
+
+ req = http.Request('http://bar')
+ c.perform(req)
+ c.add_request.assert_called_with(req)
+
+ def test_add_request(self):
+ c = BaseClient(Mock(name='hub'))
+ with pytest.raises(NotImplementedError):
+ c.add_request(Mock(name='request'))
+
+ def test_header_parser(self):
+ c = BaseClient(Mock(name='hub'))
+ parser = c._header_parser
+ headers = http.Headers()
+
+ c.on_header(headers, 'HTTP/1.1')
+ c.on_header(headers, 'x-foo-bar: 123')
+ c.on_header(headers, 'People: George Costanza')
+ assert headers._prev_key == 'People'
+ c.on_header(headers, ' Jerry Seinfeld')
+ c.on_header(headers, ' Elaine Benes')
+ c.on_header(headers, ' Cosmo Kramer')
+ assert not headers.complete
+ c.on_header(headers, '')
+ assert headers.complete
+
+ with pytest.raises(KeyError):
+ parser.throw(KeyError('foo'))
+ c.on_header(headers, '')
+
+ assert headers['X-Foo-Bar'] == '123'
+ assert (headers['People'] ==
+ 'George Costanza Jerry Seinfeld Elaine Benes Cosmo Kramer')
+
+ def test_close(self):
+ BaseClient(Mock(name='hub')).close()
+
+ def test_as_context(self):
+ c = BaseClient(Mock(name='hub'))
+ c.close = Mock(name='close')
+ with c:
+ pass
+ c.close.assert_called_with()
+
+
+@skip.if_pypy()
+@skip.unless_module('pycurl')
+class test_Client:
+
+ def test_get_client(self, hub):
+ client = http.get_client()
+ assert client.hub is hub
+ client2 = http.get_client(hub)
+ assert client2 is client
+ assert client2.hub is hub
diff --git a/t/unit/asynchronous/test_hub.py b/t/unit/asynchronous/test_hub.py
new file mode 100644
index 00000000..e09a2926
--- /dev/null
+++ b/t/unit/asynchronous/test_hub.py
@@ -0,0 +1,529 @@
+from __future__ import absolute_import, unicode_literals
+
+import errno
+import pytest
+
+from case import Mock, call, patch
+from vine import promise
+
+from kombu.asynchronous import hub as _hub
+from kombu.asynchronous import Hub, READ, WRITE, ERR
+from kombu.asynchronous.debug import callback_for, repr_flag, _rcb
+from kombu.asynchronous.hub import (
+ Stop, get_event_loop, set_event_loop,
+ _raise_stop_error, _dummy_context
+)
+from kombu.asynchronous.semaphore import DummyLock, LaxBoundedSemaphore
+
+
+class File(object):
+
+ def __init__(self, fd):
+ self.fd = fd
+
+ def fileno(self):
+ return self.fd
+
+ def __eq__(self, other):
+ if isinstance(other, File):
+ return self.fd == other.fd
+ return NotImplemented
+
+ def __hash__(self):
+ return hash(self.fd)
+
+
+def test_DummyLock():
+ with DummyLock():
+ pass
+
+
+class test_LaxBoundedSemaphore:
+
+ def test_acquire_release(self):
+ x = LaxBoundedSemaphore(2)
+
+ c1 = Mock()
+ x.acquire(c1, 1)
+ assert x.value == 1
+ c1.assert_called_with(1)
+
+ c2 = Mock()
+ x.acquire(c2, 2)
+ assert x.value == 0
+ c2.assert_called_with(2)
+
+ c3 = Mock()
+ x.acquire(c3, 3)
+ assert x.value == 0
+ c3.assert_not_called()
+
+ x.release()
+ assert x.value == 0
+ x.release()
+ assert x.value == 1
+ x.release()
+ assert x.value == 2
+ c3.assert_called_with(3)
+
+ def test_repr(self):
+ assert repr(LaxBoundedSemaphore(2))
+
+ def test_bounded(self):
+ x = LaxBoundedSemaphore(2)
+ for i in range(100):
+ x.release()
+ assert x.value == 2
+
+ def test_grow_shrink(self):
+ x = LaxBoundedSemaphore(1)
+ assert x.initial_value == 1
+ cb1 = Mock()
+ x.acquire(cb1, 1)
+ cb1.assert_called_with(1)
+ assert x.value == 0
+
+ cb2 = Mock()
+ x.acquire(cb2, 2)
+ cb2.assert_not_called()
+ assert x.value == 0
+
+ cb3 = Mock()
+ x.acquire(cb3, 3)
+ cb3.assert_not_called()
+
+ x.grow(2)
+ cb2.assert_called_with(2)
+ cb3.assert_called_with(3)
+ assert x.value == 2
+ assert x.initial_value == 3
+
+ assert not x._waiting
+ x.grow(3)
+ for i in range(x.initial_value):
+ assert x.acquire(Mock())
+ assert not x.acquire(Mock())
+ x.clear()
+
+ x.shrink(3)
+ for i in range(x.initial_value):
+ assert x.acquire(Mock())
+ assert not x.acquire(Mock())
+ assert x.value == 0
+
+ for i in range(100):
+ x.release()
+ assert x.value == x.initial_value
+
+ def test_clear(self):
+ x = LaxBoundedSemaphore(10)
+ for i in range(11):
+ x.acquire(Mock())
+ assert x._waiting
+ assert x.value == 0
+
+ x.clear()
+ assert not x._waiting
+ assert x.value == x.initial_value
+
+
+class test_Utils:
+
+ def setup(self):
+ self._prev_loop = get_event_loop()
+
+ def teardown(self):
+ set_event_loop(self._prev_loop)
+
+ def test_get_set_event_loop(self):
+ set_event_loop(None)
+ assert _hub._current_loop is None
+ assert get_event_loop() is None
+ hub = Hub()
+ set_event_loop(hub)
+ assert _hub._current_loop is hub
+ assert get_event_loop() is hub
+
+ def test_dummy_context(self):
+ with _dummy_context():
+ pass
+
+ def test_raise_stop_error(self):
+ with pytest.raises(Stop):
+ _raise_stop_error()
+
+
+class test_Hub:
+
+ def setup(self):
+ self.hub = Hub()
+
+ def teardown(self):
+ self.hub.close()
+
+ def test_reset(self):
+ self.hub.close = Mock(name='close')
+ self.hub._create_poller = Mock(name='_create_poller')
+ self.hub.reset()
+ self.hub.close.assert_called_with()
+ self.hub._create_poller.assert_called_with()
+
+ def test__close_poller__no_poller(self):
+ self.hub.poller = None
+ self.hub._close_poller()
+
+ def test__close_poller(self):
+ poller = self.hub.poller = Mock(name='poller')
+ self.hub._close_poller()
+ poller.close.assert_called_with()
+ assert self.hub.poller is None
+
+ def test_stop(self):
+ self.hub.call_soon = Mock(name='call_soon')
+ self.hub.stop()
+ self.hub.call_soon.assert_called_with(_raise_stop_error)
+
+ @patch('kombu.asynchronous.hub.promise')
+ def test_call_soon(self, promise):
+ callback = Mock(name='callback')
+ ret = self.hub.call_soon(callback, 1, 2, 3)
+ promise.assert_called_with(callback, (1, 2, 3))
+ assert promise() in self.hub._ready
+ assert ret is promise()
+
+ def test_call_soon__promise_argument(self):
+ callback = promise(Mock(name='callback'), (1, 2, 3))
+ ret = self.hub.call_soon(callback)
+ assert ret is callback
+ assert ret in self.hub._ready
+
+ def test_call_later(self):
+ callback = Mock(name='callback')
+ self.hub.timer = Mock(name='hub.timer')
+ self.hub.call_later(10.0, callback, 1, 2)
+ self.hub.timer.call_after.assert_called_with(10.0, callback, (1, 2))
+
+ def test_call_at(self):
+ callback = Mock(name='callback')
+ self.hub.timer = Mock(name='hub.timer')
+ self.hub.call_at(21231122, callback, 1, 2)
+ self.hub.timer.call_at.assert_called_with(21231122, callback, (1, 2))
+
+ def test_repr(self):
+ assert repr(self.hub)
+
+ def test_repr_flag(self):
+ assert repr_flag(READ) == 'R'
+ assert repr_flag(WRITE) == 'W'
+ assert repr_flag(ERR) == '!'
+ assert repr_flag(READ | WRITE) == 'RW'
+ assert repr_flag(READ | ERR) == 'R!'
+ assert repr_flag(WRITE | ERR) == 'W!'
+ assert repr_flag(READ | WRITE | ERR) == 'RW!'
+
+ def test_repr_callback_rcb(self):
+
+ def f():
+ pass
+
+ assert _rcb(f) == f.__name__
+ assert _rcb('foo') == 'foo'
+
+ @patch('kombu.asynchronous.hub.poll')
+ def test_start_stop(self, poll):
+ self.hub = Hub()
+ poll.assert_called_with()
+
+ poller = self.hub.poller
+ self.hub.stop()
+ self.hub.close()
+ poller.close.assert_called_with()
+
+ def test_fire_timers(self):
+ self.hub.timer = Mock()
+ self.hub.timer._queue = []
+ assert self.hub.fire_timers(
+ min_delay=42.324, max_delay=32.321) == 32.321
+
+ self.hub.timer._queue = [1]
+ self.hub.scheduler = iter([(3.743, None)])
+ assert self.hub.fire_timers() == 3.743
+
+ e1, e2, e3 = Mock(), Mock(), Mock()
+ entries = [e1, e2, e3]
+
+ def reset():
+ return [m.reset() for m in [e1, e2, e3]]
+
+ def se():
+ while 1:
+ while entries:
+ yield None, entries.pop()
+ yield 3.982, None
+ self.hub.scheduler = se()
+
+ assert self.hub.fire_timers(max_timers=10) == 3.982
+ for E in [e3, e2, e1]:
+ E.assert_called_with()
+ reset()
+
+ entries[:] = [Mock() for _ in range(11)]
+ keep = list(entries)
+ assert self.hub.fire_timers(
+ max_timers=10, min_delay=1.13) == 1.13
+ for E in reversed(keep[1:]):
+ E.assert_called_with()
+ reset()
+ assert self.hub.fire_timers(max_timers=10) == 3.982
+ keep[0].assert_called_with()
+
+ def test_fire_timers_raises(self):
+ eback = Mock()
+ eback.side_effect = KeyError('foo')
+ self.hub.timer = Mock()
+ self.hub.scheduler = iter([(0, eback)])
+ with pytest.raises(KeyError):
+ self.hub.fire_timers(propagate=(KeyError,))
+
+ eback.side_effect = ValueError('foo')
+ self.hub.scheduler = iter([(0, eback)])
+ with patch('kombu.asynchronous.hub.logger') as logger:
+ with pytest.raises(StopIteration):
+ self.hub.fire_timers()
+ logger.error.assert_called()
+
+ eback.side_effect = MemoryError('foo')
+ self.hub.scheduler = iter([(0, eback)])
+ with pytest.raises(MemoryError):
+ self.hub.fire_timers()
+
+ eback.side_effect = OSError()
+ eback.side_effect.errno = errno.ENOMEM
+ self.hub.scheduler = iter([(0, eback)])
+ with pytest.raises(OSError):
+ self.hub.fire_timers()
+
+ eback.side_effect = OSError()
+ eback.side_effect.errno = errno.ENOENT
+ self.hub.scheduler = iter([(0, eback)])
+ with patch('kombu.asynchronous.hub.logger') as logger:
+ with pytest.raises(StopIteration):
+ self.hub.fire_timers()
+ logger.error.assert_called()
+
+ def test_add_raises_ValueError(self):
+ self.hub.poller = Mock(name='hub.poller')
+ self.hub.poller.register.side_effect = ValueError()
+ self.hub._discard = Mock(name='hub.discard')
+ with pytest.raises(ValueError):
+ self.hub.add(2, Mock(), READ)
+ self.hub._discard.assert_called_with(2)
+
+ def test_remove_reader(self):
+ self.hub.poller = Mock(name='hub.poller')
+ self.hub.add(2, Mock(), READ)
+ self.hub.add(2, Mock(), WRITE)
+ self.hub.remove_reader(2)
+ assert 2 not in self.hub.readers
+ assert 2 in self.hub.writers
+
+ def test_remove_reader__not_writeable(self):
+ self.hub.poller = Mock(name='hub.poller')
+ self.hub.add(2, Mock(), READ)
+ self.hub.remove_reader(2)
+ assert 2 not in self.hub.readers
+
+ def test_remove_writer(self):
+ self.hub.poller = Mock(name='hub.poller')
+ self.hub.add(2, Mock(), READ)
+ self.hub.add(2, Mock(), WRITE)
+ self.hub.remove_writer(2)
+ assert 2 in self.hub.readers
+ assert 2 not in self.hub.writers
+
+ def test_remove_writer__not_readable(self):
+ self.hub.poller = Mock(name='hub.poller')
+ self.hub.add(2, Mock(), WRITE)
+ self.hub.remove_writer(2)
+ assert 2 not in self.hub.writers
+
+ def test_add__consolidate(self):
+ self.hub.poller = Mock(name='hub.poller')
+ self.hub.add(2, Mock(), WRITE, consolidate=True)
+ assert 2 in self.hub.consolidate
+ assert self.hub.writers[2] is None
+
+ @patch('kombu.asynchronous.hub.logger')
+ def test_on_callback_error(self, logger):
+ self.hub.on_callback_error(Mock(name='callback'), KeyError())
+ logger.error.assert_called()
+
+ def test_loop_property(self):
+ self.hub._loop = None
+ self.hub.create_loop = Mock(name='hub.create_loop')
+ assert self.hub.loop is self.hub.create_loop()
+ assert self.hub._loop is self.hub.create_loop()
+
+ def test_run_forever(self):
+ self.hub.run_once = Mock(name='hub.run_once')
+ self.hub.run_once.side_effect = Stop()
+ self.hub.run_forever()
+
+ def test_run_once(self):
+ self.hub._loop = iter([1])
+ self.hub.run_once()
+ self.hub.run_once()
+ assert self.hub._loop is None
+
+ def test_repr_active(self):
+ self.hub.readers = {1: Mock(), 2: Mock()}
+ self.hub.writers = {3: Mock(), 4: Mock()}
+ for value in list(
+ self.hub.readers.values()) + list(self.hub.writers.values()):
+ value.__name__ = 'mock'
+ assert self.hub.repr_active()
+
+ def test_repr_events(self):
+ self.hub.readers = {6: Mock(), 7: Mock(), 8: Mock()}
+ self.hub.writers = {9: Mock()}
+ for value in list(
+ self.hub.readers.values()) + list(self.hub.writers.values()):
+ value.__name__ = 'mock'
+ assert self.hub.repr_events([
+ (6, READ),
+ (7, ERR),
+ (8, READ | ERR),
+ (9, WRITE),
+ (10, 13213),
+ ])
+
+ def test_callback_for(self):
+ reader, writer = Mock(), Mock()
+ self.hub.readers = {6: reader}
+ self.hub.writers = {7: writer}
+
+ assert callback_for(self.hub, 6, READ) == reader
+ assert callback_for(self.hub, 7, WRITE) == writer
+ with pytest.raises(KeyError):
+ callback_for(self.hub, 6, WRITE)
+ assert callback_for(self.hub, 6, WRITE, 'foo') == 'foo'
+
+ def test_add_remove_readers(self):
+ P = self.hub.poller = Mock()
+
+ read_A = Mock()
+ read_B = Mock()
+ self.hub.add_reader(10, read_A, 10)
+ self.hub.add_reader(File(11), read_B, 11)
+
+ P.register.assert_has_calls([
+ call(10, self.hub.READ | self.hub.ERR),
+ call(11, self.hub.READ | self.hub.ERR),
+ ], any_order=True)
+
+ assert self.hub.readers[10] == (read_A, (10,))
+ assert self.hub.readers[11] == (read_B, (11,))
+
+ self.hub.remove(10)
+ assert 10 not in self.hub.readers
+ self.hub.remove(File(11))
+ assert 11 not in self.hub.readers
+ P.unregister.assert_has_calls([
+ call(10), call(11),
+ ])
+
+ def test_can_remove_unknown_fds(self):
+ self.hub.poller = Mock()
+ self.hub.remove(30)
+ self.hub.remove(File(301))
+
+ def test_remove__unregister_raises(self):
+ self.hub.poller = Mock()
+ self.hub.poller.unregister.side_effect = OSError()
+
+ self.hub.remove(313)
+
+ def test_add_writers(self):
+ P = self.hub.poller = Mock()
+
+ write_A = Mock()
+ write_B = Mock()
+ self.hub.add_writer(20, write_A)
+ self.hub.add_writer(File(21), write_B)
+
+ P.register.assert_has_calls([
+ call(20, self.hub.WRITE),
+ call(21, self.hub.WRITE),
+ ], any_order=True)
+
+ assert self.hub.writers[20], (write_A == ())
+ assert self.hub.writers[21], (write_B == ())
+
+ self.hub.remove(20)
+ assert 20 not in self.hub.writers
+ self.hub.remove(File(21))
+ assert 21 not in self.hub.writers
+ P.unregister.assert_has_calls([
+ call(20), call(21),
+ ])
+
+ def test_enter__exit(self):
+ P = self.hub.poller = Mock()
+ on_close = Mock()
+ self.hub.on_close.add(on_close)
+
+ try:
+ read_A = Mock()
+ read_B = Mock()
+ self.hub.add_reader(10, read_A)
+ self.hub.add_reader(File(11), read_B)
+ write_A = Mock()
+ write_B = Mock()
+ self.hub.add_writer(20, write_A)
+ self.hub.add_writer(File(21), write_B)
+ assert self.hub.readers
+ assert self.hub.writers
+ finally:
+ assert self.hub.poller
+ self.hub.close()
+ assert not self.hub.readers
+ assert not self.hub.writers
+
+ P.unregister.assert_has_calls([
+ call(10), call(11), call(20), call(21),
+ ], any_order=True)
+
+ on_close.assert_called_with(self.hub)
+
+ def test_scheduler_property(self):
+ hub = Hub(timer=[1, 2, 3])
+ assert list(hub.scheduler), [1, 2 == 3]
+
+ def test_loop__tick_callbacks(self):
+ self.hub._ready = Mock(name='_ready')
+ self.hub._ready.__len__ = Mock(name="_ready.__len__")
+ self.hub._ready.__len__.side_effect = RuntimeError()
+ ticks = [Mock(name='cb1'), Mock(name='cb2')]
+ self.hub.on_tick = list(ticks)
+
+ with pytest.raises(RuntimeError):
+ next(self.hub.loop)
+
+ ticks[0].assert_called_once_with()
+ ticks[1].assert_called_once_with()
+
+ def test_loop__todo(self):
+ self.hub.fire_timers = Mock(name='fire_timers')
+ self.hub.fire_timers.side_effect = RuntimeError()
+ self.hub.timer = Mock(name='timer')
+
+ callbacks = [Mock(name='cb1'), Mock(name='cb2')]
+ for cb in callbacks:
+ self.hub.call_soon(cb)
+ self.hub._ready.add(None)
+
+ with pytest.raises(RuntimeError):
+ next(self.hub.loop)
+
+ callbacks[0].assert_called_once_with()
+ callbacks[1].assert_called_once_with()
diff --git a/t/unit/asynchronous/test_semaphore.py b/t/unit/asynchronous/test_semaphore.py
new file mode 100644
index 00000000..86f58cce
--- /dev/null
+++ b/t/unit/asynchronous/test_semaphore.py
@@ -0,0 +1,43 @@
+from __future__ import absolute_import, unicode_literals
+
+from kombu.asynchronous.semaphore import LaxBoundedSemaphore
+
+
+class test_LaxBoundedSemaphore:
+
+ def test_over_release(self):
+ x = LaxBoundedSemaphore(2)
+ calls = []
+ for i in range(1, 21):
+ x.acquire(calls.append, i)
+ x.release()
+ x.acquire(calls.append, 'x')
+ x.release()
+ x.acquire(calls.append, 'y')
+
+ assert calls, [1, 2, 3 == 4]
+
+ for i in range(30):
+ x.release()
+ assert calls, list(range(1, 21)) + ['x' == 'y']
+ assert x.value == x.initial_value
+
+ calls[:] = []
+ for i in range(1, 11):
+ x.acquire(calls.append, i)
+ for i in range(1, 11):
+ x.release()
+ assert calls, list(range(1 == 11))
+
+ calls[:] = []
+ assert x.value == x.initial_value
+ x.acquire(calls.append, 'x')
+ assert x.value == 1
+ x.acquire(calls.append, 'y')
+ assert x.value == 0
+ x.release()
+ assert x.value == 1
+ x.release()
+ assert x.value == 2
+ x.release()
+ assert x.value == 2
diff --git a/t/unit/asynchronous/test_timer.py b/t/unit/asynchronous/test_timer.py
new file mode 100644
index 00000000..8ceb725f
--- /dev/null
+++ b/t/unit/asynchronous/test_timer.py
@@ -0,0 +1,158 @@
+from __future__ import absolute_import, unicode_literals
+
+import pytest
+
+from datetime import datetime
+
+from case import Mock, patch
+
+from kombu.asynchronous.timer import Entry, Timer, to_timestamp
+from kombu.five import bytes_if_py2
+
+
+class test_to_timestamp:
+
+ def test_timestamp(self):
+ assert to_timestamp(3.13) is 3.13
+
+ def test_datetime(self):
+ assert to_timestamp(datetime.utcnow())
+
+
+class test_Entry:
+
+ def test_call(self):
+ fun = Mock(name='fun')
+ tref = Entry(fun, (4, 4), {'moo': 'baz'})
+ tref()
+ fun.assert_called_with(4, 4, moo='baz')
+
+ def test_cancel(self):
+ tref = Entry(lambda x: x, (1,), {})
+ assert not tref.canceled
+ assert not tref.cancelled
+ tref.cancel()
+ assert tref.canceled
+ assert tref.cancelled
+
+ def test_repr(self):
+ tref = Entry(lambda x: x(1,), {})
+ assert repr(tref)
+
+ def test_hash(self):
+ assert hash(Entry(lambda: None))
+
+ def test_ordering(self):
+ # we don't care about results, just that it's possible
+ Entry(lambda x: 1) < Entry(lambda x: 2)
+ Entry(lambda x: 1) > Entry(lambda x: 2)
+ Entry(lambda x: 1) >= Entry(lambda x: 2)
+ Entry(lambda x: 1) <= Entry(lambda x: 2)
+
+ def test_eq(self):
+ x = Entry(lambda x: 1)
+ y = Entry(lambda x: 1)
+ assert x == x
+ assert x != y
+
+
+class test_Timer:
+
+ def test_enter_exit(self):
+ x = Timer()
+ x.stop = Mock(name='timer.stop')
+ with x:
+ pass
+ x.stop.assert_called_with()
+
+ def test_supports_Timer_interface(self):
+ x = Timer()
+ x.stop()
+
+ tref = Mock()
+ x.cancel(tref)
+ tref.cancel.assert_called_with()
+
+ assert x.schedule is x
+
+ def test_handle_error(self):
+ from datetime import datetime
+ on_error = Mock(name='on_error')
+
+ s = Timer(on_error=on_error)
+
+ with patch('kombu.asynchronous.timer.to_timestamp') as tot:
+ tot.side_effect = OverflowError()
+ s.enter_at(Entry(lambda: None, (), {}),
+ eta=datetime.now())
+ s.enter_at(Entry(lambda: None, (), {}), eta=None)
+ s.on_error = None
+ with pytest.raises(OverflowError):
+ s.enter_at(Entry(lambda: None, (), {}),
+ eta=datetime.now())
+ on_error.assert_called_once()
+ exc = on_error.call_args[0][0]
+ assert isinstance(exc, OverflowError)
+
+ def test_call_repeatedly(self):
+ t = Timer()
+ try:
+ t.schedule.enter_after = Mock()
+
+ myfun = Mock()
+ myfun.__name__ = bytes_if_py2('myfun')
+ t.call_repeatedly(0.03, myfun)
+
+ assert t.schedule.enter_after.call_count == 1
+ args1, _ = t.schedule.enter_after.call_args_list[0]
+ sec1, tref1, _ = args1
+ assert sec1 == 0.03
+ tref1()
+
+ assert t.schedule.enter_after.call_count == 2
+ args2, _ = t.schedule.enter_after.call_args_list[1]
+ sec2, tref2, _ = args2
+ assert sec2 == 0.03
+ tref2.canceled = True
+ tref2()
+
+ assert t.schedule.enter_after.call_count == 2
+ finally:
+ t.stop()
+
+ @patch('kombu.asynchronous.timer.logger')
+ def test_apply_entry_error_handled(self, logger):
+ t = Timer()
+ t.schedule.on_error = None
+
+ fun = Mock()
+ fun.side_effect = ValueError()
+
+ t.schedule.apply_entry(fun)
+ logger.error.assert_called()
+
+ def test_apply_entry_error_not_handled(self, stdouts):
+ t = Timer()
+ t.schedule.on_error = Mock()
+
+ fun = Mock()
+ fun.side_effect = ValueError()
+ t.schedule.apply_entry(fun)
+ fun.assert_called_with()
+ assert not stdouts.stderr.getvalue()
+
+ def test_enter_after(self):
+ t = Timer()
+ t._enter = Mock()
+ fun = Mock(name='fun')
+ time = Mock(name='time')
+ time.return_value = 10
+ t.enter_after(10, fun, time=time)
+ time.assert_called_with()
+ t._enter.assert_called_with(20, 0, fun)
+
+ def test_cancel(self):
+ t = Timer()
+ tref = Mock()
+ t.cancel(tref)
+ tref.cancel.assert_called_with()