跳转至

多线程

📌 线程间通信

Python中多线程通信最常用的是使用threading模块中的Queue类。
Queue是一个线程安全的队列,可以用于线程间的数据传递和通信,避免了直接共享内存可能引发的竞态条件。

Queue方法 说明
put() 向队列添加元素
get() 从队列获取元素
task_done() 通知队列任务已完成
import threading
from queue import Queue


# 生产者线程
def producer(q):
    for i in range(5):
        q.put(i)
        print(f"Produced: {i}")
    q.put("生产者线程结束")


# 消费者线程
def consumer(q):
    while True:
        item = q.get()
        if item == "生产者线程结束":
            q.task_done()
            break
        print(f"Consumed: {item}")
        q.task_done()


# 创建队列
queue = Queue()

# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer, args=(queue,))
consumer_thread = threading.Thread(target=consumer, args=(queue,))

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待生产者线程结束
producer_thread.join()

# 等待消费者线程处理完所有任务
queue.join()
consumer_thread.join()

print("All threads finished.")

📌 锁

悲观锁认为在任务时候都可能存在并发冲突,因此在访问共享资源时,会将其锁定,直至完成操作后才释放锁。
实现方式如数据库的行级锁和JAVA的synchronized关键字,先取锁再访问。
优点是可以保证数据的一致性,但在高并发时频繁加锁和释放锁,会导致性能下降。

乐观锁认为并发冲突的概率较小,因此在访问共享资源时不会立即加锁,先进行操作,然后在更新时检查资源是否被修改;若更新失败则重试。
实现方式如版本号机制、数据库中的CAS(Compare And Swap)操作和JAVA的Atomic类,先访问再取锁。
优点是可以提升高并发性能,但在并发冲突较多时需要多次重试,可能导致性能下降。

import threading

"""
Lock: 互斥锁,可以由任意线程解锁,不能重复上锁;重复上锁可能会导致死锁
RLock: 重入锁,只能由锁定线程解锁,可以重复上锁,但也要重复解锁;适合递归调用的场景
其他的还有Condition-条件锁,Event-事件锁,Semaphore-信号量锁等
"""
# 创建一个锁对象
lock = threading.RLock()

# 全局变量
counter = 0

# def increment_counter():
#     global counter
#     # 获取锁
#     lock.acquire()
#     try:
#         counter += 1
#         print(f"Incremented counter to {counter}")
#     finally:
#         # 释放锁
#         lock.release()

# 使用with语句自动获取和释放锁,更加安全,也更易于阅读和维护
def increment_counter():
    global counter
    with lock:  # 缺少锁会遇到竞态条件,导致最终计算结果不正确
        counter += 1
        print(f"Incremented counter to {counter}")

# 创建线程列表
threads = []

# 创建并启动10个线程
for _ in range(10):
    thread = threading.Thread(target=increment_counter)
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()  # 阻塞主进程,直到线程结束

# 输出最终的计数器值
print(f"Final counter value: {counter}")

🚁 死锁

造成死锁的原因: 系统资源不足、进程推进顺序不当、资源分配不当。形成条件有:

  • 互斥,一个资源每次只能被一个进程使用
  • 请求和保持: 一个进程因请求资源而阻塞时,对已有资源保持不放
  • 不可剥夺: 进程已获得的资源,使用完前不能强行剥夺
  • 循环等待: 若干进程形成头尾相连的循环等待资源

避免死锁: 银行家算法,通过统计各进程对资源的最大需求,满足时进行分发(破坏条件-循环等待)。

造成死锁时会导致资源被锁定、系统性能下降、进程停止运行等情况。

📌 线程池

Python中的线程池通常使用concurrent.futures模块中的ThreadPoolExecutor类来实现。
线程池可以有效地管理一定数量的线程,避免频繁创建和销毁线程的开销,同时限制并发线程的数量,防止资源耗尽。

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# 目标函数
def worker(x):
    time.sleep(0.1)
    return x * x

# 创建一个线程池,指定最大线程数量
with ThreadPoolExecutor(max_workers=5) as executor:
    # 使用map方法提交任务
    results = executor.map(worker, [1, 2, 3, 4, 5])
    for result in results:
        print(result)

# 或者使用submit方法提交任务,并获取Future对象
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(worker, x) for x in [1, 2, 3, 4, 5]]
    # 使用as_completed遍历已完成的Future对象并获取结果,类似于Thread.join()
    for future in as_completed(futures):
        print(future.result())

print("Done!")