Hi,

I have been trying to make unhandled exceptions reliably crash the event loop (e.g. replicating the behaviour of boost::asio, for those familiar with that C++ library). I'm aiming to have any exception bubble up from run_forever or run_until_complete style functions.

I thought I had a perfectly acceptable solution, and then hit a strange case that threw off my understanding of how the loop works. In 'test_b' below, the only difference from 'test_a' is that we keep a reference to the created task. This test hangs: the exception is raised, but the custom exception handler is never called.

I'd be very interested to understand exactly why this happens. I'd also appreciate any feedback on the best way to reliably crash the event loop on unhandled exceptions (my next attempt will be to replace AbstractEventLoop.call_exception_handler and see what happens).

    # example.py
    import asyncio


    class CustomException(RuntimeError):
        pass


    class UnhandledExceptionError(RuntimeError):
        pass


    def run_until_unhandled_exception(*, loop=None):
        """Run the event loop until there is an unhandled error in a callback

        This function sets the exception handler on the loop
        """
        loop = loop if loop is not None else asyncio.get_event_loop()
        ex = []

        def handler(loop, context):
            print('handler')
            loop.default_exception_handler(context)
            loop.stop()
            ex.append(context.get('exception'))

        loop.set_exception_handler(handler)
        loop.run_forever()
        if len(ex) > 0:
            raise UnhandledExceptionError('Unhandled exception in loop') from ex[0]


    async def fail_after(delay):
        await asyncio.sleep(delay)
        print('raise CustomException(...)')
        raise CustomException(f'fail_after(delay={delay})')


    async def finish_after(delay):
        await asyncio.sleep(delay)
        return delay


    def test_a(event_loop):
        # The task is created but no reference to it is kept
        event_loop.create_task(fail_after(0.01))
        run_until_unhandled_exception(loop=event_loop)


    def test_b(event_loop):
        # Same as test_a, except that a reference to the task is kept
        task = event_loop.create_task(fail_after(0.01))
        run_until_unhandled_exception(loop=event_loop)


    def run_test(test):
        try:
            test(asyncio.get_event_loop())
        except Exception as ex:
            print(ex)


    if __name__ == '__main__':
        run_test(test_a)
        run_test(test_b)  # This hangs
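
One possible direction, offered as a minimal sketch rather than a definitive answer: asyncio reports an unretrieved task exception to the loop's exception handler only when the task object is finalized, so a task that is still referenced (as in 'test_b') would not reach the handler while the loop is running. The sketch below avoids relying on finalization by attaching a done callback to each task and stopping the loop from there. The names `run_until_task_fails` and `on_done` are hypothetical and not part of asyncio, and the approach only covers tasks that are passed in explicitly, not exceptions raised in plain `call_soon` callbacks or in tasks created elsewhere.

```python
import asyncio


class CustomException(RuntimeError):
    pass


class UnhandledExceptionError(RuntimeError):
    pass


async def fail_after(delay):
    await asyncio.sleep(delay)
    raise CustomException(f'fail_after(delay={delay})')


def run_until_task_fails(loop, *coros):
    """Hypothetical helper: run the loop until one of the given coroutines raises.

    Like run_forever, this does not return if no task ever fails.
    """
    errors = []

    def on_done(task):
        # Called by the loop when the task finishes, whether or not
        # anything else still holds a reference to the task.
        if task.cancelled():
            return
        exc = task.exception()  # also marks the exception as retrieved
        if exc is not None:
            errors.append(exc)
            loop.stop()

    # References are kept on purpose to show they do not cause a hang here.
    tasks = [loop.create_task(coro) for coro in coros]
    for task in tasks:
        task.add_done_callback(on_done)

    loop.run_forever()
    if errors:
        raise UnhandledExceptionError('Unhandled exception in loop') from errors[0]


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        run_until_task_fails(loop, fail_after(0.01))
    except UnhandledExceptionError as err:
        print(repr(err.__cause__))
    finally:
        loop.close()
```

Because the callback calls `task.exception()`, the exception counts as retrieved, so the loop's own exception handler would not be expected to fire for these tasks later. A custom exception handler could still be installed alongside this to catch errors from other callbacks.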