Merge pull request #15 from EasyPost/production-pool
add ProductionPool; drop support for Python 2
Roguelazer authored Aug 23, 2022
2 parents f06f85d + f334085 commit af5a683
Showing 12 changed files with 313 additions and 35 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
@@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
pythonversion: ['3.6', '3.7', '3.8', '3.9', 'pypy-3.6']
pythonversion: ['3.6', '3.7', '3.8', '3.9', '3.10']
steps:
- uses: actions/checkout@v2
- name: cache beanstalkd
@@ -44,3 +44,5 @@ jobs:
run: pytest --cov=pystalk/ --cov-report=term-missing --cov-fail-under=60 tests/
env:
BEANSTALKD_PATH: ~/beanstalks/beanstalkd
- name: check mypy
run: mypy pystalk/ tests/
2 changes: 2 additions & 0 deletions .gitignore
@@ -8,3 +8,5 @@ htmlcov
__pycache__

.DS_Store
/build
/dist
29 changes: 25 additions & 4 deletions README.md
@@ -1,14 +1,17 @@
**`pystalk`** is an extremely simple client for [beanstalkd](http://kr.github.io/beanstalkd/). It is
compatible with both Python 2 and Python 3.
**`pystalk`** is an extremely simple Python client for [beanstalkd](http://kr.github.io/beanstalkd/).

This project was initially created for [beancmd](https://github.com/EasyPost/beancmd). You may also be interested in that tool!

[![CI](https://github.com/EasyPost/pystalk/workflows/CI/badge.svg)](https://github.com/EasyPost/pystalk/actions?query=workflow%3ACI)
[![ReadTheDocs](https://readthedocs.org/projects/pip/badge/?version=latest)](http://pystalk.readthedocs.io/en/latest/)

Note that _none_ of the objects in this package are inherently synchronized (thread-safe), and if you are going to use
them from multiple threads, you should always protect them with a mutex. Clients are also not fork-safe, and should be
initialized after any forking.
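
One way to follow the thread-safety advice above is to funnel every call through a single lock. The sketch below is only an illustration under stated assumptions: `LockedProducer` is a hypothetical wrapper that is not part of pystalk, and `FakeClient` is a stand-in for a `BeanstalkClient` so the example runs without a beanstalkd server.

```python
import threading


class FakeClient:
    """Hypothetical stand-in for BeanstalkClient so the sketch runs offline."""

    def __init__(self):
        self.jobs = []

    def put_job_into(self, tube_name, data):
        self.jobs.append((tube_name, data))
        return len(self.jobs)  # pretend this is the job id


class LockedProducer:
    """Hypothetical wrapper: serializes access to one client with a mutex."""

    def __init__(self, client):
        self._client = client
        self._lock = threading.Lock()

    def put_job_into(self, tube_name, data):
        # Only one thread at a time may touch the underlying client.
        with self._lock:
            return self._client.put_job_into(tube_name, data)


producer = LockedProducer(FakeClient())
threads = [
    threading.Thread(target=producer.put_job_into, args=("some tube", f"job {i}"))
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

In a real program the wrapped object would be a `BeanstalkClient` created after any fork, and every method used from more than one thread would need the same treatment.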

## Requirements / Installing

This software works with Python 2.7, and 3.5+.
This software works with Python 3.6+. It should work with PyPy3 but has not been tested extensively.

It does not support any asynchronous event loops and has not been tested with gevent. It's designed for simple,
synchronous use.
@@ -78,7 +81,25 @@ for job in client.reserve_iter():

Note that, even though we require that job data be UTF-8 encodable in the `put_job` method, we do not decode it for you: the job data that comes out is a byte string (`bytes`) in Python 3. Call `.decode("utf-8")` on it if you want to get the input data back out. If you would like decoding to happen automatically, pass `auto_decode=True` to the `BeanstalkClient` constructor; note that this might make it difficult for you to consume data injected by other systems which don't assume UTF-8.
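
The round trip described above can be sketched with plain Python, without a server. This is an illustration of the encoding behaviour only; the variable names are made up for the example, and how pystalk itself reacts to undecodable bodies under `auto_decode=True` is not shown here.

```python
# What a producer hands over: a str, which travels as UTF-8 bytes.
original = "héllo wörld"
wire_body = original.encode("utf-8")

# What a consumer holds when bodies are not decoded automatically: raw bytes.
assert isinstance(wire_body, bytes)

# Decoding recovers the original text.
decoded = wire_body.decode("utf-8")

# A body produced by a system that does not use UTF-8 cannot be decoded this way.
foreign_body = "héllo".encode("latin-1")
try:
    foreign_body.decode("utf-8")
    foreign_ok = True
except UnicodeDecodeError:
    foreign_ok = False
```

Leaving bodies as bytes keeps the consumer in control of how to treat data from producers that use another encoding.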

### Multiple Job Servers
### Producing into Multiple Job Servers

This library includes the `ProductionPool` class, which inserts jobs into a group of beanstalk servers. By default it
moves to a different server on every insertion, and it retries on the next server when an error occurs. Example usage:

```python
from pystalk import BeanstalkClient, ProductionPool

pool = ProductionPool.from_uris(
    ['beanstalkd://10.0.0.1:10300', 'beanstalkd://10.0.0.2:10300'],
    socket_timeout=10
)
pool.put_job_into('some tube', 'some job')
```

The Pool **only** supports the `put_job` and `put_job_into` methods and makes no fairness guarantees; you should not use
it for consumption.

### Consuming From Multiple Job Servers

The following will reserve jobs from a group of Beanstalk servers, fairly rotating between them.

7 changes: 7 additions & 0 deletions docs/source/CHANGES.rst
@@ -2,6 +2,13 @@
pystalk ChangeLog
#################

=====
NEXT
=====
* Drop support for Python < 3.6
* Add `pystalk.pool.ProductionPool`
* Add some `typing` annotations

=====
0.6.1
=====
3 changes: 2 additions & 1 deletion pystalk/__init__.py
@@ -1,8 +1,9 @@
from .client import BeanstalkClient, BeanstalkError
from .pool import ProductionPool

version_info = (0, 6, 1)
__version__ = '.'.join(str(s) for s in version_info)
__author__ = 'EasyPost <oss@easypost.com>'


__all__ = ['BeanstalkClient', 'BeanstalkError']
__all__ = ['BeanstalkClient', 'BeanstalkError', 'ProductionPool']
24 changes: 10 additions & 14 deletions pystalk/client.py
@@ -3,6 +3,7 @@
import socket
import yaml
import re
from typing import Optional, Union

import six

@@ -22,17 +23,11 @@ class Job(object):
job_data = attr.ib()


if getattr(attr, "__version_info__", (0,)) >= (19, 2):
_attrs_kwargs = dict(eq=True)
else:
_attrs_kwargs = dict(cmp=True)


@attr.s(frozen=True, hash=True, **_attrs_kwargs)
@attr.s(frozen=True, hash=True, eq=True)
class BeanstalkError(Exception):
"""Common error raised when something goes wrong with beanstalk"""

message = attr.ib(converter=lambda m: m.decode('ascii'))
message: str = attr.ib(converter=lambda m: m.decode('ascii'))


def yaml_load(fo):
@@ -47,10 +42,10 @@ def yaml_load(fo):
class BeanstalkInsertingProxy(object):
"""Proxy object yielded by :func:`BeanstalkClient.using()`"""

beanstalk_client = attr.ib()
tube = attr.ib()
beanstalk_client: 'BeanstalkClient' = attr.ib()
tube: str = attr.ib()

def put_job(self, data, pri=65536, delay=0, ttr=120):
def put_job(self, data: Union[str, bytes], pri: int = 65536, delay: int = 0, ttr: int = 120):
"""Method to insert a job into the tube selected with :func:`BeanstalkClient.using`.
:param data: Job body
@@ -88,7 +83,8 @@ class BeanstalkClient(object):
Setting socket timeout to a value lower than the value you pass to blocking functions like
:func:`reserve_job()` will cause errors!
"""
def __init__(self, host, port=11300, socket_timeout=None, auto_decode=False):
def __init__(self, host: str, port: int = 11300, socket_timeout: Optional[float] = None,
auto_decode: bool = False):
"""Construct a synchronous Beanstalk Client. Does not connect!"""
self.host = host
self.port = port
@@ -271,7 +267,7 @@ def stats(self):
stats = yaml_load(body)
return stats

def put_job(self, data, pri=65536, delay=0, ttr=120):
def put_job(self, data: Union[str, bytes], pri: int = 65536, delay: int = 0, ttr: int = 120):
"""Insert a new job into whatever queue is currently USEd
:param data: Job body
@@ -303,7 +299,7 @@ def put_job(self, data, pri=65536, delay=0, ttr=120):
self._send_message(message, socket)
return self._receive_id(socket)

def put_job_into(self, tube_name, data, pri=65536, delay=0, ttr=120):
def put_job_into(self, tube_name: str, data: Union[str, bytes], pri: int = 65536, delay: int = 0, ttr: int = 120):
"""Insert a new job into a specific queue. Wrapper around :func:`put_job`.
:param tube_name: Tube name
184 changes: 184 additions & 0 deletions pystalk/pool.py
@@ -0,0 +1,184 @@
from typing import List, Optional, Union
from collections import deque
import time
import random
import socket
import logging

import attr

from .client import BeanstalkClient, BeanstalkError


RETRIABLE_ERRORS = ('INTERNAL_ERROR', 'OUT_OF_MEMORY')


def _get_time():
    return time.monotonic()


class NoMoreClients(Exception):
    def __str__(self):
        return "No clients can process requests at this time"


@attr.s
class ClientRecord(object):
    client: BeanstalkClient = attr.ib()
    last_failed_at: Optional[float] = attr.ib(default=None)

    def is_ok(self, backoff_time, now=None):
        if now is None:
            now = _get_time()
        if self.last_failed_at is None:
            return True
        return self.last_failed_at < (now - backoff_time)

    def mark_failed(self, now=None):
        # Honor an explicitly supplied timestamp; otherwise use the monotonic clock
        if now is None:
            now = _get_time()
        self.last_failed_at = now


class ProductionPool(object):
    """A pool for producing jobs into a list of beanstalk servers. When an error occurs, job insertion
    will be re-attempted on the next server in the pool.

    :param clients: List of beanstalk client instances to use
    :param round_robin: If true, every insertion will go to a different server in the pool. If false,
        the server will only be changed when an exception occurs.
    :param backoff_time: Number of seconds after an error before a server will be reused
    :param initial_shuffle: Randomly shuffle clients at initialization

    All clients should have a socket timeout set or else some errors will not be detected.

    NOTE: This will give you at-least-once deliverability (presuming at least one server is up), but can *easily*
    result in jobs being issued multiple times. Only use this functionality with idempotent jobs.

    This method of pooling is only suitable for use when *producing* jobs. For *consuming* jobs from a cluster of
    beanstalkd servers, consider the `pystalkworker` project.
    """
    def __init__(self, clients: List[BeanstalkClient], round_robin: bool = True,
                 backoff_time: float = 10.0, initial_shuffle: bool = True):
        if not clients:
            raise ValueError('Must pass at least one BeanstalkClient')
        client_records = [ClientRecord(c) for c in clients]
        if initial_shuffle:
            random.shuffle(client_records)
        self._clients = deque(client_records)
        self.current_tube: Optional[str] = None
        self.round_robin = round_robin
        self.backoff_time = backoff_time
        self.log = logging.getLogger('pystalk.ProductionPool')

    @classmethod
    def from_uris(cls, uris: List[str], socket_timeout: Optional[float] = None, auto_decode: bool = False,
                  round_robin: bool = True, backoff_time: float = 10.0, initial_shuffle: bool = True):
        """Construct a pool from a list of URIs.

        See `pystalk.client.BeanstalkClient.from_uri` for more information.

        :param uris: A list of URIs
        :param socket_timeout: Socket timeout to set on all constructed clients
        :param auto_decode: Whether bodies should be bytes (False) or strings (True)
        """
        return cls(
            clients=[BeanstalkClient.from_uri(uri, socket_timeout=socket_timeout, auto_decode=auto_decode)
                     for uri in uris],
            round_robin=round_robin,
            backoff_time=backoff_time,
            initial_shuffle=initial_shuffle
        )

    def use(self, tube: str):
        """Start producing jobs into the given tube.

        :param tube: Name of the tube to USE

        Subsequent calls to :func:`put_job` insert jobs into this tube.
        """
        self.current_tube = tube

    def _get_client(self):
        # attempt to find the next live client and activate it
        for _ in range(len(self._clients)):
            if self._clients[0].is_ok(self.backoff_time):
                client = self._clients[0]
                if client.client.current_tube != self.current_tube:
                    client.client.use(self.current_tube)
                return client
            else:
                self._clients.rotate()
        self.log.error('All clients are failed!')
        raise NoMoreClients()

    def _mark_client_failed(self):
        self._clients[0].mark_failed()
        self._clients.rotate()

    def _attempt_on_all_clients(self, thunk):
        while True:
            try:
                client_record = self._get_client()
                rv = thunk(client_record.client)
                if self.round_robin:
                    self._clients.rotate()
                return rv
            except BeanstalkError as e:
                if e.message in RETRIABLE_ERRORS:
                    # Log the head of the deque: it is the client that just failed, and it is
                    # defined even if _get_client() raised before client_record was assigned
                    self.log.warning('error on server %r: %r', self._clients[0], e)
                    self._mark_client_failed()
                else:
                    raise
            except socket.error as e:
                self.log.warning('error on server %r: %r', self._clients[0], e)
                self._mark_client_failed()

    def put_job(self, data: Union[str, bytes], pri: int = 65536, delay: int = 0, ttr: int = 120):
        """Insert a new job into whatever queue is currently USEd

        :param data: Job body
        :type data: Text (either str which will be encoded as utf-8, or bytes which are already utf-8)
        :param pri: Priority for the job
        :type pri: int
        :param delay: Delay in seconds before the job should be placed on the ready queue
        :type delay: int
        :param ttr: Time to reserve (how long a worker may work on this job before we assume the worker is blocked
            and give the job to another worker)
        :type ttr: int

        .. seealso::

           :func:`put_job_into()`
             Put a job into a specific tube

           :func:`using()`
             Insert a job using an external guard
        """
        # Pass the caller's ttr through rather than a hard-coded 120
        return self._attempt_on_all_clients(
            lambda client: client.put_job(data=data, pri=pri, delay=delay, ttr=ttr)
        )

    def put_job_into(self, tube_name: str, data: Union[str, bytes], pri: int = 65536,
                     delay: int = 0, ttr: int = 120):
        """Insert a new job into a specific queue. Wrapper around :func:`put_job`.

        :param tube_name: Tube name
        :type tube_name: str
        :param data: Job body
        :type data: Text (either str which will be encoded as utf-8, or bytes which are already utf-8)
        :param pri: Priority for the job
        :type pri: int
        :param delay: Delay in seconds before the job should be placed on the ready queue
        :type delay: int
        :param ttr: Time to reserve (how long a worker may work on this job before we assume the worker is blocked
            and give the job to another worker)
        :type ttr: int

        .. seealso::

           :func:`put_job()`
             Put a job into whatever the current tube is

           :func:`using()`
             Insert a job using an external guard
        """
        # Pass the caller's ttr through rather than a hard-coded 120
        return self._attempt_on_all_clients(
            lambda client: client.put_job_into(tube_name=tube_name, data=data, pri=pri, delay=delay, ttr=ttr)
        )
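
The rotation and backoff behaviour of `pystalk/pool.py` can be illustrated with a simplified, self-contained sketch. It is not the code from this commit: `FlakyClient`, `Record`, and `put_with_failover` are hypothetical names, the clock is passed in explicitly so the run is deterministic, and the loop is bounded to one pass over the servers instead of retrying until `NoMoreClients` is raised.

```python
from collections import deque


class FlakyClient:
    """Hypothetical stand-in for a BeanstalkClient; fails while `up` is False."""

    def __init__(self, name, up=True):
        self.name = name
        self.up = up
        self.jobs = []

    def put_job(self, data):
        if not self.up:
            raise OSError("connection refused")
        self.jobs.append(data)
        return len(self.jobs)


class Record:
    """Tracks when a client last failed, in the spirit of ClientRecord."""

    def __init__(self, client):
        self.client = client
        self.last_failed_at = None

    def is_ok(self, backoff_time, now):
        if self.last_failed_at is None:
            return True
        return self.last_failed_at < (now - backoff_time)


def put_with_failover(records, data, backoff_time, now):
    """Try the client at the head of the deque; on failure, mark it and move on."""
    for _ in range(len(records)):
        record = records[0]
        if record.is_ok(backoff_time, now):
            try:
                job_id = record.client.put_job(data)
            except OSError:
                record.last_failed_at = now  # start this server's backoff window
            else:
                records.rotate()  # round-robin: the next insert starts elsewhere
                return job_id
        records.rotate()  # skip a failed or backing-off server
    raise RuntimeError("no clients can process requests at this time")


a, b = FlakyClient("a", up=False), FlakyClient("b")
records = deque([Record(a), Record(b)])

put_with_failover(records, "job 1", backoff_time=10.0, now=100.0)  # a fails, b accepts
a.up = True
put_with_failover(records, "job 2", backoff_time=10.0, now=105.0)  # a is backing off, b accepts
put_with_failover(records, "job 3", backoff_time=10.0, now=111.0)  # a is eligible again
```

Server `a` recovers at once in this run but receives nothing until its 10-second backoff window has passed, which is the trade-off `backoff_time` controls.
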
8 changes: 6 additions & 2 deletions requirements-tests.txt
@@ -1,4 +1,8 @@
pytest==3.5.*
pytest_cov==2.5.*
pytest==7.*
pytest-cov==3.*
pytest-mock==3.*
mock
flake8
mypy==0.971
types-PyYAML
types-six
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,3 +1,3 @@
PyYAML>=3.0
attrs>=17.4
attrs>=19.2
six