summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2017-05-17 16:58:31 +0000
committerGerrit Code Review <review@openstack.org>2017-05-17 16:58:31 +0000
commit50616da084f9142e4746891e94be76ca6d1d14f9 (patch)
tree6155be0c80b4b71bd9bca33da3c140af54e24376
parent77df412aea40eb7be10efe3da14da13c105afc8d (diff)
parentafb9001c28caa013723344ee5653d193322cfe7e (diff)
downloadgear-50616da084f9142e4746891e94be76ca6d1d14f9.tar.gz
Merge "Provide TextJob and TextWorker for convenience"
-rw-r--r--gear/__init__.py419
-rw-r--r--gear/tests/test_functional.py75
2 files changed, 389 insertions, 105 deletions
diff --git a/gear/__init__.py b/gear/__init__.py
index 5db3b78..7aaeb19 100644
--- a/gear/__init__.py
+++ b/gear/__init__.py
@@ -1426,8 +1426,8 @@ class Client(BaseClient):
if job.unique is None:
unique = b''
else:
- unique = job.unique
- data = b'\x00'.join((job.binary_name, unique, job.arguments))
+ unique = job.binary_unique
+ data = b'\x00'.join((job.binary_name, unique, job.binary_arguments))
if background:
if precedence == PRECEDENCE_NORMAL:
cmd = constants.SUBMIT_JOB_BG
@@ -1696,6 +1696,159 @@ class FunctionRecord(object):
id(self), self.name, self.timeout)
+class BaseJob(object):
+ def __init__(self, name, arguments, unique=None, handle=None):
+ self._name = convert_to_bytes(name)
+ self._validate_arguments(arguments)
+ self._arguments = convert_to_bytes(arguments)
+ self._unique = convert_to_bytes(unique)
+ self.handle = handle
+ self.connection = None
+
+ def _validate_arguments(self, arguments):
+ if (not isinstance(arguments, bytes) and
+ not isinstance(arguments, bytearray)):
+ raise TypeError("arguments must be of type bytes or bytearray")
+
+ @property
+ def arguments(self):
+ return self._arguments
+
+ @arguments.setter
+ def arguments(self, value):
+ self._arguments = value
+
+ @property
+ def unique(self):
+ return self._unique
+
+ @unique.setter
+ def unique(self, value):
+ self._unique = value
+
+ @property
+ def name(self):
+ if isinstance(self._name, six.binary_type):
+ return self._name.decode('utf-8')
+ return self._name
+
+ @name.setter
+ def name(self, value):
+ if isinstance(value, six.text_type):
+ value = value.encode('utf-8')
+ self._name = value
+
+ @property
+ def binary_name(self):
+ return self._name
+
+ @property
+ def binary_arguments(self):
+ return self._arguments
+
+ @property
+ def binary_unique(self):
+ return self._unique
+
+ def __repr__(self):
+ return '<gear.Job 0x%x handle: %s name: %s unique: %s>' % (
+ id(self), self.handle, self.name, self.unique)
+
+
+class WorkerJob(BaseJob):
+ """A job that Gearman has assigned to a Worker. Not intended to
+ be instantiated directly, but rather returned by
+ :py:meth:`Worker.getJob`.
+
+ :arg str handle: The job handle assigned by gearman.
+ :arg str name: The name of the job.
+ :arg bytes arguments: The opaque data blob passed to the worker
+ as arguments.
+ :arg str unique: A byte string to uniquely identify the job to Gearman
+ (optional).
+
+ The following instance attributes are available:
+
+ **name** (str)
+ The name of the job. Assumed to be utf-8.
+ **arguments** (bytes)
+ The opaque data blob passed to the worker as arguments.
+ **unique** (str or None)
+ The unique ID of the job (if supplied).
+ **handle** (bytes)
+ The Gearman job handle.
+ **connection** (:py:class:`Connection` or None)
+ The connection associated with the job. Only set after the job
+ has been submitted to a Gearman server.
+ """
+
+ def __init__(self, handle, name, arguments, unique=None):
+ super(WorkerJob, self).__init__(name, arguments, unique, handle)
+
+ def sendWorkData(self, data=b''):
+ """Send a WORK_DATA packet to the client.
+
+ :arg bytes data: The data to be sent to the client (optional).
+ """
+
+ data = self.handle + b'\x00' + data
+ p = Packet(constants.REQ, constants.WORK_DATA, data)
+ self.connection.sendPacket(p)
+
+ def sendWorkWarning(self, data=b''):
+ """Send a WORK_WARNING packet to the client.
+
+ :arg bytes data: The data to be sent to the client (optional).
+ """
+
+ data = self.handle + b'\x00' + data
+ p = Packet(constants.REQ, constants.WORK_WARNING, data)
+ self.connection.sendPacket(p)
+
+ def sendWorkStatus(self, numerator, denominator):
+ """Send a WORK_STATUS packet to the client.
+
+ Sends a numerator and denominator that together represent the
+ fraction complete of the job.
+
+ :arg numeric numerator: The numerator of the fraction complete.
+ :arg numeric denominator: The denominator of the fraction complete.
+ """
+
+ data = (self.handle + b'\x00' +
+ str(numerator).encode('utf8') + b'\x00' +
+ str(denominator).encode('utf8'))
+ p = Packet(constants.REQ, constants.WORK_STATUS, data)
+ self.connection.sendPacket(p)
+
+ def sendWorkComplete(self, data=b''):
+ """Send a WORK_COMPLETE packet to the client.
+
+ :arg bytes data: The data to be sent to the client (optional).
+ """
+
+ data = self.handle + b'\x00' + data
+ p = Packet(constants.REQ, constants.WORK_COMPLETE, data)
+ self.connection.sendPacket(p)
+
+ def sendWorkFail(self):
+ "Send a WORK_FAIL packet to the client."
+
+ p = Packet(constants.REQ, constants.WORK_FAIL, self.handle)
+ self.connection.sendPacket(p)
+
+ def sendWorkException(self, data=b''):
+ """Send a WORK_EXCEPTION packet to the client.
+
+ :arg bytes data: The exception data to be sent to the client
+ (optional).
+ """
+
+ data = self.handle + b'\x00' + data
+ p = Packet(constants.REQ, constants.WORK_EXCEPTION, data)
+ self.connection.sendPacket(p)
+
+
class Worker(BaseClient):
"""A Gearman worker.
@@ -1708,6 +1861,8 @@ class Worker(BaseClient):
is deprecated, use client_id instead.
"""
+ job_class = WorkerJob
+
def __init__(self, client_id=None, worker_id=None):
if not client_id or worker_id:
raise Exception("A client_id must be provided")
@@ -2027,7 +2182,7 @@ class Worker(BaseClient):
arguments, unique)
def _handleJobAssignment(self, packet, handle, name, arguments, unique):
- job = WorkerJob(handle, name, arguments, unique)
+ job = self.job_class(handle, name, arguments, unique)
job.connection = packet.connection
self.job_lock.acquire()
@@ -2043,38 +2198,6 @@ class Worker(BaseClient):
self.job_lock.release()
-class BaseJob(object):
- def __init__(self, name, arguments, unique=None, handle=None):
- self._name = convert_to_bytes(name)
- if (not isinstance(arguments, bytes) and
- not isinstance(arguments, bytearray)):
- raise TypeError("arguments must be of type bytes or bytearray")
- self.arguments = arguments
- self.unique = convert_to_bytes(unique)
- self.handle = handle
- self.connection = None
-
- @property
- def name(self):
- if isinstance(self._name, six.binary_type):
- return self._name.decode('utf-8')
- return self._name
-
- @name.setter
- def name(self, value):
- if isinstance(value, six.text_type):
- value = value.encode('utf-8')
- self._name = value
-
- @property
- def binary_name(self):
- return self._name
-
- def __repr__(self):
- return '<gear.Job 0x%x handle: %s name: %s unique: %s>' % (
- id(self), self.handle, self.name, self.unique)
-
-
class Job(BaseJob):
"""A job to run or being run by Gearman.
@@ -2097,7 +2220,9 @@ class Job(BaseJob):
**data** (list of byte-arrays)
The result data returned from Gearman. Each packet appends an
element to the list. Depending on the nature of the data, the
- elements may need to be concatenated before use.
+ elements may need to be concatenated before use. This is returned
+ as a snapshot copy of the data to prevent accidental attempts at
+ modification which will be lost.
**exception** (bytes or None)
Exception information returned from Gearman. None if no exception
has been received.
@@ -2127,10 +2252,12 @@ class Job(BaseJob):
has been submitted to a Gearman server.
"""
+ data_type = list
+
def __init__(self, name, arguments, unique=None):
super(Job, self).__init__(name, arguments, unique)
- self.data = []
- self.exception = None
+ self._data = self.data_type()
+ self._exception = None
self.warning = False
self.complete = False
self.failure = False
@@ -2140,99 +2267,181 @@ class Job(BaseJob):
self.known = None
self.running = None
+ @property
+ def binary_data(self):
+ for value in self._data:
+ if isinstance(value, six.text_type):
+ value = value.encode('utf-8')
+ yield value
+
+ @property
+ def data(self):
+ return self._data
-class WorkerJob(BaseJob):
- """A job that Gearman has assigned to a Worker. Not intended to
- be instantiated directly, but rather returned by
- :py:meth:`Worker.getJob`.
+ @data.setter
+ def data(self, value):
+ if not isinstance(value, self.data_type):
+ raise ValueError(
+ "data attribute must be {}".format(self.data_type))
+ self._data = value
- :arg str handle: The job handle assigned by gearman.
- :arg str name: The name of the job.
- :arg bytes arguments: The opaque data blob passed to the worker
- as arguments.
- :arg str unique: A byte string to uniquely identify the job to Gearman
- (optional).
+ @property
+ def exception(self):
+ return self._exception
- The following instance attributes are available:
+ @exception.setter
+ def exception(self, value):
+ self._data = value
+
+
+class TextJobArguments(object):
+ """Assumes utf-8 arguments in addition to name
+
+ If one is always dealing in valid utf-8, using this job class relieves one
+ of the need to encode/decode constantly."""
+
+ def _validate_arguments(self, arguments):
+ pass
+
+ @property
+ def arguments(self):
+ args = self._arguments
+ if isinstance(args, six.binary_type):
+ return args.decode('utf-8')
+ return args
+
+ @arguments.setter
+ def arguments(self, value):
+ if not isinstance(value, six.binary_type):
+ value = value.encode('utf-8')
+ self._arguments = value
+
+
+class TextJobUnique(object):
+ """Assumes utf-8 unique
+
+ If one is always dealing in valid utf-8, using this job class relieves one
+ of the need to encode/decode constantly."""
+
+ @property
+ def unique(self):
+ unique = self._unique
+ if isinstance(unique, six.binary_type):
+ return unique.decode('utf-8')
+ return unique
+
+ @unique.setter
+ def unique(self, value):
+ if not isinstance(value, six.binary_type):
+ value = value.encode('utf-8')
+ self._unique = value
+
+
+class TextList(list):
+ def append(self, x):
+ if isinstance(x, six.binary_type):
+ x = x.decode('utf-8')
+ super(TextList, self).append(x)
+
+ def extend(self, iterable):
+ def _iter():
+ for value in iterable:
+ if isinstance(value, six.binary_type):
+ yield value.decode('utf-8')
+ else:
+ yield value
+ super(TextList, self).extend(_iter)
+
+ def insert(self, i, x):
+ if isinstance(x, six.binary_type):
+ x = x.decode('utf-8')
+ super(TextList, self).insert(i, x)
+
+
+class TextJob(TextJobArguments, TextJobUnique, Job):
+ """ Sends and receives UTF-8 arguments and data.
+
+ Use this instead of Job when you only expect to send valid UTF-8 through
+ gearman. It will automatically encode arguments and work data as UTF-8, and
+ any jobs fetched from this worker will have their arguments and data
+ decoded assuming they are valid UTF-8, and thus return strings.
+
+ Attributes and method signatures are thes ame as Job except as noted here:
+
+ ** arguments ** (str) This will be returned as a string.
+ ** data ** (tuple of str) This will be returned as a tuble of strings.
- **name** (str)
- The name of the job. Assumed to be utf-8.
- **arguments** (bytes)
- The opaque data blob passed to the worker as arguments.
- **unique** (str or None)
- The unique ID of the job (if supplied).
- **handle** (bytes)
- The Gearman job handle.
- **connection** (:py:class:`Connection` or None)
- The connection associated with the job. Only set after the job
- has been submitted to a Gearman server.
"""
- def __init__(self, handle, name, arguments, unique=None):
- super(WorkerJob, self).__init__(name, arguments, unique, handle)
+ data_type = TextList
- def sendWorkData(self, data=b''):
+ @property
+ def exception(self):
+ exception = self._exception
+ if isinstance(exception, six.binary_type):
+ return exception.decode('utf-8')
+ return exception
+
+ @exception.setter
+ def exception(self, value):
+ if not isinstance(value, six.binary_type):
+ value = value.encode('utf-8')
+ self._exception = value
+
+
+class TextWorkerJob(TextJobArguments, TextJobUnique, WorkerJob):
+ """ Sends and receives UTF-8 arguments and data.
+
+ See TextJob. sendWorkData and sendWorkWarning accept strings
+ and will encode them as UTF-8.
+ """
+ def sendWorkData(self, data=''):
"""Send a WORK_DATA packet to the client.
- :arg bytes data: The data to be sent to the client (optional).
+ :arg str data: The data to be sent to the client (optional).
"""
+ if isinstance(data, six.text_type):
+ data = data.encode('utf8')
+ return super(TextWorkerJob, self).sendWorkData(data)
- data = self.handle + b'\x00' + data
- p = Packet(constants.REQ, constants.WORK_DATA, data)
- self.connection.sendPacket(p)
-
- def sendWorkWarning(self, data=b''):
+ def sendWorkWarning(self, data=''):
"""Send a WORK_WARNING packet to the client.
- :arg bytes data: The data to be sent to the client (optional).
+ :arg str data: The data to be sent to the client (optional).
"""
- data = self.handle + b'\x00' + data
- p = Packet(constants.REQ, constants.WORK_WARNING, data)
- self.connection.sendPacket(p)
-
- def sendWorkStatus(self, numerator, denominator):
- """Send a WORK_STATUS packet to the client.
+ if isinstance(data, six.text_type):
+ data = data.encode('utf8')
+ return super(TextWorkerJob, self).sendWorkWarning(data)
- Sends a numerator and denominator that together represent the
- fraction complete of the job.
+ def sendWorkComplete(self, data=''):
+ """Send a WORK_COMPLETE packet to the client.
- :arg numeric numerator: The numerator of the fraction complete.
- :arg numeric denominator: The denominator of the fraction complete.
+ :arg str data: The data to be sent to the client (optional).
"""
+ if isinstance(data, six.text_type):
+ data = data.encode('utf8')
+ return super(TextWorkerJob, self).sendWorkComplete(data)
- data = (self.handle + b'\x00' +
- str(numerator).encode('utf8') + b'\x00' +
- str(denominator).encode('utf8'))
- p = Packet(constants.REQ, constants.WORK_STATUS, data)
- self.connection.sendPacket(p)
-
- def sendWorkComplete(self, data=b''):
- """Send a WORK_COMPLETE packet to the client.
+ def sendWorkException(self, data=''):
+ """Send a WORK_EXCEPTION packet to the client.
- :arg bytes data: The data to be sent to the client (optional).
+ :arg str data: The data to be sent to the client (optional).
"""
- data = self.handle + b'\x00' + data
- p = Packet(constants.REQ, constants.WORK_COMPLETE, data)
- self.connection.sendPacket(p)
+ if isinstance(data, six.text_type):
+ data = data.encode('utf8')
+ return super(TextWorkerJob, self).sendWorkException(data)
- def sendWorkFail(self):
- "Send a WORK_FAIL packet to the client."
- p = Packet(constants.REQ, constants.WORK_FAIL, self.handle)
- self.connection.sendPacket(p)
+class TextWorker(Worker):
+ """ Sends and receives UTF-8 only.
- def sendWorkException(self, data=b''):
- """Send a WORK_EXCEPTION packet to the client.
+ See TextJob.
- :arg bytes data: The exception data to be sent to the client
- (optional).
- """
+ """
- data = self.handle + b'\x00' + data
- p = Packet(constants.REQ, constants.WORK_EXCEPTION, data)
- self.connection.sendPacket(p)
+ job_class = TextWorkerJob
class BaseBinaryJob(object):
@@ -3153,7 +3362,7 @@ class Server(BaseClientServer):
self.sendNoJob(packet.connection)
def sendJobAssignUniq(self, connection, job):
- unique = job.unique
+ unique = job.binary_unique
if not unique:
unique = b''
data = b'\x00'.join((job.handle, job.name, unique, job.arguments))
diff --git a/gear/tests/test_functional.py b/gear/tests/test_functional.py
index ea02ffe..e512f2f 100644
--- a/gear/tests/test_functional.py
+++ b/gear/tests/test_functional.py
@@ -16,6 +16,7 @@
import os
import threading
import time
+import uuid
from OpenSSL import crypto
import fixtures
@@ -149,5 +150,79 @@ class TestFunctional(tests.BaseTestCase):
self.assertEqual('test', workerjob.name)
+class TestFunctionalText(tests.BaseTestCase):
+ def setUp(self):
+ super(TestFunctionalText, self).setUp()
+ self.server = gear.Server(0)
+ self.client = gear.Client('client')
+ self.worker = gear.TextWorker('worker')
+ self.client.addServer('127.0.0.1', self.server.port)
+ self.worker.addServer('127.0.0.1', self.server.port)
+ self.client.waitForServer()
+ self.worker.waitForServer()
+
+ def test_text_job(self):
+ self.worker.registerFunction('test')
+
+ for jobcount in range(2):
+ job = gear.TextJob('test', 'testdata')
+ self.client.submitJob(job)
+ self.assertNotEqual(job.handle, None)
+
+ workerjob = self.worker.getJob()
+ self.assertEqual(workerjob.handle, job.handle)
+ self.assertEqual(workerjob.arguments, 'testdata')
+ workerjob.sendWorkData('workdata')
+ workerjob.sendWorkComplete()
+
+ for count in iterate_timeout(30, "job completion"):
+ if job.complete:
+ break
+ self.assertTrue(job.complete)
+ self.assertEqual(job.data, ['workdata'])
+
+ def test_text_job_unique(self):
+ self.worker.registerFunction('test')
+
+ for jobcount in range(2):
+ jobunique = uuid.uuid4().hex
+ job = gear.TextJob('test', 'testdata', unique=jobunique)
+ self.client.submitJob(job)
+ self.assertNotEqual(job.handle, None)
+
+ workerjob = self.worker.getJob()
+ self.assertEqual(workerjob.handle, job.handle)
+ self.assertEqual(workerjob.arguments, 'testdata')
+ workerjob.sendWorkData('workdata')
+ workerjob.sendWorkComplete()
+
+ for count in iterate_timeout(30, "job completion"):
+ if job.complete:
+ break
+ self.assertTrue(job.complete)
+ self.assertEqual(job.data, ['workdata'])
+ self.assertEqual(job.unique, jobunique)
+ self.assertEqual(workerjob.unique, jobunique)
+
+ def test_text_job_exception(self):
+ self.worker.registerFunction('test')
+
+ for jobcount in range(2):
+ job = gear.TextJob('test', 'testdata')
+ self.client.submitJob(job)
+ self.assertNotEqual(job.handle, None)
+
+ workerjob = self.worker.getJob()
+ self.assertEqual(workerjob.handle, job.handle)
+ self.assertEqual(workerjob.arguments, 'testdata')
+ workerjob.sendWorkException('work failed')
+
+ for count in iterate_timeout(30, "job completion"):
+ if job.complete:
+ break
+ self.assertTrue(job.complete)
+ self.assertEqual(job.exception, 'work failed')
+
+
def load_tests(loader, in_tests, pattern):
return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern)