Commit 19f27fb1 authored by xa

Merge branch 'queue' into 'master'

Queue



See merge request !3
parents 88c6e0ed 8acaea1c
Pipeline #521 passed with stages
AIO Disque
==========
Python 3.5 asyncio client for the disque_ message broker.
Installation
------------
aiodisque requires a running disque server. Install the package with::

    python -m pip install -e .
Getting started
---------------
Usage::

    from aiodisque import Disque

    client = Disque()
    job_id = await client.addjob('queue', 'body')
    async for queue in client.qscan_iter(count=128):
        print(queue)
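
These calls must run inside a coroutine. A minimal complete program might
look like this (a sketch, assuming a disque node listening on the default
port)::

    import asyncio

    from aiodisque import Disque

    async def main():
        client = Disque()
        job_id = await client.addjob('queue', 'body')
        print(job_id)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
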
API Reference
-------------
The `official Disque command documentation`_ does a great job of explaining
each command in detail. There are a few differences:

* method names are lowercased
* the ``async`` keyword argument is renamed ``asap``, because ``async`` is
  a reserved word in Python
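
For instance, the command ``ADDJOB queue body 0 ASYNC`` would be spelled
like this (a sketch; ``asap`` stands in for the command's ``ASYNC`` flag)::

    job_id = await client.addjob('queue', 'body', 0, asap=True)
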
In addition to the changes above, it implements some async sugar:
* iterators::

    async for jobs in client.getjob_iter('q', nohang=True):
        print(jobs)

    async for queue in client.qscan_iter(count=128):
        print(queue)

    async for job in client.jscan_iter(count=128):
        print(job)
* mimic an asyncio Queue::

    from aiodisque.queues import JobsQueue

    queue = JobsQueue('queue', client)
    job_id = await queue.put('job')
    job = await queue.get()
    assert job.id == job_id
.. _disque: https://github.com/antirez/disque
.. _`official Disque command documentation`: https://github.com/antirez/disque#main-api
from .client import *
from .connections import *
from .queues import *
__all__ = client.__all__ + connections.__all__ + queues.__all__
from ._version import get_versions
__version__ = get_versions()['version']
from .connections import Connection
from .iterators import JobsIterator
from .scanners import JobsScanner, QueuesScanner
from .util import grouper
from collections import namedtuple
@@ -142,6 +143,12 @@ class Disque:
        if response is not None:
            return render_jobs(response)

    def getjob_iter(self, *queues, nohang=None, timeout=None, count=None,
                    withcounters=None):
        """Return a JobsIterator yielding GETJOB results for the queues."""
        assert queues, 'At least one queue required'
        return JobsIterator(self, *queues, nohang=nohang, timeout=timeout,
                            count=count, withcounters=withcounters)

    async def ackjob(self, *jobs):
        """Acknowledges the execution of one or more jobs via job IDs
@@ -370,14 +377,14 @@ class Disque:
        cursor, items = await self.execute_command(*params)
        return Cursor(int(cursor), items)

    def qscan_iter(self, *, count=None,
                   minlen=None, maxlen=None, import_rate=None):
        return QueuesScanner(self, count=count, minlen=minlen,
                             maxlen=maxlen, import_rate=import_rate)

    def jscan_iter(self, *states, count=None, queue=None, reply=None):
        return JobsScanner(self, states=states, count=count, queue=queue,
                           reply=reply)

    async def jscan(self, cursor=None, *states, count=None, busyloop=None,
                    queue=None, reply=None):
from collections.abc import AsyncIterator


class JobsIterator(AsyncIterator):
    """Iterate over GETJOB results until the server returns no more jobs."""

    def __init__(self, client, *queues, nohang=None, timeout=None,
                 count=None, withcounters=None):
        self.client = client
        self.args = queues
        self.kwargs = {
            'nohang': nohang,
            'timeout': timeout,
            'count': count,
            'withcounters': withcounters
        }

    async def __aiter__(self):
        return self

    async def __anext__(self):
        jobs = await self.get()
        if jobs:
            return jobs
        raise StopAsyncIteration()

    async def get(self):
        return await self.client.getjob(*self.args, **self.kwargs)
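
# JobsIterator stops when GETJOB returns no jobs: with nohang=True the
# server replies immediately instead of blocking, so iteration ends once
# the queue is drained (a sketch):
#
#     async for jobs in client.getjob_iter('q', nohang=True):
#         process(jobs)  # hypothetical handler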
import asyncio

__all__ = ['JobsQueue']


class JobsQueue:

    class Empty(Exception):
        """Exception raised when non-blocking :meth:`~JobsQueue.get`
        is called on a :class:`JobsQueue` object which is empty.
        """

    class Full(Exception):
        """Exception raised when :meth:`~JobsQueue.put` is called on a
        :class:`JobsQueue` object which is full.
        """
    def __init__(self, queue, client, *, maxsize=0, loop=None):
        """Constructor for a FIFO queue.

        maxsize is an integer that sets the upper bound on the number of
        items that can be placed in the queue. Insertion will block once
        this size has been reached, until queue items are consumed. If
        maxsize is less than or equal to zero, the queue size is infinite.
        """
        self.name = queue
        self.client = client
        self.maxsize = maxsize
        self.loop = loop or asyncio.get_event_loop()
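
    # Example (hypothetical names): bound the queue at 16 jobs so that
    # put() forwards maxlen=16 to ADDJOB and insertion fails once 16 jobs
    # are waiting:
    #
    #     queue = JobsQueue('emails', client, maxsize=16)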
    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        raise NotImplementedError

    def full(self):
        """Return True if there are maxsize items in the queue."""
        raise NotImplementedError
    async def get(self, withcounters=None):
        """Remove and return an item from the queue.

        If the queue is empty, wait until an item is available.
        See also the empty() method.
        """
        jobs = await self.client.getjob(self.name, nohang=False,
                                        withcounters=withcounters)
        return jobs.pop()
    def get_nowait(self, withcounters=None):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise
        :class:`JobsQueue.Empty`.
        """
        raise NotImplementedError

    async def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to
        the queue. The count goes down whenever a consumer calls task_done()
        to indicate that the item was retrieved and all work on it is
        complete. When the count of unfinished tasks drops to zero, join()
        unblocks.
        """
        raise NotImplementedError
    async def put(self, job, *, ms_timeout=0, replicate=None,
                  delay=None, retry=None, ttl=None):
        """Put an item into the queue.

        If the queue is full, wait until a free slot is available before
        adding the item.
        """
        job = getattr(job, 'body', job)
        response = await self.client.addjob(self.name, job,
                                            ms_timeout=ms_timeout,
                                            replicate=replicate, delay=delay,
                                            retry=retry, ttl=ttl, asap=False,
                                            maxlen=self.maxsize or None)
        return response
    def put_nowait(self, job, *, ms_timeout=0, replicate=None,
                   delay=None, retry=None, ttl=None, maxlen=None):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise
        :class:`JobsQueue.Full`.
        """
        raise NotImplementedError

    def qsize(self):
        """Number of items in the queue."""
        raise NotImplementedError

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task, a
        subsequent call to task_done() tells the queue that processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises ValueError if called more times than there were items
        placed in the queue.
        """
        raise NotImplementedError
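
# A minimal consumer sketch built on JobsQueue (hypothetical handler,
# assumes a running node). Jobs fetched with GETJOB must be acknowledged,
# otherwise Disque re-delivers them after the retry interval:
#
#     queue = JobsQueue('q', client)
#     while True:
#         job = await queue.get()
#         handle(job.body)               # user-supplied processing
#         await client.ackjob(job.id)    # acknowledge so the job is deleted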
from collections import deque
from collections.abc import AsyncIterator


class ScannerIterator(AsyncIterator):
    """Buffered async iterator over a cursor-based scan command."""

    def __init__(self, func, func_args, func_kwargs):
        self.cursor = 0
        self.buffer = deque()
        self.state = 'waiting'
        self.func = func
        self.func_args = func_args
        self.func_kwargs = func_kwargs

    async def __aiter__(self):
        # restart the scan from the beginning on each new iteration
        self.cursor = 0
        self.state = 'waiting'
        return self

    async def __anext__(self):
        if not self.buffer:
            await self.fetch_data()
        if self.buffer:
            return self.buffer.popleft()
        raise StopAsyncIteration()

    async def fetch_data(self):
        if self.state != 'finished':
            self.cursor, data = await self.func(self.cursor,
                                                *self.func_args,
                                                **self.func_kwargs)
            self.state = 'finished' if self.cursor == 0 else 'running'
            self.buffer.extend(data)
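
# The scan commands follow the Redis SCAN cursor protocol: each call
# returns a fresh cursor, and a cursor of 0 means the scan is complete.
# A hand-rolled equivalent of QueuesScanner might look like (a sketch):
#
#     cursor, names = await client.qscan(0, count=128)
#     while cursor != 0:
#         cursor, more = await client.qscan(cursor, count=128)
#         names.extend(more)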

class JobsScanner(ScannerIterator):

    def __init__(self, client, *,
                 states=None, count=None, queue=None, reply=None):
        func = client.jscan
        func_args = states or []
        func_kwargs = {
            'count': count,
            'queue': queue,
            'reply': reply
        }
        super().__init__(func, func_args, func_kwargs)


class QueuesScanner(ScannerIterator):

    def __init__(self, client, *,
                 count=None, minlen=None, maxlen=None, import_rate=None):
        func = client.qscan
        func_args = []
        func_kwargs = {
            'count': count,
            'minlen': minlen,
            'maxlen': maxlen,
            'import_rate': import_rate
        }
        super().__init__(func, func_args, func_kwargs)
@@ -31,6 +31,7 @@ exclude_lines =
    raise NotImplementedError
    if 0:
    if __name__ == .__main__.:

[pytest]
flake8-ignore =
@@ -9,5 +9,18 @@ setup(
    install_requires=[
        'hiredis'
    ],
    classifiers=[
        "Development Status :: 5 - Production/Stable",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.5",
        "Topic :: Software Development :: Libraries",
        "Topic :: Software Development :: Libraries :: Python Modules"
    ],
    keywords=['message broker'],
    url='https://lab.errorist.xyz/aio/aiodisque',
    license='MIT',
    cmdclass=versioneer.get_cmdclass()
)
import pytest
from aiodisque import Disque
from aiodisque.iterators import JobsIterator


@pytest.mark.asyncio
async def test_jobs(node, event_loop):
    client = Disque(node.port, loop=event_loop)
    expected = set()
    for i in range(0, 256):
        job_id = await client.addjob('q', 'job-%s' % i, 5000,
                                     replicate=1, retry=0)
        expected.add(job_id)
    it = client.getjob_iter('q', nohang=True)
    results = set()
    async for jobs in it:
        results.update(job.id for job in jobs)
    assert results == expected
    assert isinstance(it, JobsIterator)
import pytest
from aiodisque import Disque
from aiodisque.queues import JobsQueue


@pytest.mark.asyncio
async def test_get(node, event_loop):
    client = Disque(node.port, loop=event_loop)
    queue = JobsQueue('q', client, loop=event_loop)
    await client.addjob('q', 'job', 5000, replicate=1, retry=0)
    job = await queue.get()
    assert hasattr(job, 'id')
    assert hasattr(job, 'body')
    assert hasattr(job, 'queue')
    assert not hasattr(job, 'nacks')
    assert not hasattr(job, 'additional_deliveries')


@pytest.mark.asyncio
async def test_get_nowait(node, event_loop):
    client = Disque(node.port, loop=event_loop)
    queue = JobsQueue('q', client, loop=event_loop)
    with pytest.raises(NotImplementedError):
        queue.get_nowait()


@pytest.mark.asyncio
async def test_put(node, event_loop):
    client = Disque(node.port, loop=event_loop)
    queue = JobsQueue('q', client, loop=event_loop)
    job_id = await queue.put('job')
    response = await client.getjob('q')
    assert len(response) == 1
    job = response[0]
    assert hasattr(job, 'id')
    assert hasattr(job, 'body')
    assert hasattr(job, 'queue')
    assert not hasattr(job, 'nacks')
    assert not hasattr(job, 'additional_deliveries')


@pytest.mark.asyncio
async def test_put_nowait(node, event_loop):
    client = Disque(node.port, loop=event_loop)
    queue = JobsQueue('q', client, loop=event_loop)
    with pytest.raises(NotImplementedError):
        queue.put_nowait('job')
import pytest
from aiodisque import Disque
from aiodisque.scanners import JobsScanner, QueuesScanner


@pytest.mark.asyncio
async def test_queues_scanner(node, event_loop):
    client = Disque(node.port, loop=event_loop)
    queues = set()
    for i in range(0, 512):
        queue = 'queue-%s' % i
        queues.add(queue)
        await client.addjob(queue, 'job', 5000, replicate=1, retry=0)
    found_queues = set()
    it = client.qscan_iter(count=128)
    async for queue in it:
        found_queues.add(queue)
    assert found_queues == queues
    assert isinstance(it, QueuesScanner)


@pytest.mark.asyncio
async def test_jobs_scanner(node, event_loop):
    client = Disque(node.port, loop=event_loop)
    jobs = set()
    for i in range(0, 512):
        job_id = await client.addjob('q', i, 5000, replicate=1, retry=0)
        jobs.add(job_id)
    found_jobs = set()
    it = client.jscan_iter(count=128)
    async for job in it:
        found_jobs.add(job)
    assert found_jobs == jobs
    assert isinstance(it, JobsScanner)