本文共 3832 字,大约阅读时间需要 12 分钟。
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。直接创建和销毁大量进程会带来显著的性能问题。进程池的解决方案是定义一个固定数量的进程池,等到处理完毕后将进程放回池中继续等待任务。这既节省了操作系统的调度资源,也提高了程序的运行效率。
Pool([numprocess], [initializer], [initargs])
numprocess:指定创建的进程数量,默认使用cpu_count()的值。initializer:每个工作进程启动时要执行的可调用对象,默认为None。initargs:传给initializer的参数组。numprocess:默认使用所有CPU核数。initializer:如需在进程启动前执行初始化操作,可传递lambda函数。initargs:传递给initializer的参数。p.apply(func, args, kwargs)
func函数,返回结果。p.apply_async(func, args, kwargs)
func,返回AsyncResult对象。p.close()
p.join()
close()或terminate()之后调用。AsyncResult对象提供以下方法: get():等待并获取结果(可选超时)。ready():判断任务是否完成。successful():判断任务是否成功完成且无异常。wait():等待任务结果(可选超时)。terminate():立即终止所有工作进程。import osimport timefrom multiprocessing import Pooldef work(n): print(f'进程{os.getpid()}执行任务{ n }') time.sleep(3) return n ** 2if __name__ == '__main__': p = Pool(3) res_l = [] for i in range(10): res = p.apply(work, (i,)) res_l.append(res) print(res_l) import osimport timeimport randomfrom multiprocessing import Pooldef work(n): print(f'进程{os.getpid()}执行任务{ n }') time.sleep(random.random()) return n ** 2if __name__ == '__main__': p = Pool(3) res_l = [] for i in range(10): res = p.apply_async(work, (i,)) res_l.append(res) p.close() p.join() for res in res_l: print(res.get()) from socket import *from multiprocessing import Poolserver = socket(AF_INET, SOCK_STREAM)server.setsockopt(SOL_SOCKET, REUSEADDR, 1)server.bind(('127.0.0.1', 8080))server.listen(5)def talk(conn): print(f'进程{os.getpid()}接收连接') while True: msg = conn.recv(1024) if not msg: break conn.send(msg.upper())if __name__ == '__main__': p = Pool(4) while True: conn, addr = server.accept() p.apply_async(talk, (conn,)) from socket import *client = socket(AF_INET, SOCK_STREAM)client.connect(('127.0.0.1', 8080))while True: msg = input('> ').strip() if not msg: continue client.send(msg.encode('utf-8')) msg = client.recv(1024) print(msg.decode('utf-8')) 回调函数用于在进程池任务完成后,立即通知主进程处理结果。这种方式特别适用于处理耗时较长的任务。
import requestsimport jsonimport osfrom multiprocessing import Pooldef get_page(url): print(f'进程{os.getpid()}正在获取{url}') response = requests.get(url) if response.status_code == 200: return {'url': url, 'text': response.text}def parse_page(res): print(f'进程{os.getpid()}正在解析{res["url"]}') parse_res = f'URL: {res["url"]}, 字数: {len(res["text"])}\n' with open('db.txt', 'a') as f: f.write(parse_res)if __name__ == '__main__': urls = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p = Pool(3) res_l = [] for url in urls: res = p.apply_async(get_page, (url,), callback=parse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) 如果需要等待所有任务完成后再统一处理结果,可以直接在主进程中等待进程池完成任务。
from multiprocessing import Poolimport timeimport randomdef work(n): time.sleep(1) return n ** 2if __name__ == '__main__': p = Pool() res_l = [] for i in range(10): res = p.apply_async(work, (i,)) res_l.append(res) p.close() p.join() nums = [] for res in res_l: nums.append(res.get()) print(nums)
apply()和apply_async()方法的主要区别在于同步性。join()方法只能在close()或terminate()之后使用。转载地址:http://odgyz.baihongyu.com/