Commit 6b631ce3 authored by Xavier Barbosa's avatar Xavier Barbosa

implements http api

parents
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
AIO NSQ
=======
Implements nsq_ client for asyncio.
Installation
------------
::
python -m pip install aionsq
Usage
-----
Connect directly to a nsq daemon:
.. code-block:: python
from aionsq import NSQConsumer
nsq = NSQConsumer('tcp://120.0.0.1:4567')
@nsq.subscribe('test')
def consumer(msg):
pass
yield from nsq.start()
Or thru a nsq lookup:
.. code-block:: python
from aionsq import NSQConsumer
nsq = NSQConsumer(lookup='http://120.0.0.1:4567')
@nsq.subscribe('test')
def consumer(msg):
pass
yield from nsq.start()
Request nsq lookup:
.. code-block:: python
from aionsq import NSQLookup
lookup = NSQLookup('http://120.0.0.1:4567')
info = yield from lookup.info()
Publish message to a nsq:
.. code-block:: python
from aionsq import NSQClient
client = NSQClient('http://120.0.0.1:4567')
yield from client.publish('test', 'hello world 2')
.. _nsq: http://nsq.io
from .client import *
from .connection import *
from .lookup import *
from .protocols import *
__all__ = (client.__all__
+ connection.__all__
+ lookup.__all__
+ protocols.__all__)
import asyncio
import struct
from .requests import RequestHandler, ok, exc, unwrapped_json
from collections import defaultdict
__all__ = ['NSQConsumer', 'NSQClient']
class NSQConsumer:
def __init__(self, addr=None, lookup=None, *, loop=None):
if addr and lookup:
raise ValueError('addr and lookup are mutually exclusive')
self.req_handler = RequestHandler(addr)
self.lookup = lookup
self.loop = loop or asyncio.get_event_loop()
self.consumers = defaultdict(set)
def subscribe(self, topic, *, func=None):
"""Subcribe to a topic. can be used a decorator.
"""
if func is None:
return lambda x: self.subscribe(topic, func=x)
self.consumers[topic].add(func)
return func
def unsubscribe(self, topic, *, func):
"""Subcribe to a topic. can be used a decorator.
"""
self.consumers[topic].remove(func)
return func
@asyncio.coroutine
def start(self):
raise NotImplementedError()
@asyncio.coroutine
def stop(self):
raise NotImplementedError()
class NSQClient:
def __init__(self, addr, *, loop=None):
self.req_handler = RequestHandler(addr)
self.loop = loop or asyncio.get_event_loop()
@asyncio.coroutine
def stats(self):
"""Return internal statistics.
"""
params = {'format': 'json'}
path = '/stats'
response = yield from self.req_handler('GET', path, params=params)
result = yield from response.json()
return result['data']
@asyncio.coroutine
def ping(self):
"""Monitoring endpoint.
.. note::
The only “unhealthy” state is if nsqd failed to write messages
to disk when overflow occurred.
"""
path = '/ping'
response = yield from self.req_handler('GET', path)
result = yield from ok(response)
return result
@asyncio.coroutine
def info(self):
"""Version information.
"""
path = '/info'
response = yield from self.req_handler('GET', path)
result = yield from unwrapped_json(response)
return result
@asyncio.coroutine
def pub(self, topic, message):
"""Publish a message.
Parameters:
topic (str): the topic to publish to
message (str): raw message bytes
"""
params = {'topic': topic}
path = '/pub'
data = message.encode('utf-8')
response = yield from self.req_handler('POST', path,
params=params, data=data)
result = yield from ok(response)
return result
@asyncio.coroutine
def mpub(self, topic, messages):
"""Publish multiple messages in one roundtrip.
Parameters:
topic (str): the topic to publish to
messages (list): raw message bytes
"""
assert messages, 'messages are required'
params = {'topic': topic, 'binary': True}
path = '/mpub'
data = struct.pack('>l', len(messages))
for message in messages:
data += struct.pack('>l', len(message)) + message.encode('utf-8')
response = yield from self.req_handler('POST', path,
params=params, data=data)
result = yield from ok(response)
return result
@asyncio.coroutine
def create_topic(self, topic):
"""Create a topic.
Parameters:
topic (str): the topic to create
"""
params = {'topic': topic}
path = '/topic/create'
response = yield from self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
return True
yield from exc(response)
@asyncio.coroutine
def delete_topic(self, topic):
"""Delete an existing topic (and all channels).
Parameters:
topic (str): the existing topic to delete
"""
params = {'topic': topic}
path = '/topic/delete'
response = yield from self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
return True
yield from exc(response)
@asyncio.coroutine
def create_channel(self, topic, channel):
"""Create a topic.
Parameters:
topic (str): the existing topic
channel (str): the channel to create
"""
params = {'topic': topic, 'channel': channel}
path = '/channel/create'
response = yield from self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
return True
yield from exc(response)
@asyncio.coroutine
def delete_channel(self, topic, channel):
"""Delete an existing channel on an existing topic.
Parameters:
topic (str): the existing topic to delete
channel (str): the existing channel to delete
"""
params = {'topic': topic, 'channel': channel}
path = '/channel/delete'
response = yield from self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
return True
yield from exc(response)
@asyncio.coroutine
def empty_topic(self, topic):
"""Empty all the queued messages (in-memory and disk)
for an existing topic.
Parameters:
topic (str): the existing topic to empty
"""
params = {'topic': topic}
path = '/topic/empty'
response = yield from self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
return True
yield from exc(response)
@asyncio.coroutine
def empty_channel(self, topic, channel):
"""Empty all the queued messages (in-memory and disk)
for an existing topic.
Parameters:
topic (str): the existing topic
channel (str): the existing channel to empty
"""
params = {'topic': topic, 'channel': channel}
path = '/channel/empty'
response = yield from self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
return True
yield from exc(response)
@asyncio.coroutine
def pause_topic(self, topic):
"""Pause message flow to all channels on an existing topic
(messages will queue at topic).
Parameters:
topic (str): the existing topic
"""
params = {'topic': topic}
path = '/topic/pause'
response = yield from self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
return True
yield from exc(response)
@asyncio.coroutine
def unpause_topic(self, topic):
"""Resume message flow to channels of an existing, paused, topic.
Parameters:
topic (str): the existing topic
"""
params = {'topic': topic}
path = '/topic/unpause'
response = yield from self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
return True
yield from exc(response)
@asyncio.coroutine
def pause_channel(self, topic, channel):
"""Pause message flow to consumers of an existing channel
(messages will queue at channel).
Parameters:
topic (str): the existing topic
channel (str): the existing channel to pause
"""
params = {'topic': topic, 'channel': channel}
path = '/channel/pause'
response = yield from self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
return True
yield from exc(response)
@asyncio.coroutine
def unpause_channel(self, topic, channel):
"""Resume message flow to consumers of an existing, paused, channel.
Parameters:
topic (str): the existing topic
channel (str): the existing channel to pause
"""
params = {'topic': topic, 'channel': channel}
path = '/channel/unpause'
response = yield from self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
return True
yield from exc(response)
import asyncio
__all__ = ['connect']
@asyncio.coroutine
def connect(addr):
pass
from aiohttp.web import HTTPNotFound, HTTPInternalServerError
class TopicNotFound(RuntimeError):
pass
class ChannelNotFound(RuntimeError):
pass
(HTTPNotFound, HTTPInternalServerError)
import asyncio
from .requests import RequestHandler, unwrapped_json, ok
__all__ = ['NSQLookup']
class NSQLookup:
def __init__(self, addr):
self.req_handler = RequestHandler(addr)
@asyncio.coroutine
def lookup(self, *, topic):
"""Returns a list of producers for a topic.
Parameters:
topic (str): the topic to list producers for
"""
params = {'topic': topic}
path = '/lookup'
response = yield from self.req_handler('GET', path, params=params)
result = yield from unwrapped_json(response)
return result
@asyncio.coroutine
def topics(self):
"""Returns a list of all known topics.
"""
path = '/topics'
response = yield from self.req_handler('GET', path)
result = yield from unwrapped_json(response)
return result
@asyncio.coroutine
def channels(self, *, topic):
"""Returns a list of all known channels of a topic.
Parameters:
topic (str): the topic to list channels for
"""
params = {'topic': topic}
path = '/channels'
response = yield from self.req_handler('GET', path, params=params)
result = yield from unwrapped_json(response)
return result
@asyncio.coroutine
def nodes(self):
"""Returns a list of all known nsqd.
"""
path = '/nodes'
response = yield from self.req_handler('GET', path)
result = yield from unwrapped_json(response)
return result
@asyncio.coroutine
def delete_topic(self, *, topic):
"""Deletes an existing topic.
Parameters:
topic (str): the existing topic to delete
"""
params = {'topic': topic}
path = '/delete_topic'
response = yield from self.req_handler('GET', path, params=params)
result = yield from response.json()
return result
@asyncio.coroutine
def delete_channel(self, *, topic, channel):
"""Deletes an existing channel of an existing topic.
Parameters:
topic (str): the existing topic
channel (str): the existing channel to delete
"""
params = {'topic': topic,
'channel': channel}
path = '/delete_channel'
response = yield from self.req_handler('GET', path, params=params)
result = yield from response.json()
return result
@asyncio.coroutine
def tombstone_topic_producer(self, *, topic, node):
"""Tombstones a specific producer of an existing topic.
Parameters:
topic (str): the existing topic
node (str): the producer (nsqd) to tombstone
(identified by <broadcast_address>:<http_port>)
"""
params = {'topic': topic,
'node': node}
path = '/tombstone_topic_producer'
response = yield from self.req_handler('GET', path, params=params)
result = yield from response.json()
return result
@asyncio.coroutine
def ping(self):
"""Monitoring endpoint, should return OK.
"""
path = '/ping'
response = yield from self.req_handler('GET', path)
result = yield from ok(response)
return result
@asyncio.coroutine
def info(self):
"""Returns version information.
"""
path = '/info'
response = yield from self.req_handler('GET', path)
result = yield from unwrapped_json(response)
return result
def __repr__(self):
return '<%s(addr=%r)>' % (
self.__class__.__name__,
self.req_handler.addr
)
import asyncio
__all__ = ['NSQProtocol']
class NSQProtocol(asyncio.Protocol):
def __init__(self, message, loop):
self.message = message
self.loop = loop
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
print('Stop the event lop')
self.loop.stop()
import asyncio
import atexit
from aiohttp import ClientSession, TCPConnector
from aionsq.exceptions import TopicNotFound, ChannelNotFound
from aionsq.exceptions import HTTPInternalServerError, HTTPNotFound
__all__ = ['RequestHandler']
class RequestHandler:
def __init__(self, addr):
self.addr = addr
connector = TCPConnector(verify_ssl=False)
self.session = session = ClientSession(connector=connector)
atexit.register(session.close)
@asyncio.coroutine
def request(self, method, path, **kwargs):
url = '%s%s' % (self.addr, path)
response = yield from self.session.request(method, url, **kwargs)
return response
__call__ = request
@asyncio.coroutine
def exc(response):
if response.status == 404:
result = yield from response.json()
if result == {'message': 'TOPIC_NOT_FOUND'}:
raise TopicNotFound()
if result == {'message': 'CHANNEL_NOT_FOUND'}:
raise ChannelNotFound()
raise HTTPNotFound(reason=result)
result = yield from response.text()
raise Exception(result)
@asyncio.coroutine
def ok(response):
"""Response must be 200 and 'OK'
"""
if response.status == 200:
result = yield from response.text()
return result in ('OK', '')
if response.status == 500:
result = yield from response.text()
raise Exception(result)
raise NotImplementedError()
@asyncio.coroutine
def unwrapped_json(response):
print(response.headers)
if response.status == 200:
result = yield from response.json()
return result['data']
if response.status == 500:
result = yield from response.json()
raise HTTPInternalServerError(reason=result['status_txt'])
raise NotImplementedError()
pytest
pytest-asynci
pytest-cov
pytest-flake8
json-spec
This source diff could not be displayed because it is too large. You can view the blob instead.
[flake8]
ignore = F403
max-complexity = 10
max-line-length = 80
#!/usr/bin/env python
from setuptools import setup, find_packages, Command
class PyTest(Command):
user_options = []
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
import subprocess
import sys
errno = subprocess.call([sys.executable,
'runtests.py',
'tests/',
'--cov', 'aionsq',
'--cov-report', 'html'])
raise SystemExit(errno)
setup(
name='aionsq',
packages=find_packages(),
install_requires=[
'aiohttp'
],
cmdclass = {'test': PyTest}
)
import pytest
from aionsq.client import NSQClient
from aionsq.exceptions import ChannelNotFound, TopicNotFound
@pytest.mark.asyncio
def test_ping():
client = NSQClient('http://127.0.0.1:4151')
data = yield from client.ping()
assert data is True
@pytest.mark.asyncio
def test_info():
client = NSQClient('http://127.0.0.1:4151')
data = yield from client.info()
assert data == {'version': '0.3.5'}
@pytest.mark.asyncio
def test_topic():
client = NSQClient('http://127.0.0.1:4151')
yield from client.create_topic('foobar')
yield from client.empty_topic('foobar')
yield from client.pause_topic('foobar')
yield from client.unpause_topic('foobar')
yield from client.delete_topic('foobar')
with pytest.raises(TopicNotFound):
yield from client.empty_topic('xxxx')
with pytest.raises(TopicNotFound):
yield from client.pause_topic('xxx')
with pytest.raises(TopicNotFound):
yield from client.unpause_topic('xxx')
@pytest.mark.asyncio
def test_channel():
client = NSQClient('http://127.0.0.1:4151')
yield from client.create_topic('foobar')
yield from client.create_channel('foobar', 'baz')
yield from client.empty_channel('foobar', 'baz')
yield from client.pause_channel('foobar', 'baz')
yield from client.unpause_channel('foobar', 'baz')
yield from client.delete_channel('foobar', 'baz')
with pytest.raises(ChannelNotFound):
yield from client.empty_channel('foobar', 'qux')
with pytest.raises(ChannelNotFound):
yield from client.pause_channel('foobar', 'baz')
with pytest.raises(ChannelNotFound):
yield from client.unpause_channel('foobar', 'baz')
with pytest.raises(ChannelNotFound):
yield from client.delete_channel('foobar', 'qux')
yield from client.delete_topic('foobar')
with pytest.raises(TopicNotFound):
yield from client.create_channel('foobar', 'qux')
with pytest.raises(TopicNotFound):
yield from client.empty_channel('foobar', 'qux')
with pytest.raises(TopicNotFound):
yield from client.pause_channel('foobar', 'qux')