淘先锋技术网

首页 1 2 3 4 5 6 7

简单的示例

给出一个任务,然后交给线程池完成,线程池可以设置最大线程的数量,所以他会一次执行三个

 from concurrent.futures import ThreadPoolExecutor
    import time
    
    #简单的线程池使用
    def consume(num):
        time.sleep(2)
        print('consuming',num)
    
    pools = ThreadPoolExecutor(3)
    
    num = 1
    while True:
    
        time.sleep(0.5)
        pools.submit(consume,(num))
        num += 1

说明:

什么是线程池,顾名思义就是首先把多个线程放入一个池子中(内存),当有剩余的线程的时候,我们就把线程取出来使用,如果没有剩余的线程,程序就会等待线程使用线程池我们可以获取到任务的返回结果

基本使用方式:

  • 使用 concurrent.futures库来实现多线程库
  • max_workers表示线程池中最大有多少个线程
  • submit表示把任务提交给线程池
  • done方法可以查看任务是否完成(bool)
  • result 方式会阻塞程序,等待线程完成,并且获取函数的返回结果
  • cancel方式能够取消线程,但是只能取消没有提交上去的线程
from concurrent.futures import ThreadPoolExecutor
import time
#1.并发
#2.获取线程的返回值 当一个线程完成的时候,主线程能够知道
#3.让多线程和多进程编程接口一致
def get_html(sleep_time):
    time.sleep(sleep_time)
    print("get page {} success".format(sleep_time))
    return sleep_time
  
executor = ThreadPoolExecutor(max_workers=2)
#通过sumbit提交到线程池中
task1 = executor.submit(get_html,(3))
task2 = executor.submit(get_html,(2))
task3 = executor.submit(get_html,(2))

print(task3.cancel())
#done 用于判断是否完成
#print(task1.done())
#阻塞 等待任务完成获取结果
print(task1.result())
print(task2.result())
  • as_completed 方法
    使用 as_completed方法来获取所有完成的线程,并获取返回值
     from concurrent.futures import ThreadPoolExecutor, as_completed
        import time
        import random
        from functools import partial
        def get_html(sleep_time,num):
            time.sleep(sleep_time)
            # print("get page {} success".format(sleep_time))
            return num
    executor = ThreadPoolExecutor(max_workers=2)
    #通过sumbit提交到线程池中
    tasks = list()
    for i in range(10):
        sleep_time = random.randint(2, 5)
        #把右边函数看成一个整体
        tasks.append(executor.submit(partial(get_html,sleep_time), (i)))
    #阻塞 等待完成的函数
    for i in as_completed(tasks):
        data = i.result()
        print('num {} success'.format(data))

wait 阻塞主线程

wait可以等待tasks的某个任务或者所有任务完成之后再执行其他的(阻塞), 他有三种可选方式,默认是等待所有tasks完成

等待所有完成
等待第一个完成
等待第一个错误

FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
import random


def get_html(sleep_time):
    time.sleep(sleep_time)
    # print("get page {} success".format(sleep_time))
    return sleep_time


executor = ThreadPoolExecutor(max_workers=2)
#通过sumbit提交到线程池中
tasks = list()
for i in range(10):
    sleep_time = random.randint(2, 5)
    tasks.append(executor.submit(get_html, (sleep_time)))

#阻塞等待任务完成
wait(tasks, return_when='FIRST_COMPLETED')

for i in as_completed(tasks):
    data = i.result()

    print('num {} success'.format(data))

print('12312312')

示例:

通过上面的线程池,我们就可以把获取详细页面的任务交给线程池去获取

import requests
from bs4 import BeautifulSoup
from queue import Queue
from threading import Thread
from concurrent.futures import ThreadPoolExecutor


def get_html_doc(url):
    # 根据指定的url获取html文档
    res = requests.get(url)
    print('ex--->url',url)
    return res.content.decode("utf8")


def get_detail(detail_urls_queue,pools):
    while True:
        url = detail_urls_queue.get(1,timeout=2)
        # print('consumer--->',url)
        pools.submit(get_html_doc,(url))


def parse_index(detail_urls_queue,index_urls_queue):
    while True:
        url = index_urls_queue.get(1)
        # print('get_index_url--->',url)
        # 解析列表页面
        html_doc = get_html_doc(url)
        data = BeautifulSoup(html_doc)

        # 把index里面的url取出来再取下面的url
        # data.select调用css选择器 选择出来是dict
        detail_urls = data.select('[class=post-thumb] a')
        # 获取细节的url,把细节的url交给其他线程处理
        for i in detail_urls:
            url = i['href']
            # print('productor------>',url)
            detail_urls_queue.put(url)



        # 取出所有其他index页面的翻页url 去解析其他的url
        index_urls = data.select('a[class=page-numbers]')
        for i in index_urls:
            url = i['href']
            index_urls_queue.put(url)
            #去重 使用redis数据库
            # print('put_index_url--->', url)


if __name__ == "__main__":
    url = "http://blog.jobbole.com/category/it-tech/"
    # 详细页面的url
    detail_urls = Queue()
    index_urls_queue = Queue()
    index_urls_queue.put(url)
    # 列表url 防止重复
    # index_urls_list = []
    executor = ThreadPoolExecutor(max_workers=10)
    t1 = Thread(target=parse_index,args=(detail_urls,index_urls_queue))
    t2 = Thread(target=get_detail, args=(detail_urls,executor))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('down')