博客
关于我
Python程序中的进程操作-进程池(multiprocess.Pool)
阅读量:440 次
发布时间:2019-03-06

本文共 3832 字,大约阅读时间需要 12 分钟。

Python从入门到放弃完整教程


一、进程池的概念

在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。直接创建和销毁大量进程会带来显著的性能问题。进程池的解决方案是定义一个固定数量的进程池,等到处理完毕后将进程放回池中继续等待任务。这既节省了操作系统的调度资源,也提高了程序的运行效率。


二、概念介绍——multiprocess.Pool

Pool的创建

Pool([numprocess], [initializer], [initargs])
  • numprocess:指定创建的进程数量,默认使用cpu_count()的值。
  • initializer:每个工作进程启动时要执行的可调用对象,默认为None
  • initargs:传给initializer的参数组。

三、参数用法

  • numprocess:默认使用所有CPU核数。
  • initializer:如需在进程启动前执行初始化操作,可传递lambda函数。
  • initargs:传递给initializer的参数。

四、主要方法

1. 同步执行

p.apply(func, args, kwargs)
  • 在池中的一个进程中执行func函数,返回结果。

2. 异步执行

p.apply_async(func, args, kwargs)
  • 在池中启动一个进程执行func,返回AsyncResult对象。
  • 异步执行的任务不会阻塞主进程。

3. 关闭进程池

p.close()
  • 防止进一步提交任务,待所有工作进程完成后关闭。

4. 等待进程完成

p.join()
  • 等待所有工作进程退出,适用于close()terminate()之后调用。

五、其他方法(了解)

1. 异步任务结果处理

  • AsyncResult对象提供以下方法:
    • get():等待并获取结果(可选超时)。
    • ready():判断任务是否完成。
    • successful():判断任务是否成功完成且无异常。
    • wait():等待任务结果(可选超时)。
    • terminate():立即终止所有工作进程。

六、代码实例——multiprocess.Pool

6.1 同步执行

import os
import time
from multiprocessing import Pool
def work(n):
print(f'进程{os.getpid()}执行任务{ n }')
time.sleep(3)
return n ** 2
if __name__ == '__main__':
p = Pool(3)
res_l = []
for i in range(10):
res = p.apply(work, (i,))
res_l.append(res)
print(res_l)

6.2 异步执行

import os
import time
import random
from multiprocessing import Pool
def work(n):
print(f'进程{os.getpid()}执行任务{ n }')
time.sleep(random.random())
return n ** 2
if __name__ == '__main__':
p = Pool(3)
res_l = []
for i in range(10):
res = p.apply_async(work, (i,))
res_l.append(res)
p.close()
p.join()
for res in res_l:
print(res.get())

七、进程池版socket并发聊天练习

7.1 服务器端

from socket import *
from multiprocessing import Pool
server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)
def talk(conn):
print(f'进程{os.getpid()}接收连接')
while True:
msg = conn.recv(1024)
if not msg:
break
conn.send(msg.upper())
if __name__ == '__main__':
p = Pool(4)
while True:
conn, addr = server.accept()
p.apply_async(talk, (conn,))

7.2 客户端

from socket import *
client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8080))
while True:
msg = input('> ').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
msg = client.recv(1024)
print(msg.decode('utf-8'))

八、回调函数

回调函数用于在进程池任务完成后,立即通知主进程处理结果。这种方式特别适用于处理耗时较长的任务。

8.1 使用回调函数减少网络等待时间

import requests
import json
import os
from multiprocessing import Pool
def get_page(url):
print(f'进程{os.getpid()}正在获取{url}')
response = requests.get(url)
if response.status_code == 200:
return {'url': url, 'text': response.text}
def parse_page(res):
print(f'进程{os.getpid()}正在解析{res["url"]}')
parse_res = f'URL: {res["url"]}, 字数: {len(res["text"])}\n'
with open('db.txt', 'a') as f:
f.write(parse_res)
if __name__ == '__main__':
urls = [
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]
p = Pool(3)
res_l = []
for url in urls:
res = p.apply_async(get_page, (url,), callback=parse_page)
res_l.append(res)
p.close()
p.join()
print([res.get() for res in res_l])

九、无需回调函数

如果需要等待所有任务完成后再统一处理结果,可以直接在主进程中等待进程池完成任务。

from multiprocessing import Pool
import time
import random
def work(n):
time.sleep(1)
return n ** 2
if __name__ == '__main__':
p = Pool()
res_l = []
for i in range(10):
res = p.apply_async(work, (i,))
res_l.append(res)
p.close()
p.join()
nums = []
for res in res_l:
nums.append(res.get())
print(nums)

进程池的其他实现方法

  • apply()apply_async()方法的主要区别在于同步性。
  • join()方法只能在close()terminate()之后使用。

转载地址:http://odgyz.baihongyu.com/

你可能感兴趣的文章
Objective-C实现奇偶检验码(附完整源码)
查看>>
Objective-C实现奇偶转置排序算法(附完整源码)
查看>>
Objective-C实现奇异值分解SVD(附完整源码)
查看>>
Objective-C实现子集总和算法(附完整源码)
查看>>
Objective-C实现字符串autocomplete using trie(使用 trie 自动完成)算法(附完整源码)
查看>>
Objective-C实现字符串boyer moore search博耶摩尔搜索算法(附完整源码)
查看>>
Objective-C实现字符串IP地址转DWORD地址(附完整源码)
查看>>
Objective-C实现字符串jaro winkler算法(附完整源码)
查看>>
Objective-C实现字符串manacher马拉车算法(附完整源码)
查看>>
Objective-C实现字符串wildcard pattern matching通配符模式匹配算法(附完整源码)
查看>>
Objective-C实现字符串word patterns单词模式算法(附完整源码)
查看>>
Objective-C实现字符串Z 函数或 Z 算法(附完整源码)
查看>>
Objective-C实现字符串加解密(附完整源码)
查看>>
Objective-C实现字符串反转(附完整源码)
查看>>
Objective-C实现字符串复制功能(附完整源码)
查看>>
Objective-C实现字符串是否回文Palindrome算法 (附完整源码)
查看>>
Objective-C实现字符串查找子串(附完整源码)
查看>>
Objective-C实现完整的ComplexNumber复数类(附完整源码)
查看>>
Objective-C实现实现rabin karp算法(附完整源码)
查看>>
Objective-C实现对图像进行色调处理算法(附完整源码)
查看>>