[Python-ideas] Allow creation of polymorph function (async function executable synchronously)
Wes Turner
wes.turner at gmail.com
Wed Mar 6 22:15:09 EST 2019
Here's syncer/syncer.py:
https://github.com/miyakogi/syncer/blob/master/syncer.py
I think the singledispatch is pretty cool.
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
from functools import singledispatch, wraps
import asyncio
import inspect
import types
from typing import Any, Callable, Generator
# True when the running interpreter is Python 3.5 or newer.
PY35 = sys.version_info[:2] >= (3, 5)
def _is_awaitable(co: Generator[Any, None, Any]) -> bool:
    """Report whether ``co`` is acceptable as something to run on the loop.

    When ``PY35`` is set the answer comes from ``inspect.isawaitable``;
    otherwise generator objects and ``asyncio.Future`` instances are accepted.
    """
    if not PY35:
        # Pre-3.5 branch: fall back to checking the two concrete types.
        return isinstance(co, (types.GeneratorType, asyncio.Future))
    return inspect.isawaitable(co)
@singledispatch
def sync(co: Any):
    """Fallback dispatch target: reject arguments with no registered handler.

    Raises:
        TypeError: Always, naming the offending argument.
    """
    message = 'Called with unsupported argument: {}'.format(co)
    raise TypeError(message)
@sync.register(asyncio.Future)
@sync.register(types.GeneratorType)
def sync_co(co: Generator[Any, None, Any]) -> Any:
    """Run an awaitable to completion on the event loop and return its result.

    Registered on ``sync`` for ``asyncio.Future`` and generator objects.

    Args:
        co: The future or coroutine/generator object to drive.

    Returns:
        Whatever ``run_until_complete`` returns for ``co``.

    Raises:
        TypeError: If ``_is_awaitable`` rejects ``co``.
    """
    if not _is_awaitable(co):
        raise TypeError('Called with unsupported argument: {}'.format(co))
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # get_event_loop() raises when the current thread has no event loop
        # (e.g. worker threads; newer Python versions no longer create one
        # implicitly). Create one and install it so later calls reuse it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(co)
@sync.register(types.FunctionType)
@sync.register(types.MethodType)
def sync_fu(f: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a coroutine function so that calling it blocks until it finishes.

    Registered on ``sync`` for ``types.FunctionType`` and ``types.MethodType``.

    Args:
        f: A coroutine function (as judged by ``asyncio.iscoroutinefunction``).

    Returns:
        A synchronous callable carrying ``f``'s metadata (via
        ``functools.wraps``) that runs ``f(*args, **kwargs)`` on the event
        loop and returns its result.

    Raises:
        TypeError: If ``f`` is not a coroutine function.
    """
    if not asyncio.iscoroutinefunction(f):
        raise TypeError('Called with unsupported argument: {}'.format(f))

    @wraps(f)
    def run(*args, **kwargs):
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # get_event_loop() raises when the current thread has no event
            # loop (e.g. worker threads; newer Python versions no longer
            # create one implicitly). Create one and install it for reuse.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(f(*args, **kwargs))

    return run
# Only when PY35 is set: route coroutine objects (types.CoroutineType) to the
# same handler, sync_co.
if PY35:
    sync.register(types.CoroutineType, sync_co)
```
On Wed, Mar 6, 2019 at 9:20 PM Nathaniel Smith <njs at pobox.com> wrote:
> On Wed, Mar 6, 2019 at 4:37 PM pylang <pylang3 at gmail.com> wrote:
> >> def maybe_async(fn):
> >> @functools.wraps(fn)
> >> def wrapper(*args, **kwargs):
> >> coro = fn(*args, **kwargs)
> >> if asyncio.get_running_loop() is not None:
> >> return coro
> >> else:
> >> return await coro
> >
> > I was unable to run his example as-is (in Python 3.6 at least) since the
> `await` keyword is only permitted inside an `async def` function.
>
> Oh yeah, that was a brain fart. I meant to write:
>
> def maybe_async(fn):
> @functools.wraps(fn)
> def wrapper(*args, **kwargs):
> coro = fn(*args, **kwargs)
> if asyncio.get_running_loop() is not None:
> return coro
> else:
> return asyncio.run(coro)
>
> -n
>
> --
> Nathaniel J. Smith -- https://vorpus.org
> _______________________________________________
> Python-ideas mailing list
> Python-ideas at python.org
> https://mail.python.org/mailman/listinfo/python-ideas
> Code of Conduct: http://python.org/psf/codeofconduct/
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.python.org/pipermail/python-ideas/attachments/20190306/7fd71dfd/attachment.html>
More information about the Python-ideas
mailing list