Source code for PyBMF.models.Asso

import numpy as np
from ..utils import matmul, add, to_sparse, to_dense, binarize, binarize
from ..utils import invert, coverage_score, ERR, description_length
from ..utils import get_prediction
from .BaseModel import BaseModel
from scipy.sparse import lil_matrix
from tqdm import tqdm


class Asso(BaseModel):
    '''The Asso algorithm.

    .. topic:: Reference

        The discrete basis problem. Zhang et al. 2007.

    Parameters
    ----------
    k : int, optional
        The target rank.
        If ``None``, it will factorize until the error is smaller than ``tol``, or when other stopping criteria is met.
    tol : float, default: 0
        The error tolerance.
    tau : float
        The binarization threshold when building basis.
        Can be determined via model selection techniques.
    w_fp : float
        The penalty weights for FP.
    w_fn : float, optional, default: None
        The penalty weights for FN.
        If ``w_fn`` is ``None``, it will be treated as ``1 - w_fp``.
    '''
    # NOTE(review): "The discrete basis problem" is usually attributed to
    # Miettinen et al. (IEEE TKDE, 2008) - please confirm the citation above.

    def __init__(self, tau, k=None, tol=0, w_fp=0.5, w_fn=None):
        # Parameter validation/storage is delegated to `check_params`
        # (not defined in this class; presumably inherited from BaseModel).
        self.check_params(tau=tau, k=k, tol=tol, w_fp=w_fp, w_fn=w_fn)

    def fit(self, X_train, X_val=None, X_test=None, **kwargs):
        '''Fit the model.

        Parameters
        ----------
        X_train : spmatrix
            The training matrix to be factorized.
        X_val, X_test : optional
            Forwarded unchanged to the parent ``fit``.
        **kwargs
            Forwarded unchanged to the parent ``fit``.
        '''
        super().fit(X_train, X_val, X_test, **kwargs)

        self._fit()

        # refresh the Boolean reconstruction from the final factors
        self.X_pd = get_prediction(U=self.U, V=self.V, boolean=True)
        self.finish(show_logs=self.show_logs, save_model=self.save_model, show_result=self.show_result)

    def init_model(self):
        '''Initialize the model.

        Builds the association matrix and the thresholded basis candidates
        from ``self.X_train`` and displays both via ``show_matrix``.
        '''
        super().init_model()

        # real-valued association matrix
        self.assoc = build_assoc(X=self.X_train, dim=1)
        # binary-valued basis candidates
        self.basis = build_basis(assoc=self.assoc, tau=self.tau)

        settings = [(self.assoc, [0, 0], 'assoc'), (self.basis, [0, 1], 'basis')]
        self.show_matrix(settings, colorbar=True, clim=[0, 1], title=f'tau: {self.tau}')

    def _fit(self):
        '''The main procedure of fitting.

        Greedy loop: in every round each remaining basis candidate is paired
        with its matched vector (see ``get_vector``), and the pair with the
        highest score is kept, provided that score exceeds the best score of
        the previous rounds. The loop ends when the candidate list is empty,
        when no candidate improves the score, or when either ``early_stop``
        check (error or number of factors) returns a falsy value.
        '''
        k = 0
        # best coverage score so far; carried over from one factor to the next
        best_score = 0
        is_improving = True

        # context manager guarantees the outer progress bar is closed on exit
        with tqdm(total=self.k, position=0) as pbar:
            while is_improving:
                best_row, best_col, best_idx = None, None, None
                n_basis = self.basis.shape[0]  # number of basis candidates

                # early stop detection
                if n_basis == 0:
                    is_improving = self.early_stop(msg="Candidate list is empty", k=k)
                    break

                # row-wise score of previous factors
                self.X_pd = get_prediction(U=self.U, V=self.V, boolean=True)
                s_old = coverage_score(gt=self.X_train, pd=self.X_pd, w_fp=self.w_fp, w_fn=self.w_fn, axis=1)

                for i in tqdm(range(n_basis), leave=False, position=0, desc=f"[I] k = {k}"):
                    row = self.basis[i]
                    score, col = get_vector(
                        X_gt=self.X_train,
                        X_old=self.X_pd,
                        s_old=s_old,
                        basis=row,
                        basis_dim=1,
                        w_fp=self.w_fp,
                        w_fn=self.w_fn,
                    )
                    if score > best_score:
                        best_score, best_row, best_col, best_idx = score, row, col, i

                # early stop detection
                if best_idx is None:
                    is_improving = self.early_stop(msg="No pattern found.", k=k)
                    break

                # update factors
                self.set_factors(k, best_col.T, best_row.T)

                # remove the chosen basis from the candidate list; the index
                # array is integer-typed even when no candidate remains
                remaining = np.delete(np.arange(n_basis), best_idx)
                self.basis = self.basis[remaining]

                # evaluation
                self.X_pd = get_prediction(U=self.U, V=self.V, boolean=True)

                # show matrix at every step
                if self.verbose and self.display:
                    self.show_matrix(title=f"k: {k}, tau: {self.tau}, w: {[self.w_fp, self.w_fn]}")

                # besides the greedy score, log the score at w_fp=0.5 and the
                # description length of the current factorization
                score = coverage_score(gt=self.X_train, pd=self.X_pd, w_fp=0.5, axis=None)
                desc_len = description_length(gt=self.X_train, pd=self.X_pd, U=self.U, V=self.V, w_model=1, w_fp=1, w_fn=1)

                self.evaluate(
                    df_name='updates',
                    head_info={'k': k},
                    train_info={
                        'score': best_score,
                        'score_0.5': score,
                        'desc_len': desc_len,
                        'shape': [best_col.sum(), best_row.sum()],
                    },
                    metrics=['TP', 'TPR', 'FP', 'FPR', 'FN', 'FNR', 'ERR', 'ACC', 'Recall', 'Precision', 'F1'],
                    verbose=self.verbose,
                )

                # early stop detection: both checks are always run, and the
                # loop continues only if neither asks to stop (the error check
                # must not be overwritten by the factor-count check)
                error_ok = self.early_stop(error=ERR(gt=self.X_train, pd=self.X_pd), k=k)
                rank_ok = self.early_stop(n_factor=k+1)
                is_improving = error_ok and rank_ok

                # update pbar and k
                pbar.update(1)
                k += 1
def get_vector(X_gt, X_old, s_old, basis, basis_dim, w_fp, w_fn):
    '''Find the best matching vector for one basis candidate and score it.

    The basis is first combined with an all-ones vector, i.e. the candidate
    pattern is tentatively applied along the whole opposite dimension. An
    entry of the returned vector is switched on exactly where doing so gives
    a strictly higher coverage score than the previous prediction.

    Parameters
    ----------
    X_gt : spmatrix
        The ground-truth matrix.
    X_old : spmatrix
        The prediction matrix before adding the current pattern.
    s_old : array
        The column/row-wise coverage scores of the previous prediction.
    basis : (1, n) spmatrix
        The basis vector.
    basis_dim : int
        The dimension to which ``basis`` belongs.
        If ``basis_dim == 0``, a pattern is considered ``basis.T * vector``.
        Otherwise, it's considered ``vector.T * basis``.
        Note that both ``basis`` and ``vector`` are row vectors.
    w_fp, w_fn : float
        The penalty weights for false positives and false negatives.

    Returns
    -------
    score : float
        The coverage score.
    vector : (1, n) spmatrix
        The matched vector.
    '''
    vector_dim = 1 - basis_dim

    # pattern obtained when every entry of the matched vector is switched on
    all_ones = lil_matrix(np.ones((1, X_gt.shape[vector_dim])))
    pattern = matmul(basis.T, all_ones, sparse=True, boolean=True)
    if basis_dim != 0:
        pattern = pattern.T

    # new X and coverage score vector
    X_new = add(X_old, pattern, sparse=True, boolean=True)
    s_new = coverage_score(gt=X_gt, pd=X_new, w_fp=w_fp, w_fn=w_fn, axis=basis_dim)

    # switch on only the entries whose score strictly improves
    vector = lil_matrix(np.array(s_new > s_old, dtype=int))

    # total score: old scores where the pattern is off, new scores where it is on
    keep_old = to_dense(invert(vector), squeeze=True).astype(bool)
    take_new = to_dense(vector, squeeze=True).astype(bool)
    score = s_old[keep_old].sum() + s_new[take_new].sum()

    return score, vector
def build_assoc(X, dim):
    '''Build the real-valued association matrix.

    The co-occurrence matrix of ``X`` along ``dim`` is computed and each of
    its rows is divided by the corresponding sum of ``X`` over the other
    dimension. Rows whose sum is not positive are set to zero.

    Parameters
    ----------
    X : ndarray, spmatrix
        The data matrix.
    dim : int
        The dimension which ``basis`` belongs to.
        If ``dim`` == 0, ``basis`` is treated as a column vector and ``vector`` as a row vector.

    Returns
    -------
    assoc : spmatrix
        The association matrix.
    '''
    if dim == 0:
        cooccurrence = X @ X.T
    else:
        cooccurrence = X.T @ X
    assoc = to_sparse(cooccurrence, 'lil').astype(float)

    # per-row normalizers: sums of X taken over the opposite dimension
    totals = to_dense(X.sum(axis=1-dim), squeeze=True)

    n_rows = X.shape[dim]
    for i in range(n_rows):
        total = totals[i]
        if total > 0:
            assoc[i, :] = assoc[i, :] / total
        else:
            assoc[i, :] = 0

    return assoc
def build_basis(assoc, tau):
    '''Get the binary-valued basis candidates.

    The association matrix is binarized with threshold ``tau`` and rows that
    end up without any nonzero entry are dropped.

    Parameters
    ----------
    assoc : spmatrix
        The association matrix.
    tau : float
        The threshold for the association matrix.

    Returns
    -------
    basis : spmatrix
        The binary-valued basis candidates.
    '''
    basis = binarize(assoc, tau)
    basis = to_sparse(basis, 'lil').astype(int)

    # `ravel` (not `squeeze`) keeps the row mask 1-D even when there is only
    # a single row; `squeeze` would collapse it to a 0-d array in that case
    nonzero_idx = np.asarray(basis.sum(axis=1) != 0).ravel()

    return basis[nonzero_idx]