from .BaseModel import BaseModel
from tqdm import tqdm
import numpy as np
from ..utils import sum, invert, multiply, to_dense, ERR, bool_to_index
from ..utils import get_prediction, get_residual, ignore_warnings, weighted_error, description_length
from itertools import product
from scipy.sparse import lil_matrix, hstack
[docs]
class Panda(BaseModel):
'''PaNDa and PaNDa+ algorithm.
PaNDa and PaNDa+ both fix ``w_fp``, ``w_fn`` to ``1.0``. They're allowed to be tuned in our implementation.
PaNDa fixes ``w_model`` to ``1.0`` while PaNDa+ does not.
.. topic:: Reference
Mining Top-K Patterns from Binary Datasets in presence of Noise.
A unifying framework for mining approximate top-k binary patterns.
Parameters
----------
k : int, optional
The target rank.
If `None`, it will factorize until the error is smaller than `tol`, or when other stopping criteria is met.
tol : float, default: 0
The error tolerance.
w_model : float
The model code weight `rho` in the Panda+ paper.
w_fp : float
The penalty weights for false positives (FP).
Added on top of the original works of Panda and Panda+ for flexibility.
w_fn : float
The penalty weights for false negatives (FN).
Added on top of the original works of Panda and Panda+ for flexibility.
init_method : str
How items are sorted.
exact_decomp : bool
Exact decomposition option.
Added on top of the original works of Panda and Panda+ to enable exact decomposition.
'''
def __init__(self, k=None, tol=0, w_model=1, w_fp=1, w_fn=1, init_method='correlation', exact_decomp=False):
self.check_params(k=k, tol=tol, w_model=w_model, w_fp=w_fp, w_fn=w_fn, init_method=init_method, exact_decomp=exact_decomp)
[docs]
def check_params(self, **kwargs):
'''Check the validity of the parameters.
'''
super().check_params(**kwargs)
assert self.init_method in ['frequency', 'couples-frequency', 'correlation']
if self.exact_decomp:
print("[I] Exact decomposition mode.")
self.w_model = 0
self.init_method = 'frequency'
[docs]
def fit(self, X_train, X_val=None, X_test=None, **kwargs):
'''Fit the model.
'''
super().fit(X_train, X_val, X_test, **kwargs)
self._fit()
self.finish(show_logs=self.show_logs, save_model=self.save_model, show_result=self.show_result)
[docs]
def _fit(self):
'''The main process of fitting.
In Panda, 3 lists are maintained.
``E`` (list) is the item extension list.
``I`` (spmatrix) is the item list.
``T`` (spmatrix) is the user or transaction list.
'''
# update residual and coverage
self.X_pd = get_prediction(U=self.U, V=self.V, boolean=True)
self.X_rs = get_residual(X=self.X_train, U=self.U, V=self.V)
self.I = lil_matrix((self.n, 1))
self.T = lil_matrix((self.m, 1))
cost_old = self.w_fn * self.X_rs.sum()
# cost_old = description_length(
# gt=self.X_train,
# U=hstack([self.U, self.T]).tolil(),
# V=hstack([self.V, self.I]).tolil(),
# w_model=self.w_model,
# w_fp=self.w_fp,
# w_fn=self.w_fn,
# )
k = 0
self.n_factors = 0
is_improving = True
if self.verbose is False:
pbar = tqdm(total=self.k, position=0)
while is_improving:
desc = f"[I] k: {k} - [{cost_old}]" # init cost
self.find_core()
desc += f" -> [{self.cost_now}]" # core cost
if not self.exact_decomp:
# disable extend_core step to find dense cores only
self.extend_core()
desc += f" -> [{self.cost_now}]" # extension cost
if self.verbose:
print(desc)
else:
pbar.set_description(desc)
# early stop detection
# the original Panda stops when cost starts increasing. we disble this to keep the algorithm running
if self.cost_now > cost_old:
# is_improving = self.early_stop(msg='Cost starts increasing.', k=k)
# continue
print("[W] Cost increased.")
if self.cost_now - self.w_model * (self.T.sum() + self.I.sum()) > cost_old:
is_improving = self.early_stop(msg="Error starts increasing.", k=k)
continue
cost_old = self.cost_now
# empty factor detection
if self.T.sum() == 0 or self.I.sum() == 0:
is_improving = self.early_stop(msg="No pattern found.", k=k)
continue
# update factors
self.set_factors(k, u=self.T, v=self.I)
# update residual and coverage
self.X_pd = get_prediction(U=self.U, V=self.V, boolean=True)
self.X_rs = get_residual(X=self.X_train, U=self.U, V=self.V)
# evaluate
self.evaluate(
df_name='updates',
head_info={
'cost': self.cost_now,
'shape': [int(self.T.sum()), int(self.I.sum())],
}
)
# early stop detection
is_improving = self.early_stop(
error=ERR(gt=self.X_train, pd=self.X_pd),
n_factor=self.n_factors, # k + 1
k=k)
# update pbar and k
if self.verbose is False:
pbar.update(1)
k += 1
self.n_factors += 1
@ignore_warnings
def find_core(self):
'''Find a dense core (T, I) and its extension list E.
'''
self.E = list(range(self.n)) # start with all items
self.I = lil_matrix((self.n, 1))
self.T = lil_matrix((self.m, 1))
# initialize extension list
if self.init_method == 'frequency':
self.sort_items(method='frequency')
elif self.init_method == 'couples-frequency' or self.init_method == 'correlation':
self.sort_items(method='couples-frequency')
# add the first item from E to I
self.I[self.E[0]] = 1
self.T = self.X_rs[:, self.E[0]]
self.E.pop(0)
cost_old = description_length(
gt=self.X_train,
U=hstack([self.U, self.T]).tolil(),
V=hstack([self.V, self.I]).tolil(),
w_model=self.w_model,
w_fp=self.w_fp,
w_fn=self.w_fn,
)
i = 0
if self.verbose is False:
desc = f"[I] k: {self.n_factors}, core cost: [{cost_old}]"
pbar = tqdm(total=len(self.E), position=0, desc=desc)
while i < len(self.E):
I = self.I.copy()
if self.init_method == 'correlation':
self.sort_items(method='correlation') # re-order extension list
I[self.E[0]] = 1 # use item with highest correlation
T = multiply(self.T, self.X_rs[:, self.E[0]])
else:
I[self.E[i]] = 1
T = multiply(self.T, self.X_rs[:, self.E[i]])
# check cost by description_length(): this is expensive
# cost_new = description_length()
# check cost by cost difference: this is faster
w0, h0 = self.I.sum(), self.T.sum()
w1, h1 = self.I.sum() + 1, T.sum()
d_cost = self.w_model * ((w1 + h1) - (w0 + h0)) - self.w_fn * ((w1 * h1) - (w0 * h0))
if d_cost <= 0: # cost_new <= cost_old
cost_new = cost_old + d_cost
self.print_msg(f" [{cost_old}] -> [{cost_new}] (find_core: add col, d_cost: {d_cost})")
cost_old = cost_new
# update I, T and E
self.I = I
self.T = T
if self.init_method == 'correlation':
self.E.pop(0)
else:
self.E.pop(i)
else:
i += 1
# update pbar
if self.verbose is False:
pbar.update(1)
desc = f"[I] k: {self.n_factors}, core cost: [{cost_old}]"
pbar.set_description(desc)
self.cost_now = cost_old
@ignore_warnings
def extend_core(self):
'''Extend core (T, I) with one item from E.
'''
cost_old = self.cost_now
if self.verbose is False:
desc = f"[I] k: {self.n_factors}, extd cost: [{cost_old}]"
pbar = tqdm(total=len(self.E), position=0, desc=desc)
for i in range(len(self.E)):
# update I
I = self.I.copy()
I[self.E[i]] = 1
# a faster equivalance of description_length()
T = bool_to_index(self.T)
partial_fn = - self.X_rs[T][:, self.E[i]].sum()
partial_fp = self.T.sum() - self.X_pd[T][:, self.E[i]].sum() + partial_fn
cost_new = cost_old + self.w_model * 1 + self.w_fp * partial_fp + self.w_fn * partial_fn
# cost_new = description_length(
# gt=self.X_train,
# U=hstack([self.U, self.T]).tolil(),
# V=hstack([self.V, I]).tolil(),
# w_model=self.w_model,
# w_fp=self.w_fp,
# w_fn=self.w_fn,
# )
# update pbar
if self.verbose is False:
pbar.update(1)
desc = f"[I] k: {self.n_factors}, extd cost: [{cost_old}]"
pbar.set_description(desc)
if cost_new <= cost_old:
self.print_msg(f" [{cost_old}] -> [{cost_new}] (extend_core: add col)")
self.I = I
cost_old = cost_new
else:
continue
# check cost by cost difference: this is faster
idx_I = bool_to_index(self.I) # indices of items in itemset I
idx_T = bool_to_index(invert(self.T)) # indices of transactions outside T
d_fn = - self.X_rs[idx_T][:, idx_I].sum(axis=1) # newly added false negatives
d_fp = self.I.sum() - self.X_pd[idx_T][:, idx_I].sum(axis=1) + d_fn # newly added false positives
d_cost = self.w_model * 1 + (self.w_fn * d_fn + self.w_fp * d_fp) # cost difference
# update T
idx = to_dense(d_cost, squeeze=True) <= 0
idx_T = idx_T[idx] # row indices whose d_cost <= 0
self.T[idx_T] = 1
if idx.sum() > 0: # at least 1 transaction is added to T
d_cost = d_cost[idx].sum()
cost_new = cost_old + d_cost
self.print_msg(f" [{cost_old}] -> [{cost_new}] (extend_core: add row, d_cost: {d_cost}, row(s) added: {idx.sum()})")
cost_old = cost_new
# # update pbar
# if self.verbose is False:
# pbar.update(1)
# desc = f"[I] k: {self.n_factors}, extd cost: [{cost_old}]"
# pbar.set_description(desc)
self.cost_now = cost_old # update current cost
[docs]
def sort_items(self, method):
'''Sort the extension list by descending scores.
Parameters
----------
method : str
Sort method.
``frequency``:
Sort items in extension list by frequency.
``couples-frequency``:
Sort items in extension list by frequency of item-pairs that include an item.
``correlation``:
Sort items in extension list by correlation with current transactions ``T``.
'''
if method == 'frequency':
scores = sum(self.X_rs[:, self.E], axis=0)
elif method == 'couples-frequency':
scores = np.zeros(len(self.E))
for i in range(len(self.E)):
T = self.X_rs[:, self.E[i]] # T of i-th item in E
idx_T = bool_to_index(T) # indices of transactions
scores[i] = sum(self.X_rs[idx_T, :], axis=0).sum() # sum of 1 and 2-itemset frequency
scores = scores - sum(self.X_rs[:, self.E], axis=0) # sum of 2-itemset frequency
elif method == 'correlation':
idx_T = bool_to_index(self.T)
scores = sum(self.X_rs[idx_T][:, self.E], axis=0)
idx = np.flip(np.argsort(scores)).astype(int)
self.E = np.array(self.E)[idx].tolist()