freeBuf
主站

分类

漏洞 工具 极客 Web安全 系统安全 网络安全 无线安全 设备/客户端安全 数据安全 安全管理 企业安全 工控安全

特色

头条 人物志 活动 视频 观点 招聘 报告 资讯 区块链安全 标准与合规 容器安全 公开课

官方公众号企业安全新浪微博

FreeBuf.COM网络安全行业门户,每日发布专业的安全资讯、技术剖析。

FreeBuf+小程序

FreeBuf+小程序

Python线程和队列使用的一点思考
2018-02-06 15:13:02

Python线程和队列使用的一点思考

1. 斗哥采访环节

  • 请问为什么要使用线程?

    答:为了提高程序速度,代码效率呀。

  • 请问为什么要使用队列?

    答:个人认为队列可以保证线程安全,实现线程间的同步,比较稳。

  • 线程为什么采用Threading模块?

    答:据我所知还有Thread模块,该模块级别较低不推荐用。更高级别的是threading模块,它有一个Thread类,而且提供了各种非常好用的同步机制。

  • 你所说的同步机制是指啥?

    答:就是希望线程能够同时开跑,想象一下“所有的马同时冲出栅栏”的场景,就是我们说的同步了,而Therad模块的同步机制不佳亦是其不推荐使用的原因之一。

2. 需要用到线程的场景?

2.1 举个简单的案例,假设这么一个需求如下

给定200个IP地址,可能开放端口有80,443,7001,7002,8000,8080,8081,8888,9000,9001等,现需以'[[http://ip:port](http://ip:port)]([http://ip:port](http://ip:port))'形式访问页面以判断是否正常。

2.2 为什么要用线程解决这个需求?

200个ip地址和10个端口,累计请求2000次,一个个请求过去太慢,设定线程可以提高效率。

2.3 如果不用线程怎么样实现?

(以下仅为演示代码,如有错误敬请指出)**注:**将200个ip地址放到ip.txt记事本中,读取ip拼接端口并请求。

#-*-coding:utf-8
import requests
portlist=[80,443,7001,7002,8000,8080,8081,8888,9000,9001]
ips=[t.replace("\n","") for t in open('ip.txt',"r").readlines()]
for ip in ips:
    for port in portlist:
        url="http://"+ip+':'+str(port)
        try:
            resp=requests.get(url=url,timeout=2)
            print url,"mabey normal..."
        except:
            print url,"unknown wrong..."

注:运行上述代码,请求2000条url,每条等待超时2秒,差不多要1个多小时才能跑完,漫长的等待过程中渐渐失去笑容和耐心……

3. threading如何运用以解决上述问题?

使用threading模块的Thread类来创建线程,先要创建一个Thread的实例,传给它一个函数去跑线程。比如专门定义一个函数req()来请求URL,然后把这个req函数传给Thread的实例,接着开启线程……可以先看下面这段代码。(以下代码修改自上文)

import requests
import threading

def req(url):    #请求的代码写成一个函数
    try:
        resp=requests.get(url=url,timeout=2)
        print url,"mabey normal..."
    except:
        print url,"unknown wrong..."

def main():
    portlist=[80,443,7001,7002,8000,8080,8081,8888,9000,9001]
    ips=[t.replace("\n","") for t in open('ip.txt',"r").readlines()]
    urllist=[]    
    threads=[]
    for ip in ips:    #将url写到列表中
        for port in portlist:
            urllist.append("http://"+ip+':'+str(port))

    for url in urllist:    #将线程存到threads列表中
        t=threading.Thread(target=req,args=(url,))
        threads.append(t)
    for t in threads:    #开始跑线程,用while来控制线程数
        t.start()
        while True:
            if(len(threading.enumerate())<100):
                break
if __name__ == '__main__':
    main()

其中, t=threading.Thread(target=req,args=(url,))t就是一个Thread的实例了,args是可以加入到函数传递的参数,而本代码的req()函数需要传递参数是url。

你可以看到的是,这个代码建立了2000个未开始跑的线程放到threads列表里,接着遍历threads来开启线程。为了防止线程数过多,用while循环判断如果当前线程数len(threading.enumerate()超过了100则不开启下一个线程,也就是100指的是线程数。

3.1 简单评价下这个脚本

(有其他建议请留言评论)

  • 代码效果:线程设置成100,不到1分钟时间就跑完了整个脚本。
  • 为了方便,将url写到了列表里,付出的代价是浪费了相应的内存空间。
  • 线程数的控制使用while循环和threading.enumerate()来判断,不够优雅。

3.2 更好一点的方式:使用for循环来控制线程数+while循环结合列表的pop方法

import requests
import threading
def req():
    while True:
        try:
            url=urllist.pop()
        except IndexError:
            break
        try:
            resp=requests.get(url=url,timeout=2)
            print url,"mabey normal..."
        except:
            print url,"unknown wrong..."
def main():        
    for i in range(10):
        t=threading.Thread(target=req)
        t.start()
    for i in range(10):
        t.join()

if __name__ == '__main__':
    portlist=[80,443,7001,7002,8000,8080,8081,8888,9000,9001]
    ips=[t.replace("\n","") for t in open('ip.txt',"r").readlines()]
    urllist=[]
    for ip in ips:
        for port in portlist:
            urllist.append("http://"+ip+':'+str(port))
    main()

你可以发现上述代码大概有2点变化。

  1. 线程的开启更加纯粹,不再有传递参数的功能。而多了个for循环来执行t.join(),这个是用来阻塞主线程,当开启的子线程未跑完时,主线程不往下继续执行。
  2. 参数url的获取,改成了url=urllist.pop()的方式,因为我们知道列表的pop方法会默认每次从列表移除最后一个元素并返回该元素的值,所以能够起到参数获取的作用。线程数的控制用for i in range(10)来开启,而不用while循环不停去检测线程数是不是超了。而参数获取完成了之后,列表也空了,似乎达到节省了空间,不过我们还是得事先准备一个列表,把url一个个预先填进去(如下图)。

image

如果不希望暂用那么大的空间,那么我们需要有一个缓存空间,并发的存入且能够并发读取而且不会发生阻塞,脑补一张图大概长下面这样:

image

上图描述就是人们常说的做生产者和消费者模式。在python中,Queue模块实现了多生产者多消费者队列, 尤其适合多线程编程.Queue类中实现了所有需要的锁原语,可以优雅的解决上述的问题,那么首先需要了解一下关于队列的一些细节……

4. 队列几点介绍

4.1 导入

  • import Queue
  • from Queue import [Queue Class]

4.2 通用方法

  • put(item(,block[,timeout]))从队列中放入item。
  • get()从队列移除并返回一个数据。(这个方法和列表的pop()方法是不是很像?)
  • empty()如果队列为空,返回True,反之返回False
  • task_done()task_done()告诉队列,get()方法的任务处理完毕。
  • join()阻塞调用线程,直到队列中的所有任务被处理掉。

4.3 队列模型(类)

  • FIFO队列(First in First Out,先进先出)
  • class Queue.Queue(maxsize=0)

Queue提供了一个基本的FIFO容器,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

import Queue
q=Queue.Queue 
for i in range(1,6):
    q.put(i)

while not q.empty():
    print q.get()

[console]
$ python queth.py
1
2
3
4
5

更多用法参考官方文档:Queue官方文档

4.4 多线程和Queue.Queue()

前面已经提到,参数的获取可以并发的实现,但是苦于一直没有找到合适的场景。我们在文章中提到的需求,你可以发现2000个url的获取通过个循环就可以轻易获取根本用不到生产者的模式,也就提现不出队列的强大,尽管如此我还是给出对应的脚本,你可以发现其实和用列表获取参数的差别并不大。(小伙伴有更好的场景欢迎提出来一起讨论呀)

import requests
import threading
from Queue import Queue
def req(queue):
    while True:
        url=queue.get()
        try:
            resp=requests.get(url=url,timeout=2)
            queue.task_done()
            print url,"mabey normal..."
        except:
            print url,"unknown wrong..."
            queue.task_done()
def get_url(queue):
    portlist=[80,443,7001,7002,8000,8080,8081,8888,9000,9001]
    ips=[t.replace("\n","") for t in open('ip.txt',"r").readlines()]
    for ip in ips:
        for port in portlist:
            url="http://"+ip+':'+str(port)
            queue.put(url,1)
def main():
    queue=Queue()
    get_url(queue)
    for i in range(10):
        t=threading.Thread(target=req,args=(queue,))
        t.setDaemon(True)
        t.start()
    queue.join()

if __name__ == '__main__':
    main()

你可以发现通过一个get_url()函数就轻易将url存储到队列中,我们在定义queue的时候是可以设定队列空间大小的,如queue=Queue(100),当存放了100个元素而未被取走时,队列会处于阻塞状态。不过设定队列大小上述代码就需要改写了,可以参考《Python核心编程》关于线程和队列的章节。

5. 小结

以上就是本次关于线程和队列思考的全部内容了,希望能够帮助到那些刚入门python的新手玩家们。本文也仅限斗哥的一点点小思考,也希望大家能够提出更好的见解和斗哥一起讨论。(The End)

640.webp.jpg

# python
本文为 独立观点,未经允许不得转载,授权请联系FreeBuf客服小蜜蜂,微信:freebee2022
被以下专辑收录,发现更多精彩内容
+ 收入我的专辑
+ 加入我的收藏
相关推荐
  • 0 文章数
  • 0 关注者