[C++]テキストファイルの読み込みを並列化するサンプル。

※最近本ブログが「モダンC++を扱うブログ」として取り上げられ多くの人が訪問してくれていますが、普段の私はこんな感じの緩い記事ばかり書いているので、どうか過度に期待されないようお願いします。

動機

テキストファイルは普通、どんなに大きくてもMB単位で、処理時間を無視でき、人間が編集する機会の多いものに使われるフォーマットである、と私は思っているのだが、時として周囲から強要されGB単位のテキストを扱わなければならないときもある。しかもそれが、データ処理フローの中で重大なボトルネックになることもある。

テキストファイルの読み込みに時間がかかるのは、主としてテキストのパースが遅いからだ。バイナリだったら決まった大きさを読んで直接メモリにコピーすればいいところを、テキストは1文字ずつ読み取ってはその意味を解読しなければならない。
例えばCやC++でテキストを読み出す時は、fgetsとsscanfを用いることが多い。このsscanfがパースを担う関数で、これが非常に遅いのだ。
ただ逆に言えば、ファイルの読み込みそのもの(今回で言えばfgets)はそこまで遅いわけではない。じゃあsscanfやそれに続く処理だけ並列化すればいいのでは、と誰もが考えるだろうし、ちょうど私も最近研究の方で遅すぎるテキスト読み込みの改良に着手したところなのだが、ネット上ではあまりそのようなサンプルコードが見当たらなかったので、ちょっと載せてみることにした。

サンプル

テキストファイルの形式は様々ではあるが、今回は空白区切り1行あたり8列の単純な数値情報を読み出す。このとき、データの順序はテキストに記述されている順序を保存するようにした。

#include <random>
#include <thread>
#include <cstdio>
#include <mutex>
#include <atomic>
#include <deque>
#include <vector>
#include <array>
#include <string>

void GenerateTextFile(const std::string& path, int n)
{
    FILE* fp = fopen(path.c_str(), "w");
    std::uniform_real_distribution<> rrand(0., 1.);
    std::uniform_real_distribution<> rrand2(0., 10000.);
    std::uniform_int_distribution<> irand(0, 1000);
    std::mt19937_64 mt;
    for (int i = 0; i < n; ++i)
    {
        fprintf(fp, "%4d %4d %4d %.4lf %.4lf %.4lf %8.1lf %8.1lf\n",
                irand(mt), irand(mt), irand(mt), rrand(mt), rrand(mt), rrand(mt), rrand2(mt), rrand2(mt));
    }
    fclose(fp);
}

struct Data
{
    bool operator==(const Data& d) const
    {
        return i1 == d.i1 &&
            i2 == d.i2 &&
            i3 == d.i3 &&
            r1 == d.r1 &&
            r2 == d.r2 &&
            r3 == d.r3 &&
            r4 == d.r4 &&
            r5 == d.r5;
    }
    bool operator!=(const Data& d) const
    {
        return !(*this == d);
    }
    int i1, i2, i3;
    double r1, r2, r3;
    double r4, r5;
};

std::vector<Data> ReadSequentially(const std::string& path)
{
    char buf[256];
    FILE* fp = fopen(path.c_str(), "r");
    if (fp == nullptr) throw std::exception();

    std::vector<Data> res;
    while (fgets(buf, sizeof(buf), fp) != nullptr)
    {
        Data d;
        if (sscanf(buf, "%d%d%d%lf%lf%lf%lf%lf", &d.i1, &d.i2, &d.i3, &d.r1, &d.r2, &d.r3, &d.r4, &d.r5) != 8)
        {
            fprintf(stderr, "format error\n");
            throw std::exception();
        }
        res.push_back(d);
    }
    return res;
}

void Parse(std::mutex& bmtx, std::condition_variable& bcv, std::deque<std::pair<int, std::array<char, 256>>>& buf, bool& end,
           std::mutex& rmtx, std::vector<Data>& res)
{
    while (true)
    {
        Data d;
        std::array<char, 256> str;
        int row;
        {
            std::unique_lock<std::mutex> ul(bmtx);
            while (buf.empty())
            {
                if (end == true) goto end;
                bcv.wait(ul);
            }
            std::tie(row, str) = buf.front();
            buf.pop_front();
        }

        if (sscanf(str.data(), "%d%d%d%lf%lf%lf%lf%lf", &d.i1, &d.i2, &d.i3, &d.r1, &d.r2, &d.r3, &d.r4, &d.r5) != 8)
        {
            fprintf(stderr, "format error\n");
            return;
        }

        {
            std::lock_guard<std::mutex> lg(rmtx);
            if (row >= res.size())
                res.insert(res.end(), row - res.size() + 1, Data());
            res[(size_t)row] = d;
        }
    }
end:;

}
std::vector<Data> ReadInParallel(const std::string& path)
{
    std::array<char, 256> str;
    FILE* fp = fopen(path.c_str(), "r");
    if (fp == nullptr) throw std::exception();

    int nth = 5;

    std::mutex bmtx;
    std::condition_variable bcv;
    std::deque<std::pair<int, std::array<char, 256>>> buf;
    bool end = false;

    std::mutex rmtx;
    std::vector<Data> res;

    std::vector<std::thread> ths;
    for (int i = 0; i < nth; ++i)
    {
        ths.emplace_back(&Parse, std::ref(bmtx), std::ref(bcv), std::ref(buf), std::ref(end), std::ref(rmtx), std::ref(res));
    }
    int row = 0;
    while (fgets(str.data(), sizeof(str), fp) != nullptr)
    {
        {
            std::unique_lock<std::mutex> ul(bmtx);
            buf.emplace_back(row, str);
            bcv.notify_one();
        }
        ++row;
    }
    end = true;
    bcv.notify_all();
    fclose(fp);
    for (auto& th : ths) th.join();
    return res;
}

int main()
{
    try
    {
        const int nrow = 5000000;
        std::string path = "test.txt";
        //GenerateTextFile(path, nrow);//nrow行のテキストを生成

        //sequential
        auto start = std::chrono::system_clock::now();
        auto res1 = ReadSequentially(path);
        auto end = std::chrono::system_clock::now();
        double time = (double)std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() / 1000.;
        fprintf(stderr, "sequential = %lf[s]\n", time);

        //parallel
        start = std::chrono::system_clock::now();
        auto res2 = ReadInParallel(path);
        end = std::chrono::system_clock::now();
        time = (double)std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() / 1000.;
        fprintf(stderr, "parallel   = %lf[s]\n", time);

        //データが完全に一致することを確認。
        for (int i = 0; i < nrow; ++i)
        {
            if (res1[i] != res2[i])
            {
                fprintf(stderr, "mismatch at row == %d\n", i);
                fprintf(stderr, "   res1      res2\n");
                fprintf(stderr, "i1 %9d %9d\n", res1[i].i1, res2[i].i1);
                fprintf(stderr, "i2 %9d %9d\n", res1[i].i2, res2[i].i2);
                fprintf(stderr, "i3 %9d %9d\n", res1[i].i3, res2[i].i3);
                fprintf(stderr, "r1 %9lf %9lf\n", res1[i].r1, res2[i].r1);
                fprintf(stderr, "r1 %9lf %9lf\n", res1[i].r2, res2[i].r2);
                fprintf(stderr, "r1 %9lf %9lf\n", res1[i].r3, res2[i].r3);
                fprintf(stderr, "r1 %9lf %9lf\n", res1[i].r4, res2[i].r4);
                fprintf(stderr, "r1 %9lf %9lf\n", res1[i].r5, res2[i].r5);
                throw std::exception();
            }
        }
        fprintf(stderr, "OK.\n");
    }
    catch (const std::exception&)
    {
    }
}

速度比較

ビルドはmsvc2019で、テストはCore i7 3770を搭載するPCで行った。テキストファイルはHDDから読み出している。

sequential = 10.126000[s]
parallel   = 3.685000[s]
OK.

私のテスト環境ではパース用スレッド数を5にしたときに最も良いパフォーマンスが出た。単純なテキストを単純な処理で読み出しているので効果は薄いかと思われたが、それでも3倍近く高速である。ただストレージとCPUの性能のバランスなどによって最適なスレッド数や向上幅は変化する。
複数行分をまとめて処理させたりするともう少しだけ速くできるのだが、コードを必要以上にややこしくしないため今回は行わなかった。他にもっと優れた方法があれば教えてほしい。

余談

我々の研究室では、多くの実験データがテキストファイルとして管理されている。人間にとって扱いやすいテキストファイルを用いることは別におかしなことではないのだが、信じがたいことに、そのデータは数GB、数十GBにまで肥大化することがある。しかもその巨大なテキストが場合によっては何十個、何百個、下手をすれば何千個と存在するのだ。
高エネルギー物理学の分野ではビッグデータと言われるような巨大なデータを扱うことが多いが、普通はデータベース化したり、何らかの方法で利便性を向上させるものである。が、うちにソフトウェアに強い人間は壊滅的に少ないので、そうした改良に無頓着で、むしろ誰にでも扱えるテキストに固執しがちなのだ。一応ソフトウェア開発の専従者はいるが、彼らは単なる老害と成り果てているので改善しようとしない。……実は連中はテキストファイルよりももっと悪質な問題を放置しているのだが、それはまた別の話。
なお私は彼らと同程度のソフトウェア開発力は持っている(つまり彼らは開発専門のくせに大した実力はないのである)が、私の主な仕事はソフトウェア開発ではなくデータ解析なので、このあたりの根本的改善に口出しすることができないし、指摘してもはぐらかされてお終いである。一部の阿呆が影響力を持つと環境が停滞してしまうというよくある事例。
巨大なテキストファイルに苦しめられている人が少しでも救われることを願って、こんなしょうもない記事を書いてみる。

twitterでバズる人の気分は多分こんな感じなんだろう。

このブログがある方によって「モダンなC++を扱うブログ」の一つとして取り上げられたことで、アクセス数が突然激増してしまうという事態に見舞われた。何気なくアクセス解析を開いたら数字が普段の十倍くらいに跳ね上がっていたものだから何事かと思って、どうやらtwitterからのアクセスが多いらしいことから大本を探してみたら行き当たった。思わぬ形で宣伝されてしまった。

え、いや、何も怒ってないよ?めっちゃ喜んでるよ?ただこのブログの他に挙げられているのが、本の虫Faith and Braveyohhoyの日記地面を見下ろす少年の足蹴にされる私等、日本人C++erなら誰でも知っているレベルの有名ブログばかりで、それにちょこんと私のブログが添え物のように載せられているものだから萎縮してしまったのだ。いやこのブログ、そんな大層なもんじゃないから。私は研究兼趣味でC++を使ってるだけの雑魚学生だから。モダンC++の解説とか時々しかやってないから。素粒子実験系の大学院生のチラ裏みたいなもんだから……。ああなんか恥ずかしくなってきた。
そもそもこのブログを取り上げてくれた方にしたって、知る人ぞ知るSiv3Dというライブラリの開発者である。私も昔、研究用に実験データを三次元可視化するためのライブラリを探している中で、あれの使い方を色々調べたことがある(残念ながら私の用途にはVTKのほうが適していて、そちらを採用してしまったが)。そんな方から言及してもらえたことは光栄という他にないのだが、この方々と比べて私の実力は月とスッポンなので、早々に私の実力不足が露呈してしまいそうで恐ろしい。
あまりに情けないので、今現在研究室内にしか公開していないデータ解析用ライブラリADAPTを一般公開してやろうかと悩み始める始末。が、あれは私がC++を覚えて1~3年の頃に書いた恥ずかしすぎるコードが大量に詰まっている上、私の一存で勝手にオープンソースにして良いものでもなくなっているので、今のところ踏み切れない。第一、あの個人開発のくせに巨大でデバッグ不十分なライブラリを導入したがる人がどれだけいるというのだ。大抵の人はより大規模でユーザーが多くサポートも充実しているCERN ROOTを選ぶはずだ。

まあしかし、流石に他のブログに対して格落ちも甚だしいので、一過性のものに終わるだろう。あれほどアクセスがあっても読者が1人しか増えていないのが良い証拠だ。

ところで、上の方々の中で唯一私だけがtwitterアカウントを持っていなかったので、これを機に作ってみることにした(@thayakawa8)。以前から作ろうかと考えていたものの、本業のプログラマーでない私が踏み入るような場ではないとも思って躊躇していたのだ。曲がりなりにも一流のプログラマーの方に言及してもらえる程度のクォリティがあるのなら、ちょっと試しに紛れ込んでみようかな、と決断した。

というわけで、本ブログは今後も備忘録としてニッチすぎる記事をちまちま書いていく予定である。でももしよかったら、GitHubで公開しているライブラリとかを覗いてくれたりするとすっごく喜ぶよ。

SlackからExportされたjson形式ログファイルのビューワを作った。

以前から作っていたSlack過去ログ閲覧ツールSlackLogViewerを公開してみる。GitHubでビルド済みの状態でも配布している。詳細は使い方の説明を。

背景

Slackは基本的に無料で使用できるが、有料プランも用意されている。特に大きな違いは、過去のメッセージをどれだけ閲覧できるかだろう。フリープランだと10000件までしか遡れないので、活発に動いているワークスペースではすぐに流れて読めなくなってしまう。 しかしフリープランであっても過去ログのExportは可能である。こちらは閲覧できなくなったメッセージまで遡って丸ごと出力してくれるので、有料プランを使わせてくれない貧乏環境に生きている人たちにとってはありがたい話だ。

問題は、このExportされたファイルがjson形式になっていることだ。そのまま読むのはとても大変である。ネット上でこれを閲覧するビューワを探してみたが、あまり良さげなものはなかった。例えばslack-export-viewerなどは致命的なことに検索機能がないし、どうやら非常に重いらしい。その他にも何かしら「作った」という記事は見つかるのだが、一般利用可能な形では公開されていないようだった。

仕方ないので自作することにした。検索機能もある程度のものを付けた。
結果、以下のようなものが出来上がった。個人名ややり取りを公開することはできないのでモザイクばかりだが、まあ雰囲気は伝わるかと。

f:id:thayakawa:20200928025216p:plain
SlackLogViewer

動作環境

Windows 64bitのみ。8以降で動くはずだが動作確認は10でしか行っていないので、問題があればコメント等で報告されたい。MacLinuxは現在対応していない。ほぼ環境依存のない設計にはなっているので対応不可能ではないものの、そもそも需要があるかどうか。

使い方

  1. ExportされたSlackのログファイルを解凍しておく。
  2. ソフトウェア本体はこちらからダウンロードできる。解凍したら適当な場所に配置すれば良い。インストールは不要。
  3. SlackLogViewer.exeを実行する。
  4. 左上のメニューアイコンから「Open」を選択。
  5. 解凍済みのSlackログファイルのフォルダを開く(channel.json、user.jsonなどがあるフォルダ)。
  6. 心ゆくまで閲覧すべし。

なおExportしたフォルダはワークスペース名と日付が付いていると思うが、このフォルダ名は変更しないことが望ましい。Cacheフォルダを見てみれば分かると思うが、内部のキャッシュはこのフォルダ名によって管理されており、フォルダ名によってどのExportファイル群かを識別できることを前提にしているためである。まあ区別できれば問題ないし、そうでなくともキャッシュファイルを時々削除していれば無視できるのだが、一応念のため。

できること

  1. ワークスペース開設時まで遡りすべてのメッセージを見ることができる。もちろん本来無料プランでは表示できない10000件よりも前のものまで見られる。
  2. 1個以上の単語または正規表現による検索機能も付けた。現在のチャンネルまたは全チャンネルを対象に検索できるが、全チャンネル検索は時間がかかる。Qtとマルチスレッドとの親和性がもう少し高ければ、あるいはplacement newとか使えればもうちょい頑張れるのだが。
  3. ファイルに関してはSlackフリープランのストレージ上限によってアーカイブされていなければダウンロード可能である。
  4. スレッドももちろん表示可能。
  5. リアクションはある程度表示できる。ただしSlackの絵文字をUTF-8に変換して表示しているので、見た目がちょっと異なるし、UTF-8にない絵文字は表示できない。

未実装の機能

  1. 自分自身のユーザー情報を設定して参加しているチャンネルだけ表示、とかを作ろうかと思ったが、全く異なるワークスペースに対して対応するのが大変なので未実装。
  2. ダイレクトメッセージ、プライベートチャンネルは考慮していない。だってフリープランだとExportできないんだもの。データの形式すら分からないので対応しようがない。
  3. ファイル検索機能。テキスト内の検索はやろうと思えばできるが、全部ダウンロードして読み込むのは気持ち悪いので今の所用意していない。ファイル名、ユーザー名検索くらいは簡単だからそのうち付けるかもしれない。
  4. キャッシュは自動削除されないので、使い続けていると一時ファイルとは名ばかりにどんどん溜まる。定期的にCacheフォルダ内を手動で掃除する必要がある。

既知の不具合

  1. メッセージの一部を選択しそれを別の場所にD&Dしようとするとクラッシュする。つい先程適当に弄っている中で偶然気づいた。まだ対処法を思いついていない。
  2. 検索機能を使ったあとに別の過去ログフォルダを開いた場合、その後の操作によってはクラッシュすることがある。原因は明らかなのでそのうち修正する。
  3. ファイル名が長すぎて枠をはみ出すことがあるが、実害がないので放置している。

開発に関して

本ソフトウェアはC++で、msvc2019とQt5.15.1を用いて開発されている。 またSlackの絵文字とunicodeを変換するためにemojicppを使用している。

アイコンのいくつかはICOOON MONOからダウンロードさせてもらった。種類が多いわけではないけれども、アプリ中でのクレジット表記などを求められないのは楽である。膨大な種類の無料アイコンを配布するサイトは、下手をするとアイコンごとにライセンスが異なっていたりして、使うに使えないことが多々あるのだ。
いくつかのアイコンは自作である。作成にはvectrを使わせてもらった。無料ウェブアプリでこれだけの機能があるのはとてもありがたい。

免責事項とか

本アプリケーションは私の個人的な趣味と実益のために作成されたものである。MITライセンスの元で公開しており誰でも自由に利用してよいが、これを利用することで受けたあらゆる損害について私は一切の責任を負わない。オープンソースとはいえ不特定多数の開発者が参加しない限りは安全性を担保できないので、もし利用するのなら自己責任で。
また不具合報告や要望などがあれば善処するが、こちらの意欲と時間の余裕に強く依存するため保証しない。

余談

バイナリファイルを配布するのは初めてなので、何か不具合があるんじゃないか、DLLは不足していないか、ここまで作って誰も使ってくれなかったらどうしよう、とか結構おっかなびっくりである。

どうでもいいことだが、バージョンが1.1なのは未公開のバージョン1.0が存在するからである。1.0の時点では検索機能などがなく色々とひどかったので公開を見送ったのだ。またBeta-0となっているのはデバッグが十分でないからだ。ある程度の動作確認はしたが十分ではないし、既知の不具合もある。
どちらかと言えば簡素な実装にすることを心がけたので、黒魔術じみたコードはほぼない。GUIアプリケーションなので、クリティカルな部分以外のパフォーマンスはこだわるだけ無駄である。
しかしQtとC++GUIアプリケーションを作るのは果たして良い方法と言えるのだろうか。研究用と今回のツールと合わせて3つくらい作ってきたが、毎回ひどく苦労する。
例えばWidgetを適当に組み合わせているうちはいいが、凝ったデザインのGUIを作りたいとか考え始めると悲惨なことになる。今回、Slackに似たGUIとそれなりの速度を両立するためにQStyledItemDelegateの派生クラスを設計したのだが、非常に大変だった。QTreeViewやQListViewは普通に使うだけでも大変なのに。
他には、QObjectをスレッド間で受け渡せないのがとても辛かった。この問題を回避するために多くの調整が必要で、それにより検索機能が低速化してしまった。
何となく書いていると決して意図したように動かないので、きちんと仕様を調べながら使わなければならない。他の言語はもっとGUIを作りやすかったりするのだろうか。もちろんC++にはC++の優れた点が多くあるので、隣の青い芝を羨んでいるだけなのかもしれない。

ところでこういうのってSlack Technologiesから怒られたりしないんだろうかとちょっと不安になる。Slack本体とは全く無関係なツールではあるけれども、UIとかそのまんまなので著作権的に大丈夫なんだろうかと思わなくもない。前例があるしそちらはGitHubでかなりの星を獲得しながらも問題となっていないようなので、多分大丈夫なんだろうが……。