using LMS.Common.Extensions;
using LMS.DAO;
using LMS.Repository.MJPackage;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Quartz;
using System.Data;
using System.Text;

namespace LMS.Tools.MJPackage
{
    /// <summary>
    /// Quartz job that copies the usage counters of recently active tokens from the
    /// in-memory <see cref="TokenUsageTracker"/> into the <c>MJApiTokenUsage</c> table.
    /// </summary>
    /// <remarks>
    /// <see cref="DisallowConcurrentExecutionAttribute"/> asks Quartz not to run two
    /// executions of this job definition at the same time.
    /// </remarks>
    [DisallowConcurrentExecution]
    public class TokenSyncService : IJob
    {
        /// <summary>Tokens idle for longer than this many minutes are dropped from the tracker before each sync.</summary>
        private const int InactiveTokenRetentionMinutes = 10;

        /// <summary>Maximum number of rows written by a single INSERT statement.</summary>
        private const int UpsertBatchSize = 500;

        /// <summary>Number of SQL parameters emitted per row (one per inserted column).</summary>
        private const int ColumnsPerRow = 5;

        private readonly TokenUsageTracker _usageTracker;
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger<TokenSyncService> _logger;

        // Activity threshold: only tokens with activity in the last 5 minutes are synchronized.
        private readonly TimeSpan _activityThreshold = TimeSpan.FromMinutes(5);

        /// <summary>
        /// Creates the job with its collaborators.
        /// </summary>
        /// <param name="usageTracker">Tracker that supplies the token usage data to persist.</param>
        /// <param name="serviceProvider">Root provider used to create a scope (and a scoped DbContext) per run.</param>
        /// <param name="logger">Logger for progress and failure messages.</param>
        public TokenSyncService(
            TokenUsageTracker usageTracker,
            IServiceProvider serviceProvider,
            ILogger<TokenSyncService> logger)
        {
            _usageTracker = usageTracker;
            _serviceProvider = serviceProvider;
            _logger = logger;
        }

        /// <summary>
        /// Job entry point: runs one synchronization pass and logs its outcome and duration.
        /// </summary>
        /// <param name="context">Quartz execution context (not read by this job).</param>
        /// <returns>A task that completes when the pass has finished.</returns>
        /// <remarks>
        /// Failures are logged and not rethrown, so an exception from the pass never
        /// propagates out of this method.
        /// </remarks>
        public async Task Execute(IJobExecutionContext context)
        {
            _logger.LogInformation(
                "开始同步Token信息 - 同步间隔: 30 秒, 活动阈值: {ActivityThresholdMinutes}分钟 (使用EF Core)",
                _activityThreshold.TotalMinutes);

            // A stopwatch is monotonic; subtracting wall-clock timestamps is not.
            var stopwatch = System.Diagnostics.Stopwatch.StartNew();

            try
            {
                var syncResult = await SyncActiveTokensToDatabase();
                var elapsedMs = stopwatch.Elapsed.TotalMilliseconds;

                if (syncResult.ActiveTokenCount > 0)
                {
                    _logger.LogInformation(
                        "Token同步完成: {ActiveTokens}/{TotalTokens} 个活跃Token已同步, 耗时: {Duration}ms, 更新记录: {RecordsUpdated}",
                        syncResult.ActiveTokenCount,
                        syncResult.TotalTokenCount,
                        elapsedMs,
                        syncResult.RecordsUpdated);
                }
                else
                {
                    _logger.LogDebug(
                        "Token同步跳过: 无活跃Token (总计: {TotalTokens}, 耗时: {Duration}ms)",
                        syncResult.TotalTokenCount,
                        elapsedMs);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Token同步失败,耗时: {Duration}ms", stopwatch.Elapsed.TotalMilliseconds);
            }
        }

        /// <summary>
        /// Synchronizes the data of active tokens to the database.
        /// </summary>
        /// <returns>
        /// The synchronization result: total tracked tokens, active tokens, and the
        /// affected-row total reported by the database.
        /// </returns>
        private async Task<SyncResult> SyncActiveTokensToDatabase()
        {
            // First drop tokens that have been inactive for the retention window.
            var (remainingCount, removedCount) =
                _usageTracker.RemoveNotActiveTokens(TimeSpan.FromMinutes(InactiveTokenRetentionMinutes));
            _logger.LogInformation(
                "删除不活跃的 Token 数 {RemovedCount},删除后活跃 Token 数:{RemainingCount},判断不活跃时间:{InactiveMinutes} 分钟",
                removedCount,
                remainingCount,
                InactiveTokenRetentionMinutes);

            // 1. Collect the active tokens (activity within the activity threshold).
            var activeTokens = _usageTracker.GetActiveTokens(_activityThreshold).ToList();
            var totalTokens = _usageTracker.GetAllTokens().Count();

            if (activeTokens.Count == 0)
            {
                _logger.LogInformation("0 条活跃Token,跳过同步!");
                return new SyncResult
                {
                    TotalTokenCount = totalTokens,
                    ActiveTokenCount = 0,
                    RecordsUpdated = 0
                };
            }

            // 2. Create the database context inside its own DI scope.
            using var scope = _serviceProvider.CreateScope();
            var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
            var today = BeijingTimeExtension.GetBeijingTime().Date;

            // 3. Build the rows to write.
            var batchData = activeTokens
                .Select(token => new TokenUsageData
                {
                    TokenId = token.Id,
                    Date = today,
                    DailyUsage = token.DailyUsage,
                    TotalUsage = token.TotalUsage,
                    LastActivityTime = token.LastActivityTime
                })
                .ToList();

            // 4. Write all batches inside one EF Core transaction.
            int recordsUpdated;
            await using var transaction = await dbContext.Database.BeginTransactionAsync();
            try
            {
                recordsUpdated = await BatchUpdateTokenUsageWithEfCore(dbContext, batchData);
                await transaction.CommitAsync();

                _logger.LogDebug(
                    "批量更新完成: {RecordsUpdated} 条活跃Token记录已更新,跳过 {SkippedCount} 条非活跃Token",
                    recordsUpdated,
                    totalTokens - activeTokens.Count);
            }
            catch (Exception ex)
            {
                try
                {
                    await transaction.RollbackAsync();
                    _logger.LogError(ex, "数据库事务失败,已回滚");
                }
                catch (Exception rollbackEx)
                {
                    // A failing rollback must not replace the original failure, so both are
                    // logged together and the original exception is rethrown below.
                    _logger.LogError(new AggregateException(ex, rollbackEx), "数据库事务失败,且回滚失败");
                }

                throw;
            }

            return new SyncResult
            {
                TotalTokenCount = totalTokens,
                ActiveTokenCount = activeTokens.Count,
                RecordsUpdated = recordsUpdated
            };
        }

        /// <summary>
        /// Writes token usage rows with multi-row <c>INSERT ... ON DUPLICATE KEY UPDATE</c>
        /// statements, at most <see cref="UpsertBatchSize"/> rows per statement.
        /// </summary>
        /// <param name="dbContext">Database context whose connection (and current transaction) is used.</param>
        /// <param name="batchData">Rows to write.</param>
        /// <returns>The sum of the affected-row counts reported for each statement.</returns>
        /// <remarks>
        /// NOTE(review): MySQL documents that ON DUPLICATE KEY UPDATE reports 1 affected row
        /// for an insert, 2 for an update and 0 for an unchanged row, so the returned total is
        /// presumably not an exact record count — confirm against the target database.
        /// </remarks>
        private async Task<int> BatchUpdateTokenUsageWithEfCore(ApplicationDbContext dbContext, List<TokenUsageData> batchData)
        {
            if (batchData.Count == 0)
            {
                return 0;
            }

            var recordsUpdated = 0;

            // Process the rows batch by batch.
            for (var offset = 0; offset < batchData.Count; offset += UpsertBatchSize)
            {
                var batch = batchData.GetRange(offset, Math.Min(UpsertBatchSize, batchData.Count - offset));

                // Build one multi-row statement for the whole batch.
                var sqlBuilder = new StringBuilder();
                var parameters = new List<object>(batch.Count * ColumnsPerRow);

                sqlBuilder.AppendLine("INSERT INTO MJApiTokenUsage (TokenId, Date, DailyUsage, TotalUsage, LastActivityAt) VALUES ");

                // One VALUES tuple per row; {n} placeholders are turned into DbParameters by
                // ExecuteSqlRawAsync, so no value is concatenated into the SQL text.
                for (var row = 0; row < batch.Count; row++)
                {
                    if (row > 0)
                    {
                        sqlBuilder.Append(", ");
                    }

                    var paramIndex = row * ColumnsPerRow;
                    sqlBuilder.Append($"({{{paramIndex}}}, {{{paramIndex + 1}}}, {{{paramIndex + 2}}}, {{{paramIndex + 3}}}, {{{paramIndex + 4}}})");

                    var item = batch[row];
                    parameters.AddRange(new object[]
                    {
                        item.TokenId,
                        item.Date,
                        item.DailyUsage,
                        item.TotalUsage,
                        item.LastActivityTime
                    });
                }

                sqlBuilder.AppendLine(@"
ON DUPLICATE KEY UPDATE
    Date = VALUES(Date),
    DailyUsage = VALUES(DailyUsage),
    TotalUsage = VALUES(TotalUsage),
    LastActivityAt = VALUES(LastActivityAt)");

                // Execute the whole batch in a single round trip.
                var affectedRows = await dbContext.Database.ExecuteSqlRawAsync(
                    sqlBuilder.ToString(),
                    parameters.ToArray());

                recordsUpdated += affectedRows;

                _logger.LogInformation(
                    "批量更新完成: 批次 {BatchNumber}, 记录数: {BatchCount}, 影响行数: {AffectedRows}",
                    offset / UpsertBatchSize + 1,
                    batch.Count,
                    affectedRows);
            }

            return recordsUpdated;
        }
    }
}