QA

不同语言的并发编程

Posted by Sutdown on October 3, 2025

前言:基于各语言介绍一下对于并发的使用,也会穿插些语言的基础特性,适合编程萌新看,更侧重于广度,会结合代码展现。

并发编程基础介绍

古早的历史介绍,可以看看这篇万字详解并发编程!!!-阿里云开发者社区

并发编程三要素
  • 原子性:一个或者多个操作要么全部执行成功要么全部执行失败。

  • 有序性:程序执行顺序按照代码顺序先后执行,但是CPU可能会对指令进行重排序。

  • 可见性:当多个线程访问同一个变量时,如果一个线程修改了变量,其他线程立即获取最新的值。

并发编程的本质

并发编程的目标是充分地利用CPU,以达到最高的处理性能。

多任务的实现有以下3种方式:

  • 进程:是操作系统资源分配和独立运行的最小单位。
  • 线程:是进程内的一个任务执行独立单元,是任务调度和系统执行的最小单位。
  • 协程:是用户态的轻量级线程,协程的调度完全由用户控制,主要为了单线程下模拟多线程。

一个程序可以有一到多个多进程,一个进程下可以有一到多个线程或协程,一个线程下可以有一到多个协程。

单CPU和多CPU环境下的并发

对于单CPU:

  • 提升IO密集型人物的资源利用率
  • 无法加速CPU密集型任务
  • 切换任务时需要考虑CPU资源的切换开销

对于多CPU:

  • 可加速CPU密集型任务,不同任务被分配到不同的核心,实现真正的同时计算
  • IO密集型人物拥有更高的并发量
  • 多核心同时操作共享数据,资源竞争问题更加突出
并发编程的难点
  • 线程安全
  • 共享资源竞争
  • 死锁,活锁
常见编程语言的并发支持
  1. Java:线程(Thread)、线程池(ExecutorService)、同步锁(synchronizedLock)、并发容器(ConcurrentHashMap)。
  2. Python:threading(受 GIL 限制,CPU 密集型效率低)、multiprocessing(规避 GIL)、asyncio(协程)。
  3. C++:std::threadstd::mutex(C++11 及以上),依赖 OS 线程。
  4. Go:goroutine(轻量级线程)+ 通道(channel),简化并发编程。
维度 C++ Java Python
语言层支持 C++11 开始引入标准库并发;更底层,灵活但复杂 从 Java 1.0 起内置线程和同步机制 内置线程/进程库,但受 GIL 影响
主要模型 - 线程 (std::thread, std::jthread)
- 任务并行 (std::async, std::future)
- 原子操作 (std::atomic)
- (std::mutex, std::shared_mutex, std::lock_guard)
- 条件变量 (std::condition_variable)
- 线程局部存储 (thread_local)
- 线程 (Thread, Runnable, Callable)
- 线程池 (ExecutorService, ForkJoinPool)
- 同步 (synchronized, Lock, ReentrantLock, Condition)
- 原子类 (AtomicInteger, AtomicReference)
- 并发容器 (ConcurrentHashMap, BlockingQueue)
- 异步 (CompletableFuture)
- 线程 (threading.Thread)
- 线程池 (concurrent.futures.ThreadPoolExecutor)
- 进程池 (multiprocessing, ProcessPoolExecutor)
- 同步 (threading.Lock, RLock, Condition)
- 队列 (queue.Queue, multiprocessing.Queue)
- 异步/协程 (asyncio, await, Task)
内存模型 - 弱一致性,需要开发者理解 内存序 (memory_order)
- 支持 acquire/releaserelaxedseq_cst 等模式
- Java 内存模型 (JMM) 定义了 可见性、有序性、原子性
- volatile 保证可见性
- GIL 限制 Python 字节码同一时刻只有一个线程执行
- 内存一致性问题较少暴露
适用场景 高性能系统、嵌入式、游戏引擎、实时计算 企业应用、Web 服务器、高并发服务 爬虫、数据采集、I/O 密集型任务、分布式任务调度
I/O 并发 - std::async 或第三方库 (Boost.Asio, ASIO standalone)
- libuv
- NIO (非阻塞 I/O)
- Netty (事件驱动网络框架)
- CompletableFuture
- asyncio (事件循环)
- aiohttp (异步 HTTP 客户端/服务端)
- Twisted, Trio
CPU 并行 - 多线程并行
- OpenMP (编译指令并行)
- Intel TBB/oneTBB (任务调度)
- CUDA/OpenCL (GPU 加速)
- 多线程配合 ForkJoinPool(工作窃取算法)
- 并行流 (parallelStream())
- multiprocessing (独立进程绕过 GIL)
- joblib(科学计算)
- Ray, Dask(分布式并行计算)
高层抽象 - std::async + std::future/promise
- HPX(异步任务库)
- ExecutorService 提交任务
- ForkJoinPool 分治任务
- CompletableFuture 链式异步
- concurrent.futures (统一线程池/进程池 API)
- asyncio.gather (协程调度)
- Celery (分布式任务队列)
常用并发模式 - 生产者-消费者 (std::queue + mutex + condition_variable)
- 读写锁 (std::shared_mutex)
- 线程池(需要自行实现或用 Boost/TBB)
- 生产者-消费者 (BlockingQueue)
- 读写锁 (ReadWriteLock)
- 内置线程池 (Executors.newFixedThreadPool)
- 生产者-消费者 (queue.Queue)
- 协程调度 (asyncio.create_task)
- 线程池/进程池 (concurrent.futures)

C++并发编程

<thread>头文件

C++11时加入标准库,主要是对操作系统线程(比如linux的pthread,windows的win32Thread)的封装,提供面向对象的线程管理,无需操作底层API。

由于面向对象,因此存在RALL思想可以自动释放资源;任务驱动时支持函数lambda仿函数成员函数等;具备移植性,可屏蔽不同操作系统差异。

  • join():主线程等待子线程结束。
  • detach():分离线程,让它在后台运行,主线程不再管理。
<mutex>头文件 — 互斥

提供了用于互斥访问共享资源的类和函数,核心是确保同一时间只有一个线程能访问临界区(共享数据),从而避免数据竞争。

1 基础的互斥锁

lock():锁定互斥体,若已被其他线程锁定,则当前线程阻塞等待。
unlock():解锁互斥体,必须由持有锁的线程调用,否则行为未定义。
try_lock():尝试锁定互斥体,若成功返回 true,否则立即返回 false(不阻塞)。

2 lock_guard 基于RALL的互斥锁包装器

1
2
mutex mtx;
lock_guard<mutex> lock(mtx);

3 unique_lock

更灵活的锁包装器,支持手动锁定 / 解锁、延迟锁定、转移所有权等:

4 Other

  • std::recursive_mutex:允许同一线程多次锁定(需对应次数解锁),避免线程自身死锁。
  • std::timed_mutex/std::recursive_timed_mutex:支持带超时的 try_lock_for()/try_lock_until(),避免无限阻塞。
  • std::lock():同时锁定多个互斥体,避免因锁定顺序不同导致的死锁。
<condition_variable> 头文件 — 同步

1 std::condition_variable依赖互斥锁(通常是 std::unique_lock),实现线程的等待与唤醒:

  • wait(lock):释放锁并阻塞当前线程,等待被其他线程唤醒;唤醒后重新获取锁并返回。
  • wait(lock, predicate):带条件的等待,等价于 while (!predicate()) wait(lock),避免虚假唤醒(spurious wakeup)。
  • notify_one():唤醒一个等待该条件变量的线程。
  • notify_all():唤醒所有等待该条件变量的线程。

2 std::condition_variable_anystd::condition_variable 类似,但可配合任意满足可锁定(Lockable)概念的锁(如 std::lock_guardstd::shared_mutex 等),灵活性更高,但性能略低。

经典示例:生产者消费者模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;

mutex mtx; // 互斥锁
condition_variable cv; // 条件变量

std::queue<int> buffer; // 缓冲区
const int MAX_SIZE=5;

int global_data=0;
bool producer_done=false; // 给consumer增加一个退出条件

void producer(int id){
// 	int data=0; 当存在多个生产者时,data会被重复生产,因此改成全局变量
	while(global_data<7){
		unique_lock<mutex> lock(mtx);
		cv.wait(lock, []{return buffer.size()<MAX_SIZE;}); // 缓冲区有空位时可以生产
		buffer.push(global_data);
		std::cout << "Producer " << id << " produced: " << global_data << "\n";
		global_data++;
		
		lock.unlock();
		cv.notify_all();
	}
	
	{
		lock_guard<mutex> lock(mtx);
		producer_done=true;
	}
	cv.notify_all();
}

void consumer(int id){
	while(true){
		unique_lock<mutex> lock(mtx);
		cv.wait(lock, []{return !buffer.empty() || producer_done;}); 
		
		if(buffer.empty() && producer_done){
			cout << "Consumer " << id << " exits.\n";
			break;  // 改:消费者在生产结束、队列空时退出
		}
		
		int value=buffer.front();
		buffer.pop();
		std::cout << "Consumer " << id << " consumed: " << value << "\n";
		
		lock.unlock();
		cv.notify_all();
	}
}

int main() {
	std::thread p1(producer, 1); //	std::thread p2(producer, 2);
	std::thread c1(consumer, 1); //	std::thread c2(consumer, 2);
	
	p1.join(); //	p2.join();
	c1.join(); //	c2.join();
	
	return 0;
}

/*
运行结果:
Producer 1 produced: 0
Producer 1 produced: 1
Producer 1 produced: 2
Producer 1 produced: 3
Producer 1 produced: 4
Consumer 1 consumed: 0
Consumer 1 consumed: 1
Consumer 1 consumed: 2
Consumer 1 consumed: 3
Consumer 1 consumed: 4
Producer 1 produced: 5
Producer 1 produced: 6
Consumer 1 consumed: 5
Consumer 1 consumed: 6
Consumer 1 exits.
*/

Python并发编程

简单的小规模 I/O 任务用多线程;高并发、高性能要求的 I/O 任务用协程

模型 核心库 适用场景 优缺点
多线程 threading I/O 密集型任务 受 GIL 影响 CPU 密集型性能低,但线程切换开销小
多进程 multiprocessing CPU 密集型任务 可以充分利用多核 CPU,开销大,通信复杂
协程(异步) asyncio I/O 密集型任务(网络、文件) 高效、低内存开销,但编程模式需要 async/await

多线程碰到 I/O 可以切换去做其他线程的任务,但每个线程还是阻塞等待 I/O,由操作系统线程控制,这叫并发,不是异步;真正异步是任务主动挂起、由事件循环调度

多线程 – threading — 基于线程的并行性 — Python 3.13.7 文档

threading 是 Python 的标准线程库,用于多线程编程。

线程在同一进程内共享内存空间,创建开销小。

注意:Python 有 GIL(全局解释器锁),CPU 密集型任务多线程不会提升性能,但 I/O 密集型任务可显著提高吞吐量。

常用类和函数:

  • threading.Thread → 创建线程

    threading 模块是创建和管理线程的首选形式。每一个线程都通过一个继承 Thread 类,重写 run() 方法来实现逻辑,这个方法是线程的入口。

    调用 start() 之后线程变为活跃状态,并且持续直到 run() 结束,或者中间出现异常。所有的线程都执行完成之后,程序结束。join() 命令控制主线程的终止

    1
    2
    3
    
    t = threading.Thread(group=None,target=None, name=None, args=(), kwargs={}) # 线程创建
    t.start() # 启动线程
    t.join() # 阻塞主线程直到所有线程完成
    
  • threading.Lock → 线程锁,保证线程安全

    LOCK 互斥锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    shared_resource_lock = threading.Lock()
    shared_resource_lock.acquire()
    shared_resource_with_lock -= 1
    shared_resource_lock.release()
      
    # 如果状态是unlocked, 可以调用 acquire() 将状态改为locked
    # 如果状态是locked, acquire() 会被block直到另一线程调用 release() 释放锁
    # 如果状态是unlocked, 调用 release() 将导致 RuntimError 异常
    # 如果状态是locked, 可以调用 release() 将状态改为unlocked
    

    RLock 可重入锁

    在类外面保证线程安全,又要在类内使用同样方法的时候 RLock() 就很实用

    1. 谁拿到谁释放。如果线程A拿到锁,线程B无法释放这个锁,只有A可以释放;
    2. 同一线程可以多次拿到该锁,即可以acquire多次;
    3. acquire多少次就必须release多少次,只有最后一次release才能改变RLock的状态为unlocked
    1
    2
    3
    4
    5
    6
    
    class Box(object):
        lock = threading.RLock()
      
    Box.lock.acquire()
    # ...do something
    Box.lock.release()
    
  • threading.Event → 线程间通知

    事件是线程之间用于通讯的对象。有的线程等待信号,有的线程发出信号。基本上事件对象都会维护一个内部变量,可以通过 set() 方法设置为 true ,也可以通过 clear() 方法设置为 falsewait() 方法将会阻塞线程,直到内部变量为 true

    1
    2
    3
    4
    
    from threading import Thread, Event
    self.event = event
    self.event.set()
    self.event.clear()
    
  • threading.Condition → 条件变量,用于复杂线程协作

    1
    2
    3
    4
    5
    6
    7
    
    from threading import Thread, Condition
    condition.acquire()
    condition.wait()
    ...
    condition.notify()
    condition.release()
    # 有点类似cpp中的 lock 和 condition_variable
    
  • threading.Semaphore → 信号量控制并发数量

    可用于生产者消费者模型#

    1
    2
    3
    4
    
    semaphore = threading.Semaphore(0)
    semaphore.acquire() # 读取关联了信号量的共享资源,减少信号量的内部变量(信号量必须为非负)
    ...
    semaphore.release() # 线程不需要改信号量时,增加信号量的内部变量·   
    
  • With语句操作上下文管理器[理解Python的With语句 Linbo的博客](https://linbo.github.io/2013/01/08/python-with)

Python的threading模块提供了很多同步原语,包括信号量,条件变量,事件和锁,优先使用这些原语可以使得多线程编程更为安全。

GIL:GIL是CPython解释器引入的锁,解释器会强迫想要运行的线程必须拿到GIL才能访问解释器的任何资源,从而达到阻止不同的线程并发访问Python对象。

多进程

multiprocessing 提供了多进程支持,可充分利用多核 CPU。每个进程有独立内存空间,因此适合 CPU 密集型任务。

常用类和函数:

  • Process → 创建进程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
    p = multiprocessing.Process(name='foo_process',target=foo, args=(i,))
    p.start()
    p.join()
      
    print('Process running:', p, p.is_alive()
    p.terminate() # 如果不 join(),子进程可能已经死了,但 Python 的进程对象还没清理,会留下 僵尸进程。
    p.join()
    #  is_alive() 方法监控它的声明周期。然后通过调用 terminate() 方法结束进程
      
    p.daemon = True # 后台运行进程,在主进程结束之后会自动结束。
    
  • QueuePipe → 进程间通信(IPC)

    Queue 返回一个进程共享的队列,是线程安全的,也是进程安全的。任何可序列化的对象(Python通过 pickable 模块序列化对象)都可以通过它进行交换。单向通信。

    一般来说单独的父子进程之间,pipe更轻量。多进程的话,queue更能保证线程安全。

    特性 Queue Pipe
    通信模式 多生产者-多消费者 双向点对点
    线程/进程安全 ✅ 内置锁 ❌ 需要手动保证
    接口风格 put/get send/recv
    性能 较高开销 更轻量
    适用场景 多进程并发通信 简单父子进程通信
  • Pool → 进程池,方便管理多个进程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    
    import multiprocessing
      
    def function_square(data):
        result = data*data*data
        return result
      
    if __name__ == '__main__':
        inputs = list(range(100))
        pool = multiprocessing.Pool(processes=4)
        pool_outputs = pool.map(function_square, inputs)
        pool.close()
        pool.join()
        print ('Pool    :', pool_outputs)
    

    多进程库提供了 Pool 类来实现简单的多进程任务。 Pool 类有以下方法:

    • apply(): 直到得到结果之前一直阻塞。
    • apply_async(): 这是 apply() 方法的一个变体,返回的是一个result对象。这是一个异步的操作,在所有的子类执行之前不会锁住主进程。
    • map(): 这是内置的 map() 函数的并行版本。在得到结果之前一直阻塞,此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行。
    • map_async(): 这是 map() 方法的一个变体,返回一个result对象。如果指定了回调函数,回调函数应该是callable的,并且只接受一个参数。当result准备好时会自动调用回调函数(除非调用失败)。回调函数应该立即完成,否则,持有result的进程将被阻塞。
  • ValueArray → 共享内存

  • Python的多进程模块提供了在所有的用户间管理共享信息的管理者(Manager)。一个管理者对象控制着持有Python对象的服务进程,并允许其它进程操作共享对象。

  • 进程同步原语,和线程挺类似的

    • Lock: 这个对象可以有两种装填:锁住的(locked)和没锁住的(unlocked)。一个Lock对象有两个方法, acquire()release() ,来控制共享数据的读写权限。
    • Event: 实现了进程间的简单通讯,一个进程发事件的信号,另一个进程等待事件的信号。 Event 对象有两个方法, set()clear() ,来管理自己内部的变量。
    • Condition: 此对象用来同步部分工作流程,在并行的进程中,有两个基本的方法: wait() 用来等待进程, notify_all() 用来通知所有等待此条件的进程。
    • Semaphore: 用来共享资源,例如,支持固定数量的共享连接。
    • Rlock: 递归锁对象。其用途和方法同 Threading 模块一样。
    • Barrier: 将程序分成几个阶段,适用于有些进程必须在某些特定进程之后执行。处于障碍(Barrier)之后的代码不能同处于障碍之前的代码并行。

优点:可绕过 GIL,CPU 密集任务性能高。

缺点:进程创建和销毁开销大,通信复杂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import multiprocessing
import random
import time

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(3):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("Process Producer : item %d appended to queue %s" % (item, self.name))
            time.sleep(1)
            print("The size of queue is %s" % self.queue.qsize())

class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            if self.queue.empty():
                print("the queue is empty")
                break
            else:
                time.sleep(2)
                item = self.queue.get()
                print('Process Consumer : item %d popped from by %s \n' % (item, self.name))
                time.sleep(1)

if __name__ == '__main__':
    queue = multiprocessing.Queue() // 创建实例
    process_producer = Producer(queue)
    process_consumer = Consumer(queue)
    process_producer.start()
    process_consumer.start()
    process_producer.join()
    process_consumer.join()

"""
# 运行结果
Process Producer : item 132 appended to queue Producer-1
The size of queue is 1
Process Producer : item 94 appended to queue Producer-1
The size of queue is 2
Process Producer : item 63 appended to queue Producer-1
Process Consumer : item 132 popped from by Consumer-2 

The size of queue is 2
Process Consumer : item 94 popped from by Consumer-2 

Process Consumer : item 63 popped from by Consumer-2 

the queue is empty
"""

注:

concurrent.futures 是 Python 标准库(3.2+)里提供的 高级并发编程接口。 它屏蔽了底层线程、进程管理的复杂性,给用户提供了一个 统一的任务提交和结果获取的方式

核心概念:

  • Executor(执行器):管理线程或进程池。
    • ThreadPoolExecutor → 线程池(适合 I/O 密集型任务)
    • ProcessPoolExecutor → 进程池(适合 CPU 密集型任务)
  • Future:表示一个可能尚未完成的任务的结果,可以等待它完成,也可以检查是否完成。
异步协程

asyncio 是 Python 3 提供的 异步 I/O 框架,基于事件循环(Event Loop)。

核心思想:单线程下通过异步切换,非阻塞执行 I/O 操作

协程函数用 async def 定义,内部等待 I/O 用 await

管理时间循环

  • loop = get_event_loop(): 得到当前上下文的事件循环。
  • loop.call_later(time_delay, callback, argument): 延后 time_delay 秒再执行 callback 方法。
  • loop.call_soon(callback, argument): 尽可能快调用 callback, call_soon() 函数结束,主线程回到事件循环之后就会马上调用 callback
  • loop.time(): 以float类型返回当前时间循环的内部时间。
  • asyncio.set_event_loop(): 为当前上下文设置事件循环。
  • asyncio.new_event_loop(): 根据此策略创建一个新的时间循环并返回。
  • loop.run_forever(): 在调用 stop() 之前将一直运行。

协程的管理在于程序员可以主动预测任务的临时退出点,将执行权交给其它协程,之后再返回,这一过程仅涉及用户态,因此开销较小。常见的常见适配比如有限状态机。

常用函数:

  • asyncio.run() → 运行事件循环
  • asyncio.create_task() → 创建协程任务
  • asyncio.gather() → 并发运行多个协程
  • asyncio.sleep() → 非阻塞延时

特点

  • 适合高并发 I/O 操作
  • 内存开销低
  • 单线程,不会天然利用多核 CPU

Java并发编程

Java并发编程的核心在于:线程管理(Thread / Executor);同步与锁(synchronized / Lock / 原子类);线程间通信(wait/notify / Condition);高层并发工具(Concurrent包、线程池、Future、Fork/Join)。

  • JVM 负责从 Java 层抽象出线程对象,并协调其生命周期、同步和调度。
  • 实际执行的线程是操作系统的线程,JVM 只是提供接口和封装
1
2
3
4
5
6
7
Java Thread (java.lang.Thread)
          |
          v
JVM 内部封装线程对象
          |
          v
操作系统原生线程 (Native Thread)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.*;

public class ConcurrencyExample {

    // 共享队列:生产者-消费者模型的缓冲区
    private static final Queue<Integer> queue = new LinkedList<>();
    private static final int MAX_CAPACITY = 5;

    // 锁对象,用于 synchronized
    private static final Object lock = new Object();

    // 原子变量,用于线程安全计数
    private static final AtomicInteger totalProduced = new AtomicInteger(0);
    private static final AtomicInteger totalConsumed = new AtomicInteger(0);

    // 并发集合,用于统计消费数据,线程安全
    private static final ConcurrentHashMap<Integer, Integer> stats = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 线程池,管理固定数量线程,避免手动创建 Thread
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // 生产者任务,使用 Runnable
        Runnable producer = () -> {
            Random rand = new Random();
            while (totalProduced.get() < 20) { // 线程安全读取原子变量
                int item = rand.nextInt(100); // 生成随机数字
                synchronized (lock) { // synchronized 同步块,保证队列操作原子性
                    while (queue.size() >= MAX_CAPACITY) {
                        try {
                            lock.wait(); // wait/notify机制,线程间通信
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // 恢复中断状态
                        }
                    }
                    queue.offer(item); // 入队
                    totalProduced.incrementAndGet(); // 原子操作,自增
                    System.out.println(Thread.currentThread().getName() + " produced: " + item);
                    lock.notifyAll(); // 通知等待线程(消费者或其他生产者)
                }
            }
        };

        // 消费者任务,使用 Runnable
        Runnable consumer = () -> {
            while (totalConsumed.get() < 20) { // 原子变量控制消费次数
                int item = -1;
                synchronized (lock) { // synchronized 保证队列操作线程安全
                    while (queue.isEmpty()) {
                        try {
                            lock.wait(); // 等待生产者生产
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    item = queue.poll(); // 出队
                    totalConsumed.incrementAndGet(); // 原子自增
                    System.out.println(Thread.currentThread().getName() + " consumed: " + item);
                    // 并发集合更新统计,merge 是线程安全操作
                    stats.merge(item, 1, Integer::sum);
                    lock.notifyAll(); // 通知生产者可以继续生产
                }
            }
        };

        // Callable 示例:返回统计结果
        Callable<Map<Integer, Integer>> statsTask = () -> stats; // Callable 可以返回值

        // 提交生产者和消费者到线程池
        executor.submit(producer); // Runnable 提交线程池执行
        executor.submit(producer);
        executor.submit(consumer);
        executor.submit(consumer);

        // 等待线程结束
        executor.shutdown(); // 不再接受新任务
        executor.awaitTermination(1, TimeUnit.MINUTES); // 阻塞等待线程池关闭

        // 获取统计结果(Callable 调用)
        Map<Integer, Integer> finalStats = statsTask.call(); 
        System.out.println("Item consumption stats: " + finalStats); // 输出最终统计
    }
}

Other

python装饰器

  • 装饰器是 一个函数,它接收另一个函数(或类)作为输入,并返回一个新的函数(或类)。
  • 作用:在不修改原函数代码的前提下,给它添加额外功能
  • 常用于:日志、权限验证、性能计时、缓存、事务管理等。

参考链接

万字详解并发编程!!!-阿里云开发者社区

C++并发编程(C++11到C++17) - 知乎

python-parallel-programming-cookbook-cn 1.0 文档

C++并发编程实战

Java并发编程