[Rustのはじめ方] Part26: チャネルによる通信(mpsc)

Rust

はじめに:スレッド間で情報をやりとりする!

前回は std::thread を使って新しいスレッドを生成する方法を学びましたね。複数のスレッドが同時に処理を進めることで、プログラムのパフォーマンスを向上させることができます。しかし、それぞれのスレッドが独立して動くだけでなく、互いに情報を交換したり、処理の結果を共有したりする必要が出てくることがあります。🤔

そんな時に活躍するのがチャネル (Channel) です!チャネルは、あるスレッドから別のスレッドへ安全にデータを送るための仕組みを提供します。水道管🚰のように、一方からデータを流し込み、もう一方から受け取るイメージです。

Rustの標準ライブラリでは、std::sync::mpsc モジュールでチャネル機能が提供されています。この mpsc は “Multiple Producer, Single Consumer” の略で、複数の送信者 (Producer) がデータを送り、単一の受信者 (Consumer) がデータを受け取るタイプのチャネルであることを意味します。

💡 ポイント: チャネルを使うことで、スレッド間でデータを安全かつ簡単に送受信できます。共有メモリを直接操作するよりも、データ競合などの問題を避けやすくなります。

基本的なチャネルの使い方 (std::sync::mpsc)

まずは、基本的なチャネルの作成と送受信の方法を見ていきましょう。mpsc::channel() 関数を使うと、チャネルの送信側 (Sender<T>) と受信側 (Receiver<T>) のペアを作成できます。T はチャネルを通じて送受信するデータの型です。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // String 型のデータを送受信するチャネルを作成
    // tx: 送信者 (Transmitter), rx: 受信者 (Receiver)
    let (tx, rx) = mpsc::channel();

    // 新しいスレッドを生成し、送信側 (tx) の所有権を移動
    thread::spawn(move || {
        let message = String::from("こんにちは、チャネル!👋");
        println!("送信スレッド: メッセージを送信します: '{}'", message);
        // send メソッドでメッセージを送信
        // send は Result<(), SendError<T>> を返す
        tx.send(message).unwrap(); // 送信失敗時は panic

        // 少し待ってから別のメッセージを送信
        thread::sleep(Duration::from_secs(1));
        let message2 = String::from("元気ですか?");
        println!("送信スレッド: 2つ目のメッセージを送信します: '{}'", message2);
        tx.send(message2).unwrap();
    });

    // メインスレッドでメッセージを受信
    println!("メインスレッド: メッセージの受信を待っています...");

    // recv メソッドでメッセージを受信 (メッセージが来るまでブロックする)
    // recv は Result<T, RecvError> を返す
    let received_message = rx.recv().unwrap(); // 受信失敗時 (チャネルクローズなど) は panic
    println!("メインスレッド: 受信しました🎉: '{}'", received_message);

    // 2つ目のメッセージを受信
    let received_message2 = rx.recv().unwrap();
    println!("メインスレッド: 2つ目を受信しました🎉: '{}'", received_message2);

    // try_recv を使うとブロックせずに受信を試みることもできる
    // メッセージがない場合は Err(TryRecvError::Empty) が返る
    match rx.try_recv() {
        Ok(msg) => println!("メインスレッド: try_recv で受信: '{}'", msg),
        Err(mpsc::TryRecvError::Empty) => println!("メインスレッド: try_recv: 受信するメッセージはありませんでした。"),
        Err(mpsc::TryRecvError::Disconnected) => println!("メインスレッド: try_recv: チャネルが切断されました。"),
    }

    println!("メインスレッド: 処理を終了します。");
}

上記のコードでは、まず mpsc::channel() でチャネルを作成し、送信側 tx と受信側 rx を受け取ります。 新しいスレッドを thread::spawn で生成し、move クロージャを使って tx の所有権を新しいスレッドに移動させています。これにより、新しいスレッドからメインスレッドへメッセージを送信できます。

送信スレッドは tx.send(message) を使ってメッセージを送ります。send メソッドは送信する値の所有権を奪います。これは、送信後に送信側スレッドがその値を誤って変更したりしないようにするためです。

一方、メインスレッド(受信側)は rx.recv() を使ってメッセージを受信します。recv メソッドは、メッセージがチャネルに到着するまで、またはチャネルがクローズされるまで、現在のスレッドの実行をブロックします。メッセージを受信すると、その値の所有権が受信側に移ります。

また、rx.try_recv() というメソッドもあります。これはブロックせずに、すぐに受信を試みます。メッセージがあれば Ok(message) を、なければ Err(TryRecvError::Empty) を、チャネルが切断されていれば Err(TryRecvError::Disconnected) を返します。

複数の送信者 (Multiple Producers)

mpsc の名前の通り、複数の送信者 (Producer) から単一の受信者 (Consumer) へメッセージを送ることができます。これは、送信側である Sender<T>clone() することで実現します。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    // 1つ目の送信者用の Sender をクローン
    let tx1 = tx.clone();
    // 1つ目の送信スレッド
    thread::spawn(move || {
        let messages = vec![
            String::from("スレッド1からのメッセージA"),
            String::from("スレッド1からのメッセージB"),
        ];
        for msg in messages {
            println!("スレッド1: 送信中 - '{}'", msg);
            tx1.send(msg).unwrap();
            thread::sleep(Duration::from_millis(500));
        }
        println!("スレッド1: 送信完了。");
        // tx1 はここでドロップされる
    });

    // 2つ目の送信スレッド (元の tx を使う)
    thread::spawn(move || {
        let messages = vec![
            String::from("スレッド2からのメッセージX"),
            String::from("スレッド2からのメッセージY"),
        ];
        for msg in messages {
            println!("スレッド2: 送信中 - '{}'", msg);
            tx.send(msg).unwrap();
            thread::sleep(Duration::from_millis(700)); // 少し違う間隔で送信
        }
        println!("スレッド2: 送信完了。");
        // tx はここでドロップされる
    });

    println!("メインスレッド: 受信を開始します...");
    // Receiver はイテレータとしても使える!
    // すべての Sender がドロップされると、イテレーションは終了する
    for received in rx {
        println!("メインスレッド: 受信しました✅: '{}'", received);
    }

    println!("メインスレッド: すべての送信者が終了し、チャネルが閉じられました。");
}

この例では、tx.clone() で新しい Sender を作成し、それを最初のスレッドに渡しています。元の tx は2番目のスレッドに渡されます。これで、2つのスレッドが同じチャネル(同じ Receiver)に対してメッセージを送信できるようになります。

注目すべきは、受信側のループです。Receiver<T> はイテレータとして振る舞うため、for ループを使ってメッセージを順番に受信できます。このループは、チャネルがクローズされるまで続きます。

チャネルのクローズ

チャネルはいつクローズされるのでしょうか? 🤔 それは、そのチャネルに紐づくすべての Sender がスコープから外れた(ドロップされた)時です。

上記の複数送信者の例で、for received in rx ループが終了したのは、tx1tx の両方がそれぞれのスレッドの最後でドロップされたからです。すべての送信者がいなくなると、Receiver はもう新しいメッセージを受け取ることはないと判断し、recv()Err(RecvError) を返すようになります。for ループで使っている場合、イテレーションが自然に終了します。

同様に、もし Receiver が先にドロップされると、Sendersend() しようとした際に Err(SendError) が返されます。これは、メッセージを送る相手がいなくなったことを示します。

⚠️ 注意: sendrecv が返す Result を適切に処理することが重要です。unwrap() は簡単な例では便利ですが、実際のアプリケーションでは match? 演算子を使ってエラーハンドリングを行うべきです。

まとめ ✨

今回は、Rustの標準ライブラリが提供する mpsc チャネルを使って、スレッド間で安全にデータを送受信する方法を学びました。

  • mpsc::channel()SenderReceiver を作成します。
  • Sendersend メソッドでメッセージを送信し、所有権を渡します。
  • Receiverrecv (ブロッキング) または try_recv (ノンブロッキング) でメッセージを受信します。
  • Senderclone() することで、複数の送信者 (Producer) を作れます。
  • すべての Sender がドロップされると、チャネルはクローズされます。
  • Receiver はイテレータとしても利用でき、チャネルがクローズされるまでメッセージを受信し続けられます。

チャネルは、スレッド間の通信をシンプルかつ安全に行うための強力なツールです。特に、所有権システムと組み合わせることで、データ競合を防ぎながら並行処理を記述するのに役立ちます。 💪

次は、非同期処理の世界で使われるチャネルや、より高度な同期プリミティブについて見ていくことになります。お楽しみに!🚀

コメント

タイトルとURLをコピーしました