C++でのスレッド間通信: std::condition_variableの使い方を徹底解説

C++のマルチスレッドプログラミングにおいて、スレッド間の効率的な通信は非常に重要です。その中でも、std::condition_variableはスレッド間通信を実現するための強力なツールです。この記事では、std::condition_variableの基本的な概念から、具体的な使用方法、応用例までを詳しく解説します。これにより、マルチスレッドプログラミングにおける課題を効果的に解決できるようになります。

目次

std::condition_variableの基礎知識

std::condition_variableは、C++標準ライブラリに含まれる同期プリミティブの一つで、スレッド間の通信と同期を行うために使用されます。主に、スレッドが特定の条件を満たすまで待機し、その条件が満たされた時に他のスレッドに通知を送るために利用されます。これにより、スレッド間の協調動作を簡潔かつ効率的に実現できます。次に、std::condition_variableの構造と基本的な使い方について説明します。

std::condition_variableの基本的な使い方

std::condition_variableを使うためには、まずstd::mutexと連携させて使用します。以下に、基本的な使い方を示すコード例を紹介します。

基本的なコード例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id(int id) {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [] { return ready; });
    std::cout << "Thread " << id << '\n';
}

void go() {
    std::unique_lock<std::mutex> lck(mtx);
    ready = true;
    cv.notify_all();
}

int main() {
    std::thread threads[10];
    for (int i = 0; i < 10; ++i)
        threads[i] = std::thread(print_id, i);

    std::cout << "10 threads ready to race...\n";
    go();

    for (auto& th : threads) th.join();

    return 0;
}

コードの説明

  • std::mutex mtx:スレッド間の排他制御を行うためのミューテックス。
  • std::condition_variable cv:スレッド間の待機と通知を行うための条件変数。
  • bool ready = false:スレッドが処理を開始する準備ができたかを示すフラグ。
  • print_id関数:スレッドが実行する関数。cv.waitreadyがtrueになるまで待機します。
  • go関数:readyをtrueに設定し、全ての待機中のスレッドに通知を送ります。
  • main関数:10個のスレッドを生成し、go関数を呼び出して全スレッドを開始させます。

このコード例では、複数のスレッドがstd::condition_variableを使用して同期し、一斉に処理を開始する様子を示しています。

スレッドの待機と通知の仕組み

std::condition_variableを使用する際、スレッドの待機と通知の仕組みは重要なポイントです。これにより、スレッド間で効率的に情報をやり取りし、協調動作を実現できます。

スレッドの待機

スレッドが特定の条件を満たすまで待機するために、std::condition_variable::waitメソッドを使用します。このメソッドは、条件が満たされるまでスレッドをブロックします。以下に基本的な使い方を示します。

std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, []{ return ready; });
  • std::unique_lock<std::mutex> lck(mtx):ミューテックスをロックするためにunique_lockを使用します。
  • cv.wait(lck, []{ return ready; }):条件が満たされるまでスレッドを待機させます。この場合、readyがtrueになるまで待機します。

スレッドへの通知

条件が満たされた時に待機中のスレッドに通知を送るために、std::condition_variable::notify_oneまたはstd::condition_variable::notify_allメソッドを使用します。

cv.notify_one();  // 一つのスレッドに通知
cv.notify_all();  // すべての待機中のスレッドに通知
  • cv.notify_one():一つの待機中のスレッドに通知を送り、再開させます。
  • cv.notify_all():すべての待機中のスレッドに通知を送り、再開させます。

通知の実行例

以下に、notify_onenotify_allの使用例を示します。

void notify_example() {
    std::unique_lock<std::mutex> lck(mtx);
    ready = true;
    cv.notify_one();  // または cv.notify_all();
}

この例では、readyフラグをtrueに設定し、条件変数を使用して待機中のスレッドに通知を送ります。これにより、待機中のスレッドは再開し、処理を続行します。

スレッドの待機と通知の仕組みを理解することで、効率的なスレッド間通信と同期を実現でき、マルチスレッドプログラミングの強力なツールとして活用できます。

std::mutexとの連携

std::condition_variableは、必ずstd::mutexと連携して使用します。これにより、スレッド間のデータ競合を防ぎ、安全な通信が可能になります。ここでは、std::mutexとの連携方法について詳しく解説します。

std::mutexの基本

std::mutexは、スレッド間で共有されるデータのアクセスを制御するための排他制御オブジェクトです。std::condition_variableと組み合わせて使用することで、データの一貫性を保ちながらスレッド間の通信を行えます。

std::mutexとstd::condition_variableの連携例

以下に、std::mutexとstd::condition_variableを連携させた基本的な例を示します。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;
int data = 0;

void producer() {
    std::unique_lock<std::mutex> lck(mtx);
    data = 100; // 何らかのデータを生成
    ready = true;
    cv.notify_one(); // 消費者スレッドに通知
}

void consumer() {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, []{ return ready; });
    std::cout << "Consumed data: " << data << '\n';
}

int main() {
    std::thread prod(producer);
    std::thread cons(consumer);

    prod.join();
    cons.join();

    return 0;
}

コードの説明

  • std::mutex mtx:共有データへのアクセスを制御するためのミューテックス。
  • std::condition_variable cv:スレッド間の待機と通知を行うための条件変数。
  • bool ready = false:データの準備ができたかを示すフラグ。
  • int data = 0:共有されるデータ。
  • producer関数:データを生成し、readyをtrueに設定して条件変数に通知を送ります。
  • consumer関数:条件変数で待機し、readyがtrueになったらデータを消費します。
  • main関数:プロデューサースレッドとコンシューマースレッドを生成し、実行します。

詳細な動作解説

  1. producer関数が呼び出されると、ミューテックスmtxをロックし、データを生成します。
  2. readyをtrueに設定し、cv.notify_oneで待機中のスレッドに通知を送ります。
  3. consumer関数はcv.waitで待機し、readyがtrueになるのを待ちます。
  4. 通知を受け取ったconsumer関数はミューテックスを再びロックし、データを消費します。

このように、std::mutexとstd::condition_variableを連携させることで、安全かつ効率的にスレッド間通信を実現できます。

例外処理とエラーハンドリング

std::condition_variableを使用する際には、例外処理とエラーハンドリングも重要な要素です。スレッド間通信において予期せぬエラーが発生することを防ぎ、堅牢なプログラムを作成するために必要な対策について説明します。

例外処理の重要性

マルチスレッドプログラミングでは、スレッドが異常終了する可能性があります。これを適切にハンドリングしないと、デッドロックやデータの不整合が発生するリスクがあります。std::condition_variableを使用する場合も例外処理を取り入れることが推奨されます。

基本的な例外処理の方法

基本的な例外処理は、try-catchブロックを用いて行います。以下に例を示します。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;
int data = 0;

void producer() {
    try {
        std::unique_lock<std::mutex> lck(mtx);
        data = 100; // 何らかのデータを生成
        ready = true;
        cv.notify_one(); // 消費者スレッドに通知
    } catch (const std::exception& e) {
        std::cerr << "Exception in producer: " << e.what() << '\n';
    }
}

void consumer() {
    try {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, []{ return ready; });
        std::cout << "Consumed data: " << data << '\n';
    } catch (const std::exception& e) {
        std::cerr << "Exception in consumer: " << e.what() << '\n';
    }
}

int main() {
    try {
        std::thread prod(producer);
        std::thread cons(consumer);

        prod.join();
        cons.join();
    } catch (const std::exception& e) {
        std::cerr << "Exception in main: " << e.what() << '\n';
    }

    return 0;
}

コードの説明

  • producer関数とconsumer関数の両方にtry-catchブロックを追加し、例外が発生した場合にエラーメッセージを表示します。
  • main関数でもtry-catchブロックを使用し、スレッドの生成や結合中に発生する例外をキャッチします。

エラーハンドリングのベストプラクティス

  • リソースの適切な解放:例外が発生した場合でも、ミューテックスやメモリなどのリソースが適切に解放されるようにします。C++のRAII(Resource Acquisition Is Initialization)原則を利用することが推奨されます。
  • ロギング:例外発生時にエラーログを記録し、後でデバッグしやすくします。
  • 再試行ロジック:場合によっては、エラーが発生した処理を再試行するロジックを組み込むことが有効です。

これらの例外処理とエラーハンドリングの方法を実装することで、より堅牢で信頼性の高いスレッド間通信を実現することができます。

実践的なコード例

std::condition_variableの理論を理解したら、実際に使ってみることが重要です。ここでは、std::condition_variableを用いた実践的なコード例を紹介します。この例では、簡単なタスクキューを実装し、スレッド間でタスクを共有します。

タスクキューの実装

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>

std::mutex mtx;
std::condition_variable cv;
bool done = false;
std::queue<std::function<void()>> tasks;

void worker() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lck(mtx);
            cv.wait(lck, []{ return !tasks.empty() || done; });

            if (done && tasks.empty())
                break;

            task = std::move(tasks.front());
            tasks.pop();
        }
        task();
    }
}

void add_task(std::function<void()> task) {
    {
        std::unique_lock<std::mutex> lck(mtx);
        tasks.push(std::move(task));
    }
    cv.notify_one();
}

void stop_workers() {
    {
        std::unique_lock<std::mutex> lck(mtx);
        done = true;
    }
    cv.notify_all();
}

int main() {
    std::thread worker_thread(worker);

    add_task([]{ std::cout << "Task 1 executed\n"; });
    add_task([]{ std::cout << "Task 2 executed\n"; });
    add_task([]{ std::cout << "Task 3 executed\n"; });

    std::this_thread::sleep_for(std::chrono::seconds(1)); // simulate work

    stop_workers();
    worker_thread.join();

    return 0;
}

コードの説明

  • std::mutex mtx:タスクキューへのアクセスを制御するためのミューテックス。
  • std::condition_variable cv:タスクの追加や完了を通知するための条件変数。
  • bool done = false:ワーカーが終了するかどうかを示すフラグ。
  • std::queue<std::function<void()>> tasks:タスクを保持するキュー。

関数の詳細

  • worker関数:タスクを処理するワーカースレッド。cv.waitでタスクが追加されるのを待機し、タスクを実行します。
  • add_task関数:タスクをキューに追加し、ワーカースレッドに通知を送ります。
  • stop_workers関数:ワーカースレッドを終了させるためにdoneフラグをtrueに設定し、すべてのワーカースレッドに通知を送ります。
  • main関数:ワーカースレッドを生成し、タスクを追加し、ワーカースレッドを停止させます。

この実践的なコード例を通じて、std::condition_variableの効果的な使い方とスレッド間通信の実装方法を理解できます。このようなタスクキューは、並行処理が求められる多くのシステムで役立ちます。

応用例: 生産者-消費者問題の解決

std::condition_variableを使用する典型的な例の一つが、生産者-消費者問題です。ここでは、生産者がデータを生成し、消費者がそのデータを消費するというシナリオを実装します。この問題は、スレッド間の同期を適切に行うための良い例です。

生産者-消費者問題の概要

生産者-消費者問題は、共有リソースに対するアクセスを調整する問題です。生産者スレッドはデータを生成し、バッファに追加します。一方、消費者スレッドはバッファからデータを取り出して処理します。この際、バッファが空の場合は消費者は待機し、バッファが満の場合は生産者が待機します。

実装例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> buffer;
const unsigned int maxBufferSize = 10;
bool done = false;

void producer(int id) {
    for (int i = 0; i < 20; ++i) {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, []{ return buffer.size() < maxBufferSize; });

        buffer.push(i);
        std::cout << "Producer " << id << " produced " << i << '\n';

        cv.notify_all(); // 通知を送る
    }
    std::unique_lock<std::mutex> lck(mtx);
    done = true;
    cv.notify_all(); // 全ての消費者に通知
}

void consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, []{ return !buffer.empty() || done; });

        if (!buffer.empty()) {
            int item = buffer.front();
            buffer.pop();
            std::cout << "Consumer " << id << " consumed " << item << '\n';
            cv.notify_all(); // 通知を送る
        } else if (done) {
            break; // 生産者が終了し、バッファが空のとき、消費者も終了
        }
    }
}

int main() {
    std::thread producers[2];
    std::thread consumers[3];

    for (int i = 0; i < 2; ++i)
        producers[i] = std::thread(producer, i);

    for (int i = 0; i < 3; ++i)
        consumers[i] = std::thread(consumer, i);

    for (auto& p : producers) p.join();
    for (auto& c : consumers) c.join();

    return 0;
}

コードの説明

  • std::mutex mtx:共有バッファへのアクセスを制御するためのミューテックス。
  • std::condition_variable cv:生産者と消費者間の同期を行うための条件変数。
  • std::queue<int> buffer:共有バッファ。
  • const unsigned int maxBufferSize = 10:バッファの最大サイズ。
  • bool done = false:生産者の作業完了を示すフラグ。

関数の詳細

  • producer関数:データを生成し、バッファに追加します。バッファが満杯の場合は、スペースができるまで待機します。データを追加した後、消費者に通知を送ります。
  • consumer関数:バッファからデータを取り出して消費します。バッファが空の場合は、データが追加されるまで待機します。バッファが空でかつ生産者が終了した場合、消費者も終了します。
  • main関数:生産者スレッドと消費者スレッドを生成し、実行します。

この実装例を通じて、std::condition_variableを使用したスレッド間の同期と通信の実践的な方法を学べます。このようなパターンは、リアルタイムシステムや並列処理アプリケーションで広く使用されます。

応用例: タスクキューの実装

std::condition_variableは、タスクキューの実装にも非常に有用です。ここでは、タスクをキューに追加し、ワーカースレッドがそれらのタスクを処理するシンプルなタスクキューを実装します。

タスクキューの概要

タスクキューは、タスクをキューに追加し、複数のワーカースレッドがそれを取り出して処理する設計です。この方法により、タスクの負荷分散が可能となり、並列処理の効率が向上します。

実装例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>

std::mutex mtx;
std::condition_variable cv;
std::queue<std::function<void()>> taskQueue;
bool stop = false;

void worker(int id) {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lck(mtx);
            cv.wait(lck, []{ return !taskQueue.empty() || stop; });

            if (stop && taskQueue.empty())
                return;

            task = std::move(taskQueue.front());
            taskQueue.pop();
        }
        std::cout << "Worker " << id << " is processing a task\n";
        task();
    }
}

void addTask(std::function<void()> task) {
    {
        std::unique_lock<std::mutex> lck(mtx);
        taskQueue.push(std::move(task));
    }
    cv.notify_one();
}

void stopWorkers() {
    {
        std::unique_lock<std::mutex> lck(mtx);
        stop = true;
    }
    cv.notify_all();
}

int main() {
    const int numWorkers = 4;
    std::vector<std::thread> workers;
    for (int i = 0; i < numWorkers; ++i) {
        workers.emplace_back(worker, i);
    }

    for (int i = 0; i < 10; ++i) {
        addTask([i]{ std::cout << "Executing task " << i << "\n"; });
    }

    std::this_thread::sleep_for(std::chrono::seconds(2)); // シミュレーションのための待機

    stopWorkers();
    for (auto& worker : workers) {
        worker.join();
    }

    return 0;
}

コードの説明

  • std::mutex mtx:タスクキューへのアクセスを制御するためのミューテックス。
  • std::condition_variable cv:タスクの追加や完了を通知するための条件変数。
  • std::queue<std::function<void()>> taskQueue:タスクを保持するキュー。
  • bool stop = false:ワーカーの停止を示すフラグ。

関数の詳細

  • worker関数:タスクを処理するワーカースレッド。タスクキューが空でないか、停止フラグが立つまで待機し、タスクを取り出して実行します。
  • addTask関数:タスクをキューに追加し、ワーカースレッドに通知を送ります。
  • stopWorkers関数:すべてのワーカースレッドを停止させるために、停止フラグをtrueに設定し、全てのワーカースレッドに通知を送ります。
  • main関数:ワーカースレッドを生成し、タスクを追加し、一定時間待機後にワーカースレッドを停止させます。

このタスクキューの実装例を通じて、std::condition_variableを用いたタスクの分散処理とスレッド間の通信方法を学べます。タスクキューは、スケーラブルな並列処理アプリケーションの基盤として広く利用されています。

練習問題: スレッド間通信を実装する

ここでは、std::condition_variableとstd::mutexを使ったスレッド間通信の練習問題を提供します。実際に手を動かして実装することで、理解を深めましょう。

問題1: シンプルな生産者-消費者問題の実装

次の条件に従って、生産者-消費者問題を実装してください。

  1. 生産者スレッドは、1から100までの整数を生成してキューに追加します。
  2. 消費者スレッドは、キューから整数を取り出して表示します。
  3. キューが空の場合、消費者スレッドは生産者スレッドがデータを追加するまで待機します。
  4. すべての整数が処理されたら、スレッドを終了させます。

ヒント:

  • std::mutexとstd::condition_variableを使用して、スレッド間のデータ競合を防ぎましょう。
  • 生産者スレッドがデータを追加するたびに、消費者スレッドに通知を送りましょう。
  • すべてのデータが処理されたら、停止フラグを設定し、消費者スレッドに通知を送りましょう。

例:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> buffer;
const int max_items = 100;
bool done = false;

void producer() {
    for (int i = 1; i <= max_items; ++i) {
        std::unique_lock<std::mutex> lck(mtx);
        buffer.push(i);
        std::cout << "Produced: " << i << '\n';
        cv.notify_one();
    }
    std::unique_lock<std::mutex> lck(mtx);
    done = true;
    cv.notify_all();
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, []{ return !buffer.empty() || done; });

        if (!buffer.empty()) {
            int item = buffer.front();
            buffer.pop();
            std::cout << "Consumed: " << item << '\n';
        } else if (done) {
            break;
        }
    }
}

int main() {
    std::thread prod(producer);
    std::thread cons(consumer);

    prod.join();
    cons.join();

    return 0;
}

問題2: 複数の生産者・消費者の実装

次の条件に従って、複数の生産者と消費者を持つシステムを実装してください。

  1. 2つの生産者スレッドを作成し、それぞれ1から50までの整数を生成してキューに追加します。
  2. 3つの消費者スレッドを作成し、キューから整数を取り出して表示します。
  3. 生産者スレッドがすべての整数を生成し終わった後、消費者スレッドがすべての整数を処理し終わったら、スレッドを終了させます。

ヒント:

  • 複数の生産者スレッドが同時にキューにアクセスするため、std::mutexを適切に使用してデータ競合を防ぎましょう。
  • すべての生産者がデータを生成し終わったかを確認するためのフラグを使用しましょう。

例:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> buffer;
const int items_per_producer = 50;
bool done = false;
int active_producers = 2;

void producer(int id) {
    for (int i = 1; i <= items_per_producer; ++i) {
        std::unique_lock<std::mutex> lck(mtx);
        buffer.push(id * 100 + i); // 識別子としてidを使う
        std::cout << "Producer " << id << " produced: " << id * 100 + i << '\n';
        cv.notify_one();
    }
    std::unique_lock<std::mutex> lck(mtx);
    --active_producers;
    if (active_producers == 0) {
        done = true;
    }
    cv.notify_all();
}

void consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, []{ return !buffer.empty() || done; });

        if (!buffer.empty()) {
            int item = buffer.front();
            buffer.pop();
            std::cout << "Consumer " << id << " consumed: " << item << '\n';
        } else if (done) {
            break;
        }
    }
}

int main() {
    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;

    for (int i = 0; i < 2; ++i)
        producers.emplace_back(producer, i);

    for (int i = 0; i < 3; ++i)
        consumers.emplace_back(consumer, i);

    for (auto& p : producers) p.join();
    for (auto& c : consumers) c.join();

    return 0;
}

これらの練習問題を通じて、std::condition_variableとstd::mutexを用いたスレッド間通信の実装に慣れてください。スレッドの待機と通知の仕組みを理解し、効率的な並行処理を実現するためのスキルを身につけましょう。

まとめ

この記事では、C++のstd::condition_variableを使ったスレッド間通信について詳しく解説しました。基本的な概念から始まり、std::mutexとの連携方法、例外処理とエラーハンドリング、実践的なコード例、応用例としての生産者-消費者問題とタスクキューの実装まで、幅広くカバーしました。最後に、練習問題を通じて実際に手を動かして学ぶことで、std::condition_variableの理解を深めていただけたと思います。これらの知識と技術を活用して、効率的で堅牢なマルチスレッドプログラムを構築してください。

コメント

コメントする

目次