"""Development-set feature elimination utilities.
The functions in this module orchestrate recursive feature elimination
using a hold-out development split. They are intentionally lightweight to
maintain backward compatibility while offering clearer typing and
NumPy-style docstrings.
"""
from __future__ import annotations
from itertools import chain
from typing import Dict, Iterable
import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import is_classifier
from sklearn.metrics import (
accuracy_score,
explained_variance_score,
mean_squared_error,
normalized_mutual_info_score,
r2_score,
roc_auc_score,
)
from sklearn.model_selection import train_test_split
from ..metrics.ranking import features_rank_fnc
from ..utils import get_feature_importances
# Module authorship metadata.
__author__ = "Kynon J Benjamin"
# Names exposed by a star-import of this module. ``_regr_fe`` is listed
# explicitly, so it is exported despite its leading underscore; the per-step
# helper ``_regr_fe_step`` is not listed and therefore stays private.
__all__ = [
"_regr_fe",
"dev_score_r2",
"dev_score_roc",
"dev_score_mse",
"dev_score_nmi",
"dev_score_evar",
"dev_score_accuracy",
]
[docs]
def dev_score_roc(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Compute ROC-AUC for development predictions.

    Parameters
    ----------
    estimator
        Fitted classifier. ``predict_proba`` is used when ``Y`` contains more
        than two distinct labels, ``predict`` otherwise.
    X, Y
        Development-set features and labels.

    Returns
    -------
    float
        One-vs-rest ROC-AUC of the predicted class probabilities when ``Y``
        has more than two distinct labels; otherwise the ROC-AUC of the hard
        predictions, requested with ``average="weighted"``.
    """
    has_many_classes = len(np.unique(Y)) > 2
    if not has_many_classes:
        # Two (or fewer) labels: score the hard class predictions directly.
        return roc_auc_score(Y, estimator.predict(X), average="weighted")
    # More than two labels: roc_auc_score needs per-class probabilities.
    return roc_auc_score(Y, estimator.predict_proba(X), multi_class="ovr")
[docs]
def dev_score_nmi(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Score development predictions with normalized mutual information.

    Parameters
    ----------
    estimator
        Fitted estimator exposing ``predict``.
    X, Y
        Development-set features and labels.

    Returns
    -------
    float
        Normalized mutual information between ``Y`` and the predicted
        labels, using the arithmetic averaging method.
    """
    return normalized_mutual_info_score(
        Y, estimator.predict(X), average_method="arithmetic"
    )
[docs]
def dev_score_accuracy(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Score development predictions with classification accuracy.

    Parameters
    ----------
    estimator
        Fitted estimator exposing ``predict``.
    X, Y
        Development-set features and labels.

    Returns
    -------
    float
        Accuracy of ``estimator.predict(X)`` against ``Y``.
    """
    return accuracy_score(Y, estimator.predict(X))
[docs]
def dev_score_r2(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Score development predictions with the coefficient of determination.

    Parameters
    ----------
    estimator
        Fitted estimator exposing ``predict``.
    X, Y
        Development-set features and targets.

    Returns
    -------
    float
        R^2 of ``estimator.predict(X)`` against ``Y``.
    """
    return r2_score(Y, estimator.predict(X))
[docs]
def dev_score_mse(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Score development predictions with mean squared error.

    Parameters
    ----------
    estimator
        Fitted estimator exposing ``predict``.
    X, Y
        Development-set features and targets.

    Returns
    -------
    float
        Mean squared error of ``estimator.predict(X)`` against ``Y``.
    """
    return mean_squared_error(Y, estimator.predict(X))
[docs]
def dev_score_evar(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Score development predictions with explained variance.

    Parameters
    ----------
    estimator
        Fitted estimator exposing ``predict``.
    X, Y
        Development-set features and targets.

    Returns
    -------
    float
        Explained variance of ``estimator.predict(X)`` against ``Y``,
        uniformly averaged across outputs.
    """
    return explained_variance_score(
        Y, estimator.predict(X), multioutput="uniform_average"
    )
def _regr_fe_step(
    estimator,
    X: ArrayLike,
    Y: ArrayLike,
    n_features_to_keep: int,
    features: ArrayLike,
    fold: int,
    out_dir: str,
    dev_size: float,
    random_state: int | None,
    rank_features: bool,
) -> Dict[str, ArrayLike | Dict[str, float]]:
    """Perform a single feature-elimination step using a dev split.

    ``(X, Y)`` is split with ``train_test_split``; the estimator is fitted on
    the training part, features are ordered by descending importance value,
    and the fitted model is scored on the held-out development part.

    Parameters
    ----------
    estimator
        Any scikit-learn compatible estimator. It is fitted in place.
    X, Y
        Training features and labels.
    n_features_to_keep
        Number of features to retain after ranking.
    features
        Feature names aligned to ``X`` columns.
    fold
        Current cross-validation fold number.
    out_dir
        Output directory for ranking artifacts.
    dev_size
        Proportion of the development split.
    random_state
        Optional random seed for reproducibility.
    rank_features
        Whether to persist ranking artifacts.

    Returns
    -------
    dict
        Feature elimination payload for this step with three keys:
        ``"n_features"`` (number of columns the estimator was fitted on),
        ``"selected"`` (column positions of the ``n_features_to_keep``
        highest-importance features, in descending order of importance
        value) and ``"metrics"`` (development-set scores: NMI, accuracy and
        ROC-AUC for classifiers; R^2, MSE and explained variance otherwise).

    Raises
    ------
    ValueError
        If ``n_features_to_keep`` exceeds the number of columns in ``X``.
    """
    if n_features_to_keep > X.shape[1]:
        raise ValueError(
            "n_features_to_keep cannot be greater than the number of features in X"
        )

    X_train, X_dev, y_train, y_dev = train_test_split(
        X, Y, test_size=dev_size, random_state=random_state
    )
    estimator.fit(X_train, y_train)

    importances = get_feature_importances(estimator)
    try:
        # Kept for callers that read the attribute back from the estimator.
        estimator.feature_importances_ = importances
    except AttributeError:
        # scikit-learn tree/forest estimators expose ``feature_importances_``
        # as a read-only property; assigning to it raises. The ranking below
        # uses the local vector, so the failed write is safe to ignore.
        pass

    # argsort is ascending; reverse it so the largest importance comes first.
    rank = np.argsort(importances)[::-1]
    selected = rank[:n_features_to_keep]
    features_rank_fnc(features, rank, n_features_to_keep, fold, out_dir, rank_features)

    metrics: Dict[str, float]
    if is_classifier(estimator):
        metrics = {
            "nmi_score": dev_score_nmi(estimator, X_dev, y_dev),
            "accuracy_score": dev_score_accuracy(estimator, X_dev, y_dev),
            "roc_auc_score": dev_score_roc(estimator, X_dev, y_dev),
        }
    else:
        metrics = {
            "r2_score": dev_score_r2(estimator, X_dev, y_dev),
            "mse_score": dev_score_mse(estimator, X_dev, y_dev),
            "explain_var": dev_score_evar(estimator, X_dev, y_dev),
        }

    return {
        "n_features": X_train.shape[1],
        "selected": selected,
        "metrics": metrics,
    }
def _regr_fe(
    estimator,
    X: ArrayLike,
    Y: ArrayLike,
    n_features_iter: Iterable[int],
    features: ArrayLike,
    fold: int,
    out_dir: str,
    dev_size: float,
    random_state: int | None,
    rank_features: bool,
):
    """Iterate over decreasing feature sets using development scoring.

    This is a generator: one elimination step is run for every value of
    ``n_features_iter`` followed by a final step that keeps a single feature.
    After each step ``X``, ``features`` and the running index map are reduced
    to the columns that step selected.

    Parameters
    ----------
    estimator
        Any scikit-learn compatible estimator; refitted at every step.
    X, Y
        Training features and labels. ``X`` must expose ``shape``; objects
        with an ``iloc`` accessor are sliced positionally through it, all
        others with ``X[:, selected]``.
    n_features_iter
        Numbers of features to keep at successive steps.
    features
        Feature names aligned to ``X`` columns. Lists and tuples are
        converted to a NumPy array so they can be sub-selected.
    fold
        Current cross-validation fold number.
    out_dir
        Output directory for ranking artifacts.
    dev_size
        Proportion of the development split.
    random_state
        Optional random seed for reproducibility.
    rank_features
        Whether to persist ranking artifacts.

    Yields
    ------
    dict
        ``"n_features"`` and ``"metrics"`` from the step, ``"indices"`` (a
        copy of the original-column positions of the features the step was
        run on) and ``"selected"`` (positions, relative to that step's
        columns, of the features kept for the next step).

    Raises
    ------
    ValueError
        If the number of columns in ``X`` differs from ``len(features)``.
        Because this is a generator, the check runs on the first iteration,
        not when the function is called.
    """
    if X.shape[1] != len(features):
        raise ValueError("Number of columns in X must match the length of features")

    if isinstance(features, (list, tuple)):
        # Plain sequences cannot be indexed with the integer array returned
        # by each step, so promote them to an ndarray once up front.
        features = np.asarray(features)

    indices = np.arange(X.shape[1])
    # The trailing ``[1]`` guarantees a final step that ends on one feature.
    for nf in chain(n_features_iter, [1]):
        payload = _regr_fe_step(
            estimator,
            X,
            Y,
            nf,
            features,
            fold,
            out_dir,
            dev_size,
            random_state,
            rank_features,
        )
        selected = payload["selected"]
        yield {
            "n_features": payload["n_features"],
            "metrics": payload["metrics"],
            # Copy so consumers never hold a reference to the running map.
            "indices": indices.copy(),
            "selected": selected,
        }

        # Narrow everything to the surviving columns for the next round.
        indices = indices[selected]
        # ``selected`` holds positions; pandas objects need ``iloc`` because
        # plain ``[]`` with integers would be interpreted as labels.
        features = (
            features.iloc[selected] if hasattr(features, "iloc") else features[selected]
        )
        X = X.iloc[:, selected] if hasattr(X, "iloc") else X[:, selected]