LMS.service/LMS.Tools/MJPackage/TokenService.cs
lq1405 4340dd25a1 修复 单日绘图计数 添加原子性判断和增加
修复 高并发下会多次请求token的bug 现在都单次请求 使用 ConcurrentDictionary
2025-06-23 20:21:54 +08:00

407 lines
16 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using LMS.Common.Extensions;
using LMS.DAO;
using LMS.Repository.DB;
using LMS.Repository.MJPackage;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Collections.Concurrent;
using System.Data;
using System.Runtime.CompilerServices;
namespace LMS.Tools.MJPackage
{
public class TokenService(
ApplicationDbContext dbContext,
IMemoryCache memoryCache,
TokenUsageTracker usageTracker,
ILogger<TokenService> logger) : ITokenService
{
// Database context used by every query in this service.
private readonly ApplicationDbContext _dbContext = dbContext;
// NOTE(review): no live code in the visible part of this class reads this field — confirm whether it is still needed.
private readonly IMemoryCache _memoryCache = memoryCache;
// In-memory token store; it also holds the cached origin token and MJ API base URL.
private readonly TokenUsageTracker _usageTracker = usageTracker;
private readonly ILogger<TokenService> _logger = logger;
// In-flight database loads keyed by token string. The map is static, so it is shared by
// every TokenService instance; GetTokenAsync uses it so that concurrent cache misses for
// the same token await one load instead of each querying the database.
private static readonly ConcurrentDictionary<string, Lazy<Task<TokenCacheItem>>> _tokenLoadTasks = new();
/// <summary>
/// Loads a token together with its usage counters from the database.
/// </summary>
/// <param name="token">Token string matched against <c>MJApiTokens.Token</c>.</param>
/// <param name="hasHistory">
/// When <c>true</c> the returned item carries the stored usage history;
/// otherwise <c>HistoryUse</c> is set to an empty string.
/// </param>
/// <returns>The token as a <see cref="TokenCacheItem"/>, or <c>null</c> when no row matches.</returns>
public async Task<TokenCacheItem?> GetDatabaseTokenAsync(string token, bool hasHistory = false)
{
    try
    {
        // SqlQuery receives a FormattableString, so {token} is sent as a SQL parameter
        // instead of being concatenated into the statement text.
        var dbResult = await _dbContext.Database
            .SqlQuery<TokenQueryResult>($@"
SELECT
t.Id, t.Token, t.DailyLimit, t.TotalLimit, t.ConcurrencyLimit,
t.CreatedAt, t.ExpiresAt, t.UseToken,
COALESCE(u.DailyUsage, 0) as DailyUsage,
COALESCE(u.TotalUsage, 0) as TotalUsage,
COALESCE(u.HistoryUse, '') as HistoryUse,
COALESCE(u.LastActivityAt, t.CreatedAt) as LastActivityTime
FROM MJApiTokens t
LEFT JOIN MJApiTokenUsage u ON t.Id = u.TokenId
WHERE t.Token = {token}")
            .FirstOrDefaultAsync();

        if (dbResult == null)
        {
            return null;
        }

        // Map the flat query row onto the cache item shape.
        var tokenItem = new TokenCacheItem
        {
            Id = dbResult.Id,
            Token = dbResult.Token,
            UseToken = dbResult.UseToken ?? string.Empty, // never expose a null UseToken
            DailyLimit = dbResult.DailyLimit,
            TotalLimit = dbResult.TotalLimit,
            ConcurrencyLimit = dbResult.ConcurrencyLimit,
            CreatedAt = dbResult.CreatedAt,
            ExpiresAt = dbResult.ExpiresAt,
            DailyUsage = dbResult.DailyUsage,
            TotalUsage = dbResult.TotalUsage,
            LastActivityTime = dbResult.LastActivityTime,
            // History is only handed out when the caller explicitly asks for it.
            HistoryUse = hasHistory ? dbResult.HistoryUse : string.Empty
        };
        return tokenItem;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"从数据库获取Token时发生错误: {token}");
        throw;
    }
}
/// <summary>
/// Looks up a token entity by its Id using a no-tracking query.
/// </summary>
/// <param name="tokenId">Id of the token row to fetch.</param>
/// <returns>The matching <see cref="MJApiTokens"/> entity, or <c>null</c> when none exists.</returns>
public async Task<MJApiTokens?> GetMJapiTokenByIdAsync(long tokenId)
{
    try
    {
        var untrackedTokens = _dbContext.MJApiTokens.AsNoTracking();
        return await untrackedTokens.FirstOrDefaultAsync(entity => entity.Id == tokenId);
    }
    catch (Exception ex)
    {
        // Log with context, then let the caller decide how to handle the failure.
        _logger.LogError(ex, $"获取Token ID {tokenId} 时发生错误");
        throw;
    }
}
/// <summary>
/// Resolves a token, preferring the in-memory tracker and falling back to a
/// database load that is shared between concurrent callers asking for the same token.
/// </summary>
/// <param name="token">Token string to resolve.</param>
/// <returns>
/// The cached or freshly loaded item. The database loader yields <c>null</c> for an
/// unknown token, so callers should null-check the result despite the non-nullable
/// return type.
/// </returns>
public async Task<TokenCacheItem> GetTokenAsync(string token)
{
    _logger.LogDebug($"开始获取Token: {token}");

    // 1. Fast path: the token is already held by the in-memory tracker.
    if (_usageTracker.TryGetToken(token, out var cacheItem))
    {
        _logger.LogDebug($"Token从内存缓存中获取成功: {token}");
        return cacheItem;
    }

    // 2. Slow path: Lazy + GetOrAdd guarantee that only one load task is started per
    //    token even when many callers miss the cache at the same time.
    var lazyTask = _tokenLoadTasks.GetOrAdd(token, _ => new Lazy<Task<TokenCacheItem>>(
        () => LoadTokenFromDatabaseAsync(token),
        LazyThreadSafetyMode.ExecutionAndPublication));
    try
    {
        return await lazyTask.Value;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"加载Token时发生错误: {token}");
        throw;
    }
    finally
    {
        // Remove only the entry this call awaited. Removing by key alone could evict a
        // newer in-flight Lazy registered by another caller after an earlier waiter
        // already cleaned up, which would let a duplicate database load start.
        _tokenLoadTasks.TryRemove(
            new KeyValuePair<string, Lazy<Task<TokenCacheItem>>>(token, lazyTask));
    }
}
/// <summary>
/// Loads a token from the database and registers it with the in-memory usage tracker.
/// </summary>
/// <param name="token">Token string to load.</param>
/// <returns>
/// The loaded item, or <c>null</c> when the token does not exist (see the note at the
/// early return).
/// </returns>
private async Task<TokenCacheItem> LoadTokenFromDatabaseAsync(string token)
{
    _logger.LogInformation($"Token不在缓存中从数据库加载: {token}");

    TokenCacheItem? tokenItem = await GetDatabaseTokenAsync(token);
    if (tokenItem is null)
    {
        _logger.LogWarning($"Token未找到: {token}");
        // The return type stays non-nullable because the shared load map is typed as
        // Lazy<Task<TokenCacheItem>>; a miss is still reported to callers as null, so
        // the suppression makes that deliberate contract explicit.
        return null!;
    }

    // A token that was just loaded counts as active right now.
    tokenItem.LastActivityTime = BeijingTimeExtension.GetBeijingTime();

    // Publish the item to the in-memory tracker so later lookups skip the database.
    _usageTracker.AddOrUpdateToken(tokenItem);

    _logger.LogInformation($"Token从数据库加载成功: {token}, ID: {tokenItem.Id}, 日限制: {tokenItem.DailyLimit}, 当前日使用: {tokenItem.DailyUsage}, 并发限制: {tokenItem.ConcurrencyLimit}");
    return tokenItem;
}
/// <summary>
/// Logs the request and forwards a usage increment for the token to the usage tracker.
/// </summary>
/// <param name="token">Token string whose usage is incremented.</param>
public void IncrementUsage(string token)
{
_logger.LogInformation($"递增Token使用量: {token}");
_usageTracker.IncrementUsage(token);
}
/// <summary>
/// Reads the origin token from the options table (key <c>MJPackageOriginToken</c>)
/// and stores it on the usage tracker.
/// </summary>
/// <returns>
/// The configured origin token, or an empty string when the option row is missing or
/// its value is blank. The tracker value is left untouched in that case.
/// </returns>
public async Task<string> LoadOriginTokenAsync()
{
    var originOption = await _dbContext.Options
        .FirstOrDefaultAsync(option => option.Key == "MJPackageOriginToken");

    // A missing row and a blank value are reported the same way.
    string configuredToken = originOption?.GetValueObject<string>() ?? string.Empty;
    if (string.IsNullOrWhiteSpace(configuredToken))
    {
        _logger.LogWarning("未找到原始Token配置");
        return string.Empty;
    }

    _usageTracker.OriginToken = configuredToken;
    return configuredToken;
}
/// <summary>
/// Returns the origin token held by the usage tracker, loading it from the database
/// when the tracker has no usable value.
/// </summary>
/// <returns>The origin token, or whatever <see cref="LoadOriginTokenAsync"/> yields on a miss.</returns>
public async Task<string> GetOriginToken()
{
    return string.IsNullOrWhiteSpace(_usageTracker.OriginToken)
        ? await LoadOriginTokenAsync()      // nothing cached: go to the database
        : _usageTracker.OriginToken;        // cached value is usable as-is
}
/// <summary>
/// Reads the MJ API base URL from the options table (key <c>MJAPIBasicUrl</c>) and
/// stores it on the usage tracker.
/// </summary>
/// <returns>
/// The configured URL. When the option row is missing or blank the tracker value is
/// reset to an empty string and that value is returned (the log messages describe this
/// as falling back to the default).
/// </returns>
public async Task<string> LoadMJAPIBasicUrlAsync()
{
    var urlOption = await _dbContext.Options
        .FirstOrDefaultAsync(option => option.Key == "MJAPIBasicUrl");

    // No option row at all.
    if (urlOption is null)
    {
        _logger.LogWarning("未找到配置的MJAPI Basic URL, 使用默认的!");
        _usageTracker.MJAPIBasicUrl = string.Empty;
        return _usageTracker.MJAPIBasicUrl;
    }

    string configuredUrl = urlOption.GetValueObject<string>() ?? string.Empty;
    if (!string.IsNullOrWhiteSpace(configuredUrl))
    {
        _usageTracker.MJAPIBasicUrl = configuredUrl;
        return configuredUrl;
    }

    // The row exists but carries no usable value.
    _logger.LogWarning("未找到配置的MJAPI Basic URL 数据, 使用默认的!");
    _usageTracker.MJAPIBasicUrl = string.Empty;
    return _usageTracker.MJAPIBasicUrl;
}
/// <summary>
/// Returns the MJ API base URL held by the usage tracker, loading it from the database
/// when the tracker has no usable value.
/// </summary>
/// <returns>The base URL, or whatever <see cref="LoadMJAPIBasicUrlAsync"/> yields on a miss.</returns>
public async Task<string> GetMJAPIBasicUrl()
{
    return string.IsNullOrWhiteSpace(_usageTracker.MJAPIBasicUrl)
        ? await LoadMJAPIBasicUrlAsync()    // nothing cached: go to the database
        : _usageTracker.MJAPIBasicUrl;      // cached value is usable as-is
}
/// <summary>
/// Resets the daily usage counters: first in the database, then in the in-memory
/// tracker after tokens idle for more than five minutes have been evicted from it.
/// </summary>
/// <returns>A task that completes when the reset has finished. Failures are logged and not rethrown.</returns>
public async Task ResetDailyUsage()
{
    var startedAt = BeijingTimeExtension.GetBeijingTime();

    // Milliseconds elapsed since the reset started, measured on the same clock.
    double ElapsedMs() => (BeijingTimeExtension.GetBeijingTime() - startedAt).TotalMilliseconds;

    try
    {
        // Persisted counters first.
        int resetRowCount = await BatchResetTokenDailyUsage();

        // Evict idle tokens, then zero the counters of the ones that remain cached.
        var (activeCount, _) = _usageTracker.RemoveNotActiveTokens(TimeSpan.FromMinutes(5));
        _usageTracker.ResetDailyUsage();

        _logger.LogInformation($"Token日使用量重置完成: {resetRowCount} 个Token, 活跃Token: {activeCount}, 耗时: {ElapsedMs()}ms");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Token同步失败耗时: {Duration}ms", ElapsedMs());
    }
}
/// <summary>
/// Resets the persisted daily usage of every non-expired token that has usage recorded,
/// archiving the current counters into each row's history first.
/// </summary>
/// <remarks>
/// Rows are processed in batches of 100, each batch in its own transaction. A failing
/// batch is rolled back and the exception is rethrown; batches committed before it stay
/// committed.
/// </remarks>
/// <returns>The total number of rows reported as written by <c>SaveChangesAsync</c>.</returns>
private async Task<int> BatchResetTokenDailyUsage()
{
    var beijingTime = BeijingTimeExtension.GetBeijingTime();
    _logger.LogInformation($"重置token日限制开始批量重置 - 北京时间: {beijingTime:yyyy-MM-dd HH:mm:ss}");

    // Only tokens with a usage row whose DailyUsage > 0 are selected (the WHERE on u.*
    // drops the unmatched side of the LEFT JOIN).
    // NOTE(review): the expiry filter uses UTC_TIMESTAMP() while the rest of this class
    // works in Beijing time — confirm ExpiresAt is stored in UTC.
    string sql = @"
SELECT
t.Id, t.Token, t.DailyLimit, t.TotalLimit, t.ConcurrencyLimit,
t.CreatedAt, t.ExpiresAt, t.UseToken,
COALESCE(u.DailyUsage, 0) as DailyUsage,
COALESCE(u.HistoryUse, '') as HistoryUse,
COALESCE(u.TotalUsage, 0) as TotalUsage,
COALESCE(u.LastActivityAt, t.CreatedAt) as LastActivityTime
FROM MJApiTokens t
LEFT JOIN MJApiTokenUsage u ON t.Id = u.TokenId
WHERE u.DailyUsage > 0
AND (t.ExpiresAt IS NULL OR t.ExpiresAt > UTC_TIMESTAMP())";
    var dbResult = await _dbContext.Database
        .SqlQuery<TokenQueryResult>(FormattableStringFactory.Create(sql))
        .ToListAsync();

    if (dbResult.Count == 0)
    {
        _logger.LogInformation("重置token日限制没有需要重置的token");
        return 0;
    }
    _logger.LogInformation($"找到 {dbResult.Count} 个需要重置的Token");

    // Total usage before the reset, reported in the final summary.
    var totalDailyUsageBeforeReset = dbResult.Sum(x => x.DailyUsage);
    _logger.LogInformation($"重置前总日使用量: {totalDailyUsageBeforeReset}");

    var updatedCount = 0;
    const int batchSize = 100; // bounds the number of entities loaded and saved at once

    for (int batchStart = 0; batchStart < dbResult.Count; batchStart += batchSize)
    {
        int batchNumber = batchStart / batchSize + 1;
        var batch = dbResult.Skip(batchStart).Take(batchSize).ToList();
        var batchTokenIds = batch.Select(x => x.Id).ToList();

        // Index the batch once so each usage row finds its token info without rescanning
        // the batch; a lookup keeps the first match if an Id appears more than once.
        var tokenInfoById = batch.ToLookup(x => x.Id);

        // Load the usage entities of this batch; they are modified in place below.
        var tokenUsageList = await _dbContext.MJApiTokenUsage
            .Where(x => batchTokenIds.Contains(x.TokenId))
            .ToListAsync();
        if (tokenUsageList.Count == 0)
        {
            _logger.LogWarning($"批次 {batchNumber}: 没有找到使用记录");
            continue;
        }

        // One transaction per batch; disposed asynchronously at the end of the iteration.
        await using var transaction = await _dbContext.Database.BeginTransactionAsync();
        try
        {
            foreach (var tokenUsage in tokenUsageList)
            {
                var tokenInfo = tokenInfoById[tokenUsage.TokenId].FirstOrDefault();
                if (tokenInfo == null || tokenUsage.DailyUsage == 0)
                {
                    continue;
                }

                // Archive the current counters into the history and zero the daily usage.
                ProcessHistoryAndResetUsage(tokenUsage, tokenInfo);
            }

            // Flush all modifications of this batch in a single save.
            var batchUpdated = await _dbContext.SaveChangesAsync();
            await transaction.CommitAsync();
            updatedCount += batchUpdated;
            _logger.LogInformation($"批次 {batchNumber} 完成: 处理 {batch.Count} 个Token更新 {batchUpdated} 条记录");
        }
        catch (Exception ex)
        {
            await transaction.RollbackAsync();
            _logger.LogError(ex, $"批次 {batchNumber} 重置失败");
            throw;
        }
    }

    _logger.LogInformation($"✅ 批量重置完成 - 总共更新 {updatedCount} 条记录");
    _logger.LogInformation($"📊 重置统计 - 重置前日使用量: {totalDailyUsageBeforeReset} → 重置后: 0");
    return updatedCount;
}
/// <summary>
/// Appends the current counters of a usage row to its JSON history and then zeroes
/// the daily usage.
/// </summary>
/// <param name="tokenUsage">Usage entity to archive and reset; it is modified in place.</param>
/// <param name="tokenInfo">Token row belonging to the usage entity; only used for log messages.</param>
/// <remarks>
/// Errors are logged and not rethrown. The entity is only mutated after the new history
/// has been serialized, so a failure leaves the row exactly as it was.
/// </remarks>
private void ProcessHistoryAndResetUsage(MJApiTokenUsage tokenUsage, TokenQueryResult tokenInfo)
{
    try
    {
        // Parse the existing history; unreadable JSON is replaced by a fresh list.
        List<MJApiTokenUsage> historyList;
        try
        {
            historyList = string.IsNullOrEmpty(tokenUsage.HistoryUse)
                ? []
                : JsonConvert.DeserializeObject<List<MJApiTokenUsage>>(tokenUsage.HistoryUse) ?? new List<MJApiTokenUsage>();
        }
        catch (JsonException ex)
        {
            _logger.LogWarning(ex, $"Token {tokenInfo.Token} 历史记录JSON解析失败将创建新的历史记录");
            historyList = [];
        }

        // Capture the value before it is reset so the history entry and the log line
        // both report the real usage instead of the already-zeroed field.
        var previousDailyUsage = tokenUsage.DailyUsage;

        // The archived entry is dated to the previous Beijing calendar day.
        historyList.Add(new MJApiTokenUsage
        {
            TokenId = tokenUsage.TokenId,
            Date = BeijingTimeExtension.GetBeijingTime().Date.AddDays(-1),
            DailyUsage = previousDailyUsage,
            TotalUsage = tokenUsage.TotalUsage,
            LastActivityAt = tokenUsage.LastActivityAt,
            HistoryUse = ""
        });

        // Serialize first: if this throws, the entity has not been touched yet.
        var serializedHistory = JsonConvert.SerializeObject(historyList);
        tokenUsage.HistoryUse = serializedHistory;
        tokenUsage.DailyUsage = 0;

        _logger.LogDebug($"Token {tokenInfo.Token} 重置: 日使用量 {previousDailyUsage} → 0, 历史记录数: {historyList.Count}");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"处理Token {tokenInfo.Token} 的历史记录时发生错误");
    }
}
}
}