java的concurrent用法詳解_第1頁
java的concurrent用法詳解_第2頁
java的concurrent用法詳解_第3頁
java的concurrent用法詳解_第4頁
java的concurrent用法詳解_第5頁
已閱讀5頁,還剩17頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

1、我們都知道,在JDK1.5之前,Java中要進行業(yè)務并發(fā)時,通常需要有程序員獨立完成代碼實現(xiàn),當然也有一些開源的框架提供了這些功能,但是這些依然沒有JDK自帶的功能使用起來方便。而當針對高質(zhì)量Java多線程并發(fā)程序設計時,為防止死蹦等現(xiàn)象的出現(xiàn),比如使用java之前的wait()、notify()和synchronized等,每每需要考慮性能、死鎖、公平性、資源管理以及如何避免線程安全性方面帶來的危害等諸多因素,往往會采用一些較為復雜的安全策略,加重了程序員的開發(fā)負擔.萬幸的是,在JDK1.5出現(xiàn)之后,Sun大神(Doug Lea)終于為我們這些可憐的小程序員推出了java.util.conc

2、urrent工具包以簡化并發(fā)完成。開發(fā)者們借助于此,將有效的減少競爭條件(race conditions)和死鎖線程。concurrent包很好的解決了這些問題,為我們提供了更實用的并發(fā)程序模型。Executor                  :具體Runnable任務的執(zhí)行者。ExecutorService           :一個線程池管理者,其實現(xiàn)類有多種,我會介紹一部分。我們能把Ru

3、nnable,Callable提交到池中讓其調(diào)度。Semaphore                 :一個計數(shù)信號量ReentrantLock             :一個可重入的互斥鎖定 Lock,功能類似synchronized,但要強大的多。Future               

4、60;    :是與Runnable,Callable進行交互的接口,比如一個線程執(zhí)行結束后取返回的結果等等,還提供了cancel終止線程。BlockingQueue             :阻塞隊列。CompletionService         : ExecutorService的擴展,可以獲得線程執(zhí)行結果的CountDownLatch         &#

5、160;  :一個同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待。 CyclicBarrier             :一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 Future                    :Future 表示異步計算的結果。ScheduledExecutorServ

6、ice :一個 ExecutorService,可安排在給定的延遲后運行或定期執(zhí)行的命令。接下來逐一介紹Executors主要方法說明newFixedThreadPool(固定大小線程池)創(chuàng)建一個可重用固定線程集合的線程池,以共享的無界隊列方式來運行這些線程(只有要請求的過來,就會在一個隊列里等待執(zhí)行)。如果在關閉前的執(zhí)行期間由于失敗而導致任何線程終止,那么一個新線程將代替它執(zhí)行后續(xù)的任務(如果需要)。newCachedThreadPool(無界線程池,可以進行自動線程回收)創(chuàng)建一個可根據(jù)需要創(chuàng)建新線程的線程池,但是在以前構造的線程可用時將重用它們。對于執(zhí)行很多短期異步任務的程序而言,這些線程

7、池通??商岣叱绦蛐阅?。調(diào)用 execute 將重用以前構造的線程(如果線程可用)。如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。因此,長時間保持空閑的線程池不會使用任何資源。注意,可以使用 ThreadPoolExecutor 構造方法創(chuàng)建具有類似屬性但細節(jié)不同(例如超時參數(shù))的線程池。newSingleThreadExecutor(單個后臺線程)創(chuàng)建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。(注意,如果因為在關閉前的執(zhí)行期間出現(xiàn)失敗而終止了此單個線程,那么如果需要,一個新線程將代替它執(zhí)行后續(xù)

8、的任務)??杀WC順序地執(zhí)行各個任務,并且在任意給定的時間不會有多個線程是活動的。與其他等效的 newFixedThreadPool(1) 不同,可保證無需重新配置此方法所返回的執(zhí)行程序即可使用其他的線程。這些方法返回的都是ExecutorService對象,這個對象可以理解為就是一個線程池。這個線程池的功能還是比較完善的??梢蕴峤蝗蝿誷ubmit()可以結束線程池shutdown()。01import java.util.concurrent.ExecutorService;02import java.util.concurrent.Executors;03public&#

9、160;class MyExecutor extends Thread 04private int index;05public MyExecutor(int i)06    this.index=i;0708public void run()09    try10     System.out.println(""+this.index+" start.&

10、quot;);11     Thread.sleep(int)(Math.random()*1000);12     System.out.println(""+this.index+" end.");13    14    catch(Exception e)15     e.printStackTrace();16 

11、;   1718public static void main(String args)19    ExecutorService service=Executors.newFixedThreadPool(4);20    for(int i=0;i<10;i+)21     service.execute(new MyExecutor(i);22   

12、;  /service.submit(new MyExecutor(i);23    24    System.out.println("submit finish");25    service.shutdown();2627雖然打印了一些信息,但是看的不是非常清晰,這個線程池是如何工作的,我們來將休眠的時間調(diào)長10倍。Thread.sleep(int)(Math.random()*10000);再來看,會清楚看到只能執(zhí)行4個線程。當

13、執(zhí)行完一個線程后,才會又執(zhí)行一個新的線程,也就是說,我們將所有的線程提交后,線程池會等待執(zhí)行完最后shutdown。我們也會發(fā)現(xiàn),提交的線程被放到一個“無界隊列里”。這是一個有序隊列(BlockingQueue,這個下面會說到)。另外它使用了Executors的靜態(tài)函數(shù)生成一個固定的線程池,顧名思義,線程池的線程是不會釋放的,即使它是Idle。這就會產(chǎn)生性能問題,比如如果線程池的大小為200,當全部使用完畢后,所有的線程會繼續(xù)留在池中,相應的內(nèi)存和線程切換(while(true)+sleep循環(huán))都會增加。如果要避免這個問題,就必須直接使用ThreadPoolExecutor()來構造。可以像

14、通用的線程池一樣設置“最大線程數(shù)”、“最小線程數(shù)”和“空閑線程keepAlive的時間”。這個就是線程池基本用法。Semaphore一個計數(shù)信號量。從概念上講,信號量維護了一個許可集合。如有必要,在許可可用前會阻塞每一個 acquire(),然后再獲取該許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數(shù),并采取相應的行動。Semaphore 通常用于限制可以訪問某些資源(物理或邏輯的)的線程數(shù)目。例如,下面的類使用信號量控制對內(nèi)容池的訪問:這里是一個實際的情況,大家排隊上廁所,廁所只有兩個位

15、置,來了10個人需要排隊。01import java.util.concurrent.ExecutorService;02import java.util.concurrent.Executors;03import java.util.concurrent.Semaphore;04public class MySemaphore extends Thread 05Semaphore position;06private int id;07public MySemaphore(int i,Sema

16、phore s)08    this.id=i;09    this.position=s;1011public void run()12    try13     if(position.availablePermits()>0)14      System.out.println("顧客"+this.id+"

17、進入廁所,有空位");15     16     else17      System.out.println("顧客"+this.id+"進入廁所,沒空位,排隊");18     19     position.acquire();20    &#

18、160;System.out.println("顧客"+this.id+"獲得坑位");21     Thread.sleep(int)(Math.random()*1000);22     System.out.println("顧客"+this.id+"使用完畢");23     position.release();24   

19、60;25    catch(Exception e)26     e.printStackTrace();27    2829public static void main(String args)30    ExecutorService list=Executors.newCachedThreadPool();31    Semaphore posit

20、ion=new Semaphore(2);32    for(int i=0;i<10;i+)33     list.submit(new MySemaphore(i+1,position);34    35    list.shutdown();36    position.acquireUninterruptibly(2);37 

21、0;  System.out.println("使用完畢,需要清掃了");38    position.release(2);3940ReentrantLock一個可重入的互斥鎖定 Lock,它具有與使用 synchronized 方法和語句所訪問的隱式監(jiān)視器鎖定相同的一些基本行為和語義,但功能更強大。ReentrantLock 將由最近成功獲得鎖定,并且還沒有釋放該鎖定的線程所擁有。當鎖定沒有被另一個線程所擁有時,調(diào)用 lock 的線程將成功獲取該鎖定并返回。如果當前線程已經(jīng)擁有該鎖定,此方法將立即返回??梢允褂?/p>

22、 isHeldByCurrentThread() 和 getHoldCount() 方法來檢查此情況是否發(fā)生。此類的構造方法接受一個可選的公平參數(shù)。當設置為 true時,在多個線程的爭用下,這些鎖定傾向于將訪問權授予等待時間最長的線程。否則此鎖定將無法保證任何特定訪問順序。與采用默認設置(使用不公平鎖定)相比,使用公平鎖定的程序在許多線程訪問時表現(xiàn)為很低的總體吞吐量(即速度很慢,常常極其慢),但是在獲得鎖定和保證鎖定分配的均衡性時差異較小。不過要注意的是,公平鎖定不能保證線程調(diào)度的公平性。因此,使用公平鎖定的眾多線程中的一員可能獲得多倍的成功機會,這種情況發(fā)生在其他活動線程沒有被處理并且目前并

23、未持有鎖定時。還要注意的是,未定時的 tryLock 方法并沒有使用公平設置。因為即使其他線程正在等待,只要該鎖定是可用的,此方法就可以獲得成功。建議總是 立即實踐,使用 try 塊來調(diào)用 lock,在之前/之后的構造中,最典型的代碼如下: 01class X 02    private final ReentrantLock lock = new ReentrantLock();03    / .04    public&#

24、160;void m() 05      lock.lock(); / block until condition holds06      try 07        / . method body08       finally 09      

25、  lock.unlock()10      11    12我的例子:01import java.util.concurrent.ExecutorService;02import java.util.concurrent.Executors;03import java.util.concurrent.locks.ReentrantLock;04public class MyReentrantLock extends 

26、;Thread05TestReentrantLock lock;06private int id;07public MyReentrantLock(int i,TestReentrantLock test)08    this.id=i;09    this.lock=test;1011public void run()12    lock.print(id);1314public static 

27、void main(String args)15    ExecutorService service=Executors.newCachedThreadPool();16    TestReentrantLock lock=new TestReentrantLock();17    for(int i=0;i<10;i+)18     service.submit(new MyR

28、eentrantLock(i,lock);19    20    service.shutdown();212223class TestReentrantLock24private ReentrantLock lock=new ReentrantLock();25public void print(int str)26    try27     lock.lock()

29、;28     System.out.println(str+"獲得");29     Thread.sleep(int)(Math.random()*1000);30    31    catch(Exception e)32     e.printStackTrace();33    34 &#

30、160;  finally35     System.out.println(str+"釋放");36     lock.unlock();37    3839BlockingQueue支持兩個附加操作的 Queue,這兩個操作是:檢索元素時等待隊列變?yōu)榉强?,以及存儲元素時等待空間變得可用。BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現(xiàn)會拋出

31、 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 額外的元素。沒有任何內(nèi)部容量約束的 BlockingQueue 總是報告 Integer.MAX_VALUE 的剩余容量。BlockingQueue 實現(xiàn)主要用于生產(chǎn)者-使用者隊列,但它另外還支持 Collection 接口。因此,舉例來說,使用 remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操作通常不 會有效執(zhí)行,只能有計劃地偶爾使用

32、,比如在取消排隊信息時。BlockingQueue 實現(xiàn)是線程安全的。所有排隊方法都可以使用內(nèi)部鎖定或其他形式的并發(fā)控制來自動達到它們的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動執(zhí)行,除非在實現(xiàn)中特別說明。因此,舉例來說,在只添加了 c 中的一些元素后,addAll(c) 有可能失敗(拋出一個異常)。BlockingQueue 實質(zhì)上不 支持使用任何一種“close”或“shutdown”操作來指示不再添加任何項。這種功能的需求和使用有依賴于實現(xiàn)的傾向。例如,一種常用的策略是:對于生產(chǎn)者,插入

33、特殊的 end-of-stream 或 poison 對象,并根據(jù)使用者獲取這些對象的時間來對它們進行解釋。下面的例子演示了這個阻塞隊列的基本功能。01import java.util.concurrent.BlockingQueue;02import java.util.concurrent.ExecutorService;03import java.util.concurrent.Executors;04import java.util.concurrent.LinkedBlockingQueue;05public class M

34、yBlockingQueue extends Thread 06public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);07private int index;08public MyBlockingQueue(int i) 09   this.index = i;1011public void run() 12  

35、0;try 13    queue.put(String.valueOf(this.index);14    System.out.println("" + this.index + " in queue!");15    catch (Exception e) 16    e.printStackTrace();17   1819public&#

36、160;static void main(String args) 20   ExecutorService service = Executors.newCachedThreadPool();21   for (int i = 0; i < 10; i+) 22    service.submit(new MyBlockingQueue(i);23   24   Thread t

37、hread = new Thread() 25    public void run() 26     try 27      while (true) 28       Thread.sleep(int) (Math.random() * 1000);29      

38、 if(MyBlockingQueue.queue.isEmpty()30        break;31       String str = MyBlockingQueue.queue.take();32       System.out.println(str + " has take!");33    

39、;  34      catch (Exception e) 35      e.printStackTrace();36     37    38   39   service.submit(thread);40   service.shutdown();4142-執(zhí)行結果-0

40、 in queue!1 in queue!2 in queue!3 in queue!0 has take!4 in queue!1 has take!6 in queue!2 has take!7 in queue!3 has take!8 in queue!4 has take!5 in queue!6 has take!9 in queue!7 has take!8 has take!5 has take!9 has take!-CompletionService將生產(chǎn)新的異步任務與使用已完成任務的結果分離開來的服務。生產(chǎn)者 submit 執(zhí)行的任務。使用者 take 已完成的任務,并按

41、照完成這些任務的順序處理它們的結果。例如,CompletionService 可以用來管理異步 IO ,執(zhí)行讀操作的任務作為程序或系統(tǒng)的一部分提交,然后,當完成讀操作時,會在程序的不同部分執(zhí)行其他操作,執(zhí)行操作的順序可能與所請求的順序不同。通常,CompletionService 依賴于一個單獨的 Executor 來實際執(zhí)行任務,在這種情況下,CompletionService 只管理一個內(nèi)部完成隊列。ExecutorCompletionService 類提供了此方法的一個實現(xiàn)。01import java.util.concurrent.Callable;02import 

42、;java.util.concurrent.CompletionService;03import java.util.concurrent.ExecutorCompletionService;04import java.util.concurrent.ExecutorService;05import java.util.concurrent.Executors;06public class MyCompletionService implements Callable<String> 07private int&

43、#160;id;08 09public MyCompletionService(int i)10   this.id=i;1112public static void main(String args) throws Exception13   ExecutorService service=Executors.newCachedThreadPool();14   CompletionService<String> comp

44、letion=new ExecutorCompletionService<String>(service);15   for(int i=0;i<10;i+)16    completion.submit(new MyCompletionService(i);17   18   for(int i=0;i<10;i+)19    System.out.printl

45、n(completion.take().get();20   21   service.shutdown();2223public String call() throws Exception 24   Integer time=(int)(Math.random()*1000);25   try26    System.out.println(this.id+" start");27 

46、   Thread.sleep(time);28    System.out.println(this.id+" end");29   30   catch(Exception e)31    e.printStackTrace();32   33   return this.id+":"+time;3435Count

47、DownLatch一個同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待。用給定的計數(shù) 初始化 CountDownLatch。由于調(diào)用了 countDown() 方法,所以在當前計數(shù)到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續(xù)調(diào)用都將立即返回。這種現(xiàn)象只出現(xiàn)一次計數(shù)無法被重置。如果需要重置計數(shù),請考慮使用 CyclicBarrier。CountDownLatch 是一個通用同步工具,它有很多用途。將計數(shù) 1 初始化的 CountDownLatch 用作一個簡單的開/關鎖存器,或入口:在通過調(diào)用 countDown()

48、 的線程打開入口前,所有調(diào)用 await 的線程都一直在入口處等待。用 N 初始化的 CountDownLatch 可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。CountDownLatch 的一個有用特性是,它不要求調(diào)用 countDown 方法的線程等到計數(shù)到達零時才繼續(xù),而在所有線程都能通過之前,它只是阻止任何線程繼續(xù)通過一個 await。 一下的例子是別人寫的,非常形象。01import java.util.concurrent.CountDownLatch;02import java.util.conc

49、urrent.ExecutorService;03import java.util.concurrent.Executors;04public class TestCountDownLatch 05public static void main(String args) throws InterruptedException 06   / 開始的倒數(shù)鎖07   final CountDownLatch begin = new CountDownL

50、atch(1);08   / 結束的倒數(shù)鎖09   final CountDownLatch end = new CountDownLatch(10);10   / 十名選手11   final ExecutorService exec = Executors.newFixedThreadPool(10);12   13   for (int index = 0; i

51、ndex < 10; index+) 14    final int NO = index + 1;15    Runnable run = new Runnable() 16     public void run() 17      try 18       begin

52、.await();/一直阻塞19       Thread.sleep(long) (Math.random() * 10000);20       System.out.println("No." + NO + " arrived");21       catch (InterruptedException e) 22 

53、60;     finally 23       end.countDown();24      25     26    27    exec.submit(run);28   29   System.out.println(&

54、quot;Game Start");30   begin.countDown();31   end.await();32   System.out.println("Game Over");33   exec.shutdown();3435CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數(shù)一次,后者是等待倒數(shù)到0,如果沒有到達0,就只有阻塞等待了。CyclicBarrier一個同步輔助類,它允許一組線

55、程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環(huán) 的 barrier。CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續(xù)所有參與線程之前更新共享狀態(tài),此屏障操作 很有用。示例用法:下面是一個在并行分解設計中使用 barrier 的例子,很經(jīng)典的旅行團例子:01import 

56、;java.text.SimpleDateFormat;02import java.util.Date;03import java.util.concurrent.BrokenBarrierException;04import java.util.concurrent.CyclicBarrier;05import java.util.concurrent.ExecutorService;06import java.util.concurrent.Executors;07public class TestCyclicBarri

57、er 08  / 徒步需要的時間: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan09  private static int timeWalk = 5, 8, 15, 15, 10 10  / 自駕游11  private static int timeSelf = 1, 3, 4, 4, 5 12  / 旅游大巴13  private static 

58、;int timeBus = 2, 4, 6, 6, 7 14   15  static String now() 16     SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");17     return sdf.format(new Date() + ": "18  19&

59、#160; static class Tour implements Runnable 20     private int times;21     private CyclicBarrier barrier;22     private String tourName;23     public Tour(Cycl

60、icBarrier barrier, String tourName, int times) 24       this.times = times;25       this.tourName = tourName;26       this.barrier = barrier;27     28   &#

61、160; public void run() 29       try 30         Thread.sleep(times0 * 1000);31         System.out.println(now() + tourName + " Reached Shenzhen");

62、32         barrier.await();33         Thread.sleep(times1 * 1000);34         System.out.println(now() + tourName + " Reached Guangzhou");35  

63、0;      barrier.await();36         Thread.sleep(times2 * 1000);37         System.out.println(now() + tourName + " Reached Shaoguan");38      

64、;   barrier.await();39         Thread.sleep(times3 * 1000);40         System.out.println(now() + tourName + " Reached Changsha");41         

65、barrier.await();42         Thread.sleep(times4 * 1000);43         System.out.println(now() + tourName + " Reached Wuhan");44         barrier.await();45

66、0;       catch (InterruptedException e) 46        catch (BrokenBarrierException e) 47       48     49  50  public static void main

67、(String args) 51     / 三個旅行團52     CyclicBarrier barrier = new CyclicBarrier(3);53     ExecutorService exec = Executors.newFixedThreadPool(3);54     exec.submit(new Tour(barrier, "WalkT

68、our", timeWalk);55     exec.submit(new Tour(barrier, "SelfTour", timeSelf);56/當我們把下面的這段代碼注釋后,會發(fā)現(xiàn),程序阻塞了,無法繼續(xù)運行下去。57     exec.submit(new Tour(barrier, "BusTour", timeBus);58     exec.shutdown();59  60CyclicBarrier最重要的屬性就是參與者個數(shù),另外最要方法是await()。當所有線程都調(diào)用了await()后,就表示這些線程都可以繼續(xù)執(zhí)行,否則就會等待。FutureFuture 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,并檢索計算的結果。計算完成后只能使用 get 方法來檢索結果,如有必要,計算完成前可以阻塞此方法。取消則由 cancel 方法來執(zhí)行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。如果

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論