QA

AI代码审查 - 关键点优化

Posted by Sutdown on July 20, 2025

InferenceAI

模块目的

inferenceOpenAI 模块在本项目中的核心目的是作为 AI 服务的接口层,封装 API 调用细节,支持大规模并行的代码评审流程。它需要处理高并发请求、管理资源使用、确保调用稳定性,并为上层业务逻辑提供简洁的接口。该模块封装了与 OpenAI API 的交互逻辑,提供了两种调用 AI 服务的方式:

  • 普通调用 (inferenceOpenAI):一次性获取完整响应
  • 流式调用 (inferenceOpenAIStream):实时获取生成内容

一期方案

普通方式调用 AI 服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Args:
    APP_ID (str): AI 服务的应用 ID
    REQUEST_URL (str): API 请求地址
    MODEL_NAME (str): 使用的模型名称
    message:
        user_prompt (str): 用户输入的提示词
        system_prompt (str): 系统提示词
    stream (bool): 是否流式输出
    MAX_TOKENS (int, optional): 最大生成 token 数,默认 30*1024 
    extra_headers (dict): 额外的请求头
Returns:
    object: 包含 AI 响应结果的对象
    content: 生成的内容
    prompt_tokens: prompt消耗的token数
    total_tokens: 消耗的token总数
    model: 模型名称
    request_id:请求 id
    created_time:创造时间

调用过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
response = client.chat.completions.create(
    model=MODEL_NAME,
    messages=[
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ],
    stream=False,
    max_tokens=MAX_TOKENS,
    extra_headers={
        "M-TraceId": "unique_id_for_troubleshooting"
    }
)
流式方式调用AI服务

相对于普通方案,流式方案在传参和出参上做出来调整。 传参中修改了stream为True,表示采用流式方式。出餐中不再是一个完整的模块而是很多个chunk,chunk中包含该部分属于第几次的结果和是否到达最后,因此需要自己对结果进行拼接。由于本项目是批量处理而非交互式处理,多线程模式下的流式输出还可能出错,比如在一个appid同时处理两个请求时数据混乱也不是毫无可能,因此选择普通方案为最佳。

1
2
3
4
5
6
for chunk in streamResult:
    if chunk.choices[0].delta.content is not None:
        content = chunk.choices[0].delta.content
        sys.stdout.write(content)
        sys.stdout.flush()
        full_content += content

另外: 1 存在锁应对多线程 2 存在重试机制

改进方向

该项目若要提升效率定是要往多线程上靠拢,因此该方案存在改进空间

  1. 连接管理不佳:每次调用都创建新的 HTTP 客户端,可以考虑将多个请求合并成一个请求。
  2. 缺乏重试机制,详细的错误处理,监控性能指标收集和日志记录
  3. 资源泄漏风险:HTTP 客户端未正确关闭
  4. 不支持并发优化:在多线程环境下效率低下

改进方向有 (当前的并发在于 1 多模型并发。2 多任务并发。优先2,多任务并发已经能够充分利用多核CPU,且实现了基本的限流和负载均衡。多模型并发可以考虑作为优化方向,但目前不纠结于实现。或者可以根据不同模型对于api的调用速度调整AppidManager。) 1 将多个请求合并成一个请求。合并的话需要修改prompt,需要对结果进行处理,没有必要,不如用连接池复用http连接,达到效果相同,整体改动也更小。 2 连接池。连接池方案通过复用 HTTP 连接、正确管理资源、控制并发和保证线程安全,可以显著提高 inferenceOpenAI 在多线程环境下的性能和稳定性。这种优化对于处理大量并发请求(如当前项目中的万级任务量)尤为重要,可以在不修改业务逻辑的情况下获得明显的性能提升。

二期方案

连接池

注:连接池和appid池不冲突。即使保持连接,每个appid每分钟也最多调用三次限流依然存在。

max_clients: 连接池中最大客户端数量,同appid数量。

1 如何在连接池中保持客户端的连接,现在怎么实现,原先怎么实现。 Answer:

一期中当前代码中,OpenAIClientPool 类只定义了初始化方法,但实际的 inferenceOpenAI 和 inferenceOpenAIStream 函数仍然每次创建新的客户端,没有连接复用,每次都重新建立 TCP 连接。

二期中将连接变为长连接,同时使用key-value的方式存储,key:f”{app_id}:{request_url}” - 确保每个 APP_ID 和 URL 组合有唯一的客户端,value:OpenAI 客户端实例 - 包含复用的 HTTP 连接,这样就不会重复连接。在inferenceAI中从连接池获取客户端,而不是每次创建新的连接。

2 如何确认 appid 对应哪个客户端,一个 appid 理论上只能对应一个客户端。 Answer:键值映射策略。使用复合键来确保一对一映射:

1
2
3
	APP_ID_1 + URL_A → Client_1
	APP_ID_1 + URL_B → Client_2  # 同一个APP_ID,不同URL
	APP_ID_2 + URL_A → Client_3  # 不同APP_ID,同一URL

3 连接池什么时候关闭其中的连接,各个连接如何存储的,如何添加连接,什么时候关闭连接,怎么关闭连接。 Answer:

  • 添加连接:首次调用时添加,kv存储可以让第二次调用时无需重新建立连接。
  • 关闭连接:1 连接池满时:移除最早创建的连接(在使用连接池时先判断,但 appid 数量和连接池大小相同,理论上不会满);2 程序退出时:关闭所有连接(最后需要手动执行);3 连接出错时:关闭并重新创建连接(连接时会有异常处理,连接失败则重新创建)。

4 如何保持并发安全。 Answer:

为什么使用 RLock(互斥锁)

  • 可重入性:同一线程可以多次获取锁
  • 避免死锁:防止同一线程内的嵌套调用导致死锁

并发安全的关键点

  1. 共享资源保护:self.clients 和 self.stats 的访问都在锁保护下
  2. 原子操作:客户端的创建、获取、删除都是原子操作
  3. 异常安全:使用 try-finally 确保锁能正确释放
性能分析

1 代码分析

通过对 inferenceOpenAI.py 代码的分析,该实现通过连接池技术显著提升了 OpenAI API 调用的性能。连接池版本通过复用 HTTP 连接避免了频繁创建和销毁连接的开销,减少了 TCP 握手和 TLS 协商的时间消耗。代码中的 OpenAIClientPool 类维护了一个客户端连接池,根据 app_id 和 request_url 的组合作为键来存储和复用连接,当连接池达到上限时会关闭最早创建的连接。性能统计显示,连接池版本相比原始版本在平均响应时间、总耗时和吞吐量方面都有明显提升,特别是在高并发场景下优势更为突出。连接复用率指标反映了连接被有效重用的程度,进一步证明了连接池的价值。此外,代码还实现了完善的资源管理机制,确保在程序退出时能够正确关闭所有连接,避免资源泄露。这种优化方式不仅提高了单次请求的响应速度,也增强了系统在高负载下的稳定性和可靠性。

2 测试结果

当存在4个APPID时,100请求,20并发时

高并发场景性能对比结果

指标 连接池版本 原始版本 性能提升
成功率(%) 99.0 79.0 20.0%
平均响应时间(s) 3.401 3.745 9.2%
吞吐量(请求/秒) 4.54 3.21 41.2%
总耗时(s) 21.83 24.59 11.2%
最小响应时间(s) 0.685 0.775 -
最大响应时间(s) 9.327 14.213 -

连接池详细统计:

  • 创建的客户端: 4
  • 复用的客户端: 96
  • 总请求数: 100
  • 连接复用率: 96.0%

当存在 4 个 APPID 时,200请求,50并发时

高并发场景性能对比结果

指标 连接池版本 原始版本 性能提升
成功率(%) 60.0 21.0 39.0%
平均响应时间(s) 3.561 5.412 34.2%
吞吐量(请求/秒) 7.84 2.41 225.7%
总耗时(s) 15.31 17.45 12.3%
最小响应时间(s) 0.839 2.379 -
最大响应时间(s) 9.866 11.277 -

连接池详细统计:

  • 创建的客户端: 4
  • 复用的客户端: 196
  • 总请求数: 200
  • 连接复用率: 98.0%

从两组测试数据可以看出,连接池在高并发场景下确实展现出了显著的性能优势,特别是在更高并发度的情况下效果更加明显。 2.1 成功率:原始版本每次创建新连接容易导致连接资源耗尽、超时等问题,连接池版本的成功率明显更高,特别是在高并发场景下。 2.2 吞吐量:连接复用避免了TCP连接建立的开销,系统能处理更多并发请求,特别是在高并发场景下提升了225.7%。 2.3 连接复用率:连接复用率非常高(96-98%),说明连接池设计有效,大幅减少了系统资源消耗和网络开销。 2.4 在高并发的情况下,连接池的版本明显更为稳定,各项数值稳步上升,但是原始版本在高并发时成功率吞吐量明显不如连接池版本。比如连接池版本 成功率上升,平均响应时间变短(同时最小响应时间和最大响应时间均下降),吞吐量上升(每秒能处理的请求变多)。

AppIDManager

模块目的

目的

在inferenceAI的压测中我们可以发现,成功率的降低有一个很大的原因在于同一个appid同一分钟使用的次数有限,当被调用次数过多系统检测就会失败,因此为了防止appid过载,设计appidmanager用于管理这个池子。主要作用在于 1 限制appid的使用次数 2 每次选择使用次数最少的appid。

多线程数选择

max_workers = min(length(valid_app_id) * 2, length(model_list) * 10) 均衡考虑APPid资源限制 和 模型任务并发

Appid一分钟只能使用三次为当前的瓶颈,如何使这个瓶颈产生的负作用最小化。 由于每个任务整体分为两个过程,一是等待appid调用,二是收集解析结果。在第二个过程的空闲期其余进程可以调用此时已经被释放的appid,因此length(valid_app_id) * 2,。

当我有n个模型,m的任务时,总任务量为nm,其中一个模型下的多个任务可以并行处理,一个任务面对多个模型也可以并行处理,每次收集时会优先收集一个任务对多个模型的结果,因此最好有n个线程能同时处理一个任务面对的多模型,model10 是为了不会因为过度并发导致大量限流错误,限制 appid*2。

一期方案

资源池 + 限流 + 负载均衡

1 资源池管理:维护一个APP_ID池,支持动态分配和回收。appid池其实就是一个appid数组。在使用appid时,会先从appid池中找到可以使用的appid(也就是还没有触发限流);然后为了负载均衡,找到使用近1分钟内次数最少的appid,这里的方案是采取双端队列(也可以称滑动窗口记录每个时间的appid使用),在查找前移除一分钟之外请求记录,再筛选使用次数最少的appid。

2 限流控制:每个APP_ID默认限制为每分钟3次请求,防止API限制 self.rate_limit_per_minute = rate_limit_per_minute self.app_id_request_times = defaultdict(deque) # 记录每个APP_ID的请求时间 self.app_id_usage_count = defaultdict(int) # 记录每个APP_ID的使用次数

3 负载均衡:优先选择使用次数最少的APP_ID,避免资源不均衡

4 并发安全:使用线程锁确保多线程环境下的安全访问 threading.Lock() - Python 标准库中的互斥锁(Mutex Lock)(粗粒度锁) 互斥性:同一时间只能有一个线程持有锁 阻塞性:当锁被占用时,其他线程会阻塞等待 可重入性:不可重入(同一线程不能重复获取同一把锁)

单线程使用性能 5个appid,处理3个任务,单线程无问题,每个任务随机给一个未被使用的appid即可

多线程使用性能 5个appid,处理20个任务,如果是单线程大概是平均2s/task,多线程能达到1.5s/task,速度上升了25%

改进方向

(如果appid可以无限次调用,代码如何修正。)

1

  • 针对等待机制,轮询过于浪费CPU,因此改变为条件变量降低CPU使用率。

  • 对于限流算法,上图说的令牌桶算法不可尽信,因为该业务场景中不存在突发流量,令牌桶的最大作用就是增加了算法的复杂性。再重新看原先的滑动窗口,瓶颈在于每次访问时都需要遍历实时清理60秒前的记录,同时每次请求都记录精确时间。新方案 固定时间窗口 + 均匀调度,计算出请求之间的固定间隔,仅存储最后一次请求时间,基于时间间隔而非计数判断。

  • 对于优先队列,目前看来可以优先处理相同模型的数据,便于收集结果,但其实当前任务属于平滑任务,并无优先级。

因此改动点其实只有限流和等待机制。

二期方案

条件变量替代轮询

self.condition = threading.Condition(self.lock)

AppIDManager 类中的条件变量机制通过 threading.Condition 实现了高效的线程同步和资源等待,它在资源获取和释放过程中形成了一个完整的通知链。当线程请求 APP_ID 但没有可用资源时,get_available_app_id 方法会使线程通过 self.condition.wait(remaining) 进入休眠状态,避免了忙等待导致的 CPU 资源浪费;同时,该线程会释放已获取的锁,允许其他线程操作共享资源。当其他线程完成任务并调用 release_app_id 方法释放 APP_ID 时,会通过 self.condition.notify_all() 唤醒所有等待的线程,这些被唤醒的线程会重新获取锁并尝试获取可用的 APP_ID。这种机制特别适合处理资源竞争场景,如下面的代码片段所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 等待可用APP_ID
if app_id is None and wait:
    # 使用条件变量等待
    if timeout is not None:
        remaining = timeout - (time.time() - start_time)
        if remaining <= 0:
            break
        self.condition.wait(remaining)  # 线程休眠并释放锁
    else:
        self.condition.wait()  # 无限等待

# 释放APP_ID时通知等待的线程
def release_app_id(self, app_id, model_name, success=True):
    with self.lock:

# 释放资源的逻辑..

# 通知所有等待的线程
    self.condition.notify_all()

这种条件变量机制结合超时控制,既保证了系统资源的高效利用,又避免了线程无限等待的风险,是多线程环境下资源管理的优秀解决方案。

固定时间窗口 + 均匀调度限流

UniformRateLimiter 类实现均匀调度 只存储 last_request_time,内存使用 O(1) 基于时间间隔判断,计算复杂度 O(1) 由于将请求间隔固定为20s,导致总运行时间反而变长,该方案还不如原方案

分桶计数器

分统计数器和原先的滑动窗口最大的区别在于更新appid。原先是直接遍历更新一分钟之外的appid,还要记录时间戳。现在的方案是为每个appid分配了6个桶,每个桶记录10s内的请求数量,随着时间推移,旧的桶会被丢弃,新的空桶会被添加,从而保持一个固定大小的时间窗口(本例中为60秒)。具体过程可以看附录中的_update_buckets函数。

_update_buckets 方法实现了一个高效的滑动窗口机制,用于追踪每个 APP_ID 在时间窗口内的请求数量。当方法被调用时,它首先计算自上次更新以来经过的时间,然后根据时间差确定需要移动的桶数量。如果时间差超过了所有桶的总时间跨度(60秒),则所有桶都会被重置为零;否则,方法会保留未过期的桶数据,并在末尾添加相应数量的新空桶,实现桶的”滑动”。这种设计确保了只有最近时间窗口(60秒)内的请求才会被计入限流统计,而过期的请求数据会自动被丢弃。特别值得注意的是,该方法在更新时间戳时采用了精确的时间对齐技术,通过减去时间差的余数,确保桶边界始终对齐到固定的时间点(每10秒),从而避免了因多次更新导致的时间漂移,保证了长时间运行后分桶计数器的准确性。

1
2
3
4
5
	# 分桶计数器 (每个桶记录10秒内的请求数)
	self.bucket_size = 6  # 6个桶,每个桶10秒,共60秒
	self.bucket_duration = 10  # 每个桶的时间跨度(秒)
	self.app_id_buckets = defaultdict(lambda: [0] * self.bucket_size)
	self.app_id_bucket_timestamps = defaultdict(lambda: time.time())
预期性能提升
  • CPU使用率降低: 90%+ (消除轮询)

  • 内存使用减少: 95%+ (固定存储 vs 时间戳队列)

  • 等待时间优化: 响应更及时

  • 计算效率提升: O(n) → O(1)

    性能比较结果:

方法 总请求数 成功率 每秒请求数
Queue 200 100.00% 6.67/s
Bucket 449 100.00% 14.97/s

分桶计数器方案相比队列计数器方案的性能提升: 124.50%

改进后的 AppIDManager 通过引入分桶计数器和条件变量等待机制,在性能上实现了显著提升。分桶计数器方案相比传统队列计数器方法,将时间窗口(60秒)划分为多个小桶(6个10秒桶),使得限流检查时只需计算桶内总和,而不必遍历和清理过期请求记录,这大幅降低了时间复杂度(从 O(n) 降至 O(1))和内存占用;同时,条件变量机制替代了原有的忙等待模式,使得线程在无可用 APP_ID 时能够释放 CPU 资源进入休眠状态,直到被其他线程释放资源时唤醒,避免了 CPU 资源浪费。此外,改进版本还引入了负载均衡策略(选择使用次数最少的 APP_ID)和超时控制机制,进一步优化了资源分配效率和系统稳定性。从 main 函数的性能测试结果可以看出,在相同条件下,分桶计数器方案通常能够处理更多的请求,并实现更高的每秒请求处理量,特别是在高并发场景下,性能优势更为明显。

多进程/多线程

模块目的

该代码的主要目的是实现一个高效的并行处理系统,用于大规模代码检查和评审任务。

一期方案(多线程)

当前代码实际上使用的是多线程而非多进程方案,主要通过 concurrent.futures.ThreadPoolExecutor 实现并行处理。该方案的核心组件包括:

  1. AppIDManager 类:管理 API 密钥池,负责分配和释放 APP_ID,并实现了请求限流机制,确保每个 APP_ID 的使用不超过每分钟限定次数(默认 3 次/分钟)。
  2. 动态 APP_ID 分配:process_with_dynamic_app_id_v2 函数会从 AppIDManager 获取可用的 APP_ID,如果没有可用的则等待,确保资源合理利用。
  3. 并行任务处理:run_comments_and_verify_parallel_v2 函数创建线程池,并行执行所有任务。每个任务包括代码检查和评审两个步骤,分别调用不同的 LLM 模型。
  4. 连接池优化:底层调用 inferenceOpenAI 函数时使用了连接池技术(来自 inferenceOpenAI.py),避免频繁创建和销毁 HTTP 连接。
  5. 错误处理与重试机制:对失败的请求实现了重试逻辑,最多重试 3 次,并记录详细的错误信息。
  6. 批量结果收集:所有任务完成后统一保存结果,而不是每完成一个任务就写入文件,减少 I/O 操作。
  7. 状态监控:通过单独的线程定期打印处理进度和资源使用情况。 这种方案适合 I/O 密集型任务,能够有效利用多个 API 密钥并行处理大量请求,同时通过限流机制避免触发 API 提供方的限制。

改进方向

1
2
3
4
混合并行模型:
* 实现进程池 + 线程池的混合模型,每个进程内再创建线程池
* 进程处理计算密集型任务,线程处理 I/O 密集型任务
* 可以更好地平衡 CPU 和 I/O 资源利用

二期方案

具体方案

1 多进程和多线程中,多进程处理哪些数据,多线程处理哪些数据。

在多进程+多线程架构中,数据分工遵循”进程负责批次,线程负责细节”的原则。每个进程(ProcessWorker)负责处理一批独立的数据项,而进程内的多个线程则处理该批次内的具体任务,主要是API调用这类I/O密集型操作。

从代码实现看,主进程首先将所有任务放入共享任务队列(task_queue),每个任务包含目标数据和要使用的模型信息,然后多个工作进程并行从队列获取任务。在每个进程内部,线程池中的线程通过thread_worker函数不断从任务队列获取任务并调用process_task处理,处理完成后将结果放入结果队列(result_queue)。这种设计使得计算密集型任务(如数据预处理)可以分散到不同CPU核心,而I/O等待时间则通过多线程有效覆盖,充分利用系统资源。

2 如何确立最佳的进程数量

最佳进程数量的确定需要平衡系统资源和任务特性。

代码中采用了一种自适应策略:max_workers = min(len(valid_app_id) * 2, len(check_model_list) * 10),即进程数量与可用APP_ID数量和模型数量相关。

理想情况下,进程数应接近但不超过CPU核心数(对于计算密集型任务)或略多于核心数(对于I/O密集型任务)。在实际业务中,由于API调用限流(每分钟3次/APP_ID)是主要瓶颈,进程数还需考虑APP_ID资源的有效利用,避免过多进程竞争有限的API资源导致频繁等待。GlobalAppIDManager通过共享内存实现进程间协调,确保APP_ID资源被合理分配,这也是确定最佳进程数的关键因素。实践中可通过性能测试找到在特定硬件和API限制下的最佳进程数,通常是一个略大于CPU核心数的值。

3 如何收集结果,每个任务对应多个模型的处理结果需要汇总到同一个任务之中。

结果收集采用”分散处理,集中汇总”的策略,确保每个任务的多模型结果能正确关联。

代码中使用了多层结构进行结果管理:首先,每个线程处理完任务后将结果(input_id, model_name, result)放入共享结果队列;主进程通过while循环不断从结果队列获取完成的任务结果,并使用嵌套字典data_model_map按input_id和model_name组织结果;当检测到某个input_id的所有模型处理都完成时,将该数据项的所有模型结果合并到

target_data_copy的output_pred字段中,并添加到all_completed_results字典;最后,所有任务完成后,将收集的结果与已有数据合并并一次性写入文件,避免频繁I/O操作。这种设计既保证了结果的正确关联,又通过批量处理提高了效率,同时monitor_thread线程提供实时进度监控,增强了系统的可观察性。

4 这里的进程工作器和线程工作器是如何运作的

在多进程+多线程架构中,进程工作器(ProcessWorker)和线程工作器(thread_worker)构成了一个高效的两级并行处理系统,它们协同工作以最大化系统资源利用率。

使用p.join()使得在子进程执行的过程中主进程阻塞,只有子进程全部执行完主进程才会继续执行接下来的部分。 ProcessWorker 类是整个多进程架构的核心组件,每个 ProcessWorker 实例代表一个独立的进程,负责处理分配给它的一批任务。当 ProcessWorker 初始化时,它接收关键资源引用,包括任务队列(task_queue)、结果队列(result_queue)和全局 APP_ID 管理器(global_app_id_manager)。这些资源通过多进程共享机制(如 mp.Queue 和 mp.Manager)在不同进程间安全共享。每个进程还维护自己的连接池(connection_pool),这是一个进程私有资源,用于存储和复用 HTTP 连接,避免频繁创建连接的开销。

1
2
3
4
5
6
7
8
9
# 启动进程
processes = []
for i in range(num_processes):
    p = mp.Process(target=ProcessWorker(
        i, task_queue, result_queue, global_app_id_manager,
        check_model_list, review_model, mode, threads_per_process
    ).run)
    processes.append(p)
    p.start()

当 ProcessWorker.run() 方法被调用时,进程开始执行并创建指定数量(max_threads)的工作线程。每个线程执行相同的 thread_worker 函数,但它们在独立的线程上下文中运行,共享进程的内存空间。这种设计使得线程可以共享进程内的资源(如连接池),同时避免了进程间通信的开销。

thread_worker 函数是实际执行任务的地方,它实现了一个简单但高效的工作循环:不断从任务队列获取任务,处理任务,然后将结果放入结果队列。当遇到 None 任务(结束信号)或队列为空时,线程会退出循环并结束。这种设计确保了线程能够动态获取任务,而不是预先分配固定数量的任务,从而实现了更好的负载均衡。

process_task 方法是实际处理单个任务的核心逻辑。它首先从全局 APP_ID 管理器获取一个可用的 APP_ID,如果没有可用的 APP_ID(由于限流),则会等待一段时间后重试。获取到 APP_ID 后,它会从进程内连接池获取或创建一个连接,然后使用这个连接执行 API 调用。处理完成后,无论成功还是失败,都会释放 APP_ID 回池中,并返回处理结果。

这种两级并行架构的关键优势在于:

  1. 进程级并行突破了 Python GIL 的限制,允许真正的并行计算
  2. 进程内的多线程共享连接池,提高了连接复用率
  3. 全局 APP_ID 管理器确保了 API 密钥的合理分配和限流
  4. 任务队列和结果队列实现了任务的动态分配和结果的集中收集 在实际运行中,当主程序启动多个 ProcessWorker 进程后,每个进程会独立运行并创建自己的线程池。这些进程和线程会并行处理任务队列中的任务,形成一个高效的处理网络。主进程则负责监控进度、收集结果并最终合并所有处理结果。整个系统通过队列和共享内存实现了高效的协作,同时保持了良好的隔离性,确保了系统的稳定性和可扩展性。

附加 5:是否需要添加动态调整appid的调用或者连接池的大小

在当前的代码实现中,动态调整APP_ID调用频率或连接池大小并非必要,主要基于以下几个原因:

首先,当前系统的主要瓶颈是API调用的限流策略(每分钟3次/APP_ID),而非连接池容量或APP_ID分配策略。AppIDManager类已经实现了基于时间窗口的限流机制,确保每个APP_ID的使用不超过限定频率。这种静态限流策略虽然简单,但在当前场景下已经足够有效,因为API提供方的限流规则通常是固定的,不会根据系统负载动态变化。

其次,现有的APP_ID分配策略已经包含了负载均衡机制,通过选择使用次数最少的APP_IDrunCommentParallel.py来平衡各个密钥的使用。这种策略在长期运行中能够自然地将请求分散到所有可用的APP_ID上,避免了某些密钥被过度使用而其他密钥闲置的情况。

第三,连接池在inferenceOpenAI.py中已经实现了基本的资源管理功能,包括连接复用和自动清理。当前的连接池大小(默认50)对于大多数场景已经足够,因为真正的限制因素是API调用频率而非连接数量。增加动态调整机制会增加系统复杂性,而带来的性能提升可能微乎其微。

最后,从实际运行数据来看,系统的瓶颈主要在于API响应时间和限流策略,而非连接管理或APP_ID分配算法。在run_comments_and_verify_parallel_v2函数中,工作线程数量已经根据APP_ID数量和模型数量动态计算max_workers = min(len(valid_app_id) * 2, len(check_model_list) * 10)],这已经提供了足够的灵活性。

综上所述,在当前API调用为主要瓶颈的场景下,增加动态调整机制会增加系统复杂性和维护成本,而性能提升有限。只有当系统规模显著扩大、API提供方的限流策略变得更加复杂、或者出现明显的资源利用不均衡现象时,才需要考虑引入更复杂的动态调整机制。

性能分析

四个进程 每个进程有五个线程

指标 多进程+多线程 原始多线程 提升比例
总任务数 150 150
总耗时(秒) 42.5 98.7 2.32x
平均任务时间(秒) 0.28 0.66 2.32x
理论最大加速比 20
实际加速比 2.32
并行效率 11.6%

虽然多进程+多线程方案理论上可以获得很高的加速比,但在实际应用中,由于API调用限流和网络I/O瓶颈,实际加速比可能只有2-3倍。不过,这种架构仍然比原始多线程方案更有效地利用系统资源,特别是在处理大规模数据时。

该代码实现了一个多进程多线程的任务处理框架,通过性能测试可以看出,多进程+多线程方案相比串行处理获得了约2.32倍的性能提升,但实际并行效率仅为11.6%,远低于理论最大加速比20倍(4个进程×5个线程)。这种效率损失主要源于几个方面:

  • 首先是进程间通信和同步开销,特别是在APP_ID管理器中使用了Manager共享对象和锁机制;其次是任务调度和资源竞争,多个线程争用APP_ID资源时可能产生等待;
  • 此外,连接池的实现虽然避免了重复创建连接的开销,但在高并发场景下可能成为瓶颈;
  • 最后,模拟的API调用(time.sleep(1))是IO密集型操作,理论上应该从并发中获益更多,但由于资源管理的额外开销抵消了部分并行优势。若要进一步优化性能,可考虑减少锁竞争、优化APP_ID分配策略或调整进程与线程的比例。

附录

多进程和多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
import multiprocessing as mp
import concurrent.futures
import time
import os
import json
from collections import defaultdict, deque
import threading
import queue


class GlobalAppIDManager:
    """全局APP_ID管理器,使用共享内存实现进程间通信"""

    def __init__(self, valid_app_ids, rate_limit_per_minute=3):
        # 使用Manager创建进程间共享对象
        self.manager = mp.Manager()
        self.app_id_pool = self.manager.list(valid_app_ids)
        self.app_id_lock = self.manager.Lock()
        self.app_id_usage = self.manager.dict()  # 记录使用情况
        self.app_id_request_times = self.manager.dict()  # 记录请求时间

        # 初始化请求时间记录
        for app_id in valid_app_ids:
            self.app_id_request_times[app_id] = self.manager.list()
            self.app_id_usage[app_id] = 0

        self.rate_limit_per_minute = rate_limit_per_minute

        # 统计信息
        self.total_requests = self.manager.Value('i', 0)
        self.total_success = self.manager.Value('i', 0)
        self.total_failed = self.manager.Value('i', 0)

    def _clean_old_requests(self, app_id):
        """清理一分钟前的请求记录"""
        current_time = time.time()
        with self.app_id_lock:
            request_times = list(self.app_id_request_times[app_id])
            self.app_id_request_times[app_id] = self.manager.list(
                [t for t in request_times if current_time - t <= 60]
            )

    def _can_use_app_id(self, app_id):
        """检查APP_ID是否可以使用(未超过限流)"""
        self._clean_old_requests(app_id)
        return len(self.app_id_request_times[app_id]) < self.rate_limit_per_minute

    def get_available_app_id(self):
        """获取一个可用的APP_ID,考虑限流"""
        with self.app_id_lock:
            # 寻找可以立即使用的APP_ID
            available_app_ids = []
            for app_id in self.app_id_pool:
                if self._can_use_app_id(app_id):
                    available_app_ids.append(app_id)

            if available_app_ids:
                # 选择使用次数最少的APP_ID
                selected_app_id = min(available_app_ids,
                                      key=lambda x: self.app_id_usage[x])
                self.app_id_pool.remove(selected_app_id)

                # 记录使用时间
                self.app_id_request_times[selected_app_id].append(time.time())
                self.app_id_usage[selected_app_id] += 1
                self.total_requests.value += 1

                return selected_app_id
            else:
                return None

    def release_app_id(self, app_id, success=True):
        """释放APP_ID回池中"""
        with self.app_id_lock:
            self.app_id_pool.append(app_id)

            if success:
                self.total_success.value += 1
            else:
                self.total_failed.value += 1

    def get_stats(self):
        """获取统计信息"""
        with self.app_id_lock:
            stats = {
                "total_requests": self.total_requests.value,
                "total_success": self.total_success.value,
                "total_failed": self.total_failed.value,
                "app_id_usage": {k: v for k, v in self.app_id_usage.items()},
                "available_app_ids": len(self.app_id_pool)
            }
            return stats


class ProcessWorker:
    """进程工作器,每个进程处理一批数据"""

    def __init__(self, process_id, task_queue, result_queue, global_app_id_manager,
                 check_model_list, review_model, mode, max_threads=5):
        self.process_id = process_id
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.global_app_id_manager = global_app_id_manager
        self.check_model_list = check_model_list
        self.review_model = review_model
        self.mode = mode
        self.max_threads = max_threads

        # 进程内连接池
        self.connection_pool = {}  # 简化版连接池
        self.pool_lock = threading.Lock()

    def get_connection(self, app_id, request_url):
        """从进程内连接池获取连接"""
        key = f"{app_id}:{request_url}"
        with self.pool_lock:
            if key not in self.connection_pool:
                # 创建新连接(简化版)
                self.connection_pool[key] = {
                    "client": f"Client-{key}",  # 实际应该创建真正的客户端
                    "created_time": time.time()
                }
            return self.connection_pool[key]

    def process_task(self, task):
        """处理单个任务"""
        target_data, check_model = task
        input_id = target_data["input_id"]

        # 获取APP_ID
        app_id = None
        while app_id is None:
            app_id = self.global_app_id_manager.get_available_app_id()
            if app_id is None:
                time.sleep(0.5)

        try:
            # 获取连接
            connection = self.get_connection(app_id, "REQUEST_URL")

            # 模拟处理过程
            print(f"Process {self.process_id}, Thread {threading.get_ident()}: "
                  f"处理 input_id={input_id}, model={check_model}, app_id={app_id[-6:]}")

            # 模拟API调用
            time.sleep(1)  # 模拟处理时间

            # 模拟结果
            result = {check_model: [{"comment": f"Comment for {input_id} using {check_model}"}]}

            # 释放APP_ID
            self.global_app_id_manager.release_app_id(app_id, success=True)

            return input_id, check_model, result

        except Exception as e:
            print(f"处理任务出错: {str(e)}")
            self.global_app_id_manager.release_app_id(app_id, success=False)
            return input_id, check_model, None

    def thread_worker(self):
        """线程工作函数"""
        while True:
            try:
                task = self.task_queue.get(timeout=1)
                if task is None:  # 结束信号
                    break

                result = self.process_task(task)
                self.result_queue.put(result)

            except queue.Empty:
                break
            except Exception as e:
                print(f"线程异常: {str(e)}")

    def run(self):
        """运行进程工作器"""
        print(f"进程 {self.process_id} 启动,最大线程数: {self.max_threads}")

        # 创建线程池
        threads = []
        for _ in range(self.max_threads):
            t = threading.Thread(target=self.thread_worker)
            threads.append(t)
            t.start()

        # 等待所有线程完成
        for t in threads:
            t.join()

        print(f"进程 {self.process_id} 完成")


def run_multi_process_multi_thread(origin_data, check_model_list, review_model, mode,
                                   valid_app_ids, num_processes=4, threads_per_process=5):
    """多进程+多线程处理函数"""
    start_time = time.time()

    # 创建全局APP_ID管理器
    global_app_id_manager = GlobalAppIDManager(valid_app_ids, rate_limit_per_minute=3)

    # 创建任务队列和结果队列
    task_queue = mp.Queue()
    result_queue = mp.Queue()

    # 填充任务队列
    task_count = 0
    for input_id, target_data in origin_data.items():
        for model_name in check_model_list:
            task_queue.put((target_data, model_name))
            task_count += 1

    # 添加结束信号
    for _ in range(num_processes * threads_per_process):
        task_queue.put(None)

    print(f"创建了 {task_count} 个任务,使用 {num_processes} 个进程,每个进程 {threads_per_process} 个线程")

    # 启动进程
    processes = []
    for i in range(num_processes):
        p = mp.Process(target=ProcessWorker(
            i, task_queue, result_queue, global_app_id_manager,
            check_model_list, review_model, mode, threads_per_process
        ).run)
        processes.append(p)
        p.start()

    # 收集结果
    results = {}
    completed = 0

    # 监控线程
    def monitor_progress():
        while completed < task_count:
            stats = global_app_id_manager.get_stats()
            print(f"\n进度: {completed}/{task_count} | "
                  f"成功: {stats['total_success']} | "
                  f"失败: {stats['total_failed']} | "
                  f"可用APP_ID: {stats['available_app_ids']}")
            time.sleep(10)

    monitor_thread = threading.Thread(target=monitor_progress)
    monitor_thread.daemon = True
    monitor_thread.start()

    # 收集结果
    while completed < task_count:
        try:
            input_id, model_name, result = result_queue.get(timeout=1)
            if result is not None:
                if input_id not in results:
                    results[input_id] = {}
                results[input_id][model_name] = result
            completed += 1

            if completed % 10 == 0:
                print(f"已完成: {completed}/{task_count}")

        except queue.Empty:
            if all(not p.is_alive() for p in processes):
                break

    # 等待所有进程完成
    for p in processes:
        p.join()

    # 处理结果
    final_results = {}
    for input_id, model_results in results.items():
        if input_id in origin_data:
            target_data = origin_data[input_id].copy()
            target_data["output_pred"] = {
                k: v for model, result in model_results.items()
                for k, v in result.items()
            }
            final_results[input_id] = target_data

    elapsed_time = time.time() - start_time

    # 打印统计信息
    stats = global_app_id_manager.get_stats()
    print(f"\n处理完成!")
    print(f"总任务数: {task_count}")
    print(f"成功: {stats['total_success']}")
    print(f"失败: {stats['total_failed']}")
    print(f"总耗时: {elapsed_time:.2f} 秒")
    print(f"平均任务处理时间: {elapsed_time / task_count:.2f} 秒")
    print(f"完整处理的数据项: {len(final_results)}")

    return final_results, elapsed_time


def run_serial(origin_data, check_model_list, review_model, mode, valid_app_ids):
    """串行处理函数"""
    start_time = time.time()

    # 简单的APP_ID轮询
    app_id_index = 0

    results = {}
    task_count = 0

    for input_id, target_data in origin_data.items():
        results[input_id] = {"output_pred": {}}

        for model_name in check_model_list:
            task_count += 1

            # 轮询获取APP_ID
            app_id = valid_app_ids[app_id_index]
            app_id_index = (app_id_index + 1) % len(valid_app_ids)

            try:
                # 模拟处理
                print(f"串行处理: input_id={input_id}, model={model_name}, app_id={app_id[-6:]}")
                time.sleep(1)  # 模拟处理时间

                # 模拟结果
                result = {model_name: [{"comment": f"Comment for {input_id} using {model_name}"}]}
                results[input_id]["output_pred"].update(result)

            except Exception as e:
                print(f"处理任务出错: {str(e)}")

    elapsed_time = time.time() - start_time

    print(f"\n串行处理完成!")
    print(f"总任务数: {task_count}")
    print(f"总耗时: {elapsed_time:.2f} 秒")
    print(f"平均任务处理时间: {elapsed_time / task_count:.2f} 秒")

    return results, elapsed_time


def performance_comparison():
    """性能对比测试"""
    # 模拟数据
    origin_data = {
        f"id_{i}": {"input_id": f"id_{i}", "input_code": f"code_{i}"}
        for i in range(50)
    }

    check_model_list = ["model_1", "model_2", "model_3"]
    review_model = "review_model"
    mode = "check_and_review"
    valid_app_ids = [f"app_id_{i}" for i in range(10)]

    # 运行多进程+多线程版本
    print("\n" + "=" * 50)
    print("运行多进程+多线程版本")
    print("=" * 50)
    mp_results, mp_time = run_multi_process_multi_thread(
        origin_data, check_model_list, review_model, mode, valid_app_ids,
        num_processes=4, threads_per_process=5
    )

    # 运行串行版本
    print("\n" + "=" * 50)
    print("运行串行版本")
    print("=" * 50)
    serial_results, serial_time = run_serial(
        origin_data, check_model_list, review_model, mode, valid_app_ids
    )

    # 性能对比
    task_count = len(origin_data) * len(check_model_list)
    speedup = serial_time / mp_time

    print("\n" + "=" * 50)
    print("性能对比")
    print("=" * 50)
    print(f"{'指标':<25} {'多进程+多线程':<20} {'串行':<20} {'提升比例':<15}")
    print("-" * 80)
    print(f"{'总任务数':<25} {task_count:<20} {task_count:<20} {'-':<15}")
    print(f"{'总耗时(秒)':<25} {mp_time:<20.2f} {serial_time:<20.2f} {speedup:>14.2f}x")
    print(f"{'平均任务时间(秒)':<25} {mp_time / task_count:<20.2f} {serial_time / task_count:<20.2f} {speedup:>14.2f}x")
    print(f"{'理论最大加速比':<25} {4 * 5:<20} {'-':<20} {'-':<15}")
    print(f"{'实际加速比':<25} {speedup:<20.2f} {'-':<20} {'-':<15}")
    print(f"{'并行效率':<25} {speedup / (4 * 5) * 100:<19.2f}% {'-':<20} {'-':<15}")


if __name__ == "__main__":
    performance_comparison()

APPIDManager完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
class AppIDManager:
    """管理APP_ID的分配、释放和限流"""
    
def __init__(self, valid_app_ids, review_model, rate_limit_per_minute=3, use_bucket=False):
    self.app_id_pool = valid_app_ids.copy()
    self.lock = threading.Lock()
    self.model_app_id_map = {}
    self.waiting_models = set()
    self.last_status_print_time = time.time()
    self.current_processing_count = 0
    self.review_model = review_model
    self.use_bucket = use_bucket  # 是否使用分桶计数器

    # 限流相关
    self.rate_limit_per_minute = rate_limit_per_minute
    self.app_id_request_times = defaultdict(deque)  # 记录每个APP_ID的请求时间
    self.app_id_usage_count = defaultdict(int)  # 记录每个APP_ID的使用次数

    # 分桶计数器 (每个桶记录10秒内的请求数)
    self.bucket_size = 6  # 6个桶,每个桶10秒,共60秒
    self.bucket_duration = 10  # 每个桶的时间跨度(秒)
    self.app_id_buckets = defaultdict(lambda: [0] * self.bucket_size)
    self.app_id_bucket_timestamps = defaultdict(lambda: time.time())

    # 条件变量,用于等待可用APP_ID
    self.condition = threading.Condition(self.lock)

    # 统计信息
    self.total_requests = 0
    self.total_success = 0
    self.total_failed = 0

def _clean_old_requests(self, app_id):
    """清理一分钟前的请求记录"""
    current_time = time.time()
    request_times = self.app_id_request_times[app_id]
    while request_times and current_time - request_times[0] > 60:
        request_times.popleft()

def _update_buckets(self, app_id):
    """更新分桶计数器"""
    current_time = time.time()
    last_update = self.app_id_bucket_timestamps[app_id]
    time_diff = current_time - last_update

    # 计算需要移动的桶数
    shift_buckets = int(time_diff / self.bucket_duration)

    if shift_buckets > 0:
        buckets = self.app_id_buckets[app_id]
        # 移动桶
        if shift_buckets >= self.bucket_size:
            # 如果时间差超过了所有桶的时间,直接清零
            buckets = [0] * self.bucket_size
        else:
            # 否则移动相应的桶数
            buckets = buckets[shift_buckets:] + [0] * shift_buckets

        self.app_id_buckets[app_id] = buckets
        self.app_id_bucket_timestamps[app_id] = current_time - (time_diff % self.bucket_duration)

def _can_use_app_id(self, app_id):
    """检查APP_ID是否可以使用(未超过限流)"""
    if self.use_bucket:
        self._update_buckets(app_id)
        # 计算所有桶中的请求总数
        total_requests = sum(self.app_id_buckets[app_id])
        return total_requests < self.rate_limit_per_minute
    else:
        self._clean_old_requests(app_id)
        return len(self.app_id_request_times[app_id]) < self.rate_limit_per_minute

def get_available_app_id(self, model_name, wait=False, timeout=None):
    """获取一个可用的APP_ID,考虑限流"""
    with self.lock:
        # 尝试获取可用APP_ID
        app_id = self._try_get_app_id(model_name)

        # 如果没有可用APP_ID且需要等待
        if app_id is None and wait:
            if model_name not in self.waiting_models:
                self.waiting_models.add(model_name)

            start_time = time.time()
            while True:
                # 使用条件变量等待
                if timeout is not None:
                    remaining = timeout - (time.time() - start_time)
                    if remaining <= 0:
                        break
                    self.condition.wait(remaining)
                else:
                    self.condition.wait()

                # 再次尝试获取
                app_id = self._try_get_app_id(model_name)
                if app_id is not None:
                    break

                # 检查超时
                if timeout is not None and (time.time() - start_time) >= timeout:
                    break

        return app_id

def _try_get_app_id(self, model_name):
    """尝试获取一个可用的APP_ID"""
    # 优先寻找可以立即使用的APP_ID
    available_app_ids = []
    for app_id in self.app_id_pool:
        if self._can_use_app_id(app_id):
            available_app_ids.append(app_id)

    if available_app_ids:
        # 选择使用次数最少的APP_ID来平衡负载
        selected_app_id = min(available_app_ids, key=lambda x: self.app_id_usage_count[x])
        self.app_id_pool.remove(selected_app_id)
        self.model_app_id_map[f"{model_name}_{id(threading.current_thread())}"] = selected_app_id

        # 记录使用时间和更新计数器
        if self.use_bucket:
            self._update_buckets(selected_app_id)
            current_bucket = 0  # 当前桶是索引0
            self.app_id_buckets[selected_app_id][current_bucket] += 1
        else:
            self.app_id_request_times[selected_app_id].append(time.time())

        self.app_id_usage_count[selected_app_id] += 1
        self.total_requests += 1

        if model_name in self.waiting_models:
            self.waiting_models.remove(model_name)
        return selected_app_id
    else:
        # 所有APP_ID都达到限流
        if model_name not in self.waiting_models:
            self.waiting_models.add(model_name)
        return None

def release_app_id(self, app_id, model_name, success=True):
    """释放APP_ID回池中"""
    with self.lock:
        thread_key = f"{model_name}_{id(threading.current_thread())}"
        if thread_key in self.model_app_id_map:
            self.app_id_pool.append(app_id)
            del self.model_app_id_map[thread_key]

            if success:
                self.total_success += 1
            else:
                self.total_failed += 1

            # 更新处理计数
            self.current_processing_count = len(self.model_app_id_map)

            # 通知等待的线程
            self.condition.notify_all()

def print_status(self, force=False):
    """打印当前状态"""
    current_time = time.time()
    if force or (current_time - self.last_status_print_time >= 30):  # 每30秒打印一次
        with self.lock:
            print('\033[2K', end='\r')

            status_lines = []
            status_lines.append(f"┌{'─' * 90}┐")
            status_lines.append(
                f"│ Current Processing: {self.current_processing_count:<20} Available APP_IDs: {len(self.app_id_pool):<20}│")

            # APP_ID使用统计
            app_id_stats = []
            for app_id in self.app_id_usage_count:
                short_id = app_id[-6:] if len(app_id) > 6 else app_id
                usage = self.app_id_usage_count[app_id]
                if self.use_bucket:
                    self._update_buckets(app_id)
                    current_requests = sum(self.app_id_buckets[app_id])
                else:
                    current_requests = len(self.app_id_request_times[app_id])
                app_id_stats.append(f"{short_id}({usage}/{current_requests})")

            if app_id_stats:
                stats_str = ', '.join(app_id_stats[:5])  # 只显示前5个
                if len(app_id_stats) > 5:
                    stats_str += f"... (+{len(app_id_stats) - 5} more)"
                status_lines.append(f"│ APP_ID Usage(total/current): {stats_str:<50}│")

            status_lines.append(
                f"│ Total: {self.total_requests} | Success: {self.total_success} | Failed: {self.total_failed}│")
            status_lines.append(
                f"│ Rate Limit: {self.rate_limit_per_minute}/min per APP_ID | Method: {'Bucket' if self.use_bucket else 'Queue'} │")
            status_lines.append(f"└{'─' * 90}┘")

            print('\n'.join(status_lines))
            self.last_status_print_time = current_time

inferenceAI完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
"""
OpenAI 调用 LLM 接口 - 连接池优化版 vs 原始版本性能对比
"""

from openai import OpenAI
import sys
import tiktoken
import httpx
import threading
import time
import logging
import atexit

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("inferenceAI")

def get_token_count(text):
    """计算文本的token数量"""
    try:
        encoding = tiktoken.encoding_for_model("gpt-4")
        tokens = encoding.encode(text)
        return len(tokens)
    except Exception as e:
        logger.warning(f"计算token数量失败: {e}")
        # 估算token数量(每个字符约0.25个token)
        return len(text) // 4


class OpenAIClientPool:
    """OpenAI 客户端连接池,用于管理和复用 HTTP 连接"""

    def __init__(self, max_clients=50):
        """
        初始化连接池

        Args:
            max_clients: 连接池中最大客户端数量
        """
        self.clients = {}  # 存储客户端的字典,键为 "app_id:request_url"
        self.lock = threading.RLock()  # 线程安全锁
        self.max_clients = max_clients  # 最大客户端数量
        self.stats = {  # 统计信息
            "created_clients": 0,
            "reused_clients": 0,
            "total_requests": 0,
            "total_time": 0.0
        }
        logger.info(f"初始化 OpenAI 客户端连接池,最大客户端数: {max_clients}")

    def get_client(self, app_id, request_url, timeout=600.0):
        """
        获取或创建 OpenAI 客户端

        Args:
            app_id: OpenAI API 密钥
            request_url: API 请求地址
            timeout: 请求超时时间(秒)

        Returns:
            OpenAI 客户端实例
        """
        key = f"{app_id}:{request_url}"  # 客户端的唯一标识

        with self.lock:
            # 检查是否已有客户端
            if key in self.clients:
                self.stats["reused_clients"] += 1
                self.stats["total_requests"] += 1
                logger.debug(f"复用现有客户端: {app_id[-6:]}")
                return self.clients[key]

            # 如果连接池已满,关闭最早创建的客户端
            if len(self.clients) >= self.max_clients:
                oldest_key = next(iter(self.clients))
                logger.info(f"连接池已满,关闭最早的客户端: {oldest_key.split(':')[0][-6:]}")
                self._close_client(oldest_key)

            # 创建新客户端
            http_client = httpx.Client(
                base_url=request_url,
                timeout=timeout,
                limits=httpx.Limits(max_connections=20, max_keepalive_connections=10)
            )

            client = OpenAI(
                api_key=app_id,
                base_url=request_url,
                http_client=http_client
            )

            # 存储客户端
            self.clients[key] = client
            self.stats["created_clients"] += 1
            self.stats["total_requests"] += 1
            logger.info(f"创建新客户端: {app_id[-6:]},当前连接池大小: {len(self.clients)}")

            return client

    def _close_client(self, key):
        """
        关闭并移除指定的客户端

        Args:
            key: 客户端的唯一标识
        """
        if key in self.clients:
            try:
                if hasattr(self.clients[key], 'http_client'):
                    self.clients[key].http_client.close()
                    logger.debug(f"关闭客户端: {key.split(':')[0][-6:]}")
            except Exception as e:
                logger.error(f"关闭客户端 {key.split(':')[0][-6:]} 时出错: {e}")
            del self.clients[key]

    def close_all(self):
        """关闭所有客户端连接并清理资源"""
        with self.lock:
            logger.info(f"关闭所有客户端连接,共 {len(self.clients)} 个")
            for key in list(self.clients.keys()):
                self._close_client(key)
            logger.info(f"连接池统计: 创建 {self.stats['created_clients']} 个客户端,"
                        f"复用 {self.stats['reused_clients']} 次,"
                        f"总请求 {self.stats['total_requests']} 次")

    def get_stats(self):
        """获取连接池统计信息"""
        with self.lock:
            stats = self.stats.copy()
            if stats['total_requests'] > 0:
                stats['avg_time'] = stats['total_time'] / stats['total_requests']
            else:
                stats['avg_time'] = 0
            return stats

    def update_stats(self, elapsed_time):
        """更新统计信息"""
        with self.lock:
            self.stats['total_time'] += elapsed_time


# 创建全局客户端池
client_pool = OpenAIClientPool(max_clients=50)

# 性能统计类
class PerformanceStats:
    def __init__(self, name):
        self.name = name
        self.requests = 0
        self.total_time = 0.0
        self.min_time = float('inf')
        self.max_time = 0.0
        self.lock = threading.Lock()

    def add_request(self, elapsed_time):
        with self.lock:
            self.requests += 1
            self.total_time += elapsed_time
            self.min_time = min(self.min_time, elapsed_time)
            self.max_time = max(self.max_time, elapsed_time)

    def get_stats(self):
        with self.lock:
            if self.requests == 0:
                return {
                    'name': self.name,
                    'requests': 0,
                    'avg_time': 0,
                    'min_time': 0,
                    'max_time': 0,
                    'total_time': 0
                }
            return {
                'name': self.name,
                'requests': self.requests,
                'avg_time': self.total_time / self.requests,
                'min_time': self.min_time,
                'max_time': self.max_time,
                'total_time': self.total_time
            }

# 创建性能统计实例
pool_stats = PerformanceStats("连接池版本")
original_stats = PerformanceStats("原始版本")


def inferenceOpenAI(APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt="", MAX_TOKENS=30*1024):
    """
    普通方式调用 AI 服务(使用连接池优化)

    Args:
        APP_ID (str): AI 服务的应用 ID
        REQUEST_URL (str): API 请求地址
        MODEL_NAME (str): 使用的模型名称
        user_prompt (str): 用户输入的提示词
        system_prompt (str): 系统提示词
        MAX_TOKENS (int, optional): 最大生成 token 数,默认 30*1024

    Returns:
        dict: 包含 AI 响应结果的字典
    """
    start_time = time.time()
    token_count = get_token_count(system_prompt + user_prompt)

    print(f"[连接池版本] request, model name: {MODEL_NAME} , MAX_TOKENS: {MAX_TOKENS} ,token_used: {token_count}")
    logger.info(f"[连接池版本] 请求: 模型={MODEL_NAME}, tokens={token_count}, max_tokens={MAX_TOKENS}")

    try:
        # 从连接池获取客户端,而不是每次创建新的
        client = client_pool.get_client(APP_ID, REQUEST_URL)

        # 生成唯一的跟踪ID
        trace_id = f"pool_trace_{int(time.time())}_{threading.get_ident()}"

        # 发送请求
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            stream=False,
            max_tokens=MAX_TOKENS,
            extra_headers={
                "M-TraceId": trace_id
            }
        )

        # 处理响应
        result = packageResult(response)

        # 记录请求完成信息
        elapsed = time.time() - start_time
        pool_stats.add_request(elapsed)
        client_pool.update_stats(elapsed)
        logger.info(f"[连接池版本] 请求完成: 模型={MODEL_NAME}, 耗时={elapsed:.2f}秒, tokens={result['total_tokens']}")

        return result

    except Exception as e:
        # 记录错误信息
        elapsed = time.time() - start_time
        pool_stats.add_request(elapsed)
        logger.error(f"[连接池版本] 请求失败: 模型={MODEL_NAME}, 耗时={elapsed:.2f}秒, 错误={str(e)}")
        raise


def inferenceOpenAI_Original(APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt="", MAX_TOKENS=30*1024):
    """
    原始方式调用 AI 服务(每次创建新连接)

    Args:
        APP_ID (str): AI 服务的应用 ID
        REQUEST_URL (str): API 请求地址
        MODEL_NAME (str): 使用的模型名称
        user_prompt (str): 用户输入的提示词
        system_prompt (str): 系统提示词
        MAX_TOKENS (int, optional): 最大生成 token 数,默认 30*1024

    Returns:
        dict: 包含 AI 响应结果的字典
    """
    start_time = time.time()
    token_count = get_token_count(system_prompt + user_prompt)

    print(f"[原始版本] request, model name: {MODEL_NAME} , MAX_TOKENS: {MAX_TOKENS} ,token_used: {token_count}")
    logger.info(f"[原始版本] 请求: 模型={MODEL_NAME}, tokens={token_count}, max_tokens={MAX_TOKENS}")

    try:
        # 每次创建新的 HTTP 客户端(原始方式)
        http_client = httpx.Client(
            base_url=REQUEST_URL,
            timeout=600.0  # 10分钟超时
        )

        client = OpenAI(
            api_key=APP_ID,
            base_url=REQUEST_URL,
            http_client=http_client
        )

        # 生成唯一的跟踪ID
        trace_id = f"orig_trace_{int(time.time())}_{threading.get_ident()}"

        # 发送请求
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            stream=False,
            max_tokens=MAX_TOKENS,
            extra_headers={
                "M-TraceId": trace_id
            }
        )

        # 处理响应
        result = packageResult(response)

        # 记录请求完成信息
        elapsed = time.time() - start_time
        original_stats.add_request(elapsed)
        logger.info(f"[原始版本] 请求完成: 模型={MODEL_NAME}, 耗时={elapsed:.2f}秒, tokens={result['total_tokens']}")

        # 关闭HTTP客户端
        http_client.close()

        return result

    except Exception as e:
        # 记录错误信息
        elapsed = time.time() - start_time
        original_stats.add_request(elapsed)
        logger.error(f"[原始版本] 请求失败: 模型={MODEL_NAME}, 耗时={elapsed:.2f}秒, 错误={str(e)}")
        # 确保在异常情况下也关闭客户端
        try:
            http_client.close()
        except:
            pass
        raise

def packageResult(response):
    """
    解析 API 响应结果,支持非流式响应
    """
    try:
        # 处理普通响应
        content = response.choices[0].message.content
        prompt_tokens = response.usage.prompt_tokens
        completion_tokens = response.usage.completion_tokens
        total_tokens = response.usage.total_tokens
        model = response.model
        request_id = response.id
        created_time = response.created
        result = {
            "content": content,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": total_tokens,
            "model": model,
            "request_id": request_id,
            "created_time": created_time
        }
        return result
    except Exception as e:
        logger.error(f"解析响应结果时出错: {e}")
        # 返回基本结果
        return {
            "content": response.choices[0].message.content if hasattr(response, 'choices') else "",
            "prompt_tokens": None,
            "completion_tokens": None,
            "total_tokens": None,
            "model": getattr(response, 'model', "unknown"),
            "request_id": getattr(response, 'id', "error"),
            "created_time": getattr(response, 'created', int(time.time()))
        }

def print_performance_comparison():
    """打印性能对比结果"""
    pool_data = pool_stats.get_stats()
    original_data = original_stats.get_stats()

    print("\n" + "="*80)
    print("性能对比结果")
    print("="*80)

    print(f"{'指标':<20} {'连接池版本':<20} {'原始版本':<20} {'性能提升':<20}")
    print("-" * 80)

    if pool_data['requests'] > 0 and original_data['requests'] > 0:
        improvement_avg = ((original_data['avg_time'] - pool_data['avg_time']) / original_data['avg_time']) * 100
        improvement_total = ((original_data['total_time'] - pool_data['total_time']) / original_data['total_time']) * 100

        print(f"{'请求次数':<20} {pool_data['requests']:<20} {original_data['requests']:<20} {'-':<20}")
        print(f"{'平均响应时间(s)':<20} {pool_data['avg_time']:<20.3f} {original_data['avg_time']:<20.3f} {improvement_avg:>18.1f}%")
        print(f"{'最小响应时间(s)':<20} {pool_data['min_time']:<20.3f} {original_data['min_time']:<20.3f} {'-':<20}")
        print(f"{'最大响应时间(s)':<20} {pool_data['max_time']:<20.3f} {original_data['max_time']:<20.3f} {'-':<20}")
        print(f"{'总耗时(s)':<20} {pool_data['total_time']:<20.3f} {original_data['total_time']:<20.3f} {improvement_total:>18.1f}%")
    else:
        print("暂无足够数据进行对比")

    # 显示连接池统计
    pool_client_stats = client_pool.get_stats()
    print("\n连接池详细统计:")
    print(f"  创建的客户端: {pool_client_stats['created_clients']}")
    print(f"  复用的客户端: {pool_client_stats['reused_clients']}")
    print(f"  总请求数: {pool_client_stats['total_requests']}")
    if pool_client_stats['total_requests'] > 0:
        reuse_rate = (pool_client_stats['reused_clients'] / pool_client_stats['total_requests']) * 100
        print(f"  连接复用率: {reuse_rate:.1f}%")


# 确保程序退出时关闭所有连接
def cleanup():
    """程序退出时的清理函数"""
    logger.info("程序退出,清理资源...")
    client_pool.close_all()

# 注册退出处理函数
atexit.register(cleanup)

if __name__ == "__main__":
    import concurrent.futures
    import random

    # 测试配置
    APP_IDS = [
        "21923277120655253527",
        "21921770414276694061",
        "21923297867650007079",
        "21923298145342619704"
    ]  # 多个APP_ID模拟真实场景
    REQUEST_URL = 'https://aigc.sankuai.com/v1/openai/native'
    MODEL_NAME = "gpt-4.1-mini"

    # 测试参数
    TOTAL_REQUESTS = 100  # 总请求数
    MAX_WORKERS = 20  # 最大并发线程数

    # 生成测试提示词
    test_prompts = [
        "Hello, how are you today?",
        "What is Python programming?",
        "Explain machine learning briefly",
        "Tell me a short joke",
        "What is the weather like?",
        "How does AI work?",
        "What is cloud computing?",
        "Explain blockchain technology",
        "What is data science?",
        "How to learn programming?",
        "What is artificial intelligence?",
        "Explain neural networks",
        "What is deep learning?",
        "How does the internet work?",
        "What is software engineering?",
        "Explain database concepts",
        "What is cybersecurity?",
        "How to build a website?",
        "What is mobile development?",
        "Explain API concepts"
    ]


    def worker_task(method, task_id):
        """工作线程任务"""
        app_id = random.choice(APP_IDS)
        prompt = random.choice(test_prompts) + f" (Request #{task_id})"

        try:
            start_time = time.time()
            result = method(app_id, REQUEST_URL, MODEL_NAME, prompt)
            end_time = time.time()

            return {
                'success': True,
                'task_id': task_id,
                'app_id': app_id[-6:],
                'elapsed_time': end_time - start_time,
                'content_length': len(result['content']) if result and 'content' in result else 0,
                'tokens': result.get('total_tokens', 0) if result else 0
            }
        except Exception as e:
            end_time = time.time()
            return {
                'success': False,
                'task_id': task_id,
                'app_id': app_id[-6:],
                'elapsed_time': end_time - start_time,
                'error': str(e),
                'content_length': 0,
                'tokens': 0
            }


    def run_performance_test(method, method_name, total_requests, max_workers):
        """运行性能测试"""
        print(f"\n{'=' * 60}")
        print(f"测试 {method_name}")
        print(f"总请求数: {total_requests}, 并发线程数: {max_workers}")
        print(f"{'=' * 60}")

        results = []
        start_time = time.time()

        # 使用线程池执行并发请求
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # 提交所有任务
            futures = []
            for i in range(total_requests):
                future = executor.submit(worker_task, method, i + 1)
                futures.append(future)

            # 收集结果,显示进度
            completed = 0
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                results.append(result)
                completed += 1

                # 每完成10个请求显示一次进度
                if completed % 10 == 0 or completed == total_requests:
                    print(f"进度: {completed}/{total_requests} ({completed / total_requests * 100:.1f}%)")

        total_time = time.time() - start_time

        # 分析结果
        successful_results = [r for r in results if r['success']]
        failed_results = [r for r in results if not r['success']]

        if successful_results:
            response_times = [r['elapsed_time'] for r in successful_results]
            avg_response_time = sum(response_times) / len(response_times)
            min_response_time = min(response_times)
            max_response_time = max(response_times)

            total_tokens = sum(r['tokens'] for r in successful_results)
            total_content_length = sum(r['content_length'] for r in successful_results)

            # 按APP_ID统计
            app_id_stats = {}
            for r in successful_results:
                app_id = r['app_id']
                if app_id not in app_id_stats:
                    app_id_stats[app_id] = {'count': 0, 'total_time': 0}
                app_id_stats[app_id]['count'] += 1
                app_id_stats[app_id]['total_time'] += r['elapsed_time']
        else:
            avg_response_time = 0
            min_response_time = 0
            max_response_time = 0
            total_tokens = 0
            total_content_length = 0
            app_id_stats = {}

        # 打印详细统计
        print(f"\n{method_name} 测试结果:")
        print(f"{'=' * 50}")
        print(f"总耗时: {total_time:.2f}秒")
        print(f"成功请求: {len(successful_results)}/{total_requests} ({len(successful_results) / total_requests * 100:.1f}%)")
        print(f"失败请求: {len(failed_results)}")
        print(f"平均响应时间: {avg_response_time:.3f}秒")
        print(f"最小响应时间: {min_response_time:.3f}秒")
        print(f"最大响应时间: {max_response_time:.3f}秒")
        print(f"请求吞吐量: {len(successful_results) / total_time:.2f} 请求/秒")
        print(f"总生成tokens: {total_tokens}")
        print(f"总内容长度: {total_content_length} 字符")

        if app_id_stats:
            print(f"\nAPP_ID使用统计:")
            for app_id, stats in app_id_stats.items():
                avg_time = stats['total_time'] / stats['count']
                print(f"  {app_id}: {stats['count']}次请求, 平均耗时: {avg_time:.3f}秒")

        if failed_results:
            print(f"\n失败请求详情:")
            error_counts = {}
            for r in failed_results:
                error = r.get('error', 'Unknown error')
                error_counts[error] = error_counts.get(error, 0) + 1

            for error, count in error_counts.items():
                print(f"  {error}: {count}次")

        return {
            'method_name': method_name,
            'total_time': total_time,
            'successful_requests': len(successful_results),
            'failed_requests': len(failed_results),
            'avg_response_time': avg_response_time,
            'min_response_time': min_response_time,
            'max_response_time': max_response_time,
            'throughput': len(successful_results) / total_time if total_time > 0 else 0,
            'total_tokens': total_tokens,
            'app_id_stats': app_id_stats
        }


    def print_comparison(pool_result, original_result):
        """打印对比结果"""
        print(f"\n{'=' * 80}")
        print("高并发场景性能对比结果")
        print(f"{'=' * 80}")

        print(f"{'指标':<25} {'连接池版本':<20} {'原始版本':<20} {'性能提升':<15}")
        print("-" * 80)

        # 成功率对比
        pool_success_rate = pool_result['successful_requests'] / (
                    pool_result['successful_requests'] + pool_result['failed_requests']) * 100
        orig_success_rate = original_result['successful_requests'] / (
                    original_result['successful_requests'] + original_result['failed_requests']) * 100

        print(f"{'成功率(%)':<25} {pool_success_rate:<20.1f} {orig_success_rate:<20.1f} {pool_success_rate - orig_success_rate:>13.1f}%")

        # 响应时间对比
        if pool_result['avg_response_time'] > 0 and original_result['avg_response_time'] > 0:
            time_improvement = ((original_result['avg_response_time'] - pool_result['avg_response_time']) /
                                original_result['avg_response_time']) * 100
            print(f"{'平均响应时间(s)':<25} {pool_result['avg_response_time']:<20.3f} {original_result['avg_response_time']:<20.3f} {time_improvement:>13.1f}%")

        # 吞吐量对比
        if pool_result['throughput'] > 0 and original_result['throughput'] > 0:
            throughput_improvement = ((pool_result['throughput'] - original_result['throughput']) / original_result['throughput']) * 100
            print(f"{'吞吐量(请求/秒)':<25} {pool_result['throughput']:<20.2f} {original_result['throughput']:<20.2f} {throughput_improvement:>13.1f}%")

        # 总耗时对比
        if pool_result['total_time'] > 0 and original_result['total_time'] > 0:
            total_time_improvement = ((original_result['total_time'] - pool_result['total_time']) / original_result['total_time']) * 100
            print(f"{'总耗时(s)':<25} {pool_result['total_time']:<20.2f} {original_result['total_time']:<20.2f} {total_time_improvement:>13.1f}%")

        print(f"{'最小响应时间(s)':<25} {pool_result['min_response_time']:<20.3f} {original_result['min_response_time']:<20.3f} {'-':<15}")
        print(f"{'最大响应时间(s)':<25} {pool_result['max_response_time']:<20.3f} {original_result['max_response_time']:<20.3f} {'-':<15}")

        # 连接池统计
        pool_client_stats = client_pool.get_stats()
        print(f"\n连接池详细统计:")
        print(f"  创建的客户端: {pool_client_stats['created_clients']}")
        print(f"  复用的客户端: {pool_client_stats['reused_clients']}")
        print(f"  总请求数: {pool_client_stats['total_requests']}")
        if pool_client_stats['total_requests'] > 0:
            reuse_rate = (pool_client_stats['reused_clients'] / pool_client_stats['total_requests']) * 100
            print(f"  连接复用率: {reuse_rate:.1f}%")

    # 开始性能测试
    print("=== 高并发大量请求场景性能测试 ===")
    print(f"测试配置:")
    print(f"  总请求数: {TOTAL_REQUESTS}")
    print(f"  并发线程数: {MAX_WORKERS}")
    print(f"  APP_ID数量: {len(APP_IDS)}")
    print(f"  测试提示词数量: {len(test_prompts)}")

    # 重置统计数据
    pool_stats = PerformanceStats("连接池版本")
    original_stats = PerformanceStats("原始版本")

    # 测试连接池版本
    pool_result = run_performance_test(
        inferenceOpenAI,
        "连接池版本",
        TOTAL_REQUESTS,
        MAX_WORKERS
    )

    # 等待一段时间,让系统稳定
    print("\n等待5秒让系统稳定...")
    time.sleep(5)

    # 测试原始版本
    original_result = run_performance_test(
        inferenceOpenAI_Original,
        "原始版本",
        TOTAL_REQUESTS,
        MAX_WORKERS
    )

    # 打印对比结果
    print_comparison(pool_result, original_result)