Commit 3fcfb45a authored by Xavier Barbosa's avatar Xavier Barbosa

added iterators

parent e36012a0
This diff is collapsed.
from collections import deque
class Iterator:
def __init__(self, func, func_args, func_kwargs):
self.cursor = 0
self.buffer = deque()
self.state = 'waiting'
self.func = func
self.func_args = func_args
self.func_kwargs = func_kwargs
async def __aiter__(self):
self.cursor = 0
self.state == 'waiting'
return self
async def __anext__(self):
if not self.buffer:
await self.fetch_data()
if self.buffer:
return self.buffer.popleft()
raise StopAsyncIteration()
async def fetch_data(self):
if self.state != 'finished':
self.cursor, data = await self.func(self.cursor,
*self.func_args,
**self.func_kwargs)
self.state = 'finished' if self.cursor == 0 else 'running'
self.buffer.extend(data)
class JScanIterator(Iterator):
def __init__(self, client, *,
states=None, count=None, queue=None, reply=None):
func = client.jscan
func_args = states or []
func_kwargs = {
'count': count,
'queue': queue,
'reply': reply
}
super().__init__(func, func_args, func_kwargs)
class QScanIterator(Iterator):
def __init__(self, client, *,
count=None, minlen=None, maxlen=None, import_rate=None):
func = client.qscan
func_args = []
func_kwargs = {
'count': count,
'minlen': minlen,
'maxlen': maxlen,
'import_rate': import_rate
}
super().__init__(func, func_args, func_kwargs)
......@@ -170,3 +170,126 @@ async def test_qpeek_negative(node, event_loop):
assert len(response) == 2
assert response[0].body == 'baz'
assert response[1].body == 'bar'
@pytest.mark.asyncio
async def test_pause(node, event_loop):
client = Disque(node.port, loop=event_loop)
response = await client.pause('q', 'state')
assert response == 'none'
await client.addjob('q', 'foo')
response = await client.pause('q', 'all')
assert response == 'all'
with pytest.raises(ConnectionError):
await client.addjob('q', 'bar')
response = await client.pause('q', 'out', 'in')
assert response == 'all'
with pytest.raises(ConnectionError):
await client.addjob('q', 'baz')
response = await client.pause('q', 'in')
assert response == 'in'
with pytest.raises(ConnectionError):
await client.addjob('q', 'qux')
@pytest.mark.asyncio
async def test_show_job(node, event_loop):
client = Disque(node.port, loop=event_loop)
job_id = await client.addjob('q', 'bar')
response = await client.show(job_id)
assert hasattr(response, 'id')
assert hasattr(response, 'queue')
assert hasattr(response, 'state')
assert hasattr(response, 'repl')
assert hasattr(response, 'ttl')
assert hasattr(response, 'ctime')
assert hasattr(response, 'delay')
assert hasattr(response, 'retry')
assert hasattr(response, 'nacks')
assert hasattr(response, 'additional_deliveries')
assert hasattr(response, 'nodes_delivered')
assert hasattr(response, 'nodes_confirmed')
assert hasattr(response, 'next_requeue_within')
assert hasattr(response, 'next_awake_within')
assert hasattr(response, 'body')
assert response.id == job_id
with pytest.raises(ConnectionError):
await client.show('foobar')
@pytest.mark.asyncio
async def test_delete_job(node, event_loop):
client = Disque(node.port, loop=event_loop)
job_id = await client.addjob('q', 'bar')
response = await client.deljob(job_id)
assert response == 1
response = await client.deljob(job_id)
assert response == 0
@pytest.mark.asyncio
async def test_enqueue(node, event_loop):
client = Disque(node.port, loop=event_loop)
job_id = await client.addjob('q', 'foo')
response = await client.dequeue(job_id)
assert response == 1
response = await client.dequeue(job_id)
assert response == 0
response = await client.enqueue(job_id)
assert response == 1
response = await client.enqueue(job_id)
assert response == 0
@pytest.mark.asyncio
async def test_jscan(node, event_loop):
client = Disque(node.port, loop=event_loop)
jobs = set()
for i in range(0, 512):
job_id = await client.addjob('q', i, 5000, replicate=1, retry=0)
jobs.add(job_id)
found_jobs, cursor = set(), 0
while True:
cursor, items = await client.jscan(cursor, busyloop=True, count=128)
found_jobs.update(items)
if not cursor:
break
assert found_jobs == jobs
@pytest.mark.asyncio
async def test_jscan_all(node, event_loop):
client = Disque(node.port, loop=event_loop)
job_id = await client.addjob('q', 'j', 5000, replicate=1, retry=0)
response = await client.jscan(busyloop=True, reply='all')
assert len(response.items) == 1
job = response.items[0]
assert hasattr(job, 'id')
assert hasattr(job, 'queue')
assert hasattr(job, 'body')
assert job.id == job_id
@pytest.mark.asyncio
async def test_qscan(node, event_loop):
client = Disque(node.port, loop=event_loop)
queues = set()
for i in range(0, 512):
queue = 'queue-%s' % i
queues.add(queue)
await client.addjob(queue, 'job', 5000, replicate=1, retry=0)
found_queues, cursor = set(), 0
while True:
cursor, items = await client.qscan(cursor, count=128)
found_queues.update(items)
if not cursor:
break
assert found_queues == queues
import pytest
from aiodisque import Disque
from aiodisque.iterators import JScanIterator, QScanIterator
@pytest.mark.asyncio
async def test_queues(node, event_loop):
client = Disque(node.port, loop=event_loop)
queues = set()
for i in range(0, 512):
queue = 'queue-%s' % i
queues.add(queue)
await client.addjob(queue, 'job', 5000, replicate=1, retry=0)
found_queues = set()
it = client.qscan_iterator(count=128)
async for queue in it:
found_queues.add(queue)
assert found_queues == queues
assert isinstance(it, QScanIterator)
@pytest.mark.asyncio
async def test_jobs(node, event_loop):
client = Disque(node.port, loop=event_loop)
jobs = set()
for i in range(0, 512):
job_id = await client.addjob('q', i, 5000, replicate=1, retry=0)
jobs.add(job_id)
found_jobs = set()
it = client.jscan_iterator(count=128)
async for queue in it:
found_jobs.add(queue)
assert found_jobs == jobs
assert isinstance(it, JScanIterator)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment