添加 任务缓存 防止回调频繁修改数据
This commit is contained in:
parent
bc90b17961
commit
c12e1b5d65
@ -8,12 +8,9 @@ namespace LMS.Tools.MJPackage
|
||||
{
|
||||
Task CreateTaskAsync(string token, string thirdPartyTaskId);
|
||||
Task UpdateTaskInDatabase(MJApiTasks mJApiTasks);
|
||||
Task<MJApiTasks> GetTaskInfoAsync(string taskId);
|
||||
|
||||
Task<MJApiTasks> GetTaskInfoByThirdPartyIdAsync(string taskId);
|
||||
Task BatchUpdateTaskChaheToDatabaseAsync();
|
||||
|
||||
Task<IEnumerable<MJApiTasks>> GetRunningTasksAsync(string token = null);
|
||||
Task<(int maxConcurrency, int running, int available)> GetConcurrencyStatusAsync(string token);
|
||||
Task CleanupTimeoutTasksAsync(TimeSpan timeout);
|
||||
Task<MJApiTasks?> GetTaskInfoByThirdPartyIdAsync(string taskId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,5 +18,9 @@ namespace LMS.Tools.MJPackage
|
||||
Task<string> LoadOriginTokenAsync();
|
||||
|
||||
Task<string> GetOriginToken();
|
||||
|
||||
Task<string> LoadMJAPIBasicUrlAsync();
|
||||
|
||||
Task<string> GetMJAPIBasicUrl();
|
||||
}
|
||||
}
|
||||
|
||||
@ -12,23 +12,18 @@ namespace LMS.Tools.MJPackage
|
||||
{
|
||||
public class TaskConcurrencyManager : ITaskConcurrencyManager
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, MJApiTasks> _activeTasks = new();
|
||||
private readonly ConcurrentDictionary<string, string> _thirdPartyTaskMap = new(); // ThirdPartyTaskId -> TaskId
|
||||
private readonly TokenUsageTracker _usageTracker;
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly ILogger<TaskConcurrencyManager> _logger;
|
||||
private readonly ApplicationDbContext _dbContext;
|
||||
private readonly ITokenService _tokenService;
|
||||
|
||||
public TaskConcurrencyManager(
|
||||
TokenUsageTracker usageTracker,
|
||||
IServiceScopeFactory scopeFactory,
|
||||
ILogger<TaskConcurrencyManager> logger,
|
||||
ApplicationDbContext dbContext,
|
||||
ITokenService tokenService)
|
||||
{
|
||||
_usageTracker = usageTracker;
|
||||
_scopeFactory = scopeFactory;
|
||||
_logger = logger;
|
||||
_dbContext = dbContext;
|
||||
_tokenService = tokenService;
|
||||
@ -63,7 +58,8 @@ namespace LMS.Tools.MJPackage
|
||||
};
|
||||
|
||||
// 5. 持久化任务信息到数据库
|
||||
await SaveTaskToDatabase(mJApiTasks);
|
||||
await _dbContext.AddAsync(mJApiTasks);
|
||||
await _dbContext.SaveChangesAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@ -71,98 +67,28 @@ namespace LMS.Tools.MJPackage
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 获取任务信息
|
||||
/// </summary>
|
||||
public async Task<MJApiTasks> GetTaskInfoAsync(string taskId)
|
||||
{
|
||||
if (_activeTasks.TryGetValue(taskId, out var taskInfo))
|
||||
{
|
||||
return taskInfo;
|
||||
}
|
||||
|
||||
// 如果内存中没有,尝试从数据库加载
|
||||
return await LoadTaskFromDatabase(taskId);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 通过第三方ID获取数据
|
||||
/// </summary>
|
||||
/// <param name="thirdPartyId"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<MJApiTasks> GetTaskInfoByThirdPartyIdAsync(string thirdPartyId)
|
||||
public async Task<MJApiTasks?> GetTaskInfoByThirdPartyIdAsync(string thirdPartyId)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(thirdPartyId))
|
||||
{
|
||||
_logger.LogWarning("第三方任务ID为空");
|
||||
return null;
|
||||
}
|
||||
MJApiTasks? mJApiTasks = await _dbContext.MJApiTasks.FirstOrDefaultAsync(x => x.ThirdPartyTaskId == thirdPartyId);
|
||||
return mJApiTasks;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取运行中的任务列表
|
||||
/// </summary>
|
||||
public async Task<IEnumerable<MJApiTasks>> GetRunningTasksAsync(string token = null)
|
||||
// 先尝试从内存中获取
|
||||
MJApiTasks? mjApiTasks = _usageTracker.TryGetTaskCache(thirdPartyId);
|
||||
// 从数据库获取
|
||||
mjApiTasks ??= await LoadTaskFromDatabaseByThirdPartyId(thirdPartyId);
|
||||
if (mjApiTasks == null)
|
||||
{
|
||||
var runningTasks = _activeTasks.Values
|
||||
.Where(t => t.Status != MJTaskStatus.SUCCESS && t.Status != MJTaskStatus.FAILURE && t.Status != MJTaskStatus.CANCEL)
|
||||
.Where(t => string.IsNullOrEmpty(token) || t.Token == token)
|
||||
.OrderBy(t => t.StartTime)
|
||||
.ToList();
|
||||
|
||||
_logger.LogDebug($"当前运行中的任务数: {runningTasks.Count}" + (string.IsNullOrEmpty(token) ? "" : $", Token={token}"));
|
||||
|
||||
return await Task.FromResult(runningTasks);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取Token的并发状态
|
||||
/// </summary>
|
||||
public async Task<(int maxConcurrency, int running, int available)> GetConcurrencyStatusAsync(string token)
|
||||
{
|
||||
var status = _usageTracker.GetConcurrencyStatus(token);
|
||||
return await Task.FromResult((status.maxCount, status.currentlyExecuting, status.available));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 清理超时任务
|
||||
/// </summary>
|
||||
public async Task CleanupTimeoutTasksAsync(TimeSpan timeout)
|
||||
{
|
||||
_logger.LogInformation($"开始清理超时任务,超时阈值: {timeout.TotalMinutes}分钟");
|
||||
|
||||
var cutoffTime = BeijingTimeExtension.GetBeijingTime() - timeout;
|
||||
var timeoutTasks = _activeTasks.Values
|
||||
.Where(t => t.StartTime < cutoffTime && t.Status != MJTaskStatus.SUCCESS && t.Status != MJTaskStatus.FAILURE && t.Status != MJTaskStatus.CANCEL)
|
||||
.ToList();
|
||||
|
||||
_logger.LogInformation($"发现 {timeoutTasks.Count} 个超时任务");
|
||||
|
||||
foreach (var task in timeoutTasks)
|
||||
{
|
||||
_logger.LogWarning($"清理超时任务: TaskId={task.TaskId}, Token={task.Token}, 开始时间={task.StartTime:yyyy-MM-dd HH:mm:ss}");
|
||||
_usageTracker.ReleaseConcurrencyPermit(task.Token);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 保存任务到数据库
|
||||
/// </summary>
|
||||
private async Task SaveTaskToDatabase(MJApiTasks mJApiTasks)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _dbContext.MJApiTasks.AddAsync(mJApiTasks);
|
||||
await _dbContext.SaveChangesAsync();
|
||||
_logger.LogInformation($"任务已保存到数据库: TaskId={mJApiTasks.TaskId}, Token={mJApiTasks.Token}");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, $"保存任务到数据库失败: TaskId={mJApiTasks.TaskId}");
|
||||
_logger.LogWarning($"缓存和数据库中均未找到任务: ThirdPartyTaskId={thirdPartyId}");
|
||||
return null;
|
||||
}
|
||||
return mjApiTasks;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -190,25 +116,79 @@ namespace LMS.Tools.MJPackage
|
||||
}
|
||||
}
|
||||
|
||||
public async Task BatchUpdateTaskChaheToDatabaseAsync()
|
||||
{
|
||||
var startTime = BeijingTimeExtension.GetBeijingTime();
|
||||
try
|
||||
{
|
||||
// 获取所有缓存中的任务
|
||||
var tasks = _usageTracker.GetAllTaskCaches();
|
||||
if (tasks == null || tasks.Count == 0)
|
||||
{
|
||||
_logger.LogInformation("缓存中没有需要更新的任务");
|
||||
return;
|
||||
}
|
||||
// 批量同步
|
||||
var taskList = new List<MJApiTasks>();
|
||||
foreach (var task in tasks)
|
||||
{
|
||||
// 从缓存中获取任务
|
||||
MJApiTasks? mJApiTasks = _usageTracker.TryGetTaskCache(task.ThirdPartyTaskId);
|
||||
if (mJApiTasks != null)
|
||||
{
|
||||
taskList.Add(mJApiTasks);
|
||||
}
|
||||
}
|
||||
if (taskList.Count == 0)
|
||||
{
|
||||
_logger.LogInformation("缓存中没有需要更新的任务");
|
||||
return;
|
||||
}
|
||||
// 批量更新到数据库
|
||||
_dbContext.MJApiTasks.UpdateRange(taskList);
|
||||
await _dbContext.SaveChangesAsync();
|
||||
|
||||
int count = 0;
|
||||
// 删除缓存中状态为已完成的任务
|
||||
for (int i = 0; i < taskList.Count; i++)
|
||||
{
|
||||
|
||||
var task = taskList[i];
|
||||
if (task.Status == MJTaskStatus.SUCCESS || task.Status == MJTaskStatus.FAILURE || task.Status == MJTaskStatus.CANCEL)
|
||||
{
|
||||
bool removeResult = _usageTracker.RemoveTaskCache(task.ThirdPartyTaskId);
|
||||
if (removeResult == true)
|
||||
{
|
||||
count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var duration = BeijingTimeExtension.GetBeijingTime() - startTime;
|
||||
_logger.LogInformation($"批量更新了 {taskList.Count} 个缓存中的任务到数据库,耗费时间: {duration.TotalMilliseconds}, 缓存中删除了 {count} 个完成的任务");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "批量更新任务到数据库失败");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 从数据库加载任务
|
||||
/// </summary>
|
||||
private async Task<MJApiTasks> LoadTaskFromDatabase(string taskId)
|
||||
private async Task<MJApiTasks?> LoadTaskFromDatabaseByThirdPartyId(string thirdPartyId)
|
||||
{
|
||||
try
|
||||
{
|
||||
MJApiTasks? mJApiTasks = await _dbContext.MJApiTasks.FirstOrDefaultAsync(x => x.TaskId == taskId);
|
||||
MJApiTasks? mJApiTasks = await _dbContext.MJApiTasks.FirstOrDefaultAsync(x => x.ThirdPartyTaskId == thirdPartyId);
|
||||
if (mJApiTasks == null)
|
||||
{
|
||||
_logger.LogWarning($"未找到任务: TaskId={taskId}");
|
||||
return null;
|
||||
}
|
||||
|
||||
return mJApiTasks;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, $"从数据库加载任务失败: TaskId={taskId}");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -85,7 +85,8 @@ namespace LMS.Tools.MJPackage
|
||||
}
|
||||
private async Task<string?> TryBackupApiAsync(string id, string useToken)
|
||||
{
|
||||
const string backupUrlTemplate = "https://api.laitool.cc/mj/task/{0}/fetch";
|
||||
string mjAPIBasicUrl = await _tokenService.GetMJAPIBasicUrl();
|
||||
string backupUrl = $"{mjAPIBasicUrl}/mj/task/{id}/fetch";
|
||||
const int maxRetries = 3;
|
||||
const int baseDelayMs = 1000;
|
||||
|
||||
@ -93,7 +94,6 @@ namespace LMS.Tools.MJPackage
|
||||
client.DefaultRequestHeaders.Add("Authorization", "sk-" + useToken);
|
||||
client.Timeout = TimeSpan.FromSeconds(30);
|
||||
|
||||
var backupUrl = string.Format(backupUrlTemplate, id);
|
||||
|
||||
for (int attempt = 1; attempt <= maxRetries; attempt++)
|
||||
{
|
||||
|
||||
@ -9,13 +9,14 @@ using Quartz;
|
||||
namespace LMS.Tools.MJPackage
|
||||
{
|
||||
[DisallowConcurrentExecution]
|
||||
public class TaskStatusCheckService(ITokenService tokenService, ApplicationDbContext dbContext, ILogger<TaskStatusCheckService> logger, ITaskService taskService, ITaskConcurrencyManager taskConcurrencyManager) : IJob
|
||||
public class TaskStatusCheckService(ITokenService tokenService, ApplicationDbContext dbContext, ILogger<TaskStatusCheckService> logger, ITaskService taskService, ITaskConcurrencyManager taskConcurrencyManager, TokenUsageTracker tokenUsageTracker) : IJob
|
||||
{
|
||||
private readonly ITokenService _tokenService = tokenService;
|
||||
private readonly ApplicationDbContext _dbContext = dbContext;
|
||||
private readonly ILogger<TaskStatusCheckService> _logger = logger;
|
||||
private readonly ITaskService _taskService = taskService;
|
||||
private readonly ITaskConcurrencyManager _taskConcurrencyManager = taskConcurrencyManager;
|
||||
private readonly TokenUsageTracker _tokenUsageTracker = tokenUsageTracker;
|
||||
|
||||
public async Task Execute(IJobExecutionContext context)
|
||||
{
|
||||
@ -24,9 +25,14 @@ namespace LMS.Tools.MJPackage
|
||||
var startTime = BeijingTimeExtension.GetBeijingTime();
|
||||
try
|
||||
{
|
||||
// 强制同步数据库数据
|
||||
// 强制同步数据库数据,原请求Token
|
||||
await _tokenService.LoadOriginTokenAsync();
|
||||
|
||||
// 强制同步数据 MJ API 的 Basic URL
|
||||
|
||||
await _tokenService.LoadMJAPIBasicUrlAsync();
|
||||
|
||||
|
||||
// 检查Task状态和返回值
|
||||
// 获取所有超过五分钟没有完成的人物
|
||||
List<MJApiTasks> tasks = await _dbContext.MJApiTasks.Where(t => t.Status != MJTaskStatus.CANCEL && t.Status != MJTaskStatus.SUCCESS && t.Status != MJTaskStatus.FAILURE && t.StartTime < BeijingTimeExtension.GetBeijingTime().AddMinutes(-5)).ToListAsync();
|
||||
@ -54,6 +60,10 @@ namespace LMS.Tools.MJPackage
|
||||
};
|
||||
task.EndTime = BeijingTimeExtension.GetBeijingTime();
|
||||
task.Properties = JsonConvert.SerializeObject(newProperties);
|
||||
// 尝试释放 当前缓存中的任务
|
||||
_tokenUsageTracker.RemoveTaskCache(task.ThirdPartyTaskId);
|
||||
_logger.LogWarning("任务轮询检查,未请求到对应的MJ数据,释放Token,释放任务" + task.Token);
|
||||
_tokenUsageTracker.ReleaseConcurrencyPermit(task.Token);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -75,6 +85,10 @@ namespace LMS.Tools.MJPackage
|
||||
// 开始修改数据
|
||||
task.EndTime = BeijingTimeExtension.GetBeijingTime();
|
||||
task.Properties = JsonConvert.SerializeObject(properties);
|
||||
_logger.LogInformation("任务轮询检查,已请求到对应的MJ数据,并且状态为成功,失败,取消,释放Token,释放任务" + task.Token);
|
||||
_tokenUsageTracker.ReleaseConcurrencyPermit(task.Token);
|
||||
// 尝试释放 当前缓存中的任务
|
||||
_tokenUsageTracker.RemoveTaskCache(task.ThirdPartyTaskId);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -89,7 +103,7 @@ namespace LMS.Tools.MJPackage
|
||||
catch (Exception ex)
|
||||
{
|
||||
// 报错
|
||||
_logger.LogError(ex, "检查任务 {TaskId} 时发生错误", task.TaskId);
|
||||
_logger.LogError(ex, $"检查任务 {task.Token} 时发生错误,释放Token,释放任务", task.TaskId);
|
||||
task.Status = MJTaskStatus.FAILURE;
|
||||
var newProperties = new
|
||||
{
|
||||
@ -97,6 +111,9 @@ namespace LMS.Tools.MJPackage
|
||||
};
|
||||
task.EndTime = BeijingTimeExtension.GetBeijingTime();
|
||||
task.Properties = JsonConvert.SerializeObject(newProperties);
|
||||
_tokenUsageTracker.ReleaseConcurrencyPermit(task.Token);
|
||||
// 尝试释放 当前缓存中的任务
|
||||
_tokenUsageTracker.RemoveTaskCache(task.ThirdPartyTaskId);
|
||||
// 开始修改数据
|
||||
await _taskConcurrencyManager.UpdateTaskInDatabase(task);
|
||||
}
|
||||
|
||||
27
LMS.Tools/MJPackage/TaskSyncService .cs
Normal file
27
LMS.Tools/MJPackage/TaskSyncService .cs
Normal file
@ -0,0 +1,27 @@
|
||||
using LMS.Common.Extensions;
|
||||
using LMS.DAO;
|
||||
using LMS.Repository.MJPackage;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Quartz;
|
||||
using System.Data;
|
||||
using System.Text;
|
||||
|
||||
namespace LMS.Tools.MJPackage
|
||||
{
|
||||
[DisallowConcurrentExecution]
|
||||
public class TaskSyncService(
|
||||
ILogger<TokenSyncService> logger,
|
||||
ITaskConcurrencyManager taskConcurrencyManager) : IJob
|
||||
{
|
||||
private readonly ILogger<TokenSyncService> _logger = logger;
|
||||
private readonly ITaskConcurrencyManager _taskConcurrencyManager = taskConcurrencyManager;
|
||||
|
||||
public async Task Execute(IJobExecutionContext context)
|
||||
{
|
||||
_logger.LogInformation($"开始 Task 信息 - 同步间隔: 15 秒, (使用EF Core)");
|
||||
await _taskConcurrencyManager.BatchUpdateTaskChaheToDatabaseAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -181,6 +181,42 @@ namespace LMS.Tools.MJPackage
|
||||
return await LoadOriginTokenAsync();
|
||||
}
|
||||
|
||||
public async Task<string> LoadMJAPIBasicUrlAsync()
|
||||
{
|
||||
// 没找到 从数据库中获取
|
||||
Options? oprions = await _dbContext.Options.Where(x => x.Key == "MJAPIBasicUrl").FirstOrDefaultAsync();
|
||||
if (oprions == null)
|
||||
{
|
||||
_logger.LogWarning("未找到配置的MJAPI Basic URL, 使用默认的!");
|
||||
_usageTracker.MJAPIBasicUrl = string.Empty;
|
||||
return _usageTracker.MJAPIBasicUrl;
|
||||
}
|
||||
else
|
||||
{
|
||||
// 处理数据
|
||||
string mjBasicUrl = oprions.GetValueObject<string>() ?? string.Empty;
|
||||
if (string.IsNullOrWhiteSpace(mjBasicUrl))
|
||||
{
|
||||
_logger.LogWarning("未找到配置的MJAPI Basic URL 数据, 使用默认的!");
|
||||
_usageTracker.MJAPIBasicUrl = string.Empty;
|
||||
return _usageTracker.MJAPIBasicUrl;
|
||||
}
|
||||
_usageTracker.MJAPIBasicUrl = mjBasicUrl;
|
||||
return mjBasicUrl;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<string> GetMJAPIBasicUrl()
|
||||
{
|
||||
// 缓存中就有 直接返回
|
||||
if (!string.IsNullOrWhiteSpace(_usageTracker.MJAPIBasicUrl))
|
||||
{
|
||||
return _usageTracker.MJAPIBasicUrl;
|
||||
}
|
||||
// 缓存中没有 从数据库中获取
|
||||
return await LoadMJAPIBasicUrlAsync();
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 重置Token的使用数据
|
||||
|
||||
@ -9,9 +9,11 @@ namespace LMS.Tools.MJPackage
|
||||
public class TokenUsageTracker
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, TokenCacheItem> _tokenCache = new();
|
||||
private readonly ConcurrentDictionary<string, MJApiTasks> _taskCache = new();
|
||||
private readonly ConcurrentDictionary<string, Lazy<ConcurrencyController>> _concurrencyControllers = new();
|
||||
private readonly ReaderWriterLockSlim _cacheLock = new(LockRecursionPolicy.SupportsRecursion);
|
||||
private string _originToken = string.Empty;
|
||||
private string _mjAPIBasicUrl = string.Empty;
|
||||
|
||||
private readonly ILogger<TokenUsageTracker> _logger;
|
||||
|
||||
@ -500,5 +502,111 @@ namespace LMS.Tools.MJPackage
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public string MJAPIBasicUrl
|
||||
{
|
||||
get
|
||||
{
|
||||
if (_mjAPIBasicUrl.EndsWith('/'))
|
||||
{
|
||||
// 删除最后一个 /
|
||||
return _mjAPIBasicUrl = _mjAPIBasicUrl.TrimEnd('/');
|
||||
}
|
||||
else
|
||||
{
|
||||
return _mjAPIBasicUrl.Trim();
|
||||
}
|
||||
}
|
||||
set
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(value))
|
||||
{
|
||||
_mjAPIBasicUrl = value;
|
||||
}
|
||||
else
|
||||
{
|
||||
// 设置初始值
|
||||
_mjAPIBasicUrl = "https://laitool.net";
|
||||
// 如果尝试设置为空值,记录警告日志,可能请求原始的请求不可用
|
||||
_logger.LogWarning("尝试设置OriginToken为空值,可能请求原始的请求不可用!!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public MJApiTasks? TryGetTaskCache(string thirdPartyId)
|
||||
{
|
||||
_cacheLock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (_taskCache.TryGetValue(thirdPartyId, out var task))
|
||||
{
|
||||
return task;
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogWarning($"未找到任务缓存: {thirdPartyId}");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_cacheLock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
public List<MJApiTasks> GetAllTaskCaches()
|
||||
{
|
||||
_cacheLock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
return _taskCache.Values.ToList() ?? [];
|
||||
}
|
||||
finally
|
||||
{
|
||||
_cacheLock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
public bool AddOrUpdateTaskCache(MJApiTasks task)
|
||||
{
|
||||
_cacheLock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (task == null || string.IsNullOrWhiteSpace(task.TaskId) || string.IsNullOrWhiteSpace(task.ThirdPartyTaskId))
|
||||
{
|
||||
_logger.LogWarning("尝试添加或更新任务缓存时,任务、任务ID或者第三方任务ID为空");
|
||||
return false;
|
||||
}
|
||||
_taskCache[task.ThirdPartyTaskId] = task;
|
||||
_logger.LogDebug($"任务缓存已增加或更新: {task.TaskId}, 状态: {task.Status}, 第三方任务ID: {task.ThirdPartyTaskId}");
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_cacheLock.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
public bool RemoveTaskCache(string thirdPartyId)
|
||||
{
|
||||
_cacheLock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (_taskCache.TryRemove(thirdPartyId, out var removedTask))
|
||||
{
|
||||
_logger.LogDebug($"任务缓存已移除: {removedTask.TaskId}, 状态: {removedTask.Status}, 第三方任务ID: {removedTask.ThirdPartyTaskId}");
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogWarning($"尝试移除但未找到的第三方任务的缓存: {removedTask}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_cacheLock.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,6 +20,8 @@ public static class QuartzTaskSchedulerConfig
|
||||
// 每30秒任务配置
|
||||
ConfigureThirtySecondTask(q, chinaTimeZone);
|
||||
|
||||
ConfigureFiftySecondTask(q, chinaTimeZone);
|
||||
|
||||
ConfigureFiveMinuteTask(q, chinaTimeZone);
|
||||
});
|
||||
|
||||
@ -33,6 +35,7 @@ public static class QuartzTaskSchedulerConfig
|
||||
services.AddTransient<TokenResetService>();
|
||||
services.AddTransient<TokenSyncService>();
|
||||
services.AddTransient<TaskStatusCheckService>();
|
||||
services.AddTransient<TaskSyncService>();
|
||||
}
|
||||
|
||||
private static TimeZoneInfo GetChinaTimeZone()
|
||||
@ -88,6 +91,16 @@ public static class QuartzTaskSchedulerConfig
|
||||
.WithCronSchedule("*/30 * * * * ?", x => x.InTimeZone(timeZone))); // 每30秒执行一次
|
||||
}
|
||||
|
||||
private static void ConfigureFiftySecondTask(IServiceCollectionQuartzConfigurator q, TimeZoneInfo timeZone)
|
||||
{
|
||||
var jobKey = new JobKey("FiftySecondTask", "DefaultGroup");
|
||||
q.AddJob<TaskSyncService>(opts => opts.WithIdentity(jobKey));
|
||||
q.AddTrigger(opts => opts
|
||||
.ForJob(jobKey)
|
||||
.WithIdentity("FiftySecondTaskTrigger", "DefaultGroup")
|
||||
.WithCronSchedule("*/15 * * * * ?", x => x.InTimeZone(timeZone))); // 每30秒执行一次
|
||||
}
|
||||
|
||||
private static void ConfigureFiveMinuteTask(IServiceCollectionQuartzConfigurator q, TimeZoneInfo timeZone)
|
||||
{
|
||||
var jobKey = new JobKey("FiveMinuteTask", "DefaultGroup");
|
||||
|
||||
@ -12,12 +12,13 @@ namespace LMS.service.Controllers
|
||||
{
|
||||
[Route("api/[controller]")]
|
||||
[ApiController]
|
||||
public class MJPackageController(TokenUsageTracker usageTracker, ITaskConcurrencyManager taskConcurrencyManager, ILogger<MJPackageController> logger, IMJPackageService mJPackageService) : ControllerBase
|
||||
public class MJPackageController(TokenUsageTracker usageTracker, ITaskConcurrencyManager taskConcurrencyManager, ILogger<MJPackageController> logger, IMJPackageService mJPackageService, ITokenService tokenService) : ControllerBase
|
||||
{
|
||||
private readonly TokenUsageTracker _usageTracker = usageTracker;
|
||||
private readonly ILogger<MJPackageController> _logger = logger;
|
||||
private readonly ITaskConcurrencyManager _taskConcurrencyManager = taskConcurrencyManager;
|
||||
private readonly IMJPackageService _mJPackageService = mJPackageService;
|
||||
private readonly ITokenService _tokenService = tokenService;
|
||||
|
||||
[HttpPost("mj/submit/imagine")]
|
||||
[RateLimit]
|
||||
@ -42,7 +43,10 @@ namespace LMS.service.Controllers
|
||||
string body = JsonConvert.SerializeObject(model);
|
||||
|
||||
client.Timeout = Timeout.InfiniteTimeSpan;
|
||||
string mjUrl = "https://api.laitool.cc/mj/submit/imagine";
|
||||
|
||||
string mjAPIBasicUrl = await _tokenService.GetMJAPIBasicUrl();
|
||||
|
||||
string mjUrl = $"{mjAPIBasicUrl}/mj/submit/imagine";
|
||||
var response = await client.PostAsync(mjUrl, new StringContent(body, Encoding.UTF8, "application/json"));
|
||||
// 读取响应内容
|
||||
string content = await response.Content.ReadAsStringAsync();
|
||||
|
||||
@ -150,11 +150,9 @@ namespace LMS.service.Extensions.Attributes
|
||||
// 在异常情况下也要释放并发许可
|
||||
if (_concurrencyAcquired)
|
||||
{
|
||||
|
||||
usageTracker.ReleaseConcurrencyPermit(_token);
|
||||
|
||||
}
|
||||
logger.LogError(ex, $"处理Token请求时发生错误: {_token},已释放Token许可!");
|
||||
logger.LogError(ex, $"处理Token请求时发生错误: {_token},是否有并发 {_concurrencyAcquired},已释放Token许可!");
|
||||
context.Result = new ObjectResult("Internal server error")
|
||||
{
|
||||
StatusCode = StatusCodes.Status500InternalServerError
|
||||
|
||||
@ -6,7 +6,6 @@ using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using System.Net.Sockets;
|
||||
using System.Text.Json;
|
||||
using static Betalgo.Ranul.OpenAI.ObjectModels.StaticValues.AssistantsStatics.MessageStatics;
|
||||
|
||||
namespace LMS.service.Service.MJPackage
|
||||
{
|
||||
@ -94,6 +93,7 @@ namespace LMS.service.Service.MJPackage
|
||||
{
|
||||
TaskId = mjTask.TaskId,
|
||||
Token = mjTask.Token,
|
||||
TokenId = mjTask.TokenId,
|
||||
Status = status,
|
||||
StartTime = mjTask.StartTime,
|
||||
EndTime = null,
|
||||
@ -106,20 +106,36 @@ namespace LMS.service.Service.MJPackage
|
||||
// 当前任务已经被释放过了
|
||||
// 开始修改数据
|
||||
mJApiTasks.EndTime = BeijingTimeExtension.GetBeijingTime();
|
||||
// 不直接修改数据库了 改为修改缓存
|
||||
bool modifySatus = _usageTracker.AddOrUpdateTaskCache(mJApiTasks);
|
||||
if (modifySatus == false)
|
||||
{
|
||||
// 缓存修改失败,可能是因为任务不存在或状态不匹配,尝试修改数据库
|
||||
await _taskConcurrencyManager.UpdateTaskInDatabase(mJApiTasks);
|
||||
|
||||
}
|
||||
return new OkObjectResult(null);
|
||||
}
|
||||
|
||||
if (status == MJTaskStatus.SUCCESS || status == MJTaskStatus.FAILURE || status == MJTaskStatus.CANCEL)
|
||||
{
|
||||
mJApiTasks.EndTime = BeijingTimeExtension.GetBeijingTime();
|
||||
// 任务状态为成功、失败或取消,释放Token
|
||||
_logger.LogInformation("MJNotifyHook 回调: 任务状态为成功、失败或取消,释放Token " + mjTask.Token);
|
||||
// 开始修改数据,然后在释放
|
||||
_usageTracker.ReleaseConcurrencyPermit(mjTask.Token);
|
||||
}
|
||||
|
||||
else
|
||||
{
|
||||
// 只修改状态,不释放Token
|
||||
mJApiTasks.EndTime = null; // 处理中没有结束时间
|
||||
}
|
||||
// 开始修改数据
|
||||
bool updateStatus = _usageTracker.AddOrUpdateTaskCache(mJApiTasks);
|
||||
if (updateStatus == false)
|
||||
{
|
||||
// 缓存修改失败,可能是因为任务不存在或状态不匹配,尝试修改数据库
|
||||
await _taskConcurrencyManager.UpdateTaskInDatabase(mJApiTasks);
|
||||
|
||||
}
|
||||
return new OkObjectResult(null);
|
||||
}
|
||||
catch (Exception ex)
|
||||
@ -146,7 +162,7 @@ namespace LMS.service.Service.MJPackage
|
||||
|
||||
private async Task<ActionResult<object>?> TryOriginApiAsync(string id)
|
||||
{
|
||||
const string originUrl = "https://mjapi.bzu.cn/mj/task/{0}/fetch";
|
||||
string originUrl = $"https://mjapi.bzu.cn/mj/task/{id}/fetch";
|
||||
|
||||
// 判断 原始token 不存在 直接 返回空
|
||||
string orginToken = await _tokenService.GetOriginToken();
|
||||
@ -188,12 +204,12 @@ namespace LMS.service.Service.MJPackage
|
||||
|
||||
private async Task<ActionResult<object>?> TryBackupApiAsync(string id, string useToken)
|
||||
{
|
||||
const string backupUrlTemplate = "https://api.laitool.cc/mj/task/{0}/fetch";
|
||||
string mjAPIBasicUrl = await _tokenService.GetMJAPIBasicUrl();
|
||||
string backupUrl = $"{mjAPIBasicUrl}/mj/task/{id}/fetch";
|
||||
const int maxRetries = 3;
|
||||
const int baseDelayMs = 1000;
|
||||
|
||||
using var client = CreateHttpClient($"Bearer sk-{useToken}", true);
|
||||
var backupUrl = string.Format(backupUrlTemplate, id);
|
||||
|
||||
for (int attempt = 1; attempt <= maxRetries; attempt++)
|
||||
{
|
||||
|
||||
19
SQL/v1.1.3/FileUploads.sql
Normal file
19
SQL/v1.1.3/FileUploads.sql
Normal file
@ -0,0 +1,19 @@
|
||||
-- 文件上传记录表
|
||||
CREATE TABLE FileUploads (
|
||||
Id BIGINT AUTO_INCREMENT PRIMARY KEY,
|
||||
UserId BIGINT NOT NULL, -- 用户ID
|
||||
FileName VARCHAR(255) NOT NULL, -- 原始文件名
|
||||
FileKey VARCHAR(500) NOT NULL, -- 七牛云存储key
|
||||
FileSize BIGINT NOT NULL, -- 文件大小
|
||||
ContentType VARCHAR(100) NOT NULL, -- 内容类型
|
||||
Hash VARCHAR(100) NOT NULL, -- 文件哈希值
|
||||
QiniuUrl VARCHAR(1000) NOT NULL, -- 七牛云访问URL
|
||||
UploadTime DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
Status VARCHAR(20) NOT NULL DEFAULT 'active',
|
||||
CreatedAt DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
|
||||
-- 创建索引
|
||||
CREATE INDEX IX_FileUploads_UserId ON FileUploads(UserId);
|
||||
CREATE INDEX IX_FileUploads_FileKey ON FileUploads(FileKey);
|
||||
CREATE INDEX IX_FileUploads_UploadTime ON FileUploads(UploadTime);
|
||||
Loading…
x
Reference in New Issue
Block a user