Commit e31909e2 authored by xa's avatar xa Committed by Xavier Barbosa

modern async/await tests

parent 8f5d4bc8
......@@ -6,7 +6,7 @@ import sys
import logging
import warnings
from pytest import fixture
from subprocess import Popen, PIPE
from subprocess import Popen
from time import sleep
os.environ['PYTHONASYNCIODEBUG'] = '1'
......@@ -30,7 +30,7 @@ filepath = lambda *x: os.path.join(here, *x)
database_dir = os.path.join(here, '.database')
data = os.makedirs(database_dir, exist_ok=True)
filelist = [ f for f in os.listdir(database_dir) if f.endswith('.dat') ]
filelist = [f for f in os.listdir(database_dir) if f.endswith('.dat')]
for f in filelist:
os.remove(os.path.join(database_dir, f))
......@@ -55,7 +55,7 @@ class Service:
env = os.environ.copy()
env.setdefault('GOMAXPROCS', '2')
self.log = open(filepath('_.log'), 'a')
subl = SubLog()
# subl = SubLog()
proc = Popen(self.cmd,
env=env,
cwd=self.cwd,
......@@ -73,6 +73,7 @@ class Service:
self.proc = None
return result
def service(*args, **kwargs):
serv = Service(*args, **kwargs)
serv.start()
......@@ -100,6 +101,7 @@ services = {
}
sleep(.5)
@fixture(scope="session", autouse=True)
def nsqlookupd(request):
service = services['nsqlookupd']
......
import asyncio
import pytest
from aionsq.tcp import TCPHandler, AuthFailedError
@pytest.mark.asyncio
def test_auth_allow(event_loop):
async def test_auth_allow(event_loop):
client = TCPHandler(':4150', auth_secret='allow', loop=event_loop)
yield from client.start()
await client.start()
client.close()
yield from client.wait_closed()
await client.wait_closed()
@pytest.mark.asyncio
def test_auth_deny(event_loop):
async def test_auth_deny(event_loop):
client = TCPHandler(':4150', auth_secret='deny', loop=event_loop)
with pytest.raises(AuthFailedError):
yield from client.start()
await client.start()
client.close()
yield from client.wait_closed()
await client.wait_closed()
import aiohttp
import asyncio
import pytest
import socket
from aionsq.authd import Application, TestDB
......@@ -14,71 +13,79 @@ def find_unused_port():
@pytest.mark.asyncio
def create_server(event_loop):
async def create_server(event_loop):
db = TestDB()
app = Application(db=db)
handler = app.make_handler()
port = find_unused_port()
srv = yield from event_loop.create_server(handler, '127.0.0.1', port)
srv = await event_loop.create_server(handler, '127.0.0.1', port)
url = "http://127.0.0.1:{}".format(port)
return app, srv, url, handler
@pytest.mark.asyncio
def test_ping(event_loop):
app, srv, url, handler = yield from create_server(event_loop)
response = yield from aiohttp.get(url + '/ping')
result = yield from response.text()
async def test_ping(event_loop):
app, srv, base, handler = await create_server(event_loop)
response = await aiohttp.get(base + '/ping')
result = await response.text()
assert response.status == 200
assert result == 'OK'
yield from handler.finish_connections()
await handler.finish_connections()
@pytest.mark.asyncio
def test_auth_bad(event_loop):
app, srv, url, handler = yield from create_server(event_loop)
response = yield from aiohttp.get(url + '/auth?secret=allowed')
async def test_auth_bad(event_loop):
app, srv, base, handler = await create_server(event_loop)
response = await aiohttp.get(base + '/auth?secret=allowed')
assert response.status == 400
result = yield from response.read()
yield from handler.finish_connections()
await response.read()
await handler.finish_connections()
@pytest.mark.asyncio
def test_auth_ok(event_loop):
app, srv, url, handler = yield from create_server(event_loop)
response = yield from aiohttp.get(url + '/auth?secret=allow&remote_ip=.&tls=false')
yield from handler.finish_connections()
async def test_auth_ok(event_loop):
app, srv, base, handler = await create_server(event_loop)
url = base + '/auth?secret=allow&remote_ip=.&tls=false'
response = await aiohttp.get(url)
await handler.finish_connections()
assert response.status == 200
result = yield from response.json()
result = await response.json()
assert result == {
'authorizations': [{'channels': ['.*'],
'permissions': ['subscribe', 'publish'],
'topic': '.*'}],
'permissions': ['subscribe', 'publish'],
'topic': '.*'}],
'identity': 'username',
'ttl': 3600
}
@pytest.mark.asyncio
def test_auth_allow(event_loop):
app, srv, url, handler = yield from create_server(event_loop)
response = yield from aiohttp.get(url + '/auth?secret=allow&remote_ip=.&tls=false')
yield from handler.finish_connections()
async def test_auth_allow(event_loop):
app, srv, base, handler = await create_server(event_loop)
url = base + '/auth?secret=allow&remote_ip=.&tls=false'
response = await aiohttp.get(url)
await handler.finish_connections()
assert response.status == 200
result = yield from response.json()
result = await response.json()
assert result == {
'authorizations': [{'channels': ['.*'],
'permissions': ['subscribe', 'publish'],
'topic': '.*'}],
'permissions': ['subscribe', 'publish'],
'topic': '.*'}],
'identity': 'username',
'ttl': 3600
}
@pytest.mark.asyncio
def test_auth_forbidden(event_loop):
app, srv, url, handler = yield from create_server(event_loop)
response = yield from aiohttp.get(url + '/auth?secret=deny&remote_ip=.&tls=false')
yield from handler.finish_connections()
async def test_auth_forbidden(event_loop):
app, srv, base, handler = await create_server(event_loop)
url = base + '/auth?secret=deny&remote_ip=.&tls=false'
response = await aiohttp.get(url)
await handler.finish_connections()
assert response.status == 403
result = yield from response.text()
await response.text()
......@@ -4,25 +4,25 @@ from aionsq.tcp import NSQReader, NSQWriter
@pytest.mark.asyncio
def test_client(event_loop):
async def test_client(event_loop):
client = NSQWriter(':4150', loop=event_loop)
yield from client.start(deflate=True, deflate_level=6)
yield from client.publish('topic-deflate-1', 'msg1')
yield from client.multi_publish('topic-deflate-1', ['msg2', 'msg3'])
await client.start(deflate=True, deflate_level=6)
await client.publish('topic-deflate-1', 'msg1')
await client.multi_publish('topic-deflate-1', ['msg2', 'msg3'])
client.close()
yield from client.wait_closed()
await client.wait_closed()
@pytest.mark.asyncio
def test_reader(event_loop):
async def test_reader(event_loop):
client = NSQReader(':4150', 'topic-deflate-1', 'chan1', loop=event_loop)
@client.register
def listener(msg):
yield from msg.success()
async def listener(msg):
await msg.success()
yield from client.start(deflate=True, deflate_level=6)
yield from asyncio.sleep(1)
await client.start(deflate=True, deflate_level=6)
await asyncio.sleep(1)
client.close()
yield from client.wait_closed()
await client.wait_closed()
......@@ -4,18 +4,18 @@ from aionsq.http.exceptions import ChannelNotFound, TopicNotFound
@pytest.mark.asyncio
def test_ping():
async def test_ping():
client = NSQClient(':4151')
data = yield from client.ping()
data = await client.ping()
assert data is True
client.close()
yield from client.wait_closed()
await client.wait_closed()
@pytest.mark.asyncio
def test_info():
async def test_info():
client = NSQClient(':4151')
data = yield from client.info()
data = await client.info()
assert isinstance(data, dict)
assert 'broadcast_address' in data
assert 'hostname' in data
......@@ -25,86 +25,86 @@ def test_info():
assert data['http_port'] == 4151
assert data['tcp_port'] == 4150
client.close()
yield from client.wait_closed()
await client.wait_closed()
@pytest.mark.asyncio
def test_topic():
async def test_topic():
client = NSQClient(':4151')
yield from client.create_topic('foobar')
yield from client.empty_topic('foobar')
yield from client.pause_topic('foobar')
yield from client.unpause_topic('foobar')
yield from client.delete_topic('foobar')
await client.create_topic('foobar')
await client.empty_topic('foobar')
await client.pause_topic('foobar')
await client.unpause_topic('foobar')
await client.delete_topic('foobar')
with pytest.raises(TopicNotFound):
yield from client.empty_topic('xxxx')
await client.empty_topic('xxxx')
with pytest.raises(TopicNotFound):
yield from client.pause_topic('xxx')
await client.pause_topic('xxx')
with pytest.raises(TopicNotFound):
yield from client.unpause_topic('xxx')
await client.unpause_topic('xxx')
client.close()
yield from client.wait_closed()
await client.wait_closed()
@pytest.mark.asyncio
def test_channel():
async def test_channel():
client = NSQClient('http://127.0.0.1:4151')
yield from client.create_topic('foobar')
yield from client.create_channel('foobar', 'baz')
yield from client.empty_channel('foobar', 'baz')
yield from client.pause_channel('foobar', 'baz')
yield from client.unpause_channel('foobar', 'baz')
yield from client.delete_channel('foobar', 'baz')
await client.create_topic('foobar')
await client.create_channel('foobar', 'baz')
await client.empty_channel('foobar', 'baz')
await client.pause_channel('foobar', 'baz')
await client.unpause_channel('foobar', 'baz')
await client.delete_channel('foobar', 'baz')
with pytest.raises(ChannelNotFound):
yield from client.empty_channel('foobar', 'qux')
await client.empty_channel('foobar', 'qux')
with pytest.raises(ChannelNotFound):
yield from client.pause_channel('foobar', 'baz')
await client.pause_channel('foobar', 'baz')
with pytest.raises(ChannelNotFound):
yield from client.unpause_channel('foobar', 'baz')
await client.unpause_channel('foobar', 'baz')
with pytest.raises(ChannelNotFound):
yield from client.delete_channel('foobar', 'qux')
await client.delete_channel('foobar', 'qux')
yield from client.delete_topic('foobar')
await client.delete_topic('foobar')
with pytest.raises(TopicNotFound):
yield from client.create_channel('foobar', 'qux')
await client.create_channel('foobar', 'qux')
with pytest.raises(TopicNotFound):
yield from client.empty_channel('foobar', 'qux')
await client.empty_channel('foobar', 'qux')
with pytest.raises(TopicNotFound):
yield from client.pause_channel('foobar', 'qux')
await client.pause_channel('foobar', 'qux')
with pytest.raises(TopicNotFound):
yield from client.unpause_channel('foobar', 'qux')
await client.unpause_channel('foobar', 'qux')
client.close()
yield from client.wait_closed()
await client.wait_closed()
@pytest.mark.asyncio
def test_stats():
async def test_stats():
client = NSQClient('http://127.0.0.1:4151')
data = yield from client.stats()
data = await client.stats()
stats_validator.validate(data)
client.close()
yield from client.wait_closed()
await client.wait_closed()
@pytest.mark.asyncio
def test_publish():
async def test_publish():
client = NSQClient('http://127.0.0.1:4151')
data = yield from client.publish('topic1', 'hello world 1')
data = await client.publish('topic1', 'hello world 1')
assert data is True
data = yield from client.multi_publish('topic1', ['hello world 2',
'hello world 3'])
data = await client.multi_publish('topic1', ['hello world 2',
'hello world 3'])
assert data is True
client.close()
yield from client.wait_closed()
await client.wait_closed()
from jsonspec.validators import load
......
......@@ -4,75 +4,75 @@ from aionsq.http.exceptions import HTTPInternalServerError
@pytest.mark.asyncio
def test_ping():
async def test_ping():
lookup = NSQLookup('http://127.0.0.1:4161')
data = yield from lookup.ping()
data = await lookup.ping()
assert data is True
lookup.close()
yield from lookup.wait_closed()
await lookup.wait_closed()
@pytest.mark.asyncio
def test_info():
async def test_info():
lookup = NSQLookup('http://127.0.0.1:4161')
data = yield from lookup.info()
data = await lookup.info()
assert data == {'version': '0.3.6'}
lookup.close()
yield from lookup.wait_closed()
await lookup.wait_closed()
@pytest.mark.asyncio
def test_lookup_ok():
async def test_lookup_ok():
lookup = NSQLookup('http://127.0.0.1:4161')
data = yield from lookup.lookup(topic='topic1')
data = await lookup.lookup(topic='topic1')
lookup_validator.validate(data)
lookup.close()
yield from lookup.wait_closed()
await lookup.wait_closed()
@pytest.mark.asyncio
def test_lookup_ko():
async def test_lookup_ko():
lookup = NSQLookup('http://127.0.0.1:4161')
with pytest.raises(HTTPInternalServerError):
yield from lookup.lookup(topic='foo')
await lookup.lookup(topic='foo')
lookup.close()
yield from lookup.wait_closed()
await lookup.wait_closed()
@pytest.mark.asyncio
def test_topics():
async def test_topics():
lookup = NSQLookup('http://127.0.0.1:4161')
data = yield from lookup.topics()
data = await lookup.topics()
topics_validator.validate(data)
lookup.close()
yield from lookup.wait_closed()
await lookup.wait_closed()
@pytest.mark.asyncio
def test_channels_ok():
async def test_channels_ok():
lookup = NSQLookup('http://127.0.0.1:4161')
data = yield from lookup.channels(topic='test')
data = await lookup.channels(topic='test')
assert data == {'channels': []}
lookup.close()
yield from lookup.wait_closed()
await lookup.wait_closed()
@pytest.mark.asyncio
def test_channels_ko():
async def test_channels_ko():
lookup = NSQLookup('http://127.0.0.1:4161')
data = yield from lookup.channels(topic='foo')
data = await lookup.channels(topic='foo')
assert data == {'channels': []}
lookup.close()
yield from lookup.wait_closed()
await lookup.wait_closed()
@pytest.mark.asyncio
def test_nodes():
async def test_nodes():
lookup = NSQLookup('http://127.0.0.1:4161')
data = yield from lookup.nodes()
data = await lookup.nodes()
nodes_validator.validate(data)
lookup.close()
yield from lookup.wait_closed()
await lookup.wait_closed()
from jsonspec.validators import load
......
import asyncio
import pytest
from aionsq import NSQ
@pytest.mark.asyncio
def test_tcp(event_loop):
async def test_tcp(event_loop):
nsq = NSQ('tcp://127.0.0.1:4150', loop=event_loop)
yield from nsq.publish('topic1', 'msg1')
yield from nsq.multi_publish('topic1', ['msg2', 'msg3'])
await nsq.publish('topic1', 'msg1')
await nsq.multi_publish('topic1', ['msg2', 'msg3'])
nsq.close()
yield from nsq.wait_closed()
await nsq.wait_closed()
@pytest.mark.asyncio
def test_http(event_loop):
async def test_http(event_loop):
nsq = NSQ('http://127.0.0.1:4151', loop=event_loop)
yield from nsq.publish('topic1', 'msg1')
yield from nsq.multi_publish('topic1', ['msg2', 'msg3'])
await nsq.publish('topic1', 'msg1')
await nsq.multi_publish('topic1', ['msg2', 'msg3'])
nsq.close()
yield from nsq.wait_closed()
await nsq.wait_closed()
@pytest.mark.asyncio
def test_lookup(event_loop):
async def test_lookup(event_loop):
nsq = NSQ(lookup='http://127.0.0.1:4161', loop=event_loop)
yield from nsq.publish('topic1', 'msg1')
yield from nsq.multi_publish('topic1', ['msg2', 'msg3'])
await nsq.publish('topic1', 'msg1')
await nsq.multi_publish('topic1', ['msg2', 'msg3'])
nsq.close()
yield from nsq.wait_closed()
await nsq.wait_closed()
......@@ -4,25 +4,25 @@ from aionsq.tcp import NSQReader, NSQWriter
@pytest.mark.asyncio
def test_writer(event_loop):
async def test_writer(event_loop):
client = NSQWriter(':4150', loop=event_loop)
yield from client.start(snappy=True)
yield from client.publish('topic-snappy-1', 'msg1')
yield from client.multi_publish('topic-snappy-1', ['msg2', 'msg3'])
await client.start(snappy=True)
await client.publish('topic-snappy-1', 'msg1')
await client.multi_publish('topic-snappy-1', ['msg2', 'msg3'])
client.close()
yield from client.wait_closed()
await client.wait_closed()
@pytest.mark.asyncio
def test_reader(event_loop):
async def test_reader(event_loop):
client = NSQReader(':4150', 'topic-snappy-1', 'chan1', loop=event_loop)
@client.register
def listener(msg):
yield from msg.success()
async def listener(msg):
await msg.success()
yield from client.start(snappy=True)
yield from asyncio.sleep(1)
await client.start(snappy=True)
await asyncio.sleep(1)
client.close()
yield from client.wait_closed()
await client.wait_closed()
......@@ -2,66 +2,62 @@ import asyncio
import pytest
from aionsq.tcp import TCPHandler, NSQReader, NSQWriter
from aionsq.tcp import exceptions
from itertools import product
import logging
logging.basicConfig(level=logging.DEBUG)
@pytest.mark.asyncio
def test_client(event_loop):
async def test_client(event_loop):
client = TCPHandler('tcp://127.0.0.1:4150', loop=event_loop)
assert not client.started
yield from client.start()
await client.start()
assert client.started
client.close()
yield from client.wait_closed()
await client.wait_closed()
assert not client.started
@pytest.mark.asyncio
def test_publish(event_loop):
async def test_publish(event_loop):
client = NSQWriter('tcp://127.0.0.1:4150', loop=event_loop)
with pytest.raises(exceptions.NotStartedError):
yield from client.publish('topic1', 'msg1')
await client.publish('topic1', 'msg1')
yield from client.start()
result = yield from client.publish('topic1', 'msg1')
await client.start()
result = await client.publish('topic1', 'msg1')
assert result is True
client.close()
yield from client.wait_closed()
await client.wait_closed()
@pytest.mark.asyncio
def test_multi_publish(event_loop):
async def test_multi_publish(event_loop):
client = NSQWriter(':4150', loop=event_loop)
with pytest.raises(exceptions.NotStartedError):
yield from client.multi_publish('topic1', ['msg1'])
await client.multi_publish('topic1', ['msg1'])
yield from client.start()
result = yield from client.multi_publish('topic1', ['msg1'])
await client.start()
result = await client.multi_publish('topic1', ['msg1'])
assert result is True
client.close()
yield from client.wait_closed()
await client.wait_closed()
@pytest.mark.asyncio
def test_subscribe(event_loop):
async def test_subscribe(event_loop):
client = NSQReader(':4150', 'topic1', 'chan1', loop=event_loop)
@client.register
def listener(msg):
yield from msg.success()
async def listener(msg):
await msg.success()
yield from client.start()
yield from asyncio.sleep(1)
await client.start()
await asyncio.sleep(1)
client.close()
yield from client.wait_closed()
await client.wait_closed()
def test_consumers_1():
......@@ -70,7 +66,7 @@ def test_consumers_1():
assert not client.consumers
@client.register
def consumer1(msg):
async def consumer1(msg):
return
assert consumer1 in client.consumers
......@@ -86,7 +82,7 @@ def test_consumers_1():
def test_consumers_2():
client = NSQReader(':4150', 'topic1', 'chan1')
def consumer1(msg):
async def consumer1(msg):
return
assert not client.consumers
......
......@@ -7,7 +7,7 @@ from itertools import product
@pytest.mark.asyncio
@pytest.mark.parametrize('tls,deflate,snappy', product((True, False), repeat=3))
def test_client(event_loop, cert, tls, deflate, snappy):
async def test_client(event_loop, cert, tls, deflate, snappy):
client = NSQWriter(':4150', cert=cert, loop=event_loop)
opts = {
'tls_v1': tls,
......@@ -18,23 +18,22 @@ def test_client(event_loop, cert, tls, deflate, snappy):
assert not client.started
if deflate and snappy:
with pytest.raises(exceptions.IdentifyFailedError):
yield from client.start(**opts)
await client.start(**opts)
assert client.started
else:
yield from client.start(**opts)
await client.start(**opts)
assert client.started
client.close()
yield from client.wait_closed()
await client.wait_closed()
assert not client.started
@pytest.mark.asyncio
@pytest.mark.parametrize('tls,deflate,snappy', product((True, False), repeat=3))
def test_writer(event_loop, cert, tls, deflate, snappy):
async def test_writer(event_loop, cert, tls, deflate, snappy):
client = NSQWriter(':4150', cert=cert, loop=event_loop)
topic = 'topic-%s-%s-%s' % (tls, deflate, snappy)
chan = 'chan-%s-%s-%s' % (tls, deflate, snappy)
opts = {
'tls_v1': tls,
'deflate': deflate,
......@@ -44,23 +43,23 @@ def test_writer(event_loop, cert, tls, deflate, snappy):
assert not client.started
if deflate and snappy:
with pytest.raises(exceptions.IdentifyFailedError):
yield from client.start(**opts)
await client.start(**opts)
assert client.started
else:
yield from client.start(**opts)
await client.start(**opts)
assert client.started
yield from client.publish(topic, 'msg1')
yield from client.multi_publish(topic, ['msg2', 'msg3'])
await client.publish(topic, 'msg1')
await client.multi_publish(topic, ['msg2', 'msg3'])
client.close()
yield from client.wait_closed()
await client.wait_closed()
assert not client.started
@pytest.mark.asyncio
@pytest.mark.parametrize('tls,deflate,snappy', product((True, False), repeat=3))
def test_reader(event_loop, cert, tls, deflate, snappy):
async def test_reader(event_loop, cert, tls, deflate, snappy):
topic = 'topic-%s-%s-%s' % (tls, deflate, snappy)
client = NSQReader(':4150', topic, 'chan1',