Commit 13204f45 authored by xa's avatar xa

implement deflate & snappy layers.

parent b5833bc5
......@@ -20,11 +20,11 @@ class TCPHandler:
return self._started
@asyncio.coroutine
def start(self):
def start(self, **opts):
if not self._started:
yield from self._openened.acquire()
yield from self._connection.start()
yield from self._connection.identify()
yield from self._connection.identify(**opts)
yield from self.start_warmup()
self._started = True
return True
......
......@@ -2,7 +2,7 @@ import asyncio
import json
import logging
import socket
from .protocol import NSQProtocol
from .protocol import blocking, NSQProtocol
from aionsq.util import parse_addr
......@@ -63,15 +63,22 @@ class NSQConnection:
opts.setdefault('client_id', socket.getfqdn())
opts.setdefault('hostname', socket.gethostname())
opts.setdefault('feature_negotiation', True)
if 'deflate' in opts:
opts.setdefault('deflate_level', 6)
data = json.dumps(opts)
self.protocol.send_cmd('IDENTIFY', body=data)
response = yield from self.next_response()
features = json.loads(response)
self.log.info('features %s' % features)
if features['deflate']:
raise NotImplementedError('Cannot use deflate')
if features['snappy']:
raise NotImplementedError('Cannot use snappy')
with blocking(self.protocol) as mini:
self.protocol.send_cmd('IDENTIFY', body=data)
response = mini.read()
features = json.loads(response)
self.log.info('features %s' % features)
if features['deflate']:
self.protocol.upgrade_deflate(opts['deflate_level'])
response = mini.read()
self.log.info('deflate status %s' % response)
if features['snappy']:
self.protocol.upgrade_snappy()
response = mini.read()
self.log.info('snappy status %s' % response)
return features
@asyncio.coroutine
......@@ -154,6 +161,12 @@ class NSQConnection:
"""
self.protocol.send_cmd('NOP')
@asyncio.coroutine
def cls(self):
"""Close...
"""
self.protocol.send_cmd('CLS')
@asyncio.coroutine
def next_response(self):
response = yield from self.pending_responses.get()
......
......@@ -18,21 +18,32 @@ class DeflateLayer:
def __init__(self, level):
self.level = level
wbits = -zlib.MAX_WBITS
self._compressor = zlib.compressobj(level, zlib.DEFLATED, wbits)
self._decompressor = zlib.decompressobj(wbits)
def write(self, data):
return zlib.compress(data, self.level)
data = self._compressor.compress(data)
return data + self._compressor.flush(zlib.Z_SYNC_FLUSH)
def read(self, data):
return zlib.decompress(data)
data = self._decompressor.decompress(data)
return data, self._decompressor.unconsumed_tail
class SnappyLayer:
def __init__(self):
self._compressor = snappy.StreamCompressor()
self._decompressor = snappy.StreamDecompressor()
def write(self, data):
return snappy.compress(data)
if isinstance(data, bytearray):
data = bytes(data)
return self._compressor.add_chunk(data, compress=True)
def read(self, data):
return snappy.decompress(data)
return self._decompressor.decompress(data), b''
class SpecLayer:
......
import asyncio
import logging
import struct
from .layers import SpecLayer
from .layers import SpecLayer, DeflateLayer, SnappyLayer
class NSQProtocol(asyncio.Protocol):
def __init__(self):
self.listeners = set()
self.buf1 = bytearray()
self.buf = bytearray()
self.log = logging.getLogger(__name__)
self.spec = SpecLayer()
self.z = None
def upgrade_deflate(self, level):
self.log.info('upgrade deflate')
self.z = DeflateLayer(level)
def upgrade_snappy(self):
self.log.info('upgrade snappy')
self.z = SnappyLayer()
def connection_made(self, transport):
self.transport = transport
......@@ -19,6 +29,10 @@ class NSQProtocol(asyncio.Protocol):
self.transport = None
def data_received(self, data):
if self.z:
self.buf1.extend(data)
data, self.buf1[:] = self.z.read(self.buf1)
self.log.info('read %r', data)
self.buf.extend(data)
self.process_buffer()
......@@ -41,9 +55,57 @@ class NSQProtocol(asyncio.Protocol):
listener(frame)
def send_cmd(self, cmd, *params, body=None):
msg = self.spec.prepare_cmd(cmd, *params, body=body)
msg = bytes(self.spec.prepare_cmd(cmd, *params, body=body))
self.log.info('send %s', msg)
if self.z:
msg = self.z.write(msg)
self.transport.write(msg)
def add_listener(self, callback):
self.listeners.add(callback)
def blocking(protocol):
return BlockingProtocol(protocol)
class BlockingProtocol:
def __init__(self, protocol):
self.protocol = protocol
self.transport = protocol.transport
self.buf1 = bytearray()
self.buf = bytearray()
self.log = logging.getLogger(__name__)
def __enter__(self):
self.transport.pause_reading()
self.transport._sock.setblocking(True)
return self
def __exit__(self, type, value, traceback):
self.transport._sock.setblocking(False)
self.transport.resume_reading()
def _read(self, size):
while len(self.buf) < size:
data = self.transport._sock.recv(512)
if self.protocol.z:
self.buf1.extend(data)
data, self.buf1[:] = self.protocol.z.read(self.buf1)
self.log.info('read %r', data)
self.buf.extend(data)
response, self.buf[:size] = self.buf[:size], []
return response
def read(self):
data = self._read(4)
size, *_ = struct.unpack('>l', data)
data = self._read(size)
frame = self.protocol.spec.parse_protocol(data)
self.log.info('got %s', frame)
if frame.type == 0:
return frame.body
raise Exception(frame)
import asyncio
import pytest
from aionsq.tcp.layers import DeflateLayer, SnappyLayer
from aionsq.tcp import NSQWriter, NSQReader
def test_deflate():
layer = DeflateLayer(6)
resp = layer.write(b'foobar')
assert resp == b'J\xcb\xcfOJ,\x02\x00\x00\x00\xff\xff'
resp = layer.write(bytearray('foobar', encoding='utf-8'))
assert resp == b'J\x03\x93\x00\x00\x00\x00\xff\xff'
resp = layer.read(b'J\xcb\xcfOJ,\x02\x00\x00\x00\xff\xff')
assert resp == (b'foobar', b'')
@pytest.mark.asyncio
def test_deflate_client(event_loop):
client = NSQWriter(':4150', loop=event_loop)
yield from client.start(deflate=True, deflate_level=6)
yield from client.publish('topic-deflate-1', 'msg1')
yield from client.multi_publish('topic-deflate-1', ['msg2', 'msg3'])
client.close()
yield from client.wait_closed()
@pytest.mark.asyncio
def test_deflate_reader(event_loop):
client = NSQReader(':4150', 'topic-deflate-1', 'chan1', loop=event_loop)
@client.register
def listener(msg):
yield from msg.success()
yield from client.start(deflate=True, deflate_level=6)
yield from asyncio.sleep(1)
client.close()
yield from client.wait_closed()
from aionsq.tcp.layers import DeflateLayer
def test_deflate():
layer = DeflateLayer(6)
resp = layer.write(b'foobar')
assert resp == b'x\x9cK\xcb\xcfOJ,\x02\x00\x08\xab\x02z'
resp = layer.write(bytearray('foobar', encoding='utf-8'))
assert resp == b'x\x9cK\xcb\xcfOJ,\x02\x00\x08\xab\x02z'
resp = layer.read(b'x\x9cK\xcb\xcfOJ,\x02\x00\x08\xab\x02z')
assert resp == b'foobar'
import asyncio
import pytest
from aionsq.tcp.layers import DeflateLayer, SnappyLayer
from aionsq.tcp import NSQWriter, NSQReader
def test_snappy():
layer = SnappyLayer()
resp = layer.write(b'foobar')
assert resp == b'\xff\x06\x00\x00sNaPpY\x00\x0c\x00\x00\x96\x05\x81[\x06\x14foobar'
resp = layer.write(bytearray('foobar', encoding='utf-8'))
assert resp == b'\x00\x0c\x00\x00\x96\x05\x81[\x06\x14foobar'
resp = layer.read(b'\xff\x06\x00\x00sNaPpY\x00\x0c\x00\x00\x96\x05\x81[\x06\x14foobar')
assert resp == (b'foobar', b'')
@pytest.mark.asyncio
def test_snappy_writer(event_loop):
client = NSQWriter(':4150', loop=event_loop)
yield from client.start(snappy=True)
yield from client.publish('topic-snappy-1', 'msg1')
yield from client.multi_publish('topic-snappy-1', ['msg2', 'msg3'])
client.close()
yield from client.wait_closed()
@pytest.mark.asyncio
def test_snappy_reader(event_loop):
client = NSQReader(':4150', 'topic-snappy-1', 'chan1', loop=event_loop)
@client.register
def listener(msg):
yield from msg.success()
yield from client.start(snappy=True)
yield from asyncio.sleep(1)
client.close()
yield from client.wait_closed()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment