不完全コレスキー分解をCSR形式で実装
昨日からポケモンGOが解禁されましたね! ニュースで話題になっている通り、街中を歩くとフツーの人たちが、フツーにポケモンやっててびっくりです。明らか怪しい奴らもウロウロしていますがご愛嬌ですね。 あっ・・・僕のことですか・・・(・ω・)
簡単?に説明
内容を大幅に執筆し直しました!なぜか検索で上位に引っかかるようになったので、まともそうなこと書きます!!
本記事は、不完全コレスキー分解を実用的に計算したいと思って執筆しました。というのも、大規模疎行列に対して前処理付き共役勾配法を適用したのですが、動作速度が遅かったのです。数学的な背景等は、以下を参照願います。C言語のソースコードも載っているし説明も親切です。
上記ページには大変お世話になっているのですが、ソースコードをそのまま使うのは考えものです。不完全コレスキー分解は、疎行列を対象とした共役勾配法の前処理としてよく用いられるみたいです。疎行列のサイズはアプリケーションによってまちまちですが、例えば画像処理の世界では、扱う行列のサイズは10万行x10万列を悠に超えることがあります。そんな大きな疎行列を通常行列と同様に扱うのはメモリと計算速度の無駄です!
そこで疎行列の特徴であるゼロ要素が多いことを活かして、効率的な計算方法を考えることになります。特に疎行列とベクトルの積を求める方法については、SpMV(Sparse matrix-vector multiplication、疎行列ベクトル積)というテーマとして数多くの論文が出ています。参考までに、Nvidiaの出しているレポートが以下になります。
以降の内容は、不完全コレスキー分解にSpMVの研究成果を適用したよ、というある意味で当然の話を書いています(・ω・)疎行列用の計算ライブラリも同じようなことやっていると思いますよー
疎行列のCSR形式
そんなわけで、僕は疎行列をCSR形式で表すことにしました。
CSR形式とは何かというと・・・困った時のwikipediaに載ってますね。。。wikiを超える説明はできません !(`・ω・´)
CSR形式を採用した理由は、NvidiaのレポートでCPU向けとして紹介されていたからです。グラフを見ていると他形式の方がパフォーマンスがいいのですが、全部GPU向けなんですよね。悲しい。CSR形式による疎行列ベクトル積のプログラム例として、論文では以下のように紹介しています。
__global__ void spmv_csr_scalar_kernel(const int num_rows , const int * ptr , const int * indices , const float * data , const float *x, float y) { int row = blockDim.x * blockIdx.x + threadIdx.x; if(row < num_rows){ float dot = 0; int row_start = ptr[row]; int row_end = ptr[row+1]; for (int jj = row_start; jj < row_end; jj++) dot += data[jj] * x[indices[jj]]; y[row] += dot; } }
はぁ、、、て感じです。よくわかんないですねw てわけで、以下のような疎行列を考えましょう。
行列を通常の行列形式として扱う場合、全ての要素にアクセスする際には、回のループを実施します。の正方行列に対してのアクセスや計算回数はとなります。
一方でのゼロ要素にアクセスする必要がない時、例えば不完全コレスキー分解では、CSR形式で扱うことは利点となります。の場合だと、全ての非ゼロ要素にアクセスするのに6回のループですみます。の正方行列の場合、行列が疎になるにつれて、アクセスや計算回数はに近づきます。
とりあえず、CSR形式にするとループ回数が減らせるんだな、ということが理解できると十分です。この例だとたいしたことないですが、行列サイズが大きくなると圧倒的に速度が違います!
C++による実装
このブログで初めてソースコードを載せます!
初めてにしては頑張ったと思うのですが、読みにくいですよね(´・ω・`) そもそもCSR形式って、めちゃくちゃ扱いにくいんだけど。。。なんなのこれw
#include <iostream> #include <vector> using namespace std; struct str_CSR { float * val; /*データ格納用のポインタ*/ int * col_index; /*行方向のインデックスポインタ */ int * row_index; /*列の開始位置を示すポインタ */ int col_size; /*データの数 = col_indexポインタのサイズ */ int row_size; /*行の数 = row_indexポインタのサイズ */ }; /*------------------------------------------------------------------------ @function name: makeData @note : Create a data for simulation --------------------------------------------------------------------------*/ void makeData(str_CSR * csr, int r_size) { /* 次のような形式の行列を作る (r_size=6) 2 1 0 0 0 0 1 2 1 0 0 0 0 1 2 1 0 0 0 0 1 2 1 0 0 0 0 1 2 1 0 0 0 0 1 2 */ int i, j, k; int c_size = r_size * 3 - 2; csr->val = new float [c_size]; csr->col_index = new int [c_size]; csr->row_index = new int [r_size+1]; csr->col_size = c_size; csr->row_size = r_size; k = 0; csr->row_index[0] = 1; for( i=0 ; i<r_size; i++) { for( j=i-1 ; j<i+2; j++) { if(j < 0 || j >= r_size) continue; if( i == j) csr->val[k] = 2; else csr->val[k] = 1; csr->col_index[k] = j; k++; } csr->row_index[i+1] = k+1; } } /*------------------------------------------------------------------------ @function name: executeIcdCsrFormat @note : Incomplete Cholesky Decomposition for Compressed Sparse Row format --------------------------------------------------------------------------*/ void executeIcdCsrFormat(str_CSR * csr_src , str_CSR * csr_dst, vector<float> &vec_d) { int i, j ,k ,l; /*loop変数*/ int loop_k, loop_l; /*k,lのloop回数を格納*/ float lld; /*L行列成分の途中計算を格納*/ float * tmp_val; /*L行列のサイズを特定できないので*/ int * tmp_col; /*出力データを一時的に格納*/ float * src_val; /*配列の要素ポインタ*/ int * src_col; /*(アロー演算子の記述が面倒なので)*/ int * src_row; /* 同様 */ float * dst_val; /* 同様 */ int * dst_col; /* 同様 */ int * dst_row; /* 同様 */ int tmp_index=0; int j_index; /*メモリ確保 & 値設定*/ csr_dst->row_index = new int [csr_src->row_size]; csr_dst->col_size = csr_src->col_size; csr_dst->row_size = csr_src->row_size; vec_d.resize(csr_src->row_size); //vec_d = new float [csr_src->row_size]; /*ひとまず多めにメモリ確保*/ tmp_val = new float [csr_src->col_size]; tmp_col = new int [csr_src->col_size]; /*ポインタのセット*/ src_val = csr_src->val; src_col = csr_src->col_index; src_row = csr_src->row_index; dst_row = csr_dst->row_index; /*1行目のデータは自明*/ tmp_val[0] = src_val[0]; tmp_col[0] = src_col[0]; vec_d[0] = 1.0 / tmp_val[0]; /*行の開始情報は2行目まで自明*/ dst_row[0] = 1; dst_row[1] = 2; for(i = 1; i < csr_dst->row_size; i++){ for(j = src_row[i]-1; j < src_row[i+1]-1; j++){ if( i < src_col[j]) { break; /*上三角成分は計算しない*/ } lld = src_val[j]; loop_k = j - src_row[i]; j_index = src_col[j]; /*for分を見やすくするため*/ for(k = dst_row[i]-1; k < dst_row[i]+loop_k; k++){ loop_l = k - dst_row[i] + 1; for(l = dst_row[j_index]-1; l < dst_row[j_index]+loop_l ; l++){ if(tmp_col[k] == tmp_col[l]) { lld -= tmp_val[k] * tmp_val[l] * vec_d[tmp_col[l]]; } } } tmp_index++; tmp_val[tmp_index] = lld; tmp_col[tmp_index] = src_col[j]; } vec_d[i] = 1.0 / tmp_val[tmp_index]; dst_row[i+1] = tmp_index+2; } /*出力のためのメモリ確保 & ポインタ設定*/ csr_dst->col_index = new int [tmp_index+1]; csr_dst->val = new float [tmp_index+1]; dst_val = csr_dst->val; dst_col = csr_dst->col_index; /*tmpのデータを移動*/ for(i=0; i<tmp_index+1; i++) { dst_val[i] = tmp_val[i]; dst_col[i] = tmp_col[i]; } delete tmp_val; delete tmp_col; } int main(void) { str_CSR csr; str_CSR csr2; vector<float> vec_d; makeData(&csr, 5); executeIcdCsrFormat(&csr, &csr2, vec_d); }
以上で終了です。このCSR形式ですが、共役勾配法とも相性がいいので試してみてくださいね! あと、書いているのがあくまで素人なので、間違いがあれば指摘してくださいね!