Current limitations of map and each #38

Open
cgarciae opened this issue Mar 23, 2018 · 2 comments

cgarciae commented Mar 23, 2018

Hi! First off, I think paco is a very nice library and I would like to help improve it. That said, I have a particular problem: I need to download millions of images as fast as possible. I looked into these resources:

Using paco my initial code was:

import asyncio
import os
import aiohttp
import aiofiles
import paco

urls = [
    "https://static.pexels.com/photos/67843/splashing-splash-aqua-water-67843.jpeg",
    "https://cdn.pixabay.com/photo/2016/10/27/22/53/heart-1776746_960_720.jpg",
    "http://www.qygjxz.com/data/out/240/4321276-wallpaper-images-download.jpg"
] * 1000000

def download_file(path):
    async def do_download_file(url):
        
        filename = os.path.basename(url)
        filepath = os.path.join(path, filename)

        print(f"Downloading {url}")

        async with aiohttp.request("GET", url) as resp:
            context = await resp.read()

        print(f"Completed {filename}")

        async with aiofiles.open(filepath, "wb") as f:
            await f.write(context)
    
    return do_download_file

path = "/data/tmp/images"

coro = paco.each(download_file(path), urls)
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)

I like the API of paco.each, but when I tested it my computer froze as memory blew up while it tried to create 1 million coroutines. The main problem is in these lines of code:

# paco/each.py

for index, value in enumerate(iterable):
    pool.add(collector(index, value))

I observe the following:

  1. It creates all the objects in memory before starting their tasks
  2. It also assumes that the collection fits in memory
  3. It also assumes that the collection is fast to iterate over
  4. It preserves order (nice to have)
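
To make points 1 to 3 concrete, here is a minimal sketch of one way to avoid materializing every coroutine up front. It is not paco's code and not the implementation posted further down; `bounded_each` is a hypothetical name. A fixed number of workers pull from a shared iterator, so only `limit` coroutines exist at any moment and the iterable is consumed lazily.

```python
import asyncio


async def bounded_each(func, iterable, limit=10):
    """Call the async function `func` on every item, `limit` calls at a time.

    Hypothetical helper for illustration only; not part of paco.
    """
    # One iterator shared by all workers. Each worker advances it only when
    # it is ready for more work, so items are pulled lazily.
    iterator = iter(iterable)

    async def worker():
        # Advancing the iterator involves no await, so two workers can
        # never receive the same item.
        for item in iterator:
            await func(item)

    # Only `limit` worker coroutines are created, however long the iterable is.
    await asyncio.gather(*(worker() for _ in range(limit)))
```

With this shape, `bounded_each(download_file(path), urls, limit=10)` would keep ten downloads in flight. The trade-off is that results come back in completion order, so point 4 is lost unless indices are carried along.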

Since my problem is about speed and memory, points 1 to 3 are the most relevant. I reimplemented map and each using asyncio.Queue, limiting the number of tasks that exist at the same time. This involved creating a structure I called Stream that just holds a coroutine and a Queue. The limit argument of each ensures that no more than that many task objects are in memory at once.

urls = [
    "https://static.pexels.com/photos/67843/splashing-splash-aqua-water-67843.jpeg",
    "https://cdn.pixabay.com/photo/2016/10/27/22/53/heart-1776746_960_720.jpg",
    "http://www.qygjxz.com/data/out/240/4321276-wallpaper-images-download.jpg"
] * 1000000

path = "/data/tmp/images"

stream = from_iterable(urls)
coro = each(download_file(path), stream, limit = 10)

loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_async_exception)
loop.run_until_complete(coro)

Both the new from_iterable and map functions have a queue_maxsize parameter that further limits how the data flows and enforces a back-pressure mechanism. The full code is below. I wanted to share the experiment and also open the possibility of creating a paco.stream module to give this code a home.
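
As a small illustration of the back-pressure idea, the sketch below shows how a bounded asyncio.Queue makes a fast producer wait for a slow consumer. The names `producer`, `consumer`, and `main` are hypothetical and exist only for this example; the `DONE` sentinel mirrors the one used in the code that follows.

```python
import asyncio

DONE = object()  # sentinel marking the end of the stream


async def producer(queue, items):
    for item in items:
        # put() suspends while the queue is full, which is what slows the
        # producer down to the consumer's pace.
        await queue.put(item)
    await queue.put(DONE)


async def consumer(queue, sizes):
    results = []
    while True:
        item = await queue.get()
        if item is DONE:
            return results
        sizes.append(queue.qsize())  # record how much is buffered
        await asyncio.sleep(0)       # stand-in for slow work
        results.append(item)


async def main(maxsize=2):
    queue = asyncio.Queue(maxsize=maxsize)
    sizes = []
    producer_task = asyncio.ensure_future(producer(queue, range(10)))
    results = await consumer(queue, sizes)
    await producer_task
    return results, max(sizes)
```

Calling `asyncio.run(main())` returns the consumed items together with the largest buffer size observed, which should never exceed `maxsize`.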

from collections import namedtuple
import asyncio


DONE = object()

Stream = namedtuple("Stream", "coroutine queue")

def _active_tasks(tasks):
    return [ task for task in tasks if not task.done() ]


def _f_wrapper(f, queue = None):
    async def __f_wrapper(x):

        y = f(x)

        if hasattr(y, "__await__"):
            y = await y

        if queue is not None:
            await queue.put(y)
    
    return __f_wrapper
        
async def _task_limit(tasks, limit):
    
    tasks = _active_tasks(tasks)
    
    while len(tasks) >= limit:
        await asyncio.sleep(0)
        
        tasks = _active_tasks(tasks)

    return tasks
        
def map(f, stream, limit = 0, queue_maxsize = 0):
    coroin = stream.coroutine
    qin = stream.queue

    qout = asyncio.Queue(maxsize = queue_maxsize)
    
    async def _map(f):
        coroin_task = asyncio.ensure_future(coroin)
        
        
        tasks = []
        f = _f_wrapper(f, queue = qout)

        x = await qin.get()

        
        while x is not DONE:
            
            if limit:
                tasks = await _task_limit(tasks, limit)

            fcoro = f(x)
            ftask = asyncio.ensure_future(fcoro)
            tasks.append(ftask)

            x = await qin.get()
            
        
        # await tasks
        tasks = _active_tasks(tasks)
        while len(tasks) > 0:
            await asyncio.sleep(0)
            tasks = _active_tasks(tasks)

        await qout.put(DONE)

        await coroin_task

        
    return Stream(_map(f), qout)

def from_iterable(iterable, queue_maxsize = 0):
    qout = asyncio.Queue(maxsize=queue_maxsize)
    
    async def _from_iterable():
        
        for x in iterable:
            await qout.put(x)
            
        await qout.put(DONE)
        
    return Stream(_from_iterable(), qout)

def each(f, stream, limit = 0):
    coroin = stream.coroutine
    qin = stream.queue
    
    async def _each(f):
        coroin_task = asyncio.ensure_future(coroin)

        tasks = []
        f = _f_wrapper(f)

        x = await qin.get()

        while x is not DONE:

            if limit:
                tasks = await _task_limit(tasks, limit)

            fcoro = f(x)
            ftask = asyncio.ensure_future(fcoro)
            tasks.append(ftask)

            x = await qin.get()

        # await tasks
        tasks = _active_tasks(tasks)
        while len(tasks) > 0:
            await asyncio.sleep(0)
            tasks = _active_tasks(tasks)

        await coroin_task

    return _each(f)

def run(stream):
    return stream.coroutine
@aparamon

@cgarciae Thanks for sharing your implementation!

What do you think of an alternative interface for the same task? It's basically a further generalization of paco.gather:

def igather(coros_or_futures, limit=0, loop=None, timeout=None,
        return_exceptions=False):
    """
    Arguments:
        coros_or_futures (iterable|asynchronousiterable): iterator yielding
            coroutines functions.
        limit (int): max concurrency limit. Use ``0`` for no limit.
        loop (asyncio.BaseEventLoop): optional event loop to use.
        timeout (int|float): timeout can be used to control the maximum number
            of seconds to wait before returning. timeout can be an int or
            float. If timeout is not specified or None, there is no limit to
            the wait time.
        return_exceptions (bool): returns exceptions as valid results.

    Returns:
        asynchronousiterable: sequence of values yielded by coroutines,
            as completed
    """

Making the result ordered should also be possible, albeit a bit harder to implement and more memory-hungry in the worst case.
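
For concreteness, one possible shape of the as-completed variant is sketched below. It is only an illustration under assumptions: `igather_as_completed` is a hypothetical name, it accepts a plain (synchronous) iterable of coroutines, and it omits the `loop`, `timeout`, and `return_exceptions` parameters from the docstring above.

```python
import asyncio


async def igather_as_completed(coros, limit=0):
    """Yield results as they complete, with at most `limit` tasks in flight.

    Hypothetical sketch; ``limit=0`` means no limit.
    """
    iterator = iter(coros)
    pending = set()
    exhausted = False

    while True:
        # Top up the set of running tasks, pulling lazily from the iterable.
        while not exhausted and (not limit or len(pending) < limit):
            try:
                coro = next(iterator)
            except StopIteration:
                exhausted = True
            else:
                pending.add(asyncio.ensure_future(coro))

        if not pending:
            break

        # Wake up as soon as at least one task has finished.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            # result() re-raises if the task failed.
            yield task.result()
```

It would be consumed with `async for result in igather_as_completed(coros, limit=10)`. Tasks still pending when the consumer stops early are not cancelled here, which a real implementation would need to handle.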

@aparamon

An implementation sketch inspired by https://bugs.python.org/issue30782#msg336237:

import asyncio

async def igather(tasks, limit=None):

    async def submit(tasks, buf):
        # TODO: additionally support async iterators
        for task in tasks:
            await buf.put(asyncio.create_task(task))
        await buf.put(None)

    async def consume(buf):
        while True:
            task = await buf.get()
            if task:
                yield await asyncio.wait_for(task, None)
            else:
                break

    buf = asyncio.Queue(limit or 0)
    asyncio.create_task(submit(tasks, buf))
    async for result in consume(buf):
        yield result

It preserves task submission order in an efficient way, but lacks proper exception handling.
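
One way the missing exception handling might look is sketched below, assuming Python 3.8 or newer (where asyncio.CancelledError is not a subclass of Exception). The name `igather_ordered` and the cleanup strategy are assumptions for illustration, not a vetted design: on error or early exit, the submitter and any queued tasks are cancelled.

```python
import asyncio


async def igather_ordered(coros, limit=0, return_exceptions=False):
    """Yield results in submission order; hypothetical sketch only."""
    buf = asyncio.Queue(maxsize=limit)
    end = object()  # sentinel: no more tasks

    async def submit():
        for coro in coros:
            task = asyncio.ensure_future(coro)
            try:
                await buf.put(task)  # suspends while the buffer is full
            except asyncio.CancelledError:
                task.cancel()  # this task never reached the buffer
                raise
        await buf.put(end)

    submitter = asyncio.ensure_future(submit())
    try:
        while True:
            task = await buf.get()
            if task is end:
                break
            try:
                result = await task
            except Exception as exc:
                if not return_exceptions:
                    raise
                result = exc  # hand the exception back as a value
            yield result
    finally:
        # Runs on normal exit, on error, and when the consumer stops early.
        submitter.cancel()
        while not buf.empty():
            leftover = buf.get_nowait()
            if leftover is not end:
                leftover.cancel()
```

Note that the number of live tasks is only roughly bounded by `limit`: besides the buffered ones, one task is being awaited by the consumer and one may be held by the blocked submitter.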
