Commit 8acaea1c authored by Xavier Barbosa's avatar Xavier Barbosa

jobs iterator

parent d1c7795f
Pipeline #520 passed with stages
AIO Disque
==========
Python3.5 & Asyncio client for disque_ message broker.
Python3.5 & Asyncio client for disque_.
Installation
------------
Installation::
aiodisque requires a running disque server.
::
python -m pip install -e .
Getting started
---------------
Usage::
from aiodisque import Disque
client = Disque()
job_id = await client.sendjob('queue', 'body')
Asyncio iterators::
API Reference
-------------
The `official Disque command documentation`_ does a great job of explaining
each command in detail. There are a few exceptions:
* each method are lowered
* async keywords are replaced by asap
async for queue in client.qscan_iterator(count=128):
In addition to the changes above, it implements some async sugar:
* iterators::
async for jobs in client.client.getjob_iter('q', nohang=True):
print(jobs)
async for queue in client.qscan_iter(count=128):
print(queue)
Mimic an asyncio Queue::
async for job in client.jscan_iter(count=128):
print(job)
* mimic an asyncio Queue::
from aiodisque.queue import Queue
queue = JobsQueue('queue', client)
......@@ -30,3 +54,4 @@ Mimic an asyncio Queue::
assert job.id == job_id
.. _disque: https://github.com/antirez/disque
.. _`official Disque command documentation`: https://github.com/antirez/disque#main-api
from .connections import Connection
from .iterators import JScanIterator, QScanIterator
from .iterators import JobsIterator
from .scanners import JobsScanner, QueuesScanner
from .util import grouper
from collections import namedtuple
......@@ -142,6 +143,12 @@ class Disque:
if response is not None:
return render_jobs(response)
def getjob_iter(self, *queues, nohang=None, timeout=None, count=None,
withcounters=None):
assert queues, 'At least one queue required'
return JobsIterator(self, *queues, nohang=nohang, timeout=timeout,
count=count, withcounters=withcounters)
async def ackjob(self, *jobs):
"""Acknowledges the execution of one or more jobs via job IDs
......@@ -370,14 +377,14 @@ class Disque:
cursor, items = await self.execute_command(*params)
return Cursor(int(cursor), items)
def qscan_iterator(self, *, count=None,
minlen=None, maxlen=None, import_rate=None):
return QScanIterator(self, count=count, minlen=minlen,
def qscan_iter(self, *, count=None,
minlen=None, maxlen=None, import_rate=None):
return QueuesScanner(self, count=count, minlen=minlen,
maxlen=maxlen, import_rate=import_rate)
def jscan_iterator(self, *states, count=None, queue=None, reply=None):
return JScanIterator(self, states=states, count=count,
queue=queue, reply=reply)
def jscan_iter(self, *states, count=None, queue=None, reply=None):
return JobsScanner(self, states=states, count=count, queue=queue,
reply=reply)
async def jscan(self, cursor=None, *states, count=None, busyloop=None,
queue=None, reply=None):
......
from collections import deque
from collections.abc import AsyncIterator
class Iterator:
class JobsIterator(AsyncIterator):
def __init__(self, func, func_args, func_kwargs):
self.cursor = 0
self.buffer = deque()
self.state = 'waiting'
self.func = func
self.func_args = func_args
self.func_kwargs = func_kwargs
def __init__(self, client, *queues, nohang=None, timeout=None,
count=None, withcounters=None):
self.client = client
self.args = queues
self.kwargs = {
'nohang': nohang,
'timeout': timeout,
'count': count,
'withcounters': withcounters
}
async def __aiter__(self):
self.cursor = 0
self.state == 'waiting'
return self
async def __anext__(self):
if not self.buffer:
await self.fetch_data()
if self.buffer:
return self.buffer.popleft()
jobs = await self.get()
if jobs:
return jobs
raise StopAsyncIteration()
async def fetch_data(self):
if self.state != 'finished':
self.cursor, data = await self.func(self.cursor,
*self.func_args,
**self.func_kwargs)
self.state = 'finished' if self.cursor == 0 else 'running'
self.buffer.extend(data)
class JScanIterator(Iterator):
def __init__(self, client, *,
states=None, count=None, queue=None, reply=None):
func = client.jscan
func_args = states or []
func_kwargs = {
'count': count,
'queue': queue,
'reply': reply
}
super().__init__(func, func_args, func_kwargs)
class QScanIterator(Iterator):
def __init__(self, client, *,
count=None, minlen=None, maxlen=None, import_rate=None):
func = client.qscan
func_args = []
func_kwargs = {
'count': count,
'minlen': minlen,
'maxlen': maxlen,
'import_rate': import_rate
}
super().__init__(func, func_args, func_kwargs)
async def get(self):
return await self.client.getjob(*self.args, **self.kwargs)
from collections import deque
from collections.abc import AsyncIterator
class ScannerIterator(AsyncIterator):
def __init__(self, func, func_args, func_kwargs):
self.cursor = 0
self.buffer = deque()
self.state = 'waiting'
self.func = func
self.func_args = func_args
self.func_kwargs = func_kwargs
async def __aiter__(self):
self.cursor = 0
self.state == 'waiting'
return self
async def __anext__(self):
if not self.buffer:
await self.fetch_data()
if self.buffer:
return self.buffer.popleft()
raise StopAsyncIteration()
async def fetch_data(self):
if self.state != 'finished':
self.cursor, data = await self.func(self.cursor,
*self.func_args,
**self.func_kwargs)
self.state = 'finished' if self.cursor == 0 else 'running'
self.buffer.extend(data)
class JobsScanner(ScannerIterator):
def __init__(self, client, *,
states=None, count=None, queue=None, reply=None):
func = client.jscan
func_args = states or []
func_kwargs = {
'count': count,
'queue': queue,
'reply': reply
}
super().__init__(func, func_args, func_kwargs)
class QueuesScanner(ScannerIterator):
def __init__(self, client, *,
count=None, minlen=None, maxlen=None, import_rate=None):
func = client.qscan
func_args = []
func_kwargs = {
'count': count,
'minlen': minlen,
'maxlen': maxlen,
'import_rate': import_rate
}
super().__init__(func, func_args, func_kwargs)
......@@ -9,5 +9,18 @@ setup(
install_requires=[
'hiredis'
],
classifiers=[
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.5",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules"
],
keywords=['message broker'],
url='https://lab.errorist.xyz/aio/aiodisque',
license='MIT',
cmdclass=versioneer.get_cmdclass()
)
import pytest
from aiodisque import Disque
from aiodisque.iterators import JScanIterator, QScanIterator
from aiodisque.iterators import JobsIterator
@pytest.mark.asyncio
async def test_queues(node, event_loop):
client = Disque(node.port, loop=event_loop)
queues = set()
for i in range(0, 512):
queue = 'queue-%s' % i
queues.add(queue)
await client.addjob(queue, 'job', 5000, replicate=1, retry=0)
expected = set()
for i in range(0, 256):
job_id = await client.addjob('q', 'job-%s' % i, 5000, replicate=1, retry=0)
expected.add(job_id)
found_queues = set()
it = client.qscan_iterator(count=128)
async for queue in it:
found_queues.add(queue)
assert found_queues == queues
assert isinstance(it, QScanIterator)
@pytest.mark.asyncio
async def test_jobs(node, event_loop):
client = Disque(node.port, loop=event_loop)
jobs = set()
for i in range(0, 512):
job_id = await client.addjob('q', i, 5000, replicate=1, retry=0)
jobs.add(job_id)
found_jobs = set()
it = client.jscan_iterator(count=128)
async for queue in it:
found_jobs.add(queue)
assert found_jobs == jobs
assert isinstance(it, JScanIterator)
it = client.getjob_iter('q', nohang=True)
results = set()
async for jobs in it:
results.update(job.id for job in jobs)
assert results == expected
assert isinstance(it, JobsIterator)
import pytest
from aiodisque import Disque
from aiodisque.scanners import JobsScanner, QueuesScanner
@pytest.mark.asyncio
async def test_queues_scanner(node, event_loop):
client = Disque(node.port, loop=event_loop)
queues = set()
for i in range(0, 512):
queue = 'queue-%s' % i
queues.add(queue)
await client.addjob(queue, 'job', 5000, replicate=1, retry=0)
found_queues = set()
it = client.qscan_iter(count=128)
async for queue in it:
found_queues.add(queue)
assert found_queues == queues
assert isinstance(it, QueuesScanner)
@pytest.mark.asyncio
async def test_jobs_scanner(node, event_loop):
client = Disque(node.port, loop=event_loop)
jobs = set()
for i in range(0, 512):
job_id = await client.addjob('q', i, 5000, replicate=1, retry=0)
jobs.add(job_id)
found_jobs = set()
it = client.jscan_iter(count=128)
async for queue in it:
found_jobs.add(queue)
assert found_jobs == jobs
assert isinstance(it, JobsScanner)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment