Commit 9486b10f authored by xa's avatar xa

Merge branch 'singleton' into 'master'

Singleton

See merge request !2
parents a5c66706 dd2a1b32
Pipeline #1043 failed with stages
in 115 minutes and 45 seconds
...@@ -79,7 +79,6 @@ When applied with some arguments, placeholders just fills the gaps:: ...@@ -79,7 +79,6 @@ When applied with some arguments, placeholders just fills the gaps::
'bar': 'I am bar'} 'bar': 'I am bar'}
Factories also can be either sync or awaitable:: Factories also can be either sync or awaitable::
@services.factory('bar:sync') @services.factory('bar:sync')
...@@ -91,6 +90,45 @@ Factories also can be either sync or awaitable:: ...@@ -91,6 +90,45 @@ Factories also can be either sync or awaitable::
return 'I am bar' return 'I am bar'
Services are by default singleton, but they can also be instantiated at every call::
@services.factory('bar', singleton=True)
def bar_factory():
return time()
result1 = await services.get('bar')
sleep(.1)
result2 = await services.get('bar')
assert result1 == result2
# cache can be resetted
services.refresh("bar")
result3 = await services.get('bar')
assert result3 != result2
Singleton mode can be disabled per service::
@services.factory('baz', singleton=False)
def baz_factory():
return time()
result1 = await services.get('baz')
sleep(.1)
result2 = await services.get('baz')
assert result1 != result2
Current services are automatically exposed inside functions::
def func():
return current_injector()
assert func() is None
assert (await services.apply(func)) is services
Implementation Implementation
-------------- --------------
......
from .bases import Injector, annotate from .bases import Injector, annotate, attr, current_injector
from ._version import get_versions from ._version import get_versions
__all__ = ['Injector', 'annotate']
__version__ = get_versions()['version']
del get_versions
from ._version import get_versions __version__ = get_versions()["version"]
__version__ = get_versions()['version']
del get_versions del get_versions
__all__ = ["__version__", "Injector", "annotate", "attr", "current_injector"]
...@@ -4,51 +4,86 @@ import asyncio ...@@ -4,51 +4,86 @@ import asyncio
import concurrent.futures import concurrent.futures
import logging import logging
from abc import ABCMeta from abc import ABCMeta
from collections import OrderedDict, defaultdict from collections import ChainMap
from functools import wraps from contextlib import contextmanager
from inspect import signature from contextvars import ContextVar
from inspect import signature, unwrap
from itertools import chain from itertools import chain
from types import MappingProxyType
from typing import Callable, Optional, cast
from weakref import WeakKeyDictionary from weakref import WeakKeyDictionary
from cached_property import cached_property from cached_property import cached_property
logger = logging.getLogger("knighted") logger = logging.getLogger("knighted")
MaybeInjector = Optional["Injector"]
ANNOTATIONS: WeakKeyDictionary[Callable, "Annotation"] = WeakKeyDictionary()
class Factory: current_injector_var: ContextVar[MaybeInjector] = ContextVar("current_injector")
def __init__(self, target):
self.target = target
def __call__(self, note, func=None): def current_injector() -> MaybeInjector:
def decorate(func): global current_injector_var
self.target.factories[note] = func return current_injector_var.get(None)
return func
if func:
return decorate(func)
return decorate
class FactoryMethod: def annotate(*pos_notes, **kw_notes):
"""Decorator for func def wrapper(func):
""" func = unwrap(func)
ANNOTATIONS[func] = Annotation(func, pos_notes, kw_notes)
return func
def __get__(self, obj, objtype): for arg in chain(pos_notes, kw_notes.values()):
target = obj or objtype if not isinstance(arg, str):
return Factory(target) raise ValueError("Notes must be strings")
return wrapper
class Annotation:
def __init__(self, func, pos_notes, kw_notes):
self.bind_partial = signature(func).bind_partial
self.is_coro = asyncio.iscoroutinefunction(func)
self.markers = self.bind_partial(*pos_notes, **kw_notes).arguments
def given(self, *args, **kwargs):
return list(self.bind_partial(*args, **kwargs).arguments)
class DataProxy: class DataProxy:
def __init__(self):
self.data = WeakKeyDictionary()
def __init__(self, name, type): def __set_name__(self, owner, name):
self.name = name self.name = name
self.type = type
def __get__(self, obj, objtype): def __get__(self, instance, owner):
target = obj or objtype if instance is None:
if not hasattr(target, self.name): response = self.data.setdefault(owner, {})
setattr(target, self.name, self.type()) else:
return getattr(target, self.name) response = self.data.setdefault(
instance, ChainMap({}, MappingProxyType(getattr(owner, self.name)))
)
return response
class FactoryAccessor:
def __get__(self, instance, owner):
def wrap_name(name, func=None, *, singleton=True):
def wrap_func(func):
(instance or owner).factories[name] = func, singleton
return func
if func:
return wrap_func(func)
return wrap_func
return wrap_name
def close_reaction(obj):
obj.close()
class CloseHandler: class CloseHandler:
...@@ -85,115 +120,109 @@ class CloseHandler: ...@@ -85,115 +120,109 @@ class CloseHandler:
class Injector(metaclass=ABCMeta): class Injector(metaclass=ABCMeta):
"""Collects dependencies and reads annotations to inject them. factory = FactoryAccessor()
""" factories = DataProxy()
services = DataProxy()
factory = FactoryMethod()
services = DataProxy('_services', OrderedDict)
factories = DataProxy('_factories', OrderedDict)
def __init__(self): def __init__(self):
self.services = self.__class__.services.copy()
self.factories = self.__class__.factories.copy()
self.reactions = defaultdict(WeakKeyDictionary)
self.close = CloseHandler(self) self.close = CloseHandler(self)
def refresh(self, name: str):
service = self.services.pop(name, None)
if service:
logger.info("Refreshed service=%s", name)
return service
@cached_property @cached_property
def executor(self): def executor(self):
return concurrent.futures.ThreadPoolExecutor(max_workers=10) return concurrent.futures.ThreadPoolExecutor(max_workers=10)
async def get(self, note): def get(self, name: str) -> asyncio.Future:
if note in self.services: future: asyncio.Future = asyncio.Future()
return self.services[note] try:
result = self.services[name]
for fact, args in note_loop(note): future.set_result(result)
if fact in self.factories: except KeyError:
func = self.factories[fact] for fact, args in note_loop(name):
if asyncio.iscoroutinefunction(func): if fact in self.factories:
instance = await func(*args) func, singleton = self.factories[fact]
else: if asyncio.iscoroutinefunction(func):
loop = asyncio.get_running_loop() task = asyncio.create_task(func(*args))
instance = await loop.run_in_executor(self.executor, func, *args) else:
logger.info('loaded service %s' % note) loop = asyncio.get_running_loop()
self.services[note] = instance task = cast(
return instance asyncio.Task,
raise ValueError('%r is not defined' % note) loop.run_in_executor(self.executor, func, *args),
)
async def apply(self, *args, **kwargs): break
func, *args = args else:
response = await self.partial(func)(*args, **kwargs) raise ValueError("%r is not defined" % name)
return response logger.info("Loading service=%s", name)
if singleton:
def partial(self, func): task.add_done_callback(
"""Resolves lately dependencies. lambda x: self.services.update({name: x.result()})
)
Returns: task.add_done_callback(lambda x: future.set_result(x.result()))
callable: the service partially resolved return future
"""
def apply(self, *args, **kwargs) -> asyncio.Future:
@wraps(func) with self.auto():
async def wrapper(*args, **kwargs): func, *args = args # type: ignore
if func in ANNOTATIONS: func = unwrap(func)
annotation = ANNOTATIONS[func] anno = ANNOTATIONS.get(func)
given = annotation.given(*args, **kwargs) if anno:
to_load = {} return self.do_apply(func, anno, args, kwargs)
for key, note in annotation.marked.items(): fut: asyncio.Future = asyncio.Future()
if key not in given: fut.set_result(func(*args, **kwargs))
to_load[key] = asyncio.create_task(self.get(note)) return fut
for key, fut in to_load.items():
to_load[key] = await fut def do_apply(self, func, anno, args, kwargs):
kwargs.update(to_load) given = anno.given(*args, **kwargs)
result = func(*args, **kwargs) services = {
if asyncio.iscoroutine(result): key: self.get(service)
result = await result for key, service in anno.markers.items()
return result if key not in given
logger.warning('%r is not annoted', func) }
return func(*args, **kwargs) logger.info("Apply services=%s to func=%r", ",".join(services.keys()), func)
return wrapper
async def run(args, kwargs):
kwargs = dict(kwargs)
class Annotation: for k, v in services.items():
def __init__(self, pos_notes, kw_notes, func): kwargs[k] = await v
self.pos_notes = pos_notes result = func(*args, **kwargs)
self.kw_notes = kw_notes if anno.is_coro:
self.bind_partial = signature(func).bind_partial result = await result
return result
@cached_property
def marked(self): return asyncio.create_task(run(args, kwargs))
return self.bind_partial(*self.pos_notes, **self.kw_notes).arguments
@contextmanager
def given(self, *args, **kwargs): def auto(self):
return list(self.bind_partial(*args, **kwargs).arguments) token = current_injector_var.set(self)
try:
yield self
finally:
current_injector_var.reset(token)
class attr:
def __init__(self, name):
self.service_name = name
def __get__(self, obj, objtype):
ANNOTATIONS: WeakKeyDictionary[str, Annotation] = WeakKeyDictionary() if obj is None:
raise AttributeError("attr applies to instances only")
return current_injector_var.get().get(self.service_name)
def close_reaction(obj):
obj.close()
def annotate(*args, **kwargs):
def decorate(func):
ANNOTATIONS[func] = Annotation(args, kwargs, func)
return func
for arg in chain(args, kwargs.values()):
if not isinstance(arg, str):
raise ValueError('Notes must be strings')
return decorate
def note_loop(note): def note_loop(note):
args = note.split(':') args = note.split(":")
results = [] results = []
fact, *args = args fact, *args = args
results.append((fact, args)) results.append((fact, args))
while args: while args:
suffix, *args = args suffix, *args = args
fact = '%s:%s' % (fact, suffix) fact = "%s:%s" % (fact, suffix)
results.append((fact, args)) results.append((fact, args))
for fact, args in sorted(results, reverse=True): for fact, args in sorted(results, reverse=True):
yield fact, args yield fact, args
...@@ -15,3 +15,7 @@ max-complexity = 10 ...@@ -15,3 +15,7 @@ max-complexity = 10
max-line-length=99 max-line-length=99
verbose=1 verbose=1
[pycodestyle]
max_line_length=99
This diff is collapsed.
import pytest
from knighted import Injector, annotate, current_injector, attr
@pytest.fixture
def services():
class MyInjector(Injector):
pass
return MyInjector()
@pytest.mark.asyncio
async def test_sync_async(services):
@services.factory("foo")
def foo_factory():
return "I am foo"
@annotate("foo")
async def fun(foo):
return {"foo": foo}
assert await services.apply(fun) == {"foo": "I am foo"}
@pytest.mark.asyncio
async def test_sync_sync(services):
@services.factory("foo")
def foo_factory():
return "I am foo"
@annotate("foo")
def fun(foo):
return {"foo": foo}
assert await services.apply(fun) == {"foo": "I am foo"}
@pytest.mark.asyncio
async def test_async_async(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
@annotate("foo")
async def fun(foo):
return {"foo": foo}
assert await services.apply(fun) == {"foo": "I am foo"}
@pytest.mark.asyncio
async def test_auto_sync_async(services):
@services.factory("foo")
def foo_factory():
return "I am foo"
@annotate("foo")
async def fun(foo):
return {"foo": foo}
result = await services.apply(fun)
assert result == {"foo": "I am foo"}
@pytest.mark.asyncio
async def test_auto_async_async(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
@services.factory("bar")
async def bar_factory():
return "I am bar"
@annotate("bar")
async def fun1(bar):
return bar
@annotate("foo")
async def fun(foo):
bar = await current_injector().apply(fun1)
return {"foo": foo, "bar": bar}
result = await services.apply(fun)
assert result == {"foo": "I am foo", "bar": "I am bar"}
@pytest.mark.asyncio
async def test_noauto_partial_async_async(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
@services.factory("bar")
async def bar_factory():
return "I am bar"
@annotate("bar")
async def fun1(bar):
return bar
@annotate("foo")
async def fun(foo):
bar = await current_injector().apply(fun1)
return {"foo": foo, "bar": bar}
with pytest.raises(TypeError):
await fun()
@pytest.mark.asyncio
async def test_descriptor_1_decorated(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
class Toto:
cache = attr("foo")
toto = Toto
with services.auto(), pytest.raises(Exception):
await toto.cache
@pytest.mark.asyncio
async def test_descriptor_2_decorated(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
class Toto:
cache = attr("foo")
toto = Toto()
with services.auto():
cache = await toto.cache
assert cache == "I am foo"
@pytest.mark.asyncio
async def test_descriptor_1_not_decorated(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
class Toto:
cache = attr("foo")
toto = Toto
with pytest.raises(Exception):
toto.cache # Must fails
@pytest.mark.asyncio
async def test_descriptor_2_not_decorated(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
class Toto:
cache = attr("foo")
toto = Toto()
with pytest.raises(LookupError):
toto.cache
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment