diff options
56 files changed, 344 insertions, 386 deletions
diff --git a/.coveragerc b/.coveragerc index b78c524..2838512 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,16 +1,13 @@ [run] +source = rq omit = - rq/scripts/* - rq/compat/* rq/contrib/legacy.py - rq/dummy.py rq/local.py rq/tests/* tests/* [report] exclude_lines = + if __name__ == .__main__.: if TYPE_CHECKING: pragma: no cover - if __name__ == .__main__.: -
\ No newline at end of file diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index df3f274..7a3dbca 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -25,15 +25,13 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install flake8 black + pip install black ruff - - name: Lint with flake8 + - name: Lint with black run: | - # stop the build if there are Python syntax errors or undefined names - flake8 . --select=E9,F63,F7,F82 --show-source - # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - flake8 . --exit-zero --max-complexity=5 + black --check --skip-string-normalization --line-length 120 rq tests - - name: Lint with black + - name: Lint with ruff run: | - black -S -l 120 rq/ + # stop the build if there are Python syntax errors. + ruff check --show-source rq tests diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 1f77645..67052e8 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -59,7 +59,7 @@ jobs: - name: Test with pytest run: | - RUN_SLOW_TESTS_TOO=1 pytest --cov=./ --cov-report=xml --durations=5 + RUN_SLOW_TESTS_TOO=1 pytest --cov=rq --cov-config=.coveragerc --cov-report=xml --durations=5 - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 @@ -98,7 +98,7 @@ jobs: - name: Test with pytest run: | - RUN_SLOW_TESTS_TOO=1 pytest --cov=./ --cov-report=xml --durations=5 + RUN_SLOW_TESTS_TOO=1 pytest --cov=rq --cov-config=.coveragerc --cov-report=xml --durations=5 - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..d45026b --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,9 @@ +repos: + - repo: https://github.com/psf/black + rev: 23.3.0 + hooks: + - id: black + - repo: https://github.com/charliermarsh/ruff-pre-commit + rev: "v0.0.267" + hooks: + - id: ruff diff --git a/codecov.yml b/codecov.yml index aa84c67..6e566ad 100644 --- a/codecov.yml +++ b/codecov.yml @@ -1,2 +1,3 @@ ignore: + - setup.py - "*/tests/*" diff --git a/docs/contrib/testing.md b/docs/contrib/testing.md index f8da71e..3264e43 100644 --- a/docs/contrib/testing.md +++ b/docs/contrib/testing.md @@ -37,7 +37,7 @@ RUN_SLOW_TESTS_TOO=1 pytest . If you want to analyze the coverage reports, you can use the `--cov` argument to `pytest`. By adding `--cov-report`, you also have some flexibility in terms of the report output format: ```sh -RUN_SLOW_TESTS_TOO=1 pytest --cov=./ --cov-report={{report_format}} --durations=5 +RUN_SLOW_TESTS_TOO=1 pytest --cov=rq --cov-config=.coveragerc --cov-report={{report_format}} --durations=5 ``` Where you replace the `report_format` by the desired format (`term` / `html` / `xml`). diff --git a/examples/fib.py b/examples/fib.py index 2130b3c..4ca4493 100644 --- a/examples/fib.py +++ b/examples/fib.py @@ -2,4 +2,4 @@ def slow_fib(n): if n <= 1: return 1 else: - return slow_fib(n-1) + slow_fib(n-2) + return slow_fib(n - 1) + slow_fib(n - 2) diff --git a/examples/run_example.py b/examples/run_example.py index 93f62bd..43fe163 100644 --- a/examples/run_example.py +++ b/examples/run_example.py @@ -1,10 +1,10 @@ import os import time -from rq import Connection, Queue - from fib import slow_fib +from rq import Connection, Queue + def main(): # Range of Fibonacci numbers to compute diff --git a/pyproject.toml b/pyproject.toml index 4787b13..ebcd7e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,4 +1,20 @@ [tool.black] line-length = 120 -target-version = ['py36'] -skip-string-normalization = true
\ No newline at end of file +target-version = ['py38'] +skip-string-normalization = true + +[tool.ruff] +# Set what ruff should check for. +# See https://beta.ruff.rs/docs/rules/ for a list of rules. +select = [ + "E", # pycodestyle errors + "F", # pyflakes errors + "I", # import sorting + "W", # pycodestyle warnings +] +line-length = 120 # To match black. +target-version = 'py38' + +[tool.ruff.isort] +known-first-party = ["rq"] +section-order = ["future", "standard-library", "third-party", "first-party", "local-folder"] diff --git a/rq/__init__.py b/rq/__init__.py index 0ab7065..b385e76 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -1,7 +1,6 @@ -# flake8: noqa - +# ruff: noqa: F401 from .connections import Connection, get_current_connection, pop_connection, push_connection -from .job import cancel_job, get_current_job, requeue_job, Retry, Callback +from .job import Callback, Retry, cancel_job, get_current_job, requeue_job from .queue import Queue from .version import VERSION from .worker import SimpleWorker, Worker diff --git a/rq/cli/__init__.py b/rq/cli/__init__.py index 821f9d7..ec850b8 100644 --- a/rq/cli/__init__.py +++ b/rq/cli/__init__.py @@ -1,4 +1,4 @@ -# flake8: noqa +# ruff: noqa: F401 I001 from .cli import main # TODO: the following imports can be removed when we drop the `rqinfo` and diff --git a/rq/cli/cli.py b/rq/cli/cli.py index bccde97..eb18293 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -5,45 +5,47 @@ RQ command line tool import os import sys import warnings - from typing import List, Type import click from redis.exceptions import ConnectionError -from rq import Connection, Retry, __version__ as version +from rq import Connection, Retry +from rq import __version__ as version from rq.cli.helpers import ( + parse_function_args, + parse_schedule, + pass_cli_config, read_config_file, refresh, setup_loghandlers_from_args, show_both, show_queues, show_workers, - parse_function_args, - parse_schedule, - pass_cli_config, ) # from rq.cli.pool import pool from rq.contrib.legacy import cleanup_ghosts from rq.defaults import ( - DEFAULT_RESULT_TTL, - DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT, + DEFAULT_LOGGING_FORMAT, DEFAULT_MAINTENANCE_TASK_INTERVAL, + DEFAULT_RESULT_TTL, + DEFAULT_WORKER_TTL, ) from rq.exceptions import InvalidJobOperationError from rq.job import Job, JobStatus from rq.logutils import blue from rq.registry import FailedJobRegistry, clean_registries from rq.serializers import DefaultSerializer -from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended +from rq.suspension import is_suspended +from rq.suspension import resume as connection_resume +from rq.suspension import suspend as connection_suspend +from rq.utils import get_call_string, import_attribute from rq.worker import Worker from rq.worker_pool import WorkerPool from rq.worker_registration import clean_worker_registry -from rq.utils import import_attribute, get_call_string @click.group() diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index bea2c37..e585ca7 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -1,14 +1,12 @@ -import sys import importlib -import time import os - -from functools import partial, update_wrapper -from enum import Enum - -from datetime import datetime, timezone, timedelta -from json import loads, JSONDecodeError +import sys +import time from ast import literal_eval +from datetime import datetime, timedelta, timezone +from enum import Enum +from functools import partial, update_wrapper +from json import JSONDecodeError, loads from shutil import get_terminal_size import click @@ -20,8 +18,8 @@ from rq.defaults import ( DEFAULT_DEATH_PENALTY_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, - DEFAULT_WORKER_CLASS, DEFAULT_SERIALIZER_CLASS, + DEFAULT_WORKER_CLASS, ) from rq.logutils import setup_loghandlers from rq.utils import import_attribute, parse_timeout diff --git a/rq/command.py b/rq/command.py index 4566ec0..0488d68 100644 --- a/rq/command.py +++ b/rq/command.py @@ -1,17 +1,16 @@ import json import os import signal - -from typing import TYPE_CHECKING, Dict, Any +from typing import TYPE_CHECKING, Any, Dict if TYPE_CHECKING: from redis import Redis + from .worker import Worker from rq.exceptions import InvalidJobOperation from rq.job import Job - PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s' diff --git a/rq/connections.py b/rq/connections.py index 02b50e3..5d10ea4 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -2,7 +2,8 @@ import warnings from contextlib import contextmanager from typing import Optional, Tuple, Type -from redis import Connection as RedisConnection, Redis +from redis import Connection as RedisConnection +from redis import Redis from .local import LocalStack diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py index 33ecf18..be44b65 100644 --- a/rq/contrib/legacy.py +++ b/rq/contrib/legacy.py @@ -1,7 +1,6 @@ import logging -from rq import get_current_connection -from rq import Worker +from rq import Worker, get_current_connection logger = logging.getLogger(__name__) diff --git a/rq/decorators.py b/rq/decorators.py index 2bf46e8..a24101e 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -1,8 +1,9 @@ from functools import wraps -from typing import TYPE_CHECKING, Callable, Dict, Optional, List, Any, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union if TYPE_CHECKING: from redis import Redis + from .job import Retry from .defaults import DEFAULT_RESULT_TTL diff --git a/rq/defaults.py b/rq/defaults.py index 3744c12..0cea711 100644 --- a/rq/defaults.py +++ b/rq/defaults.py @@ -99,4 +99,4 @@ Defaults to the `UnixSignalDeathPenalty` class within the `rq.timeouts` module UNSERIALIZABLE_RETURN_VALUE_PAYLOAD = 'Unserializable return value' """ The value that we store in the job's _result property or in the Result's return_value in case the return value of the actual job is not serializable -"""
\ No newline at end of file +""" @@ -1,25 +1,26 @@ +import asyncio import inspect import json import logging import warnings import zlib -import asyncio - from datetime import datetime, timedelta, timezone from enum import Enum -from redis import WatchError -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, Type +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union from uuid import uuid4 +from redis import WatchError + from .defaults import CALLBACK_TIMEOUT, UNSERIALIZABLE_RETURN_VALUE_PAYLOAD -from .timeouts import JobTimeoutException, BaseDeathPenalty +from .timeouts import BaseDeathPenalty, JobTimeoutException if TYPE_CHECKING: - from .results import Result - from .queue import Queue from redis import Redis from redis.client import Pipeline + from .queue import Queue + from .results import Result + from .connections import resolve_connection from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError from .local import LocalStack @@ -167,8 +168,8 @@ class Job: func (FunctionReference): The function/method/callable for the Job. This can be a reference to a concrete callable or a string representing the path of function/method to be imported. Effectively this is the only required attribute when creating a new Job. - args (Union[List[Any], Optional[Tuple]], optional): A Tuple / List of positional arguments to pass the callable. - Defaults to None, meaning no args being passed. + args (Union[List[Any], Optional[Tuple]], optional): A Tuple / List of positional arguments to pass the + callable. Defaults to None, meaning no args being passed. kwargs (Optional[Dict], optional): A Dictionary of keyword arguments to pass the callable. Defaults to None, meaning no kwargs being passed. connection (Optional[Redis], optional): The Redis connection to use. Defaults to None. @@ -179,13 +180,16 @@ class Job: status (JobStatus, optional): The Job Status. Defaults to None. description (Optional[str], optional): The Job Description. Defaults to None. depends_on (Union['Dependency', List[Union['Dependency', 'Job']]], optional): What the jobs depends on. - This accepts a variaty of different arguments including a `Dependency`, a list of `Dependency` or a `Job` - list of `Job`. Defaults to None. - timeout (Optional[int], optional): The amount of time in seconds that should be a hardlimit for a job execution. Defaults to None. + This accepts a variaty of different arguments including a `Dependency`, a list of `Dependency` or a + `Job` list of `Job`. Defaults to None. + timeout (Optional[int], optional): The amount of time in seconds that should be a hardlimit for a job + execution. Defaults to None. id (Optional[str], optional): An Optional ID (str) for the Job. Defaults to None. origin (Optional[str], optional): The queue of origin. Defaults to None. - meta (Optional[Dict[str, Any]], optional): Custom metadata about the job, takes a dictioanry. Defaults to None. - failure_ttl (Optional[int], optional): THe time to live in seconds for failed-jobs information. Defaults to None. + meta (Optional[Dict[str, Any]], optional): Custom metadata about the job, takes a dictioanry. + Defaults to None. + failure_ttl (Optional[int], optional): THe time to live in seconds for failed-jobs information. + Defaults to None. serializer (Optional[str], optional): The serializer class path to use. Should be a string with the import path for the serializer to use. eg. `mymodule.myfile.MySerializer` Defaults to None. on_success (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run @@ -1081,8 +1085,8 @@ class Job: """ if self.is_canceled: raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id())) - from .registry import CanceledJobRegistry from .queue import Queue + from .registry import CanceledJobRegistry pipe = pipeline or self.connection.pipeline() diff --git a/rq/local.py b/rq/local.py index e6b070b..2fe22c9 100644 --- a/rq/local.py +++ b/rq/local.py @@ -1,4 +1,4 @@ -# flake8: noqa +# ruff: noqa: E731 """ werkzeug.local ~~~~~~~~~~~~~~ @@ -13,14 +13,14 @@ # current thread ident. try: from greenlet import getcurrent as get_ident -except ImportError: # noqa +except ImportError: try: - from threading import get_ident # noqa - except ImportError: # noqa + from threading import get_ident + except ImportError: try: - from _thread import get_ident # noqa - except ImportError: # noqa - from dummy_thread import get_ident # noqa + from _thread import get_ident + except ImportError: + from dummy_thread import get_ident def release_local(local): @@ -120,7 +120,7 @@ class LocalStack: def _get__ident_func__(self): return self._local.__ident_func__ - def _set__ident_func__(self, value): # noqa + def _set__ident_func__(self, value): object.__setattr__(self._local, '__ident_func__', value) __ident_func__ = property(_get__ident_func__, _set__ident_func__) @@ -348,7 +348,6 @@ class LocalProxy: __invert__ = lambda x: ~(x._get_current_object()) __complex__ = lambda x: complex(x._get_current_object()) __int__ = lambda x: int(x._get_current_object()) - __long__ = lambda x: long(x._get_current_object()) __float__ = lambda x: float(x._get_current_object()) __oct__ = lambda x: oct(x._get_current_object()) __hex__ = lambda x: hex(x._get_current_object()) diff --git a/rq/logutils.py b/rq/logutils.py index b36ece8..9a1c6c5 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -2,7 +2,7 @@ import logging import sys from typing import Union -from rq.defaults import DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT +from rq.defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT class _Colorizer: @@ -24,12 +24,12 @@ class _Colorizer: light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"] x = 30 - for d, l in zip(dark_colors, light_colors): - self.codes[d] = esc + "%im" % x - self.codes[l] = esc + "%i;01m" % x + for dark, light in zip(dark_colors, light_colors): + self.codes[dark] = esc + "%im" % x + self.codes[light] = esc + "%i;01m" % x x += 1 - del d, l, x + del dark, light, x self.codes["darkteal"] = self.codes["turquoise"] self.codes["darkyellow"] = self.codes["brown"] @@ -117,7 +117,8 @@ def setup_loghandlers( level (Union[int, str, None], optional): The log level. Access an integer level (10-50) or a string level ("info", "debug" etc). Defaults to None. date_format (str, optional): The date format to use. Defaults to DEFAULT_LOGGING_DATE_FORMAT ('%H:%M:%S'). - log_format (str, optional): The log format to use. Defaults to DEFAULT_LOGGING_FORMAT ('%(asctime)s %(message)s'). + log_format (str, optional): The log format to use. + Defaults to DEFAULT_LOGGING_FORMAT ('%(asctime)s %(message)s'). name (str, optional): The looger name. Defaults to 'rq.worker'. """ logger = logging.getLogger(name) diff --git a/rq/queue.py b/rq/queue.py index f7a0c87..2d74ddf 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -4,9 +4,9 @@ import traceback import uuid import warnings from collections import namedtuple -from datetime import datetime, timezone, timedelta +from datetime import datetime, timedelta, timezone from functools import total_ordering -from typing import TYPE_CHECKING, Dict, List, Any, Callable, Optional, Tuple, Type, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type, Union from redis import WatchError @@ -15,18 +15,17 @@ from .timeouts import BaseDeathPenalty, UnixSignalDeathPenalty if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline + from .job import Retry -from .utils import as_text from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .exceptions import DequeueTimeout, NoSuchJobError from .job import Job, JobStatus from .logutils import blue, green -from .types import FunctionReferenceType, JobDependencyType from .serializers import resolve_serializer -from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow, compact - +from .types import FunctionReferenceType, JobDependencyType +from .utils import as_text, backend_class, compact, get_version, import_attribute, parse_timeout, utcnow logger = logging.getLogger("rq.queue") @@ -158,9 +157,11 @@ class Queue: connection (Optional[Redis], optional): Redis connection. Defaults to None. is_async (bool, optional): Whether jobs should run "async" (using the worker). If `is_async` is false, jobs will run on the same process from where it was called. Defaults to True. - job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. Defaults to None. + job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. + Defaults to None. serializer (Any, optional): Serializer. Defaults to None. - death_penalty_class (Type[BaseDeathPenalty, optional): Job class or a string referencing the Job class path. Defaults to UnixSignalDeathPenalty. + death_penalty_class (Type[BaseDeathPenalty, optional): Job class or a string referencing the Job class path. + Defaults to UnixSignalDeathPenalty. """ self.connection = connection or resolve_connection() prefix = self.redis_queue_namespace_prefix diff --git a/rq/registry.py b/rq/registry.py index acd6bd7..b955d6b 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,26 +1,24 @@ import calendar import logging -import traceback - -from rq.serializers import resolve_serializer import time +import traceback from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any, List, Optional, Type, Union -from .timeouts import UnixSignalDeathPenalty, BaseDeathPenalty +from rq.serializers import resolve_serializer + +from .timeouts import BaseDeathPenalty, UnixSignalDeathPenalty if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline -from .utils import as_text from .connections import resolve_connection from .defaults import DEFAULT_FAILURE_TTL -from .exceptions import InvalidJobOperation, NoSuchJobError, AbandonedJobError +from .exceptions import AbandonedJobError, InvalidJobOperation, NoSuchJobError from .job import Job, JobStatus from .queue import Queue -from .utils import backend_class, current_timestamp - +from .utils import as_text, backend_class, current_timestamp logger = logging.getLogger("rq.registry") @@ -237,8 +235,9 @@ class StartedJobRegistry(BaseRegistry): except NoSuchJobError: continue - job.execute_failure_callback(self.death_penalty_class, AbandonedJobError, AbandonedJobError(), - traceback.extract_stack()) + job.execute_failure_callback( + self.death_penalty_class, AbandonedJobError, AbandonedJobError(), traceback.extract_stack() + ) retry = job.retries_left and job.retries_left > 0 @@ -248,8 +247,10 @@ class StartedJobRegistry(BaseRegistry): else: exc_string = f"due to {AbandonedJobError.__name__}" - logger.warning(f'{self.__class__.__name__} cleanup: Moving job to {FailedJobRegistry.__name__} ' - f'({exc_string})') + logger.warning( + f'{self.__class__.__name__} cleanup: Moving job to {FailedJobRegistry.__name__} ' + f'({exc_string})' + ) job.set_status(JobStatus.FAILED) job._exc_info = f"Moved to {FailedJobRegistry.__name__}, {exc_string}, at {datetime.now()}" job.save(pipeline=pipeline, include_meta=False) diff --git a/rq/results.py b/rq/results.py index fdbb763..27bab15 100644 --- a/rq/results.py +++ b/rq/results.py @@ -1,16 +1,15 @@ -from typing import Any, Optional import zlib - from base64 import b64decode, b64encode from datetime import datetime, timezone from enum import Enum +from typing import Any, Optional + from redis import Redis from .defaults import UNSERIALIZABLE_RETURN_VALUE_PAYLOAD -from .utils import decode_redis_hash from .job import Job from .serializers import resolve_serializer -from .utils import now +from .utils import decode_redis_hash, now def get_key(job_id): diff --git a/rq/scheduler.py b/rq/scheduler.py index a64b400..97d627c 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -8,7 +8,7 @@ from enum import Enum from multiprocessing import Process from typing import List, Set -from redis import ConnectionPool, Redis, SSLConnection, UnixDomainSocketConnection +from redis import ConnectionPool, Redis from .connections import parse_connection from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, DEFAULT_SCHEDULER_FALLBACK_PERIOD diff --git a/rq/serializers.py b/rq/serializers.py index 96de3f5..94eddbf 100644 --- a/rq/serializers.py +++ b/rq/serializers.py @@ -1,6 +1,6 @@ -from functools import partial -import pickle import json +import pickle +from functools import partial from typing import Optional, Type, Union from .utils import import_attribute diff --git a/rq/suspension.py b/rq/suspension.py index 77df9b8..10af5ba 100644 --- a/rq/suspension.py +++ b/rq/suspension.py @@ -2,6 +2,7 @@ from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from redis import Redis + from rq.worker import Worker diff --git a/rq/utils.py b/rq/utils.py index db483ab..5e61983 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -7,16 +7,16 @@ terminal colorizing code, originally by Georg Brandl. import calendar import datetime +import datetime as dt import importlib import logging import numbers -import sys -import datetime as dt from collections.abc import Iterable -from typing import TYPE_CHECKING, Dict, List, Optional, Any, Callable, Tuple, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union if TYPE_CHECKING: from redis import Redis + from .queue import Queue from redis.exceptions import ResponseError diff --git a/rq/worker.py b/rq/worker.py index ade789b..062b1b4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -13,8 +13,8 @@ import warnings from datetime import datetime, timedelta from enum import Enum from random import shuffle -from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type, Union from types import FrameType +from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Type, Union from uuid import uuid4 if TYPE_CHECKING: @@ -35,19 +35,17 @@ from contextlib import suppress import redis.exceptions from . import worker_registration -from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command -from .connections import get_current_connection, push_connection, pop_connection - +from .command import PUBSUB_CHANNEL_TEMPLATE, handle_command, parse_payload +from .connections import get_current_connection, pop_connection, push_connection from .defaults import ( + DEFAULT_JOB_MONITORING_INTERVAL, + DEFAULT_LOGGING_DATE_FORMAT, + DEFAULT_LOGGING_FORMAT, DEFAULT_MAINTENANCE_TASK_INTERVAL, DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, - DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_FORMAT, - DEFAULT_LOGGING_DATE_FORMAT, ) -from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException - +from .exceptions import DequeueTimeout, DeserializationError, ShutDownImminentException from .job import Job, JobStatus from .logutils import blue, green, setup_loghandlers, yellow from .queue import Queue @@ -55,20 +53,19 @@ from .registry import StartedJobRegistry, clean_registries from .scheduler import RQScheduler from .serializers import resolve_serializer from .suspension import is_suspended -from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty +from .timeouts import HorseMonitorTimeoutException, JobTimeoutException, UnixSignalDeathPenalty from .utils import ( + as_text, backend_class, + compact, ensure_list, get_version, utcformat, utcnow, utcparse, - compact, - as_text, ) from .version import VERSION - try: from setproctitle import setproctitle as setprocname except ImportError: @@ -373,7 +370,8 @@ class BaseWorker: max_jobs (Optional[int], optional): Max number of jobs. Defaults to None. max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None. with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False. - dequeue_strategy (DequeueStrategy, optional): Which strategy to use to dequeue jobs. Defaults to DequeueStrategy.DEFAULT + dequeue_strategy (DequeueStrategy, optional): Which strategy to use to dequeue jobs. + Defaults to DequeueStrategy.DEFAULT Returns: worked (bool): Will return True if any job was processed, False otherwise. diff --git a/rq/worker_pool.py b/rq/worker_pool.py index 005c3b9..b161cc8 100644 --- a/rq/worker_pool.py +++ b/rq/worker_pool.py @@ -4,17 +4,14 @@ import logging import os import signal import time - from enum import Enum from multiprocessing import Process -from typing import Dict, List, NamedTuple, Optional, Set, Type, Union +from typing import Dict, List, NamedTuple, Optional, Type, Union from uuid import uuid4 -from redis import Redis -from redis import ConnectionPool -from rq.serializers import DefaultSerializer +from redis import ConnectionPool, Redis -from rq.timeouts import HorseMonitorTimeoutException, UnixSignalDeathPenalty +from rq.serializers import DefaultSerializer from .connections import parse_connection from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT diff --git a/rq/worker_registration.py b/rq/worker_registration.py index fe4dc04..838c63f 100644 --- a/rq/worker_registration.py +++ b/rq/worker_registration.py @@ -1,15 +1,16 @@ -from typing import Optional, TYPE_CHECKING, Any, Set +from typing import TYPE_CHECKING, Optional, Set if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline - from .worker import Worker - from .queue import Queue -from .utils import as_text + from .queue import Queue + from .worker import Worker from rq.utils import split_list +from .utils import as_text + WORKERS_BY_QUEUE_KEY = 'rq:workers:%s' REDIS_WORKER_KEYS = 'rq:workers' MAX_KEYS = 1000 @@ -4,9 +4,3 @@ requires = redis >= 3.0.0 [wheel] universal = 1 - -[flake8] -max-line-length=120 -ignore=E731 -count=True -statistics=True @@ -3,7 +3,8 @@ rq is a simple, lightweight, library for creating background jobs, and processing them. """ import os -from setuptools import setup, find_packages + +from setuptools import find_packages, setup def get_version(): @@ -33,11 +34,10 @@ setup( license='BSD', author='Vincent Driessen', author_email='vincent@3rdcloud.com', - description='RQ is a simple, lightweight, library for creating background ' - 'jobs, and processing them.', + description='RQ is a simple, lightweight, library for creating background jobs, and processing them.', long_description=__doc__, packages=find_packages(exclude=['tests', 'tests.*']), - package_data = {"rq": ["py.typed"]}, + package_data={"rq": ["py.typed"]}, include_package_data=True, zip_safe=False, platforms='any', @@ -46,7 +46,6 @@ setup( entry_points={ 'console_scripts': [ 'rq = rq.cli:main', - # NOTE: rqworker/rqinfo are kept for backward-compatibility, # remove eventually (TODO) 'rqinfo = rq.cli:info', @@ -85,6 +84,5 @@ setup( 'Topic :: System :: Distributed Computing', 'Topic :: System :: Systems Administration', 'Topic :: System :: Monitoring', - - ] + ], ) diff --git a/tests/__init__.py b/tests/__init__.py index 36b2bc6..9da4687 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,10 +1,10 @@ import logging import os +import unittest from redis import Redis -from rq import pop_connection, push_connection -import unittest +from rq import pop_connection, push_connection def find_empty_redis_database(ssl=False): diff --git a/tests/fixtures.py b/tests/fixtures.py index 4536c3c..62ea8e1 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -3,16 +3,17 @@ This file contains all jobs that are used in tests. Each of these test fixtures has a slightly different characteristics. """ +import contextlib import os -import time import signal -import sys import subprocess -import contextlib +import sys +import time from multiprocessing import Process from redis import Redis -from rq import Connection, get_current_job, get_current_connection, Queue + +from rq import Connection, Queue, get_current_connection, get_current_job from rq.command import send_kill_horse_command, send_shutdown_command from rq.decorators import job from rq.job import Job diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py index 680ee38..c47ad84 100644 --- a/tests/test_callbacks.py +++ b/tests/test_callbacks.py @@ -1,15 +1,13 @@ from datetime import timedelta -from tests import RQTestCase -from tests.fixtures import div_by_zero, erroneous_callback, save_exception, save_result, say_hello - from rq import Queue, Worker -from rq.job import Job, JobStatus, UNEVALUATED +from rq.job import UNEVALUATED, Job, JobStatus from rq.worker import SimpleWorker +from tests import RQTestCase +from tests.fixtures import div_by_zero, erroneous_callback, save_exception, save_result, say_hello class QueueCallbackTestCase(RQTestCase): - def test_enqueue_with_success_callback(self): """Test enqueue* methods with on_success""" queue = Queue(connection=self.testconn) @@ -54,10 +52,7 @@ class SyncJobCallback(RQTestCase): job = queue.enqueue(say_hello, on_success=save_result) self.assertEqual(job.get_status(), JobStatus.FINISHED) - self.assertEqual( - self.testconn.get('success_callback:%s' % job.id).decode(), - job.result - ) + self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.result) job = queue.enqueue(div_by_zero, on_success=save_result) self.assertEqual(job.get_status(), JobStatus.FAILED) @@ -69,8 +64,7 @@ class SyncJobCallback(RQTestCase): job = queue.enqueue(div_by_zero, on_failure=save_exception) self.assertEqual(job.get_status(), JobStatus.FAILED) - self.assertIn('div_by_zero', - self.testconn.get('failure_callback:%s' % job.id).decode()) + self.assertIn('div_by_zero', self.testconn.get('failure_callback:%s' % job.id).decode()) job = queue.enqueue(div_by_zero, on_success=save_result) self.assertEqual(job.get_status(), JobStatus.FAILED) @@ -88,10 +82,7 @@ class WorkerCallbackTestCase(RQTestCase): # Callback is executed when job is successfully executed worker.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FINISHED) - self.assertEqual( - self.testconn.get('success_callback:%s' % job.id).decode(), - job.return_value() - ) + self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.return_value()) job = queue.enqueue(div_by_zero, on_success=save_result) worker.work(burst=True) @@ -120,8 +111,7 @@ class WorkerCallbackTestCase(RQTestCase): self.assertEqual(job.get_status(), JobStatus.FAILED) job.refresh() print(job.exc_info) - self.assertIn('div_by_zero', - self.testconn.get('failure_callback:%s' % job.id).decode()) + self.assertIn('div_by_zero', self.testconn.get('failure_callback:%s' % job.id).decode()) job = queue.enqueue(div_by_zero, on_success=save_result) worker.work(burst=True) @@ -132,7 +122,6 @@ class WorkerCallbackTestCase(RQTestCase): class JobCallbackTestCase(RQTestCase): - def test_job_creation_with_success_callback(self): """Ensure callbacks are created and persisted properly""" job = Job.create(say_hello) diff --git a/tests/test_cli.py b/tests/test_cli.py index 79ac12d..1767a3e 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,26 +1,22 @@ -from datetime import datetime, timezone, timedelta +import json +import os +from datetime import datetime, timedelta, timezone from time import sleep from uuid import uuid4 -import os -import json - -from click import BadParameter +import pytest from click.testing import CliRunner from redis import Redis from rq import Queue from rq.cli import main -from rq.cli.helpers import read_config_file, CliConfig, parse_function_arg, parse_schedule +from rq.cli.helpers import CliConfig, parse_function_arg, parse_schedule, read_config_file from rq.job import Job, JobStatus from rq.registry import FailedJobRegistry, ScheduledJobRegistry +from rq.scheduler import RQScheduler from rq.serializers import JSONSerializer from rq.timeouts import UnixSignalDeathPenalty from rq.worker import Worker, WorkerStatus -from rq.scheduler import RQScheduler - -import pytest - from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -809,7 +805,7 @@ class WorkerPoolCLITestCase(CLITestCase): queue = Queue('bar', connection=self.connection, serializer=JSONSerializer) job_2 = queue.enqueue(say_hello, 'Hello') runner = CliRunner() - result = runner.invoke( + runner.invoke( main, ['worker-pool', 'foo', 'bar', '-u', self.redis_url, '-b', '--serializer', 'rq.serializers.JSONSerializer'], ) diff --git a/tests/test_commands.py b/tests/test_commands.py index f98a0ec..355b72a 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1,17 +1,15 @@ import time - from multiprocessing import Process from redis import Redis -from tests import RQTestCase -from tests.fixtures import long_running_job, _send_kill_horse_command, _send_shutdown_command - from rq import Queue, Worker from rq.command import send_command, send_kill_horse_command, send_shutdown_command, send_stop_job_command from rq.exceptions import InvalidJobOperation, NoSuchJobError from rq.serializers import JSONSerializer from rq.worker import WorkerStatus +from tests import RQTestCase +from tests.fixtures import _send_kill_horse_command, _send_shutdown_command, long_running_job def start_work(queue_name, worker_name, connection_kwargs): diff --git a/tests/test_decorator.py b/tests/test_decorator.py index fb945e5..69ddde1 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -11,13 +11,11 @@ from tests.fixtures import decorated_job class TestDecorator(RQTestCase): - def setUp(self): super().setUp() def test_decorator_preserves_functionality(self): - """Ensure that a decorated function's functionality is still preserved. - """ + """Ensure that a decorated function's functionality is still preserved.""" self.assertEqual(decorated_job(1, 2), 3) def test_decorator_adds_delay_attr(self): @@ -34,9 +32,11 @@ class TestDecorator(RQTestCase): """Ensure that passing in queue name to the decorator puts the job in the right queue. """ + @job(queue='queue_name') def hello(): return 'Hi' + result = hello.delay() self.assertEqual(result.origin, 'queue_name') @@ -51,12 +51,12 @@ class TestDecorator(RQTestCase): @job('default', result_ttl=10) def hello(): return 'Why hello' + result = hello.delay() self.assertEqual(result.result_ttl, 10) def test_decorator_accepts_ttl_as_argument(self): - """Ensure that passing in ttl to the decorator sets the ttl on the job - """ + """Ensure that passing in ttl to the decorator sets the ttl on the job""" # Ensure default result = decorated_job.delay(1, 2) self.assertEqual(result.ttl, None) @@ -64,12 +64,12 @@ class TestDecorator(RQTestCase): @job('default', ttl=30) def hello(): return 'Hello' + result = hello.delay() self.assertEqual(result.ttl, 30) def test_decorator_accepts_meta_as_argument(self): - """Ensure that passing in meta to the decorator sets the meta on the job - """ + """Ensure that passing in meta to the decorator sets the meta on the job""" # Ensure default result = decorated_job.delay(1, 2) self.assertEqual(result.meta, {}) @@ -82,6 +82,7 @@ class TestDecorator(RQTestCase): @job('default', meta=test_meta) def hello(): return 'Hello' + result = hello.delay() self.assertEqual(result.meta, test_meta) @@ -153,16 +154,19 @@ class TestDecorator(RQTestCase): """Ensure that passing in on_failure function to the decorator sets the correct on_failure function on the job. """ + # Only functions and builtins are supported as callback @job('default', on_failure=Job.fetch) def foo(): return 'Foo' + with self.assertRaises(ValueError): result = foo.delay() @job('default', on_failure=print) def hello(): return 'Hello' + result = hello.delay() result_job = Job.fetch(id=result.id, connection=self.testconn) self.assertEqual(result_job.failure_callback, print) @@ -171,23 +175,26 @@ class TestDecorator(RQTestCase): """Ensure that passing in on_failure function to the decorator sets the correct on_success function on the job. """ + # Only functions and builtins are supported as callback @job('default', on_failure=Job.fetch) def foo(): return 'Foo' + with self.assertRaises(ValueError): result = foo.delay() @job('default', on_success=print) def hello(): return 'Hello' + result = hello.delay() result_job = Job.fetch(id=result.id, connection=self.testconn) self.assertEqual(result_job.success_callback, print) @mock.patch('rq.queue.resolve_connection') def test_decorator_connection_laziness(self, resolve_connection): - """Ensure that job decorator resolve connection in `lazy` way """ + """Ensure that job decorator resolve connection in `lazy` way""" resolve_connection.return_value = Redis() @@ -207,12 +214,11 @@ class TestDecorator(RQTestCase): def test_decorator_custom_queue_class(self): """Ensure that a custom queue class can be passed to the job decorator""" + class CustomQueue(Queue): pass - CustomQueue.enqueue_call = mock.MagicMock( - spec=lambda *args, **kwargs: None, - name='enqueue_call' - ) + + CustomQueue.enqueue_call = mock.MagicMock(spec=lambda *args, **kwargs: None, name='enqueue_call') custom_decorator = job(queue='default', queue_class=CustomQueue) self.assertIs(custom_decorator.queue_class, CustomQueue) @@ -226,12 +232,11 @@ class TestDecorator(RQTestCase): def test_decorate_custom_queue(self): """Ensure that a custom queue instance can be passed to the job decorator""" + class CustomQueue(Queue): pass - CustomQueue.enqueue_call = mock.MagicMock( - spec=lambda *args, **kwargs: None, - name='enqueue_call' - ) + + CustomQueue.enqueue_call = mock.MagicMock(spec=lambda *args, **kwargs: None, name='enqueue_call') queue = CustomQueue() @job(queue=queue) @@ -252,11 +257,12 @@ class TestDecorator(RQTestCase): @job('default', failure_ttl=10) def hello(): return 'Why hello' + result = hello.delay() self.assertEqual(result.failure_ttl, 10) def test_decorator_custom_retry(self): - """ Ensure that passing in retry to the decorator sets the + """Ensure that passing in retry to the decorator sets the retry on the job """ # Ensure default @@ -267,6 +273,7 @@ class TestDecorator(RQTestCase): @job('default', retry=Retry(3, [2])) def hello(): return 'Why hello' + result = hello.delay() self.assertEqual(result.retries_left, 3) self.assertEqual(result.retry_intervals, [2]) diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index a290a87..13400f7 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -1,12 +1,10 @@ +from rq import Queue, SimpleWorker, Worker +from rq.job import Dependency, Job, JobStatus from tests import RQTestCase from tests.fixtures import check_dependencies_are_met, div_by_zero, say_hello -from rq import Queue, SimpleWorker, Worker -from rq.job import Job, JobStatus, Dependency - class TestDependencies(RQTestCase): - def test_allow_failure_is_persisted(self): """Ensure that job.allow_dependency_failures is properly set when providing Dependency object to depends_on.""" @@ -70,10 +68,8 @@ class TestDependencies(RQTestCase): # When a failing job has multiple dependents, only enqueue those # with allow_failure=True parent_job = q.enqueue(div_by_zero) - job_allow_failure = q.enqueue(say_hello, - depends_on=Dependency(jobs=parent_job, allow_failure=True)) - job = q.enqueue(say_hello, - depends_on=Dependency(jobs=parent_job, allow_failure=False)) + job_allow_failure = q.enqueue(say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=True)) + job = q.enqueue(say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=False)) w.work(burst=True, max_jobs=1) self.assertEqual(parent_job.get_status(), JobStatus.FAILED) self.assertEqual(job_allow_failure.get_status(), JobStatus.QUEUED) @@ -101,22 +97,12 @@ class TestDependencies(RQTestCase): # Test dependant is enqueued at front q.empty() parent_job = q.enqueue(say_hello) - q.enqueue( - say_hello, - job_id='fake_job_id_1', - depends_on=Dependency(jobs=[parent_job]) - ) - q.enqueue( - say_hello, - job_id='fake_job_id_2', - depends_on=Dependency(jobs=[parent_job],enqueue_at_front=True) - ) - #q.enqueue(say_hello) # This is a filler job that will act as a separator for jobs, one will be enqueued at front while the other one at the end of the queue + q.enqueue(say_hello, job_id='fake_job_id_1', depends_on=Dependency(jobs=[parent_job])) + q.enqueue(say_hello, job_id='fake_job_id_2', depends_on=Dependency(jobs=[parent_job], enqueue_at_front=True)) w.work(burst=True, max_jobs=1) self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"]) - def test_dependency_list_in_depends_on(self): """Enqueue with Dependency list in depends_on""" q = Queue(connection=self.testconn) @@ -129,7 +115,6 @@ class TestDependencies(RQTestCase): w.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FINISHED) - def test_enqueue_job_dependency(self): """Enqueue via Queue.enqueue_job() with depencency""" q = Queue(connection=self.testconn) @@ -147,7 +132,6 @@ class TestDependencies(RQTestCase): self.assertEqual(parent_job.get_status(), JobStatus.FINISHED) self.assertEqual(job.get_status(), JobStatus.FINISHED) - def test_dependencies_are_met_if_parent_is_canceled(self): """When parent job is canceled, it should be treated as failed""" queue = Queue(connection=self.testconn) diff --git a/tests/test_fixtures.py b/tests/test_fixtures.py index 383ba15..1517b80 100644 --- a/tests/test_fixtures.py +++ b/tests/test_fixtures.py @@ -1,5 +1,4 @@ from rq import Queue - from tests import RQTestCase, fixtures diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 5a84f71..c351b77 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -1,16 +1,14 @@ -from rq.cli.helpers import get_redis_from_config +from unittest import mock +from rq.cli.helpers import get_redis_from_config from tests import RQTestCase -from unittest import mock -class TestHelpers(RQTestCase): +class TestHelpers(RQTestCase): @mock.patch('rq.cli.helpers.Sentinel') def test_get_redis_from_config(self, sentinel_class_mock): """Ensure Redis connection params are properly parsed""" - settings = { - 'REDIS_URL': 'redis://localhost:1/1' - } + settings = {'REDIS_URL': 'redis://localhost:1/1'} # Ensure REDIS_URL is read redis = get_redis_from_config(settings) @@ -23,7 +21,7 @@ class TestHelpers(RQTestCase): 'REDIS_HOST': 'foo', 'REDIS_DB': 2, 'REDIS_PORT': 2, - 'REDIS_PASSWORD': 'bar' + 'REDIS_PASSWORD': 'bar', } # Ensure REDIS_URL is preferred @@ -42,23 +40,29 @@ class TestHelpers(RQTestCase): self.assertEqual(connection_kwargs['password'], 'bar') # Add Sentinel to the settings - settings.update({ - 'SENTINEL': { - 'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)], - 'MASTER_NAME': 'master', - 'DB': 2, - 'USERNAME': 'redis-user', - 'PASSWORD': 'redis-secret', - 'SOCKET_TIMEOUT': None, - 'CONNECTION_KWARGS': { - 'ssl_ca_path': None, - }, - 'SENTINEL_KWARGS': { - 'username': 'sentinel-user', - 'password': 'sentinel-secret', + settings.update( + { + 'SENTINEL': { + 'INSTANCES': [ + ('remote.host1.org', 26379), + ('remote.host2.org', 26379), + ('remote.host3.org', 26379), + ], + 'MASTER_NAME': 'master', + 'DB': 2, + 'USERNAME': 'redis-user', + 'PASSWORD': 'redis-secret', + 'SOCKET_TIMEOUT': None, + 'CONNECTION_KWARGS': { + 'ssl_ca_path': None, + }, + 'SENTINEL_KWARGS': { + 'username': 'sentinel-user', + 'password': 'sentinel-secret', + }, }, - }, - }) + } + ) # Ensure SENTINEL is preferred against REDIS_* parameters redis = get_redis_from_config(settings) @@ -66,7 +70,7 @@ class TestHelpers(RQTestCase): sentinel_init_sentinel_kwargs = sentinel_class_mock.call_args[1] self.assertEqual( sentinel_init_sentinels_args, - ([('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],) + ([('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],), ) self.assertDictEqual( sentinel_init_sentinel_kwargs, @@ -80,6 +84,6 @@ class TestHelpers(RQTestCase): 'sentinel_kwargs': { 'username': 'sentinel-user', 'password': 'sentinel-secret', - } - } + }, + }, ) diff --git a/tests/test_job.py b/tests/test_job.py index 318c41b..444080f 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,32 +1,29 @@ import json - -from rq.defaults import CALLBACK_TIMEOUT -from rq.serializers import JSONSerializer -import time import queue +import time import zlib from datetime import datetime, timedelta +from pickle import dumps, loads from redis import WatchError -from rq.utils import as_text +from rq.defaults import CALLBACK_TIMEOUT from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError -from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job, Callback +from rq.job import Callback, Dependency, Job, JobStatus, cancel_job, get_current_job from rq.queue import Queue from rq.registry import ( CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, - StartedJobRegistry, ScheduledJobRegistry, + StartedJobRegistry, ) -from rq.utils import utcformat, utcnow +from rq.serializers import JSONSerializer +from rq.utils import as_text, utcformat, utcnow from rq.worker import Worker from tests import RQTestCase, fixtures -from pickle import loads, dumps - class TestJob(RQTestCase): def test_unicode(self): diff --git a/tests/test_queue.py b/tests/test_queue.py index d352736..e91ae54 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,9 +1,8 @@ import json from datetime import datetime, timedelta, timezone -from rq.serializers import JSONSerializer from unittest.mock import patch -from rq import Retry, Queue +from rq import Queue, Retry from rq.job import Job, JobStatus from rq.registry import ( CanceledJobRegistry, @@ -13,10 +12,10 @@ from rq.registry import ( ScheduledJobRegistry, StartedJobRegistry, ) +from rq.serializers import JSONSerializer from rq.worker import Worker - from tests import RQTestCase -from tests.fixtures import CustomJob, echo, say_hello +from tests.fixtures import echo, say_hello class MultipleDependencyJob(Job): diff --git a/tests/test_registry.py b/tests/test_registry.py index 57584b5..5dd0be6 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,20 +1,22 @@ from datetime import datetime, timedelta from unittest import mock -from unittest.mock import PropertyMock, ANY +from unittest.mock import ANY -from rq.serializers import JSONSerializer - -from rq.utils import as_text from rq.defaults import DEFAULT_FAILURE_TTL -from rq.exceptions import InvalidJobOperation, AbandonedJobError +from rq.exceptions import AbandonedJobError, InvalidJobOperation from rq.job import Job, JobStatus, requeue_job from rq.queue import Queue -from rq.utils import current_timestamp +from rq.registry import ( + CanceledJobRegistry, + DeferredJobRegistry, + FailedJobRegistry, + FinishedJobRegistry, + StartedJobRegistry, + clean_registries, +) +from rq.serializers import JSONSerializer +from rq.utils import as_text, current_timestamp from rq.worker import Worker -from rq.registry import (CanceledJobRegistry, clean_registries, DeferredJobRegistry, - FailedJobRegistry, FinishedJobRegistry, - StartedJobRegistry) - from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -24,7 +26,6 @@ class CustomJob(Job): class TestRegistry(RQTestCase): - def setUp(self): super().setUp() self.registry = StartedJobRegistry(connection=self.testconn) @@ -83,8 +84,7 @@ class TestRegistry(RQTestCase): # Test that job is added with the right score self.registry.add(job, 1000) - self.assertLess(self.testconn.zscore(self.registry.key, job.id), - timestamp + 1002) + self.assertLess(self.testconn.zscore(self.registry.key, job.id), timestamp + 1002) # Ensure that a timeout of -1 results in a score of inf self.registry.add(job, -1) @@ -144,8 +144,7 @@ class TestRegistry(RQTestCase): self.testconn.zadd(self.registry.key, {'baz': timestamp + 30}) self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) - self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), - ['foo', 'bar']) + self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), ['foo', 'bar']) # CanceledJobRegistry does not implement get_expired_job_ids() registry = CanceledJobRegistry(connection=self.testconn) @@ -268,12 +267,10 @@ class TestRegistry(RQTestCase): self.assertEqual(registry.get_queue(), Queue(connection=self.testconn)) registry = StartedJobRegistry('foo', connection=self.testconn, serializer=JSONSerializer) - self.assertEqual(registry.get_queue(), - Queue('foo', connection=self.testconn, serializer=JSONSerializer)) + self.assertEqual(registry.get_queue(), Queue('foo', connection=self.testconn, serializer=JSONSerializer)) class TestFinishedJobRegistry(RQTestCase): - def setUp(self): super().setUp() self.registry = FinishedJobRegistry(connection=self.testconn) @@ -321,7 +318,6 @@ class TestFinishedJobRegistry(RQTestCase): class TestDeferredRegistry(RQTestCase): - def setUp(self): super().setUp() self.registry = DeferredJobRegistry(connection=self.testconn) @@ -333,8 +329,7 @@ class TestDeferredRegistry(RQTestCase): """Adding a job to DeferredJobsRegistry.""" job = Job() self.registry.add(job) - job_ids = [as_text(job_id) for job_id in - self.testconn.zrange(self.registry.key, 0, -1)] + job_ids = [as_text(job_id) for job_id in self.testconn.zrange(self.registry.key, 0, -1)] self.assertEqual(job_ids, [job.id]) def test_register_dependency(self): @@ -352,7 +347,6 @@ class TestDeferredRegistry(RQTestCase): class TestFailedJobRegistry(RQTestCase): - def test_default_failure_ttl(self): """Job TTL defaults to DEFAULT_FAILURE_TTL""" queue = Queue(connection=self.testconn) @@ -511,11 +505,9 @@ class TestFailedJobRegistry(RQTestCase): w.handle_job_failure(job, q) # job is added to FailedJobRegistry with default failure ttl self.assertIn(job.id, registry.get_job_ids()) - self.assertLess(self.testconn.zscore(registry.key, job.id), - timestamp + DEFAULT_FAILURE_TTL + 5) + self.assertLess(self.testconn.zscore(registry.key, job.id), timestamp + DEFAULT_FAILURE_TTL + 5) # job is added to FailedJobRegistry with specified ttl job = q.enqueue(div_by_zero, failure_ttl=5) w.handle_job_failure(job, q) - self.assertLess(self.testconn.zscore(registry.key, job.id), - timestamp + 7) + self.assertLess(self.testconn.zscore(registry.key, job.id), timestamp + 7) diff --git a/tests/test_results.py b/tests/test_results.py index 4286cec..e27e872 100644 --- a/tests/test_results.py +++ b/tests/test_results.py @@ -1,13 +1,10 @@ -import unittest import tempfile - +import unittest from datetime import timedelta -from unittest.mock import patch, PropertyMock +from unittest.mock import PropertyMock, patch from redis import Redis -from tests import RQTestCase - from rq.defaults import UNSERIALIZABLE_RETURN_VALUE_PAYLOAD from rq.job import Job from rq.queue import Queue @@ -15,13 +12,13 @@ from rq.registry import StartedJobRegistry from rq.results import Result, get_key from rq.utils import get_version, utcnow from rq.worker import Worker +from tests import RQTestCase -from .fixtures import say_hello, div_by_zero +from .fixtures import div_by_zero, say_hello @unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0') class TestScheduledJobRegistry(RQTestCase): - def test_save_and_get_result(self): """Ensure data is saved properly""" queue = Queue(connection=self.connection) @@ -159,8 +156,7 @@ class TestScheduledJobRegistry(RQTestCase): registry = StartedJobRegistry(connection=self.connection) job.started_at = utcnow() job.ended_at = job.started_at + timedelta(seconds=0.75) - worker.handle_job_failure(job, exc_string='Error', queue=queue, - started_job_registry=registry) + worker.handle_job_failure(job, exc_string='Error', queue=queue, started_job_registry=registry) job = Job.fetch(job.id, connection=self.connection) payload = self.connection.hgetall(job.key) @@ -181,8 +177,7 @@ class TestScheduledJobRegistry(RQTestCase): # If `save_result_to_job` = True, result will be saved to job # hash, simulating older versions of RQ - worker.handle_job_failure(job, exc_string='Error', queue=queue, - started_job_registry=registry) + worker.handle_job_failure(job, exc_string='Error', queue=queue, started_job_registry=registry) payload = self.connection.hgetall(job.key) self.assertTrue(b'exc_info' in payload.keys()) # Delete all new result objects so we only have result stored in job hash, diff --git a/tests/test_retry.py b/tests/test_retry.py index e8fddeb..ed2d477 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -9,7 +9,6 @@ from tests.fixtures import div_by_zero, say_hello class TestRetry(RQTestCase): - def test_persistence_of_retry_data(self): """Retry related data is stored and restored properly""" job = Job.create(func=fixtures.some_calculation) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 96cde1c..8aa722a 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1,10 +1,10 @@ import os -import redis - from datetime import datetime, timedelta, timezone from multiprocessing import Process from unittest import mock +import redis + from rq import Queue from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL from rq.exceptions import NoSuchJobError @@ -15,6 +15,7 @@ from rq.serializers import JSONSerializer from rq.utils import current_timestamp from rq.worker import Worker from tests import RQTestCase, find_empty_redis_database, ssl_test + from .fixtures import kill_worker, say_hello diff --git a/tests/test_sentry.py b/tests/test_sentry.py index f52f7db..4ae9722 100644 --- a/tests/test_sentry.py +++ b/tests/test_sentry.py @@ -1,15 +1,15 @@ +from unittest import mock + +from click.testing import CliRunner + from rq import Queue from rq.cli import main from rq.cli.helpers import read_config_file from rq.contrib.sentry import register_sentry from rq.worker import SimpleWorker - from tests import RQTestCase from tests.fixtures import div_by_zero -from unittest import mock -from click.testing import CliRunner - class FakeSentry: servers = [] diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 0c50fa7..6ef7ed8 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -18,10 +18,7 @@ class TestSerializers(unittest.TestCase): test_data = {'test': 'data'} serialized_data = serializer.dumps(test_data) self.assertEqual(serializer.loads(serialized_data), test_data) - self.assertEqual( - next(pickletools.genops(serialized_data))[1], - pickle.HIGHEST_PROTOCOL - ) + self.assertEqual(next(pickletools.genops(serialized_data))[1], pickle.HIGHEST_PROTOCOL) # Test using json serializer serializer = resolve_serializer(json) diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index 1f392a3..42cd207 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -1,8 +1,8 @@ import time
from rq import Queue, SimpleWorker
-from rq.timeouts import TimerDeathPenalty
from rq.registry import FailedJobRegistry, FinishedJobRegistry
+from rq.timeouts import TimerDeathPenalty
from tests import RQTestCase
diff --git a/tests/test_utils.py b/tests/test_utils.py index b71e67e..2dbb613 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,10 +1,9 @@ -import re import datetime +import re from unittest.mock import Mock from redis import Redis -from tests import RQTestCase, fixtures from rq.exceptions import TimeoutFormatError from rq.utils import ( backend_class, @@ -16,11 +15,12 @@ from rq.utils import ( import_attribute, is_nonstring_iterable, parse_timeout, - utcparse, split_list, truncate_long_string, + utcparse, ) from rq.worker import SimpleWorker +from tests import RQTestCase, fixtures class TestUtils(RQTestCase): diff --git a/tests/test_worker.py b/tests/test_worker.py index 5410180..285ae42 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,57 +1,52 @@ import json import os -import psutil import shutil import signal import subprocess import sys import time import zlib - from datetime import datetime, timedelta from multiprocessing import Process from time import sleep +from unittest import mock, skipIf +from unittest.mock import Mock -from unittest import skipIf - -import redis.exceptions +import psutil import pytest -from unittest import mock -from unittest.mock import Mock +import redis.exceptions +from rq import Queue, SimpleWorker, Worker, get_current_connection from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL +from rq.job import Job, JobStatus, Retry +from rq.registry import FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry +from rq.results import Result +from rq.serializers import JSONSerializer +from rq.suspension import resume, suspend +from rq.utils import as_text, utcnow +from rq.version import VERSION +from rq.worker import HerokuWorker, RandomWorker, RoundRobinWorker, WorkerStatus from tests import RQTestCase, slow from tests.fixtures import ( + CustomJob, access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, - CustomJob, div_by_zero, do_nothing, kill_worker, + launch_process_within_worker_and_store_pid, long_running_job, modify_self, modify_self_and_error, + raise_exc_mock, run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, - raise_exc_mock, - launch_process_within_worker_and_store_pid, ) -from rq import Queue, SimpleWorker, Worker, get_current_connection -from rq.utils import as_text -from rq.job import Job, JobStatus, Retry -from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry -from rq.results import Result -from rq.suspension import resume, suspend -from rq.utils import utcnow -from rq.version import VERSION -from rq.worker import HerokuWorker, WorkerStatus, RoundRobinWorker, RandomWorker -from rq.serializers import JSONSerializer - class CustomQueue(Queue): pass @@ -656,7 +651,10 @@ class TestWorker(RQTestCase): self.assertIsNone(w.dequeue_job_and_maintain_ttl(None)) def test_worker_ttl_param_resolves_timeout(self): - """Ensures the worker_ttl param is being considered in the dequeue_timeout and connection_timeout params, takes into account 15 seconds gap (hard coded)""" + """ + Ensures the worker_ttl param is being considered in the dequeue_timeout and + connection_timeout params, takes into account 15 seconds gap (hard coded) + """ q = Queue() w = Worker([q]) self.assertEqual(w.dequeue_timeout, 405) diff --git a/tests/test_worker_pool.py b/tests/test_worker_pool.py index ab2e677..219b4a8 100644 --- a/tests/test_worker_pool.py +++ b/tests/test_worker_pool.py @@ -1,18 +1,16 @@ import os import signal - from multiprocessing import Process from time import sleep -from rq.job import JobStatus - -from tests import TestCase -from tests.fixtures import CustomJob, _send_shutdown_command, long_running_job, say_hello from rq.connections import parse_connection +from rq.job import JobStatus from rq.queue import Queue from rq.serializers import JSONSerializer from rq.worker import SimpleWorker -from rq.worker_pool import run_worker, WorkerPool +from rq.worker_pool import WorkerPool, run_worker +from tests import TestCase +from tests.fixtures import CustomJob, _send_shutdown_command, long_running_job, say_hello def wait_and_send_shutdown_signal(pid, time_to_wait=0.0): @@ -111,9 +109,7 @@ class TestWorkerPool(TestCase): queue.enqueue(say_hello) connection_class, pool_class, pool_kwargs = parse_connection(self.connection) - run_worker( - 'test-worker', ['foo'], connection_class, pool_class, pool_kwargs - ) + run_worker('test-worker', ['foo'], connection_class, pool_class, pool_kwargs) # Worker should have processed the job self.assertEqual(len(queue), 0) diff --git a/tests/test_worker_registration.py b/tests/test_worker_registration.py index 30a3c82..26ee617 100644 --- a/tests/test_worker_registration.py +++ b/tests/test_worker_registration.py @@ -1,15 +1,19 @@ -from rq.utils import ceildiv -from tests import RQTestCase from unittest.mock import patch from rq import Queue, Worker -from rq.worker_registration import (clean_worker_registry, get_keys, register, - unregister, REDIS_WORKER_KEYS, - WORKERS_BY_QUEUE_KEY) +from rq.utils import ceildiv +from rq.worker_registration import ( + REDIS_WORKER_KEYS, + WORKERS_BY_QUEUE_KEY, + clean_worker_registry, + get_keys, + register, + unregister, +) +from tests import RQTestCase class TestWorkerRegistry(RQTestCase): - def test_worker_registration(self): """Ensure worker.key is correctly set in Redis.""" foo_queue = Queue(name='foo') @@ -21,23 +25,15 @@ class TestWorkerRegistry(RQTestCase): self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key)) self.assertEqual(Worker.count(connection=redis), 1) - self.assertTrue( - redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key) - ) + self.assertTrue(redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key)) self.assertEqual(Worker.count(queue=foo_queue), 1) - self.assertTrue( - redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key) - ) + self.assertTrue(redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key)) self.assertEqual(Worker.count(queue=bar_queue), 1) unregister(worker) self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key)) - self.assertFalse( - redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key) - ) - self.assertFalse( - redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key) - ) + self.assertFalse(redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key)) + self.assertFalse(redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key)) def test_get_keys_by_queue(self): """get_keys_by_queue only returns active workers for that queue""" @@ -56,17 +52,11 @@ class TestWorkerRegistry(RQTestCase): register(worker3) # get_keys(queue) will return worker keys for that queue - self.assertEqual( - set([worker1.key, worker2.key]), - get_keys(foo_queue) - ) + self.assertEqual(set([worker1.key, worker2.key]), get_keys(foo_queue)) self.assertEqual(set([worker1.key]), get_keys(bar_queue)) # get_keys(connection=connection) will return all worker keys - self.assertEqual( - set([worker1.key, worker2.key, worker3.key]), - get_keys(connection=worker1.connection) - ) + self.assertEqual(set([worker1.key, worker2.key, worker3.key]), get_keys(connection=worker1.connection)) # Calling get_keys without arguments raises an exception self.assertRaises(ValueError, get_keys) @@ -105,8 +95,9 @@ class TestWorkerRegistry(RQTestCase): worker = Worker([queue]) register(worker) - with patch('rq.worker_registration.MAX_KEYS', MAX_KEYS), \ - patch.object(queue.connection, 'pipeline', wraps=queue.connection.pipeline) as pipeline_mock: + with patch('rq.worker_registration.MAX_KEYS', MAX_KEYS), patch.object( + queue.connection, 'pipeline', wraps=queue.connection.pipeline + ) as pipeline_mock: # clean_worker_registry creates a pipeline with a context manager. Configure the mock using the context # manager entry method __enter__ pipeline_mock.return_value.__enter__.return_value.srem.return_value = None @@ -1,8 +1,8 @@ [tox] -envlist=py36,py37,py38,py39,py310,flake8 +envlist=lint,py36,py37,py38,py39,py310 [testenv] -commands=pytest --cov rq --durations=5 {posargs} +commands=pytest --cov rq --cov-config=.coveragerc --durations=5 {posargs} deps= pytest pytest-cov @@ -13,13 +13,14 @@ passenv= RUN_SSL_TESTS RUN_SLOW_TESTS_TOO -[testenv:flake8] -basepython = python3.6 +[testenv:lint] +basepython = python3.10 deps = - flake8 + black + ruff commands = - flake8 rq tests - + black --check rq tests + ruff check rq tests [testenv:py36] skipdist = True |