diff options
| author | Thomas Achtemichuk <tom@tomchuk.com> | 2018-03-19 13:28:43 -0400 |
|---|---|---|
| committer | Omer Katz <omer.drow@gmail.com> | 2018-03-19 19:28:43 +0200 |
| commit | 75695205f6e7af8e7e9178e010debc3871b19106 (patch) | |
| tree | a45acc3ae34c048d756ea42587b6a0c42d0bd5dc /t/unit/asynchronous | |
| parent | dba85e2d9515b9ce202bd30e8690131aa055e6bf (diff) | |
| download | kombu-75695205f6e7af8e7e9178e010debc3871b19106.tar.gz | |
Rename `async` keyword to `asynchronous` (#839)
* Rename `async` keyword to `asynchronous`
* Fixes #742
* Resolves "DeprecationWarning: 'async' and 'await' will become reserved
keywords in Python 3.7"
* Address PR feedback
* Update appveyor config
* Rename docs and tests
Diffstat (limited to 't/unit/asynchronous')
| -rw-r--r-- | t/unit/asynchronous/__init__.py | 0 | ||||
| -rw-r--r-- | t/unit/asynchronous/aws/__init__.py | 0 | ||||
| -rw-r--r-- | t/unit/asynchronous/aws/case.py | 14 | ||||
| -rw-r--r-- | t/unit/asynchronous/aws/sqs/__init__.py | 0 | ||||
| -rw-r--r-- | t/unit/asynchronous/aws/sqs/test_connection.py | 324 | ||||
| -rw-r--r-- | t/unit/asynchronous/aws/sqs/test_queue.py | 207 | ||||
| -rw-r--r-- | t/unit/asynchronous/aws/test_aws.py | 16 | ||||
| -rw-r--r-- | t/unit/asynchronous/aws/test_connection.py | 263 | ||||
| -rw-r--r-- | t/unit/asynchronous/http/__init__.py | 0 | ||||
| -rw-r--r-- | t/unit/asynchronous/http/test_curl.py | 134 | ||||
| -rw-r--r-- | t/unit/asynchronous/http/test_http.py | 157 | ||||
| -rw-r--r-- | t/unit/asynchronous/test_hub.py | 529 | ||||
| -rw-r--r-- | t/unit/asynchronous/test_semaphore.py | 43 | ||||
| -rw-r--r-- | t/unit/asynchronous/test_timer.py | 158 |
14 files changed, 1845 insertions, 0 deletions
diff --git a/t/unit/asynchronous/__init__.py b/t/unit/asynchronous/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/t/unit/asynchronous/__init__.py diff --git a/t/unit/asynchronous/aws/__init__.py b/t/unit/asynchronous/aws/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/t/unit/asynchronous/aws/__init__.py diff --git a/t/unit/asynchronous/aws/case.py b/t/unit/asynchronous/aws/case.py new file mode 100644 index 00000000..70d9565f --- /dev/null +++ b/t/unit/asynchronous/aws/case.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import pytest + +from case import skip + + +@skip.if_pypy() +@skip.unless_module('boto3') +@skip.unless_module('pycurl') +@pytest.mark.usefixtures('hub') +class AWSCase(object): + pass diff --git a/t/unit/asynchronous/aws/sqs/__init__.py b/t/unit/asynchronous/aws/sqs/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/t/unit/asynchronous/aws/sqs/__init__.py diff --git a/t/unit/asynchronous/aws/sqs/test_connection.py b/t/unit/asynchronous/aws/sqs/test_connection.py new file mode 100644 index 00000000..ca1a8868 --- /dev/null +++ b/t/unit/asynchronous/aws/sqs/test_connection.py @@ -0,0 +1,324 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +from case import Mock, MagicMock + +from kombu.asynchronous.aws.sqs.connection import ( + AsyncSQSConnection +) +from kombu.asynchronous.aws.ext import boto3 +from kombu.asynchronous.aws.sqs.message import AsyncMessage +from kombu.asynchronous.aws.sqs.queue import AsyncQueue +from kombu.utils.uuid import uuid + +from t.mocks import PromiseMock + +from ..case import AWSCase + + +class test_AsyncSQSConnection(AWSCase): + + def setup(self): + session = boto3.session.Session( + aws_access_key_id='AAA', + aws_secret_access_key='AAAA', + region_name='us-west-2', + ) + sqs_client = session.client('sqs') + self.x = AsyncSQSConnection(sqs_client, 'ak', 'sk', http_client=Mock()) + self.x.get_object = Mock(name='X.get_object') + self.x.get_status = Mock(name='X.get_status') + self.x.get_list = Mock(name='X.get_list') + self.callback = PromiseMock(name='callback') + + sqs_client.get_queue_url = MagicMock(return_value={ + 'QueueUrl': 'http://aws.com' + }) + + def test_create_queue(self): + self.x.create_queue('foo', callback=self.callback) + self.x.get_object.assert_called_with( + 'CreateQueue', {'QueueName': 'foo'}, + callback=self.callback, + ) + + def test_create_queue__with_visibility_timeout(self): + self.x.create_queue( + 'foo', visibility_timeout=33, callback=self.callback, + ) + self.x.get_object.assert_called_with( + 'CreateQueue', { + 'QueueName': 'foo', + 'DefaultVisibilityTimeout': '33' + }, + callback=self.callback + ) + + def test_delete_queue(self): + queue = Mock(name='queue') + self.x.delete_queue(queue, callback=self.callback) + self.x.get_status.assert_called_with( + 'DeleteQueue', None, queue.id, callback=self.callback, + ) + + def test_get_queue_attributes(self): + queue = Mock(name='queue') + self.x.get_queue_attributes( + queue, attribute='QueueSize', callback=self.callback, + ) + self.x.get_object.assert_called_with( + 'GetQueueAttributes', {'AttributeName': 'QueueSize'}, + queue.id, callback=self.callback, + ) + + def test_set_queue_attribute(self): + queue = Mock(name='queue') + self.x.set_queue_attribute( + queue, 'Expires', '3600', callback=self.callback, + ) + self.x.get_status.assert_called_with( + 'SetQueueAttribute', { + 'Attribute.Name': 'Expires', + 'Attribute.Value': '3600', + }, + queue.id, callback=self.callback, + ) + + def test_receive_message(self): + queue = Mock(name='queue') + self.x.receive_message(queue, 4, callback=self.callback) + self.x.get_list.assert_called_with( + 'ReceiveMessage', {'MaxNumberOfMessages': 4}, + [('Message', AsyncMessage)], + 'http://aws.com', callback=self.callback, + parent=queue, + ) + + def test_receive_message__with_visibility_timeout(self): + queue = Mock(name='queue') + self.x.receive_message(queue, 4, 3666, callback=self.callback) + self.x.get_list.assert_called_with( + 'ReceiveMessage', { + 'MaxNumberOfMessages': 4, + 'VisibilityTimeout': 3666, + }, + [('Message', AsyncMessage)], + 'http://aws.com', callback=self.callback, + parent=queue, + ) + + def test_receive_message__with_wait_time_seconds(self): + queue = Mock(name='queue') + self.x.receive_message( + queue, 4, wait_time_seconds=303, callback=self.callback, + ) + self.x.get_list.assert_called_with( + 'ReceiveMessage', { + 'MaxNumberOfMessages': 4, + 'WaitTimeSeconds': 303, + }, + [('Message', AsyncMessage)], + 'http://aws.com', callback=self.callback, + parent=queue, + ) + + def test_receive_message__with_attributes(self): + queue = Mock(name='queue') + self.x.receive_message( + queue, 4, attributes=['foo', 'bar'], callback=self.callback, + ) + self.x.get_list.assert_called_with( + 'ReceiveMessage', { + 'AttributeName.1': 'foo', + 'AttributeName.2': 'bar', + 'MaxNumberOfMessages': 4, + }, + [('Message', AsyncMessage)], + 'http://aws.com', callback=self.callback, + parent=queue, + ) + + def MockMessage(self, id=None, receipt_handle=None, body=None): + m = Mock(name='message') + m.id = id or uuid() + m.receipt_handle = receipt_handle or uuid() + m._body = body + + def _get_body(): + return m._body + m.get_body.side_effect = _get_body + + def _set_body(value): + m._body = value + m.set_body.side_effect = _set_body + + return m + + def test_delete_message(self): + queue = Mock(name='queue') + message = self.MockMessage() + self.x.delete_message(queue, message.receipt_handle, + callback=self.callback) + self.x.get_status.assert_called_with( + 'DeleteMessage', {'ReceiptHandle': message.receipt_handle}, + queue, callback=self.callback, + ) + + def test_delete_message_batch(self): + queue = Mock(name='queue') + messages = [self.MockMessage('1', 'r1'), + self.MockMessage('2', 'r2')] + self.x.delete_message_batch(queue, messages, callback=self.callback) + self.x.get_object.assert_called_with( + 'DeleteMessageBatch', { + 'DeleteMessageBatchRequestEntry.1.Id': '1', + 'DeleteMessageBatchRequestEntry.1.ReceiptHandle': 'r1', + 'DeleteMessageBatchRequestEntry.2.Id': '2', + 'DeleteMessageBatchRequestEntry.2.ReceiptHandle': 'r2', + }, + queue.id, verb='POST', callback=self.callback, + ) + + def test_send_message(self): + queue = Mock(name='queue') + self.x.send_message(queue, 'hello', callback=self.callback) + self.x.get_object.assert_called_with( + 'SendMessage', {'MessageBody': 'hello'}, + queue.id, verb='POST', callback=self.callback, + ) + + def test_send_message__with_delay_seconds(self): + queue = Mock(name='queue') + self.x.send_message( + queue, 'hello', delay_seconds='303', callback=self.callback, + ) + self.x.get_object.assert_called_with( + 'SendMessage', {'MessageBody': 'hello', 'DelaySeconds': 303}, + queue.id, verb='POST', callback=self.callback, + ) + + def test_send_message_batch(self): + queue = Mock(name='queue') + messages = [self.MockMessage('1', 'r1', 'A'), + self.MockMessage('2', 'r2', 'B')] + self.x.send_message_batch( + queue, [(m.id, m.get_body(), 303) for m in messages], + callback=self.callback + ) + self.x.get_object.assert_called_with( + 'SendMessageBatch', { + 'SendMessageBatchRequestEntry.1.Id': '1', + 'SendMessageBatchRequestEntry.1.MessageBody': 'A', + 'SendMessageBatchRequestEntry.1.DelaySeconds': 303, + 'SendMessageBatchRequestEntry.2.Id': '2', + 'SendMessageBatchRequestEntry.2.MessageBody': 'B', + 'SendMessageBatchRequestEntry.2.DelaySeconds': 303, + }, + queue.id, verb='POST', callback=self.callback, + ) + + def test_change_message_visibility(self): + queue = Mock(name='queue') + self.x.change_message_visibility( + queue, 'rcpt', 33, callback=self.callback, + ) + self.x.get_status.assert_called_with( + 'ChangeMessageVisibility', { + 'ReceiptHandle': 'rcpt', + 'VisibilityTimeout': 33, + }, + queue.id, callback=self.callback, + ) + + def test_change_message_visibility_batch(self): + queue = Mock(name='queue') + messages = [ + (self.MockMessage('1', 'r1'), 303), + (self.MockMessage('2', 'r2'), 909), + ] + self.x.change_message_visibility_batch( + queue, messages, callback=self.callback, + ) + + def preamble(n): + return '.'.join(['ChangeMessageVisibilityBatchRequestEntry', n]) + + self.x.get_object.assert_called_with( + 'ChangeMessageVisibilityBatch', { + preamble('1.Id'): '1', + preamble('1.ReceiptHandle'): 'r1', + preamble('1.VisibilityTimeout'): 303, + preamble('2.Id'): '2', + preamble('2.ReceiptHandle'): 'r2', + preamble('2.VisibilityTimeout'): 909, + }, + queue.id, verb='POST', callback=self.callback, + ) + + def test_get_all_queues(self): + self.x.get_all_queues(callback=self.callback) + self.x.get_list.assert_called_with( + 'ListQueues', {}, [('QueueUrl', AsyncQueue)], + callback=self.callback, + ) + + def test_get_all_queues__with_prefix(self): + self.x.get_all_queues(prefix='kombu.', callback=self.callback) + self.x.get_list.assert_called_with( + 'ListQueues', {'QueueNamePrefix': 'kombu.'}, + [('QueueUrl', AsyncQueue)], + callback=self.callback, + ) + + def MockQueue(self, url): + q = Mock(name='Queue') + q.url = url + return q + + def test_get_queue(self): + self.x.get_queue('foo', callback=self.callback) + self.x.get_list.assert_called() + on_ready = self.x.get_list.call_args[1]['callback'] + queues = [ + self.MockQueue('/queues/bar'), + self.MockQueue('/queues/baz'), + self.MockQueue('/queues/foo'), + ] + on_ready(queues) + self.callback.assert_called_with(queues[-1]) + + self.x.get_list.assert_called_with( + 'ListQueues', {'QueueNamePrefix': 'foo'}, + [('QueueUrl', AsyncQueue)], + callback=on_ready, + ) + + def test_get_dead_letter_source_queues(self): + queue = Mock(name='queue') + self.x.get_dead_letter_source_queues(queue, callback=self.callback) + self.x.get_list.assert_called_with( + 'ListDeadLetterSourceQueues', {'QueueUrl': queue.url}, + [('QueueUrl', AsyncQueue)], callback=self.callback, + ) + + def test_add_permission(self): + queue = Mock(name='queue') + self.x.add_permission( + queue, 'label', 'accid', 'action', callback=self.callback, + ) + self.x.get_status.assert_called_with( + 'AddPermission', { + 'Label': 'label', + 'AWSAccountId': 'accid', + 'ActionName': 'action', + }, + queue.id, callback=self.callback, + ) + + def test_remove_permission(self): + queue = Mock(name='queue') + self.x.remove_permission(queue, 'label', callback=self.callback) + self.x.get_status.assert_called_with( + 'RemovePermission', {'Label': 'label'}, queue.id, + callback=self.callback, + ) diff --git a/t/unit/asynchronous/aws/sqs/test_queue.py b/t/unit/asynchronous/aws/sqs/test_queue.py new file mode 100644 index 00000000..add591df --- /dev/null +++ b/t/unit/asynchronous/aws/sqs/test_queue.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import pytest + +from case import Mock + +from kombu.asynchronous.aws.sqs.message import AsyncMessage +from kombu.asynchronous.aws.sqs.queue import AsyncQueue + +from t.mocks import PromiseMock + +from ..case import AWSCase + + +class test_AsyncQueue(AWSCase): + + def setup(self): + self.conn = Mock(name='connection') + self.x = AsyncQueue(self.conn, '/url') + self.callback = PromiseMock(name='callback') + + def test_message_class(self): + assert issubclass(self.x.message_class, AsyncMessage) + + def test_get_attributes(self): + self.x.get_attributes(attributes='QueueSize', callback=self.callback) + self.x.connection.get_queue_attributes.assert_called_with( + self.x, 'QueueSize', self.callback, + ) + + def test_set_attribute(self): + self.x.set_attribute('key', 'value', callback=self.callback) + self.x.connection.set_queue_attribute.assert_called_with( + self.x, 'key', 'value', self.callback, + ) + + def test_get_timeout(self): + self.x.get_timeout(callback=self.callback) + self.x.connection.get_queue_attributes.assert_called() + on_ready = self.x.connection.get_queue_attributes.call_args[0][2] + self.x.connection.get_queue_attributes.assert_called_with( + self.x, 'VisibilityTimeout', on_ready, + ) + + on_ready({'VisibilityTimeout': '303'}) + self.callback.assert_called_with(303) + + def test_set_timeout(self): + self.x.set_timeout(808, callback=self.callback) + self.x.connection.set_queue_attribute.assert_called() + on_ready = self.x.connection.set_queue_attribute.call_args[0][3] + self.x.connection.set_queue_attribute.assert_called_with( + self.x, 'VisibilityTimeout', 808, on_ready, + ) + on_ready(808) + self.callback.assert_called_with(808) + assert self.x.visibility_timeout == 808 + + on_ready(None) + assert self.x.visibility_timeout == 808 + + def test_add_permission(self): + self.x.add_permission( + 'label', 'accid', 'action', callback=self.callback, + ) + self.x.connection.add_permission.assert_called_with( + self.x, 'label', 'accid', 'action', self.callback, + ) + + def test_remove_permission(self): + self.x.remove_permission('label', callback=self.callback) + self.x.connection.remove_permission.assert_called_with( + self.x, 'label', self.callback, + ) + + def test_read(self): + self.x.read(visibility_timeout=909, callback=self.callback) + self.x.connection.receive_message.assert_called() + on_ready = self.x.connection.receive_message.call_args[1]['callback'] + self.x.connection.receive_message.assert_called_with( + self.x, number_messages=1, visibility_timeout=909, + attributes=None, wait_time_seconds=None, callback=on_ready, + ) + + messages = [Mock(name='message1')] + on_ready(messages) + + self.callback.assert_called_with(messages[0]) + + def MockMessage(self, id, md5): + m = Mock(name='Message-{0}'.format(id)) + m.id = id + m.md5 = md5 + return m + + def test_write(self): + message = self.MockMessage('id1', 'digest1') + self.x.write(message, delay_seconds=303, callback=self.callback) + self.x.connection.send_message.assert_called() + on_ready = self.x.connection.send_message.call_args[1]['callback'] + self.x.connection.send_message.assert_called_with( + self.x, message.get_body_encoded(), 303, + callback=on_ready, + ) + + new_message = self.MockMessage('id2', 'digest2') + on_ready(new_message) + assert message.id == 'id2' + assert message.md5 == 'digest2' + + def test_write_batch(self): + messages = [('id1', 'A', 0), ('id2', 'B', 303)] + self.x.write_batch(messages, callback=self.callback) + self.x.connection.send_message_batch.assert_called_with( + self.x, messages, callback=self.callback, + ) + + def test_delete_message(self): + message = self.MockMessage('id1', 'digest1') + self.x.delete_message(message, callback=self.callback) + self.x.connection.delete_message.assert_called_with( + self.x, message, self.callback, + ) + + def test_delete_message_batch(self): + messages = [ + self.MockMessage('id1', 'r1'), + self.MockMessage('id2', 'r2'), + ] + self.x.delete_message_batch(messages, callback=self.callback) + self.x.connection.delete_message_batch.assert_called_with( + self.x, messages, callback=self.callback, + ) + + def test_change_message_visibility_batch(self): + messages = [ + (self.MockMessage('id1', 'r1'), 303), + (self.MockMessage('id2', 'r2'), 909), + ] + self.x.change_message_visibility_batch( + messages, callback=self.callback, + ) + self.x.connection.change_message_visibility_batch.assert_called_with( + self.x, messages, callback=self.callback, + ) + + def test_delete(self): + self.x.delete(callback=self.callback) + self.x.connection.delete_queue.assert_called_with( + self.x, callback=self.callback, + ) + + def test_count(self): + self.x.count(callback=self.callback) + self.x.connection.get_queue_attributes.assert_called() + on_ready = self.x.connection.get_queue_attributes.call_args[0][2] + self.x.connection.get_queue_attributes.assert_called_with( + self.x, 'ApproximateNumberOfMessages', on_ready, + ) + + on_ready({'ApproximateNumberOfMessages': '909'}) + self.callback.assert_called_with(909) + + def test_interface__count_slow(self): + with pytest.raises(NotImplementedError): + self.x.count_slow() + + def test_interface__dump(self): + with pytest.raises(NotImplementedError): + self.x.dump() + + def test_interface__save_to_file(self): + with pytest.raises(NotImplementedError): + self.x.save_to_file() + + def test_interface__save_to_filename(self): + with pytest.raises(NotImplementedError): + self.x.save_to_filename() + + def test_interface__save(self): + with pytest.raises(NotImplementedError): + self.x.save() + + def test_interface__save_to_s3(self): + with pytest.raises(NotImplementedError): + self.x.save_to_s3() + + def test_interface__load_from_s3(self): + with pytest.raises(NotImplementedError): + self.x.load_from_s3() + + def test_interface__load_from_file(self): + with pytest.raises(NotImplementedError): + self.x.load_from_file() + + def test_interface__load_from_filename(self): + with pytest.raises(NotImplementedError): + self.x.load_from_filename() + + def test_interface__load(self): + with pytest.raises(NotImplementedError): + self.x.load() + + def test_interface__clear(self): + with pytest.raises(NotImplementedError): + self.x.clear() diff --git a/t/unit/asynchronous/aws/test_aws.py b/t/unit/asynchronous/aws/test_aws.py new file mode 100644 index 00000000..d0e98b70 --- /dev/null +++ b/t/unit/asynchronous/aws/test_aws.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +from case import Mock + +from kombu.asynchronous.aws import connect_sqs + +from .case import AWSCase + + +class test_connect_sqs(AWSCase): + + def test_connection(self): + x = connect_sqs('AAKI', 'ASAK', http_client=Mock()) + assert x + assert x.sqs_connection diff --git a/t/unit/asynchronous/aws/test_connection.py b/t/unit/asynchronous/aws/test_connection.py new file mode 100644 index 00000000..b378c020 --- /dev/null +++ b/t/unit/asynchronous/aws/test_connection.py @@ -0,0 +1,263 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +from contextlib import contextmanager + +from case import Mock +from vine.abstract import Thenable + +from kombu.exceptions import HttpError +from kombu.five import WhateverIO + +from kombu.asynchronous import http +from kombu.asynchronous.aws.connection import ( + AsyncHTTPSConnection, + AsyncHTTPResponse, + AsyncConnection, + AsyncAWSQueryConnection, +) +from kombu.asynchronous.aws.ext import boto3 + +from .case import AWSCase + +from t.mocks import PromiseMock + +try: + from urllib.parse import urlparse, parse_qs +except ImportError: + from urlparse import urlparse, parse_qs # noqa + +# Not currently working +VALIDATES_CERT = False + + +def passthrough(*args, **kwargs): + m = Mock(*args, **kwargs) + + def side_effect(ret): + return ret + m.side_effect = side_effect + return m + + +class test_AsyncHTTPSConnection(AWSCase): + + def test_http_client(self): + x = AsyncHTTPSConnection() + assert x.http_client is http.get_client() + client = Mock(name='http_client') + y = AsyncHTTPSConnection(http_client=client) + assert y.http_client is client + + def test_args(self): + x = AsyncHTTPSConnection( + strict=True, timeout=33.3, + ) + assert x.strict + assert x.timeout == 33.3 + + def test_request(self): + x = AsyncHTTPSConnection('aws.vandelay.com') + x.request('PUT', '/importer-exporter') + assert x.path == '/importer-exporter' + assert x.method == 'PUT' + + def test_request_with_body_buffer(self): + x = AsyncHTTPSConnection('aws.vandelay.com') + body = Mock(name='body') + body.read.return_value = 'Vandelay Industries' + x.request('PUT', '/importer-exporter', body) + assert x.method == 'PUT' + assert x.path == '/importer-exporter' + assert x.body == 'Vandelay Industries' + body.read.assert_called_with() + + def test_request_with_body_text(self): + x = AsyncHTTPSConnection('aws.vandelay.com') + x.request('PUT', '/importer-exporter', 'Vandelay Industries') + assert x.method == 'PUT' + assert x.path == '/importer-exporter' + assert x.body == 'Vandelay Industries' + + def test_request_with_headers(self): + x = AsyncHTTPSConnection() + headers = {'Proxy': 'proxy.vandelay.com'} + x.request('PUT', '/importer-exporter', None, headers) + assert 'Proxy' in dict(x.headers) + assert dict(x.headers)['Proxy'] == 'proxy.vandelay.com' + + def assert_request_created_with(self, url, conn): + conn.Request.assert_called_with( + url, method=conn.method, + headers=http.Headers(conn.headers), body=conn.body, + connect_timeout=conn.timeout, request_timeout=conn.timeout, + validate_cert=VALIDATES_CERT, + ) + + def test_getresponse(self): + client = Mock(name='client') + client.add_request = passthrough(name='client.add_request') + x = AsyncHTTPSConnection(http_client=client) + x.Response = Mock(name='x.Response') + request = x.getresponse() + x.http_client.add_request.assert_called_with(request) + assert isinstance(request, Thenable) + assert isinstance(request.on_ready, Thenable) + + response = Mock(name='Response') + request.on_ready(response) + x.Response.assert_called_with(response) + + def test_getresponse__real_response(self): + client = Mock(name='client') + client.add_request = passthrough(name='client.add_request') + callback = PromiseMock(name='callback') + x = AsyncHTTPSConnection(http_client=client) + request = x.getresponse(callback) + x.http_client.add_request.assert_called_with(request) + + buf = WhateverIO() + buf.write('The quick brown fox jumps') + + headers = http.Headers({'X-Foo': 'Hello', 'X-Bar': 'World'}) + + response = http.Response(request, 200, headers, buf) + request.on_ready(response) + callback.assert_called() + wresponse = callback.call_args[0][0] + + assert wresponse.read() == 'The quick brown fox jumps' + assert wresponse.status == 200 + assert wresponse.getheader('X-Foo') == 'Hello' + headers_dict = wresponse.getheaders() + assert dict(headers_dict) == headers + assert wresponse.msg + assert repr(wresponse) + + def test_repr(self): + assert repr(AsyncHTTPSConnection()) + + def test_putrequest(self): + x = AsyncHTTPSConnection() + x.putrequest('UPLOAD', '/new') + assert x.method == 'UPLOAD' + assert x.path == '/new' + + def test_putheader(self): + x = AsyncHTTPSConnection() + x.putheader('X-Foo', 'bar') + assert x.headers == [('X-Foo', 'bar')] + x.putheader('X-Bar', 'baz') + assert x.headers == [ + ('X-Foo', 'bar'), + ('X-Bar', 'baz'), + ] + + def test_send(self): + x = AsyncHTTPSConnection() + x.send('foo') + assert x.body == 'foo' + x.send('bar') + assert x.body == 'foobar' + + def test_interface(self): + x = AsyncHTTPSConnection() + assert x.set_debuglevel(3) is None + assert x.connect() is None + assert x.close() is None + assert x.endheaders() is None + + +class test_AsyncHTTPResponse(AWSCase): + + def test_with_error(self): + r = Mock(name='response') + r.error = HttpError(404, 'NotFound') + x = AsyncHTTPResponse(r) + assert x.reason == 'NotFound' + + r.error = None + assert not x.reason + + +class test_AsyncConnection(AWSCase): + + def test_client(self): + sqs = Mock(name='sqs') + x = AsyncConnection(sqs) + assert x._httpclient is http.get_client() + client = Mock(name='client') + y = AsyncConnection(sqs, http_client=client) + assert y._httpclient is client + + def test_get_http_connection(self): + sqs = Mock(name='sqs') + x = AsyncConnection(sqs) + assert isinstance( + x.get_http_connection(), + AsyncHTTPSConnection, + ) + conn = x.get_http_connection() + assert conn.http_client is x._httpclient + + +class test_AsyncAWSQueryConnection(AWSCase): + + def setup(self): + session = boto3.session.Session( + aws_access_key_id='AAA', + aws_secret_access_key='AAAA', + region_name='us-west-2', + ) + sqs_client = session.client('sqs') + self.x = AsyncAWSQueryConnection(sqs_client, + http_client=Mock(name='client')) + + def test_make_request(self): + _mexe, self.x._mexe = self.x._mexe, Mock(name='_mexe') + Conn = self.x.get_http_connection = Mock(name='get_http_connection') + callback = PromiseMock(name='callback') + self.x.make_request( + 'action', {'foo': 1}, 'https://foo.com/', 'GET', callback=callback, + ) + self.x._mexe.assert_called() + request = self.x._mexe.call_args[0][0] + parsed = urlparse(request.url) + params = parse_qs(parsed.query) + assert params['Action'][0] == 'action' + + ret = _mexe(request, callback=callback) + assert ret is callback + Conn.return_value.request.assert_called() + Conn.return_value.getresponse.assert_called_with( + callback=callback, + ) + + def test_make_request__no_action(self): + self.x._mexe = Mock(name='_mexe') + self.x.get_http_connection = Mock(name='get_http_connection') + callback = PromiseMock(name='callback') + self.x.make_request( + None, {'foo': 1}, 'http://foo.com/', 'GET', callback=callback, + ) + self.x._mexe.assert_called() + request = self.x._mexe.call_args[0][0] + parsed = urlparse(request.url) + params = parse_qs(parsed.query) + assert 'Action' not in params + + def Response(self, status, body): + r = Mock(name='response') + r.status = status + r.read.return_value = body + return r + + @contextmanager + def mock_make_request(self): + self.x.make_request = Mock(name='make_request') + callback = PromiseMock(name='callback') + yield callback + + def assert_make_request_called(self): + self.x.make_request.assert_called() + return self.x.make_request.call_args[1]['callback'] diff --git a/t/unit/asynchronous/http/__init__.py b/t/unit/asynchronous/http/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/t/unit/asynchronous/http/__init__.py diff --git a/t/unit/asynchronous/http/test_curl.py b/t/unit/asynchronous/http/test_curl.py new file mode 100644 index 00000000..553b0ea7 --- /dev/null +++ b/t/unit/asynchronous/http/test_curl.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import pytest + +from case import Mock, call, patch, skip + +from kombu.asynchronous.http.curl import READ, WRITE, CurlClient + + +@skip.if_pypy() +@skip.unless_module('pycurl') +@pytest.mark.usefixtures('hub') +class test_CurlClient: + + class Client(CurlClient): + Curl = Mock(name='Curl') + + def test_when_pycurl_missing(self, patching): + patching('kombu.asynchronous.http.curl.pycurl', None) + with pytest.raises(ImportError): + self.Client() + + def test_max_clients_set(self): + x = self.Client(max_clients=303) + assert x.max_clients == 303 + + def test_init(self): + with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl: + x = self.Client() + assert x._multi is not None + assert x._pending is not None + assert x._free_list is not None + assert x._fds is not None + assert x._socket_action == x._multi.socket_action + assert len(x._curls) == x.max_clients + assert x._timeout_check_tref + + x._multi.setopt.assert_has_calls([ + call(_pycurl.M_TIMERFUNCTION, x._set_timeout), + call(_pycurl.M_SOCKETFUNCTION, x._handle_socket), + ]) + + def test_close(self): + with patch('kombu.asynchronous.http.curl.pycurl'): + x = self.Client() + x._timeout_check_tref = Mock(name='timeout_check_tref') + x.close() + x._timeout_check_tref.cancel.assert_called_with() + for _curl in x._curls: + _curl.close.assert_called_with() + x._multi.close.assert_called_with() + + def test_add_request(self): + with patch('kombu.asynchronous.http.curl.pycurl'): + x = self.Client() + x._process_queue = Mock(name='_process_queue') + x._set_timeout = Mock(name='_set_timeout') + request = Mock(name='request') + x.add_request(request) + assert request in x._pending + x._process_queue.assert_called_with() + x._set_timeout.assert_called_with(0) + + def test_handle_socket(self): + with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl: + hub = Mock(name='hub') + x = self.Client(hub) + fd = Mock(name='fd1') + + # POLL_REMOVE + x._fds[fd] = fd + x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl) + hub.remove.assert_called_with(fd) + assert fd not in x._fds + x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl) + + # POLL_IN + hub = x.hub = Mock(name='hub') + fds = [fd, Mock(name='fd2'), Mock(name='fd3')] + x._fds = {f: f for f in fds} + x._handle_socket(_pycurl.POLL_IN, fd, x._multi, None, _pycurl) + hub.remove.assert_has_calls([call(fd)]) + hub.add_reader.assert_called_with(fd, x.on_readable, fd) + assert x._fds[fd] == READ + + # POLL_OUT + hub = x.hub = Mock(name='hub') + x._handle_socket(_pycurl.POLL_OUT, fd, x._multi, None, _pycurl) + hub.add_writer.assert_called_with(fd, x.on_writable, fd) + assert x._fds[fd] == WRITE + + # POLL_INOUT + hub = x.hub = Mock(name='hub') + x._handle_socket(_pycurl.POLL_INOUT, fd, x._multi, None, _pycurl) + hub.add_reader.assert_called_with(fd, x.on_readable, fd) + hub.add_writer.assert_called_with(fd, x.on_writable, fd) + assert x._fds[fd] == READ | WRITE + + # UNKNOWN EVENT + hub = x.hub = Mock(name='hub') + x._handle_socket(0xff3f, fd, x._multi, None, _pycurl) + + # FD NOT IN FDS + hub = x.hub = Mock(name='hub') + x._fds.clear() + x._handle_socket(0xff3f, fd, x._multi, None, _pycurl) + hub.remove.assert_not_called() + + def test_set_timeout(self): + x = self.Client() + x._set_timeout(100) + + def test_timeout_check(self): + with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl: + x = self.Client() + x._process_pending_requests = Mock(name='process_pending') + x._multi.socket_all.return_value = 333, 1 + _pycurl.error = KeyError + x._timeout_check(_pycurl=_pycurl) + + x._multi.socket_all.return_value = None + x._multi.socket_all.side_effect = _pycurl.error(333) + x._timeout_check(_pycurl=_pycurl) + + def test_on_readable_on_writeable(self): + with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl: + x = self.Client() + x._on_event = Mock(name='on_event') + fd = Mock(name='fd') + x.on_readable(fd, _pycurl=_pycurl) + x._on_event.assert_called_with(fd, _pycurl.CSELECT_IN) + x.on_writable(fd, _pycurl=_pycurl) + x._on_event.assert_called_with(fd, _pycurl.CSELECT_OUT) diff --git a/t/unit/asynchronous/http/test_http.py b/t/unit/asynchronous/http/test_http.py new file mode 100644 index 00000000..21c66b97 --- /dev/null +++ b/t/unit/asynchronous/http/test_http.py @@ -0,0 +1,157 @@ +from __future__ import absolute_import, unicode_literals + +import pytest + +from io import BytesIO + +from vine import promise + +from case import Mock, skip + +from kombu.asynchronous import http +from kombu.asynchronous.http.base import BaseClient, normalize_header +from kombu.exceptions import HttpError + +from t.mocks import PromiseMock + + +class test_Headers: + + def test_normalize(self): + assert normalize_header('accept-encoding') == 'Accept-Encoding' + + +@pytest.mark.usefixtures('hub') +class test_Request: + + def test_init(self): + x = http.Request('http://foo', method='POST') + assert x.url == 'http://foo' + assert x.method == 'POST' + + x = http.Request('x', max_redirects=100) + assert x.max_redirects == 100 + + assert isinstance(x.headers, http.Headers) + h = http.Headers() + x = http.Request('x', headers=h) + assert x.headers is h + assert isinstance(x.on_ready, promise) + + def test_then(self): + callback = PromiseMock(name='callback') + x = http.Request('http://foo') + x.then(callback) + + x.on_ready(1) + callback.assert_called_with(1) + + +@pytest.mark.usefixtures('hub') +class test_Response: + + def test_init(self): + req = http.Request('http://foo') + r = http.Response(req, 200) + + assert r.status == 'OK' + assert r.effective_url == 'http://foo' + r.raise_for_error() + + def test_raise_for_error(self): + req = http.Request('http://foo') + r = http.Response(req, 404) + assert r.status == 'Not Found' + assert r.error + + with pytest.raises(HttpError): + r.raise_for_error() + + def test_get_body(self): + req = http.Request('http://foo') + req.buffer = BytesIO() + req.buffer.write(b'hello') + + rn = http.Response(req, 200, buffer=None) + assert rn.body is None + + r = http.Response(req, 200, buffer=req.buffer) + assert r._body is None + assert r.body == b'hello' + assert r._body == b'hello' + assert r.body == b'hello' + + +class test_BaseClient: + + @pytest.fixture(autouse=True) + def setup_hub(self, hub): + self.hub = hub + + def test_init(self): + c = BaseClient(Mock(name='hub')) + assert c.hub + assert c._header_parser + + def test_perform(self): + c = BaseClient(Mock(name='hub')) + c.add_request = Mock(name='add_request') + + c.perform('http://foo') + c.add_request.assert_called() + assert isinstance(c.add_request.call_args[0][0], http.Request) + + req = http.Request('http://bar') + c.perform(req) + c.add_request.assert_called_with(req) + + def test_add_request(self): + c = BaseClient(Mock(name='hub')) + with pytest.raises(NotImplementedError): + c.add_request(Mock(name='request')) + + def test_header_parser(self): + c = BaseClient(Mock(name='hub')) + parser = c._header_parser + headers = http.Headers() + + c.on_header(headers, 'HTTP/1.1') + c.on_header(headers, 'x-foo-bar: 123') + c.on_header(headers, 'People: George Costanza') + assert headers._prev_key == 'People' + c.on_header(headers, ' Jerry Seinfeld') + c.on_header(headers, ' Elaine Benes') + c.on_header(headers, ' Cosmo Kramer') + assert not headers.complete + c.on_header(headers, '') + assert headers.complete + + with pytest.raises(KeyError): + parser.throw(KeyError('foo')) + c.on_header(headers, '') + + assert headers['X-Foo-Bar'] == '123' + assert (headers['People'] == + 'George Costanza Jerry Seinfeld Elaine Benes Cosmo Kramer') + + def test_close(self): + BaseClient(Mock(name='hub')).close() + + def test_as_context(self): + c = BaseClient(Mock(name='hub')) + c.close = Mock(name='close') + with c: + pass + c.close.assert_called_with() + + +@skip.if_pypy() +@skip.unless_module('pycurl') +class test_Client: + + def test_get_client(self, hub): + client = http.get_client() + assert client.hub is hub + client2 = http.get_client(hub) + assert client2 is client + assert client2.hub is hub diff --git a/t/unit/asynchronous/test_hub.py b/t/unit/asynchronous/test_hub.py new file mode 100644 index 00000000..e09a2926 --- /dev/null +++ b/t/unit/asynchronous/test_hub.py @@ -0,0 +1,529 @@ +from __future__ import absolute_import, unicode_literals + +import errno +import pytest + +from case import Mock, call, patch +from vine import promise + +from kombu.asynchronous import hub as _hub +from kombu.asynchronous import Hub, READ, WRITE, ERR +from kombu.asynchronous.debug import callback_for, repr_flag, _rcb +from kombu.asynchronous.hub import ( + Stop, get_event_loop, set_event_loop, + _raise_stop_error, _dummy_context +) +from kombu.asynchronous.semaphore import DummyLock, LaxBoundedSemaphore + + +class File(object): + + def __init__(self, fd): + self.fd = fd + + def fileno(self): + return self.fd + + def __eq__(self, other): + if isinstance(other, File): + return self.fd == other.fd + return NotImplemented + + def __hash__(self): + return hash(self.fd) + + +def test_DummyLock(): + with DummyLock(): + pass + + +class test_LaxBoundedSemaphore: + + def test_acquire_release(self): + x = LaxBoundedSemaphore(2) + + c1 = Mock() + x.acquire(c1, 1) + assert x.value == 1 + c1.assert_called_with(1) + + c2 = Mock() + x.acquire(c2, 2) + assert x.value == 0 + c2.assert_called_with(2) + + c3 = Mock() + x.acquire(c3, 3) + assert x.value == 0 + c3.assert_not_called() + + x.release() + assert x.value == 0 + x.release() + assert x.value == 1 + x.release() + assert x.value == 2 + c3.assert_called_with(3) + + def test_repr(self): + assert repr(LaxBoundedSemaphore(2)) + + def test_bounded(self): + x = LaxBoundedSemaphore(2) + for i in range(100): + x.release() + assert x.value == 2 + + def test_grow_shrink(self): + x = LaxBoundedSemaphore(1) + assert x.initial_value == 1 + cb1 = Mock() + x.acquire(cb1, 1) + cb1.assert_called_with(1) + assert x.value == 0 + + cb2 = Mock() + x.acquire(cb2, 2) + cb2.assert_not_called() + assert x.value == 0 + + cb3 = Mock() + x.acquire(cb3, 3) + cb3.assert_not_called() + + x.grow(2) + cb2.assert_called_with(2) + cb3.assert_called_with(3) + assert x.value == 2 + assert x.initial_value == 3 + + assert not x._waiting + x.grow(3) + for i in range(x.initial_value): + assert x.acquire(Mock()) + assert not x.acquire(Mock()) + x.clear() + + x.shrink(3) + for i in range(x.initial_value): + assert x.acquire(Mock()) + assert not x.acquire(Mock()) + assert x.value == 0 + + for i in range(100): + x.release() + assert x.value == x.initial_value + + def test_clear(self): + x = LaxBoundedSemaphore(10) + for i in range(11): + x.acquire(Mock()) + assert x._waiting + assert x.value == 0 + + x.clear() + assert not x._waiting + assert x.value == x.initial_value + + +class test_Utils: + + def setup(self): + self._prev_loop = get_event_loop() + + def teardown(self): + set_event_loop(self._prev_loop) + + def test_get_set_event_loop(self): + set_event_loop(None) + assert _hub._current_loop is None + assert get_event_loop() is None + hub = Hub() + set_event_loop(hub) + assert _hub._current_loop is hub + assert get_event_loop() is hub + + def test_dummy_context(self): + with _dummy_context(): + pass + + def test_raise_stop_error(self): + with pytest.raises(Stop): + _raise_stop_error() + + +class test_Hub: + + def setup(self): + self.hub = Hub() + + def teardown(self): + self.hub.close() + + def test_reset(self): + self.hub.close = Mock(name='close') + self.hub._create_poller = Mock(name='_create_poller') + self.hub.reset() + self.hub.close.assert_called_with() + self.hub._create_poller.assert_called_with() + + def test__close_poller__no_poller(self): + self.hub.poller = None + self.hub._close_poller() + + def test__close_poller(self): + poller = self.hub.poller = Mock(name='poller') + self.hub._close_poller() + poller.close.assert_called_with() + assert self.hub.poller is None + + def test_stop(self): + self.hub.call_soon = Mock(name='call_soon') + self.hub.stop() + self.hub.call_soon.assert_called_with(_raise_stop_error) + + @patch('kombu.asynchronous.hub.promise') + def test_call_soon(self, promise): + callback = Mock(name='callback') + ret = self.hub.call_soon(callback, 1, 2, 3) + promise.assert_called_with(callback, (1, 2, 3)) + assert promise() in self.hub._ready + assert ret is promise() + + def test_call_soon__promise_argument(self): + callback = promise(Mock(name='callback'), (1, 2, 3)) + ret = self.hub.call_soon(callback) + assert ret is callback + assert ret in self.hub._ready + + def test_call_later(self): + callback = Mock(name='callback') + self.hub.timer = Mock(name='hub.timer') + self.hub.call_later(10.0, callback, 1, 2) + self.hub.timer.call_after.assert_called_with(10.0, callback, (1, 2)) + + def test_call_at(self): + callback = Mock(name='callback') + self.hub.timer = Mock(name='hub.timer') + self.hub.call_at(21231122, callback, 1, 2) + self.hub.timer.call_at.assert_called_with(21231122, callback, (1, 2)) + + def test_repr(self): + assert repr(self.hub) + + def test_repr_flag(self): + assert repr_flag(READ) == 'R' + assert repr_flag(WRITE) == 'W' + assert repr_flag(ERR) == '!' + assert repr_flag(READ | WRITE) == 'RW' + assert repr_flag(READ | ERR) == 'R!' + assert repr_flag(WRITE | ERR) == 'W!' + assert repr_flag(READ | WRITE | ERR) == 'RW!' + + def test_repr_callback_rcb(self): + + def f(): + pass + + assert _rcb(f) == f.__name__ + assert _rcb('foo') == 'foo' + + @patch('kombu.asynchronous.hub.poll') + def test_start_stop(self, poll): + self.hub = Hub() + poll.assert_called_with() + + poller = self.hub.poller + self.hub.stop() + self.hub.close() + poller.close.assert_called_with() + + def test_fire_timers(self): + self.hub.timer = Mock() + self.hub.timer._queue = [] + assert self.hub.fire_timers( + min_delay=42.324, max_delay=32.321) == 32.321 + + self.hub.timer._queue = [1] + self.hub.scheduler = iter([(3.743, None)]) + assert self.hub.fire_timers() == 3.743 + + e1, e2, e3 = Mock(), Mock(), Mock() + entries = [e1, e2, e3] + + def reset(): + return [m.reset() for m in [e1, e2, e3]] + + def se(): + while 1: + while entries: + yield None, entries.pop() + yield 3.982, None + self.hub.scheduler = se() + + assert self.hub.fire_timers(max_timers=10) == 3.982 + for E in [e3, e2, e1]: + E.assert_called_with() + reset() + + entries[:] = [Mock() for _ in range(11)] + keep = list(entries) + assert self.hub.fire_timers( + max_timers=10, min_delay=1.13) == 1.13 + for E in reversed(keep[1:]): + E.assert_called_with() + reset() + assert self.hub.fire_timers(max_timers=10) == 3.982 + keep[0].assert_called_with() + + def test_fire_timers_raises(self): + eback = Mock() + eback.side_effect = KeyError('foo') + self.hub.timer = Mock() + self.hub.scheduler = iter([(0, eback)]) + with pytest.raises(KeyError): + self.hub.fire_timers(propagate=(KeyError,)) + + eback.side_effect = ValueError('foo') + self.hub.scheduler = iter([(0, eback)]) + with patch('kombu.asynchronous.hub.logger') as logger: + with pytest.raises(StopIteration): + self.hub.fire_timers() + logger.error.assert_called() + + eback.side_effect = MemoryError('foo') + self.hub.scheduler = iter([(0, eback)]) + with pytest.raises(MemoryError): + self.hub.fire_timers() + + eback.side_effect = OSError() + eback.side_effect.errno = errno.ENOMEM + self.hub.scheduler = iter([(0, eback)]) + with pytest.raises(OSError): + self.hub.fire_timers() + + eback.side_effect = OSError() + eback.side_effect.errno = errno.ENOENT + self.hub.scheduler = iter([(0, eback)]) + with patch('kombu.asynchronous.hub.logger') as logger: + with pytest.raises(StopIteration): + self.hub.fire_timers() + logger.error.assert_called() + + def test_add_raises_ValueError(self): + self.hub.poller = Mock(name='hub.poller') + self.hub.poller.register.side_effect = ValueError() + self.hub._discard = Mock(name='hub.discard') + with pytest.raises(ValueError): + self.hub.add(2, Mock(), READ) + self.hub._discard.assert_called_with(2) + + def test_remove_reader(self): + self.hub.poller = Mock(name='hub.poller') + self.hub.add(2, Mock(), READ) + self.hub.add(2, Mock(), WRITE) + self.hub.remove_reader(2) + assert 2 not in self.hub.readers + assert 2 in self.hub.writers + + def test_remove_reader__not_writeable(self): + self.hub.poller = Mock(name='hub.poller') + self.hub.add(2, Mock(), READ) + self.hub.remove_reader(2) + assert 2 not in self.hub.readers + + def test_remove_writer(self): + self.hub.poller = Mock(name='hub.poller') + self.hub.add(2, Mock(), READ) + self.hub.add(2, Mock(), WRITE) + self.hub.remove_writer(2) + assert 2 in self.hub.readers + assert 2 not in self.hub.writers + + def test_remove_writer__not_readable(self): + self.hub.poller = Mock(name='hub.poller') + self.hub.add(2, Mock(), WRITE) + self.hub.remove_writer(2) + assert 2 not in self.hub.writers + + def test_add__consolidate(self): + self.hub.poller = Mock(name='hub.poller') + self.hub.add(2, Mock(), WRITE, consolidate=True) + assert 2 in self.hub.consolidate + assert self.hub.writers[2] is None + + @patch('kombu.asynchronous.hub.logger') + def test_on_callback_error(self, logger): + self.hub.on_callback_error(Mock(name='callback'), KeyError()) + logger.error.assert_called() + + def test_loop_property(self): + self.hub._loop = None + self.hub.create_loop = Mock(name='hub.create_loop') + assert self.hub.loop is self.hub.create_loop() + assert self.hub._loop is self.hub.create_loop() + + def test_run_forever(self): + self.hub.run_once = Mock(name='hub.run_once') + self.hub.run_once.side_effect = Stop() + self.hub.run_forever() + + def test_run_once(self): + self.hub._loop = iter([1]) + self.hub.run_once() + self.hub.run_once() + assert self.hub._loop is None + + def test_repr_active(self): + self.hub.readers = {1: Mock(), 2: Mock()} + self.hub.writers = {3: Mock(), 4: Mock()} + for value in list( + self.hub.readers.values()) + list(self.hub.writers.values()): + value.__name__ = 'mock' + assert self.hub.repr_active() + + def test_repr_events(self): + self.hub.readers = {6: Mock(), 7: Mock(), 8: Mock()} + self.hub.writers = {9: Mock()} + for value in list( + self.hub.readers.values()) + list(self.hub.writers.values()): + value.__name__ = 'mock' + assert self.hub.repr_events([ + (6, READ), + (7, ERR), + (8, READ | ERR), + (9, WRITE), + (10, 13213), + ]) + + def test_callback_for(self): + reader, writer = Mock(), Mock() + self.hub.readers = {6: reader} + self.hub.writers = {7: writer} + + assert callback_for(self.hub, 6, READ) == reader + assert callback_for(self.hub, 7, WRITE) == writer + with pytest.raises(KeyError): + callback_for(self.hub, 6, WRITE) + assert callback_for(self.hub, 6, WRITE, 'foo') == 'foo' + + def test_add_remove_readers(self): + P = self.hub.poller = Mock() + + read_A = Mock() + read_B = Mock() + self.hub.add_reader(10, read_A, 10) + self.hub.add_reader(File(11), read_B, 11) + + P.register.assert_has_calls([ + call(10, self.hub.READ | self.hub.ERR), + call(11, self.hub.READ | self.hub.ERR), + ], any_order=True) + + assert self.hub.readers[10] == (read_A, (10,)) + assert self.hub.readers[11] == (read_B, (11,)) + + self.hub.remove(10) + assert 10 not in self.hub.readers + self.hub.remove(File(11)) + assert 11 not in self.hub.readers + P.unregister.assert_has_calls([ + call(10), call(11), + ]) + + def test_can_remove_unknown_fds(self): + self.hub.poller = Mock() + self.hub.remove(30) + self.hub.remove(File(301)) + + def test_remove__unregister_raises(self): + self.hub.poller = Mock() + self.hub.poller.unregister.side_effect = OSError() + + self.hub.remove(313) + + def test_add_writers(self): + P = self.hub.poller = Mock() + + write_A = Mock() + write_B = Mock() + self.hub.add_writer(20, write_A) + self.hub.add_writer(File(21), write_B) + + P.register.assert_has_calls([ + call(20, self.hub.WRITE), + call(21, self.hub.WRITE), + ], any_order=True) + + assert self.hub.writers[20], (write_A == ()) + assert self.hub.writers[21], (write_B == ()) + + self.hub.remove(20) + assert 20 not in self.hub.writers + self.hub.remove(File(21)) + assert 21 not in self.hub.writers + P.unregister.assert_has_calls([ + call(20), call(21), + ]) + + def test_enter__exit(self): + P = self.hub.poller = Mock() + on_close = Mock() + self.hub.on_close.add(on_close) + + try: + read_A = Mock() + read_B = Mock() + self.hub.add_reader(10, read_A) + self.hub.add_reader(File(11), read_B) + write_A = Mock() + write_B = Mock() + self.hub.add_writer(20, write_A) + self.hub.add_writer(File(21), write_B) + assert self.hub.readers + assert self.hub.writers + finally: + assert self.hub.poller + self.hub.close() + assert not self.hub.readers + assert not self.hub.writers + + P.unregister.assert_has_calls([ + call(10), call(11), call(20), call(21), + ], any_order=True) + + on_close.assert_called_with(self.hub) + + def test_scheduler_property(self): + hub = Hub(timer=[1, 2, 3]) + assert list(hub.scheduler), [1, 2 == 3] + + def test_loop__tick_callbacks(self): + self.hub._ready = Mock(name='_ready') + self.hub._ready.__len__ = Mock(name="_ready.__len__") + self.hub._ready.__len__.side_effect = RuntimeError() + ticks = [Mock(name='cb1'), Mock(name='cb2')] + self.hub.on_tick = list(ticks) + + with pytest.raises(RuntimeError): + next(self.hub.loop) + + ticks[0].assert_called_once_with() + ticks[1].assert_called_once_with() + + def test_loop__todo(self): + self.hub.fire_timers = Mock(name='fire_timers') + self.hub.fire_timers.side_effect = RuntimeError() + self.hub.timer = Mock(name='timer') + + callbacks = [Mock(name='cb1'), Mock(name='cb2')] + for cb in callbacks: + self.hub.call_soon(cb) + self.hub._ready.add(None) + + with pytest.raises(RuntimeError): + next(self.hub.loop) + + callbacks[0].assert_called_once_with() + callbacks[1].assert_called_once_with() diff --git a/t/unit/asynchronous/test_semaphore.py b/t/unit/asynchronous/test_semaphore.py new file mode 100644 index 00000000..86f58cce --- /dev/null +++ b/t/unit/asynchronous/test_semaphore.py @@ -0,0 +1,43 @@ +from __future__ import absolute_import, unicode_literals + +from kombu.asynchronous.semaphore import LaxBoundedSemaphore + + +class test_LaxBoundedSemaphore: + + def test_over_release(self): + x = LaxBoundedSemaphore(2) + calls = [] + for i in range(1, 21): + x.acquire(calls.append, i) + x.release() + x.acquire(calls.append, 'x') + x.release() + x.acquire(calls.append, 'y') + + assert calls, [1, 2, 3 == 4] + + for i in range(30): + x.release() + assert calls, list(range(1, 21)) + ['x' == 'y'] + assert x.value == x.initial_value + + calls[:] = [] + for i in range(1, 11): + x.acquire(calls.append, i) + for i in range(1, 11): + x.release() + assert calls, list(range(1 == 11)) + + calls[:] = [] + assert x.value == x.initial_value + x.acquire(calls.append, 'x') + assert x.value == 1 + x.acquire(calls.append, 'y') + assert x.value == 0 + x.release() + assert x.value == 1 + x.release() + assert x.value == 2 + x.release() + assert x.value == 2 diff --git a/t/unit/asynchronous/test_timer.py b/t/unit/asynchronous/test_timer.py new file mode 100644 index 00000000..8ceb725f --- /dev/null +++ b/t/unit/asynchronous/test_timer.py @@ -0,0 +1,158 @@ +from __future__ import absolute_import, unicode_literals + +import pytest + +from datetime import datetime + +from case import Mock, patch + +from kombu.asynchronous.timer import Entry, Timer, to_timestamp +from kombu.five import bytes_if_py2 + + +class test_to_timestamp: + + def test_timestamp(self): + assert to_timestamp(3.13) is 3.13 + + def test_datetime(self): + assert to_timestamp(datetime.utcnow()) + + +class test_Entry: + + def test_call(self): + fun = Mock(name='fun') + tref = Entry(fun, (4, 4), {'moo': 'baz'}) + tref() + fun.assert_called_with(4, 4, moo='baz') + + def test_cancel(self): + tref = Entry(lambda x: x, (1,), {}) + assert not tref.canceled + assert not tref.cancelled + tref.cancel() + assert tref.canceled + assert tref.cancelled + + def test_repr(self): + tref = Entry(lambda x: x(1,), {}) + assert repr(tref) + + def test_hash(self): + assert hash(Entry(lambda: None)) + + def test_ordering(self): + # we don't care about results, just that it's possible + Entry(lambda x: 1) < Entry(lambda x: 2) + Entry(lambda x: 1) > Entry(lambda x: 2) + Entry(lambda x: 1) >= Entry(lambda x: 2) + Entry(lambda x: 1) <= Entry(lambda x: 2) + + def test_eq(self): + x = Entry(lambda x: 1) + y = Entry(lambda x: 1) + assert x == x + assert x != y + + +class test_Timer: + + def test_enter_exit(self): + x = Timer() + x.stop = Mock(name='timer.stop') + with x: + pass + x.stop.assert_called_with() + + def test_supports_Timer_interface(self): + x = Timer() + x.stop() + + tref = Mock() + x.cancel(tref) + tref.cancel.assert_called_with() + + assert x.schedule is x + + def test_handle_error(self): + from datetime import datetime + on_error = Mock(name='on_error') + + s = Timer(on_error=on_error) + + with patch('kombu.asynchronous.timer.to_timestamp') as tot: + tot.side_effect = OverflowError() + s.enter_at(Entry(lambda: None, (), {}), + eta=datetime.now()) + s.enter_at(Entry(lambda: None, (), {}), eta=None) + s.on_error = None + with pytest.raises(OverflowError): + s.enter_at(Entry(lambda: None, (), {}), + eta=datetime.now()) + on_error.assert_called_once() + exc = on_error.call_args[0][0] + assert isinstance(exc, OverflowError) + + def test_call_repeatedly(self): + t = Timer() + try: + t.schedule.enter_after = Mock() + + myfun = Mock() + myfun.__name__ = bytes_if_py2('myfun') + t.call_repeatedly(0.03, myfun) + + assert t.schedule.enter_after.call_count == 1 + args1, _ = t.schedule.enter_after.call_args_list[0] + sec1, tref1, _ = args1 + assert sec1 == 0.03 + tref1() + + assert t.schedule.enter_after.call_count == 2 + args2, _ = t.schedule.enter_after.call_args_list[1] + sec2, tref2, _ = args2 + assert sec2 == 0.03 + tref2.canceled = True + tref2() + + assert t.schedule.enter_after.call_count == 2 + finally: + t.stop() + + @patch('kombu.asynchronous.timer.logger') + def test_apply_entry_error_handled(self, logger): + t = Timer() + t.schedule.on_error = None + + fun = Mock() + fun.side_effect = ValueError() + + t.schedule.apply_entry(fun) + logger.error.assert_called() + + def test_apply_entry_error_not_handled(self, stdouts): + t = Timer() + t.schedule.on_error = Mock() + + fun = Mock() + fun.side_effect = ValueError() + t.schedule.apply_entry(fun) + fun.assert_called_with() + assert not stdouts.stderr.getvalue() + + def test_enter_after(self): + t = Timer() + t._enter = Mock() + fun = Mock(name='fun') + time = Mock(name='time') + time.return_value = 10 + t.enter_after(10, fun, time=time) + time.assert_called_with() + t._enter.assert_called_with(20, 0, fun) + + def test_cancel(self): + t = Timer() + tref = Mock() + t.cancel(tref) + tref.cancel.assert_called_with() |
