简单的示例
给出一个任务,然后交给线程池完成,线程池可以设置最大线程的数量,所以他会一次执行三个
from concurrent.futures import ThreadPoolExecutor
import time
#简单的线程池使用
def consume(num):
time.sleep(2)
print('consuming',num)
pools = ThreadPoolExecutor(3)
num = 1
while True:
time.sleep(0.5)
pools.submit(consume,(num))
num += 1
说明:
什么是线程池,顾名思义就是首先把多个线程放入一个池子中(内存),当有剩余的线程的时候,我们就把线程取出来使用,如果没有剩余的线程,程序就会等待线程使用线程池我们可以获取到任务的返回结果
基本使用方式:
- 使用 concurrent.futures库来实现多线程库
- max_workers表示线程池中最大有多少个线程
- submit表示把任务提交给线程池
- done方法可以查看任务是否完成(bool)
- result 方式会阻塞程序,等待线程完成,并且获取函数的返回结果
- cancel方式能够取消线程,但是只能取消没有提交上去的线程
from concurrent.futures import ThreadPoolExecutor
import time
#1.并发
#2.获取线程的返回值 当一个线程完成的时候,主线程能够知道
#3.让多线程和多进程编程接口一致
def get_html(sleep_time):
time.sleep(sleep_time)
print("get page {} success".format(sleep_time))
return sleep_time
executor = ThreadPoolExecutor(max_workers=2)
#通过sumbit提交到线程池中
task1 = executor.submit(get_html,(3))
task2 = executor.submit(get_html,(2))
task3 = executor.submit(get_html,(2))
print(task3.cancel())
#done 用于判断是否完成
#print(task1.done())
#阻塞 等待任务完成获取结果
print(task1.result())
print(task2.result())
- as_completed 方法
使用 as_completed方法来获取所有完成的线程,并获取返回值
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
from functools import partial
def get_html(sleep_time,num):
time.sleep(sleep_time)
# print("get page {} success".format(sleep_time))
return num
executor = ThreadPoolExecutor(max_workers=2)
#通过sumbit提交到线程池中
tasks = list()
for i in range(10):
sleep_time = random.randint(2, 5)
#把右边函数看成一个整体
tasks.append(executor.submit(partial(get_html,sleep_time), (i)))
#阻塞 等待完成的函数
for i in as_completed(tasks):
data = i.result()
print('num {} success'.format(data))
wait 阻塞主线程
wait可以等待tasks的某个任务或者所有任务完成之后再执行其他的(阻塞), 他有三种可选方式,默认是等待所有tasks完成
等待所有完成
等待第一个完成
等待第一个错误
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
import random
def get_html(sleep_time):
time.sleep(sleep_time)
# print("get page {} success".format(sleep_time))
return sleep_time
executor = ThreadPoolExecutor(max_workers=2)
#通过sumbit提交到线程池中
tasks = list()
for i in range(10):
sleep_time = random.randint(2, 5)
tasks.append(executor.submit(get_html, (sleep_time)))
#阻塞等待任务完成
wait(tasks, return_when='FIRST_COMPLETED')
for i in as_completed(tasks):
data = i.result()
print('num {} success'.format(data))
print('12312312')
示例:
通过上面的线程池,我们就可以把获取详细页面的任务交给线程池去获取
import requests
from bs4 import BeautifulSoup
from queue import Queue
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
def get_html_doc(url):
# 根据指定的url获取html文档
res = requests.get(url)
print('ex--->url',url)
return res.content.decode("utf8")
def get_detail(detail_urls_queue,pools):
while True:
url = detail_urls_queue.get(1,timeout=2)
# print('consumer--->',url)
pools.submit(get_html_doc,(url))
def parse_index(detail_urls_queue,index_urls_queue):
while True:
url = index_urls_queue.get(1)
# print('get_index_url--->',url)
# 解析列表页面
html_doc = get_html_doc(url)
data = BeautifulSoup(html_doc)
# 把index里面的url取出来再取下面的url
# data.select调用css选择器 选择出来是dict
detail_urls = data.select('[class=post-thumb] a')
# 获取细节的url,把细节的url交给其他线程处理
for i in detail_urls:
url = i['href']
# print('productor------>',url)
detail_urls_queue.put(url)
# 取出所有其他index页面的翻页url 去解析其他的url
index_urls = data.select('a[class=page-numbers]')
for i in index_urls:
url = i['href']
index_urls_queue.put(url)
#去重 使用redis数据库
# print('put_index_url--->', url)
if __name__ == "__main__":
url = "http://blog.jobbole.com/category/it-tech/"
# 详细页面的url
detail_urls = Queue()
index_urls_queue = Queue()
index_urls_queue.put(url)
# 列表url 防止重复
# index_urls_list = []
executor = ThreadPoolExecutor(max_workers=10)
t1 = Thread(target=parse_index,args=(detail_urls,index_urls_queue))
t2 = Thread(target=get_detail, args=(detail_urls,executor))
t1.start()
t2.start()
t1.join()
t2.join()
print('down')