from ..utils import check_sparse, safe_indexing, to_sparse, sum, ignore_warnings
from scipy.sparse import csr_matrix, spmatrix
import time
import numpy as np
from typing import Union
# Base class shared by the data-splitting / negative-sampling helpers.
class BaseSplit:
    '''Base class for data splitting and negative sampling methods ``NoSplit``, ``RatioSplit`` and ``CrossValidation``.

    .. note::
        Attributes of ``BaseSplit``.

        X_train : spmatrix
            The training data matrix.
        X_val : spmatrix
            The validation data matrix.
        X_test : spmatrix
            The test data matrix.

    Parameters
    ----------
    X : ndarray, spmatrix
        The data matrix.
    '''
    def __init__(self, X: Union[np.ndarray, spmatrix]):
        # input
        self.X = to_sparse(X, 'csr')
        # output, filled in by ``load_pos_data`` / ``load_neg_data``
        self.X_train = None
        self.X_val = None
        self.X_test = None

    def negative_sample(self):
        '''Negative sampling.

        Must be overridden; the base implementation always raises.

        .. note::
            We can only add 0's using csr/csc_matrix, and validate negative samples using ``coo_matrix`` or triplet.

            ``coo_matrix`` does not support value assignment;

            ``lil_matrix`` has no effect when adding 0's onto it.

            Any arithmetic operation or ``csr_matrix.eliminate_zeros()`` will cause a sparse matrix to lose the negative samples.

        Raises
        ------
        NotImplementedError
            Always.
        '''
        raise NotImplementedError("Missing negative_sample method.")

    def check_params(self, **kwargs):
        '''Check parameters.

        Checking the random seed. Only the ``seed`` keyword is inspected:

        - ``seed`` absent: nothing happens.
        - ``seed`` is not None: ``self.seed`` and ``self.rng`` are (re)set from it.
        - ``seed`` is None and ``self.seed`` does not exist yet: the current
          time (in whole seconds) is used as the seed.
        - ``seed`` is None and ``self.seed`` already exists: ``self.seed`` and
          ``self.rng`` are left unchanged.

        Parameters
        ----------
        **kwargs
            Keyword arguments; only ``seed`` is read here.
        '''
        if "seed" not in kwargs:
            return

        seed = kwargs.get("seed")
        if seed is None:
            if hasattr(self, 'seed'):
                # A seed was set earlier: keep it and the current self.rng.
                return
            # First call without an explicit seed: use time as self.seed.
            seed = int(time.time())

        self.seed = seed
        self.rng = np.random.RandomState(seed)
        print("[I] seed :", self.seed)

    def load_pos_data(self, train_idx, val_idx, test_idx):
        '''Load positive data.

        Used in ``RatioSplit`` and ``CrossValidation``.

        Leave ``X_val``, ``X_test`` empty if ``val_idx``/``test_idx`` length is 0 for negative sampling.
        Also records the number of indices of each split in
        ``pos_train_size``, ``pos_val_size`` and ``pos_test_size``.

        Parameters
        ----------
        train_idx : ndarray
            The indices of training data.
        val_idx : ndarray
            The indices of validation data.
        test_idx : ndarray
            The indices of test data.
        '''
        self.X_train = safe_indexing(self.X, train_idx)
        # An empty split becomes an all-zero matrix with the shape of X, so
        # that negatives can still be written into it later.
        if len(val_idx) > 0:
            self.X_val = safe_indexing(self.X, val_idx)
        else:
            self.X_val = csr_matrix(self.X.shape)
        if len(test_idx) > 0:
            self.X_test = safe_indexing(self.X, test_idx)
        else:
            self.X_test = csr_matrix(self.X.shape)

        self.pos_train_size = len(train_idx)
        self.pos_val_size = len(val_idx)
        self.pos_test_size = len(test_idx)

    def get_neg_indices(self, n_negatives, type):
        '''Generate negative indices.

        Used in ``RatioSplit.negative_sample`` and ``CrossValidation.negative_sample``.

        This is fast but intractable for large dataset. Use trial-and-error for large dataset.
        (A dense ``m x n`` weight matrix is built, where ``(m, n)`` is the shape of ``X``.)

        Cells of ``X`` equal to 1 are excluded; the remaining cells are drawn
        without replacement using ``self.rng``, so ``check_params`` must have
        created ``self.rng`` beforehand.

        Parameters
        ----------
        n_negatives : int
            Number of negative samples.
        type : str
            Negative sampling type, ``'uniform'`` or ``'popularity'``.
            With ``'popularity'`` a cell's weight is the product of its row
            sum and column sum, each divided by ``X.nnz``.

        Returns
        -------
        U_neg : ndarray
            Row indices of the sampled cells (integer dtype).
        V_neg : ndarray
            Column indices of the sampled cells (integer dtype).

        Raises
        ------
        AssertionError
            If ``type`` is not supported (``ValueError`` when assertions are disabled).
        ValueError
            If fewer candidate cells than ``n_negatives`` are available.
        '''
        if n_negatives == 0:
            # Integer dtype so the empty result is still usable as an index
            # array, like the non-empty result below.
            return np.array([], dtype=int), np.array([], dtype=int)

        assert type in ['uniform', 'popularity'], "Unsupported negative sampling option: {}".format(type)

        m, n = self.X.shape
        if type == "uniform":
            p = np.ones((m, n))
        elif type == "popularity":
            # ``sum`` is the project helper imported from ..utils (not the
            # builtin); it is used here as (per-row sums, per-column sums).
            pu, pv = sum(self.X)
            pu = pu / self.X.nnz
            pv = pv / self.X.nnz
            # p[r, c] = pu[r] * pv[c]
            p = np.outer(pu, pv).astype(float, copy=False)
        else:
            # Only reachable when assertions are stripped (python -O).
            raise ValueError("[E] Unsupported negative sampling option: {}".format(type))

        # Positive cells (value 1) must never be drawn as negatives.
        p[self.X.toarray() == 1] = 0
        p = p.flatten()

        n_candidates = np.count_nonzero(p)
        if n_negatives > n_candidates:
            raise ValueError(
                "[E] Cannot draw {} negative samples: only {} candidate entries available.".format(
                    n_negatives, n_candidates))

        p = p / p.sum()
        indices = self.rng.choice(a=m*n, size=n_negatives, replace=False, p=p)
        # Flat index -> (row, column), with exact integer arithmetic.
        U_neg, V_neg = np.divmod(indices, n)
        return U_neg, V_neg

    @ignore_warnings
    def load_neg_data(self, train_idx, val_idx, test_idx, U_neg, V_neg):
        '''Load negative data.

        Used in ``RatioSplit.negative_sample`` and ``CrossValidation.negative_sample``.

        Writes the negative samples into ``X_train``, ``X_val`` and ``X_test``
        by assigning 0 to the selected cells, and records the number of
        indices of each split in ``neg_train_size``, ``neg_val_size`` and
        ``neg_test_size``.

        Parameters
        ----------
        train_idx : ndarray
            Positions in ``U_neg``/``V_neg`` that go to the training matrix.
        val_idx : ndarray
            Positions in ``U_neg``/``V_neg`` that go to the validation matrix.
        test_idx : ndarray
            Positions in ``U_neg``/``V_neg`` that go to the test matrix.
        U_neg : ndarray
            Row indices of the negative samples.
        V_neg : ndarray
            Column indices of the negative samples.
        '''
        self.X_train = to_sparse(self.X_train, type='csr')
        self.X_val = to_sparse(self.X_val, type='csr')
        self.X_test = to_sparse(self.X_test, type='csr')

        # Drop zeros stored earlier before the new negatives are written.
        self.X_train.eliminate_zeros()
        self.X_val.eliminate_zeros()
        self.X_test.eliminate_zeros()

        # SparseEfficiencyWarning: assigning into a csr_matrix; presumably
        # this is what the ``ignore_warnings`` decorator silences.
        self.X_train[U_neg[train_idx], V_neg[train_idx]] = 0
        if len(val_idx) > 0:
            self.X_val[U_neg[val_idx], V_neg[val_idx]] = 0
        if len(test_idx) > 0:
            self.X_test[U_neg[test_idx], V_neg[test_idx]] = 0

        self.neg_train_size = len(train_idx)
        self.neg_val_size = len(val_idx)
        self.neg_test_size = len(test_idx)