Description
I wrote a script with asyncio and aioboto3 that runs 200 coroutine tasks at the same time.
Each task consists of three steps: download a file from S3, send it to an API, and save the result.
I defined a single task that processes one image and added these tasks to a tasks list. Then I ran them concurrently using asyncio.gather().
Tasks = | Task | Task | Task | Task | Task | Task | Task |
However, I encountered a strange bug.
Each task should independently go through the process of downloading, calling an API, and saving the result without waiting for other tasks. But for some reason, all tasks finish the download step at the same time.
My expectation
----------------------Time--------------------->
Task1: |Download--------|API-----------|Save--|
Task2: |Download--|API-----------|Save-----|
Task3: |Download-----|API----------|Save-|
Task4: |Download-------------|API-------------|Save-|
How it actually works
----------------------Time--------------------->
Task1: |Download-------------|API------------|Save--|
Task2: |Download-------------|API-----|Save-----|
Task3: |Download-------------|API---------|Save-|
Task4: |Download-------------|API---------------|Save-|
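Purely as an illustrative model (an assumption, not a confirmed diagnosis of aioboto3 or download_fileobj): if all in-flight downloads share the available bandwidth equally, downloads of similar size progress at the same rate and therefore complete at about the same time. The hypothetical function finish_times below computes completion times under that fair-sharing assumption, optionally with at most `limit` downloads running at once, which is roughly what a semaphore does:

```python
def finish_times(sizes, bandwidth, limit=None):
    """Finish time of each transfer when `bandwidth` is split equally among the
    transfers currently running; at most `limit` run at once (FIFO admission)."""
    queue = list(range(len(sizes)))
    limit = len(sizes) if limit is None else limit
    remaining = {}  # transfer index -> bytes left
    finish = [0.0] * len(sizes)
    now = 0.0

    def admit():
        # Start queued transfers while there is a free slot
        while queue and len(remaining) < limit:
            i = queue.pop(0)
            remaining[i] = float(sizes[i])

    admit()
    while remaining:
        rate = bandwidth / len(remaining)          # equal share per active transfer
        dt = min(remaining.values()) / rate        # time until the next completion
        now += dt
        for i in list(remaining):
            remaining[i] -= rate * dt
            if remaining[i] <= 1e-9:
                finish[i] = now
                del remaining[i]
        admit()
    return finish


print(finish_times([10] * 4, 10))           # [4.0, 4.0, 4.0, 4.0]
print(finish_times([10] * 4, 10, limit=2))  # [2.0, 2.0, 4.0, 4.0]
```

In this model, four equal 10 MB downloads on a 10 MB/s link all finish at t = 4 s when started together, so no task can reach the API step earlier. With a limit of 2, half of them finish at t = 2 s and can start their API calls while the rest download. The last download still ends at t = 4 s, but the API work now overlaps with downloading.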
Code
import asyncio
import io

import aioboto3
from tqdm import tqdm

async def download_file(self, file_name: str) -> bytes:
    # A new Session and S3 resource are created on every call
    session = aioboto3.Session(
        aws_access_key_id=self.aws_config['aws_access_key_id'],
        aws_secret_access_key=self.aws_config['aws_secret_access_key'],
        region_name=self.aws_config['region_name']
    )
    async with session.resource('s3') as s3:
        bucket = await s3.Bucket(self.source_bucket)
        buffer = io.BytesIO()
        # Download the whole object into an in-memory buffer
        await bucket.download_fileobj(file_name, buffer)
        file_data = buffer.getvalue()
    return file_data
async def task(self, file_name: str, task_id: int, snapshot_id: int, task_amount: int, pbar):
    # One image: download -> OCR API call -> save the result
    file_data = await self.download_file(file_name)
    ocr_result = await self.ocr_api.process_image(file_data)
    await self.process_result(ocr_result, task_id, snapshot_id, file_name, task_amount)
    pbar.update(1)
async def process(self):
    # file_rows is defined elsewhere in the script
    tasks = []
    with tqdm(total=len(file_rows), desc="Processing images") as pbar:
        for task_id, snapshot_id, file_name, task_amount in file_rows:
            tasks.append(self.task(file_name, task_id, snapshot_id, task_amount, pbar))
        # Run all tasks concurrently; keep the progress bar open until they finish
        await asyncio.gather(*tasks)
With this code, all of the downloads finish at the same time: no task can move on to the API step until every coroutine's download is done, and this makes the program slower.
However, when I add a semaphore (just one line) to limit network traffic, the program works as I expected, and it is faster.
async def download_file(self, file_name: str) -> bytes:
    # session and download_semaphore (an asyncio.Semaphore) are created elsewhere;
    # reading a module-level name needs no `global` statement
    async with download_semaphore:
        async with session.resource('s3') as s3:
            bucket = await s3.Bucket(self.source_bucket)
            buffer = io.BytesIO()
            await bucket.download_fileobj(file_name, buffer)
            file_data = buffer.getvalue()
    return file_data
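For reference, a minimal sketch of the bounded-concurrency pattern used above. Here asyncio.sleep stands in for the S3 download and the API call, and run_all, the limit of 5 and the in-flight counters are hypothetical. The semaphore is created inside the running event loop and wraps only the download step, so at most `limit` downloads are in flight while the API step stays unthrottled:

```python
import asyncio


async def run_all(n_tasks, limit):
    # Created inside the running loop; caps concurrent downloads only
    download_semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def download(i):
        nonlocal in_flight, peak
        async with download_semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the real S3 download
            in_flight -= 1
        return f"data-{i}"

    async def task(i):
        data = await download(i)
        await asyncio.sleep(0.01)  # stand-in for the API call, not throttled
        return data.upper()

    results = await asyncio.gather(*(task(i) for i in range(n_tasks)))
    return results, peak


results, peak = asyncio.run(run_all(n_tasks=20, limit=5))
print(peak)  # 5
```

Tasks that acquire the semaphore first finish their downloads first and proceed to the API step while later tasks are still waiting to download.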
Also, if I use aiohttp instead of aioboto3 without a semaphore, it works as I expected. So I suspect there is an issue in the download_fileobj function. Can anyone help solve this problem? I would like to use this code without a semaphore.
Thank you.