
I am new to Python and I am working on an existing Python project that executes custom scripts submitted by the user. The scripts are run as a Python subprocess:

 await asyncio.create_subprocess_exec(...)

The majority of the methods in my project are defined with the async keyword and awaited by their callers. The problem is that at any point in time there is only one thread running, so essentially I can run only a single shell script, and a new execution starts only when that one completes. I am planning to move the execution of the logic (a sequence of such async methods) into a ThreadPoolExecutor or ProcessPoolExecutor.

I tried converting all the async def methods to plain methods. But a few of these methods call other async methods from third-party modules, and those calls fail with SyntaxError: 'await' outside async function. It looks like any await must appear inside an async def method, so I cannot simply convert all my methods to def.
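
To illustrate the shape of the problem, here is a minimal sketch (fetch_config is a hypothetical stand-in for a third-party coroutine); the commented-out line is what fails, and asyncio.run() is one possible bridge from a plain def:

import asyncio

async def fetch_config(name):
    # Hypothetical stand-in for a third-party coroutine.
    await asyncio.sleep(0.1)
    return {"script": name}

def preprocess(name):
    # return await fetch_config(name)   # SyntaxError: 'await' outside async function
    # One bridge: run the coroutine in its own event loop. This only
    # works when no event loop is already running in this thread.
    return asyncio.run(fetch_config(name))

print(preprocess("user_script.sh"))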

Any suggestions on how to make this project multi-threaded?

  • If the shell scripts run in their own processes, they run concurrently even if they are started by a single thread. Commented Aug 1, 2022 at 11:23
  • But before control actually reaches that point, some pre-processing is done in the other async def methods. So, in a way, the module runs one user submission at a time. I want this pre-processing logic to run on different threads, not on the MainThread, which would unblock other user submissions as well. Commented Aug 1, 2022 at 11:30
  • Then @Kashish gave the right hint: create a ThreadPoolExecutor for the whole program and execute asyncio.run in it with an appropriate async function as the parameter (a minimal sketch of this idea follows these comments). Commented Aug 1, 2022 at 11:45
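
A minimal sketch of that hint, assuming Python 3.8 or newer (where the default child watcher supports subprocesses started from event loops on non-main threads); handle_submission is a hypothetical stand-in for the real pre-processing and execution pipeline:

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def handle_submission(script):
    # Hypothetical stand-in for the real pre-processing + execution pipeline.
    proc = await asyncio.create_subprocess_exec(
        "sh", "-c", script,
        stdout=asyncio.subprocess.PIPE)
    stdout, _ = await proc.communicate()
    return stdout.decode().strip()

def run_submission(script):
    # Each worker thread runs its own event loop via asyncio.run(),
    # so submissions no longer queue up behind one loop on the MainThread.
    return asyncio.run(handle_submission(script))

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(run_submission, s)
                   for s in ("echo one", "echo two", "echo three")]
        for f in futures:
            print(f.result())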

1 Answer


Since I am a new contributor I cannot comment yet, so let me reply in an answer. @Mohan, I think your code already has parallel processing.

coroutine asyncio.create_subprocess_exec(program, *args, stdin=None, stdout=None, stderr=None, limit=None, **kwds) Create a subprocess.

The limit argument sets the buffer limit for StreamReader wrappers for Process.stdout and Process.stderr (if subprocess.PIPE is passed to stdout and stderr arguments).

Return a Process instance.

As you can see, it returns a Process instance, meaning an OS process has been created. I believe that means the OS, not Python, runs the subprocess, and since the OS has access to all cores, it will use all available cores.


Directly from the documentation (https://docs.python.org/3/library/asyncio-subprocess.html):

import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')


async def main():
    await asyncio.gather(
        run('ls /zzz'),
        run('sleep 1; echo "hello"'))

asyncio.run(main())

From the documentation:

Because all asyncio subprocess functions are asynchronous and asyncio provides many tools to work with such functions, it is easy to execute and monitor multiple subprocesses in parallel.

To run tasks concurrently, all you have to do is use the asyncio.gather() function.

# Without a loop
a = asyncio.create_subprocess_exec(...)
b = asyncio.create_subprocess_exec(...)
await asyncio.gather(a, b)

# Using a loop
tasks = []
for a, b, c in some_func():
    tasks.append(asyncio.create_subprocess_exec(a, b, c, ...))

await asyncio.gather(*tasks)
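
For completeness, a runnable variant of the same pattern with placeholder commands. Note that gathering the create_subprocess_exec(...) coroutines only waits until the processes are created; gathering Process.wait() afterwards waits for them to finish:

import asyncio

async def main():
    # Placeholder commands; substitute the real user scripts.
    procs = await asyncio.gather(
        asyncio.create_subprocess_exec("sleep", "1"),
        asyncio.create_subprocess_exec("echo", "hello"),
    )
    # gather() above only waits until the processes are created;
    # wait() blocks until each one actually exits.
    await asyncio.gather(*(p.wait() for p in procs))
    print([p.returncode for p in procs])

asyncio.run(main())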

Unless you want to leverage multiple cores, this should just work without even converting async def to def. If you wish to use all cores, then check out the links below and play with ProcessPoolExecutor.


I am not sure if it is even relevant, but I found these methods in the documentation:

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

https://docs.python.org/3/library/asyncio-task.html#running-in-threads
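
A minimal sketch of loop.run_in_executor() with a ProcessPoolExecutor; cpu_bound here is just a placeholder for real CPU-heavy pre-processing:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(n):
    # Placeholder for real CPU-heavy pre-processing.
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Offload CPU-bound work to other cores while the event loop
        # stays free for async subprocess handling.
        results = await asyncio.gather(
            loop.run_in_executor(pool, cpu_bound, 1_000_000),
            loop.run_in_executor(pool, cpu_bound, 2_000_000),
        )
    print(results)

if __name__ == "__main__":
    asyncio.run(main())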

Somewhat related question: How to properly use asyncio run_coroutine_threadsafe function?


2 Comments

Thanks for the info. I have a use case for executing multiple tasks concurrently while leveraging the CPU cores, so I am actually taking your suggestion with the pool executors.
Thanks for your detailed comment @Kashish. While I haven't used the gather(...) method, since my use case wouldn't require it, I was able to make the entire flow sequential and submit it to a pool. This is working and gave me a good starting point to build on.
