new-api/pkg/perf_metrics/metrics.go
2026-05-06 13:55:23 +08:00

262 lines
6.6 KiB
Go

package perfmetrics
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/QuantumNous/new-api/common"
"github.com/QuantumNous/new-api/model"
relaycommon "github.com/QuantumNous/new-api/relay/common"
"github.com/QuantumNous/new-api/setting/perf_metrics_setting"
)
var hotBuckets sync.Map
const seriesSchema = "dbcd0a3c01b55203"
func Init() {
go flushLoop()
}
func RecordRelaySample(info *relaycommon.RelayInfo, success bool) {
if info == nil {
return
}
now := time.Now()
hasTtft := info.IsStream && info.HasSendResponse()
ttftMs := int64(0)
if hasTtft {
ttftMs = info.FirstResponseTime.Sub(info.StartTime).Milliseconds()
}
Record(Sample{
Model: info.OriginModelName,
Group: info.UsingGroup,
LatencyMs: now.Sub(info.StartTime).Milliseconds(),
TtftMs: ttftMs,
HasTtft: hasTtft,
Success: success,
})
}
func Record(sample Sample) {
setting := perf_metrics_setting.GetSetting()
if !setting.Enabled || sample.Model == "" {
return
}
if sample.Group == "" {
sample.Group = "default"
}
if sample.LatencyMs < 0 {
sample.LatencyMs = 0
}
key := bucketKey{
model: sample.Model,
group: sample.Group,
bucketTs: bucketStart(time.Now().Unix()),
}
actual, _ := hotBuckets.LoadOrStore(key, &atomicBucket{})
actual.(*atomicBucket).add(sample)
recordRedis(key, sample)
}
func Query(params QueryParams) (QueryResult, error) {
if params.Hours <= 0 {
params.Hours = 24
}
if params.Hours > 24*30 {
params.Hours = 24 * 30
}
endTs := time.Now().Unix()
startTs := endTs - int64(params.Hours)*3600
merged := map[bucketKey]counters{}
rows, err := model.GetPerfMetrics(params.Model, params.Group, startTs, endTs)
if err != nil {
return QueryResult{}, err
}
for _, row := range rows {
mergeCounters(merged, bucketKey{
model: row.ModelName,
group: row.Group,
bucketTs: row.BucketTs,
}, counters{
requestCount: row.RequestCount,
successCount: row.SuccessCount,
totalLatencyMs: row.TotalLatencyMs,
ttftSumMs: row.TtftSumMs,
ttftCount: row.TtftCount,
})
}
hotBuckets.Range(func(key, value any) bool {
k := key.(bucketKey)
if k.model != params.Model || k.bucketTs < startTs || k.bucketTs > endTs {
return true
}
if params.Group != "" && k.group != params.Group {
return true
}
mergeCounters(merged, k, value.(*atomicBucket).snapshot())
return true
})
return buildQueryResult(params.Model, merged), nil
}
func bucketStart(ts int64) int64 {
bucketSeconds := perf_metrics_setting.GetBucketSeconds()
if bucketSeconds <= 0 {
bucketSeconds = 3600
}
return ts - (ts % bucketSeconds)
}
func mergeCounters(merged map[bucketKey]counters, key bucketKey, value counters) {
if value.requestCount == 0 {
return
}
current := merged[key]
current.requestCount += value.requestCount
current.successCount += value.successCount
current.totalLatencyMs += value.totalLatencyMs
current.ttftSumMs += value.ttftSumMs
current.ttftCount += value.ttftCount
merged[key] = current
}
func buildQueryResult(modelName string, merged map[bucketKey]counters) QueryResult {
groupBuckets := map[string]map[int64]counters{}
for key, value := range merged {
if value.requestCount == 0 {
continue
}
if _, ok := groupBuckets[key.group]; !ok {
groupBuckets[key.group] = map[int64]counters{}
}
groupBuckets[key.group][key.bucketTs] = value
}
groups := make([]string, 0, len(groupBuckets))
for group := range groupBuckets {
groups = append(groups, group)
}
sort.Strings(groups)
results := make([]GroupResult, 0, len(groups))
for _, group := range groups {
buckets := groupBuckets[group]
timestamps := make([]int64, 0, len(buckets))
for ts := range buckets {
timestamps = append(timestamps, ts)
}
sort.Slice(timestamps, func(i, j int) bool {
return timestamps[i] < timestamps[j]
})
total := counters{}
series := make([]BucketPoint, 0, len(timestamps))
for _, ts := range timestamps {
value := buckets[ts]
total.requestCount += value.requestCount
total.successCount += value.successCount
total.totalLatencyMs += value.totalLatencyMs
total.ttftSumMs += value.ttftSumMs
total.ttftCount += value.ttftCount
series = append(series, bucketPoint(ts, value))
}
results = append(results, GroupResult{
Group: group,
AvgTtftMs: avg(total.ttftSumMs, total.ttftCount),
AvgLatencyMs: avg(total.totalLatencyMs, total.requestCount),
SuccessRate: successRate(total),
RequestCount: total.requestCount,
SuccessCount: total.successCount,
TtftCount: total.ttftCount,
Series: series,
})
}
return QueryResult{
ModelName: modelName,
SeriesSchema: seriesSchema,
Groups: results,
}
}
func bucketPoint(ts int64, value counters) BucketPoint {
return BucketPoint{
Ts: ts,
AvgTtftMs: avg(value.ttftSumMs, value.ttftCount),
AvgLatencyMs: avg(value.totalLatencyMs, value.requestCount),
SuccessRate: successRate(value),
Count: value.requestCount,
SuccessCount: value.successCount,
TtftCount: value.ttftCount,
}
}
func avg(sum int64, count int64) int64 {
if count <= 0 {
return 0
}
return sum / count
}
func successRate(value counters) float64 {
if value.requestCount <= 0 {
return 0
}
return float64(value.successCount) / float64(value.requestCount) * 100
}
func recordRedis(key bucketKey, sample Sample) {
if !common.RedisEnabled || common.RDB == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
redisKey := redisBucketKey(key)
pipe := common.RDB.TxPipeline()
pipe.HIncrBy(ctx, redisKey, "req", 1)
if sample.Success {
pipe.HIncrBy(ctx, redisKey, "ok", 1)
}
if sample.LatencyMs > 0 {
pipe.HIncrBy(ctx, redisKey, "lat", sample.LatencyMs)
}
if sample.HasTtft && sample.TtftMs >= 0 {
pipe.HIncrBy(ctx, redisKey, "ttft", sample.TtftMs)
pipe.HIncrBy(ctx, redisKey, "ttft_n", 1)
}
pipe.Expire(ctx, redisKey, time.Hour)
_, _ = pipe.Exec(ctx)
}
func mergeRedisActiveBuckets(merged map[bucketKey]counters, params QueryParams, startTs int64, endTs int64) {
if !common.RedisEnabled || common.RDB == nil || params.Model == "" || params.Group == "" {
return
}
active := bucketStart(time.Now().Unix())
if active < startTs || active > endTs {
return
}
key := bucketKey{model: params.Model, group: params.Group, bucketTs: active}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
values, err := common.RDB.HGetAll(ctx, redisBucketKey(key)).Result()
if err != nil || len(values) == 0 {
return
}
mergeCounters(merged, key, redisCounters(values))
}
func redisBucketKey(key bucketKey) string {
return fmt.Sprintf("perf:%s:%s:%d", key.model, key.group, key.bucketTs)
}