(Implicit) Matrix FactorizationをCythonで高速化する

こんにちは、さとぴりかです。CythonでMatrix Factorizationの実装を行ったときのメモです。

はじめに

ユーザーがアイテムに対して何らかの評価を与えた行列を考えたときに、★1~5のように陽に評価値が与えられたものをExplicit Feedback、そうでないもので購入やクリックの有無(positive feedbackとnegative feedbackの2値)が与えられたものをImplicit Feedbackと言います。negative feedbackについてはユーザーがアイテムに対してnegativeな反応があったというだけではなく、単純にそのアイテムを認知していないものも含むため注意が必要です。今回の実装はこうしたImplicit Feedbackの状況を想定したMatrix Factorizationについて扱っていきたいと思います。

Weighted Matrix Factorization

概要

Implicit Feedbackのデータに対するMatrix Factorizationの最もスタンダードな方法としてWeighted Matrix Factorization (WMF)があります。 WMFはnegative feedbackについて重みを小さくするという非常にシンプルなヒューリスティクスな方法を考えます。 具体的な損失関数は下記のような重み付き2乗誤差となっています。

LWMF=u,icyui(yuiθuTβi)2+λθuθu2+λβiβi2L_{\text{WMF}}=\sum_{u, i} c_{y_{u i}}\left(y_{u i}-\boldsymbol{\theta}_{u}^{\mathsf{T}} \boldsymbol{\beta}_{i}\right)^{2}+\lambda_{\theta} \sum_{u}\left\|\boldsymbol{\theta}_{u}\right\|^{2}+\lambda_{\beta} \sum_{i}\left\|\boldsymbol{\beta}_{i}\right\|^{2}

ここで、confidence cyuic_{y_{ui}}c1>c0c_1 > c_0 となるように設定します。

その他のnotationは以前の記事と同じです。

更新式

SGDで学習する際の更新式は下記の通り。(微分して勾配を求めてください)

θuθu+α(cyuiβi(yuiθuTβi)λθθu)βiβi+α(cyuiθi(yuiθuTβi)λββu)\boldsymbol{\theta}_u \leftarrow \boldsymbol{\theta}_u + \alpha \left( c_{y_{ui}} \boldsymbol{\beta}_i \left(y_{u i}-\boldsymbol{\theta}_{u}^{\mathsf{T}} \boldsymbol{\beta}_{i} \right) - \lambda_{\theta} \boldsymbol{\theta}_u \right)\\ \boldsymbol{\beta}_i \leftarrow \boldsymbol{\beta}_i + \alpha \left( c_{y_{ui}} \boldsymbol{\theta}_i \left(y_{u i}-\boldsymbol{\theta}_{u}^{\mathsf{T}} \boldsymbol{\beta}_{i} \right) - \lambda_{\beta} \boldsymbol{\beta}_u \right)

データセット

今回はMovieLens 100K Datasetのうち、評価が★4以上のものをpositive feedback、それ以外のもの(i.e. ★3以下と評価なし)をnegative feedbackとして扱います。実装時には以前も紹介したlightfmの力を借りて

from lightfm.datasets import fetch_movielens
data = fetch_movielens(min_rating=4.0)
Y_train = data["train"].toarray()
Y_train[Y_train > 0] = 1.0

のようにしてnumpyの形式で使います。

評価

今回はsklearnの力を借りて(ROCの)AUCで評価します。(ちゃんと学習できているかの確認程度)

NumPyによる実装

mini-batchとか使わずに1つ1つのサンプルを使って学習させると下記のような感じ。

from itertools import product

import lightfm
import numpy as np
from lightfm.datasets import fetch_movielens
from sklearn import metrics
from sklearn.utils import shuffle

data = fetch_movielens(min_rating=4.0)

# confidence c。
c_1 = 10.
c_0 = 1.
# 学習回数
num_epochs = 3
# 潜在ベクトルの次元
K = 20
# U ユーザー数
# I アイテム数
U, I = data["train"].shape
# SGDの学習率
learning_rate = 1e-2
# 正則化係数
lam = 1e-3

Y_train = data["train"].toarray()
Y_train[Y_train > 0] = 1.0
Y_test = data["test"].toarray()
Y_test[Y_test > 0] = 1.0

users, items = zip(*product(range(U), range(I)))
users = np.array(users)
items = np.array(items)
ratings = np.array([Y_train[_u, _i] for _u, _i in zip(users, items)])

# 潜在ベクトル初期化
theta = np.random.uniform(low=-0.1, high=0.1, size=(U, K)) / K
beta = np.random.uniform(low=-0.1, high=0.1, size=(I, K)) / K

for epoch in range(num_epochs):
    users, items, ratings = shuffle(users, items, ratings)
    loss = []
    for i in range(U*I):
        predicted = theta[users[i]] @ beta[items[i]]
        e = ratings[i] - predicted
        c = c_1 if ratings[i] == 1. else c_0
        theta_tmp = theta[users[i]].copy()
        beta_tmp = beta[items[i]].copy()
        theta[users[i]] += learning_rate * (beta_tmp  * e * c - lam * theta_tmp)
        beta[items[i]]  += learning_rate * (theta_tmp * e * c - lam * beta_tmp )
        loss.append((e*c)**2 + lam*(theta_tmp**2).sum() + lam*(beta_tmp**2).sum())
    print(np.mean(loss))


predicted = theta.dot(beta.T)
scores = np.zeros(U)
for u in range(U):
    if len(set(Y_test[u])) != 1:
        fpr, tpr, thresholds = metrics.roc_curve(Y_test[u], predicted[u])
        scores[u] = metrics.auc(fpr, tpr)
    else:
        scores[u] = 0.0

print(f"test mean auc: {scores.mean()}")

実行環境にもよりますが、自分の環境では下記の通り2分以上かかりました。

In [1]: %time run mf
epoch 1: loss=0.2557801751819014
epoch 2: loss=0.13357901588200438
epoch 3: loss=0.1294046242539832
test mean auc: 0.8768254588999452
CPU times: user 2min 23s, sys: 630 ms, total: 2min 24s
Wall time: 2min 23s

Cythonによる実装

基本的にループ処理とPyObject同士の演算について気をつければかなりの高速化が図れます。具体的には、全ての変数の型を宣言する、numpy.ndarrayで保持していたデータは全てmemoryview(e.g. double[:,:])として保持する等です。numpy.ndarrayとして保持しないので、NumPyで実装した際には

predicted = theta[users[i]] @ beta[items[i]]

としていた部分などは、@ (numpy.dot)が使えないので

cdef int k
cdef double predicted
...
predicted = 0.0
for k in range(K):
    predicted += theta[users[i], k] * beta[items[i], k]

のように書く必要があります。

実装の全体は下記の通りで、仮にmf_cy.pyxという名前で保存します。

# cython: language_level=3
# distutils: language=c++

from itertools import product

import cython
import numpy as np
cimport numpy as np
from sklearn.utils import shuffle

cdef inline double square(double x) nogil:
    return x * x

class WeightedMatrixFactorization(object):
    def __init__(self, int num_components = 20, double learning_rate = 0.01, double weight = 10., double weight_decay = 0.001):
        self.num_components = num_components
        self.weight = weight
        self.weight_decay = weight_decay
        self.learning_rate = learning_rate
    
    def fit(self, np.ndarray Y, int num_epochs = 3):
        cdef double[:,:] Y_train = Y.astype(np.float64)
        cdef int U = Y.shape[0]
        cdef int I = Y.shape[1]
        cdef int K = self.num_components

        # 潜在ベクトル初期化
        self.theta = np.random.uniform(low=-0.1, high=0.1, size=(U, K)).astype(np.float64) / K
        self.beta = np.random.uniform(low=-0.1, high=0.1, size=(I, K)).astype(np.float64) / K
        self.__fit(Y_train, num_epochs)


    @cython.boundscheck(False)
    @cython.wraparound(False)
    def __fit(self, double[:,:] Y, int num_epochs):
        cdef double[:,:] theta = self.theta
        cdef double[:,:] beta = self.beta
        cdef double lam = self.weight_decay
        cdef double lr = self.learning_rate
        cdef double num_components = self.num_components
        cdef int U = Y.shape[0]
        cdef int I = Y.shape[1]
        cdef int K = self.num_components
        cdef double c, c_1, c_0
        cdef int[:] users, items
        cdef double[:] ratings
        cdef int epoch, i, k
        cdef double predicted
        cdef double[:] theta_tmp = np.zeros(K)
        cdef double[:] beta_tmp = np.zeros(K)
        cdef double e
        cdef double[:] loss = np.zeros(U*I)

        c_1 = self.weight
        c_0 = 1.0

        _users, _items = zip(*product(range(U), range(I)))
        _users = np.array(_users).astype(np.int32)
        _items = np.array(_items).astype(np.int32)
        _ratings = np.array([Y[_u, _i] for _u, _i in zip(_users, _items)])

        users, items, ratings = shuffle(_users, _items, _ratings)

        for epoch in range(num_epochs):
            for i in range(U*I):
                loss[i] = 0.0
                predicted = 0.0
                for k in range(K):
                    predicted += theta[users[i], k] * beta[items[i], k]
                e = ratings[i] - predicted
                c = c_1 if ratings[i] == 1. else c_0

                for k in range(K):
                    theta_tmp[k] = theta[users[i], k]
                    beta_tmp[k] = beta[items[i], k]
                for k in range(K):
                    theta[users[i], k] +=  lr * (beta_tmp[k]  * e * c - lam * theta_tmp[k])
                    beta[items[i], k]  +=  lr * (theta_tmp[k] * e * c - lam * beta_tmp[k] )

                    loss[i] += lam*(square(theta_tmp[k]) + square(beta_tmp[k]))
                loss[i] += square(e)*c
            print(f"epoch {epoch+1}: loss={np.mean(loss)}")

setup.pyは下記の通りで、最終行でCythonで実装したファイル(.pyx)を指定します。

import os
import numpy as np
from Cython.Build import cythonize
from distutils.core import setup

cmpl_args = ['-O3']
lnk_args = []

os.environ['CFLAGS'] = " ".join(cmpl_args + lnk_args)
os.environ['CXXFLAGS'] = " ".join(cmpl_args + lnk_args)

setup(ext_modules=cythonize(["mf_cy.pyx"]), include_dirs= [np.get_include()])

コンパイルは

python setup.py build_ext --inplace

で実行でき、成功すると同じディレクトリにmf_cy.cpython-38-darwin.soのような名前のファイルができているはずです。これを指定するとpython側でimportできるようになります。具体例は下記の通りで、

from itertools import product
import numpy as np
from lightfm.datasets import fetch_movielens
from sklearn import metrics

data = fetch_movielens(min_rating=4.0)

U, I = data["train"].shape
Y_train = data["train"].toarray()
Y_train[Y_train > 0] = 1.0
Y_test = data["test"].toarray()
Y_test[Y_test > 0] = 1.0

from mf_cy import WeightedMatrixFactorization

model = WeightedMatrixFactorization(
    num_components=20, learning_rate=0.01, weight=10., weight_decay=1e-3)
model.fit(Y_train)

predicted = model.theta.dot(model.beta.T)
scores = np.zeros(U)
for u in range(U):
    if len(set(Y_test[u])) != 1:
        fpr, tpr, thresholds = metrics.roc_curve(Y_test[u], predicted[u])
        scores[u] = metrics.auc(fpr, tpr)
    else:
        scores[u] = 0.0

print(f"test mean auc: {scores.mean()}")

実際に実行した結果は

In [1]: %time run mf_cy_ml100k.py 
epoch 1: loss=0.2586439797444402
epoch 2: loss=0.13372843259177666
epoch 3: loss=0.1282534501027116
test mean auc: 0.8765697084700025
CPU times: user 3.77 s, sys: 241 ms, total: 4.01 s
Wall time: 3.43 s

となり、2分20秒程度かかっていたのがわずか3.4秒になっています!

さいごに

今回は学習のloop処理の中についてnumpy.ndarraymemoryviewに置き換えることによって高速化を行いました。実はHogwild(lock-freeな非同期なSGD)を使えば(GloVe公式実装等はこれです)更に高速化することができますが、紹介はまた今度の機会にしたいと思います。 数式やコード、説明に間違いございましたらTwitterかコメント欄までお願い致します。

参考


Written by@satopirka
A software engineer.

GitHubTwitter

© 2021 Minato Sato. All Rights Reserved