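Preface
Rate limiting is one of the important ways to cope with traffic spikes or malicious attacks from certain users. Microsoft, however, had never officially supported this feature, so the third-party library AspNetCoreRateLimit has generally been the first choice. It has a large number of configuration parameters, though, which gives users a steep learning curve. Happily, the just-released .NET 7 Preview 4 begins to ship a rate limiting middleware.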
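A first taste of UseRateLimiter
- Install the .NET 7.0 SDK (v7.0.100-preview.4)
- Install the Microsoft.AspNetCore.RateLimiting NuGet package
- Create a .NET 7 web application and register the middleware
Global limit with a concurrency of 1: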
    app.UseRateLimiter(new RateLimiterOptions
    {
        Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
        {
            return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",
                _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
        })
    });
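Different concurrency limits per resource: resources under the /api prefix get 2 permits (leases) and a wait queue of length 2; everything else gets the default of 1 permit and a queue length of 1.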
    app.UseRateLimiter(new RateLimiterOptions()
    {
        // Status code returned when the limit is hit
        DefaultRejectionStatusCode = 500,
        OnRejected = async (ctx, rateLimitLease) =>
        {
            // Callback invoked when a request is rejected
        },
        Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
        {
            if (resource.Request.Path.StartsWithSegments("/api"))
            {
                return RateLimitPartition.CreateConcurrencyLimiter("WebApiLimiter",
                    _ => new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2));
            }
            else
            {
                return RateLimitPartition.CreateConcurrencyLimiter("DefaultLimiter",
                    _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
            }
        })
    });
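The snippets in this post target the Preview 4 API, and several names changed before .NET 7 shipped. For readers on the released version, the same per-path partitioning might look roughly like the sketch below. It assumes a project using the Web SDK with implicit usings; the `/api/ping` endpoint and the 429 status code are illustrative choices, not something the original configuration uses.

```csharp
using System.Threading.RateLimiting;
using Microsoft.AspNetCore.RateLimiting;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddRateLimiter(options =>
{
    // Status code for rejected requests (assumption: 429; the post itself uses 500)
    options.RejectionStatusCode = StatusCodes.Status429TooManyRequests;

    options.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(context =>
    {
        // One partition for /api, one for everything else
        bool isApi = context.Request.Path.StartsWithSegments("/api");

        return RateLimitPartition.GetConcurrencyLimiter(
            isApi ? "WebApiLimiter" : "DefaultLimiter",
            _ => new ConcurrencyLimiterOptions
            {
                PermitLimit = isApi ? 2 : 1,
                QueueProcessingOrder = QueueProcessingOrder.NewestFirst,
                QueueLimit = isApi ? 2 : 1
            });
    });
});

var app = builder.Build();

app.UseRateLimiter();

// Hypothetical endpoint, only here so the sketch runs on its own
app.MapGet("/api/ping", () => "pong");

app.Run();
```

The partition key is just a value computed from the HttpContext, so each distinct key gets its own limiter instance.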
Local test
- Create a new webapi project and register the rate limiting middleware as follows
    using Microsoft.AspNetCore.RateLimiting;
    using System.Threading.RateLimiting;

    var builder = WebApplication.CreateBuilder(args);

    // Add services to the container.
    builder.Services.AddControllers();
    // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
    builder.Services.AddEndpointsApiExplorer();
    builder.Services.AddSwaggerGen();

    var app = builder.Build();

    // Configure the HTTP request pipeline.
    if (app.Environment.IsDevelopment())
    {
        app.UseSwagger();
        app.UseSwaggerUI();
    }

    app.UseRateLimiter(new RateLimiterOptions
    {
        DefaultRejectionStatusCode = 500,
        OnRejected = async (ctx, lease) =>
        {
            // Await the write directly; wrapping it in Task.FromResult would not wait for it to finish
            await ctx.Response.WriteAsync("ConcurrencyLimiter");
        },
        Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
        {
            return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",
                _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
        })
    });

    app.UseHttpsRedirection();
    app.UseAuthorization();
    app.MapControllers();
    app.Run();
- Start the project and use JMeter to send 100 concurrent requests to the /WeatherForecast endpoint
All requests were handled successfully, with 0 failures!
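Is this result a little disappointing? In fact, the limiter created by RateLimitPartition.CreateConcurrencyLimiter is a ConcurrencyLimiter, and limiters that implement other strategies can be swapped in for it later.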
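Reading the implementation of ConcurrencyLimiter, the idea resembles a token bucket in that every request has to take a token (a permit) before it runs. Strictly speaking it behaves more like a semaphore: permits are handed back when a request finishes rather than refilled on a timer. In the new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1) configured above, the first 1 is the number of permits. The second 1 is the queue length: when no permit is left, a request can enter the wait queue instead of failing immediately, and once an earlier request finishes and returns its permit, the waiting request can take it. QueueProcessingOrder.NewestFirst means the newest request gets the permit first, so acquisition is unfair; the other enum value, QueueProcessingOrder.OldestFirst, serves the oldest request first, so acquisition is fair. As long as whoever holds the permit finishes its work quickly, throughput stays high even with a single permit, which explains why the test above saw no failures.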
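The permit and queue behaviour described above can be observed without a web server by driving a ConcurrencyLimiter directly. The sketch below is a console program that assumes the released System.Threading.RateLimiting package (7.0.0 or later), whose option and method names differ from the Preview 4 constructor used in this post.

```csharp
using System.Threading.RateLimiting;

// One permit, one queue slot: the same numbers as the configuration above
using var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions
{
    PermitLimit = 1,
    QueueProcessingOrder = QueueProcessingOrder.NewestFirst,
    QueueLimit = 1
});

// The first request takes the only permit
RateLimitLease first = limiter.AttemptAcquire(1);
Console.WriteLine($"first acquired:  {first.IsAcquired}");   // True

// A synchronous attempt never queues, so it fails while the permit is held
RateLimitLease second = limiter.AttemptAcquire(1);
Console.WriteLine($"second acquired: {second.IsAcquired}");  // False

// An asynchronous request waits in the queue instead of failing
ValueTask<RateLimitLease> waiting = limiter.AcquireAsync(1);
Console.WriteLine($"waiting done:    {waiting.IsCompleted}"); // False

// Disposing the lease returns the permit, which is handed to the queued request
first.Dispose();
RateLimitLease third = await waiting;
Console.WriteLine($"third acquired:  {third.IsAcquired}");   // True

third.Dispose();
```

In the middleware, the lease is held for the duration of the downstream request pipeline and disposed when the request completes, which is the step that returns the permit.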
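- Test a scenario that triggers rejections
It is enough to make whoever holds the permit hold it a little longer; rejections are then easy to trigger.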
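One way to make the permit holder slow is to delay inside the controller action, since the middleware keeps the permit until the request finishes. The sketch below is an assumption about how that could look: the namespace, the 1-second delay, and the response payload are all hypothetical, and the project is assumed to use implicit usings.

```csharp
using Microsoft.AspNetCore.Mvc;

namespace RateLimitDemo.Controllers; // hypothetical namespace

[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{
    [HttpGet]
    public async Task<IActionResult> Get()
    {
        // Hypothetical delay: the request keeps its permit for about one second,
        // so concurrent requests find no free permit and a full queue
        await Task.Delay(TimeSpan.FromSeconds(1));

        // Placeholder payload for illustration only
        return Ok(new { Summary = "Mild", TemperatureC = 20 });
    }
}
```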
Set the JMeter concurrency to 10.
The response body of the rejected requests is also the content we configured.
ConcurrencyLimiter source code
Permit-based limiting, similar in spirit to a token bucket
Acquiring a permit
    protected override RateLimitLease AcquireCore(int permitCount)
    {
        // These amounts of resources can never be acquired
        if (permitCount > _options.PermitLimit)
        {
            throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));
        }

        ThrowIfDisposed();

        // Return SuccessfulLease or FailedLease to indicate limiter state
        if (permitCount == 0)
        {
            return _permitCount > 0 ? SuccessfulLease : FailedLease;
        }

        // Perf: Check SemaphoreSlim implementation instead of locking
        if (_permitCount >= permitCount)
        {
            lock (Lock)
            {
                if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
                {
                    return lease;
                }
            }
        }

        return FailedLease;
    }
Core logic of trying to acquire a permit
    private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease)
    {
        ThrowIfDisposed();

        // if permitCount is 0 we want to queue it if there are no available permits
        if (_permitCount >= permitCount && _permitCount != 0)
        {
            if (permitCount == 0)
            {
                // Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
                lease = SuccessfulLease;
                return true;
            }

            // a. if there are no items queued we can lease
            // b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
            if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
            {
                _idleSince = null;
                _permitCount -= permitCount;
                Debug.Assert(_permitCount >= 0);
                lease = new ConcurrencyLease(true, this, permitCount);
                return true;
            }
        }

        lease = null;
        return false;
    }
Entering the wait queue after failing to acquire a permit
    protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default)
    {
        // These amounts of resources can never be acquired
        if (permitCount > _options.PermitLimit)
        {
            throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));
        }

        // Return SuccessfulLease if requestedCount is 0 and resources are available
        if (permitCount == 0 && _permitCount > 0 && !_disposed)
        {
            return new ValueTask<RateLimitLease>(SuccessfulLease);
        }

        // Perf: Check SemaphoreSlim implementation instead of locking
        lock (Lock)
        {
            if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
            {
                return new ValueTask<RateLimitLease>(lease);
            }

            // Avoid integer overflow by using subtraction instead of addition
            Debug.Assert(_options.QueueLimit >= _queueCount);
            if (_options.QueueLimit - _queueCount < permitCount)
            {
                if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)
                {
                    // remove oldest items from queue until there is space for the newest request
                    do
                    {
                        RequestRegistration oldestRequest = _queue.DequeueHead();
                        _queueCount -= oldestRequest.Count;
                        Debug.Assert(_queueCount >= 0);
                        if (!oldestRequest.Tcs.TrySetResult(FailedLease))
                        {
                            // Updating queue count is handled by the cancellation code
                            _queueCount += oldestRequest.Count;
                        }
                    }
                    while (_options.QueueLimit - _queueCount < permitCount);
                }
                else
                {
                    // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst
                    return new ValueTask<RateLimitLease>(QueueLimitLease);
                }
            }

            CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);
            CancellationTokenRegistration ctr = default;
            if (cancellationToken.CanBeCanceled)
            {
                ctr = cancellationToken.Register(static obj =>
                {
                    ((CancelQueueState)obj!).TrySetCanceled();
                }, tcs);
            }

            RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);
            _queue.EnqueueTail(request);
            _queueCount += permitCount;
            Debug.Assert(_queueCount <= _options.QueueLimit);

            return new ValueTask<RateLimitLease>(request.Tcs.Task);
        }
    }
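The queue-full branch above treats the two processing orders differently: with NewestFirst the oldest waiter is evicted to make room, while with OldestFirst the incoming request is turned away. The console sketch below tries to show that difference. It assumes the released System.Threading.RateLimiting package (7.0.0 or later), where WaitAsyncCore is reached through the public AcquireAsync method; the helper name CreateLimiter is made up for this example.

```csharp
using System.Threading.RateLimiting;

// Hypothetical helper: one permit and one queue slot, with the given processing order
static ConcurrencyLimiter CreateLimiter(QueueProcessingOrder order) =>
    new(new ConcurrencyLimiterOptions
    {
        PermitLimit = 1,
        QueueProcessingOrder = order,
        QueueLimit = 1
    });

// NewestFirst: a new request evicts the oldest queued request when the queue is full
using var newestFirst = CreateLimiter(QueueProcessingOrder.NewestFirst);
RateLimitLease held = newestFirst.AttemptAcquire(1);            // takes the only permit
ValueTask<RateLimitLease> older = newestFirst.AcquireAsync(1);  // fills the queue
ValueTask<RateLimitLease> newer = newestFirst.AcquireAsync(1);  // evicts "older"

RateLimitLease olderLease = await older;
Console.WriteLine($"NewestFirst older: {olderLease.IsAcquired}"); // False

held.Dispose();                                                  // permit goes to "newer"
RateLimitLease newerLease = await newer;
Console.WriteLine($"NewestFirst newer: {newerLease.IsAcquired}"); // True
newerLease.Dispose();

// OldestFirst: the queued request keeps its place and the newcomer is rejected
using var oldestFirst = CreateLimiter(QueueProcessingOrder.OldestFirst);
RateLimitLease held2 = oldestFirst.AttemptAcquire(1);
ValueTask<RateLimitLease> queued = oldestFirst.AcquireAsync(1);   // fills the queue
RateLimitLease rejected = await oldestFirst.AcquireAsync(1);      // queue full, rejected
Console.WriteLine($"OldestFirst newcomer: {rejected.IsAcquired}"); // False

held2.Dispose();
RateLimitLease queuedLease = await queued;
Console.WriteLine($"OldestFirst queued: {queuedLease.IsAcquired}"); // True
queuedLease.Dispose();
```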
Returning a permit
    private void Release(int releaseCount)
    {
        lock (Lock)
        {
            if (_disposed)
            {
                return;
            }

            _permitCount += releaseCount;
            Debug.Assert(_permitCount <= _options.PermitLimit);

            while (_queue.Count > 0)
            {
                RequestRegistration nextPendingRequest =
                    _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
                        ? _queue.PeekHead()
                        : _queue.PeekTail();

                if (_permitCount >= nextPendingRequest.Count)
                {
                    nextPendingRequest =
                        _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
                            ? _queue.DequeueHead()
                            : _queue.DequeueTail();

                    _permitCount -= nextPendingRequest.Count;
                    _queueCount -= nextPendingRequest.Count;
                    Debug.Assert(_permitCount >= 0);

                    ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);
                    // Check if request was canceled
                    if (!nextPendingRequest.Tcs.TrySetResult(lease))
                    {
                        // Queued item was canceled so add count back
                        _permitCount += nextPendingRequest.Count;
                        // Updating queue count is handled by the cancellation code
                        _queueCount += nextPendingRequest.Count;
                    }
                    nextPendingRequest.CancellationTokenRegistration.Dispose();
                    Debug.Assert(_queueCount >= 0);
                }
                else
                {
                    break;
                }
            }

            if (_permitCount == _options.PermitLimit)
            {
                Debug.Assert(_idleSince is null);
                Debug.Assert(_queueCount == 0);
                _idleSince = Stopwatch.GetTimestamp();
            }
        }
    }
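Summary
Although Microsoft now officially supports rate limiting, there does not yet appear to be ready-made support for IP- or client-level limits. More advanced strategies still seem to require a third-party library or a custom implementation. Hopefully this will keep improving in later releases.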