Queue里task_done方法使用注意

Queue是Python标准库的队列实现。在网上经常看到示例代码是这样实现的:

url = queue.get()
queue.task_done()
...

思路本身没什么问题——从队列中取一个数据,然后通知队列取成功了。但有些人代码是这样写的:

url = queue.get()
queue.task_done()


try:
    sock = urllib2.urlopen(url)
except:
    ...

如果是在多线程环境下,这会带来个问题:若是url = queue.get()刚好把队列取空了,而由于执行了urlopen,所以当前线程产生了阻塞,发生了线程切换,如果执行权限刚好落在了主线程,这时如果主线程的queue.join将导致整个程序结束掉。而sock = urllib2.urlopen(url)后面的代码都还没来得及执行。

为什么呢?

我们看下Queue.py里对join函数如何实现的:

79         self.all_tasks_done.acquire()
80         try:
81             while self.unfinished_tasks:    # unfinished_tasks保存了当前队列中的数目
82                 self.all_tasks_done.wait()
83         finally:
84             self.all_tasks_done.release()

上面代码里有个while循环,unfinished_tasks存储了当前队列里的数据数目,如果unfinished_tasks不为0,将会一直调用all_tasks_done.wait()阻塞住。只要unfinished_tasks不是0,join就可以一直工作下去。

而task_done()正是减小了unfinished_tasks。不妨再看看task_done()的实现代码:

59         self.all_tasks_done.acquire()
60         try:
61             unfinished = self.unfinished_tasks - 1
62             if unfinished <= 0:
63                 if unfinished < 0:
64                     raise ValueError('task_done() called too many times')
65                 self.all_tasks_done.notify_all()
66             self.unfinished_tasks = unfinished
67         finally:
68             self.all_tasks_done.release()

注意61行,表明每次执行了task_done(),unfinished_tasks就减1,直到unfinished_tasks为0。

所以,在多线程下,想让队列变得正常的话,注意task_done的位置:

url = queue.get()
...

try:
    sock = urllib2.urlopen(url)
except:
    ...

queue.task_done()   # 要在一切都搞定后,再告诉队列空了。