并发锁


01.凡是存在共享资源争抢的地方都可以使用锁(lock),从而保证只有一个使用者可以完全使用这个资源。

  • 锁适用于访问和修改同一个资源的时候,即读写同一个资源。
    • 如果仅仅是读取,则不需要加锁,因为对象是不可变的。
    • 锁的注意事项包括:
      • 少用锁,必要时用锁;使用了锁,多线程访问被锁的资源时就成了串行。
      • 加锁时间越短越好,不需要就立即释放锁。
      • 绝对避免死锁。
  • threading.Lock()类可以创建一个锁对象,其方法包括:
    • acquire(),获取一个锁,成功获取之后,返回True,否则返回False。
      • acquire()的方法包括:
        • blocking,获取锁之后是否阻塞,默认阻塞。
        • timeout,阻塞的超时时间,默认永久阻塞;非阻塞时禁止设置。
      • release(),释放锁,可以从任何线程中释放。
        • 已上锁的锁,会被重置为unlocked;未上锁的调用,会抛出RuntimeError异常。
  • threading.Lock()类的示例:
import threading


event = threading.Event()
locked =threading.Lock()


class Counter(object):

    def __init__(self):
        self._value = 0

    @property
    def value(self):
        return self._value

    # 使用with上线文,实现加锁之后解锁的功能
    def inc(self):
        with locked:
            self._value += 1

    def dec(self):
        with locked:
            self._value -= 1

# 定义一个函数,当随机值小于0的时候将value减一,大于0的时候将value值加一。
# 如果不使用锁,多个线程读取的时,会造成并发争抢,导致最后的value值不为0。
def run(fun:Counter, count=100):
    for _ in range(count):
        for item in range(-50, 50):
            if item < 0:
                fun.dec()
            else:
                fun.inc()


fun = Counter()
c1 = 10
c2 = 10000
for i in range(c1):
    t = threading.Thread(name='work_{}'.format(i), target=run, args=(fun, c2),)
    t.start()

while True:
    # 判断是否所有子线程都已执行完,如果是则输出value值。
    if threading.active_count() == 1:
        print(fun.value)
        break

print("========= end ==========")


02.递归锁(rlock):

  • threading.RLock()与threading.Lock()的使用方法基本相同,区别是:
    • 在同一线程内,threading.RLock()可以进行多次acquire()操作,但不会阻塞线程。
    • threading.RLock()有一个计数器,当acquire()时计数加一,release()时计数减一;只有当计数为0时才能被其他线程调用。
    • threading.RLock()有一个owner属性,会标记获得该锁的线程id。
  • threading.RLock()类的示例:
import threading
import logging
import time


FORMAT = "%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
locked =threading.RLock()
# 主线程加锁
locked.acquire()


def work(l):
    for _ in range(10):
        logging.info('Start Thread')
        # 由于主线程的锁计数不为0,则子线程会一直等待获取锁。
        l.acquire()
        logging.info('End Thread')
        l.release()


for i in range(5):
    t = threading.Thread(name='work_{}'.format(i), target=work, args=(locked,),)
    t.start()

# 10秒之后释放锁,子线程开始执行
time.sleep(10)
locked.release()


03.Condition类用于生产者和消费者模型中,为了解决生产者消费者速度匹配的问题,其构造方法为:

Condition(lock=None)
  • Condition类可以传入一个Lock或者RLock对象,默认是RLock。
  • Condition类的方法包括:
    • acquire(*args),获取锁。
    • wait(self, timeout=None),等待或超时。
    • notify(n=1),唤醒至多指定数目个数的等待线程,没有等待的线程就没有任何操作。
    • notify_all(),唤醒所有等待的线程。
  • 使用Condition类,必须先acquire(),用完了必须release(),默认使用RLock锁,最好的方式是使用with上下文。
  • Condition类的示例:
import threading
import logging
import time
import random


FORMAT = "%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)


class Dispatcher(object):

    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.condition = threading.Condition()

    def produce(self, count=10):

        for i in range(count):
            # 使用通知机制
            with self.condition:
                self.data = random.randint(0, 100)
                logging.info("The data is {}".format(self.data))
                # 数据产生后,通知所有wait的线程
                self.condition.notify_all()
            self.event.wait(1)

    def consume(self):

        while True:
            with self.condition:
                # 线程阻塞,收到通知后即开始执行后续代码
                self.condition.wait()
                logging.info("Consumer data {}".format(self.data))


result = Dispatcher()
for i in range(3):
    threading.Thread(name='consume_{}'.format(i), target=result.consume).start()
threading.Thread(name='produce', target=result.produce).start()
文档更新时间: 2021-10-04 00:19   作者:闻骏