博客
关于我
Python程序中的进程操作-进程池(multiprocess.Pool)
阅读量:440 次
发布时间:2019-03-06

本文共 3730 字,大约阅读时间需要 12 分钟。

Python从入门到放弃完整教程


一、进程池的概念

在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。直接创建和销毁大量进程会带来显著的性能问题。进程池的解决方案是定义一个固定数量的进程池,等到处理完毕后将进程放回池中继续等待任务。这既节省了操作系统的调度资源,也提高了程序的运行效率。


二、概念介绍——multiprocess.Pool

Pool的创建

Pool([numprocess], [initializer], [initargs])
  • numprocess:指定创建的进程数量,默认使用cpu_count()的值。
  • initializer:每个工作进程启动时要执行的可调用对象,默认为None
  • initargs:传给initializer的参数组。

三、参数用法

  • numprocess:默认使用所有CPU核数。
  • initializer:如需在进程启动前执行初始化操作,可传递lambda函数。
  • initargs:传递给initializer的参数。

四、主要方法

1. 同步执行

p.apply(func, args, kwargs)
  • 在池中的一个进程中执行func函数,返回结果。

2. 异步执行

p.apply_async(func, args, kwargs)
  • 在池中启动一个进程执行func,返回AsyncResult对象。
  • 异步执行的任务不会阻塞主进程。

3. 关闭进程池

p.close()
  • 防止进一步提交任务,待所有工作进程完成后关闭。

4. 等待进程完成

p.join()
  • 等待所有工作进程退出,适用于close()terminate()之后调用。

五、其他方法(了解)

1. 异步任务结果处理

  • AsyncResult对象提供以下方法:
    • get():等待并获取结果(可选超时)。
    • ready():判断任务是否完成。
    • successful():判断任务是否成功完成且无异常。
    • wait():等待任务结果(可选超时)。
    • terminate():立即终止所有工作进程。

六、代码实例——multiprocess.Pool

6.1 同步执行

import osimport timefrom multiprocessing import Pooldef work(n):    print(f'进程{os.getpid()}执行任务{ n }')    time.sleep(3)    return n ** 2if __name__ == '__main__':    p = Pool(3)    res_l = []    for i in range(10):        res = p.apply(work, (i,))        res_l.append(res)    print(res_l)

6.2 异步执行

import osimport timeimport randomfrom multiprocessing import Pooldef work(n):    print(f'进程{os.getpid()}执行任务{ n }')    time.sleep(random.random())    return n ** 2if __name__ == '__main__':    p = Pool(3)    res_l = []    for i in range(10):        res = p.apply_async(work, (i,))        res_l.append(res)    p.close()    p.join()    for res in res_l:        print(res.get())

七、进程池版socket并发聊天练习

7.1 服务器端

from socket import *from multiprocessing import Poolserver = socket(AF_INET, SOCK_STREAM)server.setsockopt(SOL_SOCKET, REUSEADDR, 1)server.bind(('127.0.0.1', 8080))server.listen(5)def talk(conn):    print(f'进程{os.getpid()}接收连接')    while True:        msg = conn.recv(1024)        if not msg:            break        conn.send(msg.upper())if __name__ == '__main__':    p = Pool(4)    while True:        conn, addr = server.accept()        p.apply_async(talk, (conn,))

7.2 客户端

from socket import *client = socket(AF_INET, SOCK_STREAM)client.connect(('127.0.0.1', 8080))while True:    msg = input('> ').strip()    if not msg:        continue    client.send(msg.encode('utf-8'))    msg = client.recv(1024)    print(msg.decode('utf-8'))

八、回调函数

回调函数用于在进程池任务完成后,立即通知主进程处理结果。这种方式特别适用于处理耗时较长的任务。

8.1 使用回调函数减少网络等待时间

import requestsimport jsonimport osfrom multiprocessing import Pooldef get_page(url):    print(f'进程{os.getpid()}正在获取{url}')    response = requests.get(url)    if response.status_code == 200:        return {'url': url, 'text': response.text}def parse_page(res):    print(f'进程{os.getpid()}正在解析{res["url"]}')    parse_res = f'URL: {res["url"]}, 字数: {len(res["text"])}\n'    with open('db.txt', 'a') as f:        f.write(parse_res)if __name__ == '__main__':    urls = [        'https://www.baidu.com',        'https://www.python.org',        'https://www.openstack.org',        'https://help.github.com/',        'http://www.sina.com.cn/'    ]    p = Pool(3)    res_l = []    for url in urls:        res = p.apply_async(get_page, (url,), callback=parse_page)        res_l.append(res)    p.close()    p.join()    print([res.get() for res in res_l])

九、无需回调函数

如果需要等待所有任务完成后再统一处理结果,可以直接在主进程中等待进程池完成任务。

from multiprocessing import Poolimport timeimport randomdef work(n):    time.sleep(1)    return n ** 2if __name__ == '__main__':    p = Pool()    res_l = []    for i in range(10):        res = p.apply_async(work, (i,))        res_l.append(res)    p.close()    p.join()    nums = []    for res in res_l:        nums.append(res.get())    print(nums)

进程池的其他实现方法

  • apply()apply_async()方法的主要区别在于同步性。
  • join()方法只能在close()terminate()之后使用。

转载地址:http://odgyz.baihongyu.com/

你可能感兴趣的文章
Objective-C实现binomial coefficient二项式系数算法(附完整源码)
查看>>
Objective-C实现bogo sort排序算法(附完整源码)
查看>>
Objective-C实现CaesarsCiphe凯撒密码算法(附完整源码)
查看>>
Objective-C实现cartesianProduct笛卡尔乘积算法(附完整源码)
查看>>
Objective-C实现check strong password检查密码强度算法(附完整源码)
查看>>
Objective-C实现circle sort圆形排序算法(附完整源码)
查看>>
Objective-C实现coulombs law库仑定律算法(附完整源码)
查看>>
Objective-C实现DBSCAN聚类算法(附完整源码)
查看>>
Objective-C实现dijkstra银行家算法(附完整源码)
查看>>
Objective-C实现Dinic算法(附完整源码)
查看>>
Objective-C实现disjoint set不相交集算法(附完整源码)
查看>>
Objective-C实现DisjointSet并查集的算法(附完整源码)
查看>>
Objective-C实现djb2哈希算法(附完整源码)
查看>>
Objective-C实现DNF排序算法(附完整源码)
查看>>
Objective-C实现double factorial iterative双阶乘迭代算法(附完整源码)
查看>>
Objective-C实现double factorial recursive双阶乘递归算法(附完整源码)
查看>>
Objective-C实现double hash双哈希算法(附完整源码)
查看>>
Objective-C实现double linear search recursion双线性搜索递归算法(附完整源码)
查看>>
Objective-C实现DoublyLinkedList双链表的算法(附完整源码)
查看>>
Objective-C实现DPLL(davisb putnamb logemannb loveland)算法(附完整源码)
查看>>