ASP.NET Core 3.x 并發(fā)限制的實(shí)現(xiàn)代碼
前言
Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0后增加的,用于傳入的請(qǐng)求進(jìn)行排隊(duì)處理,避免線程池的不足.
我們?nèi)粘i_(kāi)發(fā)中可能常做的給某web服務(wù)器配置連接數(shù)以及,請(qǐng)求隊(duì)列大小,那么今天我們看看如何在通過(guò)中間件形式實(shí)現(xiàn)一個(gè)并發(fā)量以及隊(duì)列長(zhǎng)度限制.
Queue策略
添加Nuget
Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
public void ConfigureServices(IServiceCollection services) { services.AddQueuePolicy(options => { //最大并發(fā)請(qǐng)求數(shù) options.MaxConcurrentRequests = 2; //請(qǐng)求隊(duì)列長(zhǎng)度限制 options.RequestQueueLimit = 1; }); services.AddControllers(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { //添加并發(fā)限制中間件 app.UseConcurrencyLimiter(); app.Run(async context => { Task.Delay(100).Wait(); // 100ms sync-over-async await context.Response.WriteAsync("Hello World!"); }); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseHttpsRedirection(); app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); }
通過(guò)上面簡(jiǎn)單的配置,我們就可以將他引入到我們的代碼中,從而做并發(fā)量限制,以及隊(duì)列的長(zhǎng)度;那么問(wèn)題來(lái)了,他是怎么實(shí)現(xiàn)的呢?
public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure) { services.Configure(configure); services.AddSingleton<IQueuePolicy, QueuePolicy>(); return services; }
QueuePolicy采用的是SemaphoreSlim信號(hào)量設(shè)計(jì),SemaphoreSlim、Semaphore(信號(hào)量)支持并發(fā)多線程進(jìn)入被保護(hù)代碼,對(duì)象在初始化時(shí)會(huì)指定 最大任務(wù)數(shù)量,當(dāng)線程請(qǐng)求訪問(wèn)資源,信號(hào)量遞減,而當(dāng)他們釋放時(shí),信號(hào)量計(jì)數(shù)又遞增。
/// <summary> /// 構(gòu)造方法(初始化Queue策略) /// </summary> /// <param name="options"></param> public QueuePolicy(IOptions<QueuePolicyOptions> options) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if (_maxConcurrentRequests <= 0) { throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer."); } _requestQueueLimit = options.Value.RequestQueueLimit; if (_requestQueueLimit < 0) { throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number."); } //使用SemaphoreSlim來(lái)限制任務(wù)最大個(gè)數(shù) _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests); }
ConcurrencyLimiterMiddleware中間件
/// <summary> /// Invokes the logic of the middleware. /// </summary> /// <param name="context">The <see cref="HttpContext"/>.</param> /// <returns>A <see cref="Task"/> that completes when the request leaves.</returns> public async Task Invoke(HttpContext context) { var waitInQueueTask = _queuePolicy.TryEnterAsync(); // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets. bool result; if (waitInQueueTask.IsCompleted) { ConcurrencyLimiterEventSource.Log.QueueSkipped(); result = waitInQueueTask.Result; } else { using (ConcurrencyLimiterEventSource.Log.QueueTimer()) { result = await waitInQueueTask; } } if (result) { try { await _next(context); } finally { _queuePolicy.OnExit(); } } else { ConcurrencyLimiterEventSource.Log.RequestRejected(); ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger); context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await _onRejected(context); } }
每次當(dāng)我們請(qǐng)求的時(shí)候首先會(huì)調(diào)用_queuePolicy.TryEnterAsync(),進(jìn)入該方法后先開(kāi)啟一個(gè)私有l(wèi)ock鎖,再接著判斷總請(qǐng)求量是否≥(請(qǐng)求隊(duì)列限制的大小+最大并發(fā)請(qǐng)求數(shù)),如果當(dāng)前數(shù)量超出了,那么我直接拋出,送你個(gè)503狀態(tài);
if (result) { try { await _next(context); } finally { _queuePolicy.OnExit(); } } else { ConcurrencyLimiterEventSource.Log.RequestRejected(); ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger); context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await _onRejected(context); }
問(wèn)題來(lái)了,我這邊如果說(shuō)還沒(méi)到你設(shè)置的大小呢,我這個(gè)請(qǐng)求沒(méi)有給你服務(wù)器造不成壓力,那么你給我處理一下吧.
await _serverSemaphore.WaitAsync();
異步等待進(jìn)入信號(hào)量,如果沒(méi)有線程被授予對(duì)信號(hào)量的訪問(wèn)權(quán)限,則進(jìn)入執(zhí)行保護(hù)代碼;否則此線程將在此處等待,直到信號(hào)量被釋放為止
lock (_totalRequestsLock) { if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests) { return false; } TotalRequests++; } //異步等待進(jìn)入信號(hào)量,如果沒(méi)有線程被授予對(duì)信號(hào)量的訪問(wèn)權(quán)限,則進(jìn)入執(zhí)行保護(hù)代碼;否則此線程將在此處等待,直到信號(hào)量被釋放為止 await _serverSemaphore.WaitAsync(); return true; }
返回成功后那么中間件這邊再進(jìn)行處理,_queuePolicy.OnExit();通過(guò)該調(diào)用進(jìn)行調(diào)用_serverSemaphore.Release();釋放信號(hào)燈,再對(duì)總請(qǐng)求數(shù)遞減
Stack策略
再來(lái)看看另一種方法,棧策略,他是怎么做的呢?一起來(lái)看看.再附加上如何使用的代碼.
public void ConfigureServices(IServiceCollection services) { services.AddStackPolicy(options => { //最大并發(fā)請(qǐng)求數(shù) options.MaxConcurrentRequests = 2; //請(qǐng)求隊(duì)列長(zhǎng)度限制 options.RequestQueueLimit = 1; }); services.AddControllers(); }
通過(guò)上面的配置,我們便可以對(duì)我們的應(yīng)用程序執(zhí)行出相應(yīng)的策略.下面再來(lái)看看他是怎么實(shí)現(xiàn)的呢
public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure) { services.Configure(configure); services.AddSingleton<IQueuePolicy, StackPolicy>(); return services; }
可以看到這次是通過(guò)StackPolicy類(lèi)做的策略.來(lái)一起來(lái)看看主要的方法
/// <summary> /// 構(gòu)造方法(初始化參數(shù)) /// </summary> /// <param name="options"></param> public StackPolicy(IOptions<QueuePolicyOptions> options) { //棧分配 _buffer = new List<ResettableBooleanCompletionSource>(); //隊(duì)列大小 _maxQueueCapacity = options.Value.RequestQueueLimit; //最大并發(fā)請(qǐng)求數(shù) _maxConcurrentRequests = options.Value.MaxConcurrentRequests; //剩余可用空間 _freeServerSpots = options.Value.MaxConcurrentRequests; }
當(dāng)我們通過(guò)中間件請(qǐng)求調(diào)用,_queuePolicy.TryEnterAsync()時(shí),首先會(huì)判斷我們是否還有訪問(wèn)請(qǐng)求次數(shù),如果_freeServerSpots>0,那么則直接給我們返回true,讓中間件直接去執(zhí)行下一步,如果當(dāng)前隊(duì)列=我們?cè)O(shè)置的隊(duì)列大小的話,那我們需要取消先前請(qǐng)求;每次取消都是先取消之前的保留后面的請(qǐng)求;
public ValueTask<bool> TryEnterAsync() { lock (_bufferLock) { if (_freeServerSpots > 0) { _freeServerSpots--; return _trueTask; } // 如果隊(duì)列滿了,取消先前的請(qǐng)求 if (_queueLength == _maxQueueCapacity) { _hasReachedCapacity = true; _buffer[_head].Complete(false); _queueLength--; } var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this); _cachedResettableTCS = null; if (_hasReachedCapacity || _queueLength < _buffer.Count) { _buffer[_head] = tcs; } else { _buffer.Add(tcs); } _queueLength++; // increment _head for next time _head++; if (_head == _maxQueueCapacity) { _head = 0; } return tcs.GetValueTask(); } }
當(dāng)我們請(qǐng)求后調(diào)用_queuePolicy.OnExit();出棧,再將請(qǐng)求長(zhǎng)度遞減
public void OnExit() { lock (_bufferLock) { if (_queueLength == 0) { _freeServerSpots++; if (_freeServerSpots > _maxConcurrentRequests) { _freeServerSpots--; throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync"); } return; } // step backwards and launch a new task if (_head == 0) { _head = _maxQueueCapacity - 1; } else { _head--; } //退出,出棧 _buffer[_head].Complete(true); _queueLength--; } }
總結(jié)
基于棧結(jié)構(gòu)的特點(diǎn),在實(shí)際應(yīng)用中,通常只會(huì)對(duì)棧執(zhí)行以下兩種操作:
- 向棧中添加元素,此過(guò)程被稱(chēng)為"進(jìn)棧"(入?;驂簵#?;
- 從棧中提取出指定元素,此過(guò)程被稱(chēng)為"出棧"(或彈棧);
隊(duì)列存儲(chǔ)結(jié)構(gòu)的實(shí)現(xiàn)有以下兩種方式:
- 順序隊(duì)列:在順序表的基礎(chǔ)上實(shí)現(xiàn)的隊(duì)列結(jié)構(gòu);
- 鏈隊(duì)列:在鏈表的基礎(chǔ)上實(shí)現(xiàn)的隊(duì)列結(jié)構(gòu);
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持我們。
上一篇:.net core在服務(wù)器端獲取api傳遞的參數(shù)過(guò)程
欄 目:ASP.NET
下一篇:VS2019下opencv4.1.2配置圖文教程(永久配置)
本文標(biāo)題:ASP.NET Core 3.x 并發(fā)限制的實(shí)現(xiàn)代碼
本文地址:http://mengdiqiu.com.cn/a1/ASP_NET/10845.html
您可能感興趣的文章
- 01-11如何給asp.net core寫(xiě)個(gè)簡(jiǎn)單的健康檢查
- 01-11淺析.Net Core中Json配置的自動(dòng)更新
- 01-11.net core高吞吐遠(yuǎn)程方法如何調(diào)用組件XRPC詳解
- 01-11.NET Core 遷移躺坑記續(xù)集之Win下莫名其妙的超時(shí)
- 01-11docker部署Asp.net core應(yīng)用的完整步驟
- 01-11.net core webapi jwt 更為清爽的認(rèn)證詳解
- 01-11ASP.NET Core靜態(tài)文件的使用方法
- 01-11.NET Core 3.0之創(chuàng)建基于Consul的Configuration擴(kuò)展組件
- 01-11.net core EF Core調(diào)用存儲(chǔ)過(guò)程的方式
- 01-11asp.net Core3.0區(qū)域與路由配置的方法


閱讀排行
- 1C語(yǔ)言 while語(yǔ)句的用法詳解
- 2java 實(shí)現(xiàn)簡(jiǎn)單圣誕樹(shù)的示例代碼(圣誕
- 3利用C語(yǔ)言實(shí)現(xiàn)“百馬百擔(dān)”問(wèn)題方法
- 4C語(yǔ)言中計(jì)算正弦的相關(guān)函數(shù)總結(jié)
- 5c語(yǔ)言計(jì)算三角形面積代碼
- 6什么是 WSH(腳本宿主)的詳細(xì)解釋
- 7C++ 中隨機(jī)函數(shù)random函數(shù)的使用方法
- 8正則表達(dá)式匹配各種特殊字符
- 9C語(yǔ)言十進(jìn)制轉(zhuǎn)二進(jìn)制代碼實(shí)例
- 10C語(yǔ)言查找數(shù)組里數(shù)字重復(fù)次數(shù)的方法
本欄相關(guān)
- 01-11vscode extension插件開(kāi)發(fā)詳解
- 01-11VsCode插件開(kāi)發(fā)之插件初步通信的方法
- 01-11如何給asp.net core寫(xiě)個(gè)簡(jiǎn)單的健康檢查
- 01-11.net core高吞吐遠(yuǎn)程方法如何調(diào)用組件
- 01-11淺析.Net Core中Json配置的自動(dòng)更新
- 01-11.NET開(kāi)發(fā)人員關(guān)于ML.NET的入門(mén)學(xué)習(xí)
- 01-11.NET Core 遷移躺坑記續(xù)集之Win下莫名其
- 01-11.net core webapi jwt 更為清爽的認(rèn)證詳解
- 01-11docker部署Asp.net core應(yīng)用的完整步驟
- 01-11ASP.NET Core靜態(tài)文件的使用方法
隨機(jī)閱讀
- 01-10SublimeText編譯C開(kāi)發(fā)環(huán)境設(shè)置
- 01-10C#中split用法實(shí)例總結(jié)
- 01-10delphi制作wav文件的方法
- 01-11ajax實(shí)現(xiàn)頁(yè)面的局部加載
- 04-02jquery與jsp,用jquery
- 08-05織夢(mèng)dedecms什么時(shí)候用欄目交叉功能?
- 01-11Mac OSX 打開(kāi)原生自帶讀寫(xiě)NTFS功能(圖文
- 08-05dedecms(織夢(mèng))副欄目數(shù)量限制代碼修改
- 01-10使用C語(yǔ)言求解撲克牌的順子及n個(gè)骰子
- 08-05DEDE織夢(mèng)data目錄下的sessions文件夾有什