
I'm trying to run Python code in parallel using asyncio. The idea is to run multiple jobs at the same time.

Here is my code:

import asyncio
import threading

async def print_thread():
    for n in range(5):
        print("Number: {}".format(threading.get_ident()))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(print_thread())
    finally:
        loop.close()

The output is:

Number: 4599266752
Number: 4599266752
Number: 4599266752
Number: 4599266752
Number: 4599266752

As far as I understand, the code was executed on a single thread. Is there a way to parallelize it?

PS

If I change the code to:

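import asyncio
import threading


async def print_thread():
    print("Number: {}".format(threading.get_ident()))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        # Each iteration runs one coroutine to completion on the same loop
        for n in range(5):
            loop.run_until_complete(print_thread())
    finally:
        loop.close()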

I get the same result.

  • Any reason for not using a thread pool (multiprocessing.dummy.Pool)? Commented Mar 24, 2021 at 10:36
  • Is there a difference between asyncio and multiprocessing? I'm a newbie in Python. Commented Mar 24, 2021 at 12:23
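
A minimal sketch of the thread-pool approach suggested in the first comment. multiprocessing.dummy.Pool offers the same API as multiprocessing.Pool but is backed by threads instead of processes. The job function get_thread and the pool size of 4 are illustrative choices, not taken from the thread.

```python
import threading
from multiprocessing.dummy import Pool  # thread-backed Pool with the multiprocessing API


def get_thread(n):
    # Return the job number together with the ID of the thread that ran it
    return n, threading.get_ident()


# Pool(4) starts 4 worker threads (not processes)
with Pool(4) as pool:
    # map() blocks until every job is done and keeps the input order
    results = pool.map(get_thread, range(10))

for n, ident in results:
    print("Job {} ran on thread {}".format(n, ident))
```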

2 Answers


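Your for loop is inside your coroutine, so its iterations cannot run in different threads. But even if you move the loop outside of your async function, everything still runs in the same thread, because an asyncio event loop executes all of its coroutines on the one thread that runs the loop: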

import asyncio
import threading


async def print_thread():
    print("Thread: {}".format(threading.get_ident()))


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        # Schedule 10 coroutines as tasks on the same event loop
        tasks = [loop.create_task(print_thread()) for i in range(10)]
        # Run them concurrently until all of them are done
        loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        loop.close()

Which will still output the same id:

Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
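
The tasks are still concurrent even though they share one thread: the event loop switches between them whenever one awaits. A small sketch to show this, assuming Python 3.7+ for asyncio.run; the worker coroutine and the events list are illustrative. Each task yields with asyncio.sleep(0), so the steps of the two tasks alternate while the thread ID stays the same.

```python
import asyncio
import threading

events = []  # (task name, step, thread id) in the order the steps happen


async def worker(name):
    for step in range(3):
        events.append((name, step, threading.get_ident()))
        # Yield control to the event loop so the other task can run
        await asyncio.sleep(0)


async def main():
    await asyncio.gather(worker("a"), worker("b"))


asyncio.run(main())
for name, step, ident in events:
    print("Task {} step {} on thread {}".format(name, step, ident))
```

This interleaving helps when the work waits on I/O; it does not spread CPU work across threads.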

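One solution is to use a ThreadPoolExecutor. It runs plain functions, not coroutines (calling a coroutine function in a worker thread would only create a coroutine object without running it), so you have to remove async from the definition: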

import asyncio
import threading
import concurrent.futures


def print_thread():
    print("Thread: {}".format(threading.get_ident()))


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        with concurrent.futures.ThreadPoolExecutor() as pool:
            for i in range(10):
                # Submit print_thread to a worker thread of the pool
                loop.run_in_executor(pool, print_thread)
            # Leaving the with block waits for all submitted calls to finish
    finally:
        loop.close()

Which outputs:

Thread: 140446369556224
Thread: 140446361163520
Thread: 140446369556224
Thread: 140446361163520
Thread: 140446369556224
Thread: 140446352508672
Thread: 140446361163520
Thread: 140446344115968
Thread: 140446369556224
Thread: 140446335723264

As you can see, there are fewer threads than calls. That is normal: the pool reuses its worker threads. If you have large batches, you can change the number of threads with the max_workers parameter of the ThreadPoolExecutor constructor.
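
A minimal sketch of capping the pool size with max_workers. It calls pool.submit directly instead of going through asyncio, and the short time.sleep is an assumption added only so that calls overlap and the pool actually starts several workers.

```python
import concurrent.futures
import threading
import time


def get_thread():
    # Sleep briefly so several calls overlap and the pool needs multiple workers
    time.sleep(0.05)
    return threading.get_ident()


# Cap the pool at 3 worker threads, however many calls are submitted
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(get_thread) for _ in range(10)]
    idents = [f.result() for f in futures]

print("{} calls ran on {} threads".format(len(idents), len(set(idents))))
```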

If you still want to use a coroutine, there is a solution here: https://stackoverflow.com/a/46075571/7414475
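
One possible way to keep the coroutine, which may differ from the linked answer: wrap it in a plain function (run_coroutine here is a hypothetical helper) that calls asyncio.run, so every worker thread creates, runs and closes its own private event loop. This assumes Python 3.7+ and that the coroutine does not need to share a loop with other code.

```python
import asyncio
import concurrent.futures
import threading


async def print_thread():
    # Still a coroutine, so it can await other async code here
    await asyncio.sleep(0.01)
    return "Thread: {}".format(threading.get_ident())


def run_coroutine():
    # Hypothetical helper: runs the coroutine on a new event loop in this worker thread
    return asyncio.run(print_thread())


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(run_coroutine) for _ in range(8)]
    results = [f.result() for f in futures]

print(results)
```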


2 Comments

How can I wait for all the futures in your example? Do I need to collect them all and wait for them?
Yes, I'll add another answer about that!

Another answer, with result collection, as requested in the comments:

import asyncio
import threading
import concurrent.futures


def get_thread():
    return "Thread: {}".format(threading.get_ident())


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        with concurrent.futures.ThreadPoolExecutor() as pool:
            # Collect the asyncio futures returned by run_in_executor
            tasks = [loop.run_in_executor(pool, get_thread) for i in range(10)]
            # gather() waits for all of them and returns their results in order
            print(loop.run_until_complete(asyncio.gather(*tasks)))
    finally:
        loop.close()

Output:

['Thread: 139740266125056', 'Thread: 139740266125056', 'Thread: 139740266125056', 'Thread: 139740183525120', 'Thread: 139740266125056', 'Thread: 139740175132416', 'Thread: 139740183525120', 'Thread: 139740166739712', 'Thread: 139740266125056', 'Thread: 139740158347008']
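
If the rest of the program does not need asyncio, a sketch of the same result collection with ThreadPoolExecutor.map alone; the job-number argument is an illustrative addition so the ordering is visible.

```python
import concurrent.futures
import threading


def get_thread(n):
    return "Job {}: thread {}".format(n, threading.get_ident())


with concurrent.futures.ThreadPoolExecutor() as pool:
    # map() returns results in the order of the inputs, not in completion order
    results = list(pool.map(get_thread, range(10)))

print(results)
```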

1 Comment

If I need to add a lock to get_thread, is it possible to do that in ThreadPoolExecutor?
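
A minimal sketch, assuming the lock is meant to protect shared state updated inside get_thread: an ordinary threading.Lock created once and shared by all worker threads works with ThreadPoolExecutor, whether the calls are submitted directly (as here) or through loop.run_in_executor. The counter variable is hypothetical, added only to give the lock something to protect.

```python
import concurrent.futures
import threading

counter = 0                      # hypothetical shared state
counter_lock = threading.Lock()  # one lock shared by all worker threads


def get_thread():
    global counter
    # Only one thread at a time may update the shared counter
    with counter_lock:
        counter += 1
        call_number = counter
    return "Call {} on thread {}".format(call_number, threading.get_ident())


with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = [pool.submit(get_thread) for _ in range(10)]
    results = [f.result() for f in futures]

print(results)
```

Keeping the locked section short (just the counter update) lets the rest of each call run without waiting on the other threads.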
