LMS.service/LMS.Tools/MJPackage/TokenUsageTracker.cs

683 lines
24 KiB
C#
Raw Permalink Normal View History

using LMS.Common.Extensions;
using LMS.Repository.DB;
using LMS.Repository.MJPackage;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
namespace LMS.Tools.MJPackage
{
public class TokenUsageTracker
{
private readonly ConcurrentDictionary<string, TokenCacheItem> _tokenCache = new();
private readonly ConcurrentDictionary<string, MJApiTasks> _taskCache = new();
private readonly ConcurrentDictionary<string, Lazy<ConcurrencyController>> _concurrencyControllers = new();
private readonly ReaderWriterLockSlim _cacheLock = new(LockRecursionPolicy.SupportsRecursion);
private string _originToken = string.Empty;
private string _mjAPIBasicUrl = string.Empty;
private readonly ILogger<TokenUsageTracker> _logger;
public TokenUsageTracker(ILogger<TokenUsageTracker> logger)
{
_logger = logger;
_logger.LogInformation("TokenUsageTracker服务已初始化");
}
/// <summary>
/// 并发控制器 - 支持平滑调整并发限制
/// </summary>
private class ConcurrencyController
{
private SemaphoreSlim _semaphore;
private int _maxCount;
private int _currentlyExecuting;
private readonly object _lock = new object();
private readonly ILogger _logger;
public ConcurrencyController(int initialLimit, ILogger logger)
{
_maxCount = initialLimit;
_semaphore = new SemaphoreSlim(initialLimit, initialLimit);
_currentlyExecuting = 0;
_logger = logger;
}
/// <summary>
/// 获取当前最大并发数
/// </summary>
public int MaxCount => _maxCount;
/// <summary>
/// 获取当前正在执行的任务数
/// </summary>
public int CurrentlyExecuting => _currentlyExecuting;
/// <summary>
/// 获取当前可用的并发槽位
/// </summary>
public int AvailableCount => _semaphore.CurrentCount;
/// <summary>
/// 等待获取执行许可
/// </summary>
public async Task<bool> WaitAsync(string token)
{
var acquired = await _semaphore.WaitAsync(0);
if (acquired)
{
lock (_lock)
{
_currentlyExecuting++;
}
_logger.LogInformation($"Token获取并发许可: {token}, 当前执行中: {_currentlyExecuting}/{_maxCount}");
}
return acquired;
}
/// <summary>
/// 释放执行许可
/// </summary>
public void Release(string token)
{
lock (_lock)
{
if (_currentlyExecuting > 0)
{
_currentlyExecuting--;
_semaphore.Release();
_logger.LogInformation($"Token释放并发许可: {token}, 当前执行中: {_currentlyExecuting}/{_maxCount}");
}
else
{
_logger.LogWarning($"Token释放并发许可: {token}, 尝试释放许可但当前执行数已为0: {token}");
}
}
}
/// <summary>
/// 平滑调整并发限制
/// </summary>
public bool AdjustLimitAsync(int newLimit, string token)
{
if (newLimit <= 0)
{
throw new ArgumentException("并发限制必须大于0", nameof(newLimit));
}
lock (_lock)
{
if (_maxCount == newLimit)
{
return false; // 无需调整
}
var oldLimit = _maxCount;
_maxCount = newLimit;
if (newLimit > oldLimit)
{
// 扩大并发限制:释放额外的许可
var additionalPermits = newLimit - oldLimit;
for (int i = 0; i < additionalPermits; i++)
{
_semaphore.Release();
}
_logger.LogInformation($"Token并发限制已扩大: {token}, {oldLimit} -> {newLimit}, 当前执行: {_currentlyExecuting}");
}
else
{
// 缩小并发限制:等待现有任务完成
var excessExecuting = _currentlyExecuting - newLimit;
if (excessExecuting > 0)
{
_logger.LogWarning($"Token并发限制缩小但有超额任务: {token}, {oldLimit} -> {newLimit}, 超额: {excessExecuting}, 将等待任务自然完成");
}
else
{
_logger.LogInformation($"Token并发限制已缩小: {token}, {oldLimit} -> {newLimit}, 当前执行: {_currentlyExecuting}");
}
}
return true;
}
}
/// <summary>
/// 销毁资源
/// </summary>
public void Dispose()
{
_semaphore?.Dispose();
}
}
/// <summary>
/// 尝试从缓存中获取Token
/// </summary>
public bool TryGetToken(string token, out TokenCacheItem cacheItem)
{
var found = _tokenCache.TryGetValue(token, out cacheItem);
if (found)
{
_logger.LogDebug($"从缓存中找到Token: {token}");
}
return found;
}
/// <summary>
/// 添加或更新Token到缓存支持平滑并发限制调整
/// </summary>
public void AddOrUpdateTokenAsync(TokenCacheItem tokenItem)
{
_cacheLock.EnterWriteLock();
try
{
_tokenCache[tokenItem.Token] = tokenItem;
// 获取或创建并发控制器
var lazyController = _concurrencyControllers.GetOrAdd(
tokenItem.Token,
_ => new Lazy<ConcurrencyController>(() =>
new ConcurrencyController(tokenItem.ConcurrencyLimit, _logger)));
var controller = lazyController.Value;
// 平滑调整并发限制
var adjusted = controller.AdjustLimitAsync(tokenItem.ConcurrencyLimit, tokenItem.Token);
if (adjusted)
{
_logger.LogInformation($"Token并发限制已调整: {tokenItem.Token}, 新限制: {tokenItem.ConcurrencyLimit}");
}
_logger.LogInformation($"Token已添加到缓存: {tokenItem.Token}, 日限制: {tokenItem.DailyLimit}, 总限制: {tokenItem.TotalLimit}, 并发限制: {tokenItem.ConcurrencyLimit}");
}
finally
{
_cacheLock.ExitWriteLock();
}
}
/// <summary>
/// 同步版本(保持向后兼容)
/// </summary>
public void AddOrUpdateToken(TokenCacheItem tokenItem)
{
// 使用异步版本,但同步等待
AddOrUpdateTokenAsync(tokenItem);
}
/// <summary>
/// 增加Token使用量
/// </summary>
public void IncrementUsage(string token)
{
_cacheLock.EnterWriteLock();
try
{
if (_tokenCache.TryGetValue(token, out var cacheItem))
{
int beforeDaily = cacheItem.DailyUsage;
int beforeTotal = cacheItem.TotalUsage;
cacheItem.DailyUsage++;
cacheItem.TotalUsage++;
cacheItem.LastActivityTime = BeijingTimeExtension.GetBeijingTime();
_logger.LogInformation($"Token使用量已更新: {token}, 今日使用: {beforeDaily} → {cacheItem.DailyUsage}, 总使用: {beforeTotal} → {cacheItem.TotalUsage}");
}
else
{
_logger.LogWarning($"尝试更新未缓存的Token使用量: {token}");
}
}
finally
{
_cacheLock.ExitWriteLock();
}
}
/// <summary>
/// 减少使用量 请求失败的时候 或者成功但是没有添加成功的时候
/// </summary>
/// <param name="token"></param>
public void DecrementUsage(string token)
{
_cacheLock.EnterWriteLock();
try
{
if (_tokenCache.TryGetValue(token, out var cacheItem))
{
int beforeDaily = cacheItem.DailyUsage;
int beforeTotal = cacheItem.TotalUsage;
// 确保不会减少到负数
if (cacheItem.DailyUsage > 0)
cacheItem.DailyUsage--;
if (cacheItem.TotalUsage > 0)
cacheItem.TotalUsage--;
cacheItem.LastActivityTime = BeijingTimeExtension.GetBeijingTime();
_logger.LogInformation($"Token使用量已减少: {token}, 今日使用: {beforeDaily} → {cacheItem.DailyUsage}, 总使用: {beforeTotal} → {cacheItem.TotalUsage}");
}
else
{
_logger.LogWarning($"尝试减少未缓存的Token使用量: {token}");
}
}
finally
{
_cacheLock.ExitWriteLock();
}
}
/// <summary>
/// 原子性检查并增加Token使用量仅当未超出限制时增加
/// </summary>
public bool CheckAndIncrementUsage(string token, int dailyLimit)
{
_cacheLock.EnterWriteLock(); // 直接使用写锁确保整个操作的原子性
try
{
if (_tokenCache.TryGetValue(token, out var cacheItem))
{
// 在同一个锁内检查和增加(原子操作)
if (dailyLimit > 0 && cacheItem.DailyUsage >= dailyLimit)
{
_logger.LogWarning($"Token日限制已达上限拒绝请求: {token}, 当前: {cacheItem.DailyUsage}, 限制: {dailyLimit}");
return false; // 已达上限,拒绝增加
}
int beforeDaily = cacheItem.DailyUsage;
int beforeTotal = cacheItem.TotalUsage;
// 未达上限,增加计数
cacheItem.DailyUsage++;
cacheItem.TotalUsage++;
cacheItem.LastActivityTime = BeijingTimeExtension.GetBeijingTime();
_logger.LogInformation($"Token使用量已原子性更新: {token}, 今日使用: {beforeDaily} → {cacheItem.DailyUsage}, 总使用: {beforeTotal} → {cacheItem.TotalUsage}");
return true; // 计数成功增加
}
else
{
_logger.LogWarning($"尝试更新未缓存的Token使用量: {token}");
return false; // Token不在缓存中
}
}
finally
{
_cacheLock.ExitWriteLock();
}
}
/// <summary>
/// 获取Token的并发控制器
/// </summary>
public async Task<bool> WaitForConcurrencyPermitAsync(string token)
{
if (_concurrencyControllers.TryGetValue(token, out var controller))
{
return await controller.Value.WaitAsync(token);
}
_logger.LogWarning($"未找到Token的并发控制器: {token}");
return false;
}
/// <summary>
/// 释放Token的并发许可
/// </summary>
public void ReleaseConcurrencyPermit(string token)
{
if (_concurrencyControllers.TryGetValue(token, out var controller))
{
controller.Value.Release(token);
}
else
{
_logger.LogWarning($"未找到Token的并发控制器无法释放: {token}");
}
}
/// <summary>
/// 获取Token的并发状态信息
/// </summary>
public (int maxCount, int currentlyExecuting, int available) GetConcurrencyStatus(string token)
{
_cacheLock.EnterReadLock();
try
{
if (_concurrencyControllers.TryGetValue(token, out var controller))
{
return (controller.Value.MaxCount, controller.Value.CurrentlyExecuting, controller.Value.AvailableCount);
}
return (0, 0, 0);
}
finally
{
_cacheLock.ExitReadLock();
}
}
/// <summary>
/// 获取活跃Token列表
/// </summary>
public List<TokenCacheItem> GetActiveTokens(TimeSpan activityThreshold)
{
_cacheLock.EnterReadLock();
try
{
var cutoffTime = BeijingTimeExtension.GetBeijingTime() - activityThreshold;
var activeTokens = _tokenCache.Values
.Where(t => t.LastActivityTime > cutoffTime)
.ToList();
_logger.LogDebug($"找到 {activeTokens.Count} 个活跃Token (阈值: {activityThreshold.TotalMinutes} 分钟)");
return activeTokens;
}
finally
{
_cacheLock.ExitReadLock();
}
}
/// <summary>
/// 移除不活跃的Token
/// </summary>
/// <param name="activityThreshold">活跃时间阈值</param>
/// <returns>移除的Token数量</returns>
public (int activateTokenCount, int notActivateTokenCount) RemoveNotActiveTokens(TimeSpan activityThreshold)
{
_cacheLock.EnterWriteLock();
try
{
var cutoffTime = BeijingTimeExtension.GetBeijingTime() - activityThreshold;
var initialCount = _tokenCache.Count;
// 找出需要移除的不活跃Token
var tokensToRemove = _tokenCache
.Where(kvp => kvp.Value.LastActivityTime <= cutoffTime)
.Select(kvp => kvp.Key)
.ToList();
if (tokensToRemove.Count == 0)
{
_logger.LogDebug($"没有找到需要移除的不活跃Token (阈值: {activityThreshold.TotalMinutes} 分钟)");
return (initialCount, 0);
}
// 移除不活跃的Token缓存
var removedCount = 0;
foreach (var tokenKey in tokensToRemove)
{
if (_tokenCache.TryRemove(tokenKey, out var removedToken))
{
// 同时清理对应的并发控制器
if (_concurrencyControllers.TryRemove(tokenKey, out var controller))
{
// 如果并发控制器已经被创建,需要释放资源
if (controller.IsValueCreated)
{
controller.Value.Dispose();
}
}
removedCount++;
_logger.LogDebug($"移除不活跃Token: {tokenKey}, 最后活跃时间: {removedToken.LastActivityTime:yyyy-MM-dd HH:mm:ss}");
}
}
_logger.LogInformation($"清理不活跃Token完成: 移除 {removedCount} 个Token (阈值: {activityThreshold.TotalMinutes} 分钟), 剩余: {_tokenCache.Count} 个");
return (_tokenCache.Count, removedCount);
}
finally
{
_cacheLock.ExitWriteLock();
}
}
/// <summary>
/// 移除指定的token
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public int RemoveToken(string token)
{
_cacheLock.EnterWriteLock();
try
{
// 找出需要移除的不活跃Token
var tokensToRemove = _tokenCache
.Where(kvp => kvp.Value.Token == token)
.Select(kvp => kvp.Key)
.ToList();
if (tokensToRemove.Count == 0)
{
// 没有找到
return 0;
}
// 移除不活跃的Token缓存
var removedCount = 0;
foreach (var tokenKey in tokensToRemove)
{
if (_tokenCache.TryRemove(tokenKey, out var removedToken))
{
// 同时清理对应的并发控制器
if (_concurrencyControllers.TryRemove(tokenKey, out var controller))
{
// 如果并发控制器已经被创建,需要释放资源
if (controller.IsValueCreated)
{
controller.Value.Dispose();
}
}
removedCount++;
}
}
return _tokenCache.Count;
}
finally
{
_cacheLock.ExitWriteLock();
}
}
/// <summary>
/// 获取所有Token列表
/// </summary>
public IEnumerable<TokenCacheItem> GetAllTokens()
{
_cacheLock.EnterReadLock();
try
{
return _tokenCache.Values.ToList();
}
finally
{
_cacheLock.ExitReadLock();
}
}
/// <summary>
/// 获取缓存统计信息(包含并发状态)
/// </summary>
public TokenCacheStats GetCacheStats()
{
_cacheLock.EnterReadLock();
try
{
var now = BeijingTimeExtension.GetBeijingTime();
var tokens = _tokenCache.Values.ToList();
var stats = new TokenCacheStats
{
TotalTokens = tokens.Count,
ActiveTokens = tokens.Count(t => t.LastActivityTime > now.AddMinutes(-5)),
InactiveTokens = tokens.Count(t => t.LastActivityTime <= now.AddMinutes(-5)),
TotalDailyUsage = tokens.Sum(t => t.DailyUsage),
TotalUsage = tokens.Sum(t => t.TotalUsage)
};
return stats;
}
finally
{
_cacheLock.ExitReadLock();
}
}
/// <summary>
/// 重置所有Token的日使用量
/// </summary>
public void ResetDailyUsage()
{
_cacheLock.EnterWriteLock();
try
{
// 重置缓存中的使用量
foreach (var token in _tokenCache.Values)
{
token.DailyUsage = 0;
}
}
finally
{
_cacheLock.ExitWriteLock();
}
}
public string OriginToken
{
get => _originToken;
set
{
if (!string.IsNullOrWhiteSpace(value))
{
_originToken = value;
}
else
{
// 如果尝试设置为空值,记录警告日志,可能请求原始的请求不可用
_logger.LogWarning("尝试设置OriginToken为空值可能请求原始的请求不可用");
}
}
}
public string MJAPIBasicUrl
{
get
{
if (_mjAPIBasicUrl.EndsWith('/'))
{
// 删除最后一个 /
return _mjAPIBasicUrl = _mjAPIBasicUrl.TrimEnd('/');
}
else
{
return _mjAPIBasicUrl.Trim();
}
}
set
{
if (!string.IsNullOrWhiteSpace(value))
{
_mjAPIBasicUrl = value;
}
else
{
// 设置初始值
_mjAPIBasicUrl = "https://laitool.net";
// 如果尝试设置为空值,记录警告日志,可能请求原始的请求不可用
_logger.LogWarning("尝试设置OriginToken为空值可能请求原始的请求不可用");
}
}
}
public MJApiTasks? TryGetTaskCache(string thirdPartyId)
{
_cacheLock.EnterReadLock();
try
{
if (_taskCache.TryGetValue(thirdPartyId, out var task))
{
return task;
}
else
{
_logger.LogWarning($"未找到任务缓存: {thirdPartyId}");
return null;
}
}
finally
{
_cacheLock.ExitReadLock();
}
}
public List<MJApiTasks> GetAllTaskCaches()
{
_cacheLock.EnterReadLock();
try
{
return _taskCache.Values.ToList() ?? [];
}
finally
{
_cacheLock.ExitReadLock();
}
}
public bool AddOrUpdateTaskCache(MJApiTasks task)
{
_cacheLock.EnterWriteLock();
try
{
if (task == null || string.IsNullOrWhiteSpace(task.TaskId) || string.IsNullOrWhiteSpace(task.ThirdPartyTaskId))
{
_logger.LogWarning("尝试添加或更新任务缓存时任务、任务ID或者第三方任务ID为空");
return false;
}
_taskCache[task.ThirdPartyTaskId] = task;
_logger.LogDebug($"任务缓存已增加或更新: {task.TaskId}, 状态: {task.Status}, 第三方任务ID: {task.ThirdPartyTaskId}");
return true;
}
finally
{
_cacheLock.ExitWriteLock();
}
}
public bool RemoveTaskCache(string thirdPartyId)
{
_cacheLock.EnterWriteLock();
try
{
if (_taskCache.TryRemove(thirdPartyId, out var removedTask))
{
_logger.LogInformation($"任务缓存已移除: {removedTask.TaskId}, 状态: {removedTask.Status}, 第三方任务ID: {removedTask.ThirdPartyTaskId}");
return true;
}
else
{
_logger.LogWarning($"尝试移除但未找到的第三方任务的缓存: {removedTask}");
return false;
}
}
finally
{
_cacheLock.ExitWriteLock();
}
}
}
}