この記事を読むことで、あなたは以下の知識を得ることができます。
java.util.concurrent
パッケージが提供する主要なコンポーネントの全体像- スレッドプールを簡単に扱うためのExecutorフレームワークの具体的な使い方
- Java 8で導入された
CompletableFuture
による、モダンで柔軟な非同期処理の実装方法 - マルチスレッド環境で安全に使えるスレッドセーフなコレクションの活用法
CountDownLatch
やSemaphore
など、スレッド間の複雑な同期を実現するSynchronizerの知識synchronized
ブロックを超える、高機能なLockフレームワークの利用シーンと注意点- ロックを使わずにアトミックな操作を実現するAtomic変数の仕組みとメリット
はじめに:なぜjava.util.concurrentが重要なのか
現代のアプリケーション開発において、パフォーマンスと応答性の向上は至上命題です。特にサーバーサイドJavaでは、多くのリクエストを効率的に捌くためにマルチスレッドプログラミングが不可欠です。しかし、伝統的なThread
クラスやsynchronized
キーワードだけを使った並行処理は、煩雑で、デッドロックや競合状態といった難しい問題を引き起こしがちでした。
こうした問題を解決するために、Java 5.0で導入されたのがjava.util.concurrent
パッケージです。このパッケージは、並行プログラミングで頻出するパターンを抽象化し、堅牢で高性能なコンポーネントとして提供します。これにより、開発者は低レベルなスレッド管理の複雑さから解放され、より本質的なビジネスロジックの実装に集中できるようになりました。この記事では、この強力なライブラリの核心的な機能を、具体的なコード例を交えながら徹底的に解説していきます。
第1章: Executorフレームワーク – スレッド管理の抽象化
並行処理の第一歩は、タスクを別スレッドで実行することです。しかし、タスクごとにnew Thread().start()
を呼び出すのは非効率的です。スレッドの生成と破棄にはコストがかかり、無制限にスレッドを作成するとシステムリソースを枯渇させてしまう危険性があります。
Executorフレームワークは、タスクの「サブミット(投入)」と「実行」を分離し、スレッド管理を抽象化するための仕組みです。これにより、スレッドの再利用(スレッドプーリング)が容易になり、リソースを効率的に活用できます。
ExecutorServiceとExecutors
中心的なインタフェースはExecutorService
で、タスクの実行だけでなく、サービスのライフサイクル管理(シャットダウンなど)も行います。最も簡単な使い方はExecutors
ファクトリクラスを利用する方法です。
Executors.newFixedThreadPool(int nThreads)
指定された数のスレッドを持つスレッドプールを作成します。常に一定数のスレッドでタスクを処理したい場合に利用します。CPUコア数に合わせたコンピューティング集約的なタスクなどに適しています。
Executors.newCachedThreadPool()
必要に応じてスレッドを生成し、アイドル状態のスレッドを60秒間保持して再利用するスレッドプールです。短命なタスクが大量に発生するようなI/Oバウンドな処理に適しています。
Executors.newSingleThreadExecutor()
単一のスレッドでタスクを順次実行するExecutorです。タスクが必ず順番通りに実行されることを保証したい場合に使います。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorExample { public static void main(String[] args) { // 3つのスレッドを持つスレッドプールを作成 ExecutorService executor = Executors.newFixedThreadPool(3); // 10個のタスクを投入 for (int i = 0; i < 10; i++) { int taskNo = i; // executeメソッドでRunnableを渡す executor.execute(() -> { System.out.println("Executing task " + taskNo + " by " + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } // 新しいタスクの受け入れを停止し、実行中のタスク完了を待つ executor.shutdown(); try { // すべてのタスクが完了するまで最大1分待機 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // タイムアウトした場合は、実行中のタスクを強制停止 executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); } System.out.println("All tasks finished."); }
}
FutureとCallable – 非同期処理の結果を受け取る
executor.execute(Runnable)
は戻り値を返しません。もしタスクの実行結果を取得したり、例外をハンドリングしたい場合は、Callable<V>
インタフェースとFuture<V>
インタフェースを使います。
- Callable<V>: 戻り値を返し、チェック例外をスローできる
Runnable
の拡張版です。 - Future<V>: 非同期処理の結果を表すオブジェクトです。
submit()
メソッドの戻り値として得られます。
import java.util.concurrent.*;
public class FutureCallableExample { public static void main(String[] args) { ExecutorService executor = Executors.newSingleThreadExecutor(); // Callableタスクを定義(1から10までを足し算して返す) Callable<Integer> task = () -> { System.out.println("Calculating sum..."); TimeUnit.SECONDS.sleep(2); // 時間のかかる処理を模倣 int sum = 0; for (int i = 1; i <= 10; i++) { sum += i; } return sum; }; // submitメソッドでCallableを渡し、Futureを受け取る Future<Integer> future = executor.submit(task); System.out.println("Task has been submitted. Waiting for the result..."); try { // future.get()で結果が返されるまでブロックする Integer result = future.get(); System.out.println("Result: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { executor.shutdown(); } }
}
future.get()
は処理が完了するまで呼び出し元のスレッドをブロックします。タイムアウト付きのget(long timeout, TimeUnit unit)
も利用できます。
第2章: CompletableFuture – 非同期処理の進化形 (Java 8+)
Future
は非同期処理の結果を取得するのに便利ですが、いくつかの欠点がありました。get()
がブロッキングであること、複数のFuture
を組み合わせるのが難しいことなどです。
Java 8で導入されたCompletableFuture<T>
は、これらの問題を解決し、より宣言的で流れるような非同期プログラミングを可能にします。PromiseやDeferredパターンの影響を受けており、非同期処理の完了を待ってコールバック処理を実行したり、複数の非同期処理を合成したりできます。
基本的な使い方
CompletableFuture
は主に静的ファクトリメソッドを使って生成します。
runAsync(Runnable runnable)
: 戻り値のない非同期タスクを実行します。supplyAsync(Supplier<U> supplier)
: 戻り値のある非同期タスクを実行します。
コールバックによる処理の連結
CompletableFuture
の真価は、非同期処理の完了後に実行される処理をメソッドチェーンで記述できる点にあります。
メソッド | 説明 | 引数 | 戻り値 |
---|---|---|---|
thenApply(Function<? super T,? extends U> fn) | 結果を受け取って変換し、新しい値を返す | Function | CompletableFuture<U> |
thenAccept(Consumer<? super T> action) | 結果を受け取って処理する(戻り値なし) | Consumer | CompletableFuture<Void> |
thenRun(Runnable action) | 結果を使わずに処理を実行する(戻り値なし) | Runnable | CompletableFuture<Void> |
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class CompletableFutureExample { public static void main(String[] args) throws Exception { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Hello"; }).thenApply(s -> s + " World") // 結果を変換 .thenApply(String::toUpperCase); // さらに変換 System.out.println("Result: " + future.get()); // "HELLO WORLD" が出力される // thenAcceptで最終結果を消費する CompletableFuture.supplyAsync(() -> "Final Result") .thenAccept(System.out::println); // "Final Result" が出力される // メインスレッドが先に終了しないように少し待機 Thread.sleep(2000); }
}
複数のCompletableFutureの合成
複数の独立した非同期処理を並行して実行し、すべての結果が揃ってから次の処理を行いたい場合があります。
thenCombine(CompletionStage<? extends U> other, BiFunction<...> fn)
: 2つのCompletableFuture
の結果を結合します。allOf(CompletableFuture<?>... cfs)
: 指定されたすべてのCompletableFuture
が完了したときに完了する、新しいCompletableFuture
を返します。
import java.util.concurrent.CompletableFuture;
public class CombineFuturesExample { public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { // 2秒かかる処理 try { Thread.sleep(2000); } catch (InterruptedException e) {} return "Result from Future 1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { // 3秒かかる処理 try { Thread.sleep(3000); } catch (InterruptedException e) {} return "Result from Future 2"; }); // 両方のFutureが完了したら、結果を結合する CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (res1, res2) -> res1 + " | " + res2 ); System.out.println("Combined result: " + combinedFuture.get()); long endTime = System.currentTimeMillis(); System.out.println("Total time: " + (endTime - startTime) + "ms"); // 約3000msとなる }
}
第3章: 同期コレクション – スレッドセーフなデータ構造
ArrayList
やHashMap
などの標準コレクションは、マルチスレッド環境でそのまま使用するとデータが破壊される危険があります。従来はCollections.synchronizedMap()
などでラップする方法がありましたが、これはコレクション全体をロックするため、パフォーマンスのボトルネックになりがちでした。
java.util.concurrent
には、より高性能で粒度の細かいロック機構を持つ、スレッドセーフなコレクションが多数用意されています。
ConcurrentHashMap
スレッドセーフなHashMap
です。読み取り操作はロックフリーで実行でき、書き込み操作もマップ全体ではなく、特定の領域(バケット)のみをロックするため、高いスループットを実現します。特にJava 8以降では内部実装が大きく改善され、さらにパフォーマンスが向上しました。マルチスレッド環境でMap
を使う場合、第一の選択肢となるべきクラスです。
CopyOnWriteArrayList
スレッドセーフなArrayList
です。その名の通り、書き込み(Write)時に内部の配列のコピー(Copy)を作成することでスレッドセーフティを実現します。
- 利点: 読み取り操作はロックを全く必要としないため非常に高速です。イテレータも安全で、イテレーション中に元のリストが変更されても
ConcurrentModificationException
は発生しません(イテレータはコピー作成時点のスナップショットを見ているため)。 - 欠点: 書き込み操作(
add
,set
,remove
など)のたびに配列全体のコピーが発生するため、コストが非常に高いです。
この特性から、「読み取りが圧倒的に多く、書き込みは稀」なユースケース(例:イベントリスナーのリスト管理など)に特化しています。
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.Iterator;
public class CopyOnWriteExample { public static void main(String[] args) { List<String> list = new CopyOnWriteArrayList<>(); list.add("A"); list.add("B"); list.add("C"); // イテレータを取得した後にリストを変更する Iterator<String> iterator = list.iterator(); list.add("D"); // イテレーション中に変更 // イテレータは変更前のスナップショットを見ている while(iterator.hasNext()) { System.out.print(iterator.next() + " "); // A B C が出力される } System.out.println("\nFinal list: " + list); // Final list: [A, B, C, D] }
}
BlockingQueue
BlockingQueueは、キューが空の場合に要素を取得しようとするとスレッドをブロックし、キューが満杯の場合に要素を追加しようとするとスレッドをブロックする機能を持つキューです。この性質は、有名なプロデューサー・コンシューマーパターンを実装するのに非常に役立ちます。
実装クラス | 特徴 | 主な用途 |
---|---|---|
ArrayBlockingQueue | 固定サイズの配列で実装された有界キュー。公平性ポリシーを選択可能。 | リソースプール、バッファリングなど、容量制限が重要な場合。 |
LinkedBlockingQueue | 連結リストで実装されたキュー。容量はデフォルトでInteger.MAX_VALUE (実質無制限)。 | 高いスループットが求められる一般的なプロデューサー・コンシューマー。 |
PriorityBlockingQueue | 要素が自然順序またはコンパレータに基づいて順序付けられる無制限キュー。 | タスクスケジューリングなど、優先度に基づいた処理が必要な場合。 |
SynchronousQueue | 内部容量を持たない特殊なキュー。put 操作は対応するtake 操作が行われるまでブロックされる。 | スレッド間の安全なデータの手渡し(ハンドオフ)。ExecutorフレームワークのnewCachedThreadPool で内部的に利用されている。 |
第4章: Synchronizer – スレッド間の協調
複数のスレッドが単に独立して動くだけでなく、互いに歩調を合わせて協調動作する必要がある場面は多く存在します。Synchronizerは、そのようなスレッド間の同期制御を容易にするためのユーティリティクラス群です。
CountDownLatch
「カウントダウン式のラッチ(掛け金)」。ある1つ以上のスレッドが、他の複数のスレッドでの処理が終わるまで待機するための仕組みです。ラッチは一度カウントがゼロになると再利用できません。
new CountDownLatch(int count)
: 指定したカウント数で初期化します。await()
: カウントがゼロになるまで待機します。countDown()
: カウントを1つ減らします。
利用例:メインスレッドが、複数のワーカースレッドの初期化処理がすべて完了するのを待つ。
CyclicBarrier
「周期的なバリア(障壁)」。指定された数のスレッドがバリアポイントに到達するまで互いに待機する仕組みです。すべてのスレッドが到達するとバリアが解除され、次の処理に進むことができます。バリアは再利用可能です(Cyclicの由来)。
利用例:複数のスレッドで分割して計算を行い、各パートの計算が終わった時点で同期を取り、次のステップに進む、といった処理。
Semaphore
「信号機」。特定の共有リソースへの同時アクセス数を制限するための仕組みです。acquire()
でパーミット(許可)を取得し、release()
で返却します。
利用例:データベース接続プールのコネクション数や、特定の高負荷APIへの同時アクセス数を制限する。
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreExample { public static void main(String[] args) { // 3つのパーミットを持つセマフォ(同時に3つのスレッドしかアクセスできない) Semaphore semaphore = new Semaphore(3); // 10個のスレッドを生成 for (int i = 0; i < 10; i++) { new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " is waiting for a permit."); semaphore.acquire(); // パーミットを取得 System.out.println(Thread.currentThread().getName() + " got a permit. Accessing the resource..."); TimeUnit.SECONDS.sleep(2); // リソースを利用している時間を模倣 } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(Thread.currentThread().getName() + " is releasing the permit."); semaphore.release(); // パーミットを解放 } }).start(); } }
}
第5章: Lockインタフェース – synchronizedを超える柔軟なロック
Javaの初期からあるsynchronized
キーワードは、シンプルで便利ですが、機能面での制約もあります。java.util.concurrent.locks
パッケージは、より高機能で柔軟なロック機構を提供します。
ReentrantLock
synchronized
と同様に再入可能な相互排他ロックですが、より多くの機能を持ちます。
機能 | ReentrantLock | synchronized |
---|---|---|
割り込み可能なロック取得 | lockInterruptibly() で可能 | 不可 |
タイムアウト付きロック取得 | tryLock(long, TimeUnit) で可能 | 不可 |
公平性(Fairness) | コンストラクタで公平ポリシーを選択可能(待機時間が長いスレッドを優先) | 保証されない(非公平) |
複数条件変数 | newCondition() で複数のConditionオブジェクトを生成可能 | オブジェクトごとに1つのみ (wait/notify) |
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample { private final ReentrantLock lock = new ReentrantLock(); private int count = 0; public void increment() { lock.lock(); // ロックを取得 try { count++; } finally { lock.unlock(); // 必ずロックを解放 } }
}
ReadWriteLock
読み取りロックと書き込みロックを分離したロックです。読み取り操作はデータを変更しないため、複数のスレッドが同時に実行しても安全です。一方、書き込み操作は排他的でなければなりません。ReadWriteLock
はこの原則を実装します。
- 読み取りロック (Shared Lock): 他の読み取りロックとは共存できるが、書き込みロックとは共存できない。
- 書き込みロック (Exclusive Lock): 他のどのロックとも共存できない。
キャッシュの実装など、「読み取りは頻繁だが、書き込みは稀」というシナリオでReentrantLock
よりも高いパフォーマンスを発揮します。
第6章: Atomic変数 – ロックフリーなアトミック操作
単純なカウンターのインクリメント(i++
)のような操作は、実は「読み取り」「変更」「書き込み」の3ステップからなり、アトミック(不可分)ではありません。そのため、マルチスレッド環境ではロックが必要になります。
しかし、ロックはパフォーマンスオーバーヘッドやデッドロックのリスクを伴います。java.util.concurrent.atomic
パッケージは、ロックを使わずに特定の値に対するアトミックな操作を保証するクラス群を提供します。
CAS (Compare-And-Swap)
アトミック変数の多くは、CPUが提供するCAS (Compare-And-Swap) 命令を利用して実装されています。CASは、「現在の値が期待値と同じであれば、新しい値に更新する」という操作をアトミックに行う命令です。これにより、ロックなしで値の更新が可能になります。
例えばatomicInteger.incrementAndGet()
は、内部的に以下のようなCASループを実行しています。
- 現在の値(current)を読み込む。
- 新しい値(next = current + 1)を計算する。
- CAS命令を使い、「もし現在の値が’current’のままなら、’next’に更新する」を試みる。
- もし他のスレッドによって値が変更されていたらCASは失敗するので、1からリトライする。
このロックを使わないアプローチはロックフリーと呼ばれ、スレッドの競合が少ない場合に非常に高いパフォーマンスを発揮します。
主なアトミック変数
クラス | 説明 |
---|---|
AtomicInteger , AtomicLong , AtomicBoolean | プリミティブ型に対するアトミックな操作を提供します。getAndIncrement() , compareAndSet() など。 |
AtomicReference<V> | オブジェクトの参照をアトミックに更新します。イミュータブルなオブジェクトと組み合わせて使うことが多いです。 |
LongAdder , DoubleAdder (Java 8+) | 多数のスレッドからカウンターへの更新が頻発する場合に特化したクラス。内部で複数のセルに更新を分散させることでAtomicLong よりも高いスループットを実現します。 |
まとめ
java.util.concurrent
パッケージは、Javaにおける現代的な並行処理プログラミングの土台となる、非常に強力で広範なツールセットです。
- 単純なタスク実行からスレッドプールの管理まではExecutorフレームワーク。
- コールバックベースのモダンな非同期処理はCompletableFuture。
- スレッドセーフなデータ共有には同期コレクション(特に
ConcurrentHashMap
)。 - 複雑なスレッド間の協調動作にはSynchronizer。
synchronized
を超える高度なロック制御にはLockフレームワーク。- 高競合下でのカウンターなど、細かい粒度の更新にはAtomic変数。
これらのコンポーネントを適切に使い分けることで、バグが少なく、パフォーマンスが高く、そして保守しやすい並行処理コードを記述することが可能になります。本記事が、そのための第一歩となれば幸いです。