前言
很多時候,我們為了提升接口的性能,會把之前單線程同步執行的代碼,改成多線程異步執行。
比如:查詢用戶信息接口,需要返回用戶基本信息、積分信息、成長值信息,而用戶、積分和成長值,需要調用不同的接口獲取數據。
如果查詢用戶信息接口,同步調用三個接口獲取數據,會非常耗時。
這就非常有必要把三個接口調用,改成異步調用,最后匯總結果。
再比如:注冊用戶接口,該接口主要包含:寫用戶表,分配權限,配置用戶導航頁,發通知消息等功能。
該用戶注冊接口包含的業務邏輯比較多,如果在接口中同步執行這些代碼,該接口響應時間會非常慢。
這時就需要把業務邏輯梳理一下,劃分:核心邏輯和非核心邏輯。這個例子中的核心邏輯是:寫用戶表和分配權限,非核心邏輯是:配置用戶導航頁和發通知消息。
顯然核心邏輯必須在接口中同步執行,而非核心邏輯可以多線程異步執行。
等等。
需要使用多線程的業務場景太多了,使用多線程異步執行的好處不言而喻。
但我要說的是,如果多線程沒有使用好,它也會給我們帶來很多意想不到的問題,不信往后繼續看。
今天跟大家一起聊聊,代碼改成多線程調用之后,帶來的9大問題。
1.獲取不到返回值
如果你通過直接繼承Thread類,或者實現Runnable接口的方式去創建線程。
那么,恭喜你,你將沒法獲取該線程方法的返回值。
使用線程的場景有兩種:
不需要關注線程方法的返回值。
需要關注線程方法的返回值。
大部分業務場景是不需要關注線程方法返回值的,但如果我們有些業務需要關注線程方法的返回值該怎么處理呢?
查詢用戶信息接口,需要返回用戶基本信息、積分信息、成長值信息,而用戶、積分和成長值,需要調用不同的接口獲取數據。
如下圖所示:
在Java8之前可以通過實現Callable接口,獲取線程返回結果。
Java8以后通過CompleteFuture類實現該功能。我們這里以CompleteFuture為例:
publicUserInfogetUserInfo(Longid)throwsInterruptedException,ExecutionException{ finalUserInfouserInfo=newUserInfo(); CompletableFutureuserFuture=CompletableFuture.supplyAsync(()->{ getRemoteUserAndFill(id,userInfo); returnBoolean.TRUE; },executor); CompletableFuturebonusFuture=CompletableFuture.supplyAsync(()->{ getRemoteBonusAndFill(id,userInfo); returnBoolean.TRUE; },executor); CompletableFuturegrowthFuture=CompletableFuture.supplyAsync(()->{ getRemoteGrowthAndFill(id,userInfo); returnBoolean.TRUE; },executor); CompletableFuture.allOf(userFuture,bonusFuture,growthFuture).join(); userFuture.get(); bonusFuture.get(); growthFuture.get(); returnuserInfo; }
溫馨提醒一下,這兩種方式別忘了使用線程池。示例中我用到了executor,表示自定義的線程池,為了防止高并發場景下,出現線程過多的問題。
此外,Fork/join框架也提供了執行任務并返回結果的能力。
2.數據丟失
我們還是以注冊用戶接口為例,該接口主要包含:寫用戶表,分配權限,配置用戶導航頁,發通知消息等功能。
其中:寫用戶表和分配權限功能,需要在一個事務中同步執行。而剩余的配置用戶導航頁和發通知消息功能,使用多線程異步執行。
表面上看起來沒問題。
但如果前面的寫用戶表和分配權限功能成功了,用戶注冊接口就直接返回成功了。
但如果后面異步執行的配置用戶導航頁,或發通知消息功能失敗了,怎么辦?
如下圖所示:
該接口前面明明已經提示用戶成功了,但結果后面又有一部分功能在多線程異步執行中失敗了。
這時該如何處理呢?
沒錯,你可以做失敗重試。
但如果重試了一定的次數,還是沒有成功,這條請求數據該如何處理呢?如果不做任何處理,該數據是不是就丟掉了?
為了防止數據丟失,可以用如下方案:
使用mq異步處理。在分配權限之后,發送一條mq消息,到mq服務器,然后在mq的消費者中使用多線程,去配置用戶導航頁和發通知消息。如果mq消費者中處理失敗了,可以自己重試。
使用job異步處理。在分配權限之后,往任務表中寫一條數據。然后有個job定時掃描該表,然后配置用戶導航頁和發通知消息。如果job處理某條數據失敗了,可以在表中記錄一個重試次數,然后不斷重試。但該方案有個缺點,就是實時性可能不太高。
3.順序問題
如果你使用了多線程,就必須接受一個非常現實的問題,即順序問題。
假如之前代碼的執行順序是:a,b,c,改成多線程執行之后,代碼的執行順序可能變成了:a,c,b。(這個跟cpu調度算法有關)
例如:
publicstaticvoidmain(String[]args){ Threadthread1=newThread(()->System.out.println("a")); Threadthread2=newThread(()->System.out.println("b")); Threadthread3=newThread(()->System.out.println("c")); thread1.start(); thread2.start(); thread3.start(); }
執行結果:
a c b
那么,來自靈魂的一問:如何保證線程的順序呢?
即線程啟動的順序是:a,b,c,執行的順序也是:a,b,c。
如下圖所示:
3.1 join
Thread類的join方法它會讓主線程等待子線程運行結束后,才能繼續運行。
列如:
publicstaticvoidmain(String[]args)throwsInterruptedException{ Threadthread1=newThread(()->System.out.println("a")); Threadthread2=newThread(()->System.out.println("b")); Threadthread3=newThread(()->System.out.println("c")); thread1.start(); thread1.join(); thread2.start(); thread2.join(); thread3.start(); }
執行結果永遠都是:
a b c
3.2 newSingleThreadExecutor
我們可以使用JDK自帶的Excutors類的newSingleThreadExecutor方法,創建一個單線程的線程池。
例如:
publicstaticvoidmain(String[]args){ ExecutorServiceexecutorService=Executors.newSingleThreadExecutor(); Threadthread1=newThread(()->System.out.println("a")); Threadthread2=newThread(()->System.out.println("b")); Threadthread3=newThread(()->System.out.println("c")); executorService.submit(thread1); executorService.submit(thread2); executorService.submit(thread3); executorService.shutdown(); }
執行結果永遠都是:
a b c
使用Excutors類的newSingleThreadExecutor方法創建的單線程的線程池,使用了LinkedBlockingQueue作為隊列,而此隊列按 FIFO(先進先出)排序元素。
添加到隊列的順序是a,b,c,則執行的順序也是a,b,c。
3.3 CountDownLatch
CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程執行完后再執行。
例如:
publicclassThreadTest{ publicstaticvoidmain(String[]args)throwsInterruptedException{ CountDownLatchlatch1=newCountDownLatch(0); CountDownLatchlatch2=newCountDownLatch(1); CountDownLatchlatch3=newCountDownLatch(1); Threadthread1=newThread(newTestRunnable(latch1,latch2,"a")); Threadthread2=newThread(newTestRunnable(latch2,latch3,"b")); Threadthread3=newThread(newTestRunnable(latch3,latch3,"c")); thread1.start(); thread2.start(); thread3.start(); } } classTestRunnableimplementsRunnable{ privateCountDownLatchlatch1; privateCountDownLatchlatch2; privateStringmessage; TestRunnable(CountDownLatchlatch1,CountDownLatchlatch2,Stringmessage){ this.latch1=latch1; this.latch2=latch2; this.message=message; } @Override publicvoidrun(){ try{ latch1.await(); System.out.println(message); }catch(InterruptedExceptione){ e.printStackTrace(); } latch2.countDown(); } }
執行結果永遠都是:
a b c
此外,使用CompletableFuture的thenRun方法,也能多線程的執行順序,在這里就不一一介紹了。
4.線程安全問題
既然使用了線程,伴隨而來的還會有線程安全問題。
假如現在有這樣一個需求:用多線程執行查詢方法,然后把執行結果添加到一個list集合中。
代碼如下:
Listlist=Lists.newArrayList(); dataList.stream() .map(data->CompletableFuture .supplyAsync(()->query(list,data),asyncExecutor) )); CompletableFuture.allOf(futureArray).join();
使用CompletableFuture異步多線程執行query方法:
publicvoidquery(Listlist,UserEntitycondition){ Useruser=queryByCondition(condition); if(Objects.isNull(user)){ return; } list.add(user); UserExtenduserExtend=queryByOther(condition); if(Objects.nonNull(userExtend)){ user.setExtend(userExtend.getInfo()); } }
在query方法中,將獲取的查詢結果添加到list集合中。
結果list會出現線程安全問題,有時候會少數據,當然也不一定是必現的。
這是因為ArrayList是非線程安全的,沒有使用synchronized等關鍵字修飾。
如何解決這個問題呢?
答:使用CopyOnWriteArrayList集合,代替普通的ArrayList集合,CopyOnWriteArrayList是一個線程安全的機會。
只需一行小小的改動即可:
ListlistLists.newCopyOnWriteArrayList();
溫馨的提醒一下,這里創建集合的方式,用了google的collect包。
5.ThreadLocal獲取數據異常
我們都知道JDK為了解決線程安全問題,提供了一種用空間換時間的新思路:ThreadLocal。
它的核心思想是:共享變量在每個線程都有一個副本,每個線程操作的都是自己的副本,對另外的線程沒有影響。
例如:
@Service publicclassThreadLocalService{ privatestaticfinalThreadLocalthreadLocal=newThreadLocal<>(); publicvoidadd(){ threadLocal.set(1); doSamething(); Integerinteger=threadLocal.get(); } }
ThreadLocal在普通中線程中,的確能夠獲取正確的數據。
但在真實的業務場景中,一般很少用單獨的線程,絕大多數,都是用的線程池。
那么,在線程池中如何獲取ThreadLocal對象生成的數據呢?
如果直接使用普通ThreadLocal,顯然是獲取不到正確數據的。
我們先試試InheritableThreadLocal,具體代碼如下:
privatestaticvoidfun1(){ InheritableThreadLocalthreadLocal=newInheritableThreadLocal<>(); threadLocal.set(6); System.out.println("父線程獲取數據:"+threadLocal.get()); ExecutorServiceexecutorService=Executors.newSingleThreadExecutor(); threadLocal.set(6); executorService.submit(()->{ System.out.println("第一次從線程池中獲取數據:"+threadLocal.get()); }); threadLocal.set(7); executorService.submit(()->{ System.out.println("第二次從線程池中獲取數據:"+threadLocal.get()); }); }
執行結果:
父線程獲取數據:6 第一次從線程池中獲取數據:6 第二次從線程池中獲取數據:6
由于這個例子中使用了單例線程池,固定線程數是1。
第一次submit任務的時候,該線程池會自動創建一個線程。因為使用了InheritableThreadLocal,所以創建線程時,會調用它的init方法,將父線程中的inheritableThreadLocals數據復制到子線程中。所以我們看到,在主線程中將數據設置成6,第一次從線程池中獲取了正確的數據6。
之后,在主線程中又將數據改成7,但在第二次從線程池中獲取數據卻依然是6。
因為第二次submit任務的時候,線程池中已經有一個線程了,就直接拿過來復用,不會再重新創建線程了。所以不會再調用線程的init方法,所以第二次其實沒有獲取到最新的數據7,還是獲取的老數據6。
那么,這該怎么辦呢?
答:使用TransmittableThreadLocal,它并非JDK自帶的類,而是阿里巴巴開源jar包中的類。
可以通過如下pom文件引入該jar包:
com.alibaba transmittable-thread-local 2.11.0 compile
代碼調整如下:
privatestaticvoidfun2()throwsException{ TransmittableThreadLocalthreadLocal=newTransmittableThreadLocal<>(); threadLocal.set(6); System.out.println("父線程獲取數據:"+threadLocal.get()); ExecutorServicettlExecutorService=TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1)); threadLocal.set(6); ttlExecutorService.submit(()->{ System.out.println("第一次從線程池中獲取數據:"+threadLocal.get()); }); threadLocal.set(7); ttlExecutorService.submit(()->{ System.out.println("第二次從線程池中獲取數據:"+threadLocal.get()); }); }
執行結果:
父線程獲取數據:6 第一次從線程池中獲取數據:6 第二次從線程池中獲取數據:7
我們看到,使用了TransmittableThreadLocal之后,第二次從線程中也能正確獲取最新的數據7了。
nice。
如果你仔細觀察這個例子,你可能會發現,代碼中除了使用TransmittableThreadLocal類之外,還使用了TtlExecutors.getTtlExecutorService方法,去創建ExecutorService對象。
這是非常重要的地方,如果沒有這一步,TransmittableThreadLocal在線程池中共享數據將不會起作用。
創建ExecutorService對象,底層的submit方法會TtlRunnable或TtlCallable對象。
以TtlRunnable類為例,它實現了Runnable接口,同時還實現了它的run方法:
publicvoidrun(){ Map,Object>copied=(Map)this.copiedRef.get(); if(copied!=null&&(!this.releaseTtlValueReferenceAfterRun||this.copiedRef.compareAndSet(copied,(Object)null))){ Mapbackup=TransmittableThreadLocal.backupAndSetToCopied(copied); try{ this.runnable.run(); }finally{ TransmittableThreadLocal.restoreBackup(backup); } }else{ thrownewIllegalStateException("TTLvaluereferenceisreleasedafterrun!"); } }
這段代碼的主要邏輯如下:
把當時的ThreadLocal做個備份,然后將父類的ThreadLocal拷貝過來。
執行真正的run方法,可以獲取到父類最新的ThreadLocal數據。
從備份的數據中,恢復當時的ThreadLocal數據。
6.OOM問題
眾所周知,使用多線程可以提升代碼執行效率,但也不是絕對的。
對于一些耗時的操作,使用多線程,確實可以提升代碼執行效率。
但線程不是創建越多越好,如果線程創建多了,也可能會導致OOM異常。
例如:
Causedby: java.lang.OutOfMemoryError:unabletocreatenewnativethread
在JVM中創建一個線程,默認需要占用1M的內存空間。
如果創建了過多的線程,必然會導致內存空間不足,從而出現OOM異常。
除此之外,如果使用線程池的話,特別是使用固定大小線程池,即使用Executors.newFixedThreadPool方法創建的線程池。
該線程池的核心線程數和最大線程數是一樣的,是一個固定值,而存放消息的隊列是LinkedBlockingQueue。
該隊列的最大容量是Integer.MAX_VALUE,也就是說如果使用固定大小線程池,存放了太多的任務,有可能也會導致OOM異常。
java.lang.OutOfMemeryError:Javaheapspace
7.CPU使用率飆高
不知道你有沒有做過excel數據導入功能,需要將一批excel的數據導入到系統中。
每條數據都有些業務邏輯,如果單線程導入所有的數據,導入效率會非常低。
于是改成了多線程導入。
如果excel中有大量的數據,很可能會出現CPU使用率飆高的問題。
我們都知道,如果代碼出現死循環,cpu使用率會飚的很多高。因為代碼一直在某個線程中循環,沒法切換到其他線程,cpu一直被占用著,所以會導致cpu使用率一直高居不下。
而多線程導入大量的數據,雖說沒有死循環代碼,但由于多個線程一直在不停的處理數據,導致占用了cpu很長的時間。
也會出現cpu使用率很高的問題。
那么,如何解決這個問題呢?
答:使用Thread.sleep休眠一下。
在線程中處理完一條數據,休眠10毫秒。
當然CPU使用率飆高的原因很多,多線程處理數據和死循環只是其中兩種,還有比如:頻繁GC、正則匹配、頻繁序列化和反序列化等。
后面我會寫一篇介紹CPU使用率飆高的原因的專題文章,感興趣的小伙伴,可以關注一下我后續的文章。
8.事務問題
在實際項目開發中,多線程的使用場景還是挺多的。如果spring事務用在多線程場景中,會有問題嗎?
例如:
@Slf4j @Service publicclassUserService{ @Autowired privateUserMapperuserMapper; @Autowired privateRoleServiceroleService; @Transactional publicvoidadd(UserModeluserModel)throwsException{ userMapper.insertUser(userModel); newThread(()->{ roleService.doOtherThing(); }).start(); } } @Service publicclassRoleService{ @Transactional publicvoiddoOtherThing(){ System.out.println("保存role表數據"); } }
從上面的例子中,我們可以看到事務方法add中,調用了事務方法doOtherThing,但是事務方法doOtherThing是在另外一個線程中調用的。
這樣會導致兩個方法不在同一個線程中,獲取到的數據庫連接不一樣,從而是兩個不同的事務。如果想doOtherThing方法中拋了異常,add方法也回滾是不可能的。
如果看過spring事務源碼的朋友,可能會知道spring的事務是通過數據庫連接來實現的。當前線程中保存了一個map,key是數據源,value是數據庫連接。
privatestaticfinalThreadLocal
我們說的同一個事務,其實是指同一個數據庫連接,只有擁有同一個數據庫連接才能同時提交和回滾。如果在不同的線程,拿到的數據庫連接肯定是不一樣的,所以是不同的事務。
所以不要在事務中開啟另外的線程,去處理業務邏輯,這樣會導致事務失效。
9.導致服務掛掉
使用多線程會導致服務掛掉,這不是危言聳聽,而是確有其事。
假設現在有這樣一種業務場景:在mq的消費者中需要調用訂單查詢接口,查到數據之后,寫入業務表中。
本來是沒啥問題的。
突然有一天,mq生產者跑了一個批量數據處理的job,導致mq服務器上堆積了大量的消息。
此時,mq消費者的處理速度,遠遠跟不上mq消息的生產速度,導致的結果是出現了大量的消息堆積,對用戶有很大的影響。
為了解決這個問題,mq消費者改成多線程處理,直接使用了線程池,并且最大線程數配置成了20。
這樣調整之后,消息堆積問題確實得到了解決。
但帶來了另外一個更嚴重的問題:訂單查詢接口并發量太大了,有點扛不住壓力,導致部分節點的服務直接掛掉。
為了解決問題,不得不臨時加服務節點。
在mq的消費者中使用多線程,調用接口時,一定要評估好接口能夠承受的最大訪問量,防止因為壓力過大,而導致服務掛掉的問題。
審核編輯:劉清
-
cpu
+關注
關注
68文章
11042瀏覽量
216053 -
服務器
+關注
關注
13文章
9706瀏覽量
87322 -
JAVA
+關注
關注
20文章
2985瀏覽量
106949
原文標題:麻了,代碼改成多線程,竟有9大問題
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
工控一體機多線程任務調度優化:聚徽分享破解工業復雜流程高效協同密碼
一種實時多線程VSLAM框架vS-Graphs介紹

請問如何在Python中實現多線程與多進程的協作?
請問rt-thread studio如何進行多線程編譯?
探索字節隊列的魔法:多類型支持、函數重載與線程安全

socket 多線程編程實現方法
Python中多線程和多進程的區別

LWIP多線程強烈建議開啟LWIP_ASSERT_CORE_LOCKED宏,這個在RTT里面要怎么實現?
從多線程設計模式到對 CompletableFuture 的應用

探索虛擬線程:原理與實現

動態線程池思想學習及實踐

評論