始めに
今回はバイトニックソートについてまとめていこうと思います。
参考にした記事がUnityとComputeShaderやJavaScriptでの実装だったので同じ言語だとあまり面白くならないのでC++を使用しました。
今回は図が多く長いので目次を用意します。
バイトニックソートについて
バイトニックソートはソートの並列アルゴリズムの一つだそうです。(wikipediaより引用)
計算量はとクイックソートなどの高速なアルゴリズムと比べて少し大きいですが、n の部分の計算が並列で行えることからかなり速くソートすることが出来ます。
以下の動画の4:53あたりからバイトニックソートの処理を視覚的に見ることが出来ます。(純粋にソートアルゴリズムの勉強にもおすすめです)
少しだけ処理の順番が違いますが、とても参考になると思います。
私なりの考えになりますが、バイトニックソートには2つの流れがあると考えています。
大きな流れ
始めにこのような長さの数列があるとします。
この数列を2つずつ昇順、降順を繰り返した数列に整列させます。 赤が昇順、青が降順です。
次にこの数列を4つずつ昇順、降順を繰り返した配列に整列させます。
最後に全体を昇順に並べてソートが完了します。
このように2, 4, 8, 16. . . .と2の累乗の長さで昇順と降順を繰り返してソートを行っていきます。なので配列の長さが2の累乗でなくてはいけません。
また、この昇順降順の切り替えは数列の長さをとすると回になります。
ここの流れはマージソートを想像するとわかりやすいかもしれません。
小さな流れ
ではこの昇順と降順を作っていく流れを解説していきます。
2つずつの昇順降順
2つずつ昇順と降順を作る際は隣同士(距離1)を比較していきます。 赤が昇順、青が降順です。
4つずつの昇順降順
では4つずつ昇順と降順を作っていきます。始めに0番目と2番目、1番目と3番目と昇順、降順に並び替えます。すなわち距離2で要素を比較します。
次に隣同士(距離1)を比較して4つずつ昇順と降順を繰り返した数列になります。
8つずつの昇順降順
最後に2つの昇順と降順を1つにします。このとき長さ8の昇順を作っていくので距離4で並びかえを行います。なので、0番目と4番目、1番目と5番目と比較を行っていきます。
次に距離2で0番目と2番目、1番目と3番目とを比較していきます。
最後に距離1で比較してソートが完了します。
以上から2つずつの昇順降順を作る際は距離1で、4つずつの場合は距離2, 1で、8つずつの場合は4, 2, 1となっていることがわかります。
つまり、ずつの昇順降順を作る場合は距離をと距離を対数的に減らしながら整列させます。
bitで見るバイトニックソート
長さ8の配列があるとします。それのインデックスをbitで表現してバイトニックソートを考えていきます。
大きな流れ
始めに2つずつで昇順降順にしたとき、2bit目が0の場合に昇順、1の場合に降順になっていることがわかります。
4つずつの場合は3bit目が0か1で昇順降順を確認することが出来ます。
8つずつの場合は少し見づらいかもしれませんが、4bit目がすべて0であることから昇順になることがわかります。
よって、こずつの昇順降順を確かめる場合は番目のbitが1か0かを見ればよいです。
小さな流れ
次に比較する際のbitを見ていきます。
距離1(001)の場合は1bit目以外が一致しているもの同士を比較します。
距離2(010)の場合は2bit目以外が一致しているもの同士を比較します。
距離4(100)の場合は3bit目以外が一致しているもの同士を比較します。
よって、距離がの場合はbit以外が一致しているものを比較していることがわかります。
また、Xとなっているbitを抜いてみると00, 01, 10, 11と一つずつ値が増えていることがわかります。
C++での実装
ではC++でバイトニックソートを実装していきます。
実装についてはこれらの記事を参考にさせて頂きました。
仮眠プログラマーのつぶやき : UnityでGPGPU応用編 バイトニックソートを高速化
以下、プログラムになります。
#include <cmath> #include <iostream> #include <vector> #include <random> using namespace std; constexpr int shiftsize = 3; constexpr int v_size = 1 << shiftsize; vector<int> v(v_size); void Randamize(int seed) { random_device rd; mt19937 mt(seed); for (int i = 0; i < v_size; ++i) v[i] = mt(); } void Swapping(int i, int j) { int swapDistance = 1 << (i - j); // 距離swapDistanceで並び替え for(int v_i = 0; v_i < v_size/2; ++v_i) { int low = v_i & (swapDistance - 1); int swapPos = (v_i << 1) - low; bool isUpper = (swapPos & (2 << i)) == 0; if((v[swapPos] > v[swapPos | swapDistance]) == isUpper) swap(v[swapPos], v[swapPos | swapDistance]); } } void BitonicSort() { int log_vsize = log2(v_size); // 大きな流れ for(int i = 0; i < log_vsize; ++i) { // 小さな流れ for(int j = 0; j < i + 1; ++j) { Swapping(i, j); } } } void PrintV() { for (int i = 0; i < v.size(); ++i) cout << v[i] << endl; cout << endl; } int main() { // 配列にランダムな値を入れる // 引数は乱数のseed Randamize(1); // ソート前の配列を確認 cout << "before sort" << endl; PrintV(); BitonicSort(); // ソート後の配列を確認 cout << "after sort" << endl; PrintV(); return 0; }
解説
まず、大きな流れの回数を求めます。
昇順と降順の長さはと2の累乗になっていることから、配列の長さをとすると回大きな流れをすることがわかります。
int log_vsize = log2(v_size); // 大きな流れ for(int i = 0; i < log_vsize; ++i) {
次に、小さな流れの回数を求めます。
昇順と降順の長さが2の場合は入れ替える距離は1だけなので1回。
昇順と降順の長さが4の場合は入れ替える距離は2, 1なので2回。
昇順と降順の長さが8の場合は入れ替える距離は4, 2, 1なので3回。
よって、長さがのときは = 1回、
長さがのときは = 2回、
長さがのときは = 3回となっていることがわかります。
なので並び替えを行う小さな流れの回数は1, 2, 3. . . と増えてきます。
int log_vsize = log2(v_size); // 大きな流れ for(int i = 0; i < log_vsize; ++i) { // 小さな流れ for(int j = 0; j < i + 1; ++j) { Swapping(i, j); } }
次に、距離の求め方について解説します。
距離はn回目の大きな流れでは n bitから1bitまで距離を減らしながらソートするので1 << (大きな流れの回数 - 小さな流れの回数)となります。 プログラムでは i が大きな流れの回数、j が小さな流れの回数です。
int swapDistance = 1 << (i - j);
では入れ替えを行う要素について解説していきます。
入れ替えを行う要素同士の個数は 配列の個数 / 2となります。もし、ここを配列の個数にしてしまうと余分にもう1回比較を行ってしまいます。
void Swapping(int i, int j) { int swapDistance = 1 << (i - j); // 距離swapDistanceで並び替え for(int v_i = 0; v_i < v_size/2; ++v_i) {
では、比較する要素を求めていきます。
始めに距離のbitが0の要素を求めます。v_iをbit列に直すと0, 1, 10, 11, 100, 101. . .となります。ここで距離のbit以下のv_iを取り出します。
int low = v_i & (swapDistance - 1);
次にv_iを右に1つシフトしたbit列を作り、先ほど取り出した距離以下のbit列を引くことで距離のbitが0になったbit列を求めることができます。
int swapPos = (v_i << 1) - low;
例として比較する距離 swapDistance が2 (010)だった場合、これらのbit列が求まります。この求められたbit列に距離をOR演算することで比較されるbit列を求めることができます。
swapDistance - 1 = 010 - 001 = 001 v_i = 000 : low = 000 & 001 = 000 swapPos = (v_i << 1) - low = 000 - 000 = 000 swapPos | swapDistance = 000 | 010 = 010 v_i = 001: low = 001 & 001 = 001 swapPos = (v_i << 1) - low = 010 - 001 = 001 swapPos | swapDistance = 001 | 010 = 011 v_i = 010: low = 010 & 001 = 000 swapPos = (v_i << 1) - low = 100 - 000 = 100 swapPos | swapDistance = 100 | 010 = 110 v_i = 011: low = 011 & 001 = 001 swapPos = (v_i << 1) - low = 110 - 001 = 101 swapPos | swapDistance = 101 | 010 = 111
最後に昇順か降順かの計算について解説します。
n回目の大きな流れでは(n + 1)bit目が1か0かで昇順降順を判断します。
プログラムでは i が大きな流れの回数なので 1 << (i + 1) = 2 << i を見れば、昇順か降順かを判断できます。 ここでは0の場合が昇順、1の場合が降順になります。
bool isUpper = (swapPos & (2 << i)) == 0;
あとは、要素同士を比較してそれが昇順もしくは降順になっていなければ並び変えます。
if((v[swapPos] > v[swapPos | swapDistance]) == isUpper) swap(v[swapPos], v[swapPos | swapDistance]);
実行結果
実行した結果は次の通りです。出力からソートされていることがわかります。
before sort 1791095845 -12091157 -1201197172 -289663928 491263 550290313 1298508491 -4120955 after sort -1201197172 -289663928 -12091157 -4120955 491263 550290313 1298508491 1791095845
OpenMPによる並列処理
バイトニックソートで要素同士の比較、並び替えの処理は独立して行うことが可能です。 なので要素同士の比較はばらばらに実行しても結果は同じになります。
この処理をOpenMPを使って並列処理をすることが可能です。
// 距離swapDistanceで並び替え // 並列処理 #pragma omp parallel for for(int v_i = 0; v_i < v_size/2; ++v_i) { int low = v_i & (swapDistance - 1); int swapPos = (v_i << 1) - low; bool isUpper = (swapPos & (2 << i)) == 0; if((v[swapPos] > v[swapPos | swapDistance]) == isUpper) swap(v[swapPos], v[swapPos | swapDistance]); }
計測、std::sortとの比較
一度バイトニックソートがどれだけ速いかをstd::sortと比較してみます。
計測用のプログラムは以下の通りです。 長さ=1048576の配列を作り、ソートする前に一度ランダムな値を入れてからソートを開始します。
また、ソートするたびに掛かった時間を計測し最小値、最大値、平均値を出します。
C++プログラム
#include <cmath>
#include <iostream>
#include <random>
#include <vector>
#include <chrono>
#include <algorithm>
#include <limits>
#include <cfloat>
using namespace std;
constexpr int shiftsize = 20;
constexpr int v_size = 1 << shiftsize;
vector<int> v(v_size);
void Randamize(int x) {
random_device rd;
mt19937 mt(x);
for (int i = 0; i < v_size; ++i)
v[i] = mt();
}
void Swapping(int i, int j) {
int swapDistance = 1 << (i - j);
// 距離swapDistanceで並び替え
// 並列処理
#pragma omp parallel for
for (int v_i = 0; v_i < v_size / 2; ++v_i) {
int low = v_i & (swapDistance - 1);
int swapPos = (v_i << 1) - low;
bool isUpper = (swapPos & (2 << i)) == 0;
if ((v[swapPos] > v[swapPos | swapDistance]) == isUpper)
swap(v[swapPos], v[swapPos | swapDistance]);
}
}
void BitonicSort() {
int log_vsize = log2(v_size);
for(int i = 0; i < log_vsize; ++i) {
for(int j = 0; j < i + 1; ++j) {
Swapping(i, j);
}
}
}
void PrintV() {
for (int i = 0; i < v.size(); ++i)
cout << v[i] << endl;
cout << endl;
}
int main() {
constexpr int count = 10;
double minTime_b = DBL_MAX, maxTime_b = DBL_MIN, averageTime_b = 0;
double minTime_s = DBL_MAX, maxTime_s = DBL_MIN, averageTime_s = 0;
cout << "bitonicsort" << endl;
// バイトニックソート計測
for(int i = 0; i < count; ++i) {
Randamize(i);
auto start = chrono::system_clock::now();
BitonicSort();
auto end = chrono::system_clock::now();
double elapsed = chrono::duration_cast<chrono::milliseconds>(end - start).count();
cout << i << ":" << elapsed << endl;
minTime_b = min(minTime_b, elapsed);
maxTime_b = max(maxTime_b, elapsed);
averageTime_b += elapsed;
}
cout << "---result---" << endl;
cout << "min time:" << minTime_b << endl;
cout << "max time:" << maxTime_b << endl;
cout << "ave time:" << averageTime_b/count << endl;
cout << endl;
cout << "std::sort" << endl;
// std::sort計測
for (int i = 0; i < count; ++i) {
Randamize(i);
auto start = chrono::system_clock::now();
sort(v.begin(), v.end());
auto end = chrono::system_clock::now();
double elapsed = chrono::duration_cast<chrono::milliseconds>(end - start).count();
cout << i << ":" << elapsed << endl;
minTime_s = min(minTime_s, elapsed);
maxTime_s = max(maxTime_s, elapsed);
averageTime_s += elapsed;
}
cout << "---result---" << endl;
cout << "min time:" << minTime_s << endl;
cout << "max time:" << maxTime_s << endl;
cout << "ave time:" << averageTime_s / count << endl;
cout << endl;
return 0;
}
また、実行環境はwindowsでCPUはcorei7-9750H、コンパイラはgccで実行時はなるべくタスクを実行しないようにしました。
結果
結果は以下の通りです。 バイトニックソートは少しばらつきはありますが、std::sortに比べて約1.2倍高速でした。
ただし、並列処理を行わなかった場合は実行時間が6倍ほど増えるので普通にソートする場合はstd::sortの方が高速です。
bitonicsort 0:256 1:260 2:273 3:293 4:265 5:276 6:271 7:261 8:216 9:219 ---result--- min time:216 max time:293 ave time:259 std::sort 0:327 1:321 2:319 3:322 4:320 5:322 6:321 7:321 8:321 9:320 ---result--- min time:319 max time:327 ave time:321.4
最後に
このソートを知ったきっかけとしてはniconicoの動画になりますが、そのときは「知らなくてもいいでしょ」と思っていました。
私の感想になりますが、ここまで理解するのに苦しんだソートアルゴリズムはなかったです。しかし、結果としてstd::sortよりも速くソート出来ることがわかりました。
これからになりますが、Boidsアルゴリズムの近傍探索などで使用してみたいです。
参考
重複になりますがこれらの記事を参考にさせて頂きました。
また、OpenMPに関しては簡単ではありますがこの記事を参考にして使用しました。
OpenMPが実行できなかった際は英語になりますがこのサイトで調べて実行することが出来ました。
c++ - mingwでウィンドウ上でopenmpを使用する。 -lpthreadを見つけることができません