
The Best Python Queues And Multi-Threading Tutorial In The Whole World!

Over the last few weeks I have been working on a web app that needs the ability to query hundreds, thousands or even millions of pages. The biggest batch I've tested in one go so far was nearly 250,000 pages, which took about an hour and a half to complete - and that was before I researched Python threading!

During the development of the script which powers this process, it soon became apparent that a normal single loop wasn't going to cut it: to scale up, the script would need to run multiple threads (aka Python multithreading) and get through the work much quicker than it was.

Take, for example, the following single-threaded Python script...


# standard libraries
from datetime import datetime

# third party libraries 
from bs4 import BeautifulSoup
import requests


# capture current time
startTime = datetime.now()

# specify sitemap to get all site links
url = "http://www.craigaddyman.com/sitemap.xml"

# request sitemap and make 'soup'
r  = requests.get(url, timeout=5)
data = r.text
soup = BeautifulSoup(data.encode('utf-8'))

def sitemap_parser(soup):

    # parse sitemap for all links
    for url in soup.findAll("loc"):
        
        try:

            # request each link and print it along with its response code
            r  = requests.get(url.text)
            print r.status_code, r.url
            
        except requests.exceptions.RequestException:
            print 'Error with...', url.text

sitemap_parser(soup)

# print current time minus the start time
print datetime.now()-startTime

The output for this is as follows...


200 http://www.craigaddyman.com/best-seo-blogs-the-ultimate-recommended-reading/
200 http://www.craigaddyman.com/gaming-social-signals-automated-twitter-network/
200 http://www.craigaddyman.com/building-an-automated-persona-on-twitter-for-quick-and-dirty-outreach/
200 http://www.craigaddyman.com/advanced-meta-descriptions/
200 http://www.craigaddyman.com/the-twitter-vanity-list/
200 http://www.craigaddyman.com/interview-with-james-agate-of-skyrocket-seo/
200 http://www.craigaddyman.com/an-interview-with-rand-fishkin-of-seomoz/
200 http://www.craigaddyman.com/an-interview-with-barry-adams/
200 http://www.craigaddyman.com/conversion-rate-optimisation-sorry-but-youre-doing-it-wrong/
200 http://www.craigaddyman.com/how-to-generate-content-ideas/
200 http://www.craigaddyman.com/interview-with-paul-may-of-buzzstream/
200 http://www.craigaddyman.com/how-to-find-an-xml-sitemap/
200 http://www.craigaddyman.com/how-to-increase-memory-allocation-for-screaming-frog/
200 http://www.craigaddyman.com/keeping-your-desktop-tidy-like-a-boss/
200 http://www.craigaddyman.com/image-manipulation-python/
200 http://www.craigaddyman.com/2013-recap-2014-goals/
200 http://www.craigaddyman.com/checking-http-response-codes-python/
200 http://www.craigaddyman.com/mass-link-duplication-checker-with-python/
200 http://www.craigaddyman.com/scraping-twitter-and-facebook-shares-with-python/
200 http://www.craigaddyman.com/if-machines-can-do-it-they-should/
200 http://www.craigaddyman.com/wordpress-titles-to-lowercase-with-mysql/
200 http://www.craigaddyman.com/working-with-csv-files/
200 http://www.craigaddyman.com/parse-an-xml-sitemap-with-python/
200 http://www.craigaddyman.com/mining-all-tweets-with-python/
200 http://www.craigaddyman.com/python-script-to-monitor-site-up-time/
200 http://www.craigaddyman.com/web-scraping-out-in-the-wild/
200 http://www.craigaddyman.com/python-course/
200 http://www.craigaddyman.com/list-comprehensions/
200 http://www.craigaddyman.com/yay-a-new-python-powered-blog/
200 http://www.craigaddyman.com/rename-an-image-with-python/
0:00:08.335000

You can see the timestamp at the bottom - it took just over 8 seconds to parse the sitemap and request each of the 30 pages, one after the other.

Now, to improve this we need to use Python multi-threading together with a thread-safe queue. Threading is exactly what it sounds like: multiple threads all doing the same job at once - imagine a team of people doing these checks rather than just one person. The reason we also need a queue is to stop the same value being checked twice; each thread takes the next item from the shared queue, so the work gets divided up safely between the threads.
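Just to illustrate why the queue matters, here's a quick sketch of the race it avoids (this snippet isn't from the original script - the list, shared counter and sleep are made up purely for demonstration). If two threads walk a plain list using a shared index, they can both read the same position before either one advances it, so the same URL would get requested twice. A queue's get() hands each item to exactly one thread.


import time
from threading import Thread

items = ['url-%d' % n for n in range(5)]
position = {'next': 0}  # shared index with no locking - this is the problem
seen = []

def unsafe_worker():
    while True:
        i = position['next']       # read the index...
        if i >= len(items):
            break
        time.sleep(0.01)           # ...another thread can grab the same i here
        position['next'] = i + 1   # ...then both threads advance it
        seen.append(items[i])      # so the same item can end up here twice

threads = [Thread(target=unsafe_worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print seen # often contains duplicates, e.g. 'url-0' appearing twice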

Here is a basic example that I think works best and is the most 'pythonic'...


import Queue
from threading import Thread

# create the instance
q = Queue.LifoQueue()

# add items to the queue
for i in range(10):
    q.put(i)

def grab_data_from_queue():
    while not q.empty(): # check that the queue isn't empty
        print q.get() # print the item from the queue
        q.task_done() # specify that you are done with the item
        
for i in range(2): # aka number of threads
    t1 = Thread(target = grab_data_from_queue) # target is the above function
    t1.start() # start the thread
    
q.join() # this works in tandem with q.task_done()
         # essentially the queue keeps a count of unfinished items
         # and q.task_done() lowers that count once an item is used
         # q.join() blocks, so nothing after it is actioned
         # until that count reaches zero.

The output for this is as follows...


98

76

54

32

10

It's actually just printing 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 - last in, first out, because we used a LifoQueue - with the two threads' output getting bunched together two numbers at a time.
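If you'd rather have the numbers come out in the order they went in, you can swap the LifoQueue for a plain FIFO queue - this is just a variation on the example above, not something from the original script:


import Queue
from threading import Thread

# Queue.Queue() is first-in, first-out rather than last-in, first-out
q = Queue.Queue()

for i in range(10):
    q.put(i)

def grab_data_from_queue():
    while not q.empty():
        print q.get() # items are now handed out 0, 1, 2 ... 9
        q.task_done() # (the two threads' prints can still bunch together)

for i in range(2):
    t1 = Thread(target = grab_data_from_queue)
    t1.start()

q.join()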

So now let's see how this might look in our original program.


# standard libraries
from datetime import datetime
import Queue
from threading import Thread

# third party libraries 
from bs4 import BeautifulSoup
import requests

# capture current time
startTime = datetime.now()

# create the instance
q = Queue.LifoQueue()

# specify sitemap to get all site links
url = "http://www.craigaddyman.com/sitemap.xml"

# request sitemap and make the  'soup'
r  = requests.get(url, timeout=5)
data = r.text
soup = BeautifulSoup(data.encode('utf-8'))


def sitemap_parser(soup):

    # parse sitemap for all links
    for url in soup.findAll("loc"):
        
        q.put(url.text) # add each url to the queue for processing

sitemap_parser(soup)

def grab_data_from_queue():
    
    while not q.empty(): # check that the queue isn't empty
        
        url = q.get() # get the item from the queue

        r  = requests.get(url.strip()) # request the url

        print r.status_code, r.url # print the response code and destination url
        
        q.task_done() # specify that you are done with the item

for i in range(10): # aka number of threads
    t1 = Thread(target = grab_data_from_queue) # target is the above function
    t1.start() # start the thread

q.join()

# print current time minus the start time
print datetime.now()-startTime

Running this with 10 threads, the time printed at the bottom of the output was just 1.23 seconds!
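One thing to watch out for: the threaded worker above has no try/except around the request, so a single failed URL would kill its thread without ever calling q.task_done(), and q.join() would then wait forever. Here's a hypothetical, more defensive version of the worker - the results list, the lock, the timeout and the non-blocking get() are my additions, not part of the original script:


# standard libraries
import Queue
from threading import Thread, Lock

# third party libraries
import requests

q = Queue.LifoQueue()
results = []          # collect (status code, url) pairs instead of printing them
results_lock = Lock() # guard the shared list while several threads append to it

# a couple of example urls just to make this snippet runnable on its own
for link in ["http://www.craigaddyman.com/python-course/",
             "http://www.craigaddyman.com/list-comprehensions/"]:
    q.put(link)

def grab_data_from_queue():
    while True:
        try:
            url = q.get(block=False) # don't wait around if another thread
        except Queue.Empty:          # emptied the queue after our last check
            break
        try:
            r = requests.get(url.strip(), timeout=5)
            with results_lock:
                results.append((r.status_code, r.url))
        except requests.exceptions.RequestException:
            with results_lock:
                results.append(('error', url))
        finally:
            q.task_done() # always called, even on failure, so q.join() can't hang

for i in range(10): # aka number of threads
    Thread(target=grab_data_from_queue).start()

q.join()
print results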

So there you have it: a nice introduction to Python threading and queues. Hopefully my code comments are clear enough, but let me know if you have any questions! Don't forget to subscribe, share or call me out on any improvements. :)

