Source code for PyBMF.models.HyperPlus

from .Hyper import Hyper
from ..utils import FPR, matmul, FP, get_prediction, get_residual, ignore_warnings
import numpy as np
from tqdm import tqdm
from scipy.sparse import lil_matrix, hstack


[docs] class HyperPlus(Hyper): '''The Hyper+ algorithm. Hyper is an exact decomposition algorithm. Hyper+ is used after fitting a Hyper model. It's a relaxation of the exact decomposition. .. topic:: Reference Summarizing Transactional Databases with Overlapped Hyperrectangles. Xiang et al. SIGKDD 2011. Parameters ---------- model : Hyper class The fitted ``Hyper`` model. target_k : int The target number of factors. By default, it's 1. This will ask the model to factorize all the way down to ``k`` = 1. The last pattern will always be full 1 matrix. samples : int, default: all possible samples Number of pairs to be merged during trials. Smaller ``samples`` will be faster but more likely to surpass the false positive rate ``beta``. beta : float The upper limit of false positive / ground truth. 1.0 means no limit. ''' def __init__(self, model, target_k, samples=500, beta=np.inf): self.check_params(model=model, beta=beta, target_k=target_k, samples=samples)
[docs] def check_params(self, **kwargs): super().check_params(**kwargs) assert self.beta is not None or self.target_k is not None, "Please specify beta or target_k or both." # import model if 'model' in kwargs: model = kwargs.get('model') assert isinstance(model, Hyper), "Please import a Hyper model." self.import_model(U=model.U, V=model.V, T=model.T, I=model.I, k=model.k, logs=model.logs) print("[I] k from model :", self.k)
[docs] def fit(self, X_train, X_val=None, X_test=None, **kwargs): self.check_params(**kwargs) self.load_dataset(X_train=X_train, X_val=X_val, X_test=X_test) # do not re-init model factors and logs since the model is imported self._start_timer() self._fit() self.finish(show_logs=self.show_logs, save_model=self.save_model, show_result=self.show_result)
@ignore_warnings def _fit(self): self.X_pd = get_prediction(U=self.U, V=self.V, boolean=True) self.X_rs = get_residual(X=self.X_train, U=self.U, V=self.V) fpr = FPR(gt=self.X_train, pd=self.X_pd) # fp rate, FP / N fpb = FP(gt=self.X_train, pd=self.X_pd) / self.X_train.sum() # fp budget, FP / P # record U, V self.logs['U'] = [] self.logs['V'] = [] is_improving = True while is_improving: # sample pairs a = int(self.k * (self.k - 1) / 2) size = int(min(self.samples, a)) pairs_idx = np.random.choice(a=a, size=size, replace=False) pairs = [] for m in tqdm(range(self.k - 1), position=0, leave=True, desc="[I] Sampling pairs... Current FP budget: {:.3f}".format(fpb)): # for m in range(self.k - 1): idx_0 = 0.5 * m * ((self.k - 1) + (self.k - m)) idx_1 = 0.5 * (m + 1) * ((self.k - 1) + (self.k - (m + 1))) idx = pairs_idx[(pairs_idx >= idx_0) & (pairs_idx < idx_1)] for i in idx: n = int(i - idx_0 + m + 1) pairs.append([m, n]) # start trials best_m, best_n = None, None best_T, best_I = None, None best_U, best_V = None, None best_savings = 0 # for m, n in tqdm(pairs, position=1, leave=False, desc="[I] Merging"): for m, n in pairs: T = list(set(self.T[m] + self.T[n])) I = list(set(self.I[m] + self.I[n])) U = lil_matrix((self.m, 1)) V = lil_matrix((self.n, 1)) U[T] = 1 V[I] = 1 # X_pd with merged pattern idx = [i for i in range(self.U.shape[1]) if i != m and i != n] pattern = matmul(U, V.T, sparse=True, boolean=True) X_pd = get_prediction(U=self.U[:, idx], V=self.V[:, idx], boolean=True) X_pd[pattern.astype(bool)] = 1 fpr = FPR(gt=self.X_train, pd=X_pd) # fp rate, FP / N fpb = FP(gt=self.X_train, pd=X_pd) / self.X_train.sum() # fp budget, FP / P if fpb > self.beta: continue savings = cost_savings(self.T[m], self.I[m], self.T[n], self.I[n], X_pd) if savings > best_savings: best_m, best_n = m, n best_T, best_I = T, I best_U, best_V = U, V best_savings = savings if best_m is None: is_improving = False break # update T, I idx = [i for i in range(self.k) if i not in [best_m, best_n]] self.T = [self.T[i] for i in idx] self.I = [self.I[i] for i in idx] self.T.append(best_T) self.I.append(best_I) # update U, V self.U = hstack([self.U[:, idx], best_U], format='lil') self.V = hstack([self.V[:, idx], best_V], format='lil') self.logs['U'].append(self.U.copy()) self.logs['V'].append(self.V.copy()) self.X_pd = get_prediction(U=self.U, V=self.V, boolean=True) self.X_rs = get_residual(X=self.X_train, U=self.U, V=self.V) fpr = FPR(gt=self.X_train, pd=self.X_pd) # fp rate, FP / N fpb = FP(gt=self.X_train, pd=self.X_pd) / self.X_train.sum() # fp budget, FP / P ocr = FP(gt=self.X_train, pd=self.X_pd) / self.X_pd.sum() # oc rate, FP / predicted P # self.k -= 1 self.k = self.U.shape[1] # evaluate self.evaluate(df_name='refinements', head_info={ 'k': self.U.shape[1], 'savings': best_savings, 'FPR': fpr, 'FPB': fpb, 'OCR': ocr, }) is_improving = fpb <= self.beta and self.k >= self.target_k
[docs] def cost_savings(T_0, I_0, T_1, I_1, X_pd): '''Compute the cost savings of merging `H_0` and `H_1`. savings = model description savings / exclusive area of merged pattern `H_0` = [`T_0`, `I_0`], `H_1` = [`T_1`, `I_1`] ''' T = list(set(T_0 + T_1)) I = list(set(I_0 + I_1)) denominator = len(T) * len(I) - X_pd[T, :][:, I].sum() if denominator == 0: savings = np.Inf return savings else: numerator = len(T_0) + len(T_1) + len(I_0) + len(I_1) - len(T) - len(I) savings = numerator / denominator return savings