Some of you will remember that I have a few scripts that can be used for incident response in G Suite environments, available at:
For small environments they work great. Need to see if users received a specific malicious email? No problem. Need to move a message with a specific sender and subject from Inboxes to SPAM? You're covered. You can even delete messages that match specific criteria!
Therein is part of the problem. They're intended for fairly small environments, where you can search through mailboxes sequentially and view results on screen. I've recently been dropped into an environment where, instead of searching 100 - 1000 mailboxes, I need to search *100,000* mailboxes. That is a completely different scale from what I'd designed and implemented for. How in the world do I search through 100k mailboxes when it takes at least a second to search *one*? At one second per mailbox, a purely sequential search of 100,000 mailboxes is well over 27 hours.
A Little Bit About Threads
My first thought was, "computers have several cores per CPU, why not use multiple threads?".
Those of you who know a little something about python and the Global Interpreter Lock are, right about now, probably saying, "oh you sweet summer child..."
When I started digging, I found out that because of CPython's Global Interpreter Lock, only one thread can execute Python bytecode at a time, no matter how many cores you have. I'm sure I've oversimplified that, but the important thing here is: python threads won't spread work across some percentage of all of your cores, but they CAN be used to get 100% out of one core.
That's when I learnt about the multiprocessing module. That *does* allow you to create multiple processes (child processes) from a program and use all of the cores in your system. And, as it turns out, it's not terribly difficult to use.
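To see the difference for yourself, here's a quick throwaway sketch; the busy() function and the numbers in it are made up purely for illustration and aren't from my scripts. Four threads grinding on a CPU-bound loop finish in roughly the same time as doing the work serially, while four processes actually use four cores.

import time
import threading
import multiprocessing

def busy(n=2_000_000):
    # burn CPU in pure python so the GIL matters
    total = 0
    for i in range(n):
        total += i * i
    return total

def run(workers, kind):
    start = time.perf_counter()
    if kind == "threads":
        pool = [threading.Thread(target=busy) for _ in range(workers)]
    else:
        pool = [multiprocessing.Process(target=busy) for _ in range(workers)]
    for w in pool:
        w.start()
    for w in pool:
        w.join()
    print("%s x%d: %.2fs" % (kind, workers, time.perf_counter() - start))

if __name__ == "__main__":
    run(4, "threads")    # roughly as slow as running busy() four times serially
    run(4, "processes")  # roughly 4x faster on a machine with 4+ cores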
A Sample Program
My requirements, overall, were pretty simple:
- get a list of users
- for each user, make a network API call to Google
- depending on the API results, write output to file
Since I can't afford to create a 100,000-person Google environment, I decided to cheat a little. Instead of making an API call, I would introduce a 0.25-second sleep. Then, I would have every iteration write to file. This wouldn't exactly account for waiting for the API call to finish but it would give me a starting point.
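Just for context, the real per-user unit of work that the sleep stands in for would look roughly like the sketch below. This is only an assumption about how I'd wire it up with google-api-python-client and a service account that has domain-wide delegation; the credential file name, scope, and query are placeholders rather than anything from my actual environment.

# Hypothetical sketch of the per-user work the 0.25-second sleep replaces.
# Assumes a service account with domain-wide delegation; the file name,
# scope, and query below are placeholders.
from google.oauth2 import service_account
from googleapiclient.discovery import build

SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
BASE_CREDS = service_account.Credentials.from_service_account_file(
    "service-account.json", scopes=SCOPES)

def search_one_mailbox(user_email, query):
    # impersonate the user, then run a Gmail search in their mailbox
    creds = BASE_CREDS.with_subject(user_email)
    service = build("gmail", "v1", credentials=creds)
    resp = service.users().messages().list(userId=user_email, q=query).execute()
    return resp.get("messages", [])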
I also knew I didn't want to start by testing 100,000 iterations, so I started with about 100 - at least until I had a grasp on how everything worked together!
The following was my starting point:
def make_input():
    l = []
    for i in range(65, 91):
        l.append(chr(i))
    for i in range(97, 123):
        l.append(chr(i))
    for i in range(65, 90):
        l.append(chr(i) + chr(i+1))
    for i in range(97, 122):
        l.append(chr(i) + chr(i+1))
    return list(l)

def do_stuff(a_list):
    import time
    for i in a_list:
        time.sleep(.25)
        print("in do_stuff: %s" % i)

# create a list of all upper and lowercase letters (for length)
letter_list = make_input()

# work_list is letter_list broken into a separate list per process
work_list = []
for i in range(len(letter_list)):
    work_list.append(letter_list[i])

do_stuff(work_list)
print("done")
This creates a list of 102 items: all letters upper and lowercase a - z, then the two-letter pairs "AB" - "YZ" and "ab" - "yz". This would approximate getting a list of users.
It then iterates through that list to create another list of work tasks. My intent was to approximate having one process with x number of items to process - basically one process with one thread processing a bunch of stuff. The utility of the work_list list will become more evident in just a moment.
Then it passes a reference to the work_list list to the do_stuff function; this function iterates through the list it receives, performing a 0.25-second sleep per iteration. Basic arithmetic tells us that it should take approximately 25 seconds to run (0.25 seconds * 102 items = 25.5 seconds). Calling it with "time", I can verify that it finishes in approximately 28 seconds.
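If you'd rather time from inside the script than with the shell's time command, a quick time.perf_counter() wrapper works too (a sketch, not part of the original script):

import time

start = time.perf_counter()
do_stuff(work_list)          # the sequential baseline from above
elapsed = time.perf_counter() - start
print("processed %d items in %.1f seconds" % (len(work_list), elapsed))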
Add Multiple Processes
The next step was to pick an arbitrary number of processes for working through the list of letters/pairs and see how that affects the runtime. Since I'm weird and have an obsession with multiples of the number four, I chose four processes. This is where work_list really comes into use: since I want the four processes to each work on approximately the same number of items, work_list becomes a list of four lists, and THOSE lists each contain approximately 1/4 of the letters/pairs to process. In this way I have divided the work into four buckets and can just assign one bucket per process!
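To make the bucketing concrete, here's a tiny round-robin split on a made-up six-item list (the same modulo trick the full program uses below):

num_procs = 4
items = ["A", "B", "C", "D", "E", "F"]   # stand-in for letter_list

buckets = [list() for _ in range(num_procs)]
for i, item in enumerate(items):
    buckets[i % num_procs].append(item)

print(buckets)   # [['A', 'E'], ['B', 'F'], ['C'], ['D']]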
The sample code to achieve this looks like:
import multiprocessing

def make_input():
    l = []
    for i in range(65, 91):
        l.append(chr(i))
    for i in range(97, 123):
        l.append(chr(i))
    for i in range(65, 90):
        l.append(chr(i) + chr(i+1))
    for i in range(97, 122):
        l.append(chr(i) + chr(i+1))
    return list(l)

def do_stuff(a_list):
    import time
    for i in a_list:
        time.sleep(.25)
        print("in do_stuff: %s" % i)

num_procs = 4

# create a list of all upper and lowercase letters (for length)
letter_list = make_input()

# work_list is letter_list broken into a separate list per process
work_list = [list() for i in range(num_procs)]
for i in range(len(letter_list)):
    offset = i % num_procs
    work_list[offset].append(letter_list[i])

# for each list of letters, make a new process that runs do_stuff on that list
proc_list = []
for i in work_list:
    process = multiprocessing.Process(target=do_stuff, args=(i,))
    proc_list.append(process)

for i in proc_list:
    i.start()

for i in proc_list:
    i.join()

print("done")
The real magic is done with the proc_list list. It creates a process for each list in work_list; the first loop over proc_list calls "start()" to start those processes and the second loop says "wait for each process to finish before continuing". Since the only operation being performed is a quarter-second sleep, each process should finish in about the same amount of time. This is the equivalent of having four processes, each with one thread.
Since I'm breaking the work into four separate processes, I would expect it to finish in about 1/4 of the time of the previous code: 102 items split four ways is roughly 26 items per process, at 0.25 seconds each, or about 6.5 seconds of sleeping. Sure enough, if I time it with "time" (time python3 my_script.py), I see it completes in just under 8 seconds. That's a huge improvement over 28 seconds!!
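As an aside, the standard library can also do the bucketing, starting, and joining for you. This is a sketch of the same idea using multiprocessing.Pool rather than a hand-rolled process list; it's just an alternative spelling, not what my scripts use.

import time
import multiprocessing

def do_one(item):
    # quarter-second stand-in for the real API call
    time.sleep(0.25)
    return item

if __name__ == "__main__":
    letter_list = make_input()   # same helper as above
    with multiprocessing.Pool(processes=4) as pool:
        for result in pool.map(do_one, letter_list):
            print("processed: %s" % result)
    print("done")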
Moar Parallelisation
Using multiple processes will take advantage of modern systems with lots of cores but now I want to go a step further and have each process create multiple threads - so not only can I have each core do a _little_ work, I can have each core do a *lot* of work!
With some modification, I have renamed do_stuff() to make_the_threads(). It now creates a Queue (first in, first out) of work items plus a handful of threads, each running a function called do_per_thread_work() that pulls items off the queue. The alternative is to run threads in batches, but then each batch would be slowed to the slowest worker in that batch. By using a queue, as soon as one thread finishes an item it grabs the next, and there are never more than <x> threads running per process. That removes a LOT of overhead and keeps things moving smoothly.
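Stripped of the multiprocessing plumbing, the per-process piece is just the standard queue-plus-worker-threads recipe. Here it is in isolation (a minimal sketch) before the full program below:

import time
import threading
from queue import Queue

def worker(q):
    # pull items until the main thread decides all work is done
    while True:
        item = q.get()
        time.sleep(0.25)          # stand-in for the real work
        q.task_done()

q = Queue(maxsize=5)
for i in range(5):
    # daemon threads die with the process once q.join() returns
    threading.Thread(target=worker, args=(q,), daemon=True).start()

for item in ["A", "B", "C", "D", "E", "F"]:
    q.put(item)

q.join()    # block until every queued item has been marked done
print("all items processed")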
The code to do this looks like:
import time
import threading
import multiprocessing
from queue import Queue

def make_input():
    l = []
    for j in range(1):
        print(j)
        for i in range(65, 91):
            l.append(chr(i))
        for i in range(97, 123):
            l.append(chr(i))
        for i in range(65, 90):
            l.append(chr(i) + chr(i+1))
        for i in range(97, 122):
            l.append(chr(i) + chr(i+1))
    return list(l)

def do_per_thread_work(q):
    while True:
        a_thing = q.get()
        time.sleep(0.25)
        q.task_done()

def make_the_threads(a_list):
    global thread_count
    q = Queue(maxsize=thread_count)
    for i in range(thread_count):
        # daemon threads exit with the process once the queue is drained
        t = threading.Thread(name="do_per_thread_work-" + str(i),
                             target=do_per_thread_work, args=(q,), daemon=True)
        t.start()
    for i in a_list:
        q.put(i)
    q.join()

num_procs = 4
thread_count = 5

thread_list = [list() for i in range(num_procs)]

# create a list of all upper and lowercase letters (for length)
letter_list = make_input()

# thread_list is letter_list broken into a separate list per process
for i in range(len(letter_list)):
    offset = i % num_procs
    thread_list[offset].append(letter_list[i])

jobs = []
for i in range(num_procs):
    process = multiprocessing.Process(target=make_the_threads, args=(thread_list[i],))
    jobs.append(process)

for i in jobs:
    i.start()

for i in jobs:
    i.join()

print("done")
If I use some arbitrary numbers, such as four processes with five threads per process, I would expect the program to finish VERY quickly. Instead of running for 28 seconds, I would expect it to finish in about 2 seconds: 102 items spread across 4 processes with 5 threads each is 20 workers, so roughly 6 rounds of 0.25-second sleeps. Using "time", I can verify that the above code runs in just under 2 seconds.
Notice I've also added a loop in the input creation so I can begin to test arbitrarily long lists of letters/pairs. For example, if I want to test a list of 1000 (-ish) items, I can just increase the loop from 1 to 10. At 1000 (1020, to be exact) items, I'd expect the script to finish in roughly 13 seconds (1020 items * 0.25 seconds, spread across 20 workers) plus some overhead, and I can verify that with time: the above code runs on my system in about 15 seconds.
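For completeness, the standard library's concurrent.futures module wraps much of this processes-plus-threads dance in a higher-level API. Since this workload is IO-bound, a single pool of 20 threads is a rough equivalent of 4 processes with 5 threads each; this sketch is just an alternative spelling of the same idea, not what my scripts actually use.

import time
from concurrent.futures import ThreadPoolExecutor

def do_one(item):
    time.sleep(0.25)          # stand-in for the API call
    return item

letter_list = make_input()    # same helper as above
with ThreadPoolExecutor(max_workers=20) as pool:
    for result in pool.map(do_one, letter_list):
        print("processed: %s" % result)
print("done")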
Some Closing Thoughts
This is just scratching the surface of parallelisation but it does a great job of showing how *some* workloads can benefit from multiple processes and/or threads. For my workload, massive parallelisation is a Good Thing because each thread is just making an HTTPS request and waiting for the result, then writing those results to file. My bottleneck is network IO and each thread will spend most of its life in a WAIT state, so creating hundreds of them may be beneficial.
This does NOT imply that ALL workloads will benefit from parallelisation! If each thread were performing heavy processing tasks, where the performance of the core itself were the bottleneck, then multiple processes but only one thread per process may be beneficial. If the bottleneck were, for example, disk IO, then the workload may not benefit from multiple threads OR multiple processes because a single process/thread may be enough to completely thrash the disk.
While it can have really cool results, it may be overkill to do a lot of multi-process/thread work unless you are sure your program will benefit from it. As a friend once told me, "don't optimise before the thing works"...but I can say that my search time for 100k mailboxes has dropped from about nine hours to about forty-five minutes and I'm pretty sure I can cut that in half again by adding two more cores to the system I'm using for search.