
Python Multi-threaded Processing With Limited CPU/Ports

I have a dictionary of folder names that I would like to process in parallel. Under each folder, there is an array of file names that I would like to process in series (folder_file_dict).
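For example, the structure looks roughly like this (the concrete folder and file names are placeholders for illustration):

folder_file_dict = {
    "folder_a": {"file_names_key": ["a1.dat", "a2.dat"]},  # files processed in series
    "folder_b": {"file_names_key": ["b1.dat"]},            # folders processed in parallel
}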

Solution 1:

Using a thread pool:

#!/usr/bin/env python2
from multiprocessing.dummy import Pool, Queue  # thread pool

folder_file_dict = {
    folder_name: {
        file_names_key: file_names_array
    }
}

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"
         ...

def mp_process(filenames):
    udp_port = free_udp_ports.get() # block until a free udp port is available
    args = filenames, udp_port
    try:
        return args, process_files_in_series(*args), None
    except Exception as e:
        return args, None, str(e)
    finally:
        free_udp_ports.put_nowait(udp_port)

free_udp_ports = Queue()  # in general, use initializer to pass it to children
for port in udp_ports:
    free_udp_ports.put_nowait(port)
pool = Pool(number_of_concurrent_jobs)
for args, result, error in pool.imap_unordered(mp_process, get_files_arrays()):
    if error is not None:
        print args, error

I don't think you need to bind the number of threads to the number of UDP ports, since the processing time may differ between filename arrays.

If I understand the structure of folder_file_dict correctly, then to generate the filenames arrays:

def get_files_arrays(folder_file_dict=folder_file_dict):
    for folder_name_dict in folder_file_dict.itervalues():
        for filenames_array in folder_name_dict.itervalues():
            yield filenames_array
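Putting the fragments above together, here is a consolidated, runnable sketch in Python 3 syntax; time_consuming_method, the port numbers, and the sample dictionary are placeholders for illustration, not part of the original answer:

#!/usr/bin/env python3
# Consolidated sketch of the thread-pool approach above.
import time
from multiprocessing.dummy import Pool, Queue  # thread pool

udp_ports = [5000, 5001]          # assumed free UDP ports (placeholder values)
number_of_concurrent_jobs = 4     # may exceed len(udp_ports); get() simply blocks

folder_file_dict = {
    "folder_a": {"file_names_key": ["a1.dat", "a2.dat"]},
    "folder_b": {"file_names_key": ["b1.dat"]},
}

def time_consuming_method(file_name, udp_port):
    time.sleep(0.1)               # stand-in for the real per-file work
    print("processed %s on port %d" % (file_name, udp_port))

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
        time_consuming_method(file_name, udp_port)

def mp_process(filenames):
    udp_port = free_udp_ports.get()   # block until a free udp port is available
    args = filenames, udp_port
    try:
        return args, process_files_in_series(*args), None
    except Exception as e:
        return args, None, str(e)
    finally:
        free_udp_ports.put_nowait(udp_port)

def get_files_arrays(folder_file_dict=folder_file_dict):
    for folder_name_dict in folder_file_dict.values():
        for filenames_array in folder_name_dict.values():
            yield filenames_array

free_udp_ports = Queue()
for port in udp_ports:
    free_udp_ports.put_nowait(port)

pool = Pool(number_of_concurrent_jobs)
for args, result, error in pool.imap_unordered(mp_process, get_files_arrays()):
    if error is not None:
        print(args, error)
pool.close()
pool.join()

Because free_udp_ports.get() blocks, starting more worker threads than ports is harmless: the extra threads simply wait until a port is handed back.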

Solution 2:

Use the multiprocessing.pool.ThreadPool. It handles queue / thread management for you and can be easily changed to do multiprocessing instead.

EDIT: Added example

Here's an example. Note that multiple threads may end up using the same UDP port; I'm not sure if that's a problem for you.

import multiprocessing.pool
import itertools

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"

udp_ports = [123, 456, 789]

folder_file_dict = {
    folder_name: {
        file_names_key: [file_names_array]
    }
}

def main(folder_file_dict, udp_ports):
    # number of threads - here I'm limiting to the smaller of udp_ports,
    # file lists to process, and a cap I arbitrarily set to 4
    num_threads = min(len(folder_file_dict), len(udp_ports), 4)
    # the pool
    pool = multiprocessing.pool.ThreadPool(num_threads)
    # build files to be processed into a list. You may want to do other
    # things like join folder_name...
    file_arrays = [value['file_names_key'] for value in folder_file_dict.values()]
    # do the work
    pool.map(lambda args: process_files_in_series(*args),
             zip(file_arrays, itertools.cycle(udp_ports)))
    pool.close()
    pool.join()
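As a hedged illustration, a driver for main() could look like the following, assuming the definitions above; time_consuming_method is a stub standing in for the real per-file work, and the dictionary contents are made up:

# Hypothetical driver for the ThreadPool example above.
def time_consuming_method(file_name, udp_port):
    print("processing %s on UDP port %d" % (file_name, udp_port))

if __name__ == "__main__":
    sample_dict = {
        "folder_a": {"file_names_key": ["a1.dat", "a2.dat"]},
        "folder_b": {"file_names_key": ["b1.dat"]},
    }
    main(sample_dict, udp_ports)

Swapping multiprocessing.pool.ThreadPool for multiprocessing.Pool is the "easily changed to do multiprocessing" part, but note that a process Pool cannot pickle the lambda used in pool.map above, so you would need a module-level helper function there instead.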

Solution 3:

This is kind of a blueprint for how you could use multiprocessing.Process with JoinableQueues to deliver jobs to workers. You will still be bound by I/O, but with Process you do get true parallelism, which may prove useful, since threading may even be slower than a plain script processing the files one after another.

(Be aware that this will also prevent you from doing anything else on your laptop if you dare to start too many processes at once :P).

I tried to explain the code as much as possible with comments.

import traceback

from multiprocessing import Process, JoinableQueue, cpu_count

# Number of CPUs on your PC
cpus = cpu_count()

# The Worker function. Could also be modelled as a class
def Worker(q_jobs):
    while True:
        # Try / except / finally may be necessary for error-prone tasks, since the
        # processes may hang forever if the task_done() method is not called.
        try:
            # Get an item from the Queue
            item = q_jobs.get()

            # At this point the data should somehow be processed
        except:
            traceback.print_exc()
        else:
            print(item)
        finally:
            # Inform the Queue that the task has been done.
            # Without this the processes cannot be killed
            # and will be left as zombies afterwards.
            q_jobs.task_done()


# A Joinable Queue to end the process
q_jobs = JoinableQueue()

# Create processes depending on the number of CPUs
for i in range(cpus):

    # target function and arguments
    # a tuple of multiple arguments should not end with ',' e.g. (q_jobs, 'bla'),
    # but a single-argument tuple needs the trailing comma, as below
    p = Process(target=Worker,
                args=(q_jobs,)
                )
    p.daemon = True
    p.start()

# fill Queue with Jobs
q_jobs.put(['Do'])
q_jobs.put(['Something'])

# Block until every job has been marked done; the daemon workers then exit with the main process
q_jobs.join()
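To tie this back to the original folder/UDP-port question, one possible adaptation is sketched below; the job contents, port numbers, and the print call standing in for the real per-file work are assumptions, not from the original answer:

# Hypothetical adaptation of the Process/JoinableQueue pattern to the
# folder/file/UDP-port problem. Each job is one folder's file list; a shared
# Queue hands out free UDP ports so each list is processed on a single port.
from multiprocessing import Process, JoinableQueue, Queue

def worker(q_jobs, free_udp_ports):
    while True:
        file_names_array = q_jobs.get()
        udp_port = free_udp_ports.get()      # block until a port is free
        try:
            for file_name in file_names_array:
                # stand-in for the real time-consuming work
                print("would process", file_name, "on port", udp_port)
        finally:
            free_udp_ports.put(udp_port)     # hand the port back
            q_jobs.task_done()

if __name__ == "__main__":
    q_jobs = JoinableQueue()
    free_udp_ports = Queue()
    for port in (123, 456, 789):             # assumed free ports
        free_udp_ports.put(port)

    for _ in range(3):                       # one worker per port here
        p = Process(target=worker, args=(q_jobs, free_udp_ports))
        p.daemon = True
        p.start()

    q_jobs.put(["a1.dat", "a2.dat"])         # one folder's files, processed in series
    q_jobs.put(["b1.dat"])
    q_jobs.join()                            # wait for all jobs to finish

Because the port queue is shared between processes, a worker blocks on free_udp_ports.get() until another worker returns a port.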

Cheers

EDIT

I wrote this with Python 3 in mind. Removing the parentheses from the print call, i.e.

print item

should make this work for 2.7.
