from .BaseModel import BaseModel
import numpy as np
from ..utils import binarize, matmul, to_dense, to_sparse, ismat
from scipy.sparse import csr_matrix
class ContinuousModel(BaseModel):
    '''Base class for continuous binary matrix factorization models.

    This is a template class: ``__init__`` always raises, so concrete models
    are expected to subclass it and provide their own constructor.

    The methods below read the attributes ``X_train``, ``X_val``, ``X_test``,
    ``m``, ``n``, ``k`` and ``rng``, and optionally ``W``, ``U``, ``V``,
    ``init_method``, ``normalize_method`` and ``solver``. None of them are
    set in this class; presumably the subclass or ``BaseModel`` provides them.
    '''
    def __init__(self):
        raise NotImplementedError("This is a template class.")

    def init_model(self):
        '''The ``BaseModel.init_model()`` for continuous models.

        Runs, in order: timer/name/log setup, factor allocation (skipped for
        custom factors), ``init_W()``, ``init_UV()``, ``normalize_UV()``, then
        conversion of all matrices to dense ``float64``.
        '''
        self._start_timer()
        self._make_name()
        self._init_logs()

        # avoid init factors when custom factors are provided
        if getattr(self, 'init_method', None) != 'custom':
            self._init_factors()

        self.init_W()
        self.init_UV()
        self.normalize_UV()

        self._to_dense()
        self._to_float()

        # replace zeros in U, V with machine epsilon when the solver is 'mu'
        # (presumably multiplicative update, where an exact zero can never
        # become non-zero again -- confirm against the solver implementation)
        uses_mu = getattr(self, 'solver', None) == 'mu'
        has_factors = (
            getattr(self, 'U', None) is not None
            and getattr(self, 'V', None) is not None
        )
        if uses_mu and has_factors:
            eps = np.finfo(np.float64).eps
            self.U[self.U == 0] = eps
            self.V[self.V == 0] = eps

    def init_W(self):
        '''Initialize masking weight matrix for models that accept masking weights.

        This turns codenames into matrix. Does nothing if the model has no
        attribute ``W``.

        If ``W`` is 'mask': ``W`` will be assigned 1 for any entries in ``X_train``, no matter if the value is 1, or 0 from negative sampling.

        If ``W`` is 'full': ``W`` is full 1 matrix. The loss will take the whole matrix into consideration.

        If ``W`` is ndarray or spmatrix: ``W`` will be used as the mask matrix.

        In every case ``W`` is finally converted with ``to_sparse(..., type='csr')``.

        :raises AssertionError: if ``W`` is a string other than 'mask' / 'full',
            or a non-string that ``ismat`` rejects.
        '''
        if not hasattr(self, 'W'):
            return

        if isinstance(self.W, str):
            # Check the codename only for strings: testing a matrix for
            # membership in a list of strings compares it element-wise
            # against each string, which is ambiguous and may raise.
            assert self.W in ('mask', 'full'), \
                "W must be 'mask', 'full' or a matrix."
            if self.W == 'mask':
                # same sparsity pattern as X_train, every stored entry set to 1
                self.W = self.X_train.copy()
                self.W.data = np.ones(self.X_train.data.shape)
            else:
                self.W = np.ones((self.m, self.n))
        else:
            assert ismat(self.W), "W must be 'mask', 'full' or a matrix."

        self.W = to_sparse(self.W, type='csr')

    def init_UV(self, init_method="normal"):
        '''Initialize factors U and V with the model's ``init_method``.

        Does nothing if the model has no attribute ``init_method``.

        If 'normal': absolute values of normal samples scaled by ``sqrt(X_train.mean() / k)``.

        If 'uniform': uniform samples in ``[0, 2 * sqrt(X_train.mean() / k))``.

        If 'custom': ``U`` and ``V`` must already be set on the model.

        :param init_method: not read by this method; the attribute
            ``self.init_method`` decides the initialization.
        :raises AssertionError: if ``self.init_method`` is 'custom' but ``U``
            or ``V`` is missing.
        '''
        if not hasattr(self, 'init_method'):
            return

        method = self.init_method
        if method in ("normal", "uniform"):
            avg = np.sqrt(self.X_train.mean() / self.k)
            # V is drawn before U in both branches; keep this order so the
            # random stream, and therefore the factors, stay reproducible.
            if method == "normal":
                V = avg * self.rng.standard_normal(size=(self.n, self.k))
                U = avg * self.rng.standard_normal(size=(self.m, self.k))
                self.U, self.V = np.abs(U), np.abs(V)
            else:
                self.V = self.rng.uniform(low=0, high=avg * 2, size=(self.n, self.k))
                self.U = self.rng.uniform(low=0, high=avg * 2, size=(self.m, self.k))
        elif method == "custom":
            # U and V must be provided at this point
            assert hasattr(self, 'U') and hasattr(self, 'V')

    def normalize_UV(self):
        '''Normalize factors U and V with given ``normalize_method``.

        Does nothing if the model has no attribute ``normalize_method``.

        .. topic:: Reference

            The method 'balance' comes from the paper behind model ``BinaryMFPenalty``:

            Binary Matrix Factorization with Applications.

        If 'balance': balance each pair of factors, used in `BinaryMFPenalty`.
            This does not necessarily map the factors to an interval within [0, 1].

        If 'matrixwise-normalize': normalize the whole factor matrix to [0, 1], used in thresholding methods.
            This will maintain the relative magnitude of the values within the whole factor matrix.

        If 'columnwise-normalize': normalize each factor vector to [0, 1], used in thresholding methods.
            This will maintain the relative magnitude of the values within each factor vector.

        If 'matrixwise-mapping': map unique values in the whole factor matrix to an arithmetic sequence in [0, 1).
            This will maintain the relative order of the values within the whole factor matrix.

        If 'columnwise-mapping': map unique values in each factor vector to an arithmetic sequence in [0, 1).
            This will maintain the relative order of the values within each factor vector.

        If None: do nothing.

        Any other value leaves the factors untouched but still prints the
        before/after ranges.
        '''
        if not hasattr(self, 'normalize_method'):
            return

        method = self.normalize_method
        if method is None:
            # Return before touching U and V, so that "do nothing" really
            # does nothing even when the factors are not set yet.
            return

        U0_min, U0_max, V0_min, V0_max = self.U.min(), self.U.max(), self.V.min(), self.V.max()

        # NOTE(review): the column-wise branches assign into U and V in place,
        # so results are stored in the factors' existing dtype; init_model()
        # calls this method before _to_float(). Confirm that custom factors
        # are floating point.
        if method == 'balance':
            diag_U = to_dense(np.sqrt(np.max(self.U, axis=0))).flatten()
            diag_V = to_dense(np.sqrt(np.max(self.V, axis=0))).flatten()
            for i in range(self.k):
                self.U[:, i] = self.U[:, i] * diag_V[i] / diag_U[i]
                self.V[:, i] = self.V[:, i] * diag_U[i] / diag_V[i]
        elif method == 'matrixwise-normalize':
            self.U, self.V = self.U / self.U.max(), self.V / self.V.max()
        elif method == 'columnwise-normalize':
            for i in range(self.k):
                self.U[:, i] = self.U[:, i] / self.U[:, i].max()
                self.V[:, i] = self.V[:, i] / self.V[:, i].max()
        elif method == 'matrixwise-mapping':
            self.U = unique_values_mapping(to_dense(self.U))
            self.V = unique_values_mapping(to_dense(self.V))
        elif method == 'columnwise-mapping':
            for i in range(self.k):
                self.U[:, i] = unique_values_mapping(to_dense(self.U[:, i]))
                self.V[:, i] = unique_values_mapping(to_dense(self.V[:, i]))

        U1_min, U1_max, V1_min, V1_max = self.U.min(), self.U.max(), self.V.min(), self.V.max()

        print("[I] Normalized from: U: [{:.4f}, {:.4f}], V: [{:.4f}, {:.4f}]".format(U0_min, U0_max, V0_min, V0_max))
        print("[I] to: U: [{:.4f}, {:.4f}], V: [{:.4f}, {:.4f}]".format(U1_min, U1_max, V1_min, V1_max))

    def show_matrix(self, settings=None, u=None, v=None, boolean=True, **kwargs):
        '''Wrapper of ``BaseModel.show_matrix()`` with thresholds ``u`` and ``v``.

        When ``settings`` is None, the default view shows the product
        ``matmul(U, V.T, boolean=boolean)`` together with ``U`` and ``V.T``.

        :param settings: list of ``(matrix, [i, j], title)`` tuples forwarded
            to the parent method (``[i, j]`` is presumably the position in the
            plot grid -- confirm against ``BaseModel.show_matrix``).
        :param u: threshold passed to ``binarize`` for ``U``; used only when
            ``boolean`` is true and ``u`` is not None.
        :param v: threshold passed to ``binarize`` for ``V``; same rule as ``u``.
        :param boolean: whether to binarize the factors and whether the
            product is computed with ``boolean=True``.
        :param kwargs: forwarded unchanged to ``BaseModel.show_matrix()``.
        '''
        if settings is None:
            U = binarize(self.U, u) if boolean and u is not None else self.U
            V = binarize(self.V, v) if boolean and v is not None else self.V
            X = matmul(U, V.T, boolean=boolean)
            settings = [(X, [0, 0], "X"), (U, [0, 1], "U"), (V.T, [1, 0], "V")]
        super().show_matrix(settings, **kwargs)

    def _show_matrix(self):
        '''Wrapper for ``BaseModel._show_matrix()``.

        Shows ``X_train`` (titled 'gt') next to ``X_pd`` (titled 'pd') with a
        continuous colorbar.
        '''
        settings = [(self.X_train, [0, 0], 'gt'), (self.X_pd, [0, 1], 'pd')]
        self.show_matrix(settings, colorbar=True, discrete=False, keep_nan=False)

    def _convert_matrices(self, convert):
        '''Replace X_train, X_val, X_test, W, U and V with ``convert(matrix)``.

        Shared implementation of ``_to_dense()``, ``_to_float()`` and
        ``_to_bool()``. ``X_train`` is always converted; ``X_val`` and
        ``X_test`` are skipped when None; ``W`` is converted whenever the
        attribute exists; ``U`` and ``V`` are converted when they exist and
        are not None.

        :param convert: callable taking one matrix and returning the
            converted matrix.
        '''
        self.X_train = convert(self.X_train)
        if self.X_val is not None:
            self.X_val = convert(self.X_val)
        if self.X_test is not None:
            self.X_test = convert(self.X_test)
        if hasattr(self, 'W'):
            self.W = convert(self.W)
        for name in ('U', 'V'):
            factor = getattr(self, name, None)
            if factor is not None:
                setattr(self, name, convert(factor))

    def _to_dense(self):
        '''Turn X, W, U and V into dense matrices.

        For temporary use in development.
        '''
        self._convert_matrices(to_dense)

    def _to_float(self):
        '''Turn X, W, U and V into float matrices.

        For temporary use in development.
        '''
        self._convert_matrices(lambda mat: mat.astype(np.float64))

    def _to_bool(self):
        '''Turn X, W, U and V into bool matrices.

        For temporary use in development.
        '''
        self._convert_matrices(lambda mat: mat.astype(bool))
def unique_values_mapping(arr):
    '''Map unique values in a matrix to the [0, 1) interval.

    The sorted unique values of ``arr`` are replaced by the evenly spaced
    sequence ``0, 1/n, ..., (n-1)/n``, where ``n`` is the number of unique
    values. The order of the values is kept, and the largest value maps to
    ``(n-1)/n`` rather than 1.

    :param arr: array-like of values; any shape.
    :return: float64 ndarray with the same shape as ``arr``. An empty input
        gives an empty output.
    '''
    values = np.asarray(arr)
    if values.size == 0:
        # nothing to rank; also avoids dividing by zero unique values
        return np.zeros(values.shape, dtype=np.float64)

    # ``inverse`` holds, for every element, the index of its value among the
    # sorted unique values, i.e. its rank. This replaces a per-element dict
    # lookup with one vectorized call.
    unique_values, inverse = np.unique(values, return_inverse=True)

    # The shape of ``inverse`` for N-d input differs between NumPy versions,
    # so restore the input shape explicitly.
    ranks = np.asarray(inverse).reshape(values.shape)
    return ranks / unique_values.size