Commit 815556ec authored by xa's avatar xa

start everything for integration test

parent e5c83fa6
import asyncio
import logging
from aionsq.http import NSQClient
from aionsq.lookup import NSQLookup
from aionsq.tcp import NSQReader, NSQWriter
......@@ -118,6 +119,7 @@ class HTTPAdapter(Adapter):
class LookupAdapter(Adapter):
def __init__(self, addr, *, loop=None):
self.log = logging.getLogger(__name__)
self.loop = loop or asyncio.get_event_loop()
self.client = NSQLookup(addr, loop=self.loop)
self._writer = None
......@@ -132,6 +134,7 @@ class LookupAdapter(Adapter):
nodes = yield from self.client.nodes()
node = nodes['producers'][0]
addr = 'tcp://%s:%s' % (node['broadcast_address'], node['tcp_port'])
self.log.info('connect to %s', addr)
writer = NSQWriter(addr, loop=self.loop)
yield from writer.start()
self._writer = writer
......
......@@ -32,9 +32,13 @@ class Connection:
@asyncio.coroutine
def start(self):
if not self._started:
self.sock = socket.socket()
infos = yield from self.loop.getaddrinfo(*self.addr,
family=socket.AF_INET,
type=socket.SOCK_STREAM)
*opts, _, addr = infos[0]
self.sock = socket.socket(*opts)
self.sock.settimeout(0)
resp = yield from self.loop.sock_connect(self.sock, self.addr)
resp = yield from self.loop.sock_connect(self.sock, addr)
atexit.register(self.close)
self.log.error(resp)
self.sock.send(b' V2')
......@@ -179,8 +183,6 @@ class Connection:
data = yield from self.read()
return data == b'OK'
def _read_ready(self):
try:
mbytes = self.max_bytes
......
......@@ -62,7 +62,6 @@ class Service:
shell=False)
self.proc = proc
print('Starting %s [%s] %s' % (self.name, proc.pid, ' '.join(self.cmd)))
sleep(1)
def stop(self):
if not self.proc:
......@@ -74,49 +73,57 @@ class Service:
self.proc = None
return result
def service(*args, **kwargs):
serv = Service(*args, **kwargs)
serv.start()
return serv
services = {
'nsqlookupd': service('nsqlookupd'),
'nsqauthd': service(sys.executable or 'python', '-m', 'aionsq.authd', ':4181',
cwd=database_dir,
name='nsqauthd'),
'nsqd': service('nsqd',
'--lookupd-tcp-address=127.0.0.1:4160',
'--auth-http-address=127.0.0.1:4181',
# '--snappy',
# '--deflate',
# '--max-deflate-level=6',
'--tls-cert=' + filepath('certs', 'server.pem'),
'--tls-key=' + filepath('certs', 'server.key'),
'--tls-root-ca-file=' + filepath('certs', 'ca.pem'),
'--verbose',
cwd=database_dir),
'nsqadmin': service('nsqadmin',
'--lookupd-http-address=127.0.0.1:4161',
cwd=database_dir)
}
sleep(.5)
@fixture(scope="session", autouse=True)
def nsqlookupd(request):
service = Service('nsqlookupd')
service.start()
service = services['nsqlookupd']
request.addfinalizer(service.stop)
return service.config
@fixture(scope="session", autouse=True)
def nsqauthd(request):
service = Service(sys.executable or 'python', '-m', 'aionsq.authd', ':4181',
cwd=database_dir,
name='nsqauthd')
service.start()
service = services['nsqauthd']
request.addfinalizer(service.stop)
return service.config
@fixture(scope="session", autouse=True)
def nsqd(request):
service = Service('nsqd',
'--lookupd-tcp-address=127.0.0.1:4160',
'--auth-http-address=127.0.0.1:4181',
# '--snappy',
# '--deflate',
# '--max-deflate-level=6',
'--tls-cert=' + filepath('certs', 'server.pem'),
'--tls-key=' + filepath('certs', 'server.key'),
'--tls-root-ca-file=' + filepath('certs', 'ca.pem'),
'--verbose',
cwd=database_dir)
service.start()
service = services['nsqd']
request.addfinalizer(service.stop)
return service.config
@fixture(scope="session", autouse=True)
def nsqadmin(request):
service = Service('nsqadmin',
'--lookupd-http-address=127.0.0.1:4161',
cwd=database_dir)
service.start()
service = services['nsqadmin']
request.addfinalizer(service.stop)
return service.config
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment