Commit 472a4064 authored by Xavier Barbosa's avatar Xavier Barbosa

first commit

parents
Pipeline #508 failed with stages
aiodisque/_version.py export-subst
# Created by https://www.gitignore.io/api/python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
---
Prepare container:
stage: build
script:
- docker build -t errorist/aiodisque:latest .
tags:
- docker-build
Test:
stage: test
image: errorist/aiodisque:latest
script:
- python -m pip install -e .
- python -m pip install -r requirements-test.txt
- py.test tests/ --cov aiodisque --cov-report term-missing --flake8
tags:
- python3.5
FROM errorist/py:3.5
MAINTAINER Xavier Barbosa <clint.northwood@gmail.com>
ENV DISQUE_PORT 7777
RUN mkdir -p /usr/local/bin
ADD dockerfiles/disque /usr/local/bin/disque
ADD dockerfiles/disque-server /usr/local/bin/disque-server
ADD dockerfiles/disque.conf /etc/disque.conf
RUN chmod +x /usr/local/bin/disque
RUN chmod +x /usr/local/bin/disque-server
CMD ["/usr/local/bin/disque-server", "/etc/disque.conf", "--port", "$DISQUE_PORT"]
include versioneer.py
include aiodisque/_version.py
AIO Disque
==========
Asyncio client form disque_.
Installation::
python -m pip install -e .
Usage::
from aiodisque import Disque
client = Disque()
job_id = await client.sendjob('queue', 'body')
.. _disque: https://github.com/antirez/disque
from .client import *
from .connections import *
__all__ = client.__all__ + connections.__all__
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions
This diff is collapsed.
This diff is collapsed.
import asyncio
import hiredis
import logging
from .util import parse_address, encode_command
__all__ = ['Connection', 'ConnectionError']
def parser():
return hiredis.Reader(protocolError=ProtocolError,
replyError=ConnectionError,
encoding='utf-8')
class ConnectionError(RuntimeError):
pass
class ProtocolError(ConnectionError):
pass
class Connection:
def __init__(self, address, *, loop=None):
self.address = parse_address(address, port=7711)
self.loop = loop
self.reader = None
self.writer = None
self.parser = parser()
self.connected = False
async def send_command(self, *args):
await self.connect()
message = encode_command(*args)
logging.debug("REQ", message)
self.writer.write(message)
data = await self.reader.read(65536)
self.parser.feed(data)
logging.debug("RES", data)
response = self.parser.gets()
if isinstance(response, ProtocolError):
self.parser = parser()
raise response
if isinstance(response, Exception):
raise response
return response
async def connect(self):
if not self.connected:
reader, writer = await asyncio.open_connection(*self.address,
loop=self.loop)
self.reader = reader
self.writer = writer
self.connected = True
from itertools import zip_longest
__all__ = ['parse_address', 'encode_command']
def grouper(n, iterable, fillvalue=None):
"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(fillvalue=fillvalue, *args)
def parse_address(address, *, host=None, port=None):
host, port = host or 'localhost', port
if isinstance(address, (list, tuple)):
host, port = address
if isinstance(address, int):
port = address
elif isinstance(address, str):
if ':' in address:
a, _, b = address.partition(':')
host = a or host
port = b or port
elif address.isdigit():
port = int(address)
else:
host = address or host
return host, int(port) if port else None
_converters = {
bytes: lambda val: val,
bytearray: lambda val: val,
str: lambda val: val.encode('utf-8'),
int: lambda val: str(val).encode('utf-8'),
float: lambda val: str(val).encode('utf-8'),
}
def _bytes_len(sized):
return str(len(sized)).encode('utf-8')
def encode_command(*args):
"""Encodes arguments into redis bulk-strings array.
Raises TypeError if any of args not of bytes, str, int or float type.
"""
buf = bytearray()
def add(data):
return buf.extend(data + b'\r\n')
add(b'*' + _bytes_len(args))
for arg in args:
if type(arg) in _converters:
barg = _converters[type(arg)](arg)
add(b'$' + _bytes_len(barg))
add(barg)
continue
raise TypeError("Argument {!r} expected to be of bytes,"
" str, int or float type".format(arg))
return buf
This diff is collapsed.
47085f92d651661eab5409c84772c786e82acaaa :0 myself 0 0 connected
pytest
pytest-asyncio
pytest-cov
pytest-flake8
[versioneer]
VCS = git
style = pep440
versionfile_source = aiodisque/_version.py
versionfile_build = aiodisque/_version.py
tag_prefix = v
[metadata]
description-file = README.rst
[flake8]
exclude = _version.py
ignore = F403
max-complexity = 10
max-line-length = 80
[wheel]
universal = 1
[coverage:run]
omit =
aiodisque/_version.py
[coverage:report]
exclude_lines =
pragma: no cover
def __repr__
if self.debug:
if settings.DEBUG
raise AssertionError
raise NotImplementedError
if 0:
if __name__ == .__main__.:
[pytest]
flake8-ignore =
tests/*.py ALL
#!/usr/bin/env python
from setuptools import setup
import versioneer
setup(
name='aiodisque',
version=versioneer.get_version(),
install_requires=[
'hiredis'
],
cmdclass=versioneer.get_cmdclass()
)
from pytest import fixture
from tempfile import TemporaryDirectory
from subprocess import Popen, PIPE, run
from time import sleep
class Configuration:
def __init__(self, **opts):
for k, v in opts.items():
setattr(self, k, v)
class DisqueNode:
def __init__(self, port, dir):
self.port = port
self.dir = dir
self.proc = None
def start(self):
if not self.proc:
cmd = ["disque-server",
"--port", str(self.port),
"--dir", self.dir]
self.proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
cmd = ['disque', '-p', str(self.port), 'info']
while True:
sleep(.01)
if self.proc.poll():
raise Exception('already stopped!', self.proc.stderr)
resp = run(cmd, stdout=PIPE, stderr=PIPE)
if not resp.returncode:
break
def stop(self):
self.proc.kill()
self.proc = None
@property
def configuration(self):
return Configuration(port=self.port, dir=self.dir)
@fixture(scope='function')
def node(request):
tmp_dir = TemporaryDirectory()
node = DisqueNode(port=7711, dir=tmp_dir.name)
node.start()
def teardown():
node.stop()
tmp_dir.cleanup()
request.addfinalizer(teardown)
return node.configuration
import pytest
from aiodisque import Disque, ConnectionError
@pytest.mark.asyncio
async def test_hello(node, event_loop):
client = Disque(node.port, loop=event_loop)
response = await client.hello()
assert isinstance(response, dict)
assert 'format' in response
assert 'nodes' in response
assert 'id' in response
@pytest.mark.asyncio
async def test_info(node, event_loop):
client = Disque(node.port, loop=event_loop)
response = await client.info()
assert isinstance(response, dict)
assert 'registered_jobs' in response
@pytest.mark.asyncio
async def test_qstat_empty(node, event_loop):
client = Disque(node.port, loop=event_loop)
response = await client.qstat('foo')
assert response is None
response = await client.qlen('foo')
assert response == 0
@pytest.mark.asyncio
async def test_qstat_notempty(node, event_loop):
client = Disque(node.port, loop=event_loop)
await client.addjob('foo', 'bar')
response = await client.qstat('foo')
assert isinstance(response, dict)
assert response['name'] == 'foo'
response = await client.qlen('foo')
assert response == 1
@pytest.mark.asyncio
async def test_job(node, event_loop):
client = Disque(node.port, loop=event_loop)
job_id = await client.addjob('foo', 'bar')
assert job_id.startswith('D-')
response = await client.getjob('foo')
assert isinstance(response, list)
job = response[0]
assert hasattr(job, 'id')
assert hasattr(job, 'body')
assert hasattr(job, 'body')
assert hasattr(job, 'queue')
assert not hasattr(job, 'nacks')
assert not hasattr(job, 'additional_deliveries')
assert job.id == job_id
result = await client.ackjob(job)
assert result == 1
result = await client.ackjob(job)
assert result == 0
result = await client.nack(job)
assert result == 0
@pytest.mark.asyncio
async def test_job_fastack(node, event_loop):
client = Disque(node.port, loop=event_loop)
job_id = await client.addjob('foo', 'bar')
assert job_id.startswith('D-')
response = await client.getjob('foo')
assert isinstance(response, list)
job = response[0]
assert hasattr(job, 'id')
assert hasattr(job, 'body')
assert hasattr(job, 'body')
assert hasattr(job, 'queue')
assert not hasattr(job, 'nacks')
assert not hasattr(job, 'additional_deliveries')
assert job.id == job_id
result = await client.fastack(job)
assert result == 1
result = await client.fastack(job)
assert result == 0
result = await client.nack(job)
assert result == 0
@pytest.mark.asyncio
async def test_job_working(node, event_loop):
client = Disque(node.port, loop=event_loop)
job_id = await client.addjob('foo', 'bar')
assert job_id.startswith('D-')
response = await client.getjob('foo')
assert isinstance(response, list)
job = response[0]
assert hasattr(job, 'id')
assert hasattr(job, 'body')
assert hasattr(job, 'body')
assert hasattr(job, 'queue')
assert not hasattr(job, 'nacks')
assert not hasattr(job, 'additional_deliveries')
assert job.id == job_id
result = await client.working(job)
assert result == 300
result = await client.ackjob(job)
assert result == 1
with pytest.raises(ConnectionError):
await client.working(job)
result = await client.nack(job)
assert result == 0
with pytest.raises(ConnectionError):
await client.working(job)
@pytest.mark.asyncio
async def test_job_withcounters(node, event_loop):
client = Disque(node.port, loop=event_loop)
job_id = await client.addjob('foo', 'bar')
assert job_id.startswith('D-')
response = await client.getjob('foo', withcounters=True)
assert isinstance(response, list)
job = response[0]
assert hasattr(job, 'id')
assert hasattr(job, 'body')
assert hasattr(job, 'queue')
assert hasattr(job, 'nacks')
assert hasattr(job, 'additional_deliveries')
assert job.id == job_id
result = await client.ackjob(job)
assert result == 1
result = await client.ackjob(job)
assert result == 0
@pytest.mark.asyncio
async def test_qpeek_positive(node, event_loop):
client = Disque(node.port, loop=event_loop)
await client.addjob('q', 'foo')
await client.addjob('q', 'bar')
await client.addjob('q', 'baz')
response = await client.qpeek('q', 2)
assert len(response) == 2
assert response[0].body == 'foo'
assert response[1].body == 'bar'
@pytest.mark.asyncio
async def test_qpeek_negative(node, event_loop):
client = Disque(node.port, loop=event_loop)
await client.addjob('q', 'foo')
await client.addjob('q', 'bar')
await client.addjob('q', 'baz')
response = await client.qpeek('q', -2)
assert len(response) == 2
assert response[0].body == 'baz'
assert response[1].body == 'bar'
from aiodisque.util import parse_address
def test_address():
assert parse_address('1237.0.0.1:') == ('1237.0.0.1', None)
assert parse_address(('1237.0.0.1', None)) == ('1237.0.0.1', None)
assert parse_address(['1237.0.0.1', None]) == ('1237.0.0.1', None)
assert parse_address(':') == ('localhost', None)
assert parse_address(':12') == ('localhost', 12)
assert parse_address('12') == ('localhost', 12)
assert parse_address('errorist.xyz') == ('errorist.xyz', None)
assert parse_address(12) == ('localhost', 12)
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment