并发锁
01.凡是存在共享资源争抢的地方都可以使用锁(lock),从而保证只有一个使用者可以完全使用这个资源。
- 锁适用于访问和修改同一个资源的时候,即读写同一个资源。
- 如果仅仅是读取,则不需要加锁,因为对象是不可变的。
- 锁的注意事项包括:
- 少用锁,必要时用锁;使用了锁,多线程访问被锁的资源时就成了串行。
- 加锁时间越短越好,不需要就立即释放锁。
- 绝对避免死锁。
- threading.Lock()类可以创建一个锁对象,其方法包括:
- acquire(),获取一个锁,成功获取之后,返回True,否则返回False。
- acquire()的方法包括:
- blocking,获取锁之后是否阻塞,默认阻塞。
- timeout,阻塞的超时时间,默认永久阻塞;非阻塞时禁止设置。
- release(),释放锁,可以从任何线程中释放。
- 已上锁的锁,会被重置为unlocked;未上锁的调用,会抛出RuntimeError异常。
- acquire()的方法包括:
- acquire(),获取一个锁,成功获取之后,返回True,否则返回False。
- threading.Lock()类的示例:
import threading
event = threading.Event()
locked =threading.Lock()
class Counter(object):
def __init__(self):
self._value = 0
@property
def value(self):
return self._value
# 使用with上线文,实现加锁之后解锁的功能
def inc(self):
with locked:
self._value += 1
def dec(self):
with locked:
self._value -= 1
# 定义一个函数,当随机值小于0的时候将value减一,大于0的时候将value值加一。
# 如果不使用锁,多个线程读取的时,会造成并发争抢,导致最后的value值不为0。
def run(fun:Counter, count=100):
for _ in range(count):
for item in range(-50, 50):
if item < 0:
fun.dec()
else:
fun.inc()
fun = Counter()
c1 = 10
c2 = 10000
for i in range(c1):
t = threading.Thread(name='work_{}'.format(i), target=run, args=(fun, c2),)
t.start()
while True:
# 判断是否所有子线程都已执行完,如果是则输出value值。
if threading.active_count() == 1:
print(fun.value)
break
print("========= end ==========")
02.递归锁(rlock):
- threading.RLock()与threading.Lock()的使用方法基本相同,区别是:
- 在同一线程内,threading.RLock()可以进行多次acquire()操作,但不会阻塞线程。
- threading.RLock()有一个计数器,当acquire()时计数加一,release()时计数减一;只有当计数为0时才能被其他线程调用。
- threading.RLock()有一个owner属性,会标记获得该锁的线程id。
- threading.RLock()类的示例:
import threading
import logging
import time
FORMAT = "%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
locked =threading.RLock()
# 主线程加锁
locked.acquire()
def work(l):
for _ in range(10):
logging.info('Start Thread')
# 由于主线程的锁计数不为0,则子线程会一直等待获取锁。
l.acquire()
logging.info('End Thread')
l.release()
for i in range(5):
t = threading.Thread(name='work_{}'.format(i), target=work, args=(locked,),)
t.start()
# 10秒之后释放锁,子线程开始执行
time.sleep(10)
locked.release()
03.Condition类用于生产者和消费者模型中,为了解决生产者消费者速度匹配的问题,其构造方法为:
Condition(lock=None)
- Condition类可以传入一个Lock或者RLock对象,默认是RLock。
- Condition类的方法包括:
- acquire(*args),获取锁。
- wait(self, timeout=None),等待或超时。
- notify(n=1),唤醒至多指定数目个数的等待线程,没有等待的线程就没有任何操作。
- notify_all(),唤醒所有等待的线程。
- 使用Condition类,必须先acquire(),用完了必须release(),默认使用RLock锁,最好的方式是使用with上下文。
- Condition类的示例:
import threading
import logging
import time
import random
FORMAT = "%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class Dispatcher(object):
def __init__(self):
self.data = None
self.event = threading.Event()
self.condition = threading.Condition()
def produce(self, count=10):
for i in range(count):
# 使用通知机制
with self.condition:
self.data = random.randint(0, 100)
logging.info("The data is {}".format(self.data))
# 数据产生后,通知所有wait的线程
self.condition.notify_all()
self.event.wait(1)
def consume(self):
while True:
with self.condition:
# 线程阻塞,收到通知后即开始执行后续代码
self.condition.wait()
logging.info("Consumer data {}".format(self.data))
result = Dispatcher()
for i in range(3):
threading.Thread(name='consume_{}'.format(i), target=result.consume).start()
threading.Thread(name='produce', target=result.produce).start()
文档更新时间: 2021-10-04 00:19 作者:闻骏