Parallel Processing With Python 3

One of my favorite things about Python is its amazing subprocess module. As great as it is, though, I still don't want to write the same code twice. So, when working with subprocesses in any language, I generally prefer to create a class that handles everything for me. Here I'll be showing you how to work with subprocesses, wrap the logic in a class, and set it up so all you have to do is pass in a list of commands.

You can see my completed project here: https://github.com/Brainspire/ProcessManager

Set Up

Make sure that you have Python 3 installed. Create a directory to use as your project directory and name it whatever you'd like; I'll be using "proc_manager". Directly under your project directory create two files: "test_proc.py" and "test.py". Then create another directory in your project folder called "classes". Within classes create a new file called "__init__.py", which we will leave blank, and create another file called "proc.py", which will be our class file. The purpose of creating "__init__.py" is to notify Python that we will be importing files from this directory.

Your project file structure should look like this:

        /proc_manager
            test_proc.py
            test.py
            /classes
                __init__.py
                proc.py
        

Getting Started

First, we will need a test script to mimic a subprocess. Most importantly, though, consider that when you are parallel processing, the processes will rarely finish in the same order in which they were spawned. Therefore we will have the script accept an integer argument defining a sleep time, so that the processes complete at varying times.

The Code

Paste the following into "test_proc.py" and save it.

        import argparse, time

        # parse the required sleep argument from the command line
        parser = argparse.ArgumentParser(description='test process')
        parser.add_argument('sleep', type=int, help='integer defining length of time to sleep')
        args = parser.parse_args()

        time.sleep(args.sleep)

        print("time slept: " + str(args.sleep))
        

The first thing I am doing is importing two libraries: argparse, for parsing arguments coming in from the command line, and time, so that we can call time.sleep with the sleep integer. Over the next few lines we instantiate the argparse parser with a description, configure the positional argument "sleep", and call parse_args, which returns our argument namespace. Finally, we call time.sleep with the "sleep" argument and output "time slept: " concatenated with it.

If you intend to use this as a mock process for testing, another thing you might want to do in the future is add some random failure logic in order to test error handling. Once you have the file saved, you should be able to test it with the command "python3 test_proc.py 5", which will cause the script to sleep for 5 seconds and output "time slept: 5".
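As a sketch of the random failure logic mentioned above, here is one hypothetical helper you could drop into the test script; the name maybe_fail, the 30% rate, and the exit code of 1 are all arbitrary choices for illustration, not part of the original script.

```python
import random, sys

def maybe_fail(probability):
    # Exit with a non-zero return code with the given probability.
    # Writing the message to stderr means the process manager we build
    # later will capture it through its stderr PIPE.
    if random.random() < probability:
        print("simulated failure", file=sys.stderr)
        sys.exit(1)

# In test_proc.py you could call maybe_fail(0.3) right after
# time.sleep(...), so roughly 3 in 10 runs end with return code 1.
```

This gives you completed processes with a mix of zero and non-zero return codes to exercise any error handling you build on top of the class.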

Next we are going to create our class. Within the classes directory open the file "proc.py". The first thing that we will be doing is adding our imports at the top, so paste the following at the top of the file.

        from subprocess import Popen, PIPE
        from tempfile import TemporaryFile
        

The first line imports just what we need from the subprocess module: Popen, for spawning processes, and PIPE, which tells Popen to capture a stream in a buffer. The second line imports TemporaryFile from the tempfile module, which we will be using for standard out, because the pipe buffer sometimes just isn't big enough and can cause deadlock. More on that when we get to it.

Next, paste the class definition and some initial needed properties.

        class Proc:
            dicProcessList = {}  # initial process list
            dicActiveProcesses = {}  # current active processes
            dicCompletedProcesses = {}  # completed processes
            dicProcessOutPuts = {}  # standard out from processes
            intLimit = 4  # limit number of processes to run at any given time
        

Here we are defining four containers, three of which we will use to juggle processes around to determine what is pending, active and completed; the fourth will contain our output. We are also defining an integer limit which will be used to cap the number of concurrent processes running at any given time.

Paste in the constructor.

            def __init__(self, intLimit: int = 4):
                self.intLimit = intLimit
                # create fresh containers per instance, so two Proc objects
                # never share state through the class-level dictionaries
                self.dicProcessList = {}
                self.dicActiveProcesses = {}
                self.dicCompletedProcesses = {}
                self.dicProcessOutPuts = {}
        

This defines the parameter intLimit, with an integer type hint and a default of 4 concurrent processes at any given time.

Paste in a method for determining whether we are at the maximum number of concurrent processes.

            def limitMaxed(self):
                return len(self.dicActiveProcesses) >= self.intLimit
        

This does one simple thing, but we will need the logic in a couple of different places.

Next, paste in a method that loops through our commands, determines whether we are below the concurrency limit, and, if so, calls another method to spawn each process.

            def spawnProcesses(self):
                if(not self.limitMaxed()):
                    for strKey in list(self.dicProcessList):
                        lstCmd = self.dicProcessList[strKey]

                        self.dicActiveProcesses[strKey] = self.runProcess(lstCmd, strKey)
                        self.dicProcessList.pop(strKey)

                        if(self.limitMaxed()):
                            break
        

The first thing we do here is check whether we are already at the maximum number of concurrent processes; if not, we continue. We then loop through the keys of the dicProcessList dictionary, which we use to maintain the association to running and completed processes, and extract each command into a local variable. On the following line we pass the command and its key to another method, which returns the subprocess object, and assign that object to dicActiveProcesses under the same key. We then pop that entry off of dicProcessList to remove it from the pending command list. Finally, we check the concurrency limit again to determine whether that process puts us at the maximum; if it does, we exit the loop, otherwise we continue on to the next iteration.

Paste in a method to spawn our processes.

            def runProcess(self, lstCmd, strKey):
                self.dicProcessOutPuts[strKey] = TemporaryFile()
                return Popen(lstCmd, stdout=self.dicProcessOutPuts[strKey], stderr=PIPE)
        

Here we are defining a method called runProcess, which accepts the command to be run and the string key that maintains the association throughout the process. The first line in the method creates a temporary file and stores its handle in our dicProcessOutPuts dictionary under that key. We then invoke Popen, passing it our command, with standard out directed to the temporary file and standard error to PIPE.

There are a couple of things to talk about here. First, you'll notice the command is a list rather than a string. This is because we are calling Popen with the default option of shell=False, which is the recommended way of invoking external programs from Python and helps to prevent unintended data injection. I definitely recommend doing some research in this area and always using shell=False. My advice is, if you ever find yourself in a position where you want to use shell=True in order to make something work, just don't. With the list form, the first element is the program to be invoked, and every subsequent element is an argument passed to that program.

The other thing that I need to mention here is the reasoning behind using a temporary file for standard out. Almost everywhere on the net you'll find developers simply using PIPE for stdout without much explanation, and that's fine if you know that standard out will be small enough to fit in the pipe buffer. The problem with that approach here is that we are developing a general-purpose process manager that should handle every scenario, including processes with enough standard out to flood the pipe buffer and deadlock.
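To see both ideas in isolation, the list-form command and the temporary file standing in for PIPE on stdout, here is a small self-contained sketch. It uses sys.executable instead of a hard-coded "python3" so it runs anywhere; the command itself is just a throwaway example.

```python
import sys
from subprocess import Popen, PIPE
from tempfile import TemporaryFile

# Element 0 is the program; every subsequent element is one argument.
# With the default shell=False, no shell ever parses this list, so
# there is nothing for malicious input to inject into.
lstCmd = [sys.executable, "-c", "print('hi')"]

with TemporaryFile() as fStdOut:
    proc = Popen(lstCmd, stdout=fStdOut, stderr=PIPE)
    strNone, strStdErr = proc.communicate()  # stderr is tiny, so PIPE is safe
    fStdOut.seek(0)                          # rewind before reading the child's output
    strStdOut = fStdOut.read()

# strStdOut is b"hi\n" and proc.returncode is 0
```

The same pattern scales to arbitrarily large output: the child writes to the temporary file rather than a fixed-size pipe buffer, so it can never fill the buffer and stall.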

Paste in a method to check the status of our processes, migrate completed processes to the completed process dictionary, and assign standard out to the output dictionary.

            def pollProcesses(self):
                for strKey in list(self.dicActiveProcesses):
                    proc = self.dicActiveProcesses[strKey]

                    if proc.poll() is not None:
                        (strNone, strStdErr) = proc.communicate()

                        self.dicProcessOutPuts[strKey].seek(0)
                        strStdOut = self.dicProcessOutPuts[strKey].read()
                        self.dicProcessOutPuts[strKey].close()

                        self.dicCompletedProcesses[strKey] = {"stdout":strStdOut, "stderr":strStdErr, "retcode":proc.returncode}
                        self.dicActiveProcesses.pop(strKey)
        

Here we are looping through the keys of each process, pulling the subprocess object from the active processes dictionary, and running proc.poll to determine whether the process is complete. If proc.poll returns anything but None, the process has finished and we call proc.communicate to collect standard error. I'm using strNone in place of where you'd normally see a variable for standard out, to signify that we will get nothing from standard out since we are using a tempfile. We then call seek on our tempfile, passing it 0 to move the pointer back to the beginning of the file, extract the contents into the strStdOut variable, and close the temp file resource. Finally, we assign standard out, standard error and the return code to the completed processes dictionary, and pop the process off of the active processes dictionary in order to make room for the next one.

Paste in a method to retrieve information associated with the completed processes.

            def getProcessData(self, strKey: str = False):
                if(not strKey):
                    return self.dicCompletedProcesses
                elif strKey in self.dicCompletedProcesses:
                    return self.dicCompletedProcesses[strKey]
                else:
                    return False
        

This method takes one parameter, a string key with a default value of False. We have maintained each key's association to the original command dictionary passed into the run method, which I will be covering after this segment. If strKey is False, or nothing was passed into the method, we return the entire dictionary of data; otherwise we check the completed dictionary for the requested key and, if present, return that element, and if not, return False.

Finally, paste in the run method.

            def run(self, dicProcessList: dict):
                self.dicProcessList = dicProcessList

                while True:
                    self.spawnProcesses()

                    self.pollProcesses()

                    if(len(self.dicProcessList) == 0 and len(self.dicActiveProcesses) == 0):
                        break
        

In this method we accept the dictionary dicProcessList, which is the initial command list. I've set this up as a dictionary rather than just a list so that developers can use more meaningful string keys rather than numeric ones if they prefer. At this point we just call spawnProcesses and pollProcesses in a loop until both the pending command dictionary and the active processes dictionary are empty, and once that is the case we break.

Now open the file called "test.py" and copy and paste the following code into it.

        import os,random

        from classes.proc import Proc

        strCurrentDir = os.path.dirname(os.path.abspath(__file__))

        dicProcesses = {}
        for i in range(5):
            dicProcesses["process" + str(i)] = ["python3", os.path.join(strCurrentDir, "test_proc.py"), str(random.randint(2, 10))]

        objProcessManager = Proc()
        objProcessManager.run(dicProcesses)
        print(objProcessManager.getProcessData("process2"))
        print(objProcessManager.getProcessData())
        

This is just a piece of code to test our class. We import os and random, and then our Proc class from the classes directory. The variable strCurrentDir will contain the directory of this file, which we use in our subcommands to reference the test_proc.py script created in the beginning. I define the dicProcesses dictionary and loop five times, each iteration adding a command entry with a random sleep argument between 2 and 10 seconds; this builds the command dictionary that will be passed into the Proc class. I then instantiate the Proc class, call run while passing our dictionary into it, and call getProcessData on the object once the run method has completed.

 
