Commit b05a3731 authored by Xavier Barbosa's avatar Xavier Barbosa

getjob returns one job instance by default

parent 5f2eb906
Pipeline #531 passed with stages
......@@ -160,8 +160,8 @@ class Disque:
performed for this job
*queues: list of queue names, with one required
Returns:
Returns a list of :class:`Job` or ``None`` if ``timeout``
is reached.
It returns a single :class:`Job` if ``count`` is empty,
None if ``timeout`` is reached, a list otherwise.
"""
assert queues, 'At least one queue required'
params = ['GETJOB']
......@@ -177,7 +177,10 @@ class Disque:
params.extend(queues)
response = await self.execute_command(*params)
if response is not None:
return render_jobs(response)
jobs = render_jobs(response)
if count is None:
return jobs.pop()
return jobs
def getjob_iter(self, *queues, nohang=None, timeout=None, count=None,
withcounters=None, padding=None):
......@@ -188,7 +191,7 @@ class Disque:
Parameters:
padding (int): if count is set, it will pad results
Yields:
Job
A single :class:`Job` or a list
"""
assert queues, 'At least one queue required'
return JobsIterator(self, *queues, nohang=nohang, timeout=timeout,
......@@ -446,8 +449,8 @@ class Disque:
count jobs queued
maxlen (int): Don't return elements with more than
count jobs queued
import_rate <rate>: Only return elements with an job import rate
(from other nodes) >= rate
import_rate (obj): Only return elements with an job import rate
(from other nodes) >= rate
Returns:
Cursor
"""
......
......@@ -46,9 +46,9 @@ class JobsQueue:
If queue is empty, wait until an item is available.
See also The empty() method.
"""
jobs = await self.client.getjob(self.name, nohang=False,
withcounters=None)
return jobs.pop()
job = await self.client.getjob(self.name, nohang=False,
withcounters=None)
return job
def get_nowait(self, withcounters=None):
"""Remove and return an item from the queue
......
.. py:module:: aiodisque
.. _getting-started:
Getting started
===============
......@@ -32,6 +35,22 @@ Some changes must be noticed:
Other goodies
-------------
:meth:`Disque.getjob` returns a single job by default. but it will return a
list if ``count`` is set:
.. code-block:: python
from aiodisque import Disque
client = Disque()
# get a job instance
await client.addjob('my-queue', 'job-1')
job = await client.getjob('my-queue')
# get a list of job instances
await client.addjob('my-queue', 'job-2')
jobs = await client.getjob('my-queue', count=1)
``padding`` with ``count`` ensure that iteration will returns the same number
of slots:
......
import asyncio
import pytest
from aiodisque import Disque, ConnectionError
from aiodisque import Disque, ConnectionError, Job
@pytest.mark.asyncio
......@@ -51,9 +51,8 @@ async def test_job(node, event_loop):
job_id = await client.addjob('foo', 'bar')
assert job_id.startswith('D-')
response = await client.getjob('foo')
assert isinstance(response, list)
job = response[0]
job = await client.getjob('foo')
assert isinstance(job, Job)
assert hasattr(job, 'id')
assert hasattr(job, 'body')
assert hasattr(job, 'body')
......@@ -71,15 +70,27 @@ async def test_job(node, event_loop):
assert result == 0
@pytest.mark.asyncio
async def test_many_jobs(node, event_loop):
client = Disque(node.port, loop=event_loop)
job_id = await client.addjob('foo', 'bar')
job_id = await client.addjob('foo', 'baz')
assert job_id.startswith('D-')
jobs = await client.getjob('foo', count=2)
assert isinstance(jobs, list)
assert isinstance(jobs[0], Job)
assert isinstance(jobs[1], Job)
@pytest.mark.asyncio
async def test_job_fastack(node, event_loop):
client = Disque(node.port, loop=event_loop)
job_id = await client.addjob('foo', 'bar')
assert job_id.startswith('D-')
response = await client.getjob('foo')
assert isinstance(response, list)
job = response[0]
job = await client.getjob('foo')
assert isinstance(job, Job)
assert hasattr(job, 'id')
assert hasattr(job, 'body')
assert hasattr(job, 'body')
......@@ -103,9 +114,8 @@ async def test_job_working(node, event_loop):
job_id = await client.addjob('foo', 'bar')
assert job_id.startswith('D-')
response = await client.getjob('foo')
assert isinstance(response, list)
job = response[0]
job = await client.getjob('foo')
assert isinstance(job, Job)
assert hasattr(job, 'id')
assert hasattr(job, 'body')
assert hasattr(job, 'body')
......@@ -133,9 +143,8 @@ async def test_job_withcounters(node, event_loop):
job_id = await client.addjob('foo', 'bar')
assert job_id.startswith('D-')
response = await client.getjob('foo', withcounters=True)
assert isinstance(response, list)
job = response[0]
job = await client.getjob('foo', withcounters=True)
assert isinstance(job, Job)
assert hasattr(job, 'id')
assert hasattr(job, 'body')
assert hasattr(job, 'queue')
......
......@@ -13,6 +13,22 @@ async def test_queues(node, event_loop):
it = client.getjob_iter('q', nohang=True)
results = set()
async for job in it:
results.add(job.id)
assert results == expected
assert isinstance(it, JobsIterator)
@pytest.mark.asyncio
async def test_queues_count(node, event_loop):
client = Disque(node.port, loop=event_loop)
expected = set()
for i in range(0, 256):
res = await client.addjob('q', 'job-%s' % i, 5000, replicate=1, retry=0)
expected.add(res)
it = client.getjob_iter('q', nohang=True, count=2)
results = set()
async for jobs in it:
results.update(job.id for job in jobs)
assert results == expected
......
......@@ -31,9 +31,7 @@ async def test_put(node, event_loop):
client = Disque(node.port, loop=event_loop)
queue = JobsQueue('q', client, loop=event_loop)
await queue.put('job')
response = await client.getjob('q')
assert len(response) == 1
job = response[0]
job = await client.getjob('q')
assert hasattr(job, 'id')
assert hasattr(job, 'body')
assert hasattr(job, 'body')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment