"""
Implements a wrapper for NeighborsDataset which constructs polynomial features from neighbor counts.
"""
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from tdm.log import logger
from tdm.dataset.dataset import Dataset
from tdm.utils import log2_1p, dict_to_dataframe
from itertools import combinations_with_replacement
class PolynomialDataset(Dataset):
"""
Performs feature transformations and constructs polynomial features from a dataset.
"""
def __init__(
self,
ds: Dataset,
degree: int = 2,
feature_cell_types: list[str] | None = None,
include_bias: bool = True,
scale_features: bool = False,
log_transform: bool = False,
) -> None:
"""Performs feature transformations and constructs polynomial features from a dataset.
Args:
ds (Dataset): dataset whose features we transform
degree (int, optional): order of polynomial interactions to construct. Defaults to 2.
feature_cell_types (list[str] | None, optional): selects a subset of cell types for features. Defaults to all cell types present.
include_bias (bool, optional): add a bias term to the features. Defaults to True.
scale_features (bool, optional): standardize features (subtract mean and divide by standard deviation). Defaults to False.
log_transform (bool, optional): log2(1+x) transformation of features. Defaults to False.
Examples:
>>> # nds is a NeighborsDataset instance
>>> pds = PolynomialDataset(nds, degree=2)
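            >>> # the transformed features and matching observations are stored per cell type;
            >>> # "Tumor" is a hypothetical cell-type name used only for illustration:
            >>> X_poly, obs = pds.dataset_dict["Tumor"]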
"""
self.degree = degree
self.include_bias = include_bias
self.pf = MyPolynomialFeatures(degree=self.degree, include_bias=self.include_bias)
self.scale_features = scale_features
self.log_transform = log_transform
self.dataset_dict: dict[str, tuple[pd.DataFrame, pd.DataFrame]] = {}
self.scaler_dict: dict[str, StandardScaler] = {} # saved for scaling new features
        # by default, use all cell types in the neighbors dataset as features:
self.feature_cell_types = feature_cell_types or ds.cell_types()
self.feature_names = self.pf.feature_names_out(self.feature_cell_types)
for cell_type in ds.cell_types():
neighbor_features, observations = ds.fetch(cell_type)
# don't create features for missing cells:
if neighbor_features.shape[0] == 0:
continue
polynomial_features, scaler = self.construct_polynomial_features(
neighbor_features,
scale_features=self.scale_features,
fit_scaler=True,
target_cell=cell_type,
)
self.dataset_dict[cell_type] = polynomial_features, observations
self.scaler_dict[cell_type] = scaler
def construct_polynomial_features(
self,
neighbor_features: pd.DataFrame,
scale_features: bool,
fit_scaler: bool = False,
target_cell: str | None = None,
) -> tuple[pd.DataFrame, StandardScaler]:
"""
Parameters:
neighbor_features: output of fetch() from a NeighborsDataset
scale_features: True performs standard scaling on the resulting polynomial features
fit_scaler: True fits a new standard scaler to the resulting polynomial features
target_cell: cell we're modelling, used to fetch the correct StandardScaler
Returns:
tuple:
(a polynomial features dataframe, a standard scaler)
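
        Examples:
            Illustrative sketch; assumes pds is a PolynomialDataset built from the NeighborsDataset nds,
            and that "Tumor" is one of its cell types (hypothetical name):

            >>> counts, obs = nds.fetch("Tumor")
            >>> X_poly, scaler = pds.construct_polynomial_features(
            ...     counts, scale_features=False, fit_scaler=False
            ... )
            >>> X_poly.columns[0]  # the intercept column, present when include_bias=True
            '1'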
"""
        x = neighbor_features
        # make sure the features arrive in the same column order used at construction time:
        if hasattr(self, "feature_names_in_"):
            assert np.all(self.feature_names_in_ == x.columns)
        if self.log_transform:
            x = log2_1p(x)
        # construct polynomial features on a plain numpy array (avoids pandas overhead):
        x = np.array(x)
        x = self.pf.transform(x)
# scale after creating polynomial features:
scaler = None
if scale_features:
if fit_scaler:
scaler = StandardScaler().fit(x)
else:
if target_cell is not None:
scaler = self.scaler_dict[target_cell]
else:
                    raise ValueError(
                        "Please provide target_cell. "
                        "Required to scale features according to the scale of the original dataset."
                    )
x = scaler.transform(x)
if self.include_bias:
x[:, 0] = 1 # reset the centered intercept
# re-cast to a readable dataframe:
x = pd.DataFrame(x, columns=self.feature_names, copy=False) # don't have to copy, already a copy
return x, scaler
def construct_features_from_counts(
self, cell_counts: dict | pd.DataFrame, target_cell: str, **kwargs
) -> pd.DataFrame:
"""
        Constructs features compatible with construct_polynomial_features.

        cell_counts holds raw cell counts (e.g. 64 cells, not the log2 value of ~6).
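
        Examples:
            Illustrative sketch; assumes pds is a PolynomialDataset whose feature cell types are
            exactly "Tumor", "T cell" and "B cell" (hypothetical names):

            >>> x = pds.construct_features_from_counts(
            ...     {"Tumor": 12, "T cell": 40, "B cell": 3}, target_cell="Tumor"
            ... )
            >>> x.shape[1] == len(pds.feature_names)
            True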
"""
neighbor_features = dict_to_dataframe(cell_counts, columns=self.cell_types())
        if (neighbor_features.shape[0] > 1) and (neighbor_features.values.max() < 10):
            logger.debug("expected raw cell counts, but values look log-scaled; max value = %s", neighbor_features.values.max())
        if set(neighbor_features.columns) != set(self.feature_cell_types):
            raise ValueError(
                f"Cell types provided do not match the original dataset:\nExpected: {set(self.feature_cell_types)}\nGot: {set(neighbor_features.columns)}"
            )
x, _ = self.construct_polynomial_features(
neighbor_features,
scale_features=self.scale_features,
fit_scaler=False,
target_cell=target_cell,
)
return x
    def intercept_idx(self) -> int:
        """Returns the column index of the intercept (bias) feature."""
        if not self.include_bias:
            raise LookupError("Can't fetch intercept index because model has no intercept term!")
        return 0
class MyPolynomialFeatures:
"""A minimalist numpy-only polynomial feature transformer."""
def __init__(self, degree, include_bias=True):
self.degree = degree
self.include_bias = include_bias
    def fit_transform(self, X: np.ndarray) -> np.ndarray:
        # stateless transformer: there is nothing to fit, so fit_transform is identical to transform
        return self._transform(X)
    def transform(self, X: np.ndarray) -> np.ndarray:
        return self._transform(X)
def feature_names_out(self, feature_names_in: list[str]):
all_combinations = self._all_combinations(len(feature_names_in))
feature_names = []
for comb in all_combinations:
# Count occurrences of each feature index in the combination
counts = {i: comb.count(i) for i in comb}
# Generate the name based on counts, using exponents when needed
parts = [
f"{feature_names_in[i]}^{count}" if count > 1 else feature_names_in[i] for i, count in counts.items()
]
# Join the parts to form the final feature name
feature_names.append(" ".join(parts))
        if self.include_bias:
            # the empty combination is the intercept term; name it "1"
            feature_names[0] = "1"
        else:
            # exclude the first entry, which corresponds to the bias term
            feature_names = feature_names[1:]
return feature_names
def _all_combinations(self, n_features_in: int):
return [
comb
for d in range(self.degree + 1) # Loop over all degrees from 0 to the given degree
for comb in combinations_with_replacement(range(n_features_in), d)
]
def _transform(self, X: np.ndarray):
n_samples, n_features_in = X.shape
all_combinations = self._all_combinations(n_features_in)
# Create polynomial features by multiplying elements according to combinations
X_poly = np.empty((n_samples, len(all_combinations)))
for i, comb in enumerate(all_combinations):
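            # for the empty degree-0 combination, the product over zero columns is 1, i.e. the bias column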
X_poly[:, i] = np.prod(X[:, comb], axis=1)
if not self.include_bias:
X_poly = X_poly[:, 1:]
return X_poly