LMS.service/LMS.Tools/MJPackage/TokenUsageTracker.cs
lq1405 ef46c30ed3 修改返回报错逻辑
请求失败或者请求返回的code 不是 1 22 24 地时候 可以减少日使用量和释放型号量
单独处理 code 为 24 的时候。创建失败请求和只释放不减少日使用量
2025-06-23 23:47:11 +08:00

683 lines
24 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using LMS.Common.Extensions;
using LMS.Repository.DB;
using LMS.Repository.MJPackage;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
namespace LMS.Tools.MJPackage
{
public class TokenUsageTracker
{
// Cached token entries, keyed by the token string.
private readonly ConcurrentDictionary<string, TokenCacheItem> _tokenCache =
    new ConcurrentDictionary<string, TokenCacheItem>();

// Cached tasks, keyed by the third-party task id.
private readonly ConcurrentDictionary<string, MJApiTasks> _taskCache =
    new ConcurrentDictionary<string, MJApiTasks>();

// Lazily created per-token concurrency controllers, keyed by the token string.
private readonly ConcurrentDictionary<string, Lazy<ConcurrencyController>> _concurrencyControllers =
    new ConcurrentDictionary<string, Lazy<ConcurrencyController>>();

// Reader/writer lock taken by the cache operations of this class.
private readonly ReaderWriterLockSlim _cacheLock =
    new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

// Backing fields for the OriginToken and MJAPIBasicUrl properties.
private string _originToken = "";
private string _mjAPIBasicUrl = "";

private readonly ILogger<TokenUsageTracker> _logger;

/// <summary>
/// Creates the tracker and logs that the service has been initialized.
/// </summary>
/// <param name="logger">Logger used for all diagnostics emitted by this class.</param>
public TokenUsageTracker(ILogger<TokenUsageTracker> logger)
{
    _logger = logger;
    _logger.LogInformation("TokenUsageTracker服务已初始化");
}
/// <summary>
/// Per-token concurrency gate whose limit can be raised or lowered while requests are in flight.
/// </summary>
/// <remarks>
/// The semaphore is created without an upper bound so that raising the limit can never
/// overflow it. When the limit is lowered, idle permits are withdrawn immediately and
/// permits still held by running tasks are retired as those tasks call <see cref="Release"/>.
/// </remarks>
private class ConcurrencyController : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly object _lock = new object();
    private readonly ILogger _logger;

    // Current concurrency limit.
    private int _maxCount;

    // Number of permits handed out by WaitAsync and not yet returned through Release.
    private int _currentlyExecuting;

    // Permits that still have to be removed after the limit was lowered while they were in use.
    private int _pendingReduction;

    /// <summary>
    /// Creates a controller that initially allows <paramref name="initialLimit"/> concurrent holders.
    /// </summary>
    /// <param name="initialLimit">Initial concurrency limit; must be greater than zero.</param>
    /// <param name="logger">Logger used for permit diagnostics.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="initialLimit"/> is zero or negative.</exception>
    public ConcurrencyController(int initialLimit, ILogger logger)
    {
        if (initialLimit <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(initialLimit), initialLimit, "并发限制必须大于0");
        }

        _maxCount = initialLimit;
        // No maxCount is passed on purpose: a bounded semaphore would throw
        // SemaphoreFullException as soon as the limit is raised above its initial value.
        _semaphore = new SemaphoreSlim(initialLimit);
        _currentlyExecuting = 0;
        _pendingReduction = 0;
        _logger = logger;
    }

    /// <summary>
    /// Gets the current concurrency limit.
    /// </summary>
    public int MaxCount => _maxCount;

    /// <summary>
    /// Gets the number of permits currently held.
    /// </summary>
    public int CurrentlyExecuting => _currentlyExecuting;

    /// <summary>
    /// Gets the number of permits that can be acquired right now.
    /// </summary>
    public int AvailableCount => _semaphore.CurrentCount;

    /// <summary>
    /// Tries to acquire a permit without waiting.
    /// </summary>
    /// <param name="token">Token the permit is requested for; used for logging only.</param>
    /// <returns><c>true</c> if a permit was acquired; <c>false</c> if none was available.</returns>
    public async Task<bool> WaitAsync(string token)
    {
        // Zero timeout: the call reports failure instead of queueing the caller.
        var acquired = await _semaphore.WaitAsync(0);
        if (!acquired)
        {
            return false;
        }

        int executing;
        int limit;
        lock (_lock)
        {
            executing = ++_currentlyExecuting;
            limit = _maxCount;
        }

        _logger.LogInformation($"Token获取并发许可: {token}, 当前执行中: {executing}/{limit}");
        return true;
    }

    /// <summary>
    /// Returns a permit previously obtained through <see cref="WaitAsync"/>.
    /// </summary>
    /// <param name="token">Token the permit belongs to; used for logging only.</param>
    public void Release(string token)
    {
        lock (_lock)
        {
            if (_currentlyExecuting <= 0)
            {
                _logger.LogWarning($"Token释放并发许可: {token}, 尝试释放许可但当前执行数已为0: {token}");
                return;
            }

            _currentlyExecuting--;

            if (_pendingReduction > 0)
            {
                // The limit was lowered while this permit was in use: retire it
                // instead of handing it back, so the lower limit takes effect.
                _pendingReduction--;
            }
            else
            {
                _semaphore.Release();
            }

            _logger.LogInformation($"Token释放并发许可: {token}, 当前执行中: {_currentlyExecuting}/{_maxCount}");
        }
    }

    /// <summary>
    /// Changes the concurrency limit without interrupting running tasks.
    /// The method is synchronous; the <c>Async</c> suffix is kept for existing callers.
    /// </summary>
    /// <param name="newLimit">New concurrency limit; must be greater than zero.</param>
    /// <param name="token">Token the controller belongs to; used for logging only.</param>
    /// <returns><c>true</c> if the limit changed; <c>false</c> if it already had that value.</returns>
    /// <exception cref="ArgumentException"><paramref name="newLimit"/> is zero or negative.</exception>
    public bool AdjustLimitAsync(int newLimit, string token)
    {
        if (newLimit <= 0)
        {
            throw new ArgumentException("并发限制必须大于0", nameof(newLimit));
        }

        lock (_lock)
        {
            if (_maxCount == newLimit)
            {
                return false; // nothing to adjust
            }

            var oldLimit = _maxCount;
            _maxCount = newLimit;

            if (newLimit > oldLimit)
            {
                var additionalPermits = newLimit - oldLimit;

                // Permits that were scheduled for retirement can simply stay in circulation.
                var cancelled = Math.Min(_pendingReduction, additionalPermits);
                _pendingReduction -= cancelled;
                additionalPermits -= cancelled;

                if (additionalPermits > 0)
                {
                    _semaphore.Release(additionalPermits);
                }

                _logger.LogInformation($"Token并发限制已扩大: {token}, {oldLimit} -> {newLimit}, 当前执行: {_currentlyExecuting}");
            }
            else
            {
                // Withdraw idle permits right away; whatever is still held by running
                // tasks is retired later, when those tasks call Release.
                var toWithdraw = oldLimit - newLimit;
                while (toWithdraw > 0 && _semaphore.Wait(0))
                {
                    toWithdraw--;
                }
                _pendingReduction += toWithdraw;

                var excessExecuting = _currentlyExecuting - newLimit;
                if (excessExecuting > 0)
                {
                    _logger.LogWarning($"Token并发限制缩小但有超额任务: {token}, {oldLimit} -> {newLimit}, 超额: {excessExecuting}, 将等待任务自然完成");
                }
                else
                {
                    _logger.LogInformation($"Token并发限制已缩小: {token}, {oldLimit} -> {newLimit}, 当前执行: {_currentlyExecuting}");
                }
            }

            return true;
        }
    }

    /// <summary>
    /// Disposes the underlying semaphore.
    /// </summary>
    public void Dispose()
    {
        _semaphore.Dispose();
    }
}
/// <summary>
/// Looks up a token in the token cache.
/// </summary>
/// <param name="token">Token string used as the cache key.</param>
/// <param name="cacheItem">Receives the cached entry when the token is found.</param>
/// <returns><c>true</c> if the token is cached; otherwise <c>false</c>.</returns>
public bool TryGetToken(string token, out TokenCacheItem cacheItem)
{
    if (!_tokenCache.TryGetValue(token, out cacheItem))
    {
        return false;
    }

    _logger.LogDebug($"从缓存中找到Token: {token}");
    return true;
}
/// <summary>
/// Adds a token to the cache or replaces the cached entry, and brings the token's
/// concurrency controller in line with the entry's concurrency limit.
/// The method is synchronous; the <c>Async</c> suffix is kept for existing callers.
/// </summary>
/// <param name="tokenItem">Entry to cache; its <c>ConcurrencyLimit</c> must be greater than zero.</param>
/// <exception cref="ArgumentNullException"><paramref name="tokenItem"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">The entry's concurrency limit is zero or negative.</exception>
public void AddOrUpdateTokenAsync(TokenCacheItem tokenItem)
{
    ArgumentNullException.ThrowIfNull(tokenItem);

    // Validate before touching any state. Otherwise the token would already be cached when
    // controller creation or adjustment throws, and a Lazy whose factory threw keeps
    // rethrowing that exception on every later access.
    if (tokenItem.ConcurrencyLimit <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(tokenItem), tokenItem.ConcurrencyLimit, "并发限制必须大于0");
    }

    _cacheLock.EnterWriteLock();
    try
    {
        _tokenCache[tokenItem.Token] = tokenItem;

        // Get the existing controller or create one with the entry's limit.
        var controller = _concurrencyControllers
            .GetOrAdd(
                tokenItem.Token,
                _ => new Lazy<ConcurrencyController>(() =>
                    new ConcurrencyController(tokenItem.ConcurrencyLimit, _logger)))
            .Value;

        // For an existing controller this applies a changed limit; for a new one it is a no-op.
        if (controller.AdjustLimitAsync(tokenItem.ConcurrencyLimit, tokenItem.Token))
        {
            _logger.LogInformation($"Token并发限制已调整: {tokenItem.Token}, 新限制: {tokenItem.ConcurrencyLimit}");
        }

        _logger.LogInformation($"Token已添加到缓存: {tokenItem.Token}, 日限制: {tokenItem.DailyLimit}, 总限制: {tokenItem.TotalLimit}, 并发限制: {tokenItem.ConcurrencyLimit}");
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
/// <summary>
/// Backward-compatible alias for <see cref="AddOrUpdateTokenAsync"/>.
/// Both methods run synchronously; this one only forwards the call.
/// </summary>
/// <param name="tokenItem">Entry to add to or update in the cache.</param>
public void AddOrUpdateToken(TokenCacheItem tokenItem) => AddOrUpdateTokenAsync(tokenItem);
/// <summary>
/// Increases the daily and total usage counters of a cached token by one
/// and refreshes its last-activity time.
/// </summary>
/// <param name="token">Token whose counters are increased. A warning is logged if it is not cached.</param>
public void IncrementUsage(string token)
{
    _cacheLock.EnterWriteLock();
    try
    {
        if (!_tokenCache.TryGetValue(token, out var entry))
        {
            _logger.LogWarning($"尝试更新未缓存的Token使用量: {token}");
            return;
        }

        // Remember the old values for the log line below.
        var dailyBefore = entry.DailyUsage;
        var totalBefore = entry.TotalUsage;

        entry.DailyUsage += 1;
        entry.TotalUsage += 1;
        entry.LastActivityTime = BeijingTimeExtension.GetBeijingTime();

        _logger.LogInformation($"Token使用量已更新: {token}, 今日使用: {dailyBefore} → {entry.DailyUsage}, 总使用: {totalBefore} → {entry.TotalUsage}");
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
/// <summary>
/// Decreases the daily and total usage counters of a cached token by one, for example when a
/// request failed or succeeded without the task being added. Counters never go below zero.
/// </summary>
/// <param name="token">Token whose counters are decreased. A warning is logged if it is not cached.</param>
public void DecrementUsage(string token)
{
    _cacheLock.EnterWriteLock();
    try
    {
        if (!_tokenCache.TryGetValue(token, out var entry))
        {
            _logger.LogWarning($"尝试减少未缓存的Token使用量: {token}");
            return;
        }

        // Remember the old values for the log line below.
        var dailyBefore = entry.DailyUsage;
        var totalBefore = entry.TotalUsage;

        // Each counter is clamped at zero independently.
        if (entry.DailyUsage > 0)
        {
            entry.DailyUsage -= 1;
        }

        if (entry.TotalUsage > 0)
        {
            entry.TotalUsage -= 1;
        }

        entry.LastActivityTime = BeijingTimeExtension.GetBeijingTime();

        _logger.LogInformation($"Token使用量已减少: {token}, 今日使用: {dailyBefore} → {entry.DailyUsage}, 总使用: {totalBefore} → {entry.TotalUsage}");
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
/// <summary>
/// Checks the daily limit and, if it has not been reached, increases the token's daily and
/// total usage counters by one. Check and increment happen under the same write lock.
/// </summary>
/// <param name="token">Token whose usage is checked and increased.</param>
/// <param name="dailyLimit">Daily limit to enforce; a value of zero or less disables the check.</param>
/// <returns>
/// <c>true</c> if the counters were increased; <c>false</c> if the daily limit is reached
/// or the token is not cached.
/// </returns>
public bool CheckAndIncrementUsage(string token, int dailyLimit)
{
    // The write lock covers both the check and the update.
    _cacheLock.EnterWriteLock();
    try
    {
        if (!_tokenCache.TryGetValue(token, out var entry))
        {
            _logger.LogWarning($"尝试更新未缓存的Token使用量: {token}");
            return false;
        }

        var limitReached = dailyLimit > 0 && entry.DailyUsage >= dailyLimit;
        if (limitReached)
        {
            _logger.LogWarning($"Token日限制已达上限拒绝请求: {token}, 当前: {entry.DailyUsage}, 限制: {dailyLimit}");
            return false;
        }

        // Remember the old values for the log line below.
        var dailyBefore = entry.DailyUsage;
        var totalBefore = entry.TotalUsage;

        entry.DailyUsage += 1;
        entry.TotalUsage += 1;
        entry.LastActivityTime = BeijingTimeExtension.GetBeijingTime();

        _logger.LogInformation($"Token使用量已原子性更新: {token}, 今日使用: {dailyBefore} → {entry.DailyUsage}, 总使用: {totalBefore} → {entry.TotalUsage}");
        return true;
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
/// <summary>
/// Requests a concurrency permit from the token's concurrency controller.
/// </summary>
/// <param name="token">Token a permit is requested for.</param>
/// <returns>
/// The result of the controller's <c>WaitAsync</c> call, or <c>false</c> if no controller
/// is registered for the token.
/// </returns>
public async Task<bool> WaitForConcurrencyPermitAsync(string token)
{
    if (!_concurrencyControllers.TryGetValue(token, out var lazyController))
    {
        _logger.LogWarning($"未找到Token的并发控制器: {token}");
        return false;
    }

    return await lazyController.Value.WaitAsync(token);
}
/// <summary>
/// Returns a concurrency permit to the token's concurrency controller.
/// </summary>
/// <param name="token">Token whose permit is released. A warning is logged if it has no controller.</param>
public void ReleaseConcurrencyPermit(string token)
{
    if (!_concurrencyControllers.TryGetValue(token, out var lazyController))
    {
        _logger.LogWarning($"未找到Token的并发控制器无法释放: {token}");
        return;
    }

    lazyController.Value.Release(token);
}
/// <summary>
/// Reports the concurrency state of a token.
/// </summary>
/// <param name="token">Token to inspect.</param>
/// <returns>
/// The controller's limit, the number of permits in use and the number of free permits;
/// <c>(0, 0, 0)</c> if no controller is registered for the token.
/// </returns>
public (int maxCount, int currentlyExecuting, int available) GetConcurrencyStatus(string token)
{
    _cacheLock.EnterReadLock();
    try
    {
        if (!_concurrencyControllers.TryGetValue(token, out var lazyController))
        {
            return (0, 0, 0);
        }

        return (
            lazyController.Value.MaxCount,
            lazyController.Value.CurrentlyExecuting,
            lazyController.Value.AvailableCount);
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
/// <summary>
/// Returns the cached tokens whose last activity is more recent than the given threshold.
/// </summary>
/// <param name="activityThreshold">How far back from the current Beijing time a token still counts as active.</param>
/// <returns>A new list with the active tokens; empty if there are none.</returns>
public List<TokenCacheItem> GetActiveTokens(TimeSpan activityThreshold)
{
    _cacheLock.EnterReadLock();
    try
    {
        var cutoffTime = BeijingTimeExtension.GetBeijingTime() - activityThreshold;

        var recentlyUsed = new List<TokenCacheItem>();
        foreach (var entry in _tokenCache.Values)
        {
            if (entry.LastActivityTime > cutoffTime)
            {
                recentlyUsed.Add(entry);
            }
        }

        _logger.LogDebug($"找到 {recentlyUsed.Count} 个活跃Token (阈值: {activityThreshold.TotalMinutes} 分钟)");
        return recentlyUsed;
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
/// <summary>
/// Removes every cached token whose last activity is at or before the cutoff, together with
/// its concurrency controller.
/// </summary>
/// <param name="activityThreshold">How far back from the current Beijing time a token still counts as active.</param>
/// <returns>
/// <c>activateTokenCount</c>: number of tokens left in the cache;
/// <c>notActivateTokenCount</c>: number of tokens removed by this call.
/// </returns>
public (int activateTokenCount, int notActivateTokenCount) RemoveNotActiveTokens(TimeSpan activityThreshold)
{
    _cacheLock.EnterWriteLock();
    try
    {
        var cutoffTime = BeijingTimeExtension.GetBeijingTime() - activityThreshold;
        var countBefore = _tokenCache.Count;

        // Collect the keys first so the cache is not modified while it is being scanned.
        var staleKeys = new List<string>();
        foreach (var pair in _tokenCache)
        {
            if (pair.Value.LastActivityTime <= cutoffTime)
            {
                staleKeys.Add(pair.Key);
            }
        }

        if (staleKeys.Count == 0)
        {
            _logger.LogDebug($"没有找到需要移除的不活跃Token (阈值: {activityThreshold.TotalMinutes} 分钟)");
            return (countBefore, 0);
        }

        var evictedCount = 0;
        foreach (var staleKey in staleKeys)
        {
            if (!_tokenCache.TryRemove(staleKey, out var evicted))
            {
                continue;
            }

            // Drop the matching controller; dispose it only if it was actually created.
            if (_concurrencyControllers.TryRemove(staleKey, out var lazyController) && lazyController.IsValueCreated)
            {
                lazyController.Value.Dispose();
            }

            evictedCount++;
            _logger.LogDebug($"移除不活跃Token: {staleKey}, 最后活跃时间: {evicted.LastActivityTime:yyyy-MM-dd HH:mm:ss}");
        }

        _logger.LogInformation($"清理不活跃Token完成: 移除 {evictedCount} 个Token (阈值: {activityThreshold.TotalMinutes} 分钟), 剩余: {_tokenCache.Count} 个");
        return (_tokenCache.Count, evictedCount);
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
/// <summary>
/// Removes the cache entries whose <c>Token</c> equals the given token, together with their
/// concurrency controllers.
/// </summary>
/// <param name="token">Token to remove.</param>
/// <returns>The number of cache entries removed; 0 if the token was not cached.</returns>
public int RemoveToken(string token)
{
    _cacheLock.EnterWriteLock();
    try
    {
        // Collect the keys first so the cache is not modified while it is being scanned.
        var tokensToRemove = _tokenCache
            .Where(kvp => kvp.Value.Token == token)
            .Select(kvp => kvp.Key)
            .ToList();

        var removedCount = 0;
        foreach (var tokenKey in tokensToRemove)
        {
            if (!_tokenCache.TryRemove(tokenKey, out _))
            {
                continue;
            }

            // Drop the matching controller; dispose it only if it was actually created.
            if (_concurrencyControllers.TryRemove(tokenKey, out var controller) && controller.IsValueCreated)
            {
                controller.Value.Dispose();
            }

            removedCount++;
        }

        // Return the removed count in every case. Previously the remaining cache size was
        // returned after a removal but 0 when nothing matched, so removing the last token
        // looked the same as "not found".
        return removedCount;
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
/// <summary>
/// Returns a copy of all cached token entries.
/// </summary>
/// <returns>A new list holding every entry currently in the token cache.</returns>
public IEnumerable<TokenCacheItem> GetAllTokens()
{
    _cacheLock.EnterReadLock();
    try
    {
        var copy = new List<TokenCacheItem>(_tokenCache.Values);
        return copy;
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
/// <summary>
/// Builds aggregate statistics over the token cache: entry count, active/inactive split and
/// summed usage counters. A token counts as active if its last activity is within the
/// last five minutes of Beijing time.
/// </summary>
/// <returns>A new <c>TokenCacheStats</c> instance describing the current cache content.</returns>
public TokenCacheStats GetCacheStats()
{
    _cacheLock.EnterReadLock();
    try
    {
        var activeCutoff = BeijingTimeExtension.GetBeijingTime().AddMinutes(-5);
        var snapshot = _tokenCache.Values.ToList();

        return new TokenCacheStats
        {
            TotalTokens = snapshot.Count,
            ActiveTokens = snapshot.Count(entry => entry.LastActivityTime > activeCutoff),
            InactiveTokens = snapshot.Count(entry => entry.LastActivityTime <= activeCutoff),
            TotalDailyUsage = snapshot.Sum(entry => entry.DailyUsage),
            TotalUsage = snapshot.Sum(entry => entry.TotalUsage)
        };
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
/// <summary>
/// Sets the daily usage counter of every cached token back to zero.
/// </summary>
public void ResetDailyUsage()
{
    _cacheLock.EnterWriteLock();
    try
    {
        var cachedEntries = _tokenCache.Values;
        foreach (var entry in cachedEntries)
        {
            entry.DailyUsage = 0;
        }
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
/// <summary>
/// Gets or sets the origin token. Assigning a null, empty or whitespace-only value is
/// ignored: the previous value is kept and a warning is logged.
/// </summary>
public string OriginToken
{
    get
    {
        return _originToken;
    }
    set
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            // Keep the current value; only report the rejected assignment.
            _logger.LogWarning("尝试设置OriginToken为空值可能请求原始的请求不可用");
            return;
        }

        _originToken = value;
    }
}
/// <summary>
/// Gets or sets the base URL of the MJ API. The stored value has surrounding whitespace and
/// trailing slashes removed. Assigning a null, empty or whitespace-only value falls back to
/// <c>https://laitool.net</c> and logs a warning.
/// </summary>
public string MJAPIBasicUrl
{
    // The value is normalized once in the setter, so reading it has no side effects.
    get => _mjAPIBasicUrl;
    set
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            // Fall back to the default address instead of keeping an unusable value.
            _mjAPIBasicUrl = "https://laitool.net";
            _logger.LogWarning("尝试设置MJAPIBasicUrl为空值, 已使用默认地址: https://laitool.net");
            return;
        }

        // Trim whitespace first so a trailing slash followed by spaces is removed as well.
        _mjAPIBasicUrl = value.Trim().TrimEnd('/');
    }
}
/// <summary>
/// Looks up a cached task by its third-party task id.
/// </summary>
/// <param name="thirdPartyId">Third-party task id used as the cache key.</param>
/// <returns>The cached task, or <c>null</c> (with a warning logged) if there is none.</returns>
public MJApiTasks? TryGetTaskCache(string thirdPartyId)
{
    _cacheLock.EnterReadLock();
    try
    {
        if (!_taskCache.TryGetValue(thirdPartyId, out var cachedTask))
        {
            _logger.LogWarning($"未找到任务缓存: {thirdPartyId}");
            return null;
        }

        return cachedTask;
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
/// <summary>
/// Returns a copy of all cached tasks.
/// </summary>
/// <returns>A new list holding every task currently in the task cache; empty if there are none.</returns>
public List<MJApiTasks> GetAllTaskCaches()
{
    _cacheLock.EnterReadLock();
    try
    {
        var copy = new List<MJApiTasks>(_taskCache.Values);
        return copy;
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
/// <summary>
/// Adds a task to the task cache or replaces the entry stored under the same third-party task id.
/// </summary>
/// <param name="task">Task to cache; it needs a non-blank <c>TaskId</c> and <c>ThirdPartyTaskId</c>.</param>
/// <returns><c>true</c> if the task was cached; <c>false</c> if the task or one of its ids is missing.</returns>
public bool AddOrUpdateTaskCache(MJApiTasks task)
{
    _cacheLock.EnterWriteLock();
    try
    {
        var hasRequiredIds = task != null
            && !string.IsNullOrWhiteSpace(task.TaskId)
            && !string.IsNullOrWhiteSpace(task.ThirdPartyTaskId);

        if (!hasRequiredIds)
        {
            _logger.LogWarning("尝试添加或更新任务缓存时任务、任务ID或者第三方任务ID为空");
            return false;
        }

        // The third-party id is the cache key.
        _taskCache[task.ThirdPartyTaskId] = task;
        _logger.LogDebug($"任务缓存已增加或更新: {task.TaskId}, 状态: {task.Status}, 第三方任务ID: {task.ThirdPartyTaskId}");
        return true;
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
/// <summary>
/// Removes a task from the task cache.
/// </summary>
/// <param name="thirdPartyId">Third-party task id used as the cache key.</param>
/// <returns><c>true</c> if a task was removed; <c>false</c> if none was cached under that id.</returns>
public bool RemoveTaskCache(string thirdPartyId)
{
    _cacheLock.EnterWriteLock();
    try
    {
        if (_taskCache.TryRemove(thirdPartyId, out var removedTask))
        {
            _logger.LogInformation($"任务缓存已移除: {removedTask.TaskId}, 状态: {removedTask.Status}, 第三方任务ID: {removedTask.ThirdPartyTaskId}");
            return true;
        }

        // removedTask is null when nothing was removed, so log the id that was asked for.
        _logger.LogWarning($"尝试移除但未找到的第三方任务的缓存: {thirdPartyId}");
        return false;
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
}
}