[C++]テキストファイルの読み込みを並列化するサンプル。

※最近本ブログが「モダンC++を扱うブログ」として取り上げられ多くの人が訪問してくれていますが、普段の私はこんな感じの緩い記事ばかり書いているので、どうか過度に期待されないようお願いします。

動機

テキストファイルは普通、どんなに大きくてもMB単位で、処理時間を無視でき、人間が編集する機会の多いものに使われるフォーマットである、と私は思っているのだが、時として周囲から強要されGB単位のテキストを扱わなければならないときもある。しかもそれが、データ処理フローの中で重大なボトルネックになることもある。

テキストファイルの読み込みに時間がかかるのは、主としてテキストのパースが遅いからだ。バイナリだったら決まった大きさを読んで直接メモリにコピーすればいいところを、テキストは1文字ずつ読み取ってはその意味を解読しなければならない。
例えばCやC++でテキストを読み出す時は、fgetsとsscanfを用いることが多い。このsscanfがパースを担う関数で、これが非常に遅いのだ。
ただ逆に言えば、ファイルの読み込みそのもの(今回で言えばfgets)はそこまで遅いわけではない。じゃあsscanfやそれに続く処理だけ並列化すればいいのでは、と誰もが考えるだろうし、ちょうど私も最近研究の方で遅すぎるテキスト読み込みの改良に着手したところなのだが、ネット上ではあまりそのようなサンプルコードが見当たらなかったので、ちょっと載せてみることにした。

サンプル

テキストファイルの形式は様々ではあるが、今回は空白区切り1行あたり8列の単純な数値情報を読み出す。このとき、データの順序はテキストに記述されている順序を保存するようにした。

#include <random>
#include <thread>
#include <cstdio>
#include <mutex>
#include <atomic>
#include <deque>
#include <vector>
#include <array>
#include <string>

void GenerateTextFile(const std::string& path, int n)
{
    FILE* fp = fopen(path.c_str(), "w");
    std::uniform_real_distribution<> rrand(0., 1.);
    std::uniform_real_distribution<> rrand2(0., 10000.);
    std::uniform_int_distribution<> irand(0, 1000);
    std::mt19937_64 mt;
    for (int i = 0; i < n; ++i)
    {
        fprintf(fp, "%4d %4d %4d %.4lf %.4lf %.4lf %8.1lf %8.1lf\n",
                irand(mt), irand(mt), irand(mt), rrand(mt), rrand(mt), rrand(mt), rrand2(mt), rrand2(mt));
    }
    fclose(fp);
}

struct Data
{
    bool operator==(const Data& d) const
    {
        return i1 == d.i1 &&
            i2 == d.i2 &&
            i3 == d.i3 &&
            r1 == d.r1 &&
            r2 == d.r2 &&
            r3 == d.r3 &&
            r4 == d.r4 &&
            r5 == d.r5;
    }
    bool operator!=(const Data& d) const
    {
        return !(*this == d);
    }
    int i1, i2, i3;
    double r1, r2, r3;
    double r4, r5;
};

std::vector<Data> ReadSequentially(const std::string& path)
{
    char buf[256];
    FILE* fp = fopen(path.c_str(), "r");
    if (fp == nullptr) throw std::exception();

    std::vector<Data> res;
    while (fgets(buf, sizeof(buf), fp) != nullptr)
    {
        Data d;
        if (sscanf(buf, "%d%d%d%lf%lf%lf%lf%lf", &d.i1, &d.i2, &d.i3, &d.r1, &d.r2, &d.r3, &d.r4, &d.r5) != 8)
        {
            fprintf(stderr, "format error\n");
            throw std::exception();
        }
        res.push_back(d);
    }
    return res;
}

void Parse(std::mutex& bmtx, std::condition_variable& bcv, std::deque<std::pair<int, std::array<char, 256>>>& buf, bool& end,
           std::mutex& rmtx, std::vector<Data>& res)
{
    while (true)
    {
        Data d;
        std::array<char, 256> str;
        int row;
        {
            std::unique_lock<std::mutex> ul(bmtx);
            while (buf.empty())
            {
                if (end == true) goto end;
                bcv.wait(ul);
            }
            std::tie(row, str) = buf.front();
            buf.pop_front();
        }

        if (sscanf(str.data(), "%d%d%d%lf%lf%lf%lf%lf", &d.i1, &d.i2, &d.i3, &d.r1, &d.r2, &d.r3, &d.r4, &d.r5) != 8)
        {
            fprintf(stderr, "format error\n");
            return;
        }

        {
            std::lock_guard<std::mutex> lg(rmtx);
            if (row >= res.size())
                res.insert(res.end(), row - res.size() + 1, Data());
            res[(size_t)row] = d;
        }
    }
end:;

}
std::vector<Data> ReadInParallel(const std::string& path)
{
    std::array<char, 256> str;
    FILE* fp = fopen(path.c_str(), "r");
    if (fp == nullptr) throw std::exception();

    int nth = 5;

    std::mutex bmtx;
    std::condition_variable bcv;
    std::deque<std::pair<int, std::array<char, 256>>> buf;
    bool end = false;

    std::mutex rmtx;
    std::vector<Data> res;

    std::vector<std::thread> ths;
    for (int i = 0; i < nth; ++i)
    {
        ths.emplace_back(&Parse, std::ref(bmtx), std::ref(bcv), std::ref(buf), std::ref(end), std::ref(rmtx), std::ref(res));
    }
    int row = 0;
    while (fgets(str.data(), sizeof(str), fp) != nullptr)
    {
        {
            std::unique_lock<std::mutex> ul(bmtx);
            buf.emplace_back(row, str);
            bcv.notify_one();
        }
        ++row;
    }
    end = true;
    bcv.notify_all();
    fclose(fp);
    for (auto& th : ths) th.join();
    return res;
}

int main()
{
    try
    {
        const int nrow = 5000000;
        std::string path = "test.txt";
        //GenerateTextFile(path, nrow);//nrow行のテキストを生成

        //sequential
        auto start = std::chrono::system_clock::now();
        auto res1 = ReadSequentially(path);
        auto end = std::chrono::system_clock::now();
        double time = (double)std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() / 1000.;
        fprintf(stderr, "sequential = %lf[s]\n", time);

        //parallel
        start = std::chrono::system_clock::now();
        auto res2 = ReadInParallel(path);
        end = std::chrono::system_clock::now();
        time = (double)std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() / 1000.;
        fprintf(stderr, "parallel   = %lf[s]\n", time);

        //データが完全に一致することを確認。
        for (int i = 0; i < nrow; ++i)
        {
            if (res1[i] != res2[i])
            {
                fprintf(stderr, "mismatch at row == %d\n", i);
                fprintf(stderr, "   res1      res2\n");
                fprintf(stderr, "i1 %9d %9d\n", res1[i].i1, res2[i].i1);
                fprintf(stderr, "i2 %9d %9d\n", res1[i].i2, res2[i].i2);
                fprintf(stderr, "i3 %9d %9d\n", res1[i].i3, res2[i].i3);
                fprintf(stderr, "r1 %9lf %9lf\n", res1[i].r1, res2[i].r1);
                fprintf(stderr, "r1 %9lf %9lf\n", res1[i].r2, res2[i].r2);
                fprintf(stderr, "r1 %9lf %9lf\n", res1[i].r3, res2[i].r3);
                fprintf(stderr, "r1 %9lf %9lf\n", res1[i].r4, res2[i].r4);
                fprintf(stderr, "r1 %9lf %9lf\n", res1[i].r5, res2[i].r5);
                throw std::exception();
            }
        }
        fprintf(stderr, "OK.\n");
    }
    catch (const std::exception&)
    {
    }
}

速度比較

ビルドはmsvc2019で、テストはCore i7 3770を搭載するPCで行った。テキストファイルはHDDから読み出している。

sequential = 10.126000[s]
parallel   = 3.685000[s]
OK.

私のテスト環境ではパース用スレッド数を5にしたときに最も良いパフォーマンスが出た。単純なテキストを単純な処理で読み出しているので効果は薄いかと思われたが、それでも3倍近く高速である。ただストレージとCPUの性能のバランスなどによって最適なスレッド数や向上幅は変化する。
複数行分をまとめて処理させたりするともう少しだけ速くできるのだが、コードを必要以上にややこしくしないため今回は行わなかった。他にもっと優れた方法があれば教えてほしい。

余談

我々の研究室では、多くの実験データがテキストファイルとして管理されている。人間にとって扱いやすいテキストファイルを用いることは別におかしなことではないのだが、信じがたいことに、そのデータは数GB、数十GBにまで肥大化することがある。しかもその巨大なテキストが場合によっては何十個、何百個、下手をすれば何千個と存在するのだ。
高エネルギー物理学の分野ではビッグデータと言われるような巨大なデータを扱うことが多いが、普通はデータベース化したり、何らかの方法で利便性を向上させるものである。が、うちにソフトウェアに強い人間は壊滅的に少ないので、そうした改良に無頓着で、むしろ誰にでも扱えるテキストに固執しがちなのだ。一応ソフトウェア開発の専従者はいるが、彼らは単なる老害と成り果てているので改善しようとしない。……実は連中はテキストファイルよりももっと悪質な問題を放置しているのだが、それはまた別の話。
なお私は彼らと同程度のソフトウェア開発力は持っている(つまり彼らは開発専門のくせに大した実力はないのである)が、私の主な仕事はソフトウェア開発ではなくデータ解析なので、このあたりの根本的改善に口出しすることができないし、指摘してもはぐらかされてお終いである。一部の阿呆が影響力を持つと環境が停滞してしまうというよくある事例。
巨大なテキストファイルに苦しめられている人が少しでも救われることを願って、こんなしょうもない記事を書いてみる。