博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
进程(并发,并行) join start 进程池 (同步异步)
阅读量:6720 次
发布时间:2019-06-25

本文共 12473 字,大约阅读时间需要 41 分钟。

一、背景知识

  顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。进程的概念起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老也是最重要的抽象概念之一。操作系统的其他所有内容都是围绕进程的概念展开的。

  PS:即使可以利用的cpu只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力。将一个单独的cpu变成多个虚拟的cpu(多道技术:时间多路复用和空间多路复用+硬件上支持隔离),没有进程的抽象,现代计算机将不复存在。

必备理论

#一 操作系统的作用:    1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口    2:管理、调度进程,并且将多个进程对硬件的竞争变得有序#二 多道技术:    1.产生背景:针对单核,实现并发    ps:    现在的主机一般是多核,那么每个核都会利用多道技术    有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个    cpu中的任意一个,具体由操作系统调度算法决定。    核心    2.空间上的复用:如内存中同时有多道程序    3.时间上的复用:复用一个cpu的时间片       强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样            才能保证下次切换回来时,能基于上次切走的位置继续运行

二、多进程概念

1、进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。进程有如下三个状态:

 

  其实在两种情况下会导致一个进程在逻辑上不能运行,

  1. 进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作

  2. 与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。

2、进程与程序的区别:程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。

    强调:同一个程序执行两次,那也是两个进程,比如打开网易云音乐,虽然都是同一个软件,但是一个可以播放音乐,一个可以播放mv。

举例(单核+多道,实现多个进程的并发执行):    egon在一个时间段内有很多任务要做:python备课的任务,写书的任务,交女朋友的任务,王者荣耀上分的任务,      但egon同一时刻只能做一个任务(cpu同一时间只能干一个活),如何才能玩出多个任务并发执行的效果?    egon备一会课,再去跟李杰的女朋友聊聊天,再去打一会王者荣耀....这就保证了每个任务都在进行中

3、并发与并行:

无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务

  并发:针对只有一个cpu执行多个进程的情况。是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发。

  并行:针对多个cpu执行多个进程的情况,并行也属于并发。

  单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的

         有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,

         一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术

         而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算),可能被分配给四个cpu中的任意一个去执行

 

 三、同步\异步and阻塞\非阻塞

同步是指:发送方发出数据后,等接收方发回响应以后才发下一个数据包的通讯方式。

异步是指:发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。

同步:

#所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不会返回。   按照这个定义,其实绝大多数函数都是同步调用。但是一般而言,我们在说同步、异步的时候,特指那些需要其他部件协作或者需要一定时间完成的任务。#举例:#1. multiprocessing.Pool下的apply #发起同步调用后,就在原地等着任务结束,根本不考虑任务是在计算还是在io阻塞,总之就是一股脑地等任务结束#2. concurrent.futures.ProcessPoolExecutor().submit(func,).result()#3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()

异步:

#异步的概念和同步相对。当一个异步功能调用发出后,调用者不能立刻得到结果。当该异步功能完成后,通过状态、通知或回调来通知调用者。     如果异步功能用状态来通知,那么调用者就需要每隔一定时间检查一次,效率就很低(有些初学多线程编程的人,总喜欢用一个循环去检查某个变量的值,这其实是一 种很严重的错误)。   如果是使用通知的方式,效率则很高,因为异步功能几乎不需要做额外的操作。至于回调函数,其实和通知没太多区别。#举例:#1. multiprocessing.Pool().apply_async() #发起异步调用后,并不会等待任务结束才返回,相反,会立即获取一个临时结果(并不是最终的结果,可能是封装好的一个对象)。#2. concurrent.futures.ProcessPoolExecutor(3).submit(func,)#3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)

阻塞:

#阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。 对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。#举例:#1. 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态);#2. 阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。

非阻塞:

#非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。

小结:

#1. 同步与异步针对的是函数/任务的调用方式:同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。而异步情况下是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行当,函数返回的时候通过状态、通知、事件等方式通知进程任务完成。#2. 阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程

 

 

 四、多进程实现

1、multiprocessing模块介绍

  Python提供了multiprocessing。multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。 multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。  需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

2、Process类介绍:为创建进程的类

主要方法:

#p为子进程p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁p.is_alive():如果p仍然运行,返回Truep.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

主要属性:

#p为子进程p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置p.name:进程的名称p.pid:进程的pidp.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

3、创建并开启子进程

  方式一:(常用)

from multiprocessing import Processimport timedef task(name):    print('%s is running' %name)    time.sleep(2)if __name__ == '__main__': #在windows系统下,开子进程的代码必须写到这一行下面    p=Process(target=task,args=('egon',))    p.start() #只是在给操作系统发了一个信号,让操作系统去开进程(申请内存+拷贝父进程的地址空间)    print('主')

 

from multiprocessing import Processimport timedef work(name):    print('%s is piaoing' %name)    time.sleep(5)    print('%s piao end' %name)if __name__=='__main__':                 #windows系统必须加    p=Process(target=work,args=('egon',))#args传参必须是个元组括号内要加逗号, 只是在给操作系统发一个信号,让操作系统去开进程 # #     (申请内存,拷贝父进程的地址空间)  p干的活是work 也可以以字典形式传:kwargs={'name':'egon'}    p.start()    print('主进程')

方式二:run()  名字是固定的

# multi 多 process进程from multiprocessing import Processimport timeclass Myprocess(Process):  # 自己的类    def __init__(self,name):        super().__init__()  # 保证有父类的功能        self.name=name    def run(self):        print('%s is running' % self.name)        time.sleep(2)if __name__ == '__main__':    p = Myprocess('egon')    p.start()  # 调的是p.run()    print('主')

 

from multiprocessing import Processimport timeclass work(Process):    def __init__(self,name):        super(work, self).__init__()  # super()括号内的参数可以不用传,,保证有父类的功能        self.name=name    def run(self):                            #方法名run()不可以更换        print('%s is piaoing' %self.name)        time.sleep(5)        print('%s piao end' %self.name)if __name__=='__main__':                      #windows系统必须加    p=work('egon')    p.start()   # 调的是p.run()    print('主进程')

 

 
为什么python运行的程序在系统是python.exe
 
python3 test.py ----> python.exe

 

当前进程ID : print(os.getpid())

 

解释器pycharm(运行python)的进程id : print(os.getppid())

 

| 为管道符,在cmd输tasklist  是全部运行的进程,tasklist |findstr python 过滤出python

 

注意:进程的直接内存空间是彼此隔离的,如下例:

 

from multiprocessing import Processn=100                 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上def work():    global n    n=0    print('子进程内: ',n)if __name__ == '__main__':    p=Process(target=work)    p.start()                  #结果总为:0    print('主进程内: ',n)       #结果总为:100

 

4、socket并发编程实例

  可以实现多个客户端与服务端进行交流。

服务端:

from socket import *from multiprocessing import Processserver=socket(AF_INET,SOCK_STREAM)server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)server.bind(('127.0.0.1',8080))server.listen(5)def talk(conn):    while True:        try:            msg=conn.recv(1024)            if not msg:break            conn.send(msg.upper())        except Exception:            breakif __name__ == '__main__':    while True:        conn,client_addr=server.accept()        p=Process(target=talk,args=(conn,))        p.start()

客户端:

#多个客户端from socket import *client=socket(AF_INET,SOCK_STREAM)client.connect(('127.0.0.1',8080))while True:    msg=input('>>: ').strip()    if not msg:continue    client.send(msg.encode('utf-8'))    msg=client.recv(1024)    print(msg.decode('utf-8'))

 

 

将进程干死:terminate

 

5、jion()方法详解

  join方法的主要作用是等待子进程结束后执行主进程。

基于谁开的子进程谁来关

标准版from multiprocessing import Processimport time,randomdef func(name):    print('%s is running'% name)    time.sleep(random.randint(1,3))    print('%s is runend'% name)if __name__ == '__main__':    p1 = Process(target=func ,args=('egon',))    p2 = Process(target=func ,args=('alex',))    p3 = Process(target=func ,args=('lishi',))    p4 = Process(target=func ,args=('jassin',))    p1.start()    p2.start()    p3.start()    p4.start()    print('zhu')# 精简版from multiprocessing import Processimport time,randomdef piao(name):    print('%s is piaoing' %name)    time.sleep(random.randint(1,3))    print('%s is done' %name)if __name__ == '__main__':    p1 = Process(target=piao, args=('alex',))    p2 = Process(target=piao, args=('wxx',))    p3 = Process(target=piao, args=('yxx',))    p_l = [p1, p2, p3]    for p in p_l:        p.start()    for p in p_l:        p.join()    print('主')

 

并行效果:

 

from multiprocessing import Processimport timedef piao(name):    print('%s is piaoing' %name)    time.sleep(3)    print('%s is piao end' %name)if __name__=='__main__':    p1=Process(target=piao,args=('egon',))    p2=Process(target=piao,args=('alex',))    p3=Process(target=piao,args=('yuanhao',))    p4=Process(target=piao,args=('wupeiqi',))    start_time=time.time()    p1.start()    p2.start()    p3.start()    p4.start()    p1.join()    p2.join()    p3.join()    p4.join()    end_time=time.time()    print(end_time-start_time)                   #结果为:3.多    print('主线程')                               #最后才被打印
egon is runninglishi is runningalex is runningjassin is runningegon is run endlishi is run endalex is run endjassin is run end3.2911880016326904zhu

  解释:p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间。

串行效果:

from multiprocessing import Processimport timedef run(name):    print('%s is running' %name)    time.sleep(3)    print('%s is run end'%name)if __name__ == '__main__':    p1 = Process(target=run,args=('egon',))    p2 = Process(target=run,args=('alex',))    p3 = Process(target=run,args=('lishi',))    p4 = Process(target=run,args=('jassin',))    start_time = time.time()    p1.start()    p1.join()    p2.start()    p2.join()    p3.start()    p3.join()    p4.start()    p4.join()    end_time = time.time()    print(end_time - start_time)    print('zhu')

 

结果:egon is runningegon is run endalex is runningalex is run endlishi is runninglishi is run endjassin is runningjassin is run end12.697726249694824zhu

 

 

解释:以上p1,p2,p3,p4进程是上一个子进程执行完才逐一被启动,形成串行效果。

 

六、进程池(Pool)

  多进程是实现并发的主要手段之一,但是通常会有如下问题:a.很明显需要并发执行的任务通常要远大于核数;b.一个操作系统不可能无限开启进程,通常有几个核就开几个进程;c.进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)。

  Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。执行任务的进程数始终位进程池中指定的那几个。

1、同步调用

  同步调用:提交完任务后,在原地等待任务结束,一旦结束可以立刻拿到结果。

from concurrent.futures import ProcessPoolExecutorimport time,random,osdef run(name,n):    print('%s is running %s' %(name,os.getpid()))    time.sleep(1)    return n**2if __name__ == '__main__':    p=ProcessPoolExecutor(4)    for i in range(10):        res=p.submit(run,'alex %s' %i,i).result() #同步调用#等进程执行完,并能得到结果,然后才开启下一个进程,相当于串行         print(res)    p.shutdown(wait=True)    print('主',os.getpid())

 

2、异步调用

  异步调用:提交完任务后,不会在原地等待任务结束,会继续提交下一次任务,等到所有任务都结束后,才能get结果。

from concurrent.futures import ProcessPoolExecutorimport time,osimport randomdef work(n):    print('%s is working' %(os.getpid()))    time.sleep(1)    return n**2if __name__=='__main__':    p=ProcessPoolExecutor(4)                              #从无到有开启4个进程,而且一直是这4个进程    objs=[]    for i in range(10):        objs=p.submin(work,'alex %s'%i,i)  #提交任务,不会在原地等结果        objs.append(obj)'''异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了'''    p.shutdown(wait = True)    print('主',os.geipid())    for obj in objs:        print(obj.result())''

 

'''提交/调用任务的方式有两种:    同步调用:提交/调用一个任务,然后就在原地等着,等到该任务执行完毕拿到结果,再执行下一行代码    异步调用: 提交/调用一个任务,不在原地等着,直接执行下一行代码,结果?'''# from multiprocessing import Process,Poolfrom concurrent.futures import ProcessPoolExecutorimport time,random,osdef piao(name,n):    print('%s is piaoing %s' %(name,os.getpid()))    time.sleep(1)    return n**2if __name__ == '__main__':    p=ProcessPoolExecutor(4)    objs=[]for i in range(10):        # res=p.submit(piao,'alex %s' %i,i).result() #同步调用        # print(res)        obj=p.submit(piao,'alex %s' %i,i) #异步调用        objs.append(obj)    for obj in objs:        print(obj.result())# 关门+等    # pool.close()    # pool.join()    p.shutdown(wait=True)    print('主',os.getpid())

******

from concurrent.futures import ProcessPoolExecutorimport time, random, osdef piao(name, n):    print('%s is piaoing %s' % (name, os.getpid()))    time.sleep(1)    return n ** 2if __name__ == '__main__':    p = ProcessPoolExecutor(4)    objs = []    start = time.time()    for i in range(10):        # res=p.submit(piao,'alex %s' %i,i).result() #同步调用        # print(res)        obj = p.submit(piao, 'alex %s' % i, i)  # 异步调用        objs.append(obj)    p.shutdown(wait=True)    print('主', os.getpid())    for obj in objs:        print(obj.result())    stop = time.time()    print(stop - start)

**

from concurrent.futures import ProcessPoolExecutorimport time,random,osdef piao(name):    print('%s is piaoing %s' %(name,os.getpid()))    time.sleep(random.randint(1,4))if __name__ == '__main__':    p=ProcessPoolExecutor(4)    for i in range(10):        p.submit(piao,'alex %s' %i)    p.shutdown(wait=True)  # shutdown 关门  wait=True 等    print('主',os.getpid())

 

转载于:https://www.cnblogs.com/jassin-du/p/7921296.html

你可能感兴趣的文章
commons-io-2.5-bin
查看>>
linux下安装一款笔记软件(为知笔记)
查看>>
HP Instant Information
查看>>
Maven 手动添加 JAR 包到本地仓库
查看>>
电子辅助的个体化严密控制策略比常规方法更有效地帮助早期RA实现全面控制病情[EULAR2015_THU0122]...
查看>>
【BZOJ3529】数表
查看>>
最坏的不是面试被拒,而是没面试机会,以面试官视角分析哪些简历至少能有面试机会...
查看>>
python的文件操作与目录操作os模块
查看>>
客户端存储cookie
查看>>
agal常用命令
查看>>
html转义
查看>>
自定义UITextView实现placeholder效果
查看>>
格式化
查看>>
如何让字体大小<12px
查看>>
【机器学习】--主成分分析PCA降维从初识到应用
查看>>
对于数组的操作:splice与slice
查看>>
第二次Scrum meeting
查看>>
云安全
查看>>
addEventListener调用带参数函数
查看>>
postgresql----INSERT
查看>>