并发编程之Concurrent


01.python3.2版本引入异步并行任务编程模块,提供一个高级的异步可执行的便利接口。

  • concurrent.futures类提供了2个池执行器:
    • ThreadPoolExecutor,异步调用的线程池的Executor。
    • ProcessPoolExecutor,异步调用的进程池的Executor。


02.ThreadPoolExecutor对象:

  • 使用ThreadPoolExecutor需要定义一个池的执行器对象,返回Executor实例,其格式为:
ThreadPoolExecutor(max_workers)
  • max_worker,最大用于异步执行的工作线程数。
  • ThreadPoolExecutor的实例方法包括:
    • submit(fn, args, *kargs),提交执行的函数及其参数,返回Future类的实例。
    • shutdown(wait=True),清理池。
  • Future的实例方法包括:
    • done(),如果调用被成功的取消或执行完成,返回True。
    • cancelled(),如果调用被成功的取消,返回True。
    • running(),如果正在运行切不能被取消,返回True。
    • cancel(),尝试取消调用;如果已经执行且不能取消返回False,否则返回True。
    • result(timeout=None),取返回的结果:
      • timeout为None,一直等待返回。
      • time设置到期,抛出concurrent.futures.TimeoutError异常。
    • exception(timeout=None),取返回的异常:
      • timeout为None,一直等待返回。
      • time设置到期,抛出concurrent.futures.TimeoutError异常。
  • ThreadPoolExecutor的简单示例:
from concurrent.futures import ThreadPoolExecutor
import logging
import datetime
import time


FORMAT = "%(process)d %(processName)s %(thread)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
start = datetime.datetime.now()


# 定义一个IO密集形型的函数,使用多线程处理
def worker(n):
    logging.info('begin to work {}'.format(n))
    time.sleep(5)
    logging.info('finished ... {}'.format(n))
    # 设置返回值,作为Future.result的值
    return "线程:{} 已经完成".format(n)


# 设置一个列表,用于存放Executor实例
fs = []

executor = ThreadPoolExecutor(max_workers=4)

for i in range(3):
    # 执行多线程,并返回Future实例
    result = executor.submit(worker, i)
    # 将返回的示例添加到列表中
    fs.append(result)


# 设置一个循环,当所有线程完成后退出循环
while True:
    time.sleep(1)
    flag = True
    for x in fs:
        # 判断线程是否完成
        done = x.done()
        if not done:
            print('worker {} status is: {}'.format(x, done))
        else:
            flag = False

    if not flag:
        # 如果线程已完成,则输出线程执行函数的返回值
        for y in fs:
            r = y.result()
            logging.info(r)
        break


03.ProcessPoolExecutor对象:

  • 使用ProcessPoolExecutor需要定义一个池的执行器对象,返回Executor实例,其格式为:
ProcessPoolExecutor(max_workers)
  • max_worker,最大用于异步执行的工作线程数。
  • ProcessPoolExecutor的实例方法包括:
    • submit(fn, args, *kargs),提交之执行的函数及其参数,返回Future类的实例。
    • shutdown(wait=True),清理池。
  • Future的实例方法包括:
    • done(),如果调用被成功的取消或执行完成,返回True。
    • cancelled(),如果调用被成功的取消,返回True。
    • running(),如果正在运行切不能被取消,返回True。
    • cancel(),尝试取消调用;如果已经执行且不能取消返回False,否则返回True。
    • result(timeout=None),取返回的结果:
      • timeout为None,一直等待返回。
      • time设置到期,抛出concurrent.futures.TimeoutError异常。
    • exception(timeout=None),取返回的异常:
      • timeout为None,一直等待返回。
      • time设置到期,抛出concurrent.futures.TimeoutError异常。
  • ProcessPoolExecutor的简单示例:
from concurrent.futures import ProcessPoolExecutor
import logging
import datetime
import time


FORMAT = "%(process)d %(processName)s %(thread)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
start = datetime.datetime.now()


# 定义一个IO密集形型的函数,使用多线程处理
def worker(n):
    logging.info('begin to work {}'.format(n))
    time.sleep(5)
    logging.info('finished ... {}'.format(n))
    # 设置返回值,作为Future.result的值
    return "线程:{} 已经完成".format(n)


# 设置一个列表,用于存放Executor实例
fs = []

if __name__ == '__main__':

    executor = ProcessPoolExecutor(max_workers=4)

    for i in range(3):
        # 执行多线程,并返回Future实例
        result = executor.submit(worker, i)
        # 将返回的示例添加到列表中
        fs.append(result)

    # 设置一个循环,当所有线程完成后退出循环
    while True:
        flag = True
        time.sleep(1)
        for x in fs:
            # 判断线程是否完成
            done = x.done()
            if not done:
                print('worker {} status is: {}'.format(x, done))
            else:
                flag = False

        if not flag:
            # 如果线程已完成,则输出线程执行函数的返回值
            print('------')
            for y in fs:
                r = y.result()
                logging.info(r)
            executor.shutdown()
            break
文档更新时间: 2021-10-04 00:25   作者:闻骏