1 前言
說(shuō)到線(xiàn)程池八股文背的很熟的肯定知道無(wú)非就這幾個(gè)考點(diǎn):
(1)線(xiàn)程池三大核心參數(shù) corePoolSize、maximumPoolSize、workQueue 的含義
(2)線(xiàn)程池核心線(xiàn)程數(shù)制定策略
(3)建議通過(guò) ThreadPoolExecutor 的構(gòu)造函數(shù)來(lái)聲明,避免使用 Executors 創(chuàng)建線(xiàn)程池
以上考點(diǎn)作為線(xiàn)程池面試幾乎必問(wèn)的內(nèi)容,大部分人應(yīng)該都是如數(shù)家珍,張口就來(lái),但是懂了面試八股文真的就不一定在實(shí)際運(yùn)用中真的就會(huì)把線(xiàn)程池用好 。且看下面這次真實(shí)生產(chǎn)事故還原
2 事故還原
某次一位研發(fā)同事寫(xiě)出了下面類(lèi)似的代碼:
Listitems=getFromDb(); List >completableFutures=items.stream().map(item->CompletableFuture.supplyAsync(()->{ AppMapStationDatadata=mapper.copy(item); //發(fā)起價(jià)格信息查詢(xún)的RPC調(diào)用 data.setPriceInfo(priceApi.getPriceInfoById(item.getId())) returndata; },apiExecutor)).collect(Collectors.toList()); result=completableFutures.stream().map(e->{ returne.get(); }).filter(Objects::nonNull).collect(Collectors.toList());
上面的代碼中,代碼首先從數(shù)據(jù)庫(kù)里面查出來(lái)一堆對(duì)象,然后對(duì)每一個(gè)對(duì)象進(jìn)行模型轉(zhuǎn)換,由于要獲取每個(gè)對(duì)象的價(jià)格信息發(fā)起了一次RPC調(diào)用,由于RPC服務(wù)沒(méi)有提供批量接口,所以代碼里面用了線(xiàn)程池并發(fā)請(qǐng)求,以求得接口盡可能快的返回?cái)?shù)據(jù)。
使用的是CompletableFuture 而且自定義了線(xiàn)程池,線(xiàn)程池指定了10個(gè)核心線(xiàn)程,20個(gè)最大線(xiàn)程,這段代碼在上線(xiàn)后的一段時(shí)間確實(shí)沒(méi)有任何問(wèn)題,但是在灰度放量用戶(hù)量多起來(lái)之后發(fā)現(xiàn)接口經(jīng)常超時(shí)告警。
請(qǐng)問(wèn)為什么上面的代碼在用戶(hù)量稍微大一點(diǎn)的時(shí)候就運(yùn)行緩慢了呢?
實(shí)際代碼問(wèn)題出現(xiàn)在了這個(gè)get方法中,這個(gè)get方法沒(méi)有指定超時(shí)時(shí)間,當(dāng)getPriceInfoById這個(gè)接口響應(yīng)變慢的時(shí)候,這個(gè)主線(xiàn)程的代碼get又沒(méi)有指定超時(shí)時(shí)間,這時(shí)候問(wèn)題就來(lái)了。
由于某次業(yè)務(wù)查詢(xún)查到了非常多的數(shù)據(jù),每條數(shù)據(jù)就是個(gè)模型轉(zhuǎn)換任務(wù),這個(gè)任務(wù)就會(huì)在隊(duì)列排隊(duì),get方法沒(méi)有指定超時(shí)時(shí)間的情況下,其最終耗時(shí)就取決于整個(gè)線(xiàn)程池中執(zhí)行最慢的那一個(gè)任務(wù),所以當(dāng)從DB中查出來(lái)的數(shù)據(jù)量越來(lái)越大的時(shí)候這個(gè)轉(zhuǎn)換任務(wù)的最大耗時(shí)就會(huì)逐漸增加,進(jìn)而引發(fā)接口超時(shí)。
所以這里改進(jìn)上述問(wèn)題需要做到兩個(gè)點(diǎn):
1、數(shù)據(jù)庫(kù)中查出來(lái)的數(shù)據(jù)集合必須分頁(yè)
2、get方法必須設(shè)置超時(shí)時(shí)間
此外需要知道get方法設(shè)置超時(shí)時(shí)間的計(jì)算方式也需要留意,考慮下面這種場(chǎng)景
提交兩個(gè)任務(wù) A 和 B 到線(xiàn)程池,A 任務(wù)耗時(shí) 3 秒,B 任務(wù)耗時(shí) 4 秒,F(xiàn)uture 以 2 秒為超時(shí)時(shí)間獲取任務(wù)結(jié)果
代碼如下:
ExecutorServiceexecutorService=Executors.newFixedThreadPool(2); CallabletaskA=()->{ sleep(3); return"A"; }; Callable taskB=()->{ sleep(4); return"B"; }; List >futures=Stream.of(taskA,taskB) .map(executorService::submit) .collect(Collectors.toList()); for(Future future:futures){ try{ Strings=future.get(2,TimeUnit.SECONDS); System.out.println(s); }catch(Exceptione){ continue; } }
實(shí)際運(yùn)行情況是第一個(gè)任務(wù)會(huì)超時(shí)但是第二個(gè)不會(huì) ,看起來(lái)是不是還有點(diǎn)不可思議,耗時(shí)時(shí)間長(zhǎng)的任務(wù)B反而沒(méi)超時(shí)。原因就在于 Future.get(long timeout, TimeUnit unit) ,調(diào)用 get 時(shí)才開(kāi)始計(jì)時(shí),而非任務(wù)加入線(xiàn)程池的時(shí)間
從圖上就可以看出來(lái),在獲取B的任務(wù)執(zhí)行結(jié)果的時(shí)候B任務(wù)已經(jīng)執(zhí)行了兩秒,所以在等待兩秒的情況下可以獲取到結(jié)果
3 線(xiàn)程池不當(dāng)使用舉例
(1)不區(qū)分業(yè)務(wù)一把梭哈,全用一個(gè)線(xiàn)程池
曾經(jīng)有一個(gè)項(xiàng)目,對(duì)接多個(gè)租戶(hù),每個(gè)租戶(hù)都有各自的任務(wù)需要執(zhí)行,代碼中不區(qū)分租戶(hù)的將所有租戶(hù)的任務(wù)全部丟到一個(gè)線(xiàn)程池中執(zhí)行,結(jié)果一個(gè)租戶(hù)的任務(wù)提交過(guò)多導(dǎo)致線(xiàn)程池執(zhí)行緩慢,但是由于線(xiàn)程池是同一個(gè),影響了所有租戶(hù)接口的響應(yīng)時(shí)間。如果說(shuō)上面說(shuō)的這個(gè)場(chǎng)景用一個(gè)線(xiàn)程池產(chǎn)生了租戶(hù)互相影響的問(wèn)題還不夠嚴(yán)重,那么下面的這種場(chǎng)景就問(wèn)題更大了。
曾經(jīng)有一段這樣的場(chǎng)景,因?yàn)楣灿镁€(xiàn)程池直接導(dǎo)致線(xiàn)程池任務(wù)永遠(yuǎn)完成不了,請(qǐng)看下面的這種情況:
首先向線(xiàn)程池中提交了一個(gè)任務(wù),然后在這個(gè)任務(wù)的內(nèi)部實(shí)現(xiàn)中又往同一個(gè)線(xiàn)程池中再次提交了一個(gè)任務(wù),相當(dāng)于父子任務(wù)在同一個(gè)線(xiàn)程池中執(zhí)行,這時(shí)候極有可出現(xiàn)線(xiàn)程死鎖也就是循環(huán)等待的情況
如上圖所示,父任務(wù)全部處于執(zhí)行狀態(tài),這時(shí)候子任務(wù)想要執(zhí)行需要等父任務(wù)執(zhí)行完成,但是父任務(wù)都執(zhí)行不完,因?yàn)檫€有個(gè)子任務(wù)沒(méi)完成,即父任務(wù)等待子任務(wù)執(zhí)行完成,而子任務(wù)等待父任務(wù)釋放線(xiàn)程池資源,這也就造成了 "死鎖"
所以綜上所述,在代碼中應(yīng)該避免各種任務(wù)都往一個(gè)線(xiàn)程池中投放,對(duì)每個(gè)線(xiàn)程池指定好線(xiàn)程名稱(chēng),做好分類(lèi)比較合適,這里在日常開(kāi)發(fā)中比較推薦使用Guava的工具類(lèi),來(lái)指定線(xiàn)程名稱(chēng)前綴,這樣使用jstack分析線(xiàn)程問(wèn)題也方便排查。
ThreadFactorythreadFactory=newThreadFactoryBuilder() .setNameFormat(threadNamePrefix+"-%d") .setDaemon(true).build(); ExecutorServicethreadPool=newThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
(2)@Async注解不自己定義線(xiàn)程池
@Async用在方法上標(biāo)識(shí)這是一個(gè)異步方法,如果不自己指定線(xiàn)程池這個(gè)方法將直接新建一個(gè)線(xiàn)程執(zhí)行,可以翻看spring實(shí)現(xiàn)源碼知道這個(gè)點(diǎn)
@Async的實(shí)現(xiàn)其實(shí)非常簡(jiǎn)單就是利用AOP,容器啟動(dòng)的時(shí)候會(huì)掃描所有被打上@Async注解的方法,并代理這些方法的執(zhí)行,在執(zhí)行這個(gè)方法的時(shí)候,生成Callable任務(wù)丟到線(xiàn)程池中執(zhí)行(核心代碼位于org.springframework.aop.interceptor.AsyncExecutionInterceptor)
@Override @Nullable publicObjectinvoke(finalMethodInvocationinvocation)throwsThrowable{ Class>targetClass=(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null); MethodspecificMethod=ClassUtils.getMostSpecificMethod(invocation.getMethod(),targetClass); finalMethoduserDeclaredMethod=BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutorexecutor=determineAsyncExecutor(userDeclaredMethod); if(executor==null){ thrownewIllegalStateException( "NoexecutorspecifiedandnodefaultexecutorsetonAsyncExecutionInterceptoreither"); } //將方法調(diào)用封裝成Callable實(shí)例丟入線(xiàn)程池中執(zhí)行 Callable
如果不指定線(xiàn)程池這里就會(huì)啟用默認(rèn)的線(xiàn)程池 SimpleAsyncTaskExecutor 然后我們看下這個(gè)類(lèi)的注釋
那這個(gè)問(wèn)題就很?chē)?yán)重了,假定你的方法執(zhí)行速度慢,而且qps大,這時(shí)候線(xiàn)程數(shù)就會(huì)直接爆炸,所以建議寫(xiě)一個(gè)類(lèi)繼承 AsyncConfigurer接口并復(fù)寫(xiě)getAsyncExecutor方法,然后在使用注解的時(shí)候指定線(xiàn)程池的名稱(chēng)
//使用注解時(shí)指定線(xiàn)程池的Bean名稱(chēng) @Async("apiExecutor")
(3)線(xiàn)程池遇上ThreadLocal
線(xiàn)程池和 ThreadLocal 共用,可能會(huì)導(dǎo)致線(xiàn)程從 ThreadLocal 獲取到的是舊值/臟數(shù)據(jù)。這是因?yàn)榫€(xiàn)程池會(huì)復(fù)用線(xiàn)程對(duì)象,與線(xiàn)程對(duì)象綁定的類(lèi)的靜態(tài)屬性 ThreadLocal 變量也會(huì)被重用,這就導(dǎo)致一個(gè)線(xiàn)程可能獲取到其他線(xiàn)程的 ThreadLocal 值。
比較常規(guī)的做法是在任務(wù)執(zhí)行完畢之后的finally代碼塊里面做清理工作
Runnablerunnable=()->{ try{ BizThreadLocal.set("xxxx"); //dosth }finally{ BizThreadLocal.remove(); } };
但是其實(shí)finally的代碼塊其實(shí)也不是百分百一定執(zhí)行,事實(shí)上Thread#stop() 方法打斷線(xiàn)程執(zhí)行的時(shí)候 finally代碼塊中的內(nèi)容就不會(huì)執(zhí)行,比較推薦的還是# TransmittableThreadLocal
4 再談線(xiàn)程池,幾個(gè)關(guān)鍵要點(diǎn)
(1)為什么默認(rèn)線(xiàn)程池的隊(duì)列長(zhǎng)度不能動(dòng)態(tài)調(diào)整?
曾經(jīng)面對(duì)生產(chǎn)環(huán)境線(xiàn)程池的參數(shù)設(shè)定問(wèn)題,我曾經(jīng)想到一個(gè)方案,既然線(xiàn)程池的參數(shù)不好定,那咱們直接動(dòng)態(tài)修改就行不行呢,線(xiàn)程池本身提供了很多的set方法可以做到參數(shù)修改,比如我們?cè)趕pringBoot項(xiàng)目往往去使用ThreadPoolTaskExecutor 作為線(xiàn)程池,從下圖的set方法列表中可以看出存在很多修改線(xiàn)程池參數(shù)的方法
然后實(shí)際使用的時(shí)候發(fā)現(xiàn)核心線(xiàn)程數(shù)和最大線(xiàn)程數(shù)都能動(dòng)態(tài)修改 但是隊(duì)列長(zhǎng)度卻不能 ,為什么隊(duì)列長(zhǎng)度不能調(diào)用setQueueCapacity方法進(jìn)行動(dòng)態(tài)修改呢?
首先我們可以簡(jiǎn)單理解為spring的ThreadPoolTaskExecutor是Java原生ThreadPoolExecutor的封裝,觀(guān)察這個(gè)類(lèi)的setMaxPoolSize和setQueueCapacity代碼實(shí)現(xiàn)我們就能發(fā)現(xiàn)setQueueCapacity 實(shí)際就是一個(gè)賦值僅在第一次實(shí)例化線(xiàn)程池的使用到了這個(gè)參數(shù)。
publicvoidsetMaxPoolSize(intmaxPoolSize){ synchronized(this.poolSizeMonitor){ if(this.threadPoolExecutor!=null){ this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize); } this.maxPoolSize=maxPoolSize; } } publicvoidsetQueueCapacity(intqueueCapacity){ this.queueCapacity=queueCapacity; } protectedBlockingQueuecreateQueue(intqueueCapacity){ return(BlockingQueue)(queueCapacity>0?newLinkedBlockingQueue(queueCapacity):newSynchronousQueue()); }
從上面的源碼我們還可以看出spring的ThreadPoolTaskExecutor使用的隊(duì)列是LinkedBlockingQueue ,那么為啥線(xiàn)程池ThreadPoolExecutor不支持修改隊(duì)列長(zhǎng)度呢?這個(gè)原因就很簡(jiǎn)單了因?yàn)檫@個(gè)隊(duì)列的capacity是final類(lèi)型的,自然不能修改。
那如果我一定要修改這個(gè)隊(duì)列長(zhǎng)度應(yīng)該怎么處理?那完全就可以仿照美團(tuán)的方式,自定義了一個(gè)叫做 ResizableCapacityLinkedBlockIngQueue 的隊(duì)列(把LinkedBlockingQueue的capacity 字段的final關(guān)鍵字修飾給去掉了,讓它變?yōu)榭勺兊模┦遣皇且彩呛芎?jiǎn)單。
(2)再談核心線(xiàn)程數(shù)的參數(shù)設(shè)置
核心線(xiàn)程池的參數(shù)設(shè)置一般各種網(wǎng)絡(luò)資料中比較推崇的是N+1和2N法,即:
CPU 密集型任務(wù)(N+1) :這種任務(wù)消耗的主要是 CPU 資源,可以將線(xiàn)程數(shù)設(shè)置為 N(CPU 核心數(shù))+1。比 CPU 核心數(shù)多出來(lái)的一個(gè)線(xiàn)程是為了防止線(xiàn)程偶發(fā)的缺頁(yè)中斷,或者其它原因?qū)е碌娜蝿?wù)暫停而帶來(lái)的影響。一旦任務(wù)暫停,CPU 就會(huì)處于空閑狀態(tài),而在這種情況下多出來(lái)的一個(gè)線(xiàn)程就可以充分利用 CPU 的空閑時(shí)間。
I/O 密集型任務(wù)(2N) :這種任務(wù)應(yīng)用起來(lái),系統(tǒng)會(huì)用大部分的時(shí)間來(lái)處理 I/O 交互,而線(xiàn)程在處理 I/O 的時(shí)間段內(nèi)不會(huì)占用 CPU 來(lái)處理,這時(shí)就可以將 CPU 交出給其它線(xiàn)程使用。因此在 I/O 密集型任務(wù)的應(yīng)用中,我們可以多配置一些線(xiàn)程,具體的計(jì)算方法是 2N。
如何判斷是 CPU 密集任務(wù)還是 IO 密集任務(wù)?
CPU 密集型 :簡(jiǎn)單理解就是利用 CPU 計(jì)算能力的任務(wù)比如你在內(nèi)存中對(duì)大量數(shù)據(jù)進(jìn)行排序。
IO 密集型 :涉及到網(wǎng)絡(luò)讀取,文件讀取這類(lèi)都是 IO 密集型,這類(lèi)任務(wù)的特點(diǎn)是 CPU 計(jì)算耗費(fèi)時(shí)間相比于等待 IO 操作完成的時(shí)間來(lái)說(shuō)很少,大部分時(shí)間都花在了等待 IO 操作完成上。
但是實(shí)際上比較科學(xué)的線(xiàn)程數(shù)計(jì)算方式是:
最佳線(xiàn)程數(shù) = N(CPU 核心數(shù))?(1+WT(線(xiàn)程等待時(shí)間)/ST(線(xiàn)程計(jì)算時(shí)間))
WT(線(xiàn)程等待時(shí)間)= 線(xiàn)程運(yùn)行總時(shí)間 - ST(線(xiàn)程計(jì)算時(shí)間)
線(xiàn)程等待時(shí)間所占比例越高,需要越多線(xiàn)程。線(xiàn)程計(jì)算時(shí)間所占比例越高,需要越少線(xiàn)程。(我們可以通過(guò) JDK 自帶的工具 VisualVM 來(lái)查看 WT/ST 比例)
CPU 密集型任務(wù)的 WT/ST 接近或者等于 0,因此, 線(xiàn)程數(shù)可以設(shè)置為 N(CPU 核心數(shù))?(1+0)= N,和我們上面說(shuō)的 N(CPU 核心數(shù))+1 差不多。IO 密集型任務(wù)下,幾乎全是線(xiàn)程等待時(shí)間,從理論上來(lái)說(shuō),你就可以將線(xiàn)程數(shù)設(shè)置為 2N。
這里額外說(shuō)一句早先我雖然知道線(xiàn)程池核心線(xiàn)程數(shù)應(yīng)該和CPU核心線(xiàn)程數(shù)有關(guān),但是悲劇的是我并不知道怎么查Linux系統(tǒng)的核心數(shù),這里把查詢(xún)命令貼出來(lái)供參考:
#總核數(shù)=物理CPU個(gè)數(shù)X每顆物理CPU的核數(shù) #查看物理CPU個(gè)數(shù) cat/proc/cpuinfo|grep"physicalid"|sort|uniq|wc-l #查看每個(gè)物理CPU中core的個(gè)數(shù)(即核數(shù)) cat/proc/cpuinfo|grep"cpucores"|uniq
(3)為什么不太推薦你用ParallelStream?
曾經(jīng)某次代碼評(píng)審中,有位同學(xué)寫(xiě)出了下面類(lèi)似的代碼
//從數(shù)據(jù)庫(kù)中查找學(xué)生對(duì)象 Liststudents=searchDataFromDb(); //使用并行流進(jìn)行模型轉(zhuǎn)換 List res=newArrayList(); students.parallelStream().forEach(student->{ StudentVovo=newStudentVo(student) res.add(student); });
結(jié)果測(cè)試過(guò)程中返回給前端的數(shù)據(jù)總是莫名其妙的少很多和數(shù)據(jù)庫(kù)中的真實(shí)數(shù)據(jù)條數(shù)對(duì)不上,相信大家都看出來(lái)了原因是List并不是線(xiàn)程安全的容器,所以導(dǎo)致了最后結(jié)果不對(duì),其實(shí)這不能算是parallelStream的問(wèn)題,但是很多人寫(xiě)代碼時(shí),以為并行流就快為了追求效率,不假思索就寫(xiě)了這樣的代碼,但是往往在線(xiàn)程池的環(huán)境下大家又仿佛繃緊了并發(fā)神經(jīng),又能考慮到了并發(fā)問(wèn)題。
此外parallelStream的默認(rèn)線(xiàn)程池遇上ThreadLocal同樣也存在一些問(wèn)題,其實(shí)如果不做額外線(xiàn)程池指定,代碼中的 parallelStream 都是共用同一個(gè)線(xiàn)程池的,ParallelStream 底層使用了 ForkJoinPool,當(dāng) Stream 流中元素較多時(shí),整個(gè)運(yùn)行效率也會(huì)大大降低。
5 總結(jié)
本文通過(guò)一次生產(chǎn)事故,進(jìn)一步總結(jié)了線(xiàn)程池在日常開(kāi)發(fā)中需要注意的一些要點(diǎn),希望對(duì)大家有所幫助。
審核編輯:湯梓紅
-
函數(shù)
+關(guān)注
關(guān)注
3文章
4346瀏覽量
62978 -
代碼
+關(guān)注
關(guān)注
30文章
4828瀏覽量
69061 -
線(xiàn)程池
+關(guān)注
關(guān)注
0文章
57瀏覽量
6894
原文標(biāo)題:【避坑】線(xiàn)程池沒(méi)用好,直接出現(xiàn)了生產(chǎn)事故....
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論