スレッド (2)
スレッドの協調
あるスレッドがあるオブジェクトのロックを獲得してsynchorized文を
実行していると、そのロックを取ろうとする他のスレッドは停止を
してしまいます。
全てのオブジェクトは「ウェイトセット」(wait set)を持っています。
「ウェイトセット」とは「そのオブジェクトのwait()メソッドを実行して
動作を停止しているスレッドの集合」です。
スレッドがwait()メソッドを実行すると、スレッドは実行を一時停止し、
そのオブジェクトのウェイトセットに登録されます。
ウェイトセットに登録されたスレッドは以下の事態が起きると起こされます。
- 他のスレッドから notify()メソッドによって起こされる
- 他のスレッドから notifyAll()メソッドによって起こされる
- 他のスレッドから interrupt()メソッドによって起こされる
- wait()メソッドがタイムアウトする。
wait()メソッド
あるオブジェクトObjのロックを獲得したスレッドAが、そのオブジェクトObjの
wait()メソッドを実行すると、スレッドAはオブジェクトObjのロックを開放し、
オブジェクトObjのウェイトセットに入ります。
その後、そのオブジェクトObjのロックを他のスレッドが獲得することが
可能となります。
wait()メソッドを実行するためには、スレッドがロックを持っていなければ
なりません。
wait()しているところを、他のスレッドからnofify()またはnotifyAll()で
起こされると、再びロックを獲得できればロックを獲得してwait()の続き
から実行を再開します。
notify()メソッド
notify()メソッドを使うと、ウェイトセットに登録されているスレッドの
1つをウェイトセットから取り出し、実行を再開させます。
obj.notify()
のように、オブジェクトObjを指定してnotify()を呼び出します。
もしもObjが省略された場合は this を指定したのと同じことになります。
すなわちnotify() は this.notify() は同じ意味です。
複数のスレッドがウェイトセットに登録されているとき、どのスレッドが
選ばれて起こされるのかは処理系依存です。
notifyAll()メソッド
notifyAll()メソッドを使うと、ウェイトセットに登録されている
全てのスレッドをウェイトセットから取り出し、実行を再開させます。
obj.notifyAll()
のように、オブジェクトObjを指定してnotifyAll()を呼び出します。
もしもObjが省略された場合は this を指定したのと同じことになります。
すなわちnotifyAll() は this.notifyAll() は同じ意味です。
注意
「synchronizedメソッド」や「synchronized文」を使うときは、
「このsynchronizedは何を守っているのか」
「どの単位で守るべきか(メソッド全体? or メソッドの一部?)」
「どのオブジェクトのウェイトセットで守っているのか」
をきちんと考えることが重要です。
生産者と消費者 (Producer and Consumer)
CakeMain.javaの実行例 |
$ javac CakeMain.java CakeProducer.java CakeConsumer.java SyncQueue.java Cake.java
$ java CakeMain
Producer1 makes Cake0@Producer1
***[Cake0@Producer1 ]***
Producer1 makes Cake1@Producer1
***[Cake0@Producer1 Cake1@Producer1 ]***
Consumer1 eats Cake0@Producer1
***[Cake1@Producer1 ]***
Consumer1 eats Cake1@Producer1
***[]***
Producer2 makes Cake2@Producer2
***[Cake2@Producer2 ]***
Producer0 makes Cake3@Producer0
***[Cake2@Producer2 Cake3@Producer0 ]***
Consumer0 eats Cake2@Producer2
***[Cake3@Producer0 ]***
Consumer2 eats Cake3@Producer0
***[]***
Producer0 makes Cake4@Producer0
***[Cake4@Producer0 ]***
Consumer2 eats Cake4@Producer0
***[]***
Consumer1 eats Cake5@Producer1
***[]***
Producer1 makes Cake5@Producer1
***[]***
Producer1 makes Cake6@Producer1
***[Cake6@Producer1 ]***
Producer2 makes Cake7@Producer2
***[Cake6@Producer1 Cake7@Producer2 ]***
Consumer0 eats Cake6@Producer1
***[Cake7@Producer2 ]***
Producer0 makes Cake8@Producer0
***[Cake7@Producer2 Cake8@Producer0 ]***
Producer0 makes Cake9@Producer0
***[Cake7@Producer2 Cake8@Producer0 Cake9@Producer0 ]***
Consumer2 eats Cake7@Producer2
***[Cake8@Producer0 Cake9@Producer0 ]***
Producer2 makes Cake10@Producer2
***[Cake8@Producer0 Cake9@Producer0 Cake10@Producer2 ]***
Consumer1 eats Cake8@Producer0
***[Cake9@Producer0 Cake10@Producer2 ]***
Producer2 makes Cake11@Producer2
***[Cake9@Producer0 Cake10@Producer2 Cake11@Producer2 ]***
Consumer0 eats Cake9@Producer0
***[Cake10@Producer2 Cake11@Producer2 ]***
Producer1 makes Cake13@Producer1 ←Cake13がキューに(Cake12はwait中)
***[Cake10@Producer2 Cake11@Producer2 Cake13@Producer1 ]***
Consumer2 eats Cake10@Producer2
***[Cake11@Producer2 Cake13@Producer1 ]***
Producer1 makes Cake15@Producer1 ←Cake13がキューに(Cake14はwait中)
***[Cake11@Producer2 Cake13@Producer1 Cake15@Producer1 ]***
Consumer1 eats Cake11@Producer2
***[Cake13@Producer1 Cake15@Producer1 ]***
Producer2 makes Cake12@Producer2 ←やっとCake12がキューに。
***[Cake13@Producer1 Cake15@Producer1 Cake12@Producer2 ]***
Consumer0 eats Cake13@Producer1
***[Cake15@Producer1 Cake12@Producer2 ]***
Producer1 makes Cake16@Producer1 ←Cake16がキューに。
***[Cake15@Producer1 Cake12@Producer2 Cake16@Producer1 ]***
Consumer2 eats Cake15@Producer1
***[Cake12@Producer2 Cake16@Producer1 ]***
Producer0 makes Cake14@Producer0 ←やっとCake14がキューに。
***[Cake12@Producer2 Cake16@Producer1 Cake14@Producer0 ]***
Consumer1 eats Cake12@Producer2
***[Cake16@Producer1 Cake14@Producer0 ]***
Producer1 makes Cake17@Producer1
***[Cake16@Producer1 Cake14@Producer0 Cake17@Producer1 ]***
Consumer0 eats Cake16@Producer1
***[Cake14@Producer0 Cake17@Producer1 ]***
Producer1 makes Cake18@Producer1
***[Cake14@Producer0 Cake17@Producer1 Cake18@Producer1 ]***
...
|
[注意] もしも何度実行しても、Cakeの順番が入れ替わらない場合は Producerの数を(5とかに)増やして、
SyncQueueの容量を(3に)減らすなどの変更をしてみて下さい。
SyncQueue.javaはsynchronized文で記述したが、thisでメソッド全体に
ロックを掛けているので次のように記述しても構いません。
(ここではthisはプログラムの実行中に使われるSyncQueueオブジェクト
を指します。)
wait()やnotifyAll()を呼び出すときには、どのオブジェクトの
ロックで待っているスレッドを起こすつもりなのかに注意が必要です。
単に wait() や notifyAll()を呼び出した場合は、this.wait()や
this.notifyAll()を呼び出したことになります。
java.util.concurrentパッケージ
スレッドを使って並行動作するプログラムを作るときに必要な
クラスがjava.util.concurrentパッケージにまとめられています。
- BlockingQueueインターフェイス
- ArrayBlockingQueueクラス ... 配列で作成したBlockingQueue
- LinkedBlockingQueueクラス ... 連結リストで作成したBlockingQueue
- PriorityBlockingQueueクラス ... 優先順序付きBlockingQueue
- DelayQueueクラス ... 一定時間が過ぎないとtake()できないBlockingQueue
- SynchronousQueueクラス ... 直接渡しのBlockingQueue
- ConcurrentLinkedQueueクラス ... 要素数に上限がないスレッドセーフなQueue
上で説明した SyncQueue.java は、自分で記述する代わりに
java.util.concurrent.ArrayBlokingQueueクラスなどを使うこともできます。
セマフォ
上記のSyncQueue.javaは、「ある領域をひとつのスレッドだけが実行できる」
例でした。
これを一般化して、「ある領域を最大でN個のスレッドだけが実行できる
ように制限する」問題を考えましょう。
これを実現するのに使う仕組を「計数セマフォ」といいます。
java.util.concurrentパッケージにSemaphoreというクラスがあります。
- Semaphoreのコンストラクタで「実行可能な最大のスレッド数」を指定します。
- 実行するための権限を獲得するのは Semaphoreクラスのacquire()メソッドです。
- 実行するための権限を解放するのは Semaphoreクラスのrelease()メソッドです。
RunTestSemaphore.javaの実行例 |
$ javac RunTestSemaphore.java SemaphoreUser.java BoundedResource.java
$ java RunTestSemaphore
Thread-0:Begin: used = 1
Thread-1:Begin: used = 2
Thread-2:Begin: used = 3
Thread-0:End: used = 3
Thread-3:Begin: used = 3
Thread-1:End: used = 3
Thread-4:Begin: used = 3
Thread-2:End: used = 3
Thread-5:Begin: used = 3
Thread-3:End: used = 3
Thread-6:Begin: used = 3
Thread-5:End: used = 3
Thread-7:Begin: used = 3
Thread-4:End: used = 3
Thread-8:Begin: used = 3
Thread-7:End: used = 3
Thread-9:Begin: used = 3
Thread-8:End: used = 3
Thread-9:End: used = 2
Thread-6:End: used = 1
|
セマフォを自分で作る
セマフォは自分でも簡単に作ることができます。
BoundedResourceクラスを、自分で作ったセマフォMySemaphore.java
を使うように変更してみましょう。
MySemaphore.javaではコードを単純にするために、
- セマフォを操作するスレッドは、acquire()メソッドを呼び出したのと
同じ回数だけ必ずrelease()メソッドを呼び出さなければならない
- セマフォをacquire()していないスレッドがrelease()してはいけない
という利用側の制限をつけています。
BoundedResource.javaの変更点 |
*** java/BoundedResource0.java Sun Oct 12 15:19:32 2008
--- java/BoundedResource2.java Sun Oct 12 15:18:24 2008
***************
*** 1,12 ****
import java.util.*;
import java.util.concurrent.*;
class BoundedResource {
! private final Semaphore semaphore;
private final int permits;
private Random random;
public BoundedResource(int permits,Random random) {
this.permits = permits;
! this.semaphore = new Semaphore(permits);
this.random = random;
}
public void use() throws InterruptedException {
--- 1,12 ----
import java.util.*;
import java.util.concurrent.*;
class BoundedResource {
! private final MySemaphore semaphore;
private final int permits;
private Random random;
public BoundedResource(int permits,Random random) {
this.permits = permits;
! this.semaphore = new MySemaphore(permits);
this.random = random;
}
public void use() throws InterruptedException {
|
MySemaphoreを使ったRunTestSemaphore.javaの実行例 |
$ java RunTestSemaphore
Thread-0:Begin: used = 1
Thread-1:Begin: used = 2
Thread-2:Begin: used = 3
Thread-0:End: used = 3
Thread-9:Begin: used = 3
Thread-1:End: used = 3
Thread-3:Begin: used = 3
Thread-2:End: used = 3
Thread-8:Begin: used = 3
Thread-8:End: used = 3
Thread-4:Begin: used = 3
Thread-9:End: used = 3
Thread-7:Begin: used = 3
Thread-3:End: used = 3
Thread-5:Begin: used = 3
Thread-5:End: used = 3
Thread-6:Begin: used = 3
Thread-4:End: used = 3
Thread-7:End: used = 2
Thread-6:End: used = 1
|