Source code for a_sync.asyncio.utils
import asyncio
[docs]
def get_event_loop() -> asyncio.AbstractEventLoop:
try:
loop = asyncio.get_event_loop()
except RuntimeError as e: # Necessary for use with multi-threaded applications.
if not str(e).startswith("There is no current event loop in thread"):
raise
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop