Introduction
A process is a running program. Each process has its own system state, including memory, a list of open files, a program counter that tracks the instruction being executed, and a call stack holding the local variables of function calls.
New processes are created with the os or subprocess modules, for example os.fork() or subprocess.Popen(). Child and parent processes execute independently of each other.
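As a minimal hedged sketch (the command line is illustrative), a child process can be launched with subprocess.Popen and left to run on its own:

```python
import subprocess

# Start a child process; it runs independently of the parent
p = subprocess.Popen(["python", "-c", "print('hello from the child')"])
p.wait()                                  # the parent may optionally wait for it
print("child exit code:", p.returncode)
```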
Interprocess communication (IPC): the most common form is message passing. A message is a buffer of raw bytes that is sent and received over an I/O channel such as a network socket or a pipe, using primitives such as send() and recv(). A less common form is memory-mapped regions (see the mmap module), which are effectively shared memory.
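A minimal sketch of a memory-mapped region shared between parent and child (Unix-only, since it relies on os.fork(); the message text is arbitrary):

```python
import mmap
import os

buf = mmap.mmap(-1, 100)          # anonymous shared mapping, inherited across fork

pid = os.fork()
if pid == 0:                      # child: write into the shared region
    buf.write(b"hello from the child")
    os._exit(0)
else:                             # parent: wait for the child, then read its message
    os.waitpid(pid, 0)
    print(buf.read(20))           # -> b'hello from the child'
```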
Each thread has its own flow of control and execution stack, but threads share system resources and data.
The hard parts of concurrency are synchronization and data sharing. The usual solution is a mutex lock, as in the fragment below.
```python
write_lock = Lock()
...
# Critical section where writing occurs
write_lock.acquire()
f.write("Here's some data.\n")
f.write("Here's more data.\n")
...
write_lock.release()
```
Concurrent programming in Python
On most systems Python supports both message passing and thread-based concurrent programming. The global interpreter lock (the GIL) allows only one thread to execute at any instant, even on machines with multiple CPUs. When the bottleneck is I/O, multithreading works well; when the bottleneck is the CPU it performs worse, and subprocesses plus message passing are the better choice. Programs with many threads also tend to misbehave in odd ways: 100 threads may work fine while 1000 threads start failing. Such cases usually call for an asynchronous, event-driven design, for example a central event loop that uses the select module to monitor I/O sources and dispatch events to a large number of I/O handlers. The asyncore module and the popular third-party Twisted framework (http://twistedmatrix.com) work this way.
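For illustration only, a bare-bones central event loop of that kind might look like this sketch (a TCP echo server; the address and buffer size are arbitrary):

```python
import select
import socket

server = socket.socket()
server.bind(("127.0.0.1", 0))      # ephemeral port
server.listen(5)
sources = [server]

while True:
    # Wait for any monitored socket to become readable, then dispatch
    readable, _, _ = select.select(sources, [], [], 1.0)
    for s in readable:
        if s is server:
            conn, addr = s.accept()    # new client connection
            sources.append(conn)
        else:
            data = s.recv(4096)        # existing client sent data
            if not data:
                sources.remove(s)
                s.close()
            else:
                s.sendall(data)        # echo it back
```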
Message passing is used very widely in Python, even between threads. It is harder to get wrong, reduces the need for locks and other synchronization primitives, and scales out to networked and distributed systems. Higher-level Python features such as coroutines are also built on the message-passing abstraction.
multiprocessing supports subprocesses, communication and shared data, and several forms of synchronization.
multiprocessing
The Process class
This class represents a task run in a child process: Process([group [, target [, name [, args [, kwargs]]]]]). The constructor should be called with keyword arguments: target is the callable object to invoke, args is a tuple of positional arguments for the callable, kwargs is a dict of keyword arguments, name is a name for the process, and group is effectively unused.
Methods: is_alive(), join([timeout]), run(), start(), terminate().
Attributes: authkey, daemon (must be set before start() is called), exitcode (None while the process is running; -N means the process was terminated by signal N), name, pid.
Note that a daemonic process is terminated automatically when its parent exits and cannot create child processes of its own; the daemon flag must be set before calling start().
Create a function and run it as a single process:
```python
import multiprocessing
import time

def clock(interval):
    for i in range(3):
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)

if __name__ == '__main__':
    p = multiprocessing.Process(target=clock, args=(2,))
    p.start()
```
Define the process as a class:
```python
import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        for i in range(3):
            print("The time is {0}".format(time.ctime()))
            time.sleep(self.interval)

if __name__ == '__main__':
    p = ClockProcess(2)
    p.start()
```
Note: run these examples from the command line; they may not work inside an IDE.
Interprocess communication
multiprocessing supports both pipes and queues, implemented with message passing. The queue interface is similar to the thread-oriented queues in the queue module.
Queue([maxsize]): the size is unlimited by default. A queue is actually built from a pipe plus locks; a feeder thread transfers data to the underlying pipe.
Methods: cancel_join_thread(), close(), empty(), full(), get([block [, timeout]]), get_nowait() (equivalent to get(False)), join_thread(), put(item [, block [, timeout]]), put_nowait(item) (equivalent to put(item, False)), qsize(). JoinableQueue([maxsize]) is a queue subclass that adds task_done() and join().
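As a minimal sketch (the consumer function and the fixed item count are illustrative, chosen only to keep the example short), a plain Queue can be shared between the main process and a child:

```python
import multiprocessing

def consumer(q):
    # Receive a fixed number of items from the queue
    for _ in range(3):
        print("consumed:", q.get())

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=consumer, args=(q,))
    p.start()
    for item in [1, 2, 3]:
        q.put(item)        # send items to the child process
    p.join()
```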
The following example uses a JoinableQueue for communication.
JoinableQueue creates a joinable queue. It works like an ordinary queue except that the consumer can notify the producer when an item has been fully processed (q.task_done()); the notification is implemented with a shared semaphore and condition variable. join() is used by the producer to wait until task_done() has been reported for every item that was put on the queue.
```python
import multiprocessing

def consumer(input_q):
    while True:
        item = input_q.get()
        print(item)
        input_q.task_done()

def producer(sequence, output_q):
    for item in sequence:
        output_q.put(item)

if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    cons_p = multiprocessing.Process(target=consumer, args=(q,))
    cons_p.daemon = True
    cons_p.start()

    sequence = [1, 2, 3, 4]
    producer(sequence, q)
    q.join()
```
The keys to coordinating the processes here: after get(), call task_done() to mark the item as processed; set daemon to True before starting the consumer; and call join() on the queue so the producer blocks until every item has been processed.
The same approach can launch several consumer processes, as in the fragment below:
```python
# consumer, producer and sequence are as defined in the previous example
processes = []
key_list = multiprocessing.JoinableQueue()

# Launch the consumer processes
for i in range(10):
    t = multiprocessing.Process(target=consumer, args=(key_list,))
    t.daemon = True
    processes.append(t)

for t in processes:
    t.start()

producer(sequence, key_list)
key_list.join()
```
Here is another common pattern.
In some programs the producer needs to tell the consumer that there are no more items and that it may shut down. This is done with a sentinel.
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# multiprocessing_sentinel.py
# Author Rongzhong Xu 2016-08-11 wechat: pythontesting
"""multiprocessing sentinel demo, tested in python 2.6/2.7/3.5"""

import multiprocessing

def consumer(input_q):
    while True:
        item = input_q.get()
        if item is None:
            break
        # Process item
        print(item)  # Replace with useful work
    # Shutdown
    print("Consumer done")

def producer(sequence, output_q):
    for item in sequence:
        # Put the item on the queue
        output_q.put(item)

if __name__ == '__main__':
    q = multiprocessing.Queue()

    # Launch the consumer process
    cons_p = multiprocessing.Process(target=consumer, args=(q,))
    cons_p.start()

    # Produce items
    sequence = [1, 2, 3, 4]
    producer(sequence, q)

    # Signal completion by putting the sentinel on the queue
    q.put(None)

    # Wait for the consumer process to shutdown
    cons_p.join()
```
Note: each consumer needs its own sentinel, which can be put on the queue with a for loop:
```python
for i in range(10):
    q.put(None)
```
In practice the sentinel does not have to be None; any other special marker works as well. On the surface this program looks more complex than the JoinableQueue version while achieving the same effect, but the pattern applies more broadly: when each item takes a long time to process, the JoinableQueue approach either makes the producer and consumer wait on each other for too long (if the whole processing step is protected) or risks losing the last few items (if it is not), whereas the sentinel makes the shutdown point explicit.
Pipes
A pipe is created with Pipe([duplex]), which returns a tuple (conn1, conn2) of Connection objects representing the two ends of the pipe. The pipe is bidirectional by default; if duplex is set to False, conn1 can only be used for receiving and conn2 only for sending.
Connection objects provide the following methods:
close(), fileno(), poll([timeout]), recv(), recv_bytes([maxlength]), recv_bytes_into(buffer [, offset]), send(obj), send_bytes(buffer [, offset [, size]])
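A quick hedged sketch of poll() and the raw-bytes calls (the payload and timeout are arbitrary):

```python
import multiprocessing

if __name__ == '__main__':
    a, b = multiprocessing.Pipe()
    b.send_bytes(b"raw payload")     # send a raw byte buffer without pickling
    if a.poll(1.0):                  # wait up to one second for data to arrive
        print(a.recv_bytes())        # -> b'raw payload'
    a.close()
    b.close()
```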
The following example implements functionality similar to the earlier queue examples, this time with a pipe:
```python
import multiprocessing

def consumer(pipe):
    output_p, input_p = pipe
    input_p.close()       # Close the input end of the pipe
    while True:
        try:
            item = output_p.recv()
        except EOFError:
            break
        # Process item
        print(item)       # Replace with useful work
    # Shutdown
    print("Consumer done")

# Produce items and put on a queue. sequence is an
# iterable representing items to be processed.
def producer(sequence, input_p):
    for item in sequence:
        # Put the item on the queue
        input_p.send(item)

if __name__ == '__main__':
    (output_p, input_p) = multiprocessing.Pipe()

    # Launch the consumer process
    cons_p = multiprocessing.Process(
        target=consumer, args=((output_p, input_p),))
    cons_p.start()

    # Close the output pipe in the producer
    output_p.close()

    # Produce items
    sequence = [1, 2, 3, 4]
    producer(sequence, input_p)

    # Signal completion by closing the input pipe
    input_p.close()

    # Wait for the consumer process to shutdown
    cons_p.join()
```
A pipe can also be used for bidirectional communication, for example in the client/server style below:
```python
import multiprocessing

# A server process
def adder(pipe):
    server_p, client_p = pipe
    client_p.close()
    while True:
        try:
            x, y = server_p.recv()
        except EOFError:
            break
        result = x + y
        server_p.send(result)
    # Shutdown
    print("Server done")

if __name__ == '__main__':
    (server_p, client_p) = multiprocessing.Pipe()

    # Launch the server process
    adder_p = multiprocessing.Process(
        target=adder, args=((server_p, client_p),))
    adder_p.start()

    # Close the server pipe in the client
    server_p.close()

    # Make some requests on the server
    client_p.send((3, 4))
    print(client_p.recv())

    client_p.send(('Hello', 'World'))
    print(client_p.recv())

    # Done. Close the pipe
    client_p.close()

    # Wait for the consumer process to shutdown
    adder_p.join()
```
send() and recv() use pickle to serialize objects. More advanced programs that need something like remote procedure calls usually also want a process pool.
Process pools
In simple cases the Pool class can be used to manage a fixed number of workers. A process pool behaves much like a list comprehension or the map-reduce pattern from functional programming.
```python
import multiprocessing
import time

def do_calculation(data):
    return data * 2

def start_process():
    print('Starting {0}'.format(multiprocessing.current_process().name))

if __name__ == '__main__':
    # convert range to list for python3
    inputs = list(range(100))

    time1 = time.time()
    builtin_outputs = map(do_calculation, inputs)
    # convert to list for python3
    print('Built-in: {0}'.format(list(builtin_outputs)))
    time2 = time.time()
    print(time2 - time1)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size,
                                initializer=start_process,
                                )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()   # wrap up current tasks
    time3 = time.time()
    print('Pool : {0}'.format(pool_outputs))
    print(time3 - time2)
```
Output:
```
$ python3 multiprocessing_pool.py
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
3.790855407714844e-05
Starting ForkPoolWorker-1
Starting ForkPoolWorker-2
Starting ForkPoolWorker-3
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
Starting ForkPoolWorker-9
Starting ForkPoolWorker-10
Starting ForkPoolWorker-11
Starting ForkPoolWorker-12
Starting ForkPoolWorker-13
Starting ForkPoolWorker-14
Starting ForkPoolWorker-15
Starting ForkPoolWorker-16
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
0.2203056812286377
```
The example above first times the built-in map and then the pool's map. For a trivial per-item computation like this, the process pool is much slower: the cost of starting the workers and passing data between processes dwarfs the work itself, so the built-in map is far more efficient here. A pool only pays off when each item takes a significant amount of time to process.
If the worker function leaks memory, workers can be restarted after completing a fixed number of tasks by setting the maxtasksperchild parameter.
```python
import multiprocessing
import time

def do_calculation(data):
    return data * 2

def start_process():
    print('Starting {0}'.format(multiprocessing.current_process().name))

if __name__ == '__main__':
    # convert range to list for python3
    inputs = list(range(100))

    time1 = time.time()
    builtin_outputs = map(do_calculation, inputs)
    # convert to list for python3
    print('Built-in: {0}'.format(list(builtin_outputs)))
    time2 = time.time()
    print(time2 - time1)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size,
                                initializer=start_process,
                                maxtasksperchild=3,
                                )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()   # wrap up current tasks
    time3 = time.time()
    print('Pool : {0}'.format(pool_outputs))
    print(time3 - time2)
```
Output:
```
$ python3 multiprocessing_pool2.py
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
3.600120544433594e-05
Starting ForkPoolWorker-1
Starting ForkPoolWorker-3
Starting ForkPoolWorker-2
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
Starting ForkPoolWorker-9
Starting ForkPoolWorker-10
Starting ForkPoolWorker-11
Starting ForkPoolWorker-12
Starting ForkPoolWorker-13
Starting ForkPoolWorker-14
Starting ForkPoolWorker-15
Starting ForkPoolWorker-16
Starting ForkPoolWorker-17
Starting ForkPoolWorker-18
Starting ForkPoolWorker-19
Starting ForkPoolWorker-20
Starting ForkPoolWorker-21
Starting ForkPoolWorker-22
Starting ForkPoolWorker-23
Starting ForkPoolWorker-24
Starting ForkPoolWorker-25
Starting ForkPoolWorker-26
Starting ForkPoolWorker-27
Starting ForkPoolWorker-28
Starting ForkPoolWorker-29
Starting ForkPoolWorker-30
Starting ForkPoolWorker-31
Starting ForkPoolWorker-32
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
0.23842501640319824
```
The results show that more worker processes were started than before. (Note: the number of processes still appears to be somewhat lower than one might expect.)
Pool([numprocess [, initializer [, initargs]]]) (the maxtasksperchild keyword argument used above is also accepted)
numprocess defaults to cpu_count(). Methods: apply(func [, args [, kwargs]]), apply_async(func [, args [, kwargs [, callback]]]), close(), join(), imap(func, iterable [, chunksize]), imap_unordered(func, iterable [, chunksize]), map(func, iterable [, chunksize]), map_async(func, iterable [, chunksize [, callback]]), terminate().
The AsyncResult object returned by the asynchronous methods provides: get([timeout]), ready(), successful(), wait([timeout]).
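A minimal hedged sketch of apply_async() and the AsyncResult methods (the square function and the timeout value are illustrative):

```python
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    result = pool.apply_async(square, (7,))   # returns an AsyncResult immediately
    result.wait(5)                            # block for at most 5 seconds
    if result.ready() and result.successful():
        print(result.get())                   # -> 49
    pool.close()
    pool.join()
```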
The following code builds a dict that maps each file name under a directory to its SHA512 digest.
```python
import os
import multiprocessing
import hashlib
import binascii

# Some parameters you can tweak
BUFSIZE = 8192   # Read buffer size
POOLSIZE = 2     # Number of workers

def compute_digest(filename):
    try:
        f = open(filename, "rb")
    except IOError:
        return None
    digest = hashlib.sha512()
    while True:
        chunk = f.read(BUFSIZE)
        if not chunk:
            break
        digest.update(chunk)
    f.close()
    return filename, digest.digest()

def build_digest_map(topdir):
    digest_pool = multiprocessing.Pool(POOLSIZE)
    allfiles = (os.path.join(path, name)
                for path, dirs, files in os.walk(topdir)
                for name in files)
    digest_map = dict(digest_pool.imap_unordered(compute_digest, allfiles, 20))
    digest_pool.close()
    return digest_map

# Try it out. Change the directory name as desired.
if __name__ == '__main__':
    digest_map = build_digest_map("/home/andrew/data/code/python/\
python-chinese-library/libraries/multiprocessing")
    print(len(digest_map))
    for key in digest_map.keys():
        print("{0}: {1}".format(key, binascii.hexlify(digest_map[key])))
```
Shared data and synchronization
Shared memory is implemented with mmap. Objects created in shared memory are ctypes objects, so they do not need the pickle serialization that pipes use.
Value(typecode, arg1, ..., argN, lock), RawValue(typecode, arg1, ..., argN), Array(typecode, initializer, lock), RawArray(typecode, initializer)
The synchronization primitives are Lock, RLock, Semaphore, BoundedSemaphore, Event, and Condition.
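A minimal sketch of shared state plus a lock (the worker count and increment loop are illustrative): four processes increment a shared integer, and the explicit Lock keeps each read-modify-write atomic.

```python
import multiprocessing

def add_100(counter, lock):
    for _ in range(100):
        with lock:                 # serialize the read-modify-write on the shared value
            counter.value += 1

if __name__ == '__main__':
    counter = multiprocessing.Value('i', 0)   # shared integer living in shared memory
    lock = multiprocessing.Lock()
    workers = [multiprocessing.Process(target=add_100, args=(counter, lock))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)   # 400
```

The larger example below uses RawArray, Value, and Semaphore to pass blocks of floats through shared memory instead of copying them through a pipe.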
```python
import multiprocessing

class FloatChannel(object):
    def __init__(self, maxsize):
        self.buffer = multiprocessing.RawArray('d', maxsize)
        self.buffer_len = multiprocessing.Value('i')
        self.empty = multiprocessing.Semaphore(1)
        self.full = multiprocessing.Semaphore(0)

    def send(self, values):
        self.empty.acquire()            # Only proceed if buffer empty
        nitems = len(values)
        self.buffer_len.value = nitems  # Set the buffer size
        self.buffer[:nitems] = values   # Copy values into the buffer
        self.full.release()             # Signal that buffer is full

    def recv(self):
        self.full.acquire()             # Only proceed if buffer full
        values = self.buffer[:self.buffer_len.value]  # Copy values
        self.empty.release()            # Signal that buffer is empty
        return values

# Performance test. Receive a bunch of messages
def consume_test(count, ch):
    for i in range(count):
        values = ch.recv()

# Performance test. Send a bunch of messages
def produce_test(count, values, ch):
    for i in range(count):
        ch.send(values)

if __name__ == '__main__':
    ch = FloatChannel(100000)
    p = multiprocessing.Process(target=consume_test, args=(1000, ch))
    p.start()
    values = [float(x) for x in range(100000)]
    produce_test(1000, values, ch)
    print("Done")
    p.join()
```
References
- PyMOTW multiprocessing reference