
Strange phenomenon when using the download_fileobj function with asyncio coroutines. #362

@rocknroll1397

Description


I wrote a script using asyncio and aioboto3 that runs 200 coroutine tasks at the same time.
Each task consists of three steps: download a file from S3, send it to an API, and save the result.
I defined a single task that processes one image, appended these tasks to a list, and then executed them concurrently using asyncio.gather().
Tasks = | Task | Task | Task | Task | Task | Task | Task |

However, I encountered a strange bug.
Each task should independently move through downloading, calling the API, and saving the result, without waiting for other tasks. But for some reason, all tasks finish the download step at the same time.

My expectation
----------------------Time--------------------->
Task1: |Download--------|API-----------|Save--|
Task2: |Download--|API-----------|Save-----|
Task3: |Download-----|API----------|Save-|
Task4: |Download-------------|API-------------|Save-|

How it actually works
----------------------Time--------------------->
Task1: |Download-------------|API------------|Save--|
Task2: |Download-------------|API-----|Save-----|
Task3: |Download-------------|API---------|Save-|
Task4: |Download-------------|API---------------|Save-|
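The expected behavior can be reproduced with a minimal asyncio sketch (no S3 involved; `asyncio.sleep` stands in for the real download/API work, and the task names and durations are made up for illustration). Tasks whose stages have different lengths finish those stages independently:

```python
import asyncio

# Minimal sketch with simulated stages. Stage lengths differ per task,
# so each task finishes its "download" independently of the others --
# this is the interleaving shown in the "My expectation" diagram.
async def fake_task(name, download_s, api_s, log):
    await asyncio.sleep(download_s)  # simulated download
    log.append((name, "download"))
    await asyncio.sleep(api_s)       # simulated API call
    log.append((name, "api"))

async def main():
    log = []
    await asyncio.gather(
        fake_task("t1", 0.01, 0.05, log),
        fake_task("t2", 0.05, 0.01, log),
    )
    return log

log = asyncio.run(main())
print(log)  # t1's download finishes well before t2's
```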

Code

async def download_file(self, file_name: str):
    session = aioboto3.Session(
        aws_access_key_id=self.aws_config['aws_access_key_id'],
        aws_secret_access_key=self.aws_config['aws_secret_access_key'],
        region_name=self.aws_config['region_name']
    )
    async with session.resource('s3') as s3:
        bucket = await s3.Bucket(self.source_bucket)
        buffer = io.BytesIO()
        await bucket.download_fileobj(file_name, buffer)
        buffer.seek(0)
        file_data = buffer.getvalue()
    return file_data

async def task(self, file_name: str, task_id: int, snapshot_id: int, task_amount: int, pbar):
    file_data = await self.download_file(file_name)
    ocr_result = await self.ocr_api.process_image(file_data)
    await self.process_result(ocr_result, task_id, snapshot_id, file_name, task_amount)
    pbar.update(1)

async def process(self):
    tasks = []
    with tqdm(total=len(file_rows), desc="Processing images") as pbar:
        for task_id, snapshot_id, file_name, task_amount in file_rows:
            tasks.append(self.task(file_name, task_id, snapshot_id, task_amount, pbar))
        
        await asyncio.gather(*tasks)

So with this code, all of the tasks' downloads finish at the same time. No task can move ahead to the API step before every coroutine's download is done, and that makes the code slower.

But when I add just one line of semaphore code to control network traffic, the program works as I expected, and runs faster.

async def download_file(self, file_name: str):
    global download_semaphore
    async with download_semaphore:
        session = aioboto3.Session(
            aws_access_key_id=self.aws_config['aws_access_key_id'],
            aws_secret_access_key=self.aws_config['aws_secret_access_key'],
            region_name=self.aws_config['region_name']
        )
        async with session.resource('s3') as s3:
            bucket = await s3.Bucket(self.source_bucket)
            buffer = io.BytesIO()
            await bucket.download_fileobj(file_name, buffer)
            buffer.seek(0)
            file_data = buffer.getvalue()
        return file_data
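For completeness: the snippet above references a global download_semaphore that is never shown being created. A minimal sketch, assuming a cap of 10 concurrent downloads (the limit is an assumption; tune it to your bandwidth). On older Python versions the semaphore binds to the event loop current at construction time, so it is safest to create it from inside the running loop:

```python
import asyncio

download_semaphore = None  # module-level slot, as implied by the post

async def main():
    global download_semaphore
    # Cap of 10 concurrent downloads is an assumed value for illustration.
    download_semaphore = asyncio.Semaphore(10)
    async with download_semaphore:
        pass  # the S3 download would run here

asyncio.run(main())
```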

Also, if I use aiohttp instead of aioboto3, it works as I expected without a semaphore. So I think there is some issue in the download_fileobj function. Can anyone solve this problem? I want to use this code without a semaphore.

Thank you.
