目录
Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池
1.昨日回顾
#生产者消费者模型.
# 生产者: 产生数据,
# 消费者: 接收数据并做下一步处理
# 容器: 队列.
#进程, 线程:
# 进程就是资源单位,线程就是执行单位.
#进程与线程的区别:
# 线程: 开销小,速度快,同一个进程下的线程资源内存级别共享.
# 进程: 开销巨大,速度慢, 不同进程的数据内存级别不共享.
#join: 阻塞,
# t1.join() 阻塞.
# print('主')
#getname, setname .name
#activeCount() 线程的数量
守护线程: 如果守护线程的生命周期小于其他线程,则他肯定先结束,否则等待其他非守护线程和主线程结束之后结束.
#互斥锁,锁
2.死锁现象与递归锁
2.1死锁现象
# from threading import Thread
# from threading import Lock
# import time
#
# lock_A = Lock()
# lock_B = Lock()
#
#
# class MyThread(Thread):
#
# def run(self):
# self.f1()
# self.f2()
#
#
# def f1(self):
#
# lock_A.acquire()
# print(f'{self.name}拿到了A锁')
#
# lock_B.acquire()
# print(f'{self.name}拿到了B锁')
#
# lock_B.release()
#
# lock_A.release()
#
# def f2(self):
#
# lock_B.acquire()
# print(f'{self.name}拿到了B锁')
#
# time.sleep(0.1)
# lock_A.acquire()
# print(f'{self.name}拿到了A锁')
#
# lock_A.release()
# lock_B.release()
#
# if __name__ == '__main__':
#
# for i in range(3):
# t = MyThread()
# t.start()
2.2递归锁
递归锁有一个计数的功能, 原数字为0,上一次锁,计数+1,释放一次锁,计数-1,
只要递归锁上面的数字不为零,其他线程就不能抢锁.
# from threading import Thread
# from threading import RLock
# import time
#
# lock_A = lock_B = RLock()
# 递归锁有一个计数的功能, 原数字为0,上一次锁,计数+1,释放一次锁,计数-1,
# 只要递归锁上面的数字不为零,其他线程就不能抢锁.
# class MyThread(Thread):
#
# def run(self):
# self.f1()
# self.f2()
#
#
# def f1(self):
#
# lock_A.acquire()
# print(f'{self.name}拿到了A锁')
#
# lock_B.acquire()
# print(f'{self.name}拿到了B锁')
#
# lock_B.release()
#
# lock_A.release()
#
# def f2(self):
#
# lock_B.acquire()
# print(f'{self.name}拿到了B锁')
#
# time.sleep(0.1)
# lock_A.acquire()
# print(f'{self.name}拿到了A锁')
#
# lock_A.release()
#
# lock_B.release()
#
# if __name__ == '__main__':
#
# for i in range(3):
# t = MyThread()
# t.start()
3.信号量
也是一种锁,控制并发数量
# from threading import Thread, Semaphore, current_thread
# import time
# import random
# sem = Semaphore(5)
#
# def task():
# sem.acquire()
#
# print(f'{current_thread().name} 厕所ing')
# time.sleep(random.randint(1,3))
#
# sem.release()
#
#
# if __name__ == '__main__':
# for i in range(20):
# t = Thread(target=task,)
# t.start()
4.GIL全局解释器锁
4.1背景
4.2为什么加锁
1. 当时都是单核时代,而且cpu价格非常贵.
2. 如果不加全局解释器锁, 开发Cpython解释器的程序员就会在源码内部各种主动加锁,解锁,非常麻烦,各种死锁现象等等.他为了省事儿,直接进入解释器时给线程加一个锁.
优点: 保证了Cpython解释器的数据资源的安全.
缺点: 单个进程的多线程不能利用多核.
#Jpython没有GIL锁.
#pypy也没有GIL锁.
#现在多核时代, 我将Cpython的GIL锁去掉行么?
#因为Cpython解释器所有的业务逻辑都是围绕着单个线程实现的,去掉这个GIL锁,几乎不可能.
单个进程的多线程可以并发,但是不能利用多核,不能并行.
多个进程可以并发,并行.
5.GIL与Lock锁的区别
相同点: 都是同种锁,互斥锁.
不同点:
GIL锁全局解释器锁,保护解释器内部的资源数据的安全.
GIL锁 上锁,释放无需手动操作.
自己代码中定义的互斥锁保护进程中的资源数据的安全.
自己定义的互斥锁必须自己手动上锁,释放锁.
6.验证计算密集型IO密集型的效率
6.1 IO密集型
# IO密集型: 单个进程的多线程并发 vs 多个进程的并发并行
# def task():
# count = 0
# time.sleep(random.randint(1,3))
# count += 1
# if __name__ == '__main__':
# 多进程的并发,并行
# start_time = time.time()
# l1 = []
# for i in range(50):
# p = Process(target=task,)
# l1.append(p)
# p.start()
#
# for p in l1:
# p.join()
#
# print(f'执行效率:{time.time()- start_time}') # 8.000000000
# 多线程的并发
# start_time = time.time()
# l1 = []
# for i in range(50):
# p = Thread(target=task,)
# l1.append(p)
# p.start()
#
# for p in l1:
# p.join()
#
# print(f'执行效率:{time.time()- start_time}') # 3.0294392108917236
对于IO密集型: 单个进程的多线程的并发效率高.
6.2 计算密集型
#from threading import Thread
#from multiprocessing import Process
#import time
#import random
# # 计算密集型: 单个进程的多线程并发 vs 多个进程的并发并行
#
# def task():
# count = 0
# for i in range(10000000):
# count += 1
#
#
# if __name__ == '__main__':
#
# # 多进程的并发,并行
# # start_time = time.time()
# # l1 = []
# # for i in range(4):
# # p = Process(target=task,)
# # l1.append(p)
# # p.start()
# #
# # for p in l1:
# # p.join()
# #
# # print(f'执行效率:{time.time()- start_time}') # 3.1402080059051514
#
# # 多线程的并发
# # start_time = time.time()
# # l1 = []
# # for i in range(4):
# # p = Thread(target=task,)
# # l1.append(p)
# # p.start()
# #
# # for p in l1:
# # p.join()
# #
# # print(f'执行效率:{time.time()- start_time}') # 4.5913777351379395
总结: 计算密集型: 多进程的并发并行效率高.
7.多线程实现socket通信
#无论是多线程还是多进程,如果按照上面的写法,来一个客户端请求,我就开一个线程,来一个请求开一个线程,
#应该是这样: 你的计算机允许范围内,开启的线程进程数量越多越好.
7.1服务端
# import socket
# from threading import Thread
#
# def communicate(conn,addr):
# while 1:
# try:
# from_client_data = conn.recv(1024)
# print(f'来自客户端{addr[1]}的消息: {from_client_data.decode("utf-8")}')
# to_client_data = input('>>>').strip()
# conn.send(to_client_data.encode('utf-8'))
# except Exception:
# break
# conn.close()
#
# def _accept():
# server = socket.socket()
# server.bind(('127.0.0.1', 8848))
# server.listen(5)
#
# while 1:
# conn, addr = server.accept()
# t = Thread(target=communicate,args=(conn,addr))
# t.start()
#
# if __name__ == '__main__':
# _accept()
7.2客户端
# import socket
# client = socket.socket()
# client.connect(('127.0.0.1',8848))
#
# while 1:
# try:
# to_server_data = input('>>>').strip()
# client.send(to_server_data.encode('utf-8'))
#
# from_server_data = client.recv(1024)
# print(f'来自服务端的消息: {from_server_data.decode("utf-8")}')
#
# except Exception:
# break
# client.close()
8.进程池,线程池
#线程池: 一个容器,这个容器限制住你开启线程的数量,比如4个,第一次肯定只能并发的处理4个任务,只要有任务完成,线程马上就会接下一个任务.
以时间换空间.
# from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
# import os
# import time
# import random
#
# # print(os.cpu_count())
# def task(n):
# print(f'{os.getpid()} 接客')
# time.sleep(random.randint(1,3))
#
#
# if __name__ == '__main__':
# 开启进程池 (并行(并行+并发))
# p = ProcessPoolExecutor() # 默认不写,进程池里面的进程数与cpu个数相等
#
# # p.submit(task,1)
# # p.submit(task,1)
# # p.submit(task,1)
# # p.submit(task,1)
# # p.submit(task,1)
# # p.submit(task,1)
# # p.submit(task,1)
# for i in range(20):
# p.submit(task,i)
#
# 开启线程池 (并发)
# t = ThreadPoolExecutor() # 默认不写, cpu个数*5 线程数
# t = ThreadPoolExecutor(100) # 100个线程
# for i in range(20):
# t.submit(task,i)