Source code for PyBMF.models.BinaryMFThreshold

from .ContinuousModel import ContinuousModel
from ..utils import multiply, power, sigmoid, ignore_warnings, subtract, get_prediction, get_prediction_with_threshold, ismat, show_factor_distribution
import numpy as np
from ..solvers import line_search, limit_step_size
from tqdm import tqdm


[docs] class BinaryMFThreshold(ContinuousModel): '''Binary matrix factorization, thresholding algorithm with line search. .. topic:: Reference Binary Matrix Factorization with Applications. Algorithms for Non-negative Matrix Factorization. .. note:: To be released: For sigmoid link function, please use ``BinaryMFThresholdExSigmoid``. For columnwise thresholds, please use ``BinaryMFThresholdExColumnwise``. For both, please use ``BinaryMFThresholdExSigmoidColumnwise``. Parameters ---------- u : float Initial threshold for ``U``. v : float Initial threshold for ``V``. lamda : float The 'lambda' in sigmoid function. ''' def __init__(self, k, U, V, W='mask', u=0.5, v=0.5, lamda=100, solver="line-search", min_diff=1e-3, max_iter=100, init_method='custom', normalize_method=None, seed=None): self.check_params(k=k, U=U, V=V, W=W, u=u, v=v, lamda=lamda, solver=solver, min_diff=min_diff, max_iter=max_iter, init_method=init_method, normalize_method=normalize_method, seed=seed)
[docs] def check_params(self, **kwargs): '''Check the validity of parameters. ''' super().check_params(**kwargs) assert self.solver in ['line-search'] assert self.init_method in ['custom'] assert self.normalize_method in ['balance', 'matrixwise-normalize', 'columnwise-normalize', 'matrixwise-mapping', 'columnwise-mapping', None] assert ismat(self.W) or self.W in ['mask', 'full']
[docs] def fit(self, X_train, X_val=None, X_test=None, **kwargs): '''Fit the model. ''' super().fit(X_train, X_val, X_test, **kwargs) self._fit() self.X_pd = get_prediction_with_threshold(U=self.U, V=self.V, u=self.u, v=self.v) self.finish(show_logs=self.show_logs, save_model=self.save_model, show_result=self.show_result)
[docs] def threshold_to_x(self): return np.array([self.u, self.v])
[docs] def x_to_threshold(self, x_last): self.u, self.v = x_last[0], x_last[1]
[docs] def x_bounds(self): eps = 1e-6 x_min = np.array([self.U.min() + eps, self.V.min() + eps]) x_max = np.array([self.U.max() - eps, self.V.max() - eps]) return x_min, x_max
[docs] def evaluate_with_threshold(self, n_iter, new_fval): self.X_pd = get_prediction_with_threshold(U=self.U, V=self.V, u=self.u, v=self.v) self.evaluate(df_name='updates', head_info={'iter': n_iter, 'u': self.u, 'v': self.v, 'F': new_fval})
[docs] def _fit(self): '''The gradient descent method. ''' n_iter = 0 # init n_iter is_improving = True # init flag x_last = self.threshold_to_x() # init threshold p_last = -self.dF(x_last) # descent direction new_fval = self.F(x_last) # init value self.evaluate_with_threshold(n_iter, new_fval) # init evaluation if self.verbose is False: pbar = tqdm(total=self.max_iter, desc='[I] F: -') # init pbar while is_improving: n_iter += 1 # update n_iter xk = x_last # starting point pk = p_last # searching direction alpha, fc, gc, new_fval, old_fval, new_slope = line_search(f=self.F, myfprime=self.dF, xk=xk, pk=pk, maxiter=50) if alpha is None: print("[W] Search direction is not a descent direction.") break # get x_last, p_last x_last = xk + alpha * pk p_last = -new_slope # descent direction # refine x_last, p_last and new_fval by limiting alpha x_min, x_max = self.x_bounds() x_last, alpha = limit_step_size(x_min=x_min, x_max=x_max, x_last=x_last, xk=xk, pk=pk, alpha=alpha) p_last = -self.dF(x_last) new_fval = self.F(x_last) # measurements diff = np.abs(new_fval - old_fval) # the difference of function value # diff = np.sqrt(np.sum((alpha * pk) ** 2)) # the difference of threshold self.print_msg(" Wolfe line search iter : {}".format(n_iter)) self.print_msg(" num of function evals : {}".format(fc)) self.print_msg(" num of gradient evals : {}".format(gc)) self.print_msg(" function value update : {:.3f} -> {:.3f}".format(old_fval, new_fval)) str_xk = ', '.join('{:.2f}'.format(x) for x in list(xk)) str_x_last = ', '.join('{:.2f}'.format(x) for x in list(x_last)) self.print_msg(" threshold update :") self.print_msg(" [{}]".format(str_xk)) self.print_msg(" ->[{}]".format(str_x_last)) str_pk = ', '.join('{:.4f}'.format(p) for p in alpha * pk) self.print_msg(" threshold update direction :") self.print_msg(" [{}]".format(str_pk)) self.x_to_threshold(x_last) # update threshold self.evaluate_with_threshold(n_iter, new_fval) # update evaluation # update pbar if self.verbose is False: pbar.update(1) pbar.set_description(f"[I] F: {new_fval:.6f}") # early stop detection is_improving = self.early_stop(n_iter=n_iter, diff=diff)
@ignore_warnings def F(self, params): '''The objective function. Parameters ---------- params : list of 2 floats The thresholds [``u``, ``v``]. Returns ------- F : float The value of the objective function F(``u``, ``v``). ''' u, v = params U = sigmoid(subtract(self.U, u) * self.lamda) V = sigmoid(subtract(self.V, v) * self.lamda) diff = self.X_train - U @ V.T F = 0.5 * np.sum(power(multiply(self.W, diff), 2)) return F @ignore_warnings def dF(self, params): '''The gradient of the objective function. Parameters ---------- params : list of 2 floats The thresholds [``u``, ``v``]. Returns ------- dF : float The value of the gradient (ascend direction) of the objective function [dF(``u``, ``v``)/du, dF(``u``, ``v``)/dv]. ''' u, v = params U = sigmoid(subtract(self.U, u) * self.lamda) V = sigmoid(subtract(self.V, v) * self.lamda) X_gt = self.X_train X_pd = U @ V.T dFdX = multiply(self.W, X_gt - X_pd) dFdU = dFdX @ V dUdu = self.dXdx(self.U, u) dFdu = multiply(dFdU, dUdu) # (m, k) dFdV = U.T @ dFdX dVdv = self.dXdx(self.V, v) dFdv = multiply(dFdV, dVdv.T) # (k, n) dF = np.array([np.sum(dFdu), np.sum(dFdv)]) return dF # @ignore_warnings
[docs] def dXdx(self, X, x): '''The fractional term in the gradient. dU* dV* dW* dH* This computes --- and --- (or --- and --- as in the paper). du dv dw dh Parameters ---------- X : ndarray The ``X*``, sigmoid(``X`` - ``x``) in the paper. ''' diff = subtract(X, x) num = np.exp(-self.lamda * subtract(X, x)) * self.lamda denom_inv = sigmoid(diff * self.lamda) ** 2 return num * denom_inv