したかみ ぶろぐ

Unity成分多め

C++とOpenMPによるバイトニックソート

始めに

今回はバイトニックソートについてまとめていこうと思います。

参考にした記事がUnityとComputeShaderやJavaScriptでの実装だったので同じ言語だとあまり面白くならないのでC++を使用しました。


今回は図が多く長いので目次を用意します。



バイトニックソートについて

バイトニックソートはソートの並列アルゴリズムの一つだそうです。(wikipediaより引用)

計算量は O(n) = n { \log^ 2 n }クイックソートなどの高速なアルゴリズムと比べて少し大きいですが、n の部分の計算が並列で行えることからかなり速くソートすることが出来ます。


以下の動画の4:53あたりからバイトニックソートの処理を視覚的に見ることが出来ます。(純粋にソートアルゴリズムの勉強にもおすすめです)

少しだけ処理の順番が違いますが、とても参考になると思います。


私なりの考えになりますが、バイトニックソートには2つの流れがあると考えています。


大きな流れ

始めにこのような長さ2^ 3 = 8の数列があるとします。

f:id:vxd-naoshi-19961205-maro:20201229231825p:plain



この数列を2つずつ昇順、降順を繰り返した数列に整列させます。 赤が昇順、青が降順です。

f:id:vxd-naoshi-19961205-maro:20201229231837p:plain



次にこの数列を4つずつ昇順、降順を繰り返した配列に整列させます。

f:id:vxd-naoshi-19961205-maro:20201229231847p:plain



最後に全体を昇順に並べてソートが完了します。

f:id:vxd-naoshi-19961205-maro:20201229231859p:plain



このように2, 4, 8, 16. . . .と2の累乗の長さで昇順と降順を繰り返してソートを行っていきます。なので配列の長さが2の累乗でなくてはいけません。

また、この昇順降順の切り替えは数列の長さをnとすると \log_{2} n回になります。

ここの流れはマージソートを想像するとわかりやすいかもしれません。



小さな流れ

ではこの昇順と降順を作っていく流れを解説していきます。

2つずつの昇順降順

2つずつ昇順と降順を作る際は隣同士(距離1)を比較していきます。 赤が昇順、青が降順です。

f:id:vxd-naoshi-19961205-maro:20201229231937p:plain



4つずつの昇順降順

では4つずつ昇順と降順を作っていきます。始めに0番目と2番目、1番目と3番目と昇順、降順に並び替えます。すなわち距離2で要素を比較します。

f:id:vxd-naoshi-19961205-maro:20201229231948p:plain


次に隣同士(距離1)を比較して4つずつ昇順と降順を繰り返した数列になります。

f:id:vxd-naoshi-19961205-maro:20201229231958p:plain



8つずつの昇順降順

最後に2つの昇順と降順を1つにします。このとき長さ8の昇順を作っていくので距離4で並びかえを行います。なので、0番目と4番目、1番目と5番目と比較を行っていきます。

f:id:vxd-naoshi-19961205-maro:20201229232010p:plain


次に距離2で0番目と2番目、1番目と3番目とを比較していきます。

f:id:vxd-naoshi-19961205-maro:20201229232020p:plain


最後に距離1で比較してソートが完了します。

f:id:vxd-naoshi-19961205-maro:20201229232033p:plain


以上から2つずつの昇順降順を作る際は距離1で、4つずつの場合は距離2, 1で、8つずつの場合は4, 2, 1となっていることがわかります。

つまり、 2^ nずつの昇順降順を作る場合は距離を 2^ {n - 1}, 2^ {n - 2}, . . . , 2 ^0と距離を対数的に減らしながら整列させます。



bitで見るバイトニックソート

長さ8の配列があるとします。それのインデックスをbitで表現してバイトニックソートを考えていきます。


f:id:vxd-naoshi-19961205-maro:20201229224842p:plain


大きな流れ

始めに2つずつで昇順降順にしたとき、2bit目が0の場合に昇順、1の場合に降順になっていることがわかります。

4つずつの場合は3bit目が0か1で昇順降順を確認することが出来ます。

8つずつの場合は少し見づらいかもしれませんが、4bit目がすべて0であることから昇順になることがわかります。

f:id:vxd-naoshi-19961205-maro:20201229230321p:plain



よって、nこずつの昇順降順を確かめる場合は \log_{2} n + 1番目のbitが1か0かを見ればよいです。



小さな流れ

次に比較する際のbitを見ていきます。

距離1(001)の場合は1bit目以外が一致しているもの同士を比較します。

距離2(010)の場合は2bit目以外が一致しているもの同士を比較します。

距離4(100)の場合は3bit目以外が一致しているもの同士を比較します。

f:id:vxd-naoshi-19961205-maro:20201229235945p:plain


よって、距離が2^ nの場合は \log_{2} n + 1bit以外が一致しているものを比較していることがわかります。

また、Xとなっているbitを抜いてみると00, 01, 10, 11と一つずつ値が増えていることがわかります。



C++での実装

ではC++でバイトニックソートを実装していきます。

実装についてはこれらの記事を参考にさせて頂きました。

バイトニックソートの実装を理解する - e.blog

仮眠プログラマーのつぶやき : UnityでGPGPU応用編 バイトニックソートを高速化


以下、プログラムになります。

#include <cmath>
#include <iostream>
#include <vector>
#include <random>
using namespace std;

constexpr int shiftsize = 3;
constexpr int v_size = 1 << shiftsize;

vector<int> v(v_size);

void Randamize(int seed) {

    random_device rd;
    mt19937 mt(seed);

    for (int i = 0; i < v_size; ++i) 
        v[i] = mt();

}


void Swapping(int i, int j) {

    int swapDistance = 1 << (i - j);

    // 距離swapDistanceで並び替え
    for(int v_i = 0; v_i < v_size/2; ++v_i) {

        int low = v_i & (swapDistance - 1);
        int swapPos = (v_i << 1) - low;

        bool isUpper = (swapPos & (2 << i)) == 0;

        if((v[swapPos] > v[swapPos | swapDistance]) == isUpper)
            swap(v[swapPos], v[swapPos | swapDistance]);

    }

}

void BitonicSort() {

    int log_vsize = log2(v_size);

    // 大きな流れ
    for(int i = 0; i < log_vsize; ++i) {

        // 小さな流れ
        for(int j = 0; j < i + 1; ++j) {

            Swapping(i, j);

        }
    
    }
}

void PrintV() {

    for (int i = 0; i < v.size(); ++i)
        cout << v[i] << endl;
    cout << endl;

}

int main() {

    // 配列にランダムな値を入れる
    // 引数は乱数のseed
    Randamize(1);

    // ソート前の配列を確認
    cout << "before sort" << endl;
    PrintV();

    BitonicSort();

    // ソート後の配列を確認
    cout << "after sort" << endl;
    PrintV();

    return 0;
}



解説

まず、大きな流れの回数を求めます。

昇順と降順の長さは 2^ 1, 2^ 2, 2^ 3, . . .と2の累乗になっていることから、配列の長さを nとすると \log_{2} n回大きな流れをすることがわかります。

    int log_vsize = log2(v_size);

    // 大きな流れ
    for(int i = 0; i < log_vsize; ++i) {



次に、小さな流れの回数を求めます。

昇順と降順の長さが2の場合は入れ替える距離は1だけなので1回。

昇順と降順の長さが4の場合は入れ替える距離は2, 1なので2回。

昇順と降順の長さが8の場合は入れ替える距離は4, 2, 1なので3回。


よって、長さが 2^ 1のときは \log_{2} {2^ 1} = 1回、

長さが 2^ 2のときは \log_{2} {2^ 2} = 2回、

長さが 2^ 3のときは \log_{2} {2^ 3} = 3回となっていることがわかります。

なので並び替えを行う小さな流れの回数は1, 2, 3. . . と増えてきます。

    int log_vsize = log2(v_size);

    // 大きな流れ
    for(int i = 0; i < log_vsize; ++i) {

        // 小さな流れ
        for(int j = 0; j < i + 1; ++j) {

            Swapping(i, j);

        }
    
    }



次に、距離の求め方について解説します。

距離はn回目の大きな流れでは n bitから1bitまで距離を減らしながらソートするので1 << (大きな流れの回数 - 小さな流れの回数)となります。 プログラムでは i が大きな流れの回数、j が小さな流れの回数です。

    int swapDistance = 1 << (i - j);



では入れ替えを行う要素について解説していきます。

入れ替えを行う要素同士の個数は 配列の個数 / 2となります。もし、ここを配列の個数にしてしまうと余分にもう1回比較を行ってしまいます。

void Swapping(int i, int j) {

    int swapDistance = 1 << (i - j);

    // 距離swapDistanceで並び替え
    for(int v_i = 0; v_i < v_size/2; ++v_i) {


では、比較する要素を求めていきます。

始めに距離のbitが0の要素を求めます。v_iをbit列に直すと0, 1, 10, 11, 100, 101. . .となります。ここで距離のbit以下のv_iを取り出します。

        int low = v_i & (swapDistance - 1);


次にv_iを右に1つシフトしたbit列を作り、先ほど取り出した距離以下のbit列を引くことで距離のbitが0になったbit列を求めることができます。

        int swapPos = (v_i << 1) - low;


例として比較する距離 swapDistance が2 (010)だった場合、これらのbit列が求まります。この求められたbit列に距離をOR演算することで比較されるbit列を求めることができます。

swapDistance - 1 = 010 - 001 = 001

v_i = 000 :  
low = 000 & 001 = 000
swapPos = (v_i << 1) - low = 000 - 000 = 000
swapPos | swapDistance = 000 | 010 = 010

v_i = 001:
low = 001 & 001 = 001
swapPos = (v_i << 1) - low = 010 - 001 = 001
swapPos | swapDistance = 001 | 010 = 011

v_i = 010:
low = 010 & 001 = 000
swapPos = (v_i << 1) - low = 100 - 000 = 100
swapPos | swapDistance = 100 | 010 = 110

v_i = 011:
low = 011 & 001 = 001
swapPos = (v_i << 1) - low = 110 - 001 = 101 
swapPos | swapDistance = 101 | 010 = 111



最後に昇順か降順かの計算について解説します。

n回目の大きな流れでは(n + 1)bit目が1か0かで昇順降順を判断します。

プログラムでは i が大きな流れの回数なので 1 << (i + 1) = 2 << i を見れば、昇順か降順かを判断できます。 ここでは0の場合が昇順、1の場合が降順になります。

        bool isUpper = (swapPos & (2 << i)) == 0;


あとは、要素同士を比較してそれが昇順もしくは降順になっていなければ並び変えます。

        if((v[swapPos] > v[swapPos | swapDistance]) == isUpper)
            swap(v[swapPos], v[swapPos | swapDistance]);



実行結果

実行した結果は次の通りです。出力からソートされていることがわかります。

before sort
1791095845
-12091157
-1201197172
-289663928
491263
550290313
1298508491
-4120955

after sort
-1201197172
-289663928
-12091157
-4120955
491263
550290313
1298508491
1791095845



OpenMPによる並列処理

バイトニックソートで要素同士の比較、並び替えの処理は独立して行うことが可能です。 なので要素同士の比較はばらばらに実行しても結果は同じになります。

f:id:vxd-naoshi-19961205-maro:20201230165318p:plain


この処理をOpenMPを使って並列処理をすることが可能です。

    // 距離swapDistanceで並び替え
    // 並列処理
#pragma omp parallel for
    for(int v_i = 0; v_i < v_size/2; ++v_i) {

        int low = v_i & (swapDistance - 1);
        int swapPos = (v_i << 1) - low;

        bool isUpper = (swapPos & (2 << i)) == 0;

        if((v[swapPos] > v[swapPos | swapDistance]) == isUpper)
            swap(v[swapPos], v[swapPos | swapDistance]);

    }



計測、std::sortとの比較

一度バイトニックソートがどれだけ速いかをstd::sortと比較してみます。

計測用のプログラムは以下の通りです。 長さ 2^ {20}=1048576の配列を作り、ソートする前に一度ランダムな値を入れてからソートを開始します。

また、ソートするたびに掛かった時間を計測し最小値、最大値、平均値を出します。

C++プログラム

#include <cmath>
#include <iostream>
#include <random>
#include <vector>
#include <chrono>
#include <algorithm>
#include <limits>
#include <cfloat>
using namespace std;

constexpr int shiftsize = 20;
constexpr int v_size = 1 << shiftsize;

vector<int> v(v_size);

void Randamize(int x) {

    random_device rd;
    mt19937 mt(x);

    for (int i = 0; i < v_size; ++i) 
        v[i] = mt();

}

void Swapping(int i, int j) {
    int swapDistance = 1 << (i - j);
    
  // 距離swapDistanceで並び替え
  // 並列処理
#pragma omp parallel for
    for (int v_i = 0; v_i < v_size / 2; ++v_i) {

        int low = v_i & (swapDistance - 1);
        int swapPos = (v_i << 1) - low;

        bool isUpper = (swapPos & (2 << i)) == 0;

        if ((v[swapPos] > v[swapPos | swapDistance]) == isUpper)
            swap(v[swapPos], v[swapPos | swapDistance]);

    }
}

void BitonicSort() {

    int log_vsize = log2(v_size);

    for(int i = 0; i < log_vsize; ++i) {

        for(int j = 0; j < i + 1; ++j) {

            Swapping(i, j);

        }

    }

}

void PrintV() {

    for (int i = 0; i < v.size(); ++i)
        cout << v[i] << endl;
    cout << endl;

}

int main() {

    constexpr int count = 10;
    double minTime_b = DBL_MAX, maxTime_b = DBL_MIN, averageTime_b = 0;
    double minTime_s = DBL_MAX, maxTime_s = DBL_MIN, averageTime_s = 0;

    cout << "bitonicsort" << endl;
    // バイトニックソート計測
    for(int i = 0; i < count; ++i) {
        Randamize(i);

        auto start = chrono::system_clock::now();

        BitonicSort();

        auto end = chrono::system_clock::now();

        double elapsed = chrono::duration_cast<chrono::milliseconds>(end - start).count();

        cout << i << ":" << elapsed << endl;

        minTime_b = min(minTime_b, elapsed);
        maxTime_b = max(maxTime_b, elapsed);
        averageTime_b += elapsed;

    }

    cout << "---result---" << endl;
    cout << "min time:" << minTime_b << endl;
    cout << "max time:" << maxTime_b << endl;
    cout << "ave time:" << averageTime_b/count << endl;
    cout << endl;

    cout << "std::sort" << endl;
    // std::sort計測
    for (int i = 0; i < count; ++i) {
        Randamize(i);

        auto start = chrono::system_clock::now();

        sort(v.begin(), v.end());

        auto end = chrono::system_clock::now();

        double elapsed = chrono::duration_cast<chrono::milliseconds>(end - start).count();

        cout << i << ":" << elapsed << endl;

        minTime_s = min(minTime_s, elapsed);
        maxTime_s = max(maxTime_s, elapsed);
        averageTime_s += elapsed;
    }

    cout << "---result---" << endl;
    cout << "min time:" << minTime_s << endl;
    cout << "max time:" << maxTime_s << endl;
    cout << "ave time:" << averageTime_s / count << endl;
    cout << endl;

    return 0;
}


また、実行環境はwindowsでCPUはcorei7-9750H、コンパイラgccで実行時はなるべくタスクを実行しないようにしました。



結果

結果は以下の通りです。 バイトニックソートは少しばらつきはありますが、std::sortに比べて約1.2倍高速でした。

ただし、並列処理を行わなかった場合は実行時間が6倍ほど増えるので普通にソートする場合はstd::sortの方が高速です。

bitonicsort
0:256
1:260
2:273
3:293
4:265
5:276
6:271
7:261
8:216
9:219
---result---
min time:216
max time:293
ave time:259

std::sort
0:327
1:321
2:319
3:322
4:320
5:322
6:321
7:321
8:321
9:320
---result---
min time:319
max time:327
ave time:321.4



最後に

このソートを知ったきっかけとしてはniconicoの動画になりますが、そのときは「知らなくてもいいでしょ」と思っていました。

私の感想になりますが、ここまで理解するのに苦しんだソートアルゴリズムはなかったです。しかし、結果としてstd::sortよりも速くソート出来ることがわかりました。

これからになりますが、Boidsアルゴリズムの近傍探索などで使用してみたいです。



参考

重複になりますがこれらの記事を参考にさせて頂きました。

edom18.hateblo.jp

blog.livedoor.jp



また、OpenMPに関しては簡単ではありますがこの記事を参考にして使用しました。

www.sanko-shoko.net


OpenMPが実行できなかった際は英語になりますがこのサイトで調べて実行することが出来ました。

c++ - mingwでウィンドウ上でopenmpを使用する。 -lpthreadを見つけることができません