199 lines
7.7 KiB
C#
199 lines
7.7 KiB
C#
|
|
using LMS.Common.Extensions;
|
|||
|
|
using LMS.DAO;
|
|||
|
|
using LMS.Repository.MJPackage;
|
|||
|
|
using Microsoft.EntityFrameworkCore;
|
|||
|
|
using Microsoft.Extensions.DependencyInjection;
|
|||
|
|
using Microsoft.Extensions.Logging;
|
|||
|
|
using Quartz;
|
|||
|
|
using System.Data;
|
|||
|
|
using System.Text;
|
|||
|
|
|
|||
|
|
namespace LMS.Tools.MJPackage
|
|||
|
|
{
|
|||
|
|
[DisallowConcurrentExecution]
|
|||
|
|
public class TokenSyncService : IJob
|
|||
|
|
{
|
|||
|
|
private readonly TokenUsageTracker _usageTracker;
|
|||
|
|
private readonly IServiceProvider _serviceProvider;
|
|||
|
|
private readonly ILogger<TokenSyncService> _logger;
|
|||
|
|
|
|||
|
|
// 活动阈值:5分钟内有活动的Token才同步
|
|||
|
|
private readonly TimeSpan _activityThreshold = TimeSpan.FromMinutes(5);
|
|||
|
|
|
|||
|
|
public TokenSyncService(
|
|||
|
|
TokenUsageTracker usageTracker,
|
|||
|
|
IServiceProvider serviceProvider,
|
|||
|
|
ILogger<TokenSyncService> logger)
|
|||
|
|
{
|
|||
|
|
_usageTracker = usageTracker;
|
|||
|
|
_serviceProvider = serviceProvider;
|
|||
|
|
_logger = logger;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public async Task Execute(IJobExecutionContext context)
|
|||
|
|
{
|
|||
|
|
_logger.LogInformation($"开始同步Token信息 - 同步间隔: 30 秒, 活动阈值: {_activityThreshold.TotalMinutes}分钟 (使用EF Core)");
|
|||
|
|
|
|||
|
|
var startTime = BeijingTimeExtension.GetBeijingTime();
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
var syncResult = await SyncActiveTokensToDatabase();
|
|||
|
|
var duration = BeijingTimeExtension.GetBeijingTime() - startTime;
|
|||
|
|
|
|||
|
|
if (syncResult.ActiveTokenCount > 0)
|
|||
|
|
{
|
|||
|
|
_logger.LogInformation(
|
|||
|
|
"Token同步完成: {ActiveTokens}/{TotalTokens} 个活跃Token已同步, 耗时: {Duration}ms, 更新记录: {RecordsUpdated}",
|
|||
|
|
syncResult.ActiveTokenCount,
|
|||
|
|
syncResult.TotalTokenCount,
|
|||
|
|
duration.TotalMilliseconds,
|
|||
|
|
syncResult.RecordsUpdated);
|
|||
|
|
}
|
|||
|
|
else
|
|||
|
|
{
|
|||
|
|
_logger.LogDebug(
|
|||
|
|
"Token同步跳过: 无活跃Token (总计: {TotalTokens}, 耗时: {Duration}ms)",
|
|||
|
|
syncResult.TotalTokenCount,
|
|||
|
|
duration.TotalMilliseconds);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
var duration = BeijingTimeExtension.GetBeijingTime() - startTime;
|
|||
|
|
_logger.LogError(ex, "Token同步失败,耗时: {Duration}ms", duration.TotalMilliseconds);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 同步活跃Token数据到数据库
|
|||
|
|
/// </summary>
|
|||
|
|
/// <returns>同步结果</returns>
|
|||
|
|
private async Task<SyncResult> SyncActiveTokensToDatabase()
|
|||
|
|
{
|
|||
|
|
// 先 删除10分钟内不活跃得Token
|
|||
|
|
var (act, nact) = _usageTracker.RemoveNotActiveTokens(TimeSpan.FromMinutes(10));
|
|||
|
|
_logger.LogInformation($"删除不活跃的 Token 数 {nact},删除后活跃 Token 数:{act},判断不活跃时间:{10} 分钟");
|
|||
|
|
|
|||
|
|
// 1. 获取活跃Token(最近5分钟内有活动的Token)
|
|||
|
|
var activeTokens = _usageTracker.GetActiveTokens(_activityThreshold).ToList();
|
|||
|
|
var totalTokens = _usageTracker.GetAllTokens().Count();
|
|||
|
|
|
|||
|
|
if (!activeTokens.Any())
|
|||
|
|
{
|
|||
|
|
_logger.LogInformation("0 条活跃Token,跳过同步!");
|
|||
|
|
return new SyncResult
|
|||
|
|
{
|
|||
|
|
TotalTokenCount = totalTokens,
|
|||
|
|
ActiveTokenCount = 0,
|
|||
|
|
RecordsUpdated = 0
|
|||
|
|
};
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 2. 创建数据库上下文
|
|||
|
|
using var scope = _serviceProvider.CreateScope();
|
|||
|
|
var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
|
|||
|
|
|
|||
|
|
var today = BeijingTimeExtension.GetBeijingTime().Date;
|
|||
|
|
var recordsUpdated = 0;
|
|||
|
|
|
|||
|
|
// 3. 构造批量数据
|
|||
|
|
var batchData = activeTokens.Select(token => new TokenUsageData
|
|||
|
|
{
|
|||
|
|
TokenId = token.Id,
|
|||
|
|
Date = today,
|
|||
|
|
DailyUsage = token.DailyUsage,
|
|||
|
|
TotalUsage = token.TotalUsage,
|
|||
|
|
LastActivityTime = token.LastActivityTime
|
|||
|
|
}).ToList();
|
|||
|
|
|
|||
|
|
// 4. 使用EF Core事务批量更新数据库
|
|||
|
|
using var transaction = await dbContext.Database.BeginTransactionAsync();
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
recordsUpdated = await BatchUpdateTokenUsageWithEfCore(dbContext, batchData);
|
|||
|
|
await transaction.CommitAsync();
|
|||
|
|
|
|||
|
|
_logger.LogDebug(
|
|||
|
|
"批量更新完成: {RecordsUpdated} 条活跃Token记录已更新,跳过 {SkippedCount} 条非活跃Token",
|
|||
|
|
recordsUpdated,
|
|||
|
|
totalTokens - activeTokens.Count);
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
await transaction.RollbackAsync();
|
|||
|
|
_logger.LogError(ex, "数据库事务失败,已回滚");
|
|||
|
|
throw;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return new SyncResult
|
|||
|
|
{
|
|||
|
|
TotalTokenCount = totalTokens,
|
|||
|
|
ActiveTokenCount = activeTokens.Count,
|
|||
|
|
RecordsUpdated = recordsUpdated
|
|||
|
|
};
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 使用EF Core批量更新Token使用数据
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="dbContext">数据库上下文</param>
|
|||
|
|
/// <param name="batchData">批量数据</param>
|
|||
|
|
/// <returns>更新的记录数</returns>
|
|||
|
|
private async Task<int> BatchUpdateTokenUsageWithEfCore(ApplicationDbContext dbContext, List<TokenUsageData> batchData)
|
|||
|
|
{
|
|||
|
|
int batchSize = 500;
|
|||
|
|
if (!batchData.Any()) return 0;
|
|||
|
|
|
|||
|
|
var recordsUpdated = 0;
|
|||
|
|
|
|||
|
|
// 分批处理
|
|||
|
|
for (int i = 0; i < batchData.Count; i += batchSize)
|
|||
|
|
{
|
|||
|
|
var batch = batchData.Skip(i).Take(batchSize).ToList();
|
|||
|
|
|
|||
|
|
// 构建真正的批量 SQL
|
|||
|
|
var sqlBuilder = new StringBuilder();
|
|||
|
|
var parameters = new List<object>();
|
|||
|
|
|
|||
|
|
sqlBuilder.AppendLine("INSERT INTO MJApiTokenUsage (TokenId, Date, DailyUsage, TotalUsage, LastActivityAt) VALUES ");
|
|||
|
|
|
|||
|
|
// 为每条记录构建 VALUES 子句
|
|||
|
|
for (int j = 0; j < batch.Count; j++)
|
|||
|
|
{
|
|||
|
|
if (j > 0) sqlBuilder.Append(", ");
|
|||
|
|
|
|||
|
|
var paramIndex = j * 5;
|
|||
|
|
sqlBuilder.Append($"({{{paramIndex}}}, {{{paramIndex + 1}}}, {{{paramIndex + 2}}}, {{{paramIndex + 3}}}, {{{paramIndex + 4}}})");
|
|||
|
|
|
|||
|
|
parameters.AddRange(new object[]
|
|||
|
|
{
|
|||
|
|
batch[j].TokenId,
|
|||
|
|
batch[j].Date,
|
|||
|
|
batch[j].DailyUsage,
|
|||
|
|
batch[j].TotalUsage,
|
|||
|
|
batch[j].LastActivityTime
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
sqlBuilder.AppendLine(@"
|
|||
|
|
ON DUPLICATE KEY UPDATE
|
|||
|
|
Date = VALUES(Date),
|
|||
|
|
DailyUsage = VALUES(DailyUsage),
|
|||
|
|
TotalUsage = VALUES(TotalUsage),
|
|||
|
|
LastActivityAt = VALUES(LastActivityAt)");
|
|||
|
|
|
|||
|
|
// 一次性执行整个批次
|
|||
|
|
var affectedRows = await dbContext.Database.ExecuteSqlRawAsync(
|
|||
|
|
sqlBuilder.ToString(),
|
|||
|
|
parameters.ToArray());
|
|||
|
|
|
|||
|
|
recordsUpdated += affectedRows;
|
|||
|
|
|
|||
|
|
_logger.LogInformation($"批量更新完成: 批次 {i / batchSize + 1}, 记录数: {batch.Count}, 影响行数: {affectedRows}");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return recordsUpdated;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|