前言:基于各语言介绍一下对于并发的使用,也会穿插些语言的基础特性,适合编程萌新看,更侧重于广度,会结合代码展现。
并发编程基础介绍
古早的历史介绍,可以看看这篇万字详解并发编程!!!-阿里云开发者社区。
并发编程三要素
-
原子性:一个或者多个操作要么全部执行成功要么全部执行失败。
-
有序性:程序执行顺序按照代码顺序先后执行,但是CPU可能会对指令进行重排序。
-
可见性:当多个线程访问同一个变量时,如果一个线程修改了变量,其他线程立即获取最新的值。
并发编程的本质
并发编程的目标是充分地利用CPU,以达到最高的处理性能。
多任务的实现有以下3种方式:
- 进程:是操作系统资源分配和独立运行的最小单位。
- 线程:是进程内的一个任务执行独立单元,是任务调度和系统执行的最小单位。
- 协程:是用户态的轻量级线程,协程的调度完全由用户控制,主要为了单线程下模拟多线程。
一个程序可以有一到多个多进程,一个进程下可以有一到多个线程或协程,一个线程下可以有一到多个协程。
单CPU和多CPU环境下的并发
对于单CPU:
- 提升IO密集型人物的资源利用率
- 无法加速CPU密集型任务
- 切换任务时需要考虑CPU资源的切换开销
对于多CPU:
- 可加速CPU密集型任务,不同任务被分配到不同的核心,实现真正的同时计算
- IO密集型人物拥有更高的并发量
- 多核心同时操作共享数据,资源竞争问题更加突出
并发编程的难点
- 线程安全
- 共享资源竞争
- 死锁,活锁
常见编程语言的并发支持
- Java:线程(
Thread
)、线程池(ExecutorService
)、同步锁(synchronized
、Lock
)、并发容器(ConcurrentHashMap
)。 - Python:
threading
(受 GIL 限制,CPU 密集型效率低)、multiprocessing
(规避 GIL)、asyncio
(协程)。 - C++:
std::thread
、std::mutex
(C++11 及以上),依赖 OS 线程。 - Go:
goroutine
(轻量级线程)+ 通道(channel
),简化并发编程。
维度 | C++ | Java | Python |
---|---|---|---|
语言层支持 | C++11 开始引入标准库并发;更底层,灵活但复杂 | 从 Java 1.0 起内置线程和同步机制 | 内置线程/进程库,但受 GIL 影响 |
主要模型 | - 线程 (std::thread , std::jthread )- 任务并行 ( std::async , std::future )- 原子操作 ( std::atomic )- 锁 ( std::mutex , std::shared_mutex , std::lock_guard )- 条件变量 ( std::condition_variable )- 线程局部存储 ( thread_local ) |
- 线程 (Thread , Runnable , Callable )- 线程池 ( ExecutorService , ForkJoinPool )- 同步 ( synchronized , Lock , ReentrantLock , Condition )- 原子类 ( AtomicInteger , AtomicReference ) - 并发容器 ( ConcurrentHashMap , BlockingQueue ) - 异步 ( CompletableFuture ) |
- 线程 (threading.Thread )- 线程池 ( concurrent.futures.ThreadPoolExecutor )- 进程池 ( multiprocessing , ProcessPoolExecutor )- 同步 ( threading.Lock , RLock , Condition )- 队列 ( queue.Queue , multiprocessing.Queue )- 异步/协程 ( asyncio , await , Task ) |
内存模型 | - 弱一致性,需要开发者理解 内存序 (memory_order) - 支持 acquire/release 、relaxed 、seq_cst 等模式 |
- Java 内存模型 (JMM) 定义了 可见性、有序性、原子性 - volatile 保证可见性 |
- GIL 限制 Python 字节码同一时刻只有一个线程执行 - 内存一致性问题较少暴露 |
适用场景 | 高性能系统、嵌入式、游戏引擎、实时计算 | 企业应用、Web 服务器、高并发服务 | 爬虫、数据采集、I/O 密集型任务、分布式任务调度 |
I/O 并发 | - std::async 或第三方库 (Boost.Asio, ASIO standalone)- libuv 等 |
- NIO (非阻塞 I/O)- Netty (事件驱动网络框架)- CompletableFuture |
- asyncio (事件循环)- aiohttp (异步 HTTP 客户端/服务端)- Twisted , Trio |
CPU 并行 | - 多线程并行 - OpenMP (编译指令并行) - Intel TBB/oneTBB (任务调度) - CUDA/OpenCL (GPU 加速) |
- 多线程配合 ForkJoinPool(工作窃取算法) - 并行流 ( parallelStream() ) |
- multiprocessing (独立进程绕过 GIL)- joblib (科学计算)- Ray , Dask (分布式并行计算) |
高层抽象 | - std::async + std::future/promise - HPX(异步任务库) |
- ExecutorService 提交任务- ForkJoinPool 分治任务- CompletableFuture 链式异步 |
- concurrent.futures (统一线程池/进程池 API)- asyncio.gather (协程调度)- Celery (分布式任务队列) |
常用并发模式 | - 生产者-消费者 (std::queue + mutex + condition_variable )- 读写锁 ( std::shared_mutex )- 线程池(需要自行实现或用 Boost/TBB) |
- 生产者-消费者 (BlockingQueue )- 读写锁 ( ReadWriteLock )- 内置线程池 ( Executors.newFixedThreadPool ) |
- 生产者-消费者 (queue.Queue )- 协程调度 ( asyncio.create_task )- 线程池/进程池 ( concurrent.futures ) |
C++并发编程
<thread>
头文件
C++11时加入标准库,主要是对操作系统线程(比如linux的pthread,windows的win32Thread)的封装,提供面向对象的线程管理,无需操作底层API。
由于面向对象,因此存在RALL思想可以自动释放资源;任务驱动时支持函数lambda仿函数成员函数等;具备移植性,可屏蔽不同操作系统差异。
join()
:主线程等待子线程结束。detach()
:分离线程,让它在后台运行,主线程不再管理。
<mutex>
头文件 — 互斥
提供了用于互斥访问共享资源的类和函数,核心是确保同一时间只有一个线程能访问临界区(共享数据),从而避免数据竞争。
1 基础的互斥锁
lock():锁定互斥体,若已被其他线程锁定,则当前线程阻塞等待。
unlock():解锁互斥体,必须由持有锁的线程调用,否则行为未定义。
try_lock():尝试锁定互斥体,若成功返回 true,否则立即返回 false(不阻塞)。
2 lock_guard 基于RALL的互斥锁包装器
1
2
mutex mtx;
lock_guard<mutex> lock(mtx);
3 unique_lock
更灵活的锁包装器,支持手动锁定 / 解锁、延迟锁定、转移所有权等:
4 Other
std::recursive_mutex
:允许同一线程多次锁定(需对应次数解锁),避免线程自身死锁。std::timed_mutex
/std::recursive_timed_mutex
:支持带超时的try_lock_for()
/try_lock_until()
,避免无限阻塞。std::lock()
:同时锁定多个互斥体,避免因锁定顺序不同导致的死锁。
<condition_variable>
头文件 — 同步
1 std::condition_variable
依赖互斥锁(通常是 std::unique_lock
),实现线程的等待与唤醒:
wait(lock)
:释放锁并阻塞当前线程,等待被其他线程唤醒;唤醒后重新获取锁并返回。wait(lock, predicate)
:带条件的等待,等价于while (!predicate()) wait(lock)
,避免虚假唤醒(spurious wakeup)。notify_one()
:唤醒一个等待该条件变量的线程。notify_all()
:唤醒所有等待该条件变量的线程。
2 std::condition_variable_any
与 std::condition_variable
类似,但可配合任意满足可锁定(Lockable)概念的锁(如 std::lock_guard
、std::shared_mutex
等),灵活性更高,但性能略低。
经典示例:生产者消费者模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;
mutex mtx; // 互斥锁
condition_variable cv; // 条件变量
std::queue<int> buffer; // 缓冲区
const int MAX_SIZE=5;
int global_data=0;
bool producer_done=false; // 给consumer增加一个退出条件
void producer(int id){
// int data=0; 当存在多个生产者时,data会被重复生产,因此改成全局变量
while(global_data<7){
unique_lock<mutex> lock(mtx);
cv.wait(lock, []{return buffer.size()<MAX_SIZE;}); // 缓冲区有空位时可以生产
buffer.push(global_data);
std::cout << "Producer " << id << " produced: " << global_data << "\n";
global_data++;
lock.unlock();
cv.notify_all();
}
{
lock_guard<mutex> lock(mtx);
producer_done=true;
}
cv.notify_all();
}
void consumer(int id){
while(true){
unique_lock<mutex> lock(mtx);
cv.wait(lock, []{return !buffer.empty() || producer_done;});
if(buffer.empty() && producer_done){
cout << "Consumer " << id << " exits.\n";
break; // 改:消费者在生产结束、队列空时退出
}
int value=buffer.front();
buffer.pop();
std::cout << "Consumer " << id << " consumed: " << value << "\n";
lock.unlock();
cv.notify_all();
}
}
int main() {
std::thread p1(producer, 1); // std::thread p2(producer, 2);
std::thread c1(consumer, 1); // std::thread c2(consumer, 2);
p1.join(); // p2.join();
c1.join(); // c2.join();
return 0;
}
/*
运行结果:
Producer 1 produced: 0
Producer 1 produced: 1
Producer 1 produced: 2
Producer 1 produced: 3
Producer 1 produced: 4
Consumer 1 consumed: 0
Consumer 1 consumed: 1
Consumer 1 consumed: 2
Consumer 1 consumed: 3
Consumer 1 consumed: 4
Producer 1 produced: 5
Producer 1 produced: 6
Consumer 1 consumed: 5
Consumer 1 consumed: 6
Consumer 1 exits.
*/
Python并发编程
简单的小规模 I/O 任务用多线程;高并发、高性能要求的 I/O 任务用协程
模型 | 核心库 | 适用场景 | 优缺点 |
---|---|---|---|
多线程 | threading |
I/O 密集型任务 | 受 GIL 影响 CPU 密集型性能低,但线程切换开销小 |
多进程 | multiprocessing |
CPU 密集型任务 | 可以充分利用多核 CPU,开销大,通信复杂 |
协程(异步) | asyncio |
I/O 密集型任务(网络、文件) | 高效、低内存开销,但编程模式需要 async/await |
多线程碰到 I/O 可以切换去做其他线程的任务,但每个线程还是阻塞等待 I/O,由操作系统线程控制,这叫并发,不是异步;真正异步是任务主动挂起、由事件循环调度。
多线程 – threading — 基于线程的并行性 — Python 3.13.7 文档
threading
是 Python 的标准线程库,用于多线程编程。
线程在同一进程内共享内存空间,创建开销小。
注意:Python 有 GIL(全局解释器锁),CPU 密集型任务多线程不会提升性能,但 I/O 密集型任务可显著提高吞吐量。
常用类和函数:
-
threading.Thread
→ 创建线程threading
模块是创建和管理线程的首选形式。每一个线程都通过一个继承Thread
类,重写run()
方法来实现逻辑,这个方法是线程的入口。调用
start()
之后线程变为活跃状态,并且持续直到run()
结束,或者中间出现异常。所有的线程都执行完成之后,程序结束。join()
命令控制主线程的终止1 2 3
t = threading.Thread(group=None,target=None, name=None, args=(), kwargs={}) # 线程创建 t.start() # 启动线程 t.join() # 阻塞主线程直到所有线程完成
-
threading.Lock
→ 线程锁,保证线程安全LOCK 互斥锁
1 2 3 4 5 6 7 8 9
shared_resource_lock = threading.Lock() shared_resource_lock.acquire() shared_resource_with_lock -= 1 shared_resource_lock.release() # 如果状态是unlocked, 可以调用 acquire() 将状态改为locked # 如果状态是locked, acquire() 会被block直到另一线程调用 release() 释放锁 # 如果状态是unlocked, 调用 release() 将导致 RuntimError 异常 # 如果状态是locked, 可以调用 release() 将状态改为unlocked
RLock 可重入锁
在类外面保证线程安全,又要在类内使用同样方法的时候
RLock()
就很实用- 谁拿到谁释放。如果线程A拿到锁,线程B无法释放这个锁,只有A可以释放;
- 同一线程可以多次拿到该锁,即可以acquire多次;
- acquire多少次就必须release多少次,只有最后一次release才能改变RLock的状态为unlocked
1 2 3 4 5 6
class Box(object): lock = threading.RLock() Box.lock.acquire() # ...do something Box.lock.release()
-
threading.Event
→ 线程间通知事件是线程之间用于通讯的对象。有的线程等待信号,有的线程发出信号。基本上事件对象都会维护一个内部变量,可以通过
set()
方法设置为true
,也可以通过clear()
方法设置为false
。wait()
方法将会阻塞线程,直到内部变量为true
。1 2 3 4
from threading import Thread, Event self.event = event self.event.set() self.event.clear()
-
threading.Condition
→ 条件变量,用于复杂线程协作1 2 3 4 5 6 7
from threading import Thread, Condition condition.acquire() condition.wait() ... condition.notify() condition.release() # 有点类似cpp中的 lock 和 condition_variable
-
threading.Semaphore
→ 信号量控制并发数量可用于生产者消费者模型#
1 2 3 4
semaphore = threading.Semaphore(0) semaphore.acquire() # 读取关联了信号量的共享资源,减少信号量的内部变量(信号量必须为非负) ... semaphore.release() # 线程不需要改信号量时,增加信号量的内部变量·
-
With语句操作上下文管理器[理解Python的With语句 Linbo的博客](https://linbo.github.io/2013/01/08/python-with)
Python的threading模块提供了很多同步原语,包括信号量,条件变量,事件和锁,优先使用这些原语可以使得多线程编程更为安全。
GIL:GIL是CPython解释器引入的锁,解释器会强迫想要运行的线程必须拿到GIL才能访问解释器的任何资源,从而达到阻止不同的线程并发访问Python对象。
多进程
multiprocessing
提供了多进程支持,可充分利用多核 CPU。每个进程有独立内存空间,因此适合 CPU 密集型任务。
常用类和函数:
-
Process
→ 创建进程1 2 3 4 5 6 7 8 9 10
p = multiprocessing.Process(name='foo_process',target=foo, args=(i,)) p.start() p.join() print('Process running:', p, p.is_alive() p.terminate() # 如果不 join(),子进程可能已经死了,但 Python 的进程对象还没清理,会留下 僵尸进程。 p.join() # is_alive() 方法监控它的声明周期。然后通过调用 terminate() 方法结束进程 p.daemon = True # 后台运行进程,在主进程结束之后会自动结束。
-
Queue
、Pipe
→ 进程间通信(IPC)Queue
返回一个进程共享的队列,是线程安全的,也是进程安全的。任何可序列化的对象(Python通过pickable
模块序列化对象)都可以通过它进行交换。单向通信。一般来说单独的父子进程之间,pipe更轻量。多进程的话,queue更能保证线程安全。
特性 Queue Pipe 通信模式 多生产者-多消费者 双向点对点 线程/进程安全 ✅ 内置锁 ❌ 需要手动保证 接口风格 put/get
send/recv
性能 较高开销 更轻量 适用场景 多进程并发通信 简单父子进程通信 -
Pool
→ 进程池,方便管理多个进程1 2 3 4 5 6 7 8 9 10 11 12 13
import multiprocessing def function_square(data): result = data*data*data return result if __name__ == '__main__': inputs = list(range(100)) pool = multiprocessing.Pool(processes=4) pool_outputs = pool.map(function_square, inputs) pool.close() pool.join() print ('Pool :', pool_outputs)
多进程库提供了
Pool
类来实现简单的多进程任务。Pool
类有以下方法:apply()
: 直到得到结果之前一直阻塞。apply_async()
: 这是apply()
方法的一个变体,返回的是一个result对象。这是一个异步的操作,在所有的子类执行之前不会锁住主进程。map()
: 这是内置的map()
函数的并行版本。在得到结果之前一直阻塞,此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行。map_async()
: 这是map()
方法的一个变体,返回一个result对象。如果指定了回调函数,回调函数应该是callable的,并且只接受一个参数。当result准备好时会自动调用回调函数(除非调用失败)。回调函数应该立即完成,否则,持有result的进程将被阻塞。
-
Value
、Array
→ 共享内存 -
Python的多进程模块提供了在所有的用户间管理共享信息的管理者(Manager)。一个管理者对象控制着持有Python对象的服务进程,并允许其它进程操作共享对象。
-
进程同步原语,和线程挺类似的
- Lock: 这个对象可以有两种装填:锁住的(locked)和没锁住的(unlocked)。一个Lock对象有两个方法,
acquire()
和release()
,来控制共享数据的读写权限。 - Event: 实现了进程间的简单通讯,一个进程发事件的信号,另一个进程等待事件的信号。
Event
对象有两个方法,set()
和clear()
,来管理自己内部的变量。 - Condition: 此对象用来同步部分工作流程,在并行的进程中,有两个基本的方法:
wait()
用来等待进程,notify_all()
用来通知所有等待此条件的进程。 - Semaphore: 用来共享资源,例如,支持固定数量的共享连接。
- Rlock: 递归锁对象。其用途和方法同
Threading
模块一样。 - Barrier: 将程序分成几个阶段,适用于有些进程必须在某些特定进程之后执行。处于障碍(Barrier)之后的代码不能同处于障碍之前的代码并行。
- Lock: 这个对象可以有两种装填:锁住的(locked)和没锁住的(unlocked)。一个Lock对象有两个方法,
优点:可绕过 GIL,CPU 密集任务性能高。
缺点:进程创建和销毁开销大,通信复杂
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import multiprocessing
import random
import time
class Producer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
for i in range(3):
item = random.randint(0, 256)
self.queue.put(item)
print("Process Producer : item %d appended to queue %s" % (item, self.name))
time.sleep(1)
print("The size of queue is %s" % self.queue.qsize())
class Consumer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
if self.queue.empty():
print("the queue is empty")
break
else:
time.sleep(2)
item = self.queue.get()
print('Process Consumer : item %d popped from by %s \n' % (item, self.name))
time.sleep(1)
if __name__ == '__main__':
queue = multiprocessing.Queue() // 创建实例
process_producer = Producer(queue)
process_consumer = Consumer(queue)
process_producer.start()
process_consumer.start()
process_producer.join()
process_consumer.join()
"""
# 运行结果
Process Producer : item 132 appended to queue Producer-1
The size of queue is 1
Process Producer : item 94 appended to queue Producer-1
The size of queue is 2
Process Producer : item 63 appended to queue Producer-1
Process Consumer : item 132 popped from by Consumer-2
The size of queue is 2
Process Consumer : item 94 popped from by Consumer-2
Process Consumer : item 63 popped from by Consumer-2
the queue is empty
"""
注:
concurrent.futures
是 Python 标准库(3.2+)里提供的 高级并发编程接口。
它屏蔽了底层线程、进程管理的复杂性,给用户提供了一个 统一的任务提交和结果获取的方式。
核心概念:
- Executor(执行器):管理线程或进程池。
ThreadPoolExecutor
→ 线程池(适合 I/O 密集型任务)ProcessPoolExecutor
→ 进程池(适合 CPU 密集型任务)
- Future:表示一个可能尚未完成的任务的结果,可以等待它完成,也可以检查是否完成。
异步协程
asyncio
是 Python 3 提供的 异步 I/O 框架,基于事件循环(Event Loop)。
核心思想:单线程下通过异步切换,非阻塞执行 I/O 操作。
协程函数用 async def
定义,内部等待 I/O 用 await
。
管理时间循环
loop = get_event_loop()
: 得到当前上下文的事件循环。loop.call_later(time_delay, callback, argument)
: 延后time_delay
秒再执行callback
方法。loop.call_soon(callback, argument)
: 尽可能快调用callback
,call_soon()
函数结束,主线程回到事件循环之后就会马上调用callback
。loop.time()
: 以float类型返回当前时间循环的内部时间。asyncio.set_event_loop()
: 为当前上下文设置事件循环。asyncio.new_event_loop()
: 根据此策略创建一个新的时间循环并返回。loop.run_forever()
: 在调用stop()
之前将一直运行。
协程的管理在于程序员可以主动预测任务的临时退出点,将执行权交给其它协程,之后再返回,这一过程仅涉及用户态,因此开销较小。常见的常见适配比如有限状态机。
常用函数:
asyncio.run()
→ 运行事件循环asyncio.create_task()
→ 创建协程任务asyncio.gather()
→ 并发运行多个协程asyncio.sleep()
→ 非阻塞延时
特点:
- 适合高并发 I/O 操作
- 内存开销低
- 单线程,不会天然利用多核 CPU
Java并发编程
Java并发编程的核心在于:线程管理(Thread / Executor);同步与锁(synchronized / Lock / 原子类);线程间通信(wait/notify / Condition);高层并发工具(Concurrent包、线程池、Future、Fork/Join)。
- JVM 负责从 Java 层抽象出线程对象,并协调其生命周期、同步和调度。
- 实际执行的线程是操作系统的线程,JVM 只是提供接口和封装
1
2
3
4
5
6
7
Java Thread (java.lang.Thread)
|
v
JVM 内部封装线程对象
|
v
操作系统原生线程 (Native Thread)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.*;
public class ConcurrencyExample {
// 共享队列:生产者-消费者模型的缓冲区
private static final Queue<Integer> queue = new LinkedList<>();
private static final int MAX_CAPACITY = 5;
// 锁对象,用于 synchronized
private static final Object lock = new Object();
// 原子变量,用于线程安全计数
private static final AtomicInteger totalProduced = new AtomicInteger(0);
private static final AtomicInteger totalConsumed = new AtomicInteger(0);
// 并发集合,用于统计消费数据,线程安全
private static final ConcurrentHashMap<Integer, Integer> stats = new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 线程池,管理固定数量线程,避免手动创建 Thread
ExecutorService executor = Executors.newFixedThreadPool(4);
// 生产者任务,使用 Runnable
Runnable producer = () -> {
Random rand = new Random();
while (totalProduced.get() < 20) { // 线程安全读取原子变量
int item = rand.nextInt(100); // 生成随机数字
synchronized (lock) { // synchronized 同步块,保证队列操作原子性
while (queue.size() >= MAX_CAPACITY) {
try {
lock.wait(); // wait/notify机制,线程间通信
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
queue.offer(item); // 入队
totalProduced.incrementAndGet(); // 原子操作,自增
System.out.println(Thread.currentThread().getName() + " produced: " + item);
lock.notifyAll(); // 通知等待线程(消费者或其他生产者)
}
}
};
// 消费者任务,使用 Runnable
Runnable consumer = () -> {
while (totalConsumed.get() < 20) { // 原子变量控制消费次数
int item = -1;
synchronized (lock) { // synchronized 保证队列操作线程安全
while (queue.isEmpty()) {
try {
lock.wait(); // 等待生产者生产
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
item = queue.poll(); // 出队
totalConsumed.incrementAndGet(); // 原子自增
System.out.println(Thread.currentThread().getName() + " consumed: " + item);
// 并发集合更新统计,merge 是线程安全操作
stats.merge(item, 1, Integer::sum);
lock.notifyAll(); // 通知生产者可以继续生产
}
}
};
// Callable 示例:返回统计结果
Callable<Map<Integer, Integer>> statsTask = () -> stats; // Callable 可以返回值
// 提交生产者和消费者到线程池
executor.submit(producer); // Runnable 提交线程池执行
executor.submit(producer);
executor.submit(consumer);
executor.submit(consumer);
// 等待线程结束
executor.shutdown(); // 不再接受新任务
executor.awaitTermination(1, TimeUnit.MINUTES); // 阻塞等待线程池关闭
// 获取统计结果(Callable 调用)
Map<Integer, Integer> finalStats = statsTask.call();
System.out.println("Item consumption stats: " + finalStats); // 输出最终统计
}
}
Other
python装饰器
- 装饰器是 一个函数,它接收另一个函数(或类)作为输入,并返回一个新的函数(或类)。
- 作用:在不修改原函数代码的前提下,给它添加额外功能。
- 常用于:日志、权限验证、性能计时、缓存、事务管理等。