• python多任务之总结——(一)进程学习

    本文主要从进程的基本概念、进程高级管理、进程创建、进程通信和守护进程等方面来详细讲解进程的相关知识点,示例代码主要基于python编程。

    进程基本概念

    进程:一个进程只有一个虚拟内存实例,一个虚拟处理器,当一个进程有多个线程时,所有线程会共享相同的内存地址空间。

    空闲进程–当没有其他进程运行时内核运行的进程,其pid值为0,启动后在内核运行的第一个进程为init进程,pid是1,内核将进程的最大值设置为32768(2的15次方),可以通过修改/proc/sys/kernel/pid_max进行修改。

    分配进程:内核进程是以严格的线性方式运行的,即当前进程的pid最大值是17,那分配给新进程的pid就是18,即使当新进程运行时,pid17的进程已经不在运行了,内核分配pid达到32768最大值后,才会重用以前已经分配过的pid。

    获取进程ID和父进程ID:

    os.getpid()

    os.getppid()

    运行新进程:

    在新进程执行一个新的程序分为两个步骤:

    1、系统调用fork()创建新的进程,

    2、通过exce系统调用把新的二进制程序加载到该进程中。

    写时复制:

    早期unix调用fork,内核会复制所有内部数据结构,复制进程的页表项,然后把父进程的地址空间按页复制到子进程的地址空间中,这样复制时十分耗时的,现代linux采用了写时复制,不对父进程进行整体复制。如果有多个进程要读取资源的副本,每个进程只要保存一个指向这个资源的指针就可以,如果某个进程想修改资源的副本,就要开始复制该资源,再进行反复修改,其他进程还是共享原来的那个资源副本,这就是写时复制,即在修改写入时才进行复制。写时复制是以页为基础的,只要进程没有修改其全部地址空间,就不需要全部复制。

    写时复制的实现非常简单,这些页被标记为只读,如果有进程试图修改某页时,就会产生缺页中断,内核处理缺页中断的方式就是对该页进行一次透明复制。

    终止进程:

    使用exit()后通常会执行一些基本的关闭步骤。

    exit_success

    exit_failure

    当进程退出时,内核会清理进程所创建的,不再使用的资源,包含但不局限于:分配内存,打开文件和System V的信号量。清理完成后,内核会摧毁进程,并告知父进程其子进程已经终止。

    应用直接调用_exit()并不是非常合适,绝大数应用在完全退出前需要做一些清理工作。

    等待子进程终止:

    为了防止子进程结束后父进程无法获取子进程的任何消息,unix设计成,如果子进程子在父进程之前结束,内核会把该子进程设置为特殊的进程状态,处于这种状态的进程称为僵尸进程。僵尸进程只保留最小的概要信息————一些基本的内核数据机构。僵尸进程会等待父进程来查询自己的状态,只有当父进程获取到已终止子进程的消息,这个子进程才会正式消失,不再处于僵尸状态。

    获取子进程消息的方式:

    wait()接口:调用wait成功时会返回已终止的子进程pid,如果没有子进程终止,调用会阻塞,直到有一个子进程终止。

    进程正常结束即调用了_exit(),WIFEXUTED返回真(非0值),WEXITSTATUS返回status的低八位,并传递给_eixt()函数;如果是信号导致进程结束,WIFSIGNALED返回真,此时WTERMSIG返回导致进程终止的信号编号。

    当子进程停止或继续执行时,WIFSTOPPED和WIFCONTINUED分别返回真,WIFSTOPPED返回真,此时WSTOPSIG返回时进程终止的信号编号。

    等待特定进程:

    waitpid(pid,status,options):

    pid<-1,等待一个指定的进程组的任何子进程退出,如pid=-500表示等待进程组是500的任何子进程,

    pid=-1,等待任何一个子进程退出,行为和wait一致,

    pid=0,等待同一个进程中的任何子进程,

    pid>0,等待进程pid等于pid的子进程,如参数是500,表示等待pid是500的子进程。

    僵尸进程:

    如果父进程在子进程之前结束,无论何时,只要进程结束就会遍历他的所有子进程,并且把他们的父进程设置为init进程,这保证了系统中不存在没有父进程的进程。

    守护进程:

    守护进程有两个基本要求:一是必须作为init进程的子进程运行(pid=1),二是不与任何控制终端进行交互

    进程组的主要特征就是信号可以发给进程组中的所有进程,单个进程操作可以使同一个进程组中的所有进程终止,停止或继续。进程组id就是首进程的pid,即使首进程终止了,该进程组依然存在。在进程组中,直接和用户打交道并控制终端的进程是前端进程组,其他的是后台进程组。

    进程高级管理

    1.1、进程调度

    进程调度器是内核子系统,功能是把有限的处理器资源分配给系统中的各个进程,换句话说调度器是个内核组件,决定选择哪个进程来运行。

    就绪进程就是非阻塞进程,阻塞进程就是正在睡眠的进程,等待I/O,需要内核唤醒。

    当就绪进程数大于处理器数时,有些进程会运行,而其他进程必须等待,调度器需要决定哪个进程运行何时运行运行多久等。

    同时运行多个进程就是多任务,多任务操作系统可以分为两大类,协同式和抢占式,linux实现了后一种形式的多任务,进程在被抢占前所能给运行的时间称为该进程的“时间片”(调度器给每个就绪进程分配一个处理器“时间片“)。

    协同式多任务系统中,进程会一直运行到自己结束为止,但一个拙劣或破坏性的程序可能会运行很长时间,破坏多任务机制,因此现代操作系统几乎都采用抢占式任务机制,linux也不例外。

    1.1.1 时间片

    进程调度器分配给进程的时间片对于系统的全局行为和性能时至关重要的。时间片太长,进程在执行前必须等待很长时间,降低并发运行,用户因感到明显延迟而失望,时间片太短,大量时间花费在进程调度上。linux的完全公平调度器不用时间片。

    1.1.2 I/O约束型进程和处理器约束型进程

    处理器约束型进程:一直消耗完所有可用时间片的进程,这类型进程会消耗大量的CPU资源,会消耗掉调度器分配的全部CPU,例子就是无限循环。

    I/O约束型进程:多数时间处于阻塞状态等待资源的进程。如键盘输入

    混合约束型:音频视频的编码解码。

    区别:处理器约束型进程期望获取尽可能长的时间片,尽快完成任务,相反I/O约束型进程不需要很长时间。

    平衡处理器约束型和I/O约束型应用之间的足球时很难的。

    1.1.3抢占式调度

    内核会给所有就绪进程分配一个时间片,当进程消耗完时间片内核就会挂起该进程开始运行另一个进程,如果系统中没有就绪进程,内核会给消耗完时间片的进程重新分配时间片并再次运行。通过这种方式保证所有进程都必须运行。

    1.2完全公平调度器

    传统的进程调度器会给每个就绪进程分配一个时间皮,此外还给每个进程分配优先级,先运行优先级高的进程。

    完全公平调度器引入了不同的算法,公平调度。算法过程:CFS最初给每个进程分别分配1/N处理器时间,然后CFS通过优先级权衡每个进程的比例,调整分配。默认优先级时0,权值时1,则比例不变,优先级设置值越小,优先级越高,权值越高。

    要确定每个进程真正的执行时间,完全公平调度器需要把比例划分出一个固定的周期即“目标延迟”。假设目标延迟时20ms,有两个优先级相同的进程,这样完全公平调度器会执行一个进程10ms,然后执行另一个,这样不断重复。但是当进程过多时,假设200个可运行进程,如果目标延迟时20ms,则每个时100微秒,由于一个进程到另一个进程的上下文切换会带来开销,以及无法更好利用

    时间局部性,基于以上问题,完全公平调度器引入另一个关键因素:最小粒度。

    最小粒度:任一个进程运行时间长的基准值,不管分配的处理器比例是多少,都至少会运行最小粒度的时间(除非被阻塞你)。这种机制保证了切换代价不会因为目标延迟值很小,二占用大比例的系统总时间,也就是最小粒度破坏了公平性。

    1.3、让出处理器

    调用sched_yield()函数,会挂起当前在运行的进程,然后调度器会选择一个新的进程进来,如果没有其他就绪进程,让出的进程会立即执行。但很少使用。

    1.4、进程优先级

    linux通过进程“优先级”(nice value)来调整分配给进程的处理器比例,nice value范围时(-20,19】默认值时0,nice value越低优先级越高,时间片越长。(因为1/N,N目标延迟时确定的)

    1.4.1 nice

    nice()调用成功,会在现有的value值上增加inc,只要进程的所有者时root才可以使用负值inc,减少value。提升进程的优先级,非root用户进程只能降低优先级。

    1.5、处理器亲和力

    在多处理器上,调度器必须决定在每个CPU上运行哪个进程,

    调度器要充分利用系统的处理器,但是如果一个进程曾在一个CPU上运行,之后再运行,调度器应尽可能把它放到同一个CPU上,因为处理器间的进程迁移会带来性能损失,最大的损失来自于“缓存效应”。

    每个处理器都是独立的,不能共享缓存中的数据,当进程迁移到新的处理器时,要写入数据到内存中,原来处理器的缓存会被标记为无效,因此在任意时刻,任意数据只在一个处理器的缓存有效。

    当然如果一个处理器比另一个的负载大得多。这样把某些进程迁移到不忙碌的CPU就很有意义了,决定何时移动进程来避免负载不平衡就是负载均衡。

    处理器亲和力表示一个进程会一直被调度到同一个处理器的可能性。术语“软亲和力”表明调度器调度进程到同一个处理器上的自然倾向,只有当负载极端不平衡的时候才会考虑迁移进程。有些时候用户或进程需要保证进程和处理器间的绑定,即进程非常依赖缓存,把进程绑定到某个处理器并强制内核保持这种绑定关系,称为“硬亲和力“

    1.6、linux调度策略和优先级

    每个进程都有一个静态优先级,该优先级和nice value无关。‘先进先出’(FIFO)策略是没有时间片,非常简单铯实时策略,只要没有更高级优先级进程就绪,FIFO类型进程就会持续运行。

    1、如果一个就绪的FIFO类型进程是系统中的最高优先级进程,它就会一直保持运行,当FIFO进程就绪会抢占普通进程。

    2、FIFO类型进程会持续运行,直到阻塞或调用sched_yield(),或者有更高优先级的进程

    3、当FIFO阻塞,调度器会将其移出就绪队列,当该进程重新就绪时会被查到相同优先级进程队列末尾,它必须等待更高或相同优先级进程停止运行后才会运行。

    4、当FIFO类型进程调用sched_yield(),调度器 它必须等待更高或相同优先级进程停止运行后才会运行会把它放到相同优先级进程队列末尾,它必须等待更高或相同优先级进程停止运行后才会运行,如果不存在更高或相同优先级进程,则调用sched_yield()没用。

    5、当FIFO类型进程被更高级进程或新来的同等优先级FIFO进程抢占,他在优先级队列中的位置不变,因此一旦高优先级进程停止运行,被抢占的FIFO进程会继续执行。

    6、当一个进程称为FIFO类型进程或者该进程的静态优先级发生改变,它就会被放到相同优先级进程队列的队首,因此新来的FIFO类型进程会抢占同等优先级进程。

    轮训策略(round-robin,RR):调度器会给每个轮训类型进程分配一个时间片,当耗光时间片时,调度器会把该进程放在同等优先级进程队列的末尾,通过这种方式,轮询类型进程间就能轮询调度。如果给定优先级队列里只有一个进程,轮询类型就等同于FIFO类型。

    普通调度策略:所有普通类型的进程静态优先级为0,任何一个就绪的FIFO或轮询类型进程都会抢占它们。调度器利用nice value来划分普通进程的优先级,静态优先级不受nice value影响,普通应用的静态优先级时0,实时应用时1到99。

    批调度策略:只有在没有其他就绪进程时才会运行。

    进程创建

    一、利用os.fork()创建

    1、创建简单的子进程

    示例:

    pid = os.fork()if pid == 0:    print('子进程{}'.format(os.getpid()))    time.sleep(2)    print('子进程over')elif pid>0:    print('主进程{}'.format(os.getpid()))    time.sleep(3)print ("After fork process pid=%s,ppid=%s" % (os.getpid(), os.getppid()))结果:主进程1396子进程1397子进程overAfter fork process pid=1397,ppid=1396After fork process pid=1396,ppid=787

    子进程和父进程分别会打印最后一句,运行的结果顺序可能会不一样

    因为父子进程是独立运行的,只不过是打印在同一个终端上

    这个函数很特殊,调用一次,返回两次,因为操作系统是将当前的进程(父进程)复制了一份(子进程),然后分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的 PID。我们可以通过判断返回值是不是 0 来判断当前是在父进程还是子进程中执行。

    2、变量在父子进程中的关系:

    假设在父进程中存在变量a,而在子进程中将变量a改变时,其在父进程中的值并没有变化,这主要是因父子进程中位于不同的虚拟空间所致。

    3、父子进程PID:

    pid = os.fork()if pid < 0:    print("create process failed")elif pid == 0:    print("my PID is",os.getpid()) #子进程PID    print("my parent process-1 PID is",os.getppid()) #父进程PID    sleep(5)    print("my parent process-2 PID is",os.getppid()) #父进程PID    print("this is child process")else:    sleep(1)    print("===========================")    print("the PID is",pid) #父进程中fork()返回值 = 子进程PID    print("the parent PID is",os.getpid()) #父进程PID    print("this is parent process") 结果:my PID is 4338my parent process-1 PID is 4337===========================the PID is 4338the parent PID is 4337this is parent processmy parent process-2 PID is 2236this is child process

    注:在子进程中,sleep(5)前后的子进程对应的父进程是不一样的(PID不一致),这是由于当父进程结束后,子进程就成了孤儿进程。孤儿进程将被init进程(该进程号即为子进程新的父进程PID)所收养,并由init进程对它们完成状态收集工作。

    二、利用subprocess创建

    1、创建

    child =subprocess.Popen('cmd',shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)child.wait()child.communicate()print ("Hello")

    2、管道阻塞问题

    import subprocessdef test(size):    print ('start')     cmd = 'ddif=/dev/urandom bs=1 count=%d 2>/dev/null' % size    p =subprocess.Popen(args=cmd, shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT, close_fds=True)    # p.communicate()    p.wait()  # 这里超出管道限制,将会卡住子进程    print ('end') # 64KBtest(64 * 1024) # 64KB + 1Btest(64 * 1024 + 1) 使用p.wait时的结果:startendstart 使用p.communicate时的结果:startendstartend

    首先测试输出为64KB 大小的情况。使用 wait() 等待 调用结束,可以看到正确的 start 和 end 输出;然后测试比 64KB 多的情况,wait()这种情况下只输出了 start,也就是说程序执行卡在了 p.wait() 上,程序死锁。而使用p.communicate()这个方法不会死锁,会把输出放在内存,而不是管道里,所以这时候上限就和内存大小有关了,一般不会有问题。而且如果要获得程序返回值,可以在调用 Popen.communicate() 之后取 Popen.returncode 的值。

    3、常用方法

    child= subprocess.Popen("ls-l",shell=True, stdout=subprocess.PIPE)  child.poll()  返回子进程运行状态,主要是两种结果,None代表尚未运行完,而一个返回码则代表已经运行完成并且是成功或失败了child.kill()  强行终止子进程child.send_signal(...)  向子进程发送一个信号(具体信号是以什么方式表示不清楚,还待研究)child.terminate()  终止子进程child.pid  子进程的pidchild.returncode  子进程的返回码child.stdin/stdout/stderr  子进程的标准输入流,标准输出和标准错误输出,都是类文件对象

    三、利用multiprocessing.Process创建

    importmultiprocessingdef task():    print('%s 子进程 is run....'%os.getpid())    time.sleep(3)    print('%s 子进程 is done...'%os.getpid()) if __name__ == '__main__':   p=multiprocessing.Process(target=task)    p.daemon = True  #守护进程    p.start()    p.join()    print('主进程:',os.getpid())结果:1522 子进程 is run....1522 子进程 is done...主进程: 1521注:对于是守护进程的子进程,主进程加join才会等待守护进程执行再结束,主进程不加join时,不会等待守护进程执行,会直接结束。主进程不管加或不加join都会等待非守护子进程结束后再结束,加join后会等待非守护子进程结束后再执行join后面主进程代码。将进程定义为类:import multiprocessingimport timeclassClockProcess(multiprocessing.Process):    def __init__(self,interval):       multiprocessing.Process.__init__(self)       self.interval = interval     def run(self):        n = 5        while n> 0:           print("the time is {0}".format(time.ctime()))           time.sleep(self.interval)           n -= 1 if __name__ == '__main__':    p = ClockProcess(3)    p.start()     注:进程p调用start()时,自动调用run()

    四、进程池

    当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

    1、进程池创建

    from multiprocessing import Poolimport os, time, randomdef worker(msg):    t_start = time.time()    print("%s开始执行,进程号为%d" % (msg,os.getpid()))    random.random()    time.sleep(random.random()*2)    t_stop = time.time()    print(msg,"执行完毕,耗时%0.2f"% (t_stop-t_start))po = Pool(3)  # 定义一个进程池,最大进程数3for i in range(0,10):    # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))    # 每次循环将会用空闲出来的子进程去调用目标    po.apply_async(worker,(i,))  #异步进程池,同时执行3个进程    # po.apply(worker,(i,))  #同步进程池,必须等待上一个进程执行完毕才会执行下一个进程。print("----start----")po.close()  # 关闭进程池,关闭后po不再接收新的请求po.join()  # 等待po中所有子进程执行完成,必须放在close语句之后print("-----end-----")

    2、map和imap方法

    在pool中,有两个方法,一个是map一个是imap,其实这两方法超级方便,在执行结束之后,可以得到每个进程的返回结果,但是缺点就是每次的时候,只能有一个参数,也就是在执行的函数中,最多是只有一个参数的,否则,需要使用组合参数的方法。

    在使用map的时候,直接返回的一个是一个list,从而这个list也就是函数执行的结果,而在imap中,返回的是一个由结果组成的迭代器,如果需要使用多个参数的话,那么需要*args,从而使用参数args

    在使用pool的时候,可以直接得到结果,map和imap都是直接得到一个list和可迭代对象,而apply_async得到的结果是一个对象需要用一个list装起来,然后得到每个结果。

    def func(name):    #print(name.upper())    returnname.upper() def main():    p =multiprocessing.Pool(3)   print(p.map(func, ['kel','simle']))   print(p.imap(func, ['kel','simle']))    for iin p.imap(func, ['kel','simle']):       print(i)main()结果:['KEL', 'SIMLE']<multiprocessing.pool.IMapIteratorobject at 0x106faaa20>KELSIMLE

    进程通信

    方式:进程间通讯有多种方式,包括信号,管道,消息队列,信号量,共享内存,socket等

    一、通信Pipe

    Pipe()函数返回表示管道末端的一对Connection(conn1,conn2)对象。,默认情况下是双工(双向)。返回的两个连接对象代表管道的两端。每个连接对象都有send()和recv()方法(等等)。请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。当然,同时使用管道不同端的过程也不会有风险。如果duplex为True(默认),则管道是双向的。如果duplex是False,那么管道是单向的:conn1只能用于接收消息,conn2发送消息。

    示例:def f(conn):   conn.send([42, None, 'hello']) if __name__ =='__main__':parent_conn,child_conn = Pipe()   #parent_conn, child_conn = Pipe(duplex=False)当duplex=False,只能用child_conn发送消息,parent_conn接收消息    p =Process(target = f, args = (parent_conn, ))   p.start()   print(child_conn.recv())   p.terminate()结果:[42, None, 'hello']返回的两个连接对象必须一个发,一个收,不能用同一个对象收发

    二、信号量Semaphore

    Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。

    示例:def worker(s, i):   s.acquire()   print(multiprocessing.current_process().name + " acquire")   time.sleep(1)   print(multiprocessing.current_process().name + " release")   s.release()if __name__ =="__main__":    s =multiprocessing.Semaphore(2)  #最大子进程数两个    for iin range(5):       p = multiprocessing.Process(target=worker, args=(s, i * 2))       p.start()       #p.join()结果:Process-1 acquireProcess-2 acquire Process-1 releaseProcess-3 acquire Process-2 releaseProcess-4 acquire Process-3 releaseProcess-5 acquire Process-4 releaseProcess-5 release注:每次执行的最大子进程数为2.

    三、Event实现进程间同步通信

    Event内部包含了一个标志位,初始的时候为false。可以使用使用set()来将其设置为true;或者使用clear()将其从新设置为false;可以使用is_set()来检查标志位的状态;另一个最重要的函数就是wait(timeout=None),用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。e= threading.Event()线程和进程间的相互通信类似。

    示例:defwait_for_event(e):   """Wait for the event to be set before doinganything"""   print('wait_for_event: starting')   e.wait()   print('wait_for_event: e.is_set()->' + str(e.is_set()))  defwait_for_event_timeout(e, t):   """Wait t seconds and then timeout"""   print('wait_for_event_timeout: starting')   e.wait(t)   print('wait_for_event_timeout: e.is_set()->' + str(e.is_set()))  if __name__ =='__main__':    e =multiprocessing.Event()    w1 =multiprocessing.Process(name='block', target=wait_for_event, args=(e,))   w1.start()     w2 =multiprocessing.Process(name='non-block',target=wait_for_event_timeout,args=(e,2))   w2.start()    time.sleep(3)    #e.set()print('main: event isset')结果;e.set未执行时: wait_for_event:startingwait_for_event_timeout:startingwait_for_event_timeout:e.is_set()->Falsemain: event is sete.set执行时wait_for_event:startingwait_for_event_timeout:startingwait_for_event_timeout:e.is_set()->Falsemain: event is setwait_for_event: e.is_set()->True

    四、共享内存通信Array、Value、Manager

    Value:将一个值存放在内存中,

    Array:将多个数据存放在内存中,

    但要求数据类型一致,后面加数字时是个对象,表示开辟n个空间

    def func(n, a):    n.value= 3.14    a[1] =9999num =multiprocessing.Value('d', 0.0)arr =multiprocessing.Array('i', range(10))#第二个参数如果传入一个数字,此处表示开辟5个空间,且均为整型i,其实就是一个列表[0,0,0,0,0],第二个参数如果传的是列表等,表示就是存入列表中的元素。则表示在共享内存中开辟多大的空间#m = Array('i',5)print(arr[:])p =multiprocessing.Process(target=func, args=(num, arr))p.start()p.join()print(num.value)print(arr[:])结果:[0, 1, 2, 3, 4, 5, 6,7, 8, 9]3.140000104904175[0, 9999, 2, 3, 4, 5,6, 7, 8, 9]在Process进程中,我们修改了Value和Array对象。回到主程序,打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。with Manager() asmanager:  #做一个别名,此时manager就相当于Manager()       d = manager.dict()  #生成一个可在多个进程之间传递和共享的字典        l = manager.list(range(5))  #生成一个可在多个进程之间传递和共享的列表;通过range(5)给列表中生成5个数据

    五、消息队列

    1、Queue.Queue是进程内(线程)非阻塞队列。multiprocess.Queue是跨进程通信队列。前者是各自私有,后者是各子进程共有。需要注意的是队列中Queue.Queue是线程安全的,但并不是进程安全,所以多进程一般使用线程、进程安全的multiprocessing.Queue().初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)。

    2、常用方法

    q.qsize():返回当前队列包含的消息数量;q.empty():如果队列为空,返回True,反之Falseq.full():如果队列满了,返回True,反之Falseq.put_nowait(item) #  等效于 put(item,block=False)q.get_nowait()#    等效于 get(item,block=False)q.put(obj [,block=True[, timeout]])

    第二个block为可选参数,默认为True.

    当block为True,timeout为空时,q.put([1,2,3])、q.put([1,2,3],True) 表示将序列插入到队尾,阻塞调用,如果q队列满时,一直等待(无超时限制的阻塞调用)。

    当block为True,timeout为正整数时,q.put([1,],True,2)表示阻塞调用进程最多timeout秒,如果超过该时间仍无空间可用,则抛出Queue.Full异常(带超时的阻塞调用)。

    当block为False,q.put([1,], False) 表示调用进程时如果有空闲空间则将数据放入队列中,否则立即抛出Queue.Full异常。

    from multiprocessingimport JoinableQueue,是 Queue的子类,增加了task_done()和join()方法。

    如果每从队列里取一次,但没有执行task_done(),则join无法判断队列到底有没有结束,在最后执行个join()是等不到结果的,会一直挂起阻塞。可以理解为,每task_done一次(无论是否执行了get方法,每调用一次就会从队列里删除一个元素)就从队列里删掉一个元素,这样在最后join的时候根据队列长度是否为零来判断队列是否结束,从而执行解除等待阻塞。

    3、进程池中的消队列

    如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue()创建Queue:queue = multiprocessing.Manager().Queue()

    守护进程

    使用fork创建进程:由于创建守护进程的第一步调用了fork函数来创建子进程,再将父进程退出。由于在调用了fork函数时,子进程全盘拷贝了父进程的会话期、进程组、控制终端等,虽然父进程退出了,但会话期、进程组、控制终端等并没有改变,因此,这还不是真正意义上的独立开来,需要再次调用os.fork创建子进程形成真正意义上的守护进程。

    守护进程与后台进程区别:

    1、守护进程与终端无关,是被init进程收养的孤儿进程,而后台进程的父进程仍然时终端的,

    2、守护进程在终端关闭时仍然坚挺,而后台进程随用户退出而停止,除非加上nohup

    3、守护进程改变了会话、进程组、工作目录和文件描述符,后台进程则直接继承父进程。

    defdaemonize(pid_file=None):   """    创建守护进程    :parampid_file: 保存进程id的文件   :return:   """    # 从父进程fork一个子进程出来    pid =os.fork()    # 子进程的pid一定为0,父进程大于0    if pid:       sys.exit()      # 退出父进程,sys.exit()方法比os._exit()方法会多执行一些刷新缓冲工作     os.chdir('/')   # 子进程默认继承父进程的工作目录,最好是变更到根目录,否则回影响文件系统的卸载   os.umask(0)     # 子进程默认继承父进程的umask(文件权限掩码),重设为0(完全控制),以免影响程序读写文件   os.setsid()     # 让子进程成为新的会话组长和进程组长,由于在调用了fork函数时,子进程全盘拷贝了父进程的会话期、进程组、控制终端等,虽然父进程退出了,但会话期、进程组、控制终端等并没有改变,因此,这还不是真正意义上的独立开来。    _pid =os.fork()     # 注意了,这里是第2次fork,也就是子进程的子进程,我们把它叫为孙子进程    if_pid:       sys.exit()          # 退出子进程     # 此时,孙子进程已经是守护进程了,接下来重定向标准输入、输出、错误的描述符(是重定向而不是关闭, 这样可以避免程序在 print 的时候出错)    #0,1,2 文件描述符代表标准输入设备(比如键盘),标准输出设备(显示器)和标准错误,因为进程1比较特殊,    # 是所有进程的父进程,所以不做输入输出,/dev/null就是空设备,指向它就没有输入输出了。    # 刷新缓冲区先   sys.stdout.flush()   sys.stdin.flush()    # dup2复制文件描述符,重定向到/dev/nul,即丢弃所有输入输出    withopen('/dev/null') as read_null, open('/dev/null', 'w') as write_null:       os.dup2(read_null.fileno(), sys.stdin.fileno()) # 把程序的输入流重定向到上面定义的read_null 文件       os.dup2(write_null.fileno(), sys.stdout.fileno())       os.dup2(write_null.fileno(), sys.stderr.fileno())    ifpid_file:       with open(pid_file, 'w+') as f:           f.write(str(os.getpid()))        atexit.register(os.remove, pid_file)

    守护进程与守护线程:

    1.线程与进程运行完毕的区别:

    • 主进程运行完毕指的是主进程代码运行完毕(注意是主进程代码执行完毕而不是主进程终止结束,有可能此时主进程在等待非守护子进程)

    • 主线程运行完毕指的是所在的进程内的所有非守护线程运行完毕后,主线程才算运行完毕

    • 强调:运行完毕,并非是终止

    2.守护进程:主进程代码运行完毕,守护进程也就结束 (守护的是主进程)

    主进程要等非守护进程都运行完毕后再回收子进程的资源(否则会产生僵尸进程)才结束

    主进程等子进程是因为主进程要给子进程收尸(代用wait方法向操作系统发起回收资源信号(pid号,状态信息))

    守护线程:非守护线程代码运行完毕,守护线程也就结束 (守护的是非守护线程)

    主线程在其他非守护线程运行完毕后才算结束(守护线程在此时就会被回收)

    强调:主线程也是非守护线程(进程包含了线程)

    总结:主线程的结束意味着进程结束,进程整体的资源都会被回收,而进程必须保证非守护线程都运行完毕后才能结束

    守护进程:主进程代码运行完毕,守护进程也就结束

    守护线程:非守护线程运行完毕,守护线程结束

    守护进程的创建与使用

    daemon模块

    • import osimport sysimport timeimport atexitimport signalclass Daemon: def__init__(self, pidfile='/*/daemon.pid', stdin='/dev/null', stdout='/dev/null',stderr='/dev/null'): self.stdin = stdin self.stdout = stdout self.stderr = stderr self.pidfile = pidfile defdaemonize(self): if os.path.exists(self.pidfile): raise RuntimeError('Already running.') # First fork (detaches from parent) try: if os.fork() > 0: raise SystemExit(0) except OSError as e: raise RuntimeError('fork #1 faild: {0}({1})n'.format(e.errno, e.strerror)) os.chdir('/') os.setsid() os.umask(0o22) # Second fork (relinquish session leadership) try: if os.fork() > 0: raise SystemExit(0) except OSError as e: raise RuntimeError('fork #2 faild: {0}({1})n'.format(e.errno, e.strerror)) # Flush I/O buffers sys.stdout.flush() sys.stderr.flush() # Replace file descriptors for stdin, stdout, and stderr with open(self.stdin, 'rb', 0) as f: os.dup2(f.fileno(), sys.stdin.fileno()) with open(self.stdout, 'ab', 0) as f: os.dup2(f.fileno(), sys.stdout.fileno()) with open(self.stderr, 'ab', 0) as f: os.dup2(f.fileno(), sys.stderr.fileno()) # Write the PID file with open(self.pidfile, 'w') as f: c = os.getpid() f.write(str(c)) # Arrange to have the PID file removed on exit/signal atexit.register(lambda: os.remove(self.pidfile)) signal.signal(signal.SIGTERM, self.__sigterm_handler) # Ctrl-c #Signal handler for termination (required) @staticmethod def__sigterm_handler(signo, frame): raise SystemExit(1) #Signal handler for termination (required) defstart(self): try: self.daemonize() except RuntimeError as e: print(e) raise SystemExit(1) self.run() defstop(self): try: if os.path.exists(self.pidfile): with open(self.pidfile) as f: os.kill(int(f.read()),signal.SIGTERM) else: print('Not running.') raise SystemExit(1) except OSError as e: if 'No such process' in str(e) andos.path.exists(self.pidfile): os.remove(self.pidfile) defrestart(self): self.stop() self.start() defrun(self):pass

    test_daemon模块

    import osimport sysimport timefrom daemon importDaemon classMyTestDaemon(Daemon):    defrun(self):       sys.stdout.write('Daemon started with pid {}n'.format(os.getpid()))       while True:           sys.stdout.write('Daemon Alive!{}n'.format(time.ctime()))           sys.stdout.flush()            time.sleep(5) if __name__ =='__main__':    PIDFILE= '/*/daemon-example.pid'    LOG ='/*/daemon-example.log'    daemon= MyTestDaemon(pidfile=PIDFILE, stdout=LOG, stderr=LOG)    iflen(sys.argv) != 2:       print('Usage: {} [start|stop]'.format(sys.argv[0]))       raise SystemExit(1)     if'start' == sys.argv[1]:       daemon.start()    elif'stop' == sys.argv[1]:       daemon.stop()    elif'restart' == sys.argv[1]:       daemon.restart()    else:       print('Unknown command {!r}'.format(sys.argv[1]))raiseSystemExit(1)

    注:本文部分示例代码源自于网上分享。

    «
    »
以专业成就每一位客户,让企业IT只为效果和安全买单

以专业成就每一位客户,让企业IT只为效果和安全买单