首页 技术 正文
技术 2022年11月15日
0 收藏 902 点赞 2,986 浏览 6100 个字

进程池和线程池


开进程开线程都需要消耗资源,只不过两者比较的情况线程消耗的资源比较少

在计算机能够承受范围之内最大限度的利用计算机

什么是池?

​在保证计算机硬件安全的情况下最大限度地利用计算机

​池其实是降低了程序的运行效率 但是保证了计算机硬件的安全

​(硬件的发展跟不上软件的速度)

线程池进程池

括号内可以传参数指定线程池内的线程个数

也可以不传 不传默认是当前cpu核数*5

提交任务的方式:

同步:提交任务之后,原地等待任务的结果,期间不做任何事

异步:提交任务后,不等待任务的返回结果(异步的结果怎么拿?),直接执行下一行代码

进程池/线程池的创建和提交回调

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
import os# pool = ThreadPoolExecutor(5) # 线程 # 括号内可以传参数指定线程池内的线程个数
# # 也可以不传 不传默认是当前所在计算机的cpu个数乘5
pool = ProcessPoolExecutor() # 进程 # 默认是当前计算机cpu的个数
"""
池子中创建的进程/线程创建一次就不会再创建了
至始至终用的都是最初的那几个
这样的话节省了反复开辟进程/线程的资源
"""def task(n):
print(n,os.getpid()) # 查看当前进程号
time.sleep(2)
return n**2def call_back(n):
print('拿到了异步提交任务的返回结果:',n.result())
"""
提交任务的方式
同步:提交任务之后 原地等待任务的返回结果 期间不做任何事
异步:提交任务之后 不等待任务的返回结果(异步的结果怎么拿???) 直接执行下一行代码
"""# pool.submit(task,1) # 朝线程池中提交任务 异步提交
# print('主')
"""
异步回调机制:当异步提交的任务有返回结果之后,会自动触发回调函数的执行"""
if __name__ == '__main__': t_list = []
for i in range(20):
res = pool.submit(task,i).add_done_callback(call_back) # 提交任务的时候 绑定一个回调函数 一旦该任务有结果 立刻执行对于的回调函数
# print(res.result()) # 原地等待任务的返回结果
t_list.append(res) # pool.shutdown() # 关闭池子 等待池子中所有的任务执行完毕之后 才会往下运行代码
# for p in t_list:
# print('>>>:',p.result())

验证复用池子里的线程或进程

池子中创建的进程或线程创建一次就不会再创建了,至始至终用的都是最初的那几个,这样的话就可以节省反复开辟进程或线程的资源了不是动态创建动态销毁的(如果是好几百个,可想而知)
import random
import time
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_threaddef task(i):
time.sleep(random.random())
# print(f"{os.getpid()} {i} is over...")
print(f"{os.getpid()} {current_thread().name} {i} is over...")
return f"{i}² = {i * i}"if __name__ == '__main__': # 进程池的时候一定要放在这里面
# pool = ProcessPoolExecutor(3)
pool = ThreadPoolExecutor(3, 'MyThreading') future_list = []
for i in range(5):
future = pool.submit(task, i)
future_list.append(future) pool.shutdown() # 关闭池子且等待池子中所有的任务运行完毕 for future in future_list:
print(f">>:{future.result()}") # 依次等每个 future的结果,所以是绝对有序的
print("主")
# 11000 0 is over... # 复用了进程号(即没有去开辟新的内存空间)
# 8024 2 is over...
# 10100 1 is over...
# 11000 3 is over...
# 8024 4 is over...
# >>:0² = 0
# >>:1² = 1
# >>:2² = 4
# >>:3² = 9
# >>:4² = 16
# 主# 使用线程池的打印结果
# 13024 MyThreading_1 1 is over... # 1.复用了线程
# 13024 MyThreading_1 3 is over... # 2.复用了线程
# 13024 MyThreading_2 2 is over...
# 13024 MyThreading_0 0 is over...
# 13024 MyThreading_1 4 is over...
# >>:0² = 0
# >>:1² = 1
# >>:2² = 4
# >>:3² = 9
# >>:4² = 16
# 主

异步回调机制

这(.add_done_callback())其实是 .submit() 返回结果对象的方法

异步回调机制:当异步提交的任务有返回结果之后,会自动触发回调函数的执行

import random
import time
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_threaddef callback(future):
print(f"我拿到了回调结果:{future.result()}")def task(i):
time.sleep(random.random())
# print(f"{os.getpid()} {i} is over...")
print(f"{os.getpid()} {current_thread().name} {i} is over...")
return f"{i}² = {i * i}"if __name__ == '__main__': # 进程池的时候一定要放在这里面
# pool = ProcessPoolExecutor(3)
pool = ThreadPoolExecutor(3, 'MyThreading') future_list = []
for i in range(5):
# -----------------------------------------------------
# .submit().add_done_callback() 自动调用回调函数
# 会自动将 .submit()的返回结果作为参数传给.add_done_callback() 中传入的函数去调用执行
# .add_done_callback() 其实是 .submit()返回对象自身的方法
# -----------------------------------------------------
future = pool.submit(task, i).add_done_callback(callback)
future_list.append(future) pool.shutdown() # 关闭池子且等待池子中所有的任务运行完毕 print("主")# 11348 MyThreading_0 0 is over...
# 我拿到了回调结果:0² = 0
# 11348 MyThreading_2 2 is over...
# 我拿到了回调结果:2² = 4
# 11348 MyThreading_0 3 is over...
# 我拿到了回调结果:3² = 9
# 11348 MyThreading_1 1 is over...
# 我拿到了回调结果:1² = 1
# 11348 MyThreading_2 4 is over...
# 我拿到了回调结果:4² = 16
# 主

通过闭包给回调函数添加额外参数

# 省略导模块等
# 线程池/进程池对象.submit() 会返回一个 future对象,该对象有.add_done_callback()方法(是一个对象绑定函数),参数是一个函数名(除了对象自身默认传入,无法为该函数传参)
# 这里利用闭包函数返回内部函数名的特点 直接调用这个闭包函数,达到传参的效果,可为回调函数添加更多的扩展性
def outter(*args, **kwargs):
def callback(res):
# 可以拿到 *args, **kwargs 参数做一些事情
print(res.result())
return callbackpool_list = []
for i in range(15):
pool_list.append(pool.submit(task, i).add_done_callback(outter(1, 2, 3, a=1, c=3))) # 朝线程池中提交任务(异步)

协程

进程:资源单位 线程:执行单位 协程:单线程下实现并发

并发 切换+保存状态 ps:看起来像同时执行的 就可以称之为并发

协程:完全是程序员自己意淫出来的名词 单线程下实现并发

并发的条件? 多道技术 空间上的复用 时间上的复用 切换+保存状态

程序员自己通过代码自己检测程序中的IO 一旦遇到IO自己通过代码切换 给操作系统的感觉是你这个线程没有任何的IO ps:欺骗操作系统 让它误认为你这个程序一直没有IO 从而保证程序在运行态和就绪态来回切换 提升代码的运行效率

切换+保存状态就一定能够提升效率吗??? 当你的任务是iO密集型的情况下 提升效率 如果你的任务是计算密集型的 降低效率

yield 保存上一次的结果

多进程下开多线程 多线程下再开协程

gevent模块实现

下载安装

gevent基本介绍

from gevent import spawn, monkey
monkey.patch_all() # 一般这个要写在很前面(例如导socket模块之前)
# 两行亦可写成一行 from gevent import monkey;monkey.patch_all()g1 = spawn(eat, 1, 2, 3, x=4, y=5)
# 创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面是该函数(eat)所需要的参数
g2 = spawn(func2)g1.join() # 等待协程g1结束
g2.join() # 等待协程g2结束
# 上述两步亦可合作一步:joinall([g1,g2])g1.value # 拿到func1的返回值

通过gevent实现遇到 IO自动切换状态(单线程下并发)

import timefrom gevent import spawn
# gevent 本身识别不了time.sleep() 等不属于该模块内的I/O操作
# 使用下面的操作来支持
from gevent import monkey
monkey.patch_all() # 监测代码中所有 I/O 行为def heng(name):
print(f"{name} 哼")
time.sleep(2)
print(f"{name} 哼 ...")def ha(name):
print(f"{name} 哈")
time.sleep(3)
print(f"{name} 哈 ...")# start_time = time.time()
# heng('egon')
# ha('jason')
# print(f"主 {time.time() - start_time}")
# # 主 5.005069732666016start_time = time.time()
s1 = spawn(heng, 'egon')
s2 = spawn(ha, 'jason')
s1.join()
s2.join()print(f"主 {time.time() - start_time}")
# 主 3.0046989917755127

在计算密集型任务中使用

from gevent import spawn, monkeymonkey.patch_all()import timedef func1():
for i in range(10000000):
i + 1def func2():
for i in range(10000000):
i + 1start = time.time()
g = spawn(func1)
g2 = spawn(func2)
g.join()
g2.join()
stop = time.time()
print(stop - start)
# 1.1324069499969482# 与前面普通的串行执行时间 1.2481744289398193 相近# spawn 可以让操作系统察觉不到这个程序有IO操作

利用gevent在单线程下实现并发

服务端

import socket
from gevent import spawn
from gevent import monkey # 让 gevent 能够识别python的 IO
monkey.patch_all()server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)def talk(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0: break
print(data.decode('utf-8'))
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()def wait_client_connect():
while True:
conn, addr = server.accept()
spawn(talk, conn)if __name__ == '__main__':
g1 = spawn(wait_client_connect)
g1.join() # 别忘了加上

客户端

import socket
from threading import Thread, current_threaddef create_client():
client = socket.socket()
client.connect(('127.0.0.1', 8080))
n = 0
while True:
data = '%s %s' % (current_thread().name, n)
client.send(data.encode('utf-8'))
res = client.recv(1024)
print(res.decode('utf-8'))
n += 1for i in range(400): # 手动开400个线程连接客户端(测试的是服务端单线程实现并发)
t = Thread(target=create_client)
t.start()
最大程度下提高代码的执行效率(实现高并发)多进程下使用多线程
多线程下使用多协程
**大前提**IO密集型任务

IO模型

(了解)


阻塞IO模型

非阻塞IO模型

IO多路复用

让select帮你去要数据,你做自己的事,有数据了给你

异步IO模型

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:8,991
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,506
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,349
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,134
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,766
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,844