Archive
Python 3 is slower than Python 2?
Recently I was playing with concurrent.futures. Following a comment on reddit, I found David Beazley’s presentation, Understanding the Python GIL.
It’s a very interesting talk, and from it I learned that Python 3.2 got a new GIL implementation! Out of curiosity, I compared the performance of Python 2.7 and 3.3. The test machine had 4 cores. I made a CPU-bound test script with three variations: (1) a basic, single-threaded version, (2) one using 4 threads, and (3) one using 4 processes.
The results surprised me because Python 2.7 turned out to be faster!
(Legend: Py2 = Python 2.7.4, Py3 = Python 3.3.1)
basic.py:
Py2: 5.32 sec, Py3: 9.66 sec
with_threads:
Py2: 13.41 sec, Py3: 17.32 sec
with_processes:
Py2: 1.28 sec, Py3: 2.27 sec
You can also try the scripts, they are here.
bpython config file for light background
Problem
The default color scheme of bpython is designed for a dark background. If you launch bpython in a terminal with a light background, it’s almost unreadable. Is there a color scheme for a light background?
Solution
Create the dir ~/.bpython and add the following files:
~/.bpython/config:
# This is a standard python config file
# Valid values can be True, False, integer numbers, strings
# By default bpython will look for ~/.bpython/config or you can specify a file
# with the -c option on the command line
# General section tag
[general]
# Display the autocomplete list as you type (default: True).
# When this is off, you can hit tab to see the suggestions.
auto_display_list = True
# Syntax highlighting as you type (default: True).
syntax = True
# Display the arg spec (list of arguments) for callables,
# when possible (default: True).
arg_spec = True
# History file (default: ~/.pythonhist):
hist_file = ~/.pythonhist
# Number of lines to store in history (set to 0 to disable) (default: 100):
hist_len = 100
# Soft tab size (default: 4, see pep-8):
tab_length = 4
# Color schemes should be put in ~/.bpython/
# e.g. to use the theme ~/.bpython/foo.theme set color_scheme = foo
# Leave blank or set to "default" to use the default theme
#color_scheme = default
color_scheme = jabba
[keyboard]
pastebin = F8
save = C-s
~/.bpython/jabba.theme (derived from light.theme):
# Each letter represents a colour marker:
# k, r, g, y, b, m, c, w, d
# which stands for:
# blacK, Red, Green, Yellow, Blue, Magenta, Cyan, White, Default
# Capital letters represent bold
# Copy to ~/.bpython/foo.theme and set "color_scheme = foo" in
# ~/.bpython/config
[syntax]
keyword = B
name = r
comment = b
string = g
error = r
number = B
operator = b
paren = b
punctuation = b
token = g
[interface]
# XXX: gnome-terminal appears to be braindead. The cursor will disappear unless
# you set the background colour to "d".
background = d
output = b
main = b
prompt = b
prompt_more = g
Update (20160214)
As it was pointed out by Leonardo in a comment (thanks!), “you need to put the configuration files inside ~/.config/bpython instead of ~/.bpython.” Yeah, applications in newer Linux versions prefer the ~/.config folder.
Download files with threads easily
Problem
You have a file with a list of URLs that you want to download. You already know the wget trick:
wget -i down.txt
However, if you want to fetch a lot of files, it can be slow.
Solution
Well, let’s launch several wget instances in parallel and fetch those files quickly. With concurrent.futures, it’s just a few lines:
#!/usr/bin/env python
import os
import concurrent.futures
from threading import Lock

lock = Lock()
INPUT = "down.txt"
THREADS = 10

def download(url):
    cmd = "wget -q {url}".format(url=url)
    # the lock keeps the output of different threads from interleaving
    with lock:
        print cmd
    os.system(cmd)

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=THREADS) as ex:
        with open(INPUT) as f:
            for line in f:
                line = line.rstrip("\n")
                ex.submit(download, line)

##########
if __name__ == "__main__":
    main()
Thanks to defnull at reddit who directed me towards concurrent.futures.
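One thing to keep in mind: the script above hands each URL to a shell via os.system, so a URL containing characters such as ";" or "&" would be interpreted by the shell. Here is a minimal sketch of one way around this, written for Python 3: pass wget its arguments as a list and collect the exit codes from the futures. The names build_command, download_all and the runner parameter are hypothetical helpers that I introduce here for illustration; they are not part of the original script.

```python
import concurrent.futures
import subprocess


def build_command(url):
    # An argument list is executed without a shell, so the URL reaches
    # wget verbatim.
    return ["wget", "-q", url]


def download(url, runner=subprocess.call):
    # subprocess.call returns the exit code of the command (0 means success).
    return runner(build_command(url))


def download_all(urls, threads=10, runner=subprocess.call):
    """Download all URLs in parallel and return {url: exit_code}."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as ex:
        futures = {url: ex.submit(download, url, runner) for url in urls}
    # Leaving the "with" block waits for all workers, so result() returns at once.
    return {url: future.result() for url, future in futures.items()}
```

Calling download_all() with the URLs read from down.txt would then tell you which downloads failed (non-zero exit code). The runner parameter is only there so that the function can be tried without actually launching wget.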
Download PyCon US 2012 videos in a multithreaded way
Note: I wrote this script some time ago when I didn’t know about concurrent.futures. This script does the job nicely but it’s a bit verbose. With concurrent.futures it would be just a few lines.
With the following script you can download all the videos of PyCon US 2012:
#!/usr/bin/env python
"""
Download PyCon US 2012 videos in a multithreaded way.
Requirement: youtube-dl script.
written by Jabba Laci, 2013 (jabba.laci@gmail.com)
https://pythonadventures.wordpress.com/
PyCon US 2012 videos: http://pyvideo.org/category/17
youtube-dl: http://rg3.github.io/youtube-dl/
Usage:
======
1) download youtube-dl and put it somewhere in the PATH
2) create a subdirectory called "download"
3) launch this script
Total size of the videos is about 27.5 GB.
The download process can take several hours.
You can interrupt the downloading with "killall python".
If you re-launch the script, the downloading will resume.
Tested under Linux with Python 2.7.
"""
import os
from Queue import Queue
from threading import Thread, Lock
TO_DIR = "download"
THREADS = 10
lock = Lock()
q = Queue()
threads = []
DATA = ["AeQxx4zXd5Q", "O8WXXtDUUOE", "ktLyuWoRHH8", "tKTW8Jd0BlQ", "A3Qe5wUbXzM",
"ZwBiQEHS4T8", "Rmg4-Ae1P1o", "9XlPKEessD8", "MIAKOMzRl1I", "q_i3CHNITQ4",
"3CSxYKbxfPU", "4bWC_VXffq4", "v7HH_CNIdXc", "ziz2lh-14i8", "dhUo_lpD7v0",
"WMUXMqYhQ-M", "qLXllxd4Z1c", "3FcAcE3Zq2Q", "U1Y5Uxn2Rcw", "x-JDra36m38",
"Me9SZohibPQ", "KUOoStyV7Zs", "Qh4Gkkgi1Mw", "Hx6VxszpvsY", "CFt6QrzavH0",
"AMMBYLB3qd0", "fVpvd7OX6PQ", "OceCWIqZt7I", "VuFW0PkNS74", "5jRLjGWWaHs",
"_CPNLY_Gf7s", "67l4czkKsz8", "FCiA6e44aOI", "uUEwEMMCZhE", "cY7pE7vX6MU",
"vP6j7VDpPrI", "QrITN6GZDu4", "euh9ZQi339o", "EBRMq2Ioxsc", "3BYN3ouwkRA",
"tCUdeLIj4hE", "Wk8zAr0R9zQ", "NUQMr5R3dlk", "twQKAoq2OPE", "dJJDndQrsSw",
"Q0Q9K93bK-4", "5YQrFiWa50M", "VMIj6eB9baY", "KOfB5WQb39g", "M5IPlMe83yI",
"2gha47uSk5c", "lJL2asANiyM", "YHXX3KuB23Q", "LddeJ06JoXE", "gpKMwPoldak",
"BoMQqW0lxVE", "NkUTLRZBWLM", "fekA2mRGTTE", "b7R3-_ViNxk", "nhr-YErfW8k",
"WZoeqnsY9AY", "Wh9a0obtQUQ", "ahM4GBZ-6qg", "399c-ycBvo4", "kdZuUIj4lMo",
"E09qigk_hnY", "nvkCqFLtcJI", "NIcijUt-HlE", "l_HBRhcgeuQ", "dX3DRdFKW_E",
"y_cXzaymXm0", "RBOScqRGHZA", "QPgqfnKG_T4", "fWONoZvTi80", "sgHbC6udIqc",
"1CjX385y3e4", "hnhN2_TpY8g", "GxyfYEe8MiQ", "wslWYg0CTkY", "54XwSUC8klI",
"6wZoBbE-rOo", "Zv26xHYlc8s", "N4zdWLuSbV0", "H841U6RhrDU", "bwwf_HbEJQM",
"qmgh14LUOjQ", "qTwvObrRGdY", "Ycvg0PCQ-sM", "ickNQcNXiS4", "C9K8DOe1zWw",
"47NSfuuuMfs", "3UHE-zD1r_M", "bTXert2uRco", "Bt2HStzaBzE", "z1RQMm37Xmw",
"LnVkLXRIbIg", "P5ad6NpjR3M", "hyzPYaAmVOc", "tYW52SLy_w0", "JOXwclgvXB0",
"188mXjwdkak", "9G6-GksU7Ko", "TmuEDxX1FDQ", "jXlR0Icvvh8", "vfYul2E56fo",
"cSbD5SKwak0", "bGWytn-Ff9E", "hvPYuqzTPIk", "RAxiiRPHS9k", "Mv3xgBQJPaE",
"jOu0D9ttCFI", "4-TwdBuTR1A", "yflKOoAohEk", "ANhTacigaf8", "vfPtGsSJldg",
"YdnBK5yO4zU", "26wgEsg9Mcc", "R9ITLdmfdLI", "KUpIFhNW89A", "OBbvj0WWT-g",
"9q8LTZSvpr8", "qbYYamU42Sw", "-Mx1JVTFOBY", "AZDWveIdqjY", "__s45TTXxps",
"QGfxLXoMpPk", "3dMq_3UUPxg", "9LVqBQcFmyw", "Adr_QuDZxuM", "YyEReiAYGlU",
"G-lGCC4KKok", "1VZfL9JVgFg", "n6145JSeqWc", "XGF3Qu4dUqk", "Xu5EhKVZdV8",
"o9pEzgHorH0", "miGolgp9xq8", "Xk6gQ6s2QjU", "tYk4_Nzl-Gg", "sdkAXM36C7M",
"L-fXOoxrt0M", "Iw9-GckD-gQ", "xHqlzuPq_qQ", "duc3jYgAaR0", "Zd5dfooZWG4",
"g0CankXpFZg", "ULdDuwf48kM", "P7SVi0YTIuE", "Pi9NpxAvYSs", "qgGqaBAEy3Q",
"bobeo5kFz1g", "w26x-z-BdWQ", "t_ziKY1ayCo", "Bs6-sai1fKE", "oZw8m_lyhvo",
"hp5ymCrD9yw", "2G5YTlheCbw", "SULKL7TMRsU", "Thd8yoBou7k", "52wxGESwQSA",
"NBSosX8xiRk"]
def read_urls():
    global q
    #
    for yid in DATA:
        q.put("https://www.youtube.com/watch?v={yid}".format(yid=yid))

class DownLoadThread(Thread):
    def __init__(self, thread_id):
        super(DownLoadThread, self).__init__()
        self.thread_id = thread_id

    def run(self):
        global q
        #
        while not q.empty():
            url = q.get()
            cmd = "youtube-dl {url} -t -c 1>/dev/null".format(url=url)
            with lock:
                print "{tid}: START {cmd}".format(tid=self.thread_id, cmd=cmd)
                print "# queue size:", q.qsize()
            os.system(cmd)
            with lock:
                print "{tid}: STOP {cmd}".format(tid=self.thread_id, cmd=cmd)

def main():
    global threads
    #
    read_urls()
    #
    os.chdir(TO_DIR)
    #
    for i in xrange(THREADS):
        t = DownLoadThread(i)
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print "# END"

##########
if __name__ == "__main__":
    main()
Links
concurrent.futures
“The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor.” (source)
This feature appeared in Python 3.2 but the good news is that it was backported to Python 2 too. You’ll need to install the futures module:
sudo pip install futures
(0) Basic example without concurrency
Let’s take the following basic example:
#!/usr/bin/env python
from Queue import Queue
import random
import time

q = Queue()
fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    if random.randint(0,1):
        time.sleep(0.1)
    #
    res = x * x
    q.put(res)

def main():
    for num in fred:
        f(num)
    #
    while not q.empty():
        print q.get()

if __name__ == "__main__":
    main()
We have a list of numbers and we want to calculate their squares. The results are stored in a queue. In general, function “f()” performs a job that may take a while (that’s why I added some random waiting). These jobs are executed one after the other, but they are independent of each other: calculating the square of 5 doesn’t rely on the square of 4, for instance, so these jobs could be processed in parallel.
(1) Using ThreadPoolExecutor
Let’s execute the jobs mentioned above in parallel with threads:
#!/usr/bin/env python
from Queue import Queue
import concurrent.futures
import random
import time

q = Queue()
fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    if random.randint(0,1):
        time.sleep(0.1)
    #
    res = x * x
    q.put(res)

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num)
    #
    while not q.empty():
        print q.get()

####################
if __name__ == "__main__":
    main()
Here we have a pool (a “list”) to which the jobs are added (see the for loop). We have 4 threads working on this pool: each thread takes a job out of the pool, executes it, and when the job is done, it takes the next job to be processed from the pool. When the 4 workers have processed all the jobs in the pool, the execution goes on after the “with” block. The “with” statement guarantees that the execution waits until all worker threads finish. When we reach the “while” loop, all jobs have been processed and all the worker threads have finished.
The results are stored in a Queue because it is thread-safe. “The Queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.”
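A Queue is not the only way to get the results back. Here is a minimal sketch, assuming Python 3 (or the futures backport): f() simply returns its result and executor.map() collects the return values. map() yields the results in the order of the inputs, regardless of which thread finished first. The shorter sleep is my own change to keep the demo quick.

```python
import concurrent.futures
import random
import time

fred = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


def f(x):
    # Simulate a job that sometimes takes longer.
    if random.randint(0, 1):
        time.sleep(0.01)
    return x * x


def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # map() schedules f(num) for every num and yields the results
        # in input order.
        return list(executor.map(f, fred))


results = main()
print(results)
```

With this variant no shared data structure is needed at all, so there is nothing to lock.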
(2) Using ProcessPoolExecutor
“The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.”
So, instead of threads we will use processes here.
#!/usr/bin/env python
import sys
import redis
import concurrent.futures

r = redis.Redis()
fred = [1,2,3,4,5,6,7,8,9,10]

def check_server():
    try:
        r.info()
    except redis.exceptions.ConnectionError:
        print >>sys.stderr, "Error: cannot connect to redis server. Is the server running?"
        sys.exit(1)

def f(x):
    res = x * x
    r.rpush("test", res)

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num)
    #
    print r.lrange("test", 0, -1)

####################
if __name__ == "__main__":
    check_server()
    ###
    r.delete("test")
    main()
It’s basically the same; ThreadPoolExecutor was simply replaced with ProcessPoolExecutor.
Again, we want to store the results in a “list”. However, Queue is not a good choice here: Queue is made for threads that share the memory of one process, so each worker process would end up with its own copy and the parent would never see the results. I decided to store the results in a redis list instead. For more information about redis, read this post of mine: redis: getting started. In redis every individual command is atomic, thus different processes can safely write their results to it.
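If the results only need to get back to the parent process, an external store may not be necessary: the return values of the submitted functions are pickled and sent back to the parent. Here is a sketch under that assumption, for Python 3. square_all and its executor_cls parameter are hypothetical names that I use for illustration. Note that square() lives at module level so that it can be pickled.

```python
import concurrent.futures


def square(x):
    # Defined at module level so that worker processes can import it.
    return x * x


def square_all(numbers, executor_cls=concurrent.futures.ProcessPoolExecutor):
    """Return {n: n*n}, computed by a pool of workers."""
    results = {}
    with executor_cls(max_workers=4) as executor:
        # Remember which number belongs to which future.
        futures = {executor.submit(square, n): n for n in numbers}
        # as_completed() yields the futures as they finish, in any order.
        for future in concurrent.futures.as_completed(futures):
            results[futures[future]] = future.result()
    return results
```

Call it as square_all(range(1, 11)) from under an if __name__ == "__main__": guard. Passing executor_cls=concurrent.futures.ThreadPoolExecutor runs the very same code with threads.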
When using processes, you might get this error:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
send(obj)
IOError: [Errno 32] Broken pipe
It seems to be a bug; I didn’t have this issue with Python 3. A possible workaround is to submit elements more slowly, i.e. add “time.sleep(0.01)” after the line “executor.submit(…)”. More info here.
Which solution to use?
Once you have a basic solution without concurrency, it’s quite easy to parallelize the code with concurrent.futures: you just need to add a few extra lines. Threads are good for I/O-bound tasks, while processes are good for CPU-bound tasks.
However, you should run some tests because the results can be surprising. Out of curiosity, I tried the three methods above with a simple prime test. The source code is available here. I go from 1 to 1000 and test whether each number is a prime. The prime test is very simple, and the whole exercise is CPU-bound.
Results:
$ time ./basic.py
real 0m0.026s
$ time ./with_threads.py
real 0m0.138s
$ time ./with_processes.py
real 0m0.255s
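That is, the naive approach was the fastest, then threads, and finally processes. A likely explanation is that each prime test is so cheap that the overhead of submitting the jobs (plus, for processes, of passing data between processes) outweighs any gain; for threads, the GIL also prevents CPU-bound Python code from running on several cores at once.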
I also tried to test numbers up to 100000. Basic: 0.3 sec, threads: 11 sec, processes: 17 sec (on a CPU with 4 cores).
I posed a question on reddit concerning multithreading and GIL. I got interesting answers, you can read them here.
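The test scripts are linked rather than shown, so here is a sketch of what such a comparison might look like in Python 3. is_prime() is a deliberately naive, hypothetical prime test and not necessarily the one I used for the timings above; timed() is just a small helper for measuring. The process version is left out; it would be the thread version with ProcessPoolExecutor.

```python
import concurrent.futures
import time


def is_prime(n):
    # Naive trial division, good enough for a CPU-bound toy benchmark.
    if n < 2:
        return False
    i = 2
    while i * i <= n:
        if n % i == 0:
            return False
        i += 1
    return True


def count_basic(limit):
    # Single-threaded: test the numbers one after the other.
    return sum(1 for n in range(1, limit + 1) if is_prime(n))


def count_threads(limit, workers=4):
    # One tiny job per number: the scheduling overhead is paid for each job.
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as ex:
        return sum(1 for flag in ex.map(is_prime, range(1, limit + 1)) if flag)


def timed(func, *args):
    # Return (result, elapsed seconds) of func(*args).
    start = time.time()
    result = func(*args)
    return result, time.time() - start


for func in (count_basic, count_threads):
    primes, seconds = timed(func, 1000)
    print("{0}: {1} primes in {2:.3f} sec".format(func.__name__, primes, seconds))
```

Both functions must report the same number of primes; only the elapsed time differs, and it will vary from machine to machine.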
pickle
“The pickle module implements a fundamental, but powerful algorithm for serializing and de-serializing a Python object structure. “Pickling” is the process whereby a Python object hierarchy is converted into a byte stream, and “unpickling” is the inverse operation, whereby a byte stream is converted back into an object hierarchy.” Pickling is also known as serialization. (source)
For writing this entry, I also used this post on PyMOTW.
By default (protocol 0 in Python 2), the pickled byte stream contains ASCII characters only. It’s less compact than the binary protocols, but that’s fine: it makes debugging easier.
The cPickle module is a C implementation, which is a lot faster than the pure Python pickle module.
The pickle format is specific to Python, so you can use it only between two Python programs.
Warning! The pickle module is not intended to be secure against erroneous or maliciously constructed data. Never unpickle data received from an untrusted or unauthenticated source.
#!/usr/bin/env python
try:
    import cPickle as pickle
except ImportError:
    import pickle

def main():
    data1 = [ { 'a':'one', 'b':2, 'c':3.0 } ]
    print 'DATA: ',
    print(data1)
    data1_string = pickle.dumps(data1) # here: pickling
    print 'PICKLE:', data1_string
    data2 = pickle.loads(data1_string) # here: unpickling
    print 'UNPICKLED:',
    print(data2)
    print 'SAME?:', (data1 is data2)
    print 'EQUAL?:', (data1 == data2)

####################
if __name__ == "__main__":
    main()
Output:
DATA: [{'a': 'one', 'c': 3.0, 'b': 2}]
PICKLE: (lp1
(dp2
S'a'
S'one'
p3
sS'c'
F3
sS'b'
I2
sa.
UNPICKLED: [{'a': 'one', 'c': 3.0, 'b': 2}]
SAME?: False
EQUAL?: True
“When working with your own classes, you must ensure that the class being pickled appears in the namespace of the process reading the pickle. Only the data for the instance is pickled, not the class definition. The class name is used to find the constructor to create the new object when unpickling.” (source)
That is, when you want to unpickle instances of a class, don’t forget to import the definition of this class!
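Here is a small sketch of this with a class of our own, written for Python 3. Point is a hypothetical class that I made up for the example. The pickle stores the name of the class and the data of the instance, so the class has to be available when loads() is called (here it is, since everything lives in the same file).

```python
import pickle


class Point(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y


original = Point(3, 4)
data = pickle.dumps(original)    # stores the class name and the instance data
restored = pickle.loads(data)    # looks up Point by name to rebuild the object

print(restored.x, restored.y)
print(type(restored) is Point)
print(restored is original)
```

The restored object is a new instance with the same attributes, just like data2 in the example above.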
Top 400 Python Projects on Github
[ found here ]
APScheduler examples
Update (20190626): this post is deprecated! I wrote a new post that describes the new version of APScheduler (version 3). See the updated post here.
“Advanced Python Scheduler (APScheduler) is a light but powerful in-process task scheduler that lets you schedule functions (or any other python callables) to be executed at times of your choosing.” (source)
The simplest way to schedule jobs using the built-in triggers is to use one of the shortcut methods provided by the scheduler: add_date_job, add_interval_job, and add_cron_job.
Let’s see an example of each.
(1) simple date-based scheduling
The official doc. is here. “This is the simplest possible method of scheduling a job. It schedules a job to be executed once at the specified time. This is the in-process equivalent to the UNIX “at” command.”
#!/usr/bin/env python
import sys
from time import sleep
from apscheduler.scheduler import Scheduler

sched = Scheduler()
sched.start() # start the scheduler

# define the function that is to be executed
# it will be executed in a thread by the scheduler
def my_job(text):
    print text

def main():
    # job = sched.add_date_job(my_job, datetime(2013, 8, 5, 23, 47, 5), ['text'])
    job = sched.add_date_job(my_job, '2013-08-05 23:47:05', ['text'])
    while True:
        sleep(1)
        sys.stdout.write('.'); sys.stdout.flush()

##############################################################
if __name__ == "__main__":
    main()
Meaning: at the specified date and time, call the function my_job with the parameter “text”. The line with “sched.add_date_job” only registers the task; the execution of the script then goes on with the next line! If it were the last line, the script would terminate. Thus we need an infinite loop too. At the specified time, the registered function will be triggered and executed in a thread, while the infinite loop goes on in parallel.
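To illustrate this control flow without APScheduler, here is a sketch that uses only the standard library (Python 3). It is an analogy, not APScheduler’s implementation: threading.Timer stands in for the scheduler, and the half-second delay and the done event are my own assumptions for the demo.

```python
import threading

done = threading.Event()


def my_job(text):
    print(text)
    done.set()


# Registering the job returns immediately; my_job will run later
# in a separate thread.
timer = threading.Timer(0.5, my_job, args=["text"])
timer.start()

# The main thread was not blocked, so the job has not run yet.
ran_before_wait = done.is_set()

# Without something that keeps the main thread busy, the script would
# just reach its end. Here we wait for the job instead of looping forever.
done.wait(timeout=5)
```

The infinite loop in the APScheduler example plays the same role as done.wait() here: it keeps the main thread alive while the job runs elsewhere.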
(2) interval-based scheduling
The official doc. is here. “This method schedules jobs to be run on selected intervals. The execution of the job starts after the given delay, or on start_date if specified. After that, the job will be executed again after the specified delay.”
The frame of the source code is the same as in the first example. Here I will only show the difference.
# from now on, execute my_job every minute
job = sched.add_interval_job(my_job, minutes=1, args=['text'])
# or:
# start at start_date (my_job is called) and then execute my_job every minute
job = sched.add_interval_job(my_job, minutes=1, start_date='2013-08-06 00:09:12', args=['text'])
In the first case: if you launch the script at 09:10:12 (hh:mm:ss), my_job will be called at 09:11:12 for the first time, then at 09:12:12, 09:13:12, etc.
In the second case: you specify when to call my_job for the first time (on August 6, 2013 at 00:09:12), then it will be executed again at 00:10:12, 00:11:12, etc.
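The arithmetic of the two cases can be checked with a few lines of Python 3. This only reproduces the run times described above with datetime; it does not call APScheduler, and interval_runs is a hypothetical helper.

```python
from datetime import datetime, timedelta


def interval_runs(first_run, interval, count):
    # The first `count` run times of a job repeated every `interval`.
    return [first_run + i * interval for i in range(count)]


one_minute = timedelta(minutes=1)

# Case 1: no start_date, so the first run is one interval after the launch.
launched = datetime(2013, 8, 6, 9, 10, 12)
case1 = interval_runs(launched + one_minute, one_minute, 3)

# Case 2: start_date is given, so the first run is at start_date itself.
start_date = datetime(2013, 8, 6, 0, 9, 12)
case2 = interval_runs(start_date, one_minute, 3)

print([t.strftime("%H:%M:%S") for t in case1])
print([t.strftime("%H:%M:%S") for t in case2])
```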
(3) cron-style scheduling
The official doc. is here. “This is the most powerful scheduling method available in APScheduler. You can specify a variety of different expressions on each field, and when determining the next execution time, it finds the earliest possible time that satisfies the conditions in every field. This behavior resembles the “Cron” utility found in most UNIX-like operating systems.”
The frame of the source code is the same as in the first example. Here I will only show the difference.
job = sched.add_cron_job(my_job, minute="*/15", args=['text'])
The syntax is similar to cron’s syntax. Here is a visual crontab utility called corntab.
The example above means: execute my_job every 15 minutes within each hour, i.e. at minutes 0, 15, 30 and 45. So, if you launch the script at Xh8 (8 minutes after X o’clock), it will be executed for the first time at Xh15, then at Xh30, Xh45, (X+1)h0, (X+1)h15, etc.
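Here is a small Python 3 sketch of how the run times for minute="*/15" can be worked out by hand. It reflects my reading of the behaviour described above, not APScheduler’s own code; next_quarter_hour is a hypothetical helper and the launch time 10:08 is just an example.

```python
from datetime import datetime, timedelta


def next_quarter_hour(now):
    """Return the first time strictly after `now` whose minute is 0, 15, 30 or 45."""
    base = now.replace(second=0, microsecond=0)
    minutes_to_add = 15 - base.minute % 15
    return base + timedelta(minutes=minutes_to_add)


t = datetime(2013, 8, 6, 10, 8)    # launched at 10h8
runs = []
for _ in range(5):
    t = next_quarter_hour(t)
    runs.append(t.strftime("%H:%M"))

print(runs)
```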
Common
If you want to unregister a task, do this:
sched.unschedule_job(job)
This is why we stored the returned values in a variable called “job“.
You can also print the scheduled jobs in a human-readable format. It also prints when the job is executed next time, so it’s great for debugging:
job = sched.add_...
sched.print_jobs()
Sample output:
Jobstore default:
my_job (trigger: date[2013-08-06 23:47:05], next run at: 2013-08-06 23:47:05)
Logging with Python (Part 2)
Logging with Python (Part 1)
Problem
You have used print statements in your programs to print debug information, but you would like to start using the logging module too. You want to log to stdout, to a file, or to BOTH places (stdout and file).
Solution
The following entry is based on this post.
Our customized logging module (mylogging.py):
import logging
import sys
DEBUG_LOG_FILENAME = "jabba.log"
# set up formatting
formatter = logging.Formatter("%(levelname)-5s %(asctime)s %(module)s.%(funcName)s() [%(lineno)d]: %(message)s", "%Y-%m-%d %H:%M:%S")
# set up logging to STDOUT for all levels DEBUG and higher
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
sh.setFormatter(formatter)
# set up logging to a file for all levels DEBUG and higher
fh = logging.FileHandler(DEBUG_LOG_FILENAME)
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
# create Logger object
mylogger = logging.getLogger('MyLogger')
mylogger.setLevel(logging.DEBUG)
mylogger.addHandler(sh) # enabled: stdout
mylogger.addHandler(fh) # enabled: file
# create shortcut functions
debug = mylogger.debug
info = mylogger.info
warning = mylogger.warning
error = mylogger.error
critical = mylogger.critical
To enable/disable logging to stdout / file, just comment/uncomment these two lines:
mylogger.addHandler(sh) # enabled: stdout
mylogger.addHandler(fh) # enabled: file
And now a test file to demonstrate its usage:
#!/usr/bin/env python
from mylogging import debug, info, warning

def main():
    for i in xrange(100):
        if i % 5 == 0:
            info("i is {i}".format(i=i))
        if i % 50 == 0:
            debug("i is {i}".format(i=i))
        if i % 99 == 0:
            warning("i is {i}".format(i=i))

#############################################################################
if __name__ == "__main__":
    main()
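Instead of commenting and uncommenting the addHandler lines, the choice of outputs could also be made with function arguments. Here is a minimal sketch of that idea for Python 3. make_logger and its parameters are hypothetical names (they are not part of mylogging.py), and the shortened format string is only there to keep the demo output small.

```python
import io
import logging
import sys


def make_logger(name, stream=sys.stdout, filename=None):
    """Build a logger that writes to `stream` and/or to the file `filename`.

    Pass stream=None to switch off console output; leave filename=None
    to skip the log file.
    """
    formatter = logging.Formatter("%(levelname)-5s %(message)s")
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.handlers = []        # avoid duplicate handlers on repeated calls
    logger.propagate = False    # do not pass the records on to the root logger
    handlers = []
    if stream is not None:
        handlers.append(logging.StreamHandler(stream))
    if filename is not None:
        handlers.append(logging.FileHandler(filename))
    for handler in handlers:
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


buffer = io.StringIO()          # stands in for stdout in this demo
log = make_logger("demo", stream=buffer)
log.info("i is %d", 5)
print(buffer.getvalue())
```

For example, make_logger("MyLogger", filename="jabba.log") would log to both stdout and the file, while make_logger("MyLogger", stream=None, filename="jabba.log") would log to the file only.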
Credits
Thanks to SaltyCrane for his excellent blog post on the topic.

