Python线程和队列使用的一点思考
1. 斗哥采访环节
请问为什么要使用线程?
答:为了提高程序速度,代码效率呀。
请问为什么要使用队列?
答:个人认为队列可以保证线程安全,实现线程间的同步,比较稳。
线程为什么采用Threading模块?
答:据我所知还有Thread模块,该模块级别较低不推荐用。更高级别的是threading模块,它有一个Thread类,而且提供了各种非常好用的同步机制。
你所说的同步机制是指啥?
答:就是希望线程能够同时开跑,想象一下“所有的马同时冲出栅栏”的场景,就是我们说的同步了,而Therad模块的同步机制不佳亦是其不推荐使用的原因之一。
2. 需要用到线程的场景?
2.1 举个简单的案例,假设这么一个需求如下
给定200个IP地址,可能开放端口有80,443,7001,7002,8000,8080,8081,8888,9000,9001等,现需以'[[http://ip:port](http://ip:port)]([http://ip:port](http://ip:port))'
形式访问页面以判断是否正常。
2.2 为什么要用线程解决这个需求?
200个ip地址和10个端口,累计请求2000次,一个个请求过去太慢,设定线程可以提高效率。
2.3 如果不用线程怎么样实现?
(以下仅为演示代码,如有错误敬请指出)**注:**将200个ip地址放到ip.txt记事本中,读取ip拼接端口并请求。
#-*-coding:utf-8 import requests portlist=[80,443,7001,7002,8000,8080,8081,8888,9000,9001] ips=[t.replace("\n","") for t in open('ip.txt',"r").readlines()] for ip in ips: for port in portlist: url="http://"+ip+':'+str(port) try: resp=requests.get(url=url,timeout=2) print url,"mabey normal..." except: print url,"unknown wrong..."
注:运行上述代码,请求2000条url,每条等待超时2秒,差不多要1个多小时才能跑完,漫长的等待过程中渐渐失去笑容和耐心……
3. threading如何运用以解决上述问题?
使用threading模块的Thread类来创建线程,先要创建一个Thread的实例,传给它一个函数去跑线程。比如专门定义一个函数req()来请求URL,然后把这个req函数传给Thread的实例,接着开启线程……可以先看下面这段代码。(以下代码修改自上文)
import requests import threading def req(url): #请求的代码写成一个函数 try: resp=requests.get(url=url,timeout=2) print url,"mabey normal..." except: print url,"unknown wrong..." def main(): portlist=[80,443,7001,7002,8000,8080,8081,8888,9000,9001] ips=[t.replace("\n","") for t in open('ip.txt',"r").readlines()] urllist=[] threads=[] for ip in ips: #将url写到列表中 for port in portlist: urllist.append("http://"+ip+':'+str(port)) for url in urllist: #将线程存到threads列表中 t=threading.Thread(target=req,args=(url,)) threads.append(t) for t in threads: #开始跑线程,用while来控制线程数 t.start() while True: if(len(threading.enumerate())<100): break if __name__ == '__main__': main()
其中, t=threading.Thread(target=req,args=(url,))
的t
就是一个Thread的实例了,args
是可以加入到函数传递的参数,而本代码的req()
函数需要传递参数是url。
你可以看到的是,这个代码建立了2000个未开始跑的线程
放到threads列表里,接着遍历threads来开启线程。为了防止线程数过多,用while循环判断如果当前线程数len(threading.enumerate()
超过了100则不开启下一个线程,也就是100指的是线程数。
3.1 简单评价下这个脚本
(有其他建议请留言评论)
- 代码效果:线程设置成100,不到1分钟时间就跑完了整个脚本。
- 为了方便,将url写到了列表里,付出的代价是浪费了相应的内存空间。
- 线程数的控制使用while循环和threading.enumerate()来判断,不够优雅。
3.2 更好一点的方式:使用for循环来控制线程数+while循环结合列表的pop方法
import requests import threading def req(): while True: try: url=urllist.pop() except IndexError: break try: resp=requests.get(url=url,timeout=2) print url,"mabey normal..." except: print url,"unknown wrong..." def main(): for i in range(10): t=threading.Thread(target=req) t.start() for i in range(10): t.join() if __name__ == '__main__': portlist=[80,443,7001,7002,8000,8080,8081,8888,9000,9001] ips=[t.replace("\n","") for t in open('ip.txt',"r").readlines()] urllist=[] for ip in ips: for port in portlist: urllist.append("http://"+ip+':'+str(port)) main()
你可以发现上述代码大概有2点变化。
- 线程的开启更加纯粹,不再有传递参数的功能。而多了个for循环来执行
t.join()
,这个是用来阻塞主线程,当开启的子线程未跑完时,主线程不往下继续执行。 - 参数url的获取,改成了
url=urllist.pop()
的方式,因为我们知道列表的pop方法会默认每次从列表移除最后一个元素并返回该元素的值,所以能够起到参数获取的作用。线程数的控制用for i in range(10)
来开启,而不用while循环不停去检测线程数是不是超了。而参数获取完成了之后,列表也空了,似乎达到节省了空间,不过我们还是得事先准备一个列表,把url一个个预先填进去(如下图)。
如果不希望暂用那么大的空间,那么我们需要有一个缓存空间,并发的存入且能够并发读取而且不会发生阻塞,脑补一张图大概长下面这样:
上图描述就是人们常说的做生产者和消费者模式。在python中,Queue模块实现了多生产者多消费者队列, 尤其适合多线程编程.Queue类中实现了所有需要的锁原语,可以优雅的解决上述的问题,那么首先需要了解一下关于队列的一些细节……
4. 队列几点介绍
4.1 导入
- import Queue
- from Queue import [Queue Class]
4.2 通用方法
- put(item(,block[,timeout]))从队列中放入item。
- get()从队列移除并返回一个数据。(这个方法和列表的pop()方法是不是很像?)
- empty()如果队列为空,返回True,反之返回False
- task_done()task_done()告诉队列,get()方法的任务处理完毕。
- join()阻塞调用线程,直到队列中的所有任务被处理掉。
4.3 队列模型(类)
- FIFO队列(First in First Out,先进先出)
class Queue.Queue(maxsize=0)
Queue提供了一个基本的FIFO容器,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
import Queue q=Queue.Queue for i in range(1,6): q.put(i) while not q.empty(): print q.get() [console] $ python queth.py 1 2 3 4 5
更多用法参考官方文档:Queue官方文档
4.4 多线程和Queue.Queue()
前面已经提到,参数的获取可以并发的实现,但是苦于一直没有找到合适的场景。我们在文章中提到的需求,你可以发现2000个url的获取通过个循环就可以轻易获取根本用不到生产者的模式,也就提现不出队列的强大,尽管如此我还是给出对应的脚本,你可以发现其实和用列表获取参数的差别并不大。(小伙伴有更好的场景欢迎提出来一起讨论呀)
import requests import threading from Queue import Queue def req(queue): while True: url=queue.get() try: resp=requests.get(url=url,timeout=2) queue.task_done() print url,"mabey normal..." except: print url,"unknown wrong..." queue.task_done() def get_url(queue): portlist=[80,443,7001,7002,8000,8080,8081,8888,9000,9001] ips=[t.replace("\n","") for t in open('ip.txt',"r").readlines()] for ip in ips: for port in portlist: url="http://"+ip+':'+str(port) queue.put(url,1) def main(): queue=Queue() get_url(queue) for i in range(10): t=threading.Thread(target=req,args=(queue,)) t.setDaemon(True) t.start() queue.join() if __name__ == '__main__': main()
你可以发现通过一个get_url()
函数就轻易将url存储到队列中,我们在定义queue的时候是可以设定队列空间大小的,如queue=Queue(100)
,当存放了100个元素而未被取走时,队列会处于阻塞状态。不过设定队列大小上述代码就需要改写了,可以参考《Python核心编程》关于线程和队列的章节。
5. 小结
以上就是本次关于线程和队列思考的全部内容了,希望能够帮助到那些刚入门python的新手玩家们。本文也仅限斗哥的一点点小思考,也希望大家能够提出更好的见解和斗哥一起讨论。(The End)