# import torch
from .ContinuousModel import ContinuousModel
import numpy as np
from ..utils import binarize, matmul, to_dense, to_sparse, ismat, get_prediction_with_threshold, show_matrix, multiply
from scipy.sparse import lil_matrix, csr_matrix
from tqdm import tqdm
try:
import torch
except ImportError:
print('[E] Missing package: torch. Please install it first.')
pass
[docs]
class ELBMF(ContinuousModel):
def __init__(
self,
k,
U=None,
V=None,
W='full',
init_method='custom',
reg_l1=0.01,
reg_l2=0.02,
reg_growth=1.02,
rounding=False,
beta=0.0,
tol=0.0,
max_iter=1000,
min_diff=1e-8,
seed=None):
'''
Parameters
----------
k : int
Number of latent factors.
U, V : ndarray, spmatrix
Need to be prepared if `init_method` is 'custom'.
W : ndarray, spmatrix or str in ['full', 'mask']
Weight matrix.
init_method : str in ['custom', 'normal', 'uniform']
reg_l1, reg_l2, reg_growth : float
rounding : bool
Whether to round the factors to {0, 1}.
Disable this for diagnosis purposes.
beta : float
Inertial coefficient of iPALM.
'''
self.check_params(k=k, U=U, V=V, W=W, init_method=init_method, reg_l1=reg_l1, reg_l2=reg_l2, reg_growth=reg_growth, rounding=rounding, beta=beta, tol=tol, max_iter=max_iter, min_diff=min_diff, seed=seed)
[docs]
def fit(self, X_train, X_val=None, X_test=None, **kwargs):
super().fit(X_train, X_val, X_test, **kwargs)
self._fit()
self.finish(show_logs=self.show_logs, save_model=self.save_model, show_result=self.show_result)
[docs]
def init_model(self):
'''Initialize factors and logging variables.
'''
super().init_model()
# initialize U, V
self.init_UV()
# normalize U, V
# self.normalize_UV(method="balance")
self.normalize_UV(method="normalize")
self._to_dense()
# replace zeros in U, V
self.U[self.U == 0] = np.finfo(float).eps
self.V[self.V == 0] = np.finfo(float).eps
def _fit(self):
# elbmf-python
# X = np.array(self.X_train.toarray(), dtype=np.float64)
# X = torch.from_numpy(X).float()
# self.U, self.V = elbmf(
# X = X,
# U = self.U,
# V = self.V,
# n_components = self.k,
# l1reg = self.reg_l1,
# l2reg = self.reg_l2,
# regularization_rate = lambda t: self.reg_growth ** t,
# maxiter = self.max_iter,
# tolerance = self.min_diff,
# beta = self.beta,
# callback = None,
# with_rounding = self.rounding,
# seed = self.seed
# )
# self.U, self.V = csr_matrix(self.U), csr_matrix(self.V).T
# self.X_pd = matmul(self.U, self.V.T, boolean=True, sparse=True)
# self.evaluate(df_name='boolean')
# re-implementation
self.iPALM()
[docs]
def iPALM(self):
U_last, V_last = self.U.copy(), self.V.copy()
err, gap = np.inf, np.inf
pbar = tqdm(total=self.max_iter, desc="[I] f: -, U: [-, -], V: [-, -]")
n_iter = 0
is_improving = True
while is_improving:
reg_l1, reg_l2 = self.reg_l1, self.reg_l2 * (self.reg_growth ** n_iter)
U, U_last = update_U(self.X_train, self.U, self.V, self.W, reg_l1, reg_l2, self.beta, U_last)
V, V_last = update_U(self.X_train.T, self.V, self.U, self.W.T, reg_l1, reg_l2, self.beta, V_last)
# compute err
err, err_last = np.power(self.X_train - U @ V.T, 2).sum(), err
# compute gap
U_gap, V_gap = get_integrality_gap(U, reg_l1, reg_l2), get_integrality_gap(V, reg_l1, reg_l2)
gap, gap_last = U_gap + V_gap, gap
self.U, self.V = U, V
# update pbar
pbar.update(1)
desc = f'[I] n_iter: {n_iter}, err: {err:.3f}, gap: {gap:.3f}, U: [{U.min():.4f}, {U.max():.4f}], V: [{V.min():.4f}, {V.max():.4f}]'
pbar.set_description(desc)
# print(desc)
self.X_pd = get_prediction_with_threshold(U=U, V=V, u=0.5, v=0.5, sparse=True)
self.evaluate(
df_name='updates',
head_info={
'iter': n_iter,
'reg_l1': reg_l1,
'reg_l2': reg_l2,
'gap': gap,
'U_gap': U_gap,
'V_gap': V_gap,
'error': err,
},
metrics=['ERR', 'Accuracy', 'Recall', 'Precision', 'F1'],
)
# if n_iter % 20 == 0:
# show_matrix([(self.X_pd, [0, 0], 'X_pd')], scaling=0.3, colorbar=True)
# measure error and diff
# diff = abs(err - err_last)
diff = abs(gap - gap_last)
is_improving = self.early_stop(error=gap, diff=diff, n_iter=n_iter)
n_iter += 1
[docs]
def get_integrality_gap(U, reg_l1, reg_l2):
dist = np.where(
U < 0.5,
np.abs(U),
np.abs(U - 1)
)
gap = reg_l1 * dist + reg_l2 * dist ** 2
return gap.sum()
[docs]
def update_U(X, U, V, W, reg_l1, reg_l2, beta, U_last):
'''A step of Gauss-Seidel optimization for U, applies to V as well.
'''
# U{t}, U_{t-1}, U_{t-2}
U, U_last, U_before_last = None, U, U_last
# step size eta in terms of the Lipschitz constant
L = max(np.linalg.norm(V.T @ V, ord=2), 1e-4)
eta = 1 / (1.1 * L) if beta == 0 else 2 * (1 - beta) / (1 + 2 * beta) / L
# gradient descent
U = U_last + beta * (U_last - U_before_last)
dFdU = multiply(W, U @ V.T - X) @ V
U = U - eta * dFdU
# proximal operator
U = prox(U, reg_l1 * eta, reg_l2 * eta)
return U, U_last
[docs]
def prox(U, kai, lamda):
'''Proximal operator of the elastic net penalty.
'''
U_prox = np.where(
U <= 0.5,
U - kai * np.sign(U),
U - kai * np.sign(U - 1) + lamda
)
U_prox = U_prox / (1 + lamda)
U_prox[U_prox < 0] = 0
return U_prox
# def proxelbmf(x, k, l):
# return torch.where(x <= 0.5, x - k * torch.sign(x), x - k * torch.sign(x - 1) + l) / (1 + l)
# def proxelbmf_(x, k, l, L):
# return torch.where(x <= 0.5, x - k * torch.sign(x), x - k * torch.sign(x - 1) + l) / (1 + l)
# def proxelbmfbox(x, k, l):
# return torch.clamp(proxelbmf(x, k, l), 0, 1)
# def proxelbmfnn(x, k, l):
# return torch.max(proxelbmf(x, k, l), torch.zeros_like(x))
# def proxelbmfnn_(x, k, l, L):
# return torch.max(proxelbmf_(x, k, l, L), torch.zeros_like(x))
# def integrality_gap_elastic(e, l1reg, l2reg):
# return torch.min((l1reg * e.abs() + l2reg * (e)**2), l1reg * (e - 1).abs() + l2reg * (e - 1)**2).sum()
# @torch.no_grad()
# def elbmf_step_ipalm(X, U, V, Uold, l1reg, l2reg, tau, beta):
# VVt, XVt = V@V.T, X@V.T
# L = max(VVt.norm().item(), 1e-4)
# if beta != 0:
# U += beta * (U - Uold)
# Uold = U
# step_size = 2 * (1 - beta) / (1 + 2 * beta) / L
# else:
# step_size = 1 / (1.1 * L)
# # step_size = 1 / L
# U -= (U @ VVt - XVt) * step_size
# U = proxelbmfnn(U, l1reg * step_size, l2reg * tau * step_size)
# U = proxelbmfnn_(U, l1reg, l2reg * tau, L)
# # debug
# U[U == 0] = np.finfo(float).eps
# return U
# @torch.no_grad()
# def elbmf_ipalm(
# X,
# U,
# V,
# l1reg,
# l2reg,
# regularization_rate,
# maxiter,
# tolerance,
# beta,
# callback
# ):
# if beta != 0:
# Uold, Vold = U.clone(), V.T.clone()
# else:
# Uold, Vold = None, None
# fn = torch.inf
# pbar = tqdm(total=maxiter, desc="[I] error: -, U: -, V: -")
# for t in range(maxiter):
# tau = regularization_rate(t)
# U = elbmf_step_ipalm(X, U, V, Uold, l1reg, l2reg, tau, beta)
# V = elbmf_step_ipalm(X.T, V.T, U.T, Vold, l1reg, l2reg, tau, beta).T
# fn0, fn = fn, (X - (U @ V)).norm() ** 2
# # update pbar
# pbar.update(1)
# desc = f'[I] error: {fn:.4f}, U: [{U.min():.4f}, {U.max():.4f}], V: [{V.min():.4f}, {V.max():.4f}]'
# pbar.set_description(desc)
# if callback != None:
# callback(t, U, V, fn)
# # if (abs(fn - fn0) < tolerance):
# # print("[I] Converged")
# # break
# return U, V
# @torch.no_grad()
# def elbmf(
# X,
# n_components,
# U = None,
# V = None,
# l1reg = 0.01,
# l2reg = 0.02,
# regularization_rate = lambda t: 1.02 ** t,
# maxiter = 3000,
# tolerance = 1e-8,
# beta = 0.0001,
# callback = None,
# with_rounding = True,
# seed = None
# ):
# """
# This function implements the algorithm described in the paper
# Sebastian Dalleiger and Jilles Vreeken. “Efficiently Factorizing Boolean Matrices using Proximal Gradient Descent”.
# In: Thirty-Sixth Conference on Neural Information Processing Systems (NeurIPS). 2022
# Arguments:
# X a Boolean n*m matrix
# n_components number of components
# l1reg l1 coefficient
# l2reg l2 coefficient
# regularization_rate monotonically increasing regularization-rate function
# maxiter maximum number of iterations
# tolerance the threshold to the absolute difference between the current and previous losses determines the convergence
# beta inertial coefficient of iPALM
# callback e.g. lambda t, U, V, fn: print(t, fn)
# with_rounding rounds U and V in case of early stopping
# Returns:
# U n*k factor matrix
# V k*m factor matrix
# """
# if seed is not None:
# torch.manual_seed(seed)
# if U is None or V is None:
# U, V = torch.rand(X.shape[0], n_components, dtype=X.dtype), torch.rand(n_components, X.shape[1], dtype=X.dtype)
# else:
# # imported U, V
# U = np.array(U.toarray(), dtype=np.float64)
# U = torch.from_numpy(U).float()
# V = np.array(V.toarray(), dtype=np.float64).T
# V = torch.from_numpy(V).float()
# U, V = elbmf_ipalm(X, U, V, l1reg, l2reg, regularization_rate, maxiter, tolerance, beta, callback)
# if with_rounding:
# with torch.no_grad():
# U = proxelbmfnn(U, 0.5, l2reg * 1e12)
# V = proxelbmfnn(V, 0.5, l2reg * 1e12)
# return U.round(), V.round()
# else:
# return U, V