...
 
Commits (4)
......@@ -79,7 +79,6 @@ When applied with some arguments, placeholders just fills the gaps::
'bar': 'I am bar'}
Factories also can be either sync or awaitable::
@services.factory('bar:sync')
......@@ -91,6 +90,45 @@ Factories also can be either sync or awaitable::
return 'I am bar'
Services are by default singleton, but they can also be instantiated at every call::
@services.factory('bar', singleton=True)
def bar_factory():
return time()
result1 = await services.get('bar')
sleep(.1)
result2 = await services.get('bar')
assert result1 == result2
# cache can be resetted
services.refresh("bar")
result3 = await services.get('bar')
assert result3 != result2
Singleton mode can be disabled per service::
@services.factory('baz', singleton=False)
def baz_factory():
return time()
result1 = await services.get('baz')
sleep(.1)
result2 = await services.get('baz')
assert result1 != result2
Current services are automatically exposed inside functions::
def func():
return current_injector()
assert func() is None
assert (await services.apply(func)) is services
Implementation
--------------
......
from .bases import Injector, annotate
from .bases import Injector, annotate, attr, current_injector
from ._version import get_versions
__all__ = ['Injector', 'annotate']
__version__ = get_versions()['version']
del get_versions
from ._version import get_versions
__version__ = get_versions()['version']
__version__ = get_versions()["version"]
del get_versions
__all__ = ["__version__", "Injector", "annotate", "attr", "current_injector"]
......@@ -4,51 +4,86 @@ import asyncio
import concurrent.futures
import logging
from abc import ABCMeta
from collections import OrderedDict, defaultdict
from functools import wraps
from inspect import signature
from collections import ChainMap
from contextlib import contextmanager
from contextvars import ContextVar
from inspect import signature, unwrap
from itertools import chain
from types import MappingProxyType
from typing import Callable, Optional, cast
from weakref import WeakKeyDictionary
from cached_property import cached_property
logger = logging.getLogger("knighted")
MaybeInjector = Optional["Injector"]
ANNOTATIONS: WeakKeyDictionary[Callable, "Annotation"] = WeakKeyDictionary()
class Factory:
current_injector_var: ContextVar[MaybeInjector] = ContextVar("current_injector")
def __init__(self, target):
self.target = target
def __call__(self, note, func=None):
def decorate(func):
self.target.factories[note] = func
return func
if func:
return decorate(func)
return decorate
def current_injector() -> MaybeInjector:
global current_injector_var
return current_injector_var.get(None)
class FactoryMethod:
"""Decorator for func
"""
def annotate(*pos_notes, **kw_notes):
def wrapper(func):
func = unwrap(func)
ANNOTATIONS[func] = Annotation(func, pos_notes, kw_notes)
return func
def __get__(self, obj, objtype):
target = obj or objtype
return Factory(target)
for arg in chain(pos_notes, kw_notes.values()):
if not isinstance(arg, str):
raise ValueError("Notes must be strings")
return wrapper
class Annotation:
def __init__(self, func, pos_notes, kw_notes):
self.bind_partial = signature(func).bind_partial
self.is_coro = asyncio.iscoroutinefunction(func)
self.markers = self.bind_partial(*pos_notes, **kw_notes).arguments
def given(self, *args, **kwargs):
return list(self.bind_partial(*args, **kwargs).arguments)
class DataProxy:
def __init__(self):
self.data = WeakKeyDictionary()
def __init__(self, name, type):
def __set_name__(self, owner, name):
self.name = name
self.type = type
def __get__(self, obj, objtype):
target = obj or objtype
if not hasattr(target, self.name):
setattr(target, self.name, self.type())
return getattr(target, self.name)
def __get__(self, instance, owner):
if instance is None:
response = self.data.setdefault(owner, {})
else:
response = self.data.setdefault(
instance, ChainMap({}, MappingProxyType(getattr(owner, self.name)))
)
return response
class FactoryAccessor:
def __get__(self, instance, owner):
def wrap_name(name, func=None, *, singleton=True):
def wrap_func(func):
(instance or owner).factories[name] = func, singleton
return func
if func:
return wrap_func(func)
return wrap_func
return wrap_name
def close_reaction(obj):
obj.close()
class CloseHandler:
......@@ -85,115 +120,109 @@ class CloseHandler:
class Injector(metaclass=ABCMeta):
"""Collects dependencies and reads annotations to inject them.
"""
factory = FactoryMethod()
services = DataProxy('_services', OrderedDict)
factories = DataProxy('_factories', OrderedDict)
factory = FactoryAccessor()
factories = DataProxy()
services = DataProxy()
def __init__(self):
self.services = self.__class__.services.copy()
self.factories = self.__class__.factories.copy()
self.reactions = defaultdict(WeakKeyDictionary)
self.close = CloseHandler(self)
def refresh(self, name: str):
service = self.services.pop(name, None)
if service:
logger.info("Refreshed service=%s", name)
return service
@cached_property
def executor(self):
return concurrent.futures.ThreadPoolExecutor(max_workers=10)
async def get(self, note):
if note in self.services:
return self.services[note]
for fact, args in note_loop(note):
if fact in self.factories:
func = self.factories[fact]
if asyncio.iscoroutinefunction(func):
instance = await func(*args)
else:
loop = asyncio.get_running_loop()
instance = await loop.run_in_executor(self.executor, func, *args)
logger.info('loaded service %s' % note)
self.services[note] = instance
return instance
raise ValueError('%r is not defined' % note)
async def apply(self, *args, **kwargs):
func, *args = args
response = await self.partial(func)(*args, **kwargs)
return response
def partial(self, func):
"""Resolves lately dependencies.
Returns:
callable: the service partially resolved
"""
@wraps(func)
async def wrapper(*args, **kwargs):
if func in ANNOTATIONS:
annotation = ANNOTATIONS[func]
given = annotation.given(*args, **kwargs)
to_load = {}
for key, note in annotation.marked.items():
if key not in given:
to_load[key] = asyncio.create_task(self.get(note))
for key, fut in to_load.items():
to_load[key] = await fut
kwargs.update(to_load)
result = func(*args, **kwargs)
if asyncio.iscoroutine(result):
result = await result
return result
logger.warning('%r is not annoted', func)
return func(*args, **kwargs)
return wrapper
class Annotation:
def __init__(self, pos_notes, kw_notes, func):
self.pos_notes = pos_notes
self.kw_notes = kw_notes
self.bind_partial = signature(func).bind_partial
@cached_property
def marked(self):
return self.bind_partial(*self.pos_notes, **self.kw_notes).arguments
def given(self, *args, **kwargs):
return list(self.bind_partial(*args, **kwargs).arguments)
def get(self, name: str) -> asyncio.Future:
future: asyncio.Future = asyncio.Future()
try:
result = self.services[name]
future.set_result(result)
except KeyError:
for fact, args in note_loop(name):
if fact in self.factories:
func, singleton = self.factories[fact]
if asyncio.iscoroutinefunction(func):
task = asyncio.create_task(func(*args))
else:
loop = asyncio.get_running_loop()
task = cast(
asyncio.Task,
loop.run_in_executor(self.executor, func, *args),
)
break
else:
raise ValueError("%r is not defined" % name)
logger.info("Loading service=%s", name)
if singleton:
task.add_done_callback(
lambda x: self.services.update({name: x.result()})
)
task.add_done_callback(lambda x: future.set_result(x.result()))
return future
def apply(self, *args, **kwargs) -> asyncio.Future:
with self.auto():
func, *args = args # type: ignore
func = unwrap(func)
anno = ANNOTATIONS.get(func)
if anno:
return self.do_apply(func, anno, args, kwargs)
fut: asyncio.Future = asyncio.Future()
fut.set_result(func(*args, **kwargs))
return fut
def do_apply(self, func, anno, args, kwargs):
given = anno.given(*args, **kwargs)
services = {
key: self.get(service)
for key, service in anno.markers.items()
if key not in given
}
logger.info("Apply services=%s to func=%r", ",".join(services.keys()), func)
async def run(args, kwargs):
kwargs = dict(kwargs)
for k, v in services.items():
kwargs[k] = await v
result = func(*args, **kwargs)
if anno.is_coro:
result = await result
return result
return asyncio.create_task(run(args, kwargs))
@contextmanager
def auto(self):
token = current_injector_var.set(self)
try:
yield self
finally:
current_injector_var.reset(token)
class attr:
def __init__(self, name):
self.service_name = name
ANNOTATIONS: WeakKeyDictionary[str, Annotation] = WeakKeyDictionary()
def close_reaction(obj):
obj.close()
def annotate(*args, **kwargs):
def decorate(func):
ANNOTATIONS[func] = Annotation(args, kwargs, func)
return func
for arg in chain(args, kwargs.values()):
if not isinstance(arg, str):
raise ValueError('Notes must be strings')
return decorate
def __get__(self, obj, objtype):
if obj is None:
raise AttributeError("attr applies to instances only")
return current_injector_var.get().get(self.service_name)
def note_loop(note):
args = note.split(':')
args = note.split(":")
results = []
fact, *args = args
results.append((fact, args))
while args:
suffix, *args = args
fact = '%s:%s' % (fact, suffix)
fact = "%s:%s" % (fact, suffix)
results.append((fact, args))
for fact, args in sorted(results, reverse=True):
yield fact, args
......@@ -15,3 +15,7 @@ max-complexity = 10
max-line-length=99
verbose=1
[pycodestyle]
max_line_length=99
This diff is collapsed.
import pytest
from knighted import Injector, annotate, current_injector, attr
@pytest.fixture
def services():
class MyInjector(Injector):
pass
return MyInjector()
@pytest.mark.asyncio
async def test_sync_async(services):
@services.factory("foo")
def foo_factory():
return "I am foo"
@annotate("foo")
async def fun(foo):
return {"foo": foo}
assert await services.apply(fun) == {"foo": "I am foo"}
@pytest.mark.asyncio
async def test_sync_sync(services):
@services.factory("foo")
def foo_factory():
return "I am foo"
@annotate("foo")
def fun(foo):
return {"foo": foo}
assert await services.apply(fun) == {"foo": "I am foo"}
@pytest.mark.asyncio
async def test_async_async(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
@annotate("foo")
async def fun(foo):
return {"foo": foo}
assert await services.apply(fun) == {"foo": "I am foo"}
@pytest.mark.asyncio
async def test_auto_sync_async(services):
@services.factory("foo")
def foo_factory():
return "I am foo"
@annotate("foo")
async def fun(foo):
return {"foo": foo}
result = await services.apply(fun)
assert result == {"foo": "I am foo"}
@pytest.mark.asyncio
async def test_auto_async_async(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
@services.factory("bar")
async def bar_factory():
return "I am bar"
@annotate("bar")
async def fun1(bar):
return bar
@annotate("foo")
async def fun(foo):
bar = await current_injector().apply(fun1)
return {"foo": foo, "bar": bar}
result = await services.apply(fun)
assert result == {"foo": "I am foo", "bar": "I am bar"}
@pytest.mark.asyncio
async def test_noauto_partial_async_async(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
@services.factory("bar")
async def bar_factory():
return "I am bar"
@annotate("bar")
async def fun1(bar):
return bar
@annotate("foo")
async def fun(foo):
bar = await current_injector().apply(fun1)
return {"foo": foo, "bar": bar}
with pytest.raises(TypeError):
await fun()
@pytest.mark.asyncio
async def test_descriptor_1_decorated(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
class Toto:
cache = attr("foo")
toto = Toto
with services.auto(), pytest.raises(Exception):
await toto.cache
@pytest.mark.asyncio
async def test_descriptor_2_decorated(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
class Toto:
cache = attr("foo")
toto = Toto()
with services.auto():
cache = await toto.cache
assert cache == "I am foo"
@pytest.mark.asyncio
async def test_descriptor_1_not_decorated(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
class Toto:
cache = attr("foo")
toto = Toto
with pytest.raises(Exception):
toto.cache # Must fails
@pytest.mark.asyncio
async def test_descriptor_2_not_decorated(services):
@services.factory("foo")
async def foo_factory():
return "I am foo"
class Toto:
cache = attr("foo")
toto = Toto()
with pytest.raises(LookupError):
toto.cache