import numpy as np
from sklearn.decomposition import TruncatedSVD
from sklearn.model_selection import train_test_split
from sklearn.utils import check_array, check_random_state
from sklearn.utils.validation import check_is_fitted

from .base import AbstractDecompositionModel
from .utils import column_select, stability_selection_to_threshold
class UoI_CUR(AbstractDecompositionModel):
    """Performs column subset selection (CUR decomposition) in the Union of
    Intersections framework.

    Parameters
    ----------
    n_boots : int
        Number of bootstraps.
    max_k : int
        The maximum rank of the singular value decomposition.
    boots_frac : float
        The fraction of data to use in the bootstrap. A value of 1 uses the
        full data matrix for every bootstrap.
    stability_selection : int or float, optional
        Controls how many bootstraps must select a column for it to survive
        the intersection step. It is converted, together with ``n_boots``,
        into a count threshold by ``stability_selection_to_threshold``; see
        that helper for the accepted forms.
    algorithm : string, optional
        SVD solver to use. Either "arpack" for the ARPACK wrapper in SciPy
        (scipy.sparse.linalg.svds), or "randomized" for the randomized
        algorithm due to Halko (2009).
    n_iter : int, optional
        Number of iterations for randomized SVD solver. Not used by ARPACK.
    tol : float, optional
        Tolerance for ARPACK. 0 means machine precision. Ignored by
        randomized SVD solver.
    random_state : int, RandomState instance, or None, optional
        If int, random_state is the seed used by the random number
        generator; If RandomState instance, random_state is the random
        number generator; If None, the random number generator is the
        RandomState instance used by np.random. The value is resolved once
        per call to ``fit``, so an integer seed gives reproducible results
        while each bootstrap still draws a different resample.

    Attributes
    ----------
    components_ : ndarray, shape (n_samples, n_components)
        The selected columns of the design matrix.
    column_indices_ : ndarray, shape (n_components,)
        The indices of the columns selected by the algorithm.
    """

    def __init__(
        self, n_boots, max_k, boots_frac, stability_selection=1.,
        algorithm='randomized', n_iter=5, tol=0.0, random_state=None
    ):
        self.n_boots = n_boots
        self.max_k = max_k
        self.boots_frac = boots_frac
        self.stability_selection = stability_selection
        self.algorithm = algorithm
        self.n_iter = n_iter
        self.tol = tol
        self.random_state = random_state

    def fit(self, X, ks=None, cs=None, stratify=None):
        """Performs column subset selection in the UoI framework on a provided
        matrix.

        Parameters
        ----------
        X : ndarray, shape (n_samples, n_features)
            The data matrix.
        ks : int, list, ndarray, or None
            The ranks to consider union over. If None, all ranks from
            (1, ..., max_k) will be used.
        cs : int, float, list, ndarray, or None
            The expected number of columns to select. If None, c will vary
            with the rank k (``k + 20``).
        stratify : array-like or None
            Forwarded to ``train_test_split`` so that groups of samples are
            allotted to bootstraps proportionally. Must be of size equal to
            the number of samples.

        Returns
        -------
        self : UoI_CUR
            The fitted estimator, with ``column_indices_`` and
            ``components_`` set.
        """
        ks, cs = self.check_ks_and_cs(ks=ks, cs=cs)
        n_ks = ks.size
        _, n_features = X.shape

        selection_threshold = stability_selection_to_threshold(
            self.stability_selection,
            self.n_boots)

        # Resolve the seed exactly once. Passing an integer seed straight to
        # train_test_split / TruncatedSVD / column_select inside the loop
        # would re-seed them identically on every iteration, making all
        # bootstraps the same and defeating the intersection step.
        rng = check_random_state(self.random_state)

        # keep track of which columns are chosen, per rank and per bootstrap
        columns = np.zeros((n_ks, self.n_boots, n_features), dtype='int')

        # truncated SVD fitter object, refit on every bootstrap
        tsvd = TruncatedSVD(n_components=self.max_k,
                            algorithm=self.algorithm,
                            n_iter=self.n_iter,
                            tol=self.tol,
                            random_state=rng)

        # iterate over bootstraps
        for bootstrap in range(self.n_boots):
            # extract bootstrap
            if self.boots_frac == 1:
                X_train = X
            else:
                X_train, _ = train_test_split(
                    X,
                    test_size=1 - self.boots_frac,
                    stratify=stratify,
                    random_state=rng)

            # perform truncated SVD on bootstrap
            tsvd.fit(X_train)
            # extract right singular vectors, one per column of V
            V = tsvd.components_.T

            # iterate over ranks
            for k_idx, k in enumerate(ks):
                c = cs[k_idx]
                column_indices = column_select(V[:, :k], c=c,
                                               random_state=rng)
                # flag the columns chosen for this (rank, bootstrap) pair
                columns[k_idx, bootstrap][column_indices] += 1

        # intersection: for each rank, keep the columns chosen in at least
        # `selection_threshold` bootstraps
        intersection = np.sum(columns, axis=1) >= selection_threshold
        # union of intersections: keep columns that survive for any rank
        union = np.sum(intersection, axis=0) > 0

        self.column_indices_ = np.argwhere(union).ravel()
        if self.column_indices_.size == 0:
            self.components_ = np.array([[]])
        else:
            self.components_ = X[:, self.column_indices_]
        return self

    def check_ks_and_cs(self, ks=None, cs=None):
        """Process the set of ranks to calculate leverage scores over, and the
        expected number of columns for each rank.

        Parameters
        ----------
        ks : int, list, ndarray, or None
            The ranks to compute leverage scores over. If None, the ranks
            (1, ..., max_k) are used.
        cs : int, float, list, ndarray, or None
            The expected number of columns to select for each rank. A scalar
            is repeated for every rank. If None, ``ks + 20`` is used.

        Returns
        -------
        ks : ndarray
            Processed and checked ranks.
        cs : ndarray
            Processed expected number of columns.

        Raises
        ------
        ValueError
            If ``ks`` or ``cs`` has an unsupported type, if any rank is not
            a positive integer no larger than ``max_k``, or if ``cs`` is not
            a positive numeric array with the same size as ``ks``.
        """
        # convert ks to a numpy array (NumPy integer scalars are accepted
        # alongside builtin ints)
        if ks is None:
            ks = 1 + np.arange(self.max_k)
        elif isinstance(ks, (int, np.integer)):
            ks = np.array([ks])
        elif isinstance(ks, list):
            ks = np.array(ks)
        elif not isinstance(ks, np.ndarray):
            raise ValueError('ks must be a valid int, list or numpy array.')

        # Check the dtype before comparing values so that non-numeric arrays
        # are rejected with the ValueError below rather than a TypeError
        # raised by the comparison itself.
        if (
            not np.issubdtype(ks.dtype, np.integer)
            or not np.all(ks > 0)
            or not np.all(ks <= self.max_k)
        ):
            raise ValueError('Ranks must be positive, integers, and no more'
                             ' than the max rank.')

        # check expected column numbers
        n_ks = ks.size
        if cs is None:
            cs = ks + 20
        elif isinstance(cs, (int, float, np.integer, np.floating)):
            cs = cs * np.ones(n_ks)
        elif isinstance(cs, list):
            cs = np.array(cs)
        elif not isinstance(cs, np.ndarray):
            raise ValueError('cs must be a valid int, float, list or numpy'
                             ' array.')

        # same ordering rationale as for ks: dtype first, then size/values
        if (
            not np.issubdtype(cs.dtype, np.number)
            or cs.size != ks.size
            or not np.all(cs > 0)
        ):
            raise ValueError('Expected number of columns must consist of'
                             ' positive numbers, with total size equal to ks.')

        return ks, cs
class CUR(AbstractDecompositionModel):
    """Performs ordinary column subset selection through a CUR decomposition.

    Parameters
    ----------
    max_k : int
        The maximum rank of the singular value decomposition.
    algorithm : string, optional
        SVD solver to use. Either "arpack" for the ARPACK wrapper in SciPy
        (scipy.sparse.linalg.svds), or "randomized" for the randomized
        algorithm due to Halko (2009).
    n_iter : int, optional
        Number of iterations for randomized SVD solver. Not used by ARPACK.
    tol : float, optional
        Tolerance for ARPACK. 0 means machine precision. Ignored by
        randomized SVD solver.
    random_state : int, RandomState instance, or None, optional
        If int, random_state is the seed used by the random number
        generator; If RandomState instance, random_state is the random
        number generator; If None, the random number generator is the
        RandomState instance used by np.random.

    Attributes
    ----------
    components_ : ndarray, shape (n_samples, n_components)
        The selected columns of the design matrix.
    column_indices_ : ndarray, shape (n_components,)
        The indices of the columns selected by the algorithm.
    """

    def __init__(
        self, max_k, algorithm='randomized', n_iter=5, tol=0.0,
        random_state=None
    ):
        self.max_k = max_k
        self.algorithm = algorithm
        self.n_iter = n_iter
        self.tol = tol
        self.random_state = random_state

    def fit(self, X, c=None):
        """Performs column subset selection on a provided matrix, using a
        single truncated SVD of the full data (no bootstrapping).

        Parameters
        ----------
        X : ndarray, shape (n_samples, n_features)
            The data matrix.
        c : int, float or None
            The expected number of columns to select. If None, defaults to
            ``max_k + 20``.

        Returns
        -------
        self : CUR
            The fitted estimator, with ``column_indices_`` and
            ``components_`` set.
        """
        # truncated SVD fitter object
        tsvd = TruncatedSVD(n_components=self.max_k,
                            algorithm=self.algorithm,
                            n_iter=self.n_iter,
                            tol=self.tol,
                            random_state=self.random_state)
        # perform truncated SVD on the full data matrix
        tsvd.fit(X)

        # Right singular vectors as columns. The SVD was requested with
        # n_components=max_k, so no further truncation is needed.
        V = tsvd.components_.T

        # perform column selection on the singular vectors
        if c is None:
            c = self.max_k + 20
        # NOTE(review): leverage_sort=True presumably orders the returned
        # indices by leverage score -- confirm against column_select.
        self.column_indices_ = column_select(V=V, c=c,
                                             leverage_sort=True,
                                             random_state=self.random_state)
        self.components_ = X[:, self.column_indices_]
        return self