并发编程之Concurrent
01.python3.2版本引入异步并行任务编程模块,提供一个高级的异步可执行的便利接口。
- concurrent.futures类提供了2个池执行器:
- ThreadPoolExecutor,异步调用的线程池的Executor。
- ProcessPoolExecutor,异步调用的进程池的Executor。
02.ThreadPoolExecutor对象:
- 使用ThreadPoolExecutor需要定义一个池的执行器对象,返回Executor实例,其格式为:
ThreadPoolExecutor(max_workers)
- max_worker,最大用于异步执行的工作线程数。
- ThreadPoolExecutor的实例方法包括:
- submit(fn, args, *kargs),提交执行的函数及其参数,返回Future类的实例。
- shutdown(wait=True),清理池。
- Future的实例方法包括:
- done(),如果调用被成功的取消或执行完成,返回True。
- cancelled(),如果调用被成功的取消,返回True。
- running(),如果正在运行切不能被取消,返回True。
- cancel(),尝试取消调用;如果已经执行且不能取消返回False,否则返回True。
- result(timeout=None),取返回的结果:
- timeout为None,一直等待返回。
- time设置到期,抛出concurrent.futures.TimeoutError异常。
- exception(timeout=None),取返回的异常:
- timeout为None,一直等待返回。
- time设置到期,抛出concurrent.futures.TimeoutError异常。
- ThreadPoolExecutor的简单示例:
from concurrent.futures import ThreadPoolExecutor
import logging
import datetime
import time
FORMAT = "%(process)d %(processName)s %(thread)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
start = datetime.datetime.now()
# 定义一个IO密集形型的函数,使用多线程处理
def worker(n):
logging.info('begin to work {}'.format(n))
time.sleep(5)
logging.info('finished ... {}'.format(n))
# 设置返回值,作为Future.result的值
return "线程:{} 已经完成".format(n)
# 设置一个列表,用于存放Executor实例
fs = []
executor = ThreadPoolExecutor(max_workers=4)
for i in range(3):
# 执行多线程,并返回Future实例
result = executor.submit(worker, i)
# 将返回的示例添加到列表中
fs.append(result)
# 设置一个循环,当所有线程完成后退出循环
while True:
time.sleep(1)
flag = True
for x in fs:
# 判断线程是否完成
done = x.done()
if not done:
print('worker {} status is: {}'.format(x, done))
else:
flag = False
if not flag:
# 如果线程已完成,则输出线程执行函数的返回值
for y in fs:
r = y.result()
logging.info(r)
break
03.ProcessPoolExecutor对象:
- 使用ProcessPoolExecutor需要定义一个池的执行器对象,返回Executor实例,其格式为:
ProcessPoolExecutor(max_workers)
- max_worker,最大用于异步执行的工作线程数。
- ProcessPoolExecutor的实例方法包括:
- submit(fn, args, *kargs),提交之执行的函数及其参数,返回Future类的实例。
- shutdown(wait=True),清理池。
- Future的实例方法包括:
- done(),如果调用被成功的取消或执行完成,返回True。
- cancelled(),如果调用被成功的取消,返回True。
- running(),如果正在运行切不能被取消,返回True。
- cancel(),尝试取消调用;如果已经执行且不能取消返回False,否则返回True。
- result(timeout=None),取返回的结果:
- timeout为None,一直等待返回。
- time设置到期,抛出concurrent.futures.TimeoutError异常。
- exception(timeout=None),取返回的异常:
- timeout为None,一直等待返回。
- time设置到期,抛出concurrent.futures.TimeoutError异常。
- ProcessPoolExecutor的简单示例:
from concurrent.futures import ProcessPoolExecutor
import logging
import datetime
import time
FORMAT = "%(process)d %(processName)s %(thread)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
start = datetime.datetime.now()
# 定义一个IO密集形型的函数,使用多线程处理
def worker(n):
logging.info('begin to work {}'.format(n))
time.sleep(5)
logging.info('finished ... {}'.format(n))
# 设置返回值,作为Future.result的值
return "线程:{} 已经完成".format(n)
# 设置一个列表,用于存放Executor实例
fs = []
if __name__ == '__main__':
executor = ProcessPoolExecutor(max_workers=4)
for i in range(3):
# 执行多线程,并返回Future实例
result = executor.submit(worker, i)
# 将返回的示例添加到列表中
fs.append(result)
# 设置一个循环,当所有线程完成后退出循环
while True:
flag = True
time.sleep(1)
for x in fs:
# 判断线程是否完成
done = x.done()
if not done:
print('worker {} status is: {}'.format(x, done))
else:
flag = False
if not flag:
# 如果线程已完成,则输出线程执行函数的返回值
print('------')
for y in fs:
r = y.result()
logging.info(r)
executor.shutdown()
break
文档更新时间: 2021-10-04 00:25 作者:闻骏