一年以上ぶりのC++記事であるが、緩めの内容。研究の方で一つ山場を乗り切ったので、気分転換にちょっと遊んでみた結果である。
動機
以前、巨大なテキストファイルの読み出しを並列化する記事を書いたことがある。CSVやSSVのように、ある程度統一されたフォーマットで一行ごとに一個のデータが記録されたファイルを読み出したいときに、一行あたりの文字列をパースする処理がボトルネックになって大変時間がかかるため、並列化で無理やり改善しようとした話である。普通に書けばほんの十行かそこらで終わる処理をわざわざ数十行も費やして2~3倍ほど速くすることには成功したが(研究上役立っているのでそれなりに満足している)、実はその内部では一行あたりの文字列のパースにsscanf
を使っていた。
その記事に先日、ちょっとコメントが来た。sscanfよりもatoiやatofの方が速い、というのだ。これはまあ事実ではある。
sscanf
はフォーマット指定を文字列形式で受け取るため、その解析が動的に行われる。したがって、単純なパース処理以外にも余計なコストが含まれている。原理的には、これを静的に行うことで高速化できるはずである1。そのような方法の一つがatoiやatofだ。これらは動的解析のコストがないので速いのは当然である。ただしこれらは文字列内の最初の数値しか返してくれないので、複数の数値を含む文字列にはそのままでは対応できない。これを繰り返し呼び出してパースさせる関数を自作するにしても、atoiやatofは終端位置を返してくれないなど欠点があるので不適切である2。高速化を意図するのならstrtolやstd::from_charsの方がよいだろう。atoiなども内部的にはstrtolなどを呼び出しているはずだ。
現状C++には複数の数値を含む文字列をパースするのにsscanfよりも優れた関数は用意されていない(はず)。そこで、std::from_charsを使って高速でジェネリックなsscanf風関数を作り、速度比較を行うことにした。
実装
とりあえず、動的解析を排除したsscanf
的なParse
関数を以下のように作成した。文字列は空白区切りで、整数または浮動小数点の数値が記録されているものとする。デリミタの指定や%s
のような文字列型への代入機能は作るのが面倒だったので見送った。なおstd::from_charsとstrtol系では先頭の空白文字を無視するかどうかという違いがあり、今回は空白文字は無視させるのでどちらかと言えばstrtol系の方が都合が良かったのだが、ジェネリックに実装するのがちょっと大変なので試さなかった。
コンセプトを使っているのでC++20が必要である。SFINAEを面倒に感じ始めたことにちょっと危機感。
#include <charconv> #include <type_traits> #include <concepts> #include <utility> #include <cctype> template <std::integral Int> std::pair<Int&, int> WithBase(Int& i, int base) { return { i, base }; } template <std::floating_point Flt> std::pair<Flt&, std::chars_format> WithFmt(Flt& f, std::chars_format fmt) { return { f, fmt }; } template <class T> concept Arithmetic = std::is_arithmetic_v<T>; std::from_chars_result Parse(const char* pos, const char* last) { return std::from_chars_result{ last, std::errc{} }; } template <Arithmetic Value, class ...Values> std::from_chars_result Parse(const char* pos, const char* last, Value& v, Values& ...vs) { while (std::isspace(*pos)) ++pos; auto res = std::from_chars(pos, last, v); if (res.ec != std::errc{}) return res; if (res.ptr != last) return Parse(res.ptr, last, vs...); return std::from_chars_result{ last, std::errc{} }; } template <Arithmetic Value, class BaseOrFmt, class ...Values> std::from_chars_result Parse(const char* pos, const char* last, std::pair<Value&, BaseOrFmt> v, Values& ...vs) { while (std::isspace(*pos)) ++pos; auto res = std::from_chars(pos, last, v.first, v.second); if (res.ec != std::errc{}) return res; if (res.ptr != last) return Parse(res.ptr, last, vs...); return std::from_chars_result{ last, std::errc{} }; }
Parse(first, last, values...)
のように与える。first
、last
はそれぞれパースしたい文字列の先頭と終端のポインタ、valuesは整数、浮動小数点の変数である。一応from_charsのうち整数用のbaseと浮動小数点用のフォーマットをParse(first, last, WithBase(i, 8), WithFmt(f, std::chars_format::hex))
のように与えることはできる。
テスト
以下のように、int型3個、double型2個の値を含む100万個の文字列を生成し、それをパースする速度を計10回測定、その平均時間と標準偏差を求めた。コンピュータの計算時間が正規分布になるのかよく知らないので標準偏差が適切なのかは分からん。
なお私のPCはCore i7 3770搭載機である。10年前の骨董品だが買い替える機会のないままずるずると使い続けている。OSはWindows 10、Visual Studio 2019でビルドしている。背後でブラウザなど大量のアプリが動いているので、数字は参考程度に。
#include <charconv> #include <type_traits> #include <concepts> #include <vector> #include <array> #include <random> #include <iostream> #include <chrono> #include <thread> //Parse関数の部分は省略している。 std::vector<std::array<char, 128>> GenerateData(size_t n) { std::mt19937_64 mt(std::random_device{}()); std::uniform_int_distribution<int> ir(0, 10000); std::uniform_real_distribution<double> dr(-100., 100.); std::vector<std::array<char, 128>> data(n); auto write = [&data](char* first, char* last, auto x, auto base_or_fmt) { auto [ptr, err] = std::to_chars(first, last, x, base_or_fmt); if (err != std::errc{}) throw err; *ptr = ' '; return ptr + 1; }; for (size_t i = 0; i < n; ++i) { char* first = data[i].data(); char* last = first + 128; char* ptr = write(first, last, ir(mt), 10); ptr = write(ptr, last, ir(mt), 10); ptr = write(ptr, last, ir(mt), 10); ptr = write(ptr, last, dr(mt), std::chars_format::scientific); ptr = write(ptr, last, dr(mt), std::chars_format::general); } return data; } template <class Func> void Test(Func func) { std::vector<std::array<char, 128>> data; try { data = GenerateData(1000000); double total = 0; double totalsq = 0; for (int i = 0; i < 10; ++i) { auto start = std::chrono::system_clock::now(); for (auto& d : data) { func(d.data(), d.data() + 128); } auto end = std::chrono::system_clock::now(); double time = (double)std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() / 1000.; fprintf(stderr, "%lf[s]\n", time); total += time; totalsq += time * time; std::this_thread::sleep_for(std::chrono::milliseconds(200)); } double avg = total / 10; double dev = sqrt(totalsq / 10 - avg * avg); fprintf(stdout, "average %lf[s], deviation %lf[s]\n", avg, dev); } catch (std::errc err) { std::cout << "conversion failed." << std::endl; } } int main() { /* auto func = [](const char* first, const char* last) { int i1, i2, i3; double d1, d2; //動作確認のためWithBaseやWithFmtを使っているが、別に意味はない。 auto [ptr, err] = Parse(first, last, i1, WithBase(i2, 10), i3, WithFmt(d1, std::chars_format::scientific), d2); if (err != std::errc{}) throw err; //fprintf(stdout, "%d %d %d %lf %lf\n", i1, i2, i3, d1, d2); }; */ auto func = [](const char* first, const char*) { int i1, i2, i3; double d1, d2; if (sscanf(first, "%d%d%d%le%lf", &i1, &i2, &i3, &d1, &d2) != 5) throw std::errc{}; //fprintf(stdout, "%d %d %d %lf %lf\n", i1, i2, i3, d1, d2); }; Test(func); return 0; }
結果
単位は秒である。
平均 | 標準偏差 | |
---|---|---|
Parse(自作) | 0.539300 | 0.007747 |
sscanf | 0.991400 | 0.012855 |
思いの外差がついて驚いている。sscanfの処理が具体的にどうなっているのかは理解していないのだが、ここまでコストが大きかったとは意外だ。実はフォーマット文字列の動的解析以外にも非効率な処理を行っているのだろうか。MSVCのsscanfは現在はインラインで定義されているので、そちらのコストはないはずだが。
ちなみに件のテキストファイル並列読み出しにも適用してみたところ、やはりsscanfのコストが解消されたことで並列化の恩恵は小さくなったが、まだ1.5~2倍ほど速くなるようだった。並列化も無意味ではなかった。あちらのテストでは読み出したデータをstd::vector
に格納するところまで含めて速度を計測していたので、純粋な読み出し速度のみで計測すればもっと差が広がる可能性もある。
というわけで、sscanfはやっぱり遅かった、という結論で本記事を締めくくる。最近C++の情報を全く追いかけていないが、こういう関数は何かしら提案されていたりしないのだろうか。一行ずつの読み出しってそれなりに需要のある処理のような気がするのだが。