C++20のstd::barrierとstd::latchを使った同期の方法

C++20で導入されたstd::barrierとstd::latchは、マルチスレッドプログラミングにおける同期をシンプルかつ効率的に行うための強力なツールです。本記事では、これらの新機能を使ってスレッド間の同期を実現する方法を具体例を交えて詳しく解説します。まず、std::barrierとstd::latchの基本概念を理解し、その後、実際のコードを通じて使い方を学びます。さらに、これらを組み合わせることで得られる利点やベストプラクティスについても触れていきます。最後に、応用例と演習問題を通じて、実践的なスキルを身につけていただける構成になっています。

目次

std::barrierとは

std::barrierは、C++20で導入された同期プリミティブで、複数のスレッドが特定のポイントに到達するまで待機するためのものです。全てのスレッドがbarrierに到達することで、次のステップに進むことができます。この機能は、並列処理の各ステージを同期させるために非常に有用です。

std::barrierの基本構造

std::barrierは、カウンタと呼ばれる同期点を保持しており、指定されたスレッド数がそのポイントに到達するまで待機します。全スレッドが到達すると、カウンタがリセットされ、次の同期ポイントに進むことができます。

std::barrierのコンストラクタ

#include <barrier>

// スレッド数を指定してbarrierを初期化
std::barrier sync_point(スレッド数);

std::barrierのメンバ関数

  • arrive_and_wait(): スレッドが同期ポイントに到達し、他のスレッドが到達するのを待つ。
  • arrive_and_drop(): スレッドが同期ポイントに到達し、待機せずに同期から外れる。

これらの機能を理解することで、スレッド間の同期を効率的に管理できるようになります。次のセクションでは、具体的な使い方について見ていきましょう。

std::barrierの使い方

std::barrierの具体的な使い方を、コード例を交えて説明します。これにより、実際のプロジェクトでどのように適用するかが明確になります。

基本的な使用例

以下のコードは、3つのスレッドが特定のポイントで同期する例です。

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

// 同期ポイントを3つのスレッドで初期化
std::barrier sync_point(3);

void worker(int id) {
    std::cout << "Thread " << id << " is working...\n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 擬似的な作業
    std::cout << "Thread " << id << " reached the barrier\n";
    sync_point.arrive_and_wait(); // 同期ポイントで待機
    std::cout << "Thread " << id << " passed the barrier\n";
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back(worker, i);
    }
    for (auto& th : threads) {
        th.join();
    }
    return 0;
}

このコードでは、各スレッドがsync_point.arrive_and_wait()に到達するまで作業を行い、その後全スレッドが到達するまで待機します。全スレッドが到達すると、同期ポイントがリセットされ、各スレッドが次のステップに進むことができます。

arrive_and_dropの使用例

特定のスレッドが同期後に次のステップに進まずに終了する場合、arrive_and_drop()を使用します。

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

std::barrier sync_point(3);

void worker(int id) {
    std::cout << "Thread " << id << " is working...\n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 擬似的な作業
    std::cout << "Thread " << id << " reached the barrier\n";
    if (id == 2) {
        sync_point.arrive_and_drop(); // このスレッドは同期後に終了
    } else {
        sync_point.arrive_and_wait(); // 他のスレッドは同期後に続行
    }
    std::cout << "Thread " << id << " passed the barrier\n";
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back(worker, i);
    }
    for (auto& th : threads) {
        th.join();
    }
    return 0;
}

この例では、idが2のスレッドが同期後に終了し、他のスレッドは次のステップに進みます。

これらの基本的な使い方を理解することで、std::barrierを使った同期の基本が身につきます。次のセクションでは、std::barrierの応用例を見ていきます。

std::barrierの応用例

std::barrierは、単純な同期だけでなく、より複雑な同期パターンでも利用できます。以下では、いくつかの応用例を紹介します。

並列処理におけるステージ間の同期

多段階の並列処理パイプラインにおいて、各ステージの終了を待ってから次のステージに進む例です。

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

constexpr int num_stages = 3;
constexpr int num_threads = 3;

std::barrier sync_point(num_threads);

void stage_worker(int id, int stage) {
    std::cout << "Thread " << id << " working on stage " << stage << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 擬似的な作業
    sync_point.arrive_and_wait(); // 同期ポイントで待機
}

int main() {
    std::vector<std::thread> threads;
    for (int stage = 0; stage < num_stages; ++stage) {
        for (int i = 0; i < num_threads; ++i) {
            threads.emplace_back(stage_worker, i, stage);
        }
        for (auto& th : threads) {
            th.join();
        }
        threads.clear(); // 次のステージのためにスレッドをクリア
        std::cout << "All threads completed stage " << stage << "\n";
    }
    return 0;
}

この例では、各スレッドがステージごとに作業を行い、全スレッドがそのステージを終了するのを待ちます。次のステージに進む前に、全スレッドがbarrierに到達することで同期されます。

データ処理のパイプライン

複数のスレッドが異なるデータ処理ステージを担当し、データが各ステージを通過する例です。

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>
#include <atomic>

constexpr int num_stages = 3;
constexpr int num_threads = 3;

std::barrier sync_point(num_threads);
std::atomic<int> data(0);

void stage_worker(int id, int stage) {
    while (true) {
        sync_point.arrive_and_wait(); // 同期ポイントで待機
        if (data == -1) break; // 終了条件
        std::cout << "Thread " << id << " processing data " << data.load() << " at stage " << stage << "\n";
        data.fetch_add(1); // データを更新
        sync_point.arrive_and_wait(); // 同期ポイントで待機
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(stage_worker, i, i % num_stages);
    }

    for (int i = 0; i < 10; ++i) { // データ処理を10回繰り返す
        data = i;
        sync_point.arrive_and_wait(); // 同期ポイントで待機
    }

    data = -1; // 終了条件を設定
    sync_point.arrive_and_wait(); // 終了同期
    for (auto& th : threads) {
        th.join();
    }
    return 0;
}

この例では、各スレッドがデータ処理の異なるステージを担当し、データが全ステージを通過するたびに同期されます。処理が終了したら、特殊な終了条件を設定してスレッドを終了させます。

これらの応用例を通じて、std::barrierを使ったより高度な同期方法を理解できるようになります。次のセクションでは、std::latchの基本概念と使い方について説明します。

std::latchとは

std::latchは、C++20で導入された同期プリミティブで、スレッドが指定されたカウントダウンを完了するまで待機するためのものです。カウントダウンがゼロになると、待機しているすべてのスレッドが続行できます。これにより、一度だけの同期ポイントを提供します。

std::latchの基本構造

std::latchは初期カウントを指定して構築され、そのカウントがゼロになるまで待機する機能を持ちます。カウントダウン操作は、他のスレッドからも行うことができます。

std::latchのコンストラクタ

#include <latch>

// 初期カウントを指定してlatchを初期化
std::latch sync_point(スレッド数);

std::latchのメンバ関数

  • count_down(): カウントを1減らす。カウントがゼロになると、待機中のスレッドが続行できる。
  • wait(): カウントがゼロになるまで待機する。
  • arrive_and_wait(): カウントを1減らし、ゼロになるまで待機する。

これらの機能を理解することで、特定のスレッドが終了するまで待機するようなシナリオでstd::latchを利用できます。次のセクションでは、具体的な使い方について見ていきます。

std::latchの使い方

std::latchの具体的な使い方を、コード例を交えて説明します。これにより、実際のプロジェクトでどのように適用するかが明確になります。

基本的な使用例

以下のコードは、3つのスレッドが特定のカウントダウンを完了するまでメインスレッドが待機する例です。

#include <iostream>
#include <thread>
#include <latch>
#include <vector>

std::latch sync_point(3);

void worker(int id) {
    std::cout << "Thread " << id << " is working...\n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 擬似的な作業
    std::cout << "Thread " << id << " reached the latch\n";
    sync_point.count_down(); // カウントダウン
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back(worker, i);
    }
    sync_point.wait(); // 全スレッドのカウントダウンが完了するまで待機
    for (auto& th : threads) {
        th.join();
    }
    std::cout << "All threads have reached the latch\n";
    return 0;
}

このコードでは、各スレッドがsync_point.count_down()を呼び出してカウントを減らし、メインスレッドがsync_point.wait()で待機します。全スレッドがカウントダウンを完了すると、メインスレッドが続行されます。

arrive_and_waitの使用例

特定のスレッドがカウントダウン後にすぐに続行する場合、arrive_and_wait()を使用します。

#include <iostream>
#include <thread>
#include <latch>
#include <vector>

std::latch sync_point(3);

void worker(int id) {
    std::cout << "Thread " << id << " is working...\n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 擬似的な作業
    std::cout << "Thread " << id << " reached the latch\n";
    sync_point.arrive_and_wait(); // カウントダウンして待機
    std::cout << "Thread " << id << " passed the latch\n";
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back(worker, i);
    }
    for (auto& th : threads) {
        th.join();
    }
    std::cout << "All threads have passed the latch\n";
    return 0;
}

この例では、各スレッドがsync_point.arrive_and_wait()を呼び出し、カウントダウンして待機します。全スレッドが到達した後、同期が解除されます。

これらの基本的な使い方を理解することで、std::latchを使った同期の基本が身につきます。次のセクションでは、std::latchの応用例を見ていきます。

std::latchの応用例

std::latchは、特定のスレッド数が終了するのを待つだけでなく、複雑な同期パターンでも利用できます。以下では、いくつかの応用例を紹介します。

並列タスクの一括処理

複数のタスクを並列で実行し、全てのタスクが完了するのを待つ例です。

#include <iostream>
#include <thread>
#include <latch>
#include <vector>

std::latch sync_point(5);

void task(int id) {
    std::cout << "Task " << id << " is starting...\n";
    std::this_thread::sleep_for(std::chrono::seconds(id)); // 擬似的な作業
    std::cout << "Task " << id << " completed\n";
    sync_point.count_down(); // カウントダウン
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 1; i <= 5; ++i) {
        threads.emplace_back(task, i);
    }
    sync_point.wait(); // 全タスクが完了するまで待機
    for (auto& th : threads) {
        th.join();
    }
    std::cout << "All tasks have completed\n";
    return 0;
}

この例では、5つのタスクが並列で実行され、各タスクが終了するとカウントダウンします。全てのタスクが完了するまでメインスレッドが待機します。

フェーズごとのタスク実行

タスクをフェーズごとに分割し、各フェーズの終了を待って次のフェーズに進む例です。

#include <iostream>
#include <thread>
#include <latch>
#include <vector>

constexpr int num_threads = 3;
constexpr int num_phases = 3;

void phase_task(int id, int phase, std::latch& sync_point) {
    std::cout << "Thread " << id << " starting phase " << phase << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 擬似的な作業
    std::cout << "Thread " << id << " completed phase " << phase << "\n";
    sync_point.count_down(); // フェーズ終了を通知
}

int main() {
    for (int phase = 0; phase < num_phases; ++phase) {
        std::latch sync_point(num_threads);
        std::vector<std::thread> threads;
        for (int i = 0; i < num_threads; ++i) {
            threads.emplace_back(phase_task, i, phase, std::ref(sync_point));
        }
        sync_point.wait(); // 全スレッドのフェーズ終了を待機
        for (auto& th : threads) {
            th.join();
        }
        std::cout << "All threads have completed phase " << phase << "\n";
    }
    return 0;
}

この例では、3つのスレッドが3つのフェーズに分かれてタスクを実行し、各フェーズの終了を待ってから次のフェーズに進みます。

複数リソースの初期化

複数のリソースを並列で初期化し、全てのリソースが初期化されるのを待つ例です。

#include <iostream>
#include <thread>
#include <latch>
#include <vector>

std::latch sync_point(4);

void init_resource(int id) {
    std::cout << "Initializing resource " << id << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(id)); // 擬似的な初期化作業
    std::cout << "Resource " << id << " initialized\n";
    sync_point.count_down(); // カウントダウン
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 1; i <= 4; ++i) {
        threads.emplace_back(init_resource, i);
    }
    sync_point.wait(); // 全リソースの初期化が完了するまで待機
    for (auto& th : threads) {
        th.join();
    }
    std::cout << "All resources have been initialized\n";
    return 0;
}

この例では、4つのリソースが並列で初期化され、全ての初期化が完了するまでメインスレッドが待機します。

これらの応用例を通じて、std::latchを使ったより高度な同期方法を理解できるようになります。次のセクションでは、std::barrierとstd::latchの違いについて説明します。

std::barrierとstd::latchの違い

std::barrierとstd::latchはどちらもスレッド間の同期をサポートするためのプリミティブですが、いくつかの重要な違いがあります。これらの違いを理解することで、適切な場面で適切な同期プリミティブを選択することができます。

用途と動作の違い

std::barrier

  • 用途: 繰り返しの同期ポイントを提供し、指定された数のスレッドが特定のポイントに到達するまで待機する。
  • 動作: 全てのスレッドが同期ポイントに到達すると、同期が解除され、次のステップに進む。これを繰り返し行うことができる。
  • 再利用可能性: 繰り返し使うことができる。各サイクルでカウントがリセットされる。

std::latch

  • 用途: 一度だけの同期ポイントを提供し、カウントダウンがゼロになるまで待機する。
  • 動作: カウントダウンがゼロになると、待機している全てのスレッドが一斉に進行する。
  • 再利用可能性: 一度限りの使用。カウントダウンがゼロになった後は再利用できない。

具体的な例での違い

std::barrierの例

std::barrierを使ったステージごとの同期では、各ステージが終わるたびに全てのスレッドが次のステージに進むまで待機します。以下のコードはその例です。

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

constexpr int num_stages = 3;
constexpr int num_threads = 3;

std::barrier sync_point(num_threads);

void stage_worker(int id, int stage) {
    std::cout << "Thread " << id << " working on stage " << stage << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 擬似的な作業
    sync_point.arrive_and_wait(); // 同期ポイントで待機
}

int main() {
    std::vector<std::thread> threads;
    for (int stage = 0; stage < num_stages; ++stage) {
        for (int i = 0; i < num_threads; ++i) {
            threads.emplace_back(stage_worker, i, stage);
        }
        for (auto& th : threads) {
            th.join();
        }
        threads.clear(); // 次のステージのためにスレッドをクリア
        std::cout << "All threads completed stage " << stage << "\n";
    }
    return 0;
}

std::latchの例

std::latchを使ったリソースの初期化では、全てのリソースが初期化されるまで待機します。以下のコードはその例です。

#include <iostream>
#include <thread>
#include <latch>
#include <vector>

std::latch sync_point(4);

void init_resource(int id) {
    std::cout << "Initializing resource " << id << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(id)); // 擬似的な初期化作業
    std::cout << "Resource " << id << " initialized\n";
    sync_point.count_down(); // カウントダウン
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 1; i <= 4; ++i) {
        threads.emplace_back(init_resource, i);
    }
    sync_point.wait(); // 全リソースの初期化が完了するまで待機
    for (auto& th : threads) {
        th.join();
    }
    std::cout << "All resources have been initialized\n";
    return 0;
}

これらの例を通じて、std::barrierとstd::latchの適切な使い方とその違いを理解できます。次のセクションでは、これらを組み合わせて使用する方法について説明します。

std::barrierとstd::latchを組み合わせる方法

std::barrierとstd::latchを組み合わせることで、より柔軟で強力な同期メカニズムを実現できます。これにより、複雑な同期パターンにも対応することが可能です。

基本的な組み合わせ例

以下のコードは、複数のフェーズを持つタスクを実行し、各フェーズごとにスレッドが同期する例です。フェーズ間の同期にはstd::barrierを使用し、全フェーズが終了した後の最終的な同期にはstd::latchを使用します。

#include <iostream>
#include <thread>
#include <barrier>
#include <latch>
#include <vector>

constexpr int num_phases = 3;
constexpr int num_threads = 3;

void phase_task(int id, int phase, std::barrier<>& sync_point, std::latch& final_sync) {
    std::cout << "Thread " << id << " starting phase " << phase << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 擬似的な作業
    std::cout << "Thread " << id << " completed phase " << phase << "\n";
    sync_point.arrive_and_wait(); // フェーズ終了を通知

    if (phase == num_phases - 1) {
        final_sync.count_down(); // 最終フェーズの終了を通知
    }
}

int main() {
    std::latch final_sync(num_threads);
    std::barrier sync_point(num_threads);

    for (int phase = 0; phase < num_phases; ++phase) {
        std::vector<std::thread> threads;
        for (int i = 0; i < num_threads; ++i) {
            threads.emplace_back(phase_task, i, phase, std::ref(sync_point), std::ref(final_sync));
        }
        for (auto& th : threads) {
            th.join();
        }
        std::cout << "All threads have completed phase " << phase << "\n";
    }

    final_sync.wait(); // 全フェーズが完了するまで待機
    std::cout << "All phases are completed\n";
    return 0;
}

このコードでは、各フェーズごとにstd::barrierを使ってスレッドを同期し、最終フェーズが完了した後、std::latchを使って最終的な同期を行っています。

段階的なデータ処理パイプライン

以下の例は、複数のスレッドが段階的にデータを処理し、各段階の終了を待機して次の段階に進むシナリオです。

#include <iostream>
#include <thread>
#include <barrier>
#include <latch>
#include <vector>

constexpr int num_stages = 3;
constexpr int num_threads = 3;
std::latch final_sync(num_threads);

void process_stage(int id, int stage, std::barrier<>& sync_point) {
    std::cout << "Thread " << id << " processing stage " << stage << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 擬似的な作業
    std::cout << "Thread " << id << " completed stage " << stage << "\n";
    sync_point.arrive_and_wait(); // ステージ終了を通知

    if (stage == num_stages - 1) {
        final_sync.count_down(); // 最終ステージの終了を通知
    }
}

int main() {
    std::barrier sync_point(num_threads);

    for (int stage = 0; stage < num_stages; ++stage) {
        std::vector<std::thread> threads;
        for (int i = 0; i < num_threads; ++i) {
            threads.emplace_back(process_stage, i, stage, std::ref(sync_point));
        }
        for (auto& th : threads) {
            th.join();
        }
        std::cout << "All threads have completed stage " << stage << "\n";
    }

    final_sync.wait(); // 全ステージが完了するまで待機
    std::cout << "All stages are completed\n";
    return 0;
}

この例では、各ステージの処理が完了するたびにstd::barrierで同期し、全ステージが終了した後、std::latchで最終的な同期を行います。

これらの例を通じて、std::barrierとstd::latchを組み合わせることで、複雑な同期パターンを効率的に管理する方法を理解できます。次のセクションでは、std::barrierとstd::latchを使った同期のベストプラクティスについて説明します。

std::barrierとstd::latchを使った同期のベストプラクティス

std::barrierとstd::latchを使った同期のベストプラクティスを理解することで、効率的でバグのないマルチスレッドプログラミングを実現できます。ここでは、これらのプリミティブを使用する際の推奨される方法と注意点について説明します。

適切な用途に応じた選択

  • std::barrier:
  • 複数回の同期ポイントが必要な場合に使用します。
  • ステージごとの同期や繰り返し処理の各ステップ間の同期に適しています。
  • std::latch:
  • 一度限りの同期ポイントが必要な場合に使用します。
  • 初期化処理や全タスクが完了するまでの待機に適しています。

スレッドのライフサイクル管理

スレッドのライフサイクルを適切に管理することで、デッドロックやリソースリークを防ぐことができます。

  • スレッドを生成したら、必ずjoinを呼び出して終了を待ちます。
  • スレッドのリソースは使用後に確実に解放するようにします。
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back(worker_function, i);
}
for (auto& th : threads) {
    th.join();
}

同期ポイントの明確な定義

同期ポイントを明確に定義し、スレッドがどこで待機し、どこで続行するかをコードで明示します。

  • arrive_and_wait()count_down()の呼び出し場所をコードの中で一貫して管理します。
  • 不要な同期ポイントの追加を避け、必要最小限の同期を行うようにします。

エラーハンドリングと例外安全性

同期ポイントで例外が発生する可能性を考慮し、例外が発生しても安全にプログラムが終了するように設計します。

  • スレッド内でtry-catchブロックを使用し、例外が発生した場合の処理を明確に定義します。
void safe_worker(int id, std::barrier<>& sync_point) {
    try {
        // 作業の実行
        std::cout << "Thread " << id << " is working...\n";
        std::this_thread::sleep_for(std::chrono::seconds(1));
        // 同期ポイントに到達
        sync_point.arrive_and_wait();
    } catch (const std::exception& e) {
        std::cerr << "Exception in thread " << id << ": " << e.what() << "\n";
    }
}

デッドロックの回避

同期ポイントを適切に使用することでデッドロックを回避できます。

  • 各スレッドが必ず同期ポイントに到達するようにし、到達しない可能性のあるコードパスを避けます。
  • 不必要なロックの取得を避け、必要な箇所でのみ同期を行います。

性能の最適化

同期プリミティブを過度に使用すると、性能に影響を与える可能性があります。必要最小限の同期を行い、並列処理のメリットを最大限に活かします。

  • 複雑な計算やI/O操作の間に同期ポイントを設けないようにします。
  • スレッドの作成と終了のオーバーヘッドを最小限に抑えるため、スレッドプールの使用を検討します。
#include <thread>
#include <vector>

std::vector<std::thread> create_thread_pool(int num_threads, void(*task)(int)) {
    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(task, i);
    }
    return threads;
}

void join_thread_pool(std::vector<std::thread>& threads) {
    for (auto& th : threads) {
        th.join();
    }
}

これらのベストプラクティスを遵守することで、std::barrierとstd::latchを使用した同期処理が安全かつ効率的になります。次のセクションでは、理解を深めるための演習問題を提供します。

応用例の演習問題

ここでは、std::barrierとstd::latchの使用方法を深く理解するための演習問題をいくつか紹介します。これらの問題を解くことで、実際のプログラムでどのようにこれらの同期プリミティブを活用できるかを学びます。

演習問題1: 並列計算のステージ同期

以下のコードを完成させて、3つのスレッドが2つのステージを持つ並列計算を行い、各ステージの終了時に全てのスレッドが同期するようにしてください。最終ステージの終了後、全スレッドが処理を完了したことを確認するコードも追加してください。

#include <iostream>
#include <thread>
#include <barrier>
#include <latch>
#include <vector>

constexpr int num_threads = 3;
constexpr int num_stages = 2;

void stage_task(int id, int stage, std::barrier<>& sync_point, std::latch& final_sync) {
    std::cout << "Thread " << id << " processing stage " << stage << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 擬似的な作業
    std::cout << "Thread " << id << " completed stage " << stage << "\n";
    sync_point.arrive_and_wait(); // ステージ終了を通知

    if (stage == num_stages - 1) {
        final_sync.count_down(); // 最終ステージの終了を通知
    }
}

int main() {
    std::latch final_sync(num_threads);
    std::barrier sync_point(num_threads);

    for (int stage = 0; stage < num_stages; ++stage) {
        std::vector<std::thread> threads;
        for (int i = 0; i < num_threads; ++i) {
            threads.emplace_back(stage_task, i, stage, std::ref(sync_point), std::ref(final_sync));
        }
        for (auto& th : threads) {
            th.join();
        }
        std::cout << "All threads have completed stage " << stage << "\n";
    }

    final_sync.wait(); // 全ステージが完了するまで待機
    std::cout << "All stages are completed\n";
    return 0;
}

演習問題2: 動的スレッド数の同期

以下のコードを修正して、任意のスレッド数で動作するようにしてください。ユーザーがスレッド数を入力し、その数に基づいてスレッドを生成し、同期するようにします。

#include <iostream>
#include <thread>
#include <barrier>
#include <latch>
#include <vector>

void worker(int id, std::barrier<>& sync_point, std::latch& final_sync) {
    std::cout << "Thread " << id << " is working...\n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 擬似的な作業
    std::cout << "Thread " << id << " reached the barrier\n";
    sync_point.arrive_and_wait(); // 同期ポイントで待機
    final_sync.count_down(); // スレッド終了を通知
}

int main() {
    int num_threads;
    std::cout << "Enter the number of threads: ";
    std::cin >> num_threads;

    std::latch final_sync(num_threads);
    std::barrier sync_point(num_threads);

    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(worker, i, std::ref(sync_point), std::ref(final_sync));
    }
    for (auto& th : threads) {
        th.join();
    }

    final_sync.wait(); // 全スレッドが終了するまで待機
    std::cout << "All threads have completed their work\n";
    return 0;
}

演習問題3: 並列初期化と最終処理

以下のコードを完成させて、複数のリソースを並列で初期化し、全てのリソースが初期化された後に最終処理を行うようにしてください。初期化が完了するまで待機し、最終処理を行うメインスレッドのコードを追加します。

#include <iostream>
#include <thread>
#include <barrier>
#include <latch>
#include <vector>

constexpr int num_resources = 4;

void init_resource(int id, std::latch& init_sync) {
    std::cout << "Initializing resource " << id << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(id)); // 擬似的な初期化作業
    std::cout << "Resource " << id << " initialized\n";
    init_sync.count_down(); // 初期化完了を通知
}

int main() {
    std::latch init_sync(num_resources);

    std::vector<std::thread> threads;
    for (int i = 0; i < num_resources; ++i) {
        threads.emplace_back(init_resource, i, std::ref(init_sync));
    }

    init_sync.wait(); // 全リソースの初期化が完了するまで待機
    for (auto& th : threads) {
        th.join();
    }

    // 最終処理
    std::cout << "All resources have been initialized. Performing final processing...\n";
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Final processing completed\n";

    return 0;
}

これらの演習問題を解くことで、std::barrierとstd::latchの使用方法に関する理解が深まります。次のセクションでは、これまでの内容を簡潔にまとめます。

まとめ

本記事では、C++20で導入された同期プリミティブであるstd::barrierとstd::latchについて詳しく説明しました。これらのプリミティブを使うことで、複雑なスレッド間の同期を効率的に管理することができます。具体的な使い方や応用例を通じて、実際のプロジェクトでの活用方法を学びました。また、ベストプラクティスを守ることで、安全かつ効率的な同期処理を実現できます。最後に、演習問題を通じて実践的なスキルを身につけることができました。これらの知識を活用して、より高性能なマルチスレッドプログラムを作成してください。

コメント

コメントする

目次