QA

AI代码审查

Posted by Sutdown on July 15, 2025

第一期 单线程

整体框架结构

背景:在提测提交代码时,利用AI模型对代码的改进提出建议。该工具主要用于对当前的结果进行定量分析。 整体分为两部分,数据集执行和结果评估。

  • 数据集执行的作用在于针对input code给出AI预测的结果
  • 模型评估的作用在于将AI预测结果和真实结果进行比对

数据集执行中分为三部分,check,reflect和review。其中每部分都可以选择不同的模型。

数据源是由天启平台向我们传递单条数据,数据中包含每个部分可以选择的模型,相关的模型参数,具体的单条数据等。由于input code在实际使用中会存在多条output输出,因此也体现为多条数据,需要对多条数据从input id进行聚合。同时根据用户所选择的is like中的有帮助0,无帮助1,有帮助无效果2将样本分为正样本(至少有一个0),负样本(只有1和2,至少有一个1),可优化样本(全为2)三部分。同时控制正负样本的比例。

关于prompt

  • 在check中,根据input code调用ai模型按照json格式生成评论。
  • 在reflect中,根据input code和check中生成的评论,调用模型对评论进行进一步优化。
  • 在review中,根据input code和reflect中生成的评论,调用模型对前面的input code和预计的评论进行评估,加上了valid字段看评论对于input code是否有效。
模型评估Part

模型评估部分中,主要分为计算评价指标和得到定量结果两部分。

第一部分 将数据集执行的多个预测结果和实际结果按照笛卡尔积的形式逐条比对。

  • 正样本:研发人员对于AI给出的建议进行评估,认为其有效。正样本中,AI对于bug的评估结果 和 真实结果对比,向量相似度达到阈值时,记为TP(AI正确识别bug的数量);未达到阈值记为FP(AI错误的认为它为真bug);用真实的总bug数量减去TP(AI正确识别bug的数量)就是FN(AI错误的认为它为假bug,也就是AI漏掉的真bug);
  • 负样本:研发人员对于AI给出的建议进行评估,认为其无效。负样本中,只要AI输出了任意bug,均记为FP(AI错误的认为它为真bug);AI没有输出任何bug,记为TN(AI正确的判定为假bug,也就是无bug)
  • 可优化样本:研发人员对于AI给出的建议进行评估,认为其有效但无需修改。可优化样本中,向量相似度达到阈值,极为RO(AI给出合理建议,但无需强制修改)

其中

  • tp为ai正确的指出真bug的数量,也就是正样本中语义分析的结果达标的数量。
  • tn为ai正确的指出假bug的数量,也就是负样本中ai没有输出任何bug评论。
  • fp为ai错误的指出真bug的数量,也就是正样本中除了tp的剩余,负样本中输出任意bug评论。
  • fn为正样本中未被识别出的bug,也就是真实bug减去tp数量。
  • ro为可优化样本中语义分析结果达标的数量,答对加分,答错不扣分。
\[\begin{align*} \text{准确率 (Accuracy)} \quad & \mathrm{acc} = \frac{tp + tn}{tp + tn + fp + fn} \\[6pt] \text{精确率 (Precision)} \quad & \mathrm{pre} = \frac{tp}{tp + fp} \\[6pt] \text{召回率 (Recall)} \quad & \mathrm{rec} = \frac{tp}{tp + fn} \\[6pt] \text{F1 分数} \quad & \mathrm{f1} = \frac{2 \times \mathrm{pre} \times \mathrm{rec}}{\mathrm{pre} + \mathrm{rec}} \\[6pt] \text{有价值准确率} \quad & \mathrm{vacc} = \frac{tp + ro}{tp + tn + fp + fn + ro} \\[6pt] \text{有价值召回率} \quad & \mathrm{vrec} = \frac{tp + ro}{tp + fn + ro} \end{align*}\]
InferenceAI

以上两种均为从FRIDAY大模型平台调用接口,其中的接入方式主要有两种:

  • 一是api接入,也就是通过http方式调用api。
  • 二是采用sdk接入,仅支持python openai官方sdk调用,使用美团语音平台开放申请的appid,注意要修改api base地址。

关于请求参数: header中添加参数authorization,值为 bearer申请的appid。data中需要包含 model(要使用的模型id);messages(以聊天格式生成聊天完成的信息,包含字段role和content,role中可为system和user,即对系统的身份对话要求和用户发送数据;也包括其它可选参数比如模型配置,是否扩展思考等;另外也可以选择流式或者段式处理,由于不需要实时显示,数据文件不是很大,同时段式实现相对简单这里选用段式处理。

关于响应参数: 包括响应内容,模型名词,请求id这些必选参数外,还有可选参数比如时间戳,统计生成的token等。另外针对段式和流式处理增添当前响应位于第几条和是否为最后一条的字段。

关于http状态码:

  • 200ok 请求成功
  • 400bad request 请求体存在问题,一般为格式错误或许token超过最大token限制
  • 401unauthorized 检查appid是否正确
  • 403forbidden 鉴权失败,检查appid是否有效或者有额度
  • 408request timeout 请求超时,建议重试
  • 429too many requests 请求过多被限流,可能为每分钟请求超过限制,appid达到使用上限,服务端模型总调用达到上限
  • 450unavailable for legal reason输入内容违规
  • 451输出内容违规
  • 500服务器内部异常
  • 504模型超过最大响应时间

为了保证项目的高质量。设计模式采用策略模式,1 定义了一个支持所有算法的公共接口;2 实现策略接口的具体算法;3维护对于策略对象的引用,定义一个接口让策略访问数据。其中策略算法class为checker checkerReview,checkerReflectReview三种,由于其中实现逻辑类型,同时定义一个基类,这三个策略算法继承基类,这样便于之后引进新的类,保障了好的扩展性和弱耦合性。

About调用AI
当前实现分析

当前 inferenceOpenAI.py 实现了两种调用 AI 服务的方式:

  1. 普通调用 (inferenceOpenAI):一次性返回完整响应
  2. 流式调用 (inferenceOpenAIStream):实时返回生成结果

主要特点:

  • 使用 OpenAI 客户端库
  • 每次请求创建新的 httpx.Client
  • 同步阻塞调用
  • 简单的错误处理
  • 无缓存机制
可能实现方案
方案 优点 缺点 适用场景
同步客户端 简单直观 阻塞、低并发、新建连接开销大 小项目、PoC
异步客户端 并发性能好 仍每次新建连接 高并发、异步服务
连接池复用 性能好、连接可复用 需维护 client 生命周期 中高并发、多租户
重试与断路器 提升稳定性 增加复杂度、可能带来额外延迟 不稳定网络、高可用需求
缓存 降低接口调用频率 数据易过时、只适合热数据 热点 prompt 重复调用
最佳实现方案

针对当前场景(多线程并行处理大量请求),最佳实现方案是连接池复用 + 重试机制 + 监控的组合:

当前实现的主要问题与改进建议

  1. 每次请求创建新客户端
    • 问题:增加连接建立开销,浪费资源
    • 改进:实现客户端连接池,复用HTTP连接
  2. 缺乏有效的错误处理
    • 问题:临时错误可能导致任务失败
    • 改进:添加重试机制,特别是对网络错误和限流错误
  3. 无监控和统计
    • 问题:难以监控性能和问题
    • 改进:添加详细日志和性能指标收集
  4. 同步阻塞调用
    • 问题:在高并发场景下效率不高
    • 改进:提供异步API版本,与线程池配合使用
  5. 无资源清理
    • 问题:可能导致连接泄漏
    • 改进:添加资源清理机制,确保连接正确关闭

这些改进可以显著提高系统在高并发场景下的性能、稳定性和可观测性,特别是在处理大量API调用时。

第二期 多进程

整体逻辑变更 一期整体思路为串联,也就是一次只处理一个任务最后聚合。这里的单个任务指一个inputcode,多个真实outputone(对于inputcode识别的bug以及人工给出的评估),多个预测outputone(经数据集执行得到的结果)。最终结果基本符合预期。考虑到一是数据规模庞大:随着评估数据量的不断增长,模型评估过程变得愈发复杂和耗时,极大地影响了评估效率和反馈速度。二在于评估耗时过长:大规模数据下的模型评估周期过长,难以满足模型快速迭代和上线的需求,成为制约模型工程化落地的重要瓶颈。

二期中决定采用并行处理加快效率,能够同时处理多个任务。多模型AI代码评论处理系统。 原始数据 → 任务分解 → 并行处理 → 结果收集 → 统一保存 Excel Task队列 多线程 内存聚合 JSON文件

1
2
3
4
5
1 业务逻辑层 (run_check_and_review)           
2 并发控制层 (run_comments_and_verify_parallel_v2)  
3 资源管理层 (AppIDManager)             
4 任务执行层 (process_with_dynamic_app_id_v2)       
5 API调用层  (process_single_item_with_app_id)    
AppIdManager

(通过APP_ID池管理API调用资源,实现限流和负载均衡)

通过 AppIDManager 管理有限的API资源,实现在高并发环境下实现精确的限流控制和资源分配:

  • 动态分配:按需分配APP_ID
  • 限流控制:每个APP_ID每分钟最多N次请求(默认每分钟 appid 最多接收 3 次请求)
  • 负载均衡:选择使用次数最少的APP_ID(用双端队列也就是滑动窗口记录 appid 的请求时间和请求次数)
  • 实时监控:提供详细的资源使用统计

1 分配 appid 时, 根据限流控制(当前近分钟 appid 接收的请求少于 3)的原则筛选出可用的 appid。记录 appid 请求次数的数据结构为双端队列,也就是滑动窗口,会在查询限流之前清理掉该 appid 一分钟前的请求记录,再看其是否限流。之后在满足限流的 appid 中选择适用次数最少的 appid 进行分配,同时更新该 appid 的使用时间和使用次数;没有满足限流的 appid 时保持等待。其中存在一个waiting_models进行存放已经分配模型的 appid,未分配 appid 时添加到waiting_models集合中,分配后从集合中去除。

2 释放 appid 时, 主要用于将使用完的 app_id 归还到资源池中,并更新相关统计信息

3 在分配和释放都会采用threading.Lock() 对象,用于确保对共享资源的访问是线程安全的,确保数据一致性。

使用流程如下:

  1. 初始化 AppIDManager 实例
  2. 线程请求获取 APP_ID
  3. 使用 APP_ID 完成业务逻辑
  4. 释放 APP_ID 回资源池
  5. 定期检查系统状态

这种设计可以有效的管理API密钥或APPID资源,确保系统在高并发是能够平稳运行,避免对单个API的过度使用。

About限流

目的:1 每个appid每分钟只能用三次 2 每次选择使用次数最少的appid

可供参考方案

1 令牌桶/漏通算法,为每个APPid维护一个令牌通,定时及时补充令牌。可以应对突发流量,平滑处理请求。 2 优先队列,利用优先队列将appid使用少的自动排序在前面,可以更好的选择appid 3 资源池分片,将APPID分配到多个子池中,减少锁竞争(不考虑,appid数量最多三十,不会造成过大的竞争) 4 请求合并,将短时间的多个请求合并为一个批量请求,减少API调用次数,提高效率。可以使用缓冲区收集请求,定时或者达到阈值时批量处理。(暂时不考虑,当前数量不多效果不明显,后续考虑申请不限次数的appid也不需要合并请求) 5 多级缓存策略,为热点请求建立多级缓存,减少api调用。(如果是缓存数据是没必要的,因为数据会被缓存到数据库中,如果是缓存api的调用,那就类似于合并请求,可以考虑作为后续优化方向)

最佳实现策略

AppIDManager 优化实现:条件变量 + 令牌桶限流 + 优先级队列

当前 AppIDManager 实现存在以下问题: 1 忙等待问题:使用轮询等待 APP_ID,浪费 CPU 资源 2 简单计数限流:使用滑动窗口记录请求时间,不够灵活 3 无优先级支持:所有任务平等对待,无法优先处理重要任务 4 资源利用不均衡:可能导致某些 APP_ID 过载

优化思路

1 条件变量替代轮询。使用 threading.Condition 实现线程等待和通知机制,避免忙等待。 2 令牌桶算法实现限流。为每个 APP_ID 维护一个令牌桶,定时补充令牌,更好地应对突发流量。 3 优先级队列管理任务。使用 PriorityQueue 按优先级分配资源,确保重要任务优先处理。 4 资源池分片与动态调整。将 APP_ID 池分成多个子池,减少锁竞争,并根据性能动态调整分配策略。

代码和使用详见附录。

优点 1 消除忙等待:使用条件变量和事件通知,避免 CPU 资源浪费 2 精确限流控制:令牌桶算法可以平滑处理突发流量 3 任务优先级支持:可以优先处理重要任务 4 负载均衡:智能选择使用次数最少的 APP_ID 5 资源利用率高:后台线程主动分配资源,提高效率 6 详细监控:提供丰富的统计信息和日志 7 优雅关闭:支持安全终止后台线程

缺点 1 实现复杂度高:相比原始实现更复杂,维护成本增加 2 内存占用增加:需要维护更多状态和队列 3 潜在死锁风险:复杂的锁操作可能引入死锁风险 4 调试难度增加:多线程和异步操作使调试变得更困难

这种实现特别适合当前代码场景,因为: 1 处理大量并发请求,需要高效的资源管理 2 APP_ID 资源有限且有限流要求 3 长时间运行,需要稳定可靠的资源分配 4 可能存在不同优先级的任务需求 5 需要详细的监控和统计信息 通过这种优化,可以显著提高系统的资源利用率、响应速度和稳定性,特别是在高负载场景下。

多进程用法

并行处理 (多个任务同时进行) 默认并行度: max_workers = min(len(valid_app_id) * 2, len(model_list) * 10) 1 生产者-消费者模式:生产者创建任务,针对不同的模型创建多个任务;消费者:ThreadPoolExecutor处理任务,实现多线程环境下确保数据一致性,避免竞态条件。

2 并行处理所有任务,处理多个模型对数据的并行预测,采用基于 CompletableFuture 的异步并行任务处理系统,相比传统的 Future,提供了更强大的异步编程能力和更好的异常处理机制。 2.1 初始化时,创建固定大小的线程池用户控制并发度;采用原子计数器确保多线程环境下失败计数的线程安全。 2.2 任务提交与 CompletableFuture 创建。

  • Stream API 链式操作:.stream(): 将集合转换为流,.map(): 对每个任务进行转换操作.collect(Collectors.toList()): 收集结果为 List;
  • CompletableFuture 核心方法:supplyAsync() 方法:异步执行processWithDynamicAppIdV2(这里用了 appid)有返回值的任务;同时具备异常处理,任务抛出异常时触发,记录日志,返回失败计数。

2.3 等待所有任务完成。allOf() 方法 批量等待:等待所有 CompletableFuture 完成;.get() 阻塞调用 同步等待:阻塞当前线程直到所有任务完成

3 并发任务结果收集和处理机制,处理完成的任务,收集处理结果。使用 concurrent.futures.as_completed() 迭代器来异步收集多个并发任务的执行结果,当某个任务完成时立即处理其结果,而不需要等待所有任务完成。 3.1 结果聚合: 将每个模型的处理结果存储到 data_model_map[input_id][processed_model] 中 3.2 完成度检查: 检查某个数据项是否已被所有模型处理完成 3.3 数据重构: 当所有模型都完成后,将结果合并为最终格式并存储all_completed_results 3.4 异常处理: 详细记录和打印异常信息,包括错误类型、消息、堆栈

About多进程

不使用协程的最佳多进程方案

分析当前代码,如果不使用协程,最适合的多进程方案是进程池+线程池混合模式,具体实现思路如下:

最佳方案:进程池+线程池混合模式

这种方案结合了进程和线程的优势:

  • 进程池负责跨CPU核心的并行计算
  • 线程池负责每个进程内的I/O并发

为什么这是最佳方案

  1. 充分利用多核CPU:进程池可以绕过Python的GIL限制,实现真正的并行计算
  2. 高效处理I/O密集型任务:每个进程内使用线程池处理I/O等待,提高资源利用率
  3. 资源隔离:每个进程有独立的AppIDManager,减少锁竞争,提高并发性能
  4. 负载均衡:通过数据分块,确保每个进程处理相近数量的任务
  5. 容错性:单个进程失败不会影响其他进程,提高整体稳定性
  6. 内存效率:每个进程只处理部分数据,减少内存压力

这种混合方案结合了进程和线程的优势,既能充分利用多核CPU,又能高效处理I/O等待,特别适合当前这种需要大量API调用的场景。

Other:性能分析

该部分主要用于通过将实际结果和 AI 预测结果进行对比,以此将评估结果量化,目前得到的准确率,精确率,召回率都在 30% 左右,预计十月份能达到 60%,明年二月份可达到 90%。

从结果和实际情况来看,在正负样本的基础上增加可优化样本,使得添加的有价值准确率vacc和有价值召回率vrec增加到 40%。AI 代码审查的作用在于:1 根据结果选择最适合于 check,reflect,review 的模型。2 当数据集中代码评估的规则(持续更新),system prompt,user prompt 等经过训练不断更新时,召回率能不能获得对应的提升。

从性能上来看,当有 400 条数据,110 个任务,串行处理需要 50 分钟,并行处理只需要 30 分钟,其中用到的线程为 7 个,并行并没有达到预期效果的原因在于1 appid 数量过少且限流每分钟至多访问三次。2 调用 AI api 时 耗时过长,导致 CPU 空闲。后者目前未解决,前者申请更多的 appid,且考虑到 appid 的有效性,也同时申请 api 不会失效,基本上效率基本能够提升到串行的 1/3,基本符合预期。

附录

inferenceAI可能实现方案

  1. 同步客户端方案(当前)
1
2
3
4
5
def inferenceOpenAI(APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt=""):
    http_client = httpx.Client(base_url=REQUEST_URL, timeout=600.0)
    client = OpenAI(api_key=APP_ID, base_url=REQUEST_URL, http_client=http_client)
    response = client.chat.completions.create(...)
    return packageResult(response)
  1. 异步客户端方案
1
2
3
4
5
async def inferenceOpenAIAsync(APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt=""):
    async with httpx.AsyncClient(base_url=REQUEST_URL, timeout=600.0) as http_client:
        client = AsyncOpenAI(api_key=APP_ID, base_url=REQUEST_URL, http_client=http_client)
        response = await client.chat.completions.create(...)
        return packageResult(response)
  1. 连接池复用方案
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 全局客户端池
client_pool = {}

def get_client(APP_ID, REQUEST_URL):
    key = f"{APP_ID}:{REQUEST_URL}"
    if key not in client_pool:
        http_client = httpx.Client(base_url=REQUEST_URL, timeout=600.0)
        client_pool[key] = OpenAI(api_key=APP_ID, base_url=REQUEST_URL, http_client=http_client)
    return client_pool[key]

def inferenceOpenAI(APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt=""):
    client = get_client(APP_ID, REQUEST_URL)
    response = client.chat.completions.create(...)
    return packageResult(response)
  1. 重试与断路器方案
1
2
3
4
5
6
7
8
from tenacity import retry, stop_after_attempt, wait_exponential
from circuitbreaker import circuit

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
@circuit(failure_threshold=5, recovery_timeout=30)
def inferenceOpenAI(APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt=""):
    # 实现与当前类似,但添加了重试和断路器机制
    ...
  1. 缓存方案
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import hashlib
from functools import lru_cache

def get_cache_key(MODEL_NAME, system_prompt, user_prompt):
    content = f"{MODEL_NAME}:{system_prompt}:{user_prompt}"
    return hashlib.md5(content.encode()).hexdigest()

@lru_cache(maxsize=1000)
def inferenceOpenAICached(cache_key, APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt=""):
    # 实际调用逻辑
    return inferenceOpenAI(APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt)

def inferenceOpenAI(APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt=""):
    cache_key = get_cache_key(MODEL_NAME, system_prompt, user_prompt)
    return inferenceOpenAICached(cache_key, APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt)

InferfenceAI连接池复用 + 重试机制 + 监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import httpx
from openai import OpenAI
import time
import threading
from functools import wraps
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("InferenceAI")

class OpenAIClientManager:
    """管理OpenAI客户端连接池"""
    
    def __init__(self):
        self.clients = {}
        self.lock = threading.RLock()
        self.stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_latency": 0,
        }
        
    def get_client(self, app_id, base_url, timeout=600.0):
        """获取或创建OpenAI客户端"""
        key = f"{app_id}:{base_url}"
        
        with self.lock:
            if key not in self.clients:
                http_client = httpx.Client(
                    base_url=base_url,
                    timeout=timeout,
                    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
                )
                self.clients[key] = OpenAI(
                    api_key=app_id,
                    base_url=base_url,
                    http_client=http_client
                )
                logger.info(f"Created new OpenAI client for APP_ID: {app_id[-6:]}")
            
            return self.clients[key]
            
    def update_stats(self, success, latency):
        """更新统计信息"""
        with self.lock:
            self.stats["total_requests"] += 1
            if success:
                self.stats["successful_requests"] += 1
            else:
                self.stats["failed_requests"] += 1
            self.stats["total_latency"] += latency
            
    def get_stats(self):
        """获取统计信息"""
        with self.lock:
            stats = self.stats.copy()
            avg_latency = stats["total_latency"] / max(1, stats["total_requests"])
            success_rate = stats["successful_requests"] / max(1, stats["total_requests"]) * 100
            
            return {
                **stats,
                "avg_latency": avg_latency,
                "success_rate": success_rate
            }
            
    def close_all(self):
        """关闭所有客户端连接"""
        with self.lock:
            for key, client in self.clients.items():
                try:
                    if hasattr(client, 'http_client') and client.http_client:
                        client.http_client.close()
                except Exception as e:
                    logger.error(f"Error closing client {key}: {str(e)}")
            self.clients.clear()

# 创建全局客户端管理器
client_manager = OpenAIClientManager()

def with_retry(max_retries=3, backoff_factor=0.5):
    """重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    wait_time = backoff_factor * (2 ** attempt)
                    logger.warning(f"Attempt {attempt+1}/{max_retries} failed: {str(e)}. Retrying in {wait_time:.2f}s")
                    time.sleep(wait_time)
            
            # 所有重试都失败
            logger.error(f"All {max_retries} attempts failed. Last error: {str(last_exception)}")
            raise last_exception
        return wrapper
    return decorator

@with_retry(max_retries=3)
def inferenceOpenAI(APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt="", MAX_TOKENS=30*1024):
    """改进的AI调用函数,使用连接池和重试机制"""
    start_time = time.time()
    success = False
    
    try:
        # 获取客户端(从连接池)
        client = client_manager.get_client(APP_ID, REQUEST_URL)
        
        # 记录请求信息
        token_count = get_token_count(system_prompt + user_prompt)
        logger.info(f"Request: model={MODEL_NAME}, tokens={token_count}, max_tokens={MAX_TOKENS}")
        
        # 发送请求
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            stream=False,
            max_tokens=MAX_TOKENS,
            extra_headers={
                "M-TraceId": f"trace_{int(time.time())}"
            }
        )
        
        # 处理响应
        result = packageResult(response)
        success = True
        return result
        
    except Exception as e:
        logger.error(f"Error calling {MODEL_NAME}: {str(e)}")
        raise
    finally:
        # 更新统计信息
        latency = time.time() - start_time
        client_manager.update_stats(success, latency)
        logger.debug(f"Request completed in {latency:.2f}s, success={success}")

# 同样方式改进流式调用函数
@with_retry(max_retries=2)  # 流式调用重试次数少一些
def inferenceOpenAIStream(APP_ID, REQUEST_URL, MODEL_NAME, user_prompt, system_prompt="", MAX_TOKENS=30*1024):
    """改进的流式AI调用函数"""
    # 实现类似,但处理流式响应
    ...

# 应用关闭时清理资源
def cleanup():
    """关闭所有连接并清理资源"""
    logger.info("Cleaning up resources...")
    client_manager.close_all()

# 注册退出处理
import atexit
atexit.register(cleanup)

AppID 条件变量 + 令牌桶限流 + 优先级队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
import threading
import time
import heapq
from collections import defaultdict, deque
from queue import PriorityQueue, Empty
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("appid_manager.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("AppIDManager")

class TokenBucket:
    """令牌桶实现,用于限流控制"""
    def __init__(self, capacity, refill_rate):
        """
        初始化令牌桶
        
        Args:
            capacity: 桶容量(最大令牌数)
            refill_rate: 令牌补充速率(每秒补充的令牌数)
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity  # 初始满桶
        self.last_refill_time = time.time()
        self.lock = threading.RLock()
        
    def _refill(self):
        """补充令牌"""
        now = time.time()
        elapsed = now - self.last_refill_time
        # 计算需要补充的令牌数
        new_tokens = elapsed * self.refill_rate
        
        if new_tokens > 0:
            self.tokens = min(self.capacity, self.tokens + new_tokens)
            self.last_refill_time = now
            
    def try_acquire(self, tokens=1):
        """
        尝试获取令牌,非阻塞
        
        Args:
            tokens: 需要获取的令牌数
            
        Returns:
            bool: 是否成功获取令牌
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False
            
    def acquire(self, tokens=1, timeout=None):
        """
        获取令牌,支持超时
        
        Args:
            tokens: 需要获取的令牌数
            timeout: 超时时间(秒),None表示无限等待
            
        Returns:
            bool: 是否成功获取令牌
        """
        start_time = time.time()
        
        while True:
            if self.try_acquire(tokens):
                return True
                
            # 检查是否超时
            if timeout is not None and time.time() - start_time >= timeout:
                return False
                
            # 计算下一个令牌可用的时间
            with self.lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                
                # 计算需要等待的时间
                required_tokens = tokens - self.tokens
                wait_time = required_tokens / self.refill_rate
                
                # 考虑超时
                if timeout is not None:
                    remaining = timeout - (time.time() - start_time)
                    if remaining <= 0:
                        return False
                    wait_time = min(wait_time, remaining)
                    
            # 等待一段时间再尝试
            time.sleep(min(wait_time, 0.1))  # 最多等待100ms,避免过长等待

class Task:
    """表示一个需要APP_ID的任务"""
    def __init__(self, model_name, priority=0, timestamp=None):
        self.model_name = model_name
        self.priority = priority  # 优先级,数字越小优先级越高
        self.timestamp = timestamp or time.time()  # 创建时间戳
        self.event = threading.Event()  # 用于通知任务已分配APP_ID
        self.app_id = None  # 分配的APP_ID
        
    def __lt__(self, other):
        """比较运算符,用于优先级队列排序"""
        # 先按优先级排序,再按时间戳排序(FIFO)
        if self.priority != other.priority:
            return self.priority < other.priority
        return self.timestamp < other.timestamp

class EnhancedAppIDManager:
    """增强版APP_ID管理器,使用条件变量、令牌桶和优先级队列"""
    def __init__(self, valid_app_ids, review_model, rate_limit_per_minute=3):
        """
        初始化APP_ID管理器
        
        Args:
            valid_app_ids: 有效的APP_ID列表
            review_model: 审核模型名称
            rate_limit_per_minute: 每分钟每个APP_ID的请求限制
        """
        # 基本属性
        self.review_model = review_model
        self.rate_limit_per_minute = rate_limit_per_minute
        
        # 资源池管理
        self.app_id_pool = valid_app_ids.copy()
        self.app_id_in_use = set()  # 正在使用的APP_ID
        
        # 线程同步
        self.lock = threading.RLock()
        self.condition = threading.Condition(self.lock)
        
        # 任务队列(按优先级)
        self.waiting_tasks = PriorityQueue()
        
        # 令牌桶(限流)
        refill_rate = rate_limit_per_minute / 60.0  # 每秒补充的令牌数
        self.token_buckets = {
            app_id: TokenBucket(capacity=rate_limit_per_minute, refill_rate=refill_rate)
            for app_id in valid_app_ids
        }
        
        # 使用统计
        self.app_id_usage_count = defaultdict(int)  # 总使用次数
        self.app_id_success_count = defaultdict(int)  # 成功次数
        self.app_id_failure_count = defaultdict(int)  # 失败次数
        self.app_id_current_tasks = defaultdict(set)  # 当前正在处理的任务
        
        # 性能监控
        self.last_status_print_time = time.time()
        self.total_requests = 0
        self.total_success = 0
        self.total_failed = 0
        self.total_wait_time = 0
        self.wait_count = 0
        
        # 启动资源分配线程
        self.running = True
        self.allocator_thread = threading.Thread(target=self._resource_allocator, daemon=True)
        self.allocator_thread.start()
        
        logger.info(f"初始化AppIDManager,APP_ID数量: {len(valid_app_ids)},限流: {rate_limit_per_minute}/分钟")
        
    def _resource_allocator(self):
        """后台线程,负责分配资源给等待的任务"""
        while self.running:
            allocated = False
            
            with self.lock:
                # 检查是否有可用的APP_ID
                available_app_ids = [
                    app_id for app_id in self.app_id_pool 
                    if app_id not in self.app_id_in_use and self.token_buckets[app_id].try_acquire()
                ]
                
                if available_app_ids and not self.waiting_tasks.empty():
                    try:
                        # 获取优先级最高的任务
                        task = self.waiting_tasks.get_nowait()
                        
                        # 选择使用次数最少的APP_ID(负载均衡)
                        selected_app_id = min(
                            available_app_ids, 
                            key=lambda x: self.app_id_usage_count[x]
                        )
                        
                        # 分配APP_ID给任务
                        self.app_id_in_use.add(selected_app_id)
                        task.app_id = selected_app_id
                        self.app_id_usage_count[selected_app_id] += 1
                        self.app_id_current_tasks[selected_app_id].add(task.model_name)
                        self.total_requests += 1
                        
                        # 通知等待的线程
                        task.event.set()
                        allocated = True
                        
                        logger.debug(f"分配APP_ID: {selected_app_id[-6:]} 给模型: {task.model_name}")
                    except Empty:
                        pass
            
            # 如果没有分配资源,短暂休眠避免CPU占用
            if not allocated:
                time.sleep(0.01)
                
    def get_app_id(self, model_name, priority=0, timeout=None):
        """
        获取一个可用的APP_ID
        
        Args:
            model_name: 模型名称
            priority: 优先级(数字越小优先级越高)
            timeout: 超时时间(秒),None表示无限等待
            
        Returns:
            str: 分配的APP_ID,如果超时返回None
        """
        start_time = time.time()
        
        # 创建任务
        task = Task(model_name, priority)
        
        # 将任务加入等待队列
        self.waiting_tasks.put(task)
        logger.debug(f"模型 {model_name} 请求APP_ID,优先级: {priority}")
        
        # 等待分配APP_ID或超时
        if timeout is not None:
            success = task.event.wait(timeout)
            if not success:
                # 超时,从队列中移除任务(实际无法直接从优先级队列中移除)
                logger.warning(f"获取APP_ID超时: 模型={model_name}, 优先级={priority}")
                return None
        else:
            task.event.wait()
        
        # 计算等待时间
        wait_time = time.time() - start_time
        with self.lock:
            self.total_wait_time += wait_time
            self.wait_count += 1
            
        logger.debug(f"模型 {model_name} 获得APP_ID: {task.app_id[-6:]},等待时间: {wait_time:.2f}秒")
        return task.app_id
        
    def release_app_id(self, app_id, model_name, success=True):
        """
        释放APP_ID回池中
        
        Args:
            app_id: 要释放的APP_ID
            model_name: 使用的模型名称
            success: 是否成功完成任务
        """
        with self.lock:
            if app_id in self.app_id_in_use:
                self.app_id_in_use.remove(app_id)
                self.app_id_pool.append(app_id)
                
                # 更新统计信息
                if success:
                    self.app_id_success_count[app_id] += 1
                    self.total_success += 1
                else:
                    self.app_id_failure_count[app_id] += 1
                    self.total_failed += 1
                    
                # 移除当前任务记录
                if model_name in self.app_id_current_tasks[app_id]:
                    self.app_id_current_tasks[app_id].remove(model_name)
                
                # 通知等待的线程
                self.condition.notify_all()
                
                logger.debug(f"释放APP_ID: {app_id[-6:]},模型: {model_name},成功: {success}")
            else:
                logger.warning(f"尝试释放未使用的APP_ID: {app_id[-6:]},模型: {model_name}")
                
    def print_status(self, force=False):
        """
        打印当前状态
        
        Args:
            force: 是否强制打印,不考虑时间间隔
        """
        current_time = time.time()
        if force or (current_time - self.last_status_print_time >= 30):  # 每30秒打印一次
            with self.lock:
                # 计算平均等待时间
                avg_wait_time = self.total_wait_time / max(1, self.wait_count)
                
                status_lines = []
                status_lines.append(f"┌{'─' * 90}┐")
                status_lines.append(f"│ 当前状态: 使用中APP_ID: {len(self.app_id_in_use)}/{len(self.app_id_pool) + len(self.app_id_in_use)} | 等待任务: {self.waiting_tasks.qsize()} │")
                
                # APP_ID使用统计
                app_id_stats = []
                for app_id in sorted(self.token_buckets.keys(), 
                                    key=lambda x: self.app_id_usage_count[x], 
                                    reverse=True)[:5]:  # 只显示使用最多的5个
                    short_id = app_id[-6:] if len(app_id) > 6 else app_id
                    usage = self.app_id_usage_count[app_id]
                    success = self.app_id_success_count[app_id]
                    failure = self.app_id_failure_count[app_id]
                    current = len(self.app_id_current_tasks[app_id])
                    app_id_stats.append(f"{short_id}({usage}次:{success}/{failure}, 当前:{current})")
                
                if app_id_stats:
                    stats_str = ', '.join(app_id_stats)
                    status_lines.append(f"│ APP_ID统计(总次数:成功/失败, 当前): {stats_str:<50}│")
                
                status_lines.append(f"│ 总请求: {self.total_requests} | 成功: {self.total_success} | 失败: {self.total_failed} | 平均等待: {avg_wait_time:.2f}秒 │")
                status_lines.append(f"│ 限流: {self.rate_limit_per_minute}/分钟/APP_ID │")
                status_lines.append(f"└{'─' * 90}┘")
                
                print('\n'.join(status_lines))
                self.last_status_print_time = current_time
                
    def shutdown(self):
        """关闭管理器,停止后台线程"""
        self.running = False
        if hasattr(self, 'allocator_thread') and self.allocator_thread.is_alive():
            self.allocator_thread.join(timeout=1)
        logger.info("AppIDManager已关闭")

AppIdManger使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 修改 process_with_dynamic_app_id_v2 函数使用新的 AppIDManager
def process_with_dynamic_app_id_v2(task_info, app_id_manager, mode="check_and_review", max_retries=3):
    """使用动态分配的APP_ID处理单个任务"""
    target_data, check_model, review_model = task_info
    
    # 确定任务优先级(可以根据业务需求自定义)
    priority = 0  # 默认优先级
    if "priority" in target_data:
        priority = target_data["priority"]
                
    try:
        print(f"开始处理任务 - input_id: {target_data.get('input_id', 'Unknown')}, model: {check_model}")
    except Exception as e:
        print(f"获取任务信息时出错: {str(e)}")
        return None, None, None

    retry_count = 0
    while retry_count < max_retries:
        # 使用优先级获取APP_ID,最多等待30秒
        app_id = app_id_manager.get_app_id(check_model, priority=priority, timeout=30)
        
        if app_id is None:
            # 如果超时未获取到APP_ID,增加重试次数
            retry_count += 1
            print(f"获取APP_ID超时,重试 {retry_count}/{max_retries}")
            continue
                
        try:
            print(f"使用 APP_ID: {app_id[-6:]}... 处理 input_id: {target_data.get('input_id', 'Unknown')}")

            result = process_single_item_with_app_id(
                target_data=target_data,
                check_model=check_model,
                review_model=review_model,
                app_id=app_id,
                mode=mode
            )

            if result is not None:
                app_id_manager.release_app_id(app_id, check_model, success=True)
                return target_data["input_id"], check_model, result
            else:
                retry_count += 1
                app_id_manager.release_app_id(app_id, check_model, success=False)
                if retry_count < max_retries:
                    print(f"处理结果为空,重试 {retry_count}/{max_retries} for input_id: {target_data.get('input_id', 'Unknown')}, model: {check_model}")
                    time.sleep(1)

        except Exception as e:
            retry_count += 1
            app_id_manager.release_app_id(app_id, check_model, success=False)
            # 错误处理代码...

    return target_data.get("input_id", "Unknown"), check_model, None

# 修改 run_comments_and_verify_parallel_v2 函数使用新的 AppIDManager
def run_comments_and_verify_parallel_v2(origin_data, model_list, valid_app_id, save_file,
                                       mode="check_and_review", review_model="gpt-4.1-mini",
                                       max_workers=None, rate_limit_per_minute=3):
    # ... 现有代码 ...
    
    # 创建增强版APP_ID管理器
    app_id_manager = EnhancedAppIDManager(valid_app_id, review_model, rate_limit_per_minute)
    
    try:
        # ... 现有处理代码 ...
    finally:
        # 确保关闭AppIDManager
        app_id_manager.shutdown()

进程池+线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# ... 保留现有代码 ...

import multiprocessing
from functools import partial

def worker_process(data_chunk, model_list, valid_app_id, review_model, mode, rate_limit_per_minute):
    """每个进程内的工作函数,使用线程池处理一部分数据"""
    # 每个进程创建自己的AppIDManager和线程池
    app_id_manager = AppIDManager(valid_app_id, review_model, rate_limit_per_minute)
    results = {}
    
    # 创建任务列表
    tasks = []
    for data_id, data in data_chunk.items():
        for model_name in model_list:
            tasks.append((data, model_name, review_model))
    
    # 使用线程池处理任务
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(valid_app_id)*2) as executor:
        futures = {executor.submit(process_with_dynamic_app_id_v2, task, app_id_manager, mode): task for task in tasks}
        
        for future in concurrent.futures.as_completed(futures):
            task = futures[future]
            try:
                input_id, model_name, result = future.result()
                if input_id not in results:
                    results[input_id] = {}
                if result is not None:
                    results[input_id][model_name] = result
            except Exception as e:
                print(f"任务处理异常: {str(e)}")
    
    return results

def run_comments_and_verify_parallel_hybrid(origin_data, model_list, valid_app_id, save_file,
                                           mode="check_and_review", review_model="gpt-4.1-mini",
                                           rate_limit_per_minute=3):
    """混合进程池和线程池的并行处理函数"""
    # 确定进程数量 - 通常设置为CPU核心数
    num_processes = min(multiprocessing.cpu_count(), 8)  # 限制最大进程数
    print(f"使用 {num_processes} 个进程进行并行处理")
    
    # 读取已保存的数据
    try:
        saved_data = read_json_file(save_file)
        if saved_data is None:
            saved_data = {}
    except Exception as e:
        print(f"读取保存文件出错: {str(e)}")
        saved_data = {}
    
    # 过滤掉已处理的数据
    filtered_data = {}
    for input_id, data in origin_data.items():
        if str(input_id) not in saved_data:
            filtered_data[str(input_id)] = data
    
    if not filtered_data:
        print("所有数据都已处理完成")
        return
    
    print(f"需要处理的数据项: {len(filtered_data)}")
    
    # 将数据分成大致相等的块
    data_chunks = split_data_into_chunks(filtered_data, num_processes)
    print(f"数据已分成 {len(data_chunks)} 个块")
    
    # 准备进程池
    start_time = time.time()
    
    # 使用进程池处理数据块
    with multiprocessing.Pool(processes=num_processes) as pool:
        # 创建偏函数,固定除数据块外的其他参数
        process_func = partial(
            worker_process,
            model_list=model_list,
            valid_app_id=valid_app_id,
            review_model=review_model,
            mode=mode,
            rate_limit_per_minute=rate_limit_per_minute
        )
        
        # 并行处理所有数据块
        results = pool.map(process_func, data_chunks)
    
    # 合并所有进程的结果
    all_results = {}
    for result_dict in results:
        for input_id, model_results in result_dict.items():
            if input_id not in all_results:
                all_results[input_id] = {}
            all_results[input_id].update(model_results)
    
    # 构建最终结果并保存
    final_results = {}
    for input_id, model_results in all_results.items():
        if input_id in filtered_data:
            data_copy = filtered_data[input_id].copy()
            data_copy["output_pred"] = {k:v for k,v in model_results.items()}
            final_results[input_id] = data_copy
    
    # 合并已有结果和新结果
    final_data = saved_data.copy()
    final_data.update(final_results)
    
    # 保存结果
    try:
        with open(save_file, 'w', encoding='utf-8') as f:
            json.dump(final_data, f, ensure_ascii=False, indent=4)
        print(f"成功保存结果到: {save_file}")
    except Exception as e:
        print(f"保存结果时出错: {str(e)}")
        # 尝试保存到备份文件
        try:
            with open(save_file + ".backup", 'w', encoding='utf-8') as f:
                json.dump(final_results, f, ensure_ascii=False, indent=4)
            print(f"结果已保存到备份文件: {save_file}.backup")
        except Exception as backup_e:
            print(f"备份保存也失败: {str(backup_e)}")
    
    # 打印统计信息
    elapsed_time = time.time() - start_time
    print(f"\n处理完成!")
    print(f"处理的数据项: {len(filtered_data)}")
    print(f"成功处理的数据项: {len(final_results)}")
    print(f"总耗时: {elapsed_time/60:.2f} 分钟")

def split_data_into_chunks(data_dict, num_chunks):
    """将字典数据分成指定数量的块"""
    items = list(data_dict.items())
    chunk_size = max(1, len(items) // num_chunks)
    return [dict(items[i:i+chunk_size]) for i in range(0, len(items), chunk_size)]

# 修改主函数调用
def run_check_and_review(origin_data_json, save_file, check_model_list, review_model=None,
                        mode=None, rate_limit_per_minute=3):
    print("check_model_list: {}".format(check_model_list))
    print("review_model: {}".format(review_model))
    print("mode: {}".format(mode))
    print("save_file: {}".format(save_file))
    print("rate_limit_per_minute: {}".format(rate_limit_per_minute))

    if origin_data_json is not None:
        valid_app_id = get_available_app_ids(APP_ID_LIST)
        
        print(f"Using {multiprocessing.cpu_count()} processes with {len(valid_app_id)} available APP_IDs")
        print("Starting hybrid parallel processing...")

        try:
            run_comments_and_verify_parallel_hybrid(
                origin_data=origin_data_json,
                model_list=check_model_list,
                valid_app_id=valid_app_id,
                save_file=save_file,
                mode=mode,
                review_model=review_model,
                rate_limit_per_minute=rate_limit_per_minute
            )
            print("Processing completed successfully!")
        except Exception as e:
            print(f"Processing failed with error: {str(e)}")