[C++]ムーブセマンティクスに対応するThreadPool。

 プログラミング界隈でよく知られたマルチスレッド処理のためのパターンであるスレッドプール。使用可能なスレッド数を予め定めておき、そこにキュー方式で処理を追加していくもの。走らせたい関数が100個くらいあったとしても、それを順番待ちに追加して逐次処理してくれる便利なものである。
 C++標準にはスレッドプールは存在しないのだが、思い思いのシングルヘッダー実装がGitHubなどで公開されているので、そのへんのどれかを適当に拾ってきて使えばいい。

 ただし残念なことに、一般公開されているそれらはやや問題を抱えている場合が多い。多くのものはstd::functionやstd::bind、ラムダ式などをそのまま使っていて、ムーブセマンティクスに対応しないかったり、場合によっては通常の参照渡しにさえ対応しないのだ。std::functionはnon-copyableな関数オブジェクトを持つことが出来ないしstd::bindはムーブセマンティクス非対応である。スレッドプールでムーブ不可って意味が分からんレベルの欠陥設計だと思うのだが、コピーコストの大きな変数をも値渡しさせる気なのか?コピー不可のインスタンスを渡したい場合はどうするんだ?そのあたりまでユーザーが頑張ってフォローしろというのか?

 そんなわけで、ムーブセマンティクスに対応するスレッドプールを実装した。これは私のもう一つのブログで公開しているそれの改良版である。あちらはちょっと仕様に気持ちの悪さがあったので、それを修正した形だ。C++11以上で動くはずだが、C++11限定の環境をすぐに用意できなかったので、MSVC2017のC++14環境でのみ動作確認している。昔のバージョンはMSVC2013で動作することを確認したものの、あれからかなり修正したので、C++11やGCC等ではコンパイルできないかもしれない。
 ソースコードはちょっと長いのでGitHubに置いておく。
github.com

使い方

 だいたい以下のように使う。

std::vector<int> func(std::vector<int>&& v)
{
    //スレッドプールに与える関数。
    //引数を右辺値参照で受け取っても問題ない。きちんとmoveされるし、寿命も保証されている。
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::vector<int> s = std::move(v);
    std::sort(s.begin(), s.end());
    return std::move(s);
}
int main()
{
    std::mt19937_64 mt;
    std::uniform_int_distribution<> ui(0, 100);
    ThreadPool tp(4);//4スレッド分用意する。
    std::vector<std::future<std::vector<int>>> f;
    for (int i = 0; i < 20; ++i)
    {
        std::vector<int> v(10);
        for (auto& vv : v) vv = ui(mt);
        f.push_back(tp.AddTask(&func, std::move(v)));
    }
    for (int i = 0; i < 20; ++i)
    {
        auto result = f[i].get();
        for (auto v : result) printf("%2d ", v);
        printf("\n");
    }
    tp.Join();
}

 以下、必要な関数等。

ThreadPool(int num_of_threads);//コンストラクタ
std::future<result_of_func> AddTask(Func func, Args ...args);//関数ポインタ、関数オブジェクトの場合。
std::future<result_of_func> AddTask(MemFunc func, Ptr* ptr, Args ...args);//メンバ関数ポインタの場合。
void Join();//すべての処理が完了するまで待機。

 ThreadPoolのコンストラクタには用意したいスレッド数を与える。どれだけTaskを与えられたとしても、ここで与えたスレッド数を超えて同時に処理されることはなく、上限に達した場合は順番待ちに追加される。
 メンバ関数AddTaskによって処理を追加する。funcには関数ポインタ、関数オブジェクト、メンバ関数ポインタを与えることができる。argsにはその関数を呼び出すために必要な引数を、メンバ関数の場合は第2引数にそのメンバ関数を呼び出したいインスタンスへのポインタを与える。このあたりはINVOKEと似ている。
 もし引数を参照渡ししたい場合はstd::reference_wrapperを、moveしたい場合はstd::moveを使う。
 AddTask関数の戻り値は与えた関数の戻り値を受け取るためのstd::futureである。
 Join関数を呼ぶと、与えられたTaskが全て完了するまで待機する。JoinはThreadPoolインスタンスのデストラクタが勝手に呼ぶので、待機する必要がない場合は呼ばなくともよい。

簡単な解説

 前半200行くらいで長たらしく書かれているのは、C++11に存在しないstd::applyやstd::invokeの代用品、std::reference_wrapperを引っ剥がすものなどである。理解できないのなら読み飛ばしてよい。  ThreadPoolを実装する方法そのものは何通りか考えられる。私が初めて実装したときは、AddTask関数によって処理が追加されるたびに新しいスレッドを作成するといういい加減なものだった。スレッドの作成はそれなりにコストが大きいので、可能であれば使い回すほうが良い。
 今回は事前に用意しておいたWorkerThreadが各々ThreadPoolの持つキュー(std::deque<std::function<void()>> mQueue)へと処理を受け取りに行く構造になっている。まあ大抵のThreadPoolはこのような設計になっているだろう。
 しかし上述のように、std::functionとstd::bindをそのまま使うと問題が発生する。ムーブセマンティクスに対応するためには、キューに一次格納しておく関数オブジェクトを

  • std::bindを用いることなく、
  • non-copyableな引数を保持している場合にもcopy-constructibleに偽装する

ことが必要だった。よってstd::bindではなくTaskBinderクラスを作成し、こちらに引数を束縛するよう設計した。TaskBinderクラスは呼ばれると例外を投げるだけのコピーコンストラクタを持つため、コンパイラ的にはcopy-constructibleである。実際にはムーブコンストラクタが定義されている限りコピーコンストラクタが呼ばれることはないため、このような設計で問題ない。

 というわけで、std::functionとstd::bindの伏線回収を完了した。いやまあ、C++的にそんなに複雑な話でもないのだが、基礎知識に乏しかった頃はこのあたりの意味わからん仕様に苦しめられて、上のようなThreadPoolを正常動作させるために何日も浪費したのだ。三回分の記事にするくらいの苦労はしていると思う。これらの記事が、同じように苦労している人たちの助けとなってくれれば幸いだ。