Commit 855816ad authored by Xavier Barbosa's avatar Xavier Barbosa

pad empty slots with None values

parent b0c37b59
Pipeline #524 passed with stages
......@@ -35,7 +35,7 @@ each command in detail. There are a few exceptions:
In addition to the changes above, it implements some async sugar:
* iterators::
* Fancy async iterators::
async for jobs in client.client.getjob_iter('q', nohang=True):
print(jobs)
......
......@@ -6,6 +6,7 @@ from collections import namedtuple
__all__ = ['Disque', 'Job', 'Cursor']
class Cursor(namedtuple('_Cursor', 'cursor, items')):
"""
Attributes:
......@@ -162,17 +163,20 @@ class Disque:
return render_jobs(response)
def getjob_iter(self, *queues, nohang=None, timeout=None, count=None,
withcounters=None):
withcounters=None, padding=None):
"""Returns an async iterator for getjob command.
It accepts the same parameters than :meth:`~Disque.getjob`.
Parameters:
padding (int): if count is set, it will pad results
Yields:
Job
"""
assert queues, 'At least one queue required'
return JobsIterator(self, *queues, nohang=nohang, timeout=timeout,
count=count, withcounters=withcounters)
count=count, withcounters=withcounters,
padding=padding)
async def ackjob(self, *jobs):
"""Acknowledges the execution of one or more jobs
......
......@@ -4,9 +4,65 @@ __all__ = ['JobsIterator']
class JobsIterator(AsyncIterator):
"""Job async iterator
Parameters:
nohang (bool): ask the command to don't block even if there are
no jobs in all the specified queues
timeout (int): in micro seconds
count (int): number of jobs per calls
withcounters (bool): Return the best-effort count of negative
acknowledges received by this job,
and the number of additional deliveries
performed for this job
*queues: list of queue names, with one required
padding (int): if count is set, it will pad results
``padding`` ensures that iterations will all have the same size.
For example, this fails:
>>> client = Disque()
>>> await client.addjob('q', 'j1')
>>> await client.addjob('q', 'j2')
>>> jobs = JobsIterator(client, 'q', count=3, nohang=True)
>>> try:
>>> await for j1, j2, j3 in jobs:
>>> assert isinstance(j1, Job)
>>> assert isinstance(j2, Job)
>>> assert j3 is None
>>> except ValueError as error:
>>> print('It has been failed', error)
But this one works:
>>> client = Disque()
>>> await client.addjob('q', 'j1')
>>> await client.addjob('q', 'j2')
>>> jobs = JobsIterator(client, 'q', count=3, nohang=True, padding=True)
>>> await for j1, j2, j3 in jobs:
>>> assert isinstance(j1, Job)
>>> assert isinstance(j2, Job)
>>> assert j3 is None
"""
def __init__(self, client, *queues, nohang=None, timeout=None,
count=None, withcounters=None):
count=None, withcounters=None, padding=None):
"""
Parameters:
nohang (bool): ask the command to don't block even if there are
no jobs in all the specified queues
timeout (int): in micro seconds
count (int): number of jobs per calls
withcounters (bool): Return the best-effort count of negative
acknowledges received by this job,
and the number of additional deliveries
performed for this job
*queues: list of queue names, with one required
padding (int): if count is set, it will pad results
"""
self.client = client
self.args = queues
self.kwargs = {
......@@ -15,6 +71,7 @@ class JobsIterator(AsyncIterator):
'count': count,
'withcounters': withcounters
}
self.padding = padding and count
async def __aiter__(self):
return self
......@@ -26,4 +83,16 @@ class JobsIterator(AsyncIterator):
raise StopAsyncIteration()
async def get(self):
return await self.client.getjob(*self.args, **self.kwargs)
"""Get fetch new jobs
Returns:
Returns a list of :class:`Job`.
By default it will returns the exacts jobs found, but sometimes
we need to garanty the same number of slots.
For this case filling ``padding`` and ``count`` values will
fill results with ``None` slots.
"""
jobs = await self.client.getjob(*self.args, **self.kwargs)
if jobs and self.padding:
return jobs + [None] * (self.padding - len(jobs))
return jobs
......@@ -14,7 +14,8 @@ Then start to play with it:
from aiodisque import Disque
client = Disque()
await for job1, job2 in client.getjob_iter('my-queue', nohang=True, count=2):
jobs = client.getjob_iter('my-queue', nohang=True, count=2, padding=True)
await for job1, job2 in jobs:
print('-', job1.id, job1.body)
print('-', job2.id, job2.body)
......
import pytest
from aiodisque import Disque
from aiodisque import Disque, Job
from aiodisque.iterators import JobsIterator
......@@ -8,8 +8,8 @@ async def test_queues(node, event_loop):
client = Disque(node.port, loop=event_loop)
expected = set()
for i in range(0, 256):
job_id = await client.addjob('q', 'job-%s' % i, 5000, replicate=1, retry=0)
expected.add(job_id)
res = await client.addjob('q', 'job-%s' % i, 5000, replicate=1, retry=0)
expected.add(res)
it = client.getjob_iter('q', nohang=True)
results = set()
......@@ -17,3 +17,39 @@ async def test_queues(node, event_loop):
results.update(job.id for job in jobs)
assert results == expected
assert isinstance(it, JobsIterator)
@pytest.mark.asyncio
async def test_queues_padding(node, event_loop):
client = Disque(node.port, loop=event_loop)
for i in range(0, 4):
await client.addjob('q', 'job-%s' % i, 5000, replicate=1, retry=0)
count = 0
it = client.getjob_iter('q', nohang=True, count=3, padding=True)
async for j1, j2, j3 in it:
if count == 0:
assert isinstance(j1, Job)
assert isinstance(j2, Job)
assert isinstance(j3, Job)
elif count == 1:
assert isinstance(j1, Job)
assert j2 is None
assert j3 is None
else:
break
count += 1
@pytest.mark.asyncio
async def test_queues_padding_missing(node, event_loop):
client = Disque(node.port, loop=event_loop)
for i in range(0, 2):
await client.addjob('q', 'job-%s' % i, 5000, replicate=1, retry=0)
with pytest.raises(ValueError):
it = client.getjob_iter('q', nohang=True, count=3)
async for j1, j2, j3 in it:
pass
......@@ -30,7 +30,7 @@ async def test_get_nowait(node, event_loop):
async def test_put(node, event_loop):
client = Disque(node.port, loop=event_loop)
queue = JobsQueue('q', client, loop=event_loop)
job_id = await queue.put('job')
await queue.put('job')
response = await client.getjob('q')
assert len(response) == 1
job = response[0]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment