Commit cc7c719a authored by xa's avatar xa

Merge branch 'py3.5' into 'master'

Implement async/await



See merge request !1
parents 0bd4448f 476ee659
Pipeline #479 passed with stages
aionsq/_version.py export-subst
---
before_script:
- apt install -y curl libsnappy-dev
- curl -O https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.3.5.linux-amd64.go1.4.2.tar.gz
- tar zxvf nsq-0.3.5.linux-amd64.go1.4.2.tar.gz
- python -m pip install -e .
- python -m pip install -r requirements-test.txt
- python -m pip install python-snappy
Prepare container:
stage: build
script:
- docker build -t errorist/aionsq:latest .
tags:
- docker-build
python3.4 tests:
Test:
stage: test
image: errorist/aionsq:latest
script:
- export PATH=$PATH:$PWD/nsq-0.3.5.linux-amd64.go1.4.2/bin
- py.test --cov aionsq --cov-report term-missing tests/
- python -m pip install -e .
- python -m pip install -r requirements-test.txt
- py.test tests/ --cov aionsq --cov-report term-missing --flake8
tags:
- python3.4
- python3.5
FROM errorist/py:3.5
MAINTAINER Xavier Barbosa <clint.northwood@gmail.com>
ENV NSQ_VERSION=0.3.6
ENV GOLANG_VERSION=1.5.1
ENV NSQ_PACKAGE=nsq-${NSQ_VERSION}.linux-amd64.go${GOLANG_VERSION}
RUN apk add --no-cache curl python3-dev snappy-dev tar
RUN mkdir -p /usr/local/bin
RUN curl -O https://s3.amazonaws.com/bitly-downloads/nsq/${NSQ_PACKAGE}.tar.gz
RUN tar zxvf ${NSQ_PACKAGE}.tar.gz -C /usr/local/bin --strip-components 2 --owner=0
RUN python -m pip install python-snappy
include versioneer.py
include aionsq/_version.py
......@@ -21,7 +21,7 @@ Publish message to a nsq:
from aionsq.http import NSQWriter
writer = NSQWriter('http://120.0.0.1:4567')
yield from writer.publish('test', 'hello world 2')
await writer.publish('test', 'hello world 2')
Connect directly to a nsq daemon:
......@@ -31,10 +31,10 @@ Connect directly to a nsq daemon:
reader = NSQReader('http://120.0.0.1:4567', 'topic1', 'chan1')
@reader.subscribe('test')
def consumer(msg):
async def consumer(msg):
msg.success()
yield from reader.start()
await reader.start()
Request nsq lookup:
......@@ -42,7 +42,7 @@ Request nsq lookup:
from aionsq import NSQLookup
lookup = NSQLookup('http://120.0.0.1:4567')
info = yield from lookup.info()
info = await lookup.info()
Start an authd::
......
......@@ -7,4 +7,7 @@ __all__ = (client.__all__
+ http.__all__
+ lookup.__all__
+ tcp.__all__)
__version__ = '0.1'
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions
This diff is collapsed.
import asyncio
import json
import logging
from aiohttp import web
......@@ -13,12 +12,10 @@ class Application(web.Application):
self.log = logging.getLogger(__name__)
self.db = db
@asyncio.coroutine
def ping_controller(self, request):
async def ping_controller(self, request):
return web.Response(body=b'OK')
@asyncio.coroutine
def auth_controller(self, request):
async def auth_controller(self, request):
self.log.info('incoming %s %s', request, request.GET)
try:
remote_ip = request.GET['remote_ip']
......@@ -28,7 +25,7 @@ class Application(web.Application):
raise web.HTTPBadRequest()
try:
response = yield from self.db.match(remote_ip, secret, tls)
response = await self.db.match(remote_ip, secret, tls)
except Exception as error:
raise web.HTTPForbidden() from error
body = json.dumps(response).encode()
......
......@@ -27,28 +27,26 @@ class NSQ:
else:
raise ValueError('addr or lookup required')
@asyncio.coroutine
def publish(self, topic, message):
async def publish(self, topic, message):
"""Publish a message.
Parameters:
topic (str): the topic to publish to
message (str): raw message bytes
"""
client = yield from self.adapter.writer()
response = yield from client.publish(topic, message)
client = await self.adapter.writer()
response = await client.publish(topic, message)
return response
@asyncio.coroutine
def multi_publish(self, topic, messages):
async def multi_publish(self, topic, messages):
"""Publish multiple messages in one roundtrip.
Parameters:
topic (str): the topic to publish to
messages (list): raw message bytes
"""
client = yield from self.adapter.writer()
response = yield from client.multi_publish(topic, messages)
client = await self.adapter.writer()
response = await client.multi_publish(topic, messages)
return response
def register(self, topic, channel, *, handler=None):
......@@ -60,32 +58,29 @@ class NSQ:
if not handler:
return lambda x: self.register(topic, channel, handler=x)
# TODO is-it really working as expected?
client = yield from self.adapter.reader(topic, channel)
return client.register(handler)
def close(self):
self.adapter.close()
@asyncio.coroutine
def wait_closed(self):
yield from self.adapter.wait_closed()
async def wait_closed(self):
await self.adapter.wait_closed()
class Adapter:
@asyncio.coroutine
def reader(self, topic, channel):
async def reader(self, topic, channel):
raise NotImplementedError()
@asyncio.coroutine
def writer(self):
async def writer(self):
raise NotImplementedError()
def close(self):
raise NotImplementedError()
@asyncio.coroutine
def wait_closed(self):
async def wait_closed(self):
raise NotImplementedError()
......@@ -96,12 +91,10 @@ class HTTPAdapter(Adapter):
self.loop = loop or asyncio.get_event_loop()
self._writer = None
@asyncio.coroutine
def reader(self, topic, channel):
async def reader(self, topic, channel):
raise AttributeError('does not implements this')
@asyncio.coroutine
def writer(self):
async def writer(self):
if not self._writer:
self._writer = NSQClient(self.addr, loop=self.loop)
return self._writer
......@@ -110,10 +103,9 @@ class HTTPAdapter(Adapter):
if self._writer:
self._writer.close()
@asyncio.coroutine
def wait_closed(self):
async def wait_closed(self):
if self._writer:
yield from self._writer.wait_closed()
await self._writer.wait_closed()
class LookupAdapter(Adapter):
......@@ -124,19 +116,17 @@ class LookupAdapter(Adapter):
self.client = NSQLookup(addr, loop=self.loop)
self._writer = None
@asyncio.coroutine
def reader(self, topic, channel):
async def reader(self, topic, channel):
raise Exception()
@asyncio.coroutine
def writer(self):
async def writer(self):
if not self._writer:
nodes = yield from self.client.nodes()
nodes = await self.client.nodes()
node = nodes['producers'][0]
addr = 'tcp://%s:%s' % (node['broadcast_address'], node['tcp_port'])
self.log.info('connect to %s', addr)
writer = NSQWriter(addr, loop=self.loop)
yield from writer.start()
await writer.start()
self._writer = writer
return self._writer
......@@ -144,10 +134,9 @@ class LookupAdapter(Adapter):
if self._writer:
self._writer.close()
@asyncio.coroutine
def wait_closed(self):
async def wait_closed(self):
if self._writer:
yield from self._writer.wait_closed()
await self._writer.wait_closed()
class TCPAdapter(Adapter):
......@@ -158,19 +147,17 @@ class TCPAdapter(Adapter):
self._readers = {}
self._writer = None
@asyncio.coroutine
def reader(self, topic, channel):
async def reader(self, topic, channel):
key = topic, channel
if key not in self._readers:
self._readers[key] = NSQReader(self.addr, loop=self.loop)
yield from self._readers[key].start()
await self._readers[key].start()
return self._readers[key]
@asyncio.coroutine
def writer(self):
async def writer(self):
if not self._writer:
self._writer = NSQWriter(self.addr, loop=self.loop)
yield from self._writer.start()
await self._writer.start()
return self._writer
def close(self):
......@@ -179,9 +166,8 @@ class TCPAdapter(Adapter):
for reader in self._readers.values():
self._writer.close()
@asyncio.coroutine
def wait_closed(self):
async def wait_closed(self):
if self._writer:
yield from self._writer.wait_closed()
await self._writer.wait_closed()
for reader in self._readers.values():
yield from self._writer.wait_closed()
await self._writer.wait_closed()
......@@ -15,9 +15,8 @@ class HTTPHandler:
self.req_handler.close()
self._closed.set()
@asyncio.coroutine
def wait_closed(self):
yield from self._closed.wait()
async def wait_closed(self):
await self._closed.wait()
return True
def __repr__(self):
......@@ -29,18 +28,16 @@ class HTTPHandler:
class NSQClient(HTTPHandler):
@asyncio.coroutine
def stats(self):
async def stats(self):
"""Return internal statistics.
"""
params = {'format': 'json'}
path = '/stats'
response = yield from self.req_handler('GET', path, params=params)
result = yield from response.json()
response = await self.req_handler('GET', path, params=params)
result = await response.json()
return result['data']
@asyncio.coroutine
def ping(self):
async def ping(self):
"""Monitoring endpoint.
.. note::
......@@ -49,21 +46,19 @@ class NSQClient(HTTPHandler):
to disk when overflow occurred.
"""
path = '/ping'
response = yield from self.req_handler('GET', path)
result = yield from ok(response)
response = await self.req_handler('GET', path)
result = await ok(response)
return result
@asyncio.coroutine
def info(self):
async def info(self):
"""Version information.
"""
path = '/info'
response = yield from self.req_handler('GET', path)
result = yield from unwrapped_json(response)
response = await self.req_handler('GET', path)
result = await unwrapped_json(response)
return result
@asyncio.coroutine
def publish(self, topic, message):
async def publish(self, topic, message):
"""Publish a message.
Parameters:
......@@ -73,13 +68,12 @@ class NSQClient(HTTPHandler):
params = {'topic': topic}
path = '/pub'
data = message.encode('utf-8')
response = yield from self.req_handler('POST', path,
params=params, data=data)
result = yield from ok(response)
response = await self.req_handler('POST', path,
params=params, data=data)
result = await ok(response)
return result
@asyncio.coroutine
def multi_publish(self, topic, messages):
async def multi_publish(self, topic, messages):
"""Publish multiple messages in one roundtrip.
Parameters:
......@@ -92,13 +86,12 @@ class NSQClient(HTTPHandler):
data = struct.pack('>l', len(messages))
for message in messages:
data += struct.pack('>l', len(message)) + message.encode('utf-8')
response = yield from self.req_handler('POST', path,
params=params, data=data)
result = yield from ok(response)
response = await self.req_handler('POST', path,
params=params, data=data)
result = await ok(response)
return result
@asyncio.coroutine
def create_topic(self, topic):
async def create_topic(self, topic):
"""Create a topic.
Parameters:
......@@ -106,14 +99,13 @@ class NSQClient(HTTPHandler):
"""
params = {'topic': topic}
path = '/topic/create'
response = yield from self.req_handler('POST', path, params=params)
response = await self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
await response.read()
return True
yield from exc(response)
await exc(response)
@asyncio.coroutine
def delete_topic(self, topic):
async def delete_topic(self, topic):
"""Delete an existing topic (and all channels).
Parameters:
......@@ -121,14 +113,13 @@ class NSQClient(HTTPHandler):
"""
params = {'topic': topic}
path = '/topic/delete'
response = yield from self.req_handler('POST', path, params=params)
response = await self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
await response.read()
return True
yield from exc(response)
await exc(response)
@asyncio.coroutine
def create_channel(self, topic, channel):
async def create_channel(self, topic, channel):
"""Create a topic.
Parameters:
......@@ -137,14 +128,13 @@ class NSQClient(HTTPHandler):
"""
params = {'topic': topic, 'channel': channel}
path = '/channel/create'
response = yield from self.req_handler('POST', path, params=params)
response = await self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
await response.read()
return True
yield from exc(response)
await exc(response)
@asyncio.coroutine
def delete_channel(self, topic, channel):
async def delete_channel(self, topic, channel):
"""Delete an existing channel on an existing topic.
Parameters:
......@@ -153,14 +143,13 @@ class NSQClient(HTTPHandler):
"""
params = {'topic': topic, 'channel': channel}
path = '/channel/delete'
response = yield from self.req_handler('POST', path, params=params)
response = await self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
await response.read()
return True
yield from exc(response)
await exc(response)
@asyncio.coroutine
def empty_topic(self, topic):
async def empty_topic(self, topic):
"""Empty all the queued messages (in-memory and disk)
for an existing topic.
......@@ -169,14 +158,13 @@ class NSQClient(HTTPHandler):
"""
params = {'topic': topic}
path = '/topic/empty'
response = yield from self.req_handler('POST', path, params=params)
response = await self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
await response.read()
return True
yield from exc(response)
await exc(response)
@asyncio.coroutine
def empty_channel(self, topic, channel):
async def empty_channel(self, topic, channel):
"""Empty all the queued messages (in-memory and disk)
for an existing topic.
......@@ -186,14 +174,13 @@ class NSQClient(HTTPHandler):
"""
params = {'topic': topic, 'channel': channel}
path = '/channel/empty'
response = yield from self.req_handler('POST', path, params=params)
response = await self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
await response.read()
return True
yield from exc(response)
await exc(response)
@asyncio.coroutine
def pause_topic(self, topic):
async def pause_topic(self, topic):
"""Pause message flow to all channels on an existing topic
(messages will queue at topic).
......@@ -202,14 +189,13 @@ class NSQClient(HTTPHandler):
"""
params = {'topic': topic}
path = '/topic/pause'
response = yield from self.req_handler('POST', path, params=params)
response = await self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
await response.read()
return True
yield from exc(response)
await exc(response)
@asyncio.coroutine
def unpause_topic(self, topic):
async def unpause_topic(self, topic):
"""Resume message flow to channels of an existing, paused, topic.
Parameters:
......@@ -217,14 +203,13 @@ class NSQClient(HTTPHandler):
"""
params = {'topic': topic}
path = '/topic/unpause'
response = yield from self.req_handler('POST', path, params=params)
response = await self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
await response.read()
return True
yield from exc(response)
await exc(response)
@asyncio.coroutine
def pause_channel(self, topic, channel):
async def pause_channel(self, topic, channel):
"""Pause message flow to consumers of an existing channel
(messages will queue at channel).
......@@ -234,14 +219,13 @@ class NSQClient(HTTPHandler):
"""
params = {'topic': topic, 'channel': channel}
path = '/channel/pause'
response = yield from self.req_handler('POST', path, params=params)
response = await self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
await response.read()
return True
yield from exc(response)
await exc(response)
@asyncio.coroutine
def unpause_channel(self, topic, channel):
async def unpause_channel(self, topic, channel):
"""Resume message flow to consumers of an existing, paused, channel.
Parameters:
......@@ -250,8 +234,8 @@ class NSQClient(HTTPHandler):
"""
params = {'topic': topic, 'channel': channel}
path = '/channel/unpause'
response = yield from self.req_handler('POST', path, params=params)
response = await self.req_handler('POST', path, params=params)
if response.status == 200:
yield from response.read()
await response.read()
return True
yield from exc(response)
await exc(response)
......@@ -16,10 +16,9 @@ class RequestHandler:
self.session = ClientSession(connector=connector, loop=self.loop)
self._closed = False
@asyncio.coroutine
def request(self, method, path, **kwargs):
async def request(self, method, path, **kwargs):
url = '%s%s' % (self.addr, path)
response = yield from self.session.request(method, url, **kwargs)
response = await self.session.request(method, url, **kwargs)
return response
def close(self):
......@@ -31,38 +30,35 @@ class RequestHandler:
__call__ = request
@asyncio.coroutine
def exc(response):
async def exc(response):
if response.status == 404:
result = yield from response.json()
result = await response.json()
if result == {'message': 'TOPIC_NOT_FOUND'}:
raise TopicNotFound()
if result == {'message': 'CHANNEL_NOT_FOUND'}:
raise ChannelNotFound()
raise HTTPNotFound(reason=result)
result = yield from response.text()
result = await response.text()
raise Exception(result)
@asyncio.coroutine
def ok(response):
async def ok(response):
"""Response must be 200 and 'OK'
"""
if response.status == 200:
result = yield from response.text()
result = await response.text()
return result in ('OK', '')
if response.status == 500:
result = yield from response.text()
result = await response.text()
raise Exception(result)
raise NotImplementedError()
@asyncio.coroutine
def unwrapped_json(response):
async def unwrapped_json(response):
if response.status == 200:
result = yield from response.json()
result = await response.json()
return result['data']
if response.status == 500:
result = yield from response.json()
result = await response.json()
raise HTTPInternalServerError(reason=result['status_txt'])
raise NotImplementedError()
import asyncio
from aionsq.http.client import HTTPHandler
from aionsq.http.requests import unwrapped_json, ok
......@@ -7,8 +6,7 @@ __all__ = ['NSQLookup']
class NSQLookup(HTTPHandler):
@asyncio.coroutine
def lookup(self, *, topic):
async def lookup(self, *, topic):
"""Returns a list of producers for a topic.
Parameters:
......@@ -17,21 +15,19 @@ class NSQLookup(HTTPHandler):
"""
params = {'topic': topic}
path = '/lookup'
response = yield from self.req_handler('GET', path, params=params)
result = yield from unwrapped_json(response)
response = await self.req_handler('GET', path, params=params)
result = await unwrapped_json(response)
return result
@asyncio.coroutine
def topics(self):
async def topics(self):
"""Returns a list of all known topics.
"""
path = '/topics'
response = yield from self.req_handler('GET', path)
result = yield from unwrapped_json(response)
response = await self.req_handler('GET', path)
result = await unwrapped_json(response)
return result
@asyncio.coroutine
def channels(self, *, topic):
async def channels(self, *, topic):
"""Returns a list of all known channels of a topic.
Parameters:
......@@ -39,21 +35,19 @@ class NSQLookup(HTTPHandler):
"""
params = {'topic': topic}
path = '/channels'
response = yield from self.req_handler('GET', path, params=params)
result = yield from unwrapped_json(response)
response = await self.req_handler('GET', path, params=params)
result = await unwrapped_json(response)
return result
@asyncio.coroutine
def nodes(self):
async def nodes(self):
"""Returns a list of all known nsqd.
"""
path = '/nodes'
response = yield from self.req_handler('GET', path)
result = yield from unwrapped_json(response)
response = await self.req_handler('GET', path)
result = await unwrapped_json(response)
return result
@asyncio.coroutine
def delete_topic(self, *, topic):
async def delete_topic(self, *, topic):
"""Deletes an existing topic.
Parameters:
......@@ -61,12 +55,11 @@ class NSQLookup(HTTPHandler):
"""
params = {'topic': topic}
path = '/delete_topic'
response = yield from self.req_handler('GET', path, params=params)
result = yield from response.json()
response = await self.req_handler('GET', path, params=params)
result = await response.json()
return result
@asyncio.coroutine
def delete_channel(self, *, topic, channel):
async def delete_channel(self, *, topic, channel):
"""Deletes an existing channel of an existing topic.
Parameters:
......@@ -76,12 +69,11 @@ class NSQLookup(HTTPHandler):
params = {'topic': topic,
'channel': channel}
path = '/delete_channel'
response = yield from self.req_handler('GET', path, params=params)
result = yield from response.json()
response = await self.req_handler('GET', path, params=params)
result = await response.json()
return result
@asyncio.coroutine
def tombstone_topic_producer(self, *, topic, node):
async def tombstone_topic_producer(self, *, topic, node):
"""Tombstones a specific producer of an existing topic.
Parameters:
......@@ -92,24 +84,22 @@ class NSQLookup(HTTPHandler):