多线程笔记

Table of Contents

1 守护线程

在C/C++中,子线程随着主线程退出而退出,但是Python默会在主线程退出时前等待所有子线程退出。如果一个线程的daemon为True,主线程退出后,这个子线程也要跟着退出。

#!/usr/bin/env python
#coding=utf-8
import threading,time,logging
logging.basicConfig(level=logging.DEBUG,format='(%(threadName)-10s) %(message)s')

def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

d = threading.Thread(name='daemon', target=daemon)
#d.setDaemon(True)

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

t = threading.Thread(name='non-daemon',target=non_daemon)
d.start()
t.start()

执行后输出如下:

$ python test.py
(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon    ) Exiting

如果把d.setDaemon(True)注释去掉,输出就会变成以下:

$ python test.py
(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting

因为daemon进程sleep还没完,主线程已经结束了,所以看不到(daemon ) Exiting。

2 Queue+Threading

这里,工作线程的代码大致如下:

while True:
   data = queue.get()
   ....
   queue.task_done()

先看看Queue里get()的实现:

150     def get(self, block=True, timeout=None):
......
161         self.not_empty.acquire()
162         try:
163             if not block:
164                 if not self._qsize():
165                     raise Empty
166             elif timeout is None:
167                 while not self._qsize():
168                     self.not_empty.wait()

get的block参数默认是True,所以queue.get()将会执行到167行那句代码上,这时,如果队列空了,就会调用self.not_empty.wait(),当前线程将被阻塞掉,接着就是发生了线程切换。

我的主线程最后句代码是:

queue.join()

再看看join()的实现:

79         self.all_tasks_done.acquire()
80         try:
81             while self.unfinished_tasks:    #unfinished_tasks保存了当前队列中的任务数
82                 self.all_tasks_done.wait()
83         finally:
84             self.all_tasks_done.release()

self.unfinished_tasks保存了当前队列的任务数量,如果数量不为空,将执行到self.all_tasks_done.wait(),这个时候主线程陷入阻塞中;如果为空的话,就结束join。

但,因为threading创建的线程默认不是守护线程,所以就产生一个矛盾:主线程执行完了join,但其他线程还在get那里阻塞着的,而主线程又在等它们结束,所以程序自然无法自然结束了。

“守护线程”这个东西,就是说主线程必须等所有子线程结束了,才会退出;线程如果是守护线程的话,主线程不用等它们结束的。把Thread对象的daemon属性设置成True,线程就成了守护线程了。

所以代码可以这么写(伪代码):

def work():
"""这个是多线程调用的函数"""
    data = queue.get()

    ....这里是处理data的代码....

    queue.task_done()

def main():
    t = Threading.thread()
    t.daemon = True
    t.start()
    .....
    queue.join()

这个时候,虽然其他线程还在阻塞着的,但主线程会因为队列为空了,执行完join,所以整个程序就结束了。

另外一个问题——queue.task_done()的位置,之前我以为这样就可以了:

data = queue.get()
queue.task_done()
...

再看看task_done的实现代码:

59         self.all_tasks_done.acquire()
60         try:
61             unfinished = self.unfinished_tasks - 1
62             if unfinished <= 0:
63                 if unfinished < 0:
64                     raise ValueError('task_done() called too many times')
65                 self.all_tasks_done.notify_all()
66             self.unfinished_tasks = unfinished
67         finally:
68             self.all_tasks_done.release()

self.unfinished_tasks保存着当前队列中任务数量,注意第61行,说明每次执行task_done,self.unfinished_tasks就减1。join结束的条件是self.unfinished_tasks为0,具体代码看上面贴的。

所以,如果太着急调用task_done会产生一些问题,比如:

url = queue.get()
queue.task_done()

sock = urllib2.urlopen(url)
...

注意上面的代码,如果url = queue.get()刚好导致队列为空了,而urlopen产生了阻塞,所以发生了线程接换,如果不幸运恰恰切换到主线程,这时queue.join将导致整个程序结束掉!sock = urllib2.urlopen(url)后面的代码还没来得及执行!

所以这样写才是正确的:

url = queue.get()

sock = urllib2.urlopen(url)
...
queue.task_done() #这里保证了该执行的代码都执行完了,才通知队列删除一个元素

另:如果阅读代码水平不太差,遇到问题,就看看实现代码,在不懂的地方可以加点print之类的信息(不要认为标准模块就不能让咱们修改,照改不误)。看网上的文字描述性文章,理解概念性的东西可能很长时间都解决不了问题啊。