并发编程之 多进程
一. multiprocessing模块介绍
python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
主要功能分四部分: 创建进程部分,进程同步部分,进程池部分,进程之间数据共享
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
二. Process类的介绍
Process模块是一个创建进程的一个模块,借助这个模块,就可以完成进程的创建
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)强调:1. 需要使用关键字的方式来指定参数2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号参数介绍:1 group参数未使用,值始终为None2 target表示调用对象,即子进程要执行的任务3 args表示调用对象的位置参数元组,args=(1,2,'egon',)4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}5 name为子进程的名称
方法的介绍:
1 p.start():启动进程,并调用该子进程中的p.run()2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁4 p.is_alive():如果p仍然运行,返回True5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
属性的介绍:
1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置2 p.name:进程的名称3 p.pid:进程的pid4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
在Windows中使用process模块的注意事项:
在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候 ,就不会递归运行了。
三. 使用process模块创建进程
1. 进程创建的两种方式
第一种:
import timefrom multiprocessing import Processdef f(name): print("Hello ",name) print("我是子进程")if __name__ == '__main__': # 在windows环境下, 开启进程必须在 __name__ == '__main__' 下面 p = Process(target=f,args=("Bob", )) # 创建一个进程对象 p.start() # 只是向操作系统发出一个开辟子进程的信号,然后就执行下一行了. # 这个信号操作系统接收到之后,会从内存中开辟一个子进程空间,然后在将主进程所有数据copy加载到子进程,然后在调用cpu去执行. # 开辟子进程开销是很大的. time.sleep(1) print("执行主进程的内容了")
永远会先执行主进程的代码
注意事项:
第一: 创建子进程必须使用**if _ _ name _ _ == "_ _ main _ _"**
第二: Process的args这个参数必须是元组(一个元素的时候时要加逗号)
第二种:
from multiprocessing import Processimport timeclass MyProcess(Process): def __init__(self,name): super(MyProcess, self).__init__() self.name = name def run(self): print(f"{self.name} is running!") time.sleep(2) print(f"{self.name} is gone!")if __name__ == '__main__': p = MyProcess("zcy") p.start() print("**主**")
简单应用
from multiprocessing import Processimport timedef task(name): print(f"{name} is running!") time.sleep(1) print(f"{name} is gone!")def task1(name): print(f"{name} is running!") time.sleep(2) print(f"{name} is gone!")def task2(name): print(f"{name} is running!") time.sleep(3) print(f"{name} is gone!")if __name__ == '__main__': # # 一个进程串行的执行三个任务 结果: 结束时间6.001725912094116 # start = time.time() # task("zcy") # task1("zdr") # task2("lfz") # print(f'结束时间{time.time() - start}') # 三个进程并发或并行的执行三个任务 结果: 结束时间3.0388405323028564 start= time.time() p1 = Process(target=task, args=('zcy',)) # 创建一个进程对象 p2 = Process(target=task1, args=('zdr',)) # 创建一个进程对象 p3 = Process(target=task2, args=('lfz',)) # 创建一个进程对象 p1.start() p2.start() task2("lfz") print(f'结束时间{time.time() - start}')
从上面可以看出串行和并行或并发的区别
2. 进程pid
-
如何区分内存中的这些进程?
- 命令行获取所有的进程的pid 命令为: tasklist 如果要看某几个进程: tasklist|findstr 进程名
-
代码级别如果获取一个进程pid?
import osprint(os.getpid())
-
如何获取父进程(主进程)的pid?
import timeimport osprint(f"子进程:{os.getpid()}")print(f"父进程:{os.getppid()}")time.sleep(100)
一个py文件内的主进程和子进程
from multiprocessing import Processimport timeimport osdef task(name): print(f"子进程:{os.getpid()}") print(f"主进程:{os.getppid()}")if __name__ == '__main__': p = Process(target=task,args=("zdr",)) p.start() print(f"======main:{os.getpid()}")结果:======main:10720子进程:12388主进程:10720
3. 验证进程之间的空间隔离
验证不可变数据类型的空间隔离
from multiprocessing import Processimport timeimport osname = "zcy"def task(): global name name = "Jacky" print(f"子进程:{name}")if __name__ == '__main__': p = Process(target=task) p.start() print(f"======the main is start") time.sleep(3) print(f"main:{name}")结果:======the main is start子进程:Jackymain:zcy
验证可变数据类型的空间隔离
from multiprocessing import Processimport timeimport osname = ["zcy"]def task(): name.append("Jacky") print(f"子进程:{name}")if __name__ == '__main__': p = Process(target=task) p.start() print(f"======the main is start") time.sleep(3) print(f"main:{name}")# ======the main is start# 子进程:['zcy', 'Jacky']# main:['zcy']
4. 进程对象join方法
from multiprocessing import Processimport timedef task(name,sec): print(f'{name}is running') time.sleep(sec) print(f'{name} is gone')if __name__ == '__main__': p1 = Process(target=task,args=('zcy',3)) p2 = Process(target=task,args=('zdr',4)) p3 = Process(target=task,args=('lfz',1)) p1.start() p2.start() p3.start() # join就是阻塞 start_time = time.time() # p1.join() p2.join() p3.join() 几乎同时执行,开始计时 p1.join() print(f'==主1:{time.time()-start_time}') # 第三秒执行到这行,花费3秒多一点 p2.join() print(f'===主2:{time.time()-start_time}') # 第四秒执行到这一行,花费4秒多一点 p3.join() print(f'==主3:{time.time()-start_time}') # 原本是第一秒执行,但还要等上一行主进程代码结束,因为代码是从上往下执行,所有需要花费4秒多一点 # 所以整个程序花费4秒多一点执行完,而不是8秒
如何优化代码
from multiprocessing import Processimport timedef task(sec): print("running") time.sleep(sec) print("gone")if __name__ == '__main__': p1 = Process(target=task, args=(1,)) p2 = Process(target=task,args=(2,)) p3 = Process(target=task,args=(3,)) p1.start() p2.start() p3.start() # join就是阻塞 start_time = time.time() # p1.join() p2.join() p3.join() 几乎同时执行,开始计时 p1.join() print(f'==主1:{time.time()-start_time}') # 第一秒执行到这行,花费1秒多一点 p2.join() print(f'===主2:{time.time()-start_time}') # 第二秒执行到这一行,花费2秒多一点 p3.join() print(f'==主3:{time.time()-start_time}') # 第三秒执行到这一行,花费3秒多一点 # 错误示范 # start = time.time() # for i in range(1,4): # p = Process(target=task,args=(i,)) # p.start() # p.join() # print(f"time:{time.time() - start}") # 正确示范 # lst = [] # start = time.time() # for i in range(1,4): # p = Process(target=task,args=(i,)) # p.start() # lst.append(p) # for i in lst: # i.join() # print(f"time:{time.time() - start}")
5. 进程对象其他属性
p.terminate() "杀死"进程p
p.name 给p其别名
p.is_alive() 判断p是否存活
from multiprocessing import Processimport timedef task(name): print(f'{name} is running') time.sleep(2) print(f'{name} is gone')if __name__ == '__main__': # 在windows环境下, 开启进程必须在 __name__ == '__main__' 下面 # p = Process(target=task,args=('zdr',)) # 创建一个进程对象 p = Process(target=task,args=('zdr',),name='alex') # 创建一个进程对象 p.start() # time.sleep(1) # p.terminate() # 杀死子进程 *** # p.join() # *** # time.sleep(0.5) # print(p.is_alive()) # *** # print(p.name) p.name = 'sb' print(p.name) print('主开始')
6. 僵尸进程与孤儿进程
参考博客:http://www.cnblogs.com/Anker/p/3271773.html一:僵尸进程(有害) 僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。详解如下我们知道在unix/linux中,正常情况下子进程是通过父进程创建的,子进程在创建新的进程。子进程的结束和父进程的运行是一个异步过程,即父进程永远无法预测子进程到底什么时候结束,如果子进程一结束就立刻回收其全部资源,那么在父进程内将无法获取子进程的状态信息。因此,UNⅨ提供了一种机制可以保证父进程可以在任意时刻获取子进程结束时的状态信息:1、在每个进程退出的时候,内核释放该进程所有的资源,包括打开的文件,占用的内存等。但是仍然为其保留一定的信息(包括进程号the process ID,退出状态the termination status of the process,运行时间the amount of CPU time taken by the process等)2、直到父进程通过wait / waitpid来取时才释放. 但这样就导致了问题,如果进程不调用wait / waitpid的话,那么保留的那段信息就不会释放,其进程号就会一直被占用,但是系统所能使用的进程号是有限的,如果大量的产生僵死进程,将因为没有可用的进程号而导致系统不能产生新的进程. 此即为僵尸进程的危害,应当避免。 任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每个子进程在结束时都要经过的阶段。如果子进程在exit()之后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。如果父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态。 如果父进程在子进程结束之前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。二:孤儿进程(无害) 孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。 孤儿进程是没有父进程的进程,孤儿进程这个重任就落到了init进程身上,init进程就好像是一个民政局,专门负责处理孤儿进程的善后工作。每当出现一个孤儿进程的时候,内核就把孤 儿进程的父进程设置为init,而init进程会循环地wait()它的已经退出的子进程。这样,当一个孤儿进程凄凉地结束了其生命周期的时候,init进程就会代表党和政府出面处理它的一切善后工作。因此孤儿进程并不会有什么危害。我们来测试一下(创建完子进程后,主进程所在的这个脚本就退出了,当父进程先于子进程结束时,子进程会被init收养,成为孤儿进程,而非僵尸进程),文件内容import osimport sysimport timepid = os.getpid()ppid = os.getppid()print 'im father', 'pid', pid, 'ppid', ppidpid = os.fork()#执行pid=os.fork()则会生成一个子进程#返回值pid有两种值:# 如果返回的pid值为0,表示在子进程当中# 如果返回的pid值>0,表示在父进程当中if pid > 0: print 'father died..' sys.exit(0)# 保证主线程退出完毕time.sleep(1)print 'im child', os.getpid(), os.getppid()执行文件,输出结果:im father pid 32515 ppid 32015father died..im child 32516 1看,子进程已经被pid为1的init进程接收了,所以僵尸进程在这种情况下是不存在的,存在只有孤儿进程而已,孤儿进程声明周期结束自然会被init来销毁。三:僵尸进程危害场景: 例如有个进程,它定期的产 生一个子进程,这个子进程需要做的事情很少,做完它该做的事情之后就退出了,因此这个子进程的生命周期很短,但是,父进程只管生成新的子进程,至于子进程 退出之后的事情,则一概不闻不问,这样,系统运行上一段时间之后,系统中就会存在很多的僵死进程,倘若用ps命令查看的话,就会看到很多状态为Z的进程。 严格地来说,僵死进程并不是问题的根源,罪魁祸首是产生出大量僵死进程的那个父进程。因此,当我们寻求如何消灭系统中大量的僵死进程时,答案就是把产生大 量僵死进程的那个元凶枪毙掉(也就是通过kill发送SIGTERM或者SIGKILL信号啦)。枪毙了元凶进程之后,它产生的僵死进程就变成了孤儿进 程,这些孤儿进程会被init进程接管,init进程会wait()这些孤儿进程,释放它们占用的系统进程表中的资源,这样,这些已经僵死的孤儿进程 就能瞑目而去了。四:测试#1、产生僵尸进程的程序test.py内容如下#coding:utf-8from multiprocessing import Processimport time,osdef run(): print('子',os.getpid())if __name__ == '__main__': p=Process(target=run) p.start() print('主',os.getpid()) time.sleep(1000)#2、在unix或linux系统上执行[root@vm172-31-0-19 ~]# python3 test.py &[1] 18652[root@vm172-31-0-19 ~]# 主 18652子 18653[root@vm172-31-0-19 ~]# ps aux |grep ZUSER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMANDroot 18653 0.0 0.0 0 0 pts/0 Z 20:02 0:00 [python3] <defunct> #出现僵尸进程root 18656 0.0 0.0 112648 952 pts/0 S+ 20:02 0:00 grep --color=auto Z[root@vm172-31-0-19 ~]# top #执行top命令发现1zombietop - 20:03:42 up 31 min, 3 users, load average: 0.01, 0.06, 0.12Tasks: 93 total, 2 running, 90 sleeping, 0 stopped, 1 zombie%Cpu(s): 0.0 us, 0.3 sy, 0.0 ni, 99.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 stKiB Mem : 1016884 total, 97184 free, 70848 used, 848852 buff/cacheKiB Swap: 0 total, 0 free, 0 used. 782540 avail Mem PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMANDroot 20 0 29788 1256 988 S 0.3 0.1 0:01.50 elfin #3、等待父进程正常结束后会调用wait/waitpid去回收僵尸进程但如果父进程是一个死循环,永远不会结束,那么该僵尸进程就会一直存在,僵尸进程过多,就是有害的解决方法一:杀死父进程解决方法二:对开启的子进程应该记得使用join,join会回收僵尸进程参考python2源码注释class Process(object): def join(self, timeout=None): ''' Wait until child process terminates ''' assert self._parent_pid == os.getpid(), 'can only join a child process' assert self._popen is not None, 'can only join a started process' res = self._popen.wait(timeout) if res is not None: _current_process._children.discard(self)join方法中调用了wait,告诉系统释放僵尸进程。discard为从自己的children中剔除解决方法三:http://blog.csdn.net/u010571844/article/details/50419798
7. 守护进程
一个例子: 古时候 太监守护这个皇帝,如果皇帝驾崩了,太监直接也就死了.
子进程守护着主进程,只要主进程结束,子进程跟着就结束
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
from multiprocessing import Processimport timedef task(name): print(f"{name} is running!") time.sleep(2) print(f"{name} is gone!")if __name__ == '__main__': p =Process(target=task,args=("zdr",)) p.daemon = True p.start() # p.daemon = True # 会报错,process has already started(因为进程已经开始) time.sleep(1) print("main function")