Tutorial

This tutorial briefly describes the features of fsc.async-tools.

PeriodicTask

The PeriodicTask is an asynchronous context manager that calls a given function at regular intervals while the event loop is running.

As a simple example, consider the following function, which prints a line to the terminal:

def snek():
    print('≻:======>···')

If we wish to run this function periodically, we can use the async with syntax:

#!/usr/bin/env python
"""
Example for using the PeriodicTask to regularly call a function.
"""

import asyncio
from fsc.async_tools import PeriodicTask


def snek():
    print('≻:======>···')


async def run():
    print('start')
    async with PeriodicTask(snek):
        await asyncio.sleep(3)
    print('stop')


if __name__ == '__main__':
    asyncio.run(run())

You can also control the delay between calls and whether the function is called once more when the context manager exits, as described in the reference.
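
To give a feel for what happens behind the async with block, here is a minimal sketch that uses only the standard library. It is not the implementation used by fsc.async-tools: the names periodic and demo and the delay argument are invented for illustration, and the sketch never calls the function an extra time on exit.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def periodic(func, delay=0.01):
    """Hypothetical stand-in: call func every `delay` seconds inside the block."""

    async def _loop():
        while True:
            func()
            await asyncio.sleep(delay)

    # Start the background loop; it first runs when the caller awaits something.
    task = asyncio.ensure_future(_loop())
    try:
        yield
    finally:
        # Stop the background loop when the block exits.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task


async def demo():
    calls = []
    async with periodic(lambda: calls.append(1), delay=0.01):
        await asyncio.sleep(0.1)
    # Number of times the function was called while the block was active.
    return len(calls)
```

Calling asyncio.run(demo()) returns the number of calls made during the 0.1 second block. The exact count depends on timing, so the sketch does not promise a specific value.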

wrap_to_coroutine

The decorator wrap_to_coroutine() simplifies creating interfaces that accept either a regular function or a coroutine function. It wraps the input into a coroutine function, so the result of calling it can always be used with the await syntax.

#!/usr/bin/env python
"""
Example of wrapping a function and a coroutine into a coroutine using 'wrap_to_coroutine'.
"""

import asyncio

from fsc.async_tools import wrap_to_coroutine


@wrap_to_coroutine
def sync_snek():
    return '≻:======>···'


@wrap_to_coroutine
async def async_snek():
    await asyncio.sleep(0.)
    return '≻:======>···'


def main():
    """
    Run the 'wrap_to_coroutine' example.
    """
    loop = asyncio.get_event_loop()
    fut_sync = asyncio.ensure_future(sync_snek())
    fut_async = asyncio.ensure_future(async_snek())
    loop.run_until_complete(asyncio.gather(fut_sync, fut_async))

    print('sync snek ', fut_sync.result())
    print('async snek', fut_async.result())


if __name__ == '__main__':
    main()
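
The idea behind such a decorator can be sketched with the standard library alone. The code below is an illustration under assumptions, not the library's source: to_coroutine, apply and demo are hypothetical names. It shows why the wrapping is convenient for interfaces, since apply can await its argument without checking what kind of callable it received.

```python
import asyncio
import functools
import inspect


def to_coroutine(func):
    """Hypothetical stand-in: return a coroutine function for any callable."""
    if inspect.iscoroutinefunction(func):
        # Already awaitable when called, nothing to do.
        return func

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Run the regular function and hand back its result.
        return func(*args, **kwargs)

    return wrapper


def double(x):
    return 2 * x


async def triple(x):
    await asyncio.sleep(0)
    return 3 * x


async def apply(func, value):
    # Callers can always await, whatever kind of callable they were given.
    return await to_coroutine(func)(value)


async def demo():
    return await asyncio.gather(apply(double, 2), apply(triple, 2))
```

Here asyncio.run(demo()) returns [4, 6]. Note that the wrapped regular function still runs synchronously inside the event loop, so this sketch does not make blocking code non-blocking.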

BatchSubmitter

The BatchSubmitter is useful when a function can process many inputs in parallel but has a significant startup overhead. It is called like a coroutine function: it collects the inputs from these calls and then submits them to the wrapped function as a single batch.

In the following example, the BatchSubmitter wraps a function that multiplies numbers by three. Because of the sleep call, each evaluation of the function takes at least one second, no matter how many numbers it is given.

#!/usr/bin/env python
"""
Example for using the BatchSubmitter to collect calls to a function.
"""

import time
import asyncio

import numpy as np
from fsc.async_tools import BatchSubmitter


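def function(x):
    # Simulate a fixed startup cost of one second per call.
    time.sleep(1.)
    # Multiply the whole batch of inputs at once.
    return np.array(x) * 3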


def main():
    """
    Run the 'BatchSubmitter' example.
    """
    loop = asyncio.get_event_loop()
    inputs = range(100)
    results_direct = np.array(inputs) * 3
    func = BatchSubmitter(function)
    start = time.time()
    results = loop.run_until_complete(
        asyncio.gather(*[func(i) for i in inputs])
    )
    end = time.time()
    assert np.all(results == results_direct)

    print('Batch submit ran in {:.2f} seconds.'.format(end - start))


if __name__ == '__main__':
    main()
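
The collect-then-submit pattern can be sketched with plain asyncio. This is a simplified illustration, not the BatchSubmitter implementation: TinyBatcher, its wait argument and the batch_sizes attribute are hypothetical, and error handling is left out (if the wrapped function raises, the waiting callers are never resumed).

```python
import asyncio


class TinyBatcher:
    """Hypothetical stand-in that gathers inputs and evaluates them in one call."""

    def __init__(self, func, wait=0.01):
        self._func = func
        self._wait = wait
        self._pending = []  # (input, future) pairs waiting for the next batch
        self._flusher = None
        self.batch_sizes = []  # recorded only to make the batching visible

    async def __call__(self, x):
        fut = asyncio.get_running_loop().create_future()
        self._pending.append((x, fut))
        if self._flusher is None:
            # The first caller of a batch schedules the flush.
            self._flusher = asyncio.ensure_future(self._flush())
        return await fut

    async def _flush(self):
        # Give other callers a moment to add their inputs.
        await asyncio.sleep(self._wait)
        batch, self._pending = self._pending, []
        self._flusher = None
        self.batch_sizes.append(len(batch))
        # One call to the wrapped function for the whole batch.
        results = self._func([x for x, _ in batch])
        for (_, fut), res in zip(batch, results):
            fut.set_result(res)


def triple_all(values):
    return [3 * v for v in values]


async def demo():
    batcher = TinyBatcher(triple_all)
    results = await asyncio.gather(*[batcher(i) for i in range(100)])
    return results, batcher.batch_sizes
```

In this sketch all one hundred calls are made before the flush runs, so the wrapped function is evaluated once with a batch of 100 inputs, and each caller receives its own result.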

limit_parallel

The decorator limit_parallel() can be used to limit the number of parallel calls to a given function. This is useful when each call uses a lot of resources, so that only a limited number of calls should run concurrently.

#!/usr/bin/env python
"""
Example for using the limit_parallel decorator.
"""

import asyncio

import numpy as np
from fsc.async_tools import limit_parallel


async def use_some_memory():
    """
    Dummy function which initializes a large array.
    """
    x = np.ones(shape=(10000000, ))
    await asyncio.sleep(3.)
    del x


def launch_many(coro_func):
    """
    Launch the given coroutine function one hundred times.
    """
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(coro_func()) for _ in range(100)]
    return loop.run_until_complete(asyncio.gather(*tasks))


if __name__ == '__main__':
    print('Without limiter...')
    launch_many(use_some_memory)
    print('With limiter...')
    launch_many(limit_parallel(10)(use_some_memory))
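
A limiter of this kind is commonly built on asyncio.Semaphore. The following sketch shows that approach under the assumption that it resembles what the decorator does. It is not taken from the library, and limit_concurrency, worker and demo are hypothetical names. The demo tracks how many calls run at once to make the limit visible.

```python
import asyncio
import functools


def limit_concurrency(max_running):
    """Hypothetical stand-in: allow at most `max_running` simultaneous calls."""

    def decorator(coro_func):
        semaphore = None

        @functools.wraps(coro_func)
        async def wrapper(*args, **kwargs):
            nonlocal semaphore
            if semaphore is None:
                # Created on first call, inside the running event loop.
                semaphore = asyncio.Semaphore(max_running)
            async with semaphore:
                return await coro_func(*args, **kwargs)

        return wrapper

    return decorator


async def demo():
    running = 0
    peak = 0

    @limit_concurrency(3)
    async def worker():
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1

    await asyncio.gather(*[worker() for _ in range(10)])
    # Highest number of workers that were active at the same time.
    return peak
```

Although ten workers are launched together, asyncio.run(demo()) returns 3, because the semaphore lets only three of them past the async with line at any time.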