Source code for dRFEtools.scoring.dev

"""Development-set feature elimination utilities.

The functions in this module orchestrate recursive feature elimination
using a hold-out development split. They are intentionally lightweight to
maintain backward compatibility while offering clearer typing and
NumPy-style docstrings.
"""

from __future__ import annotations

from itertools import chain
from typing import Dict, Iterable

import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import is_classifier
from sklearn.metrics import (
    accuracy_score,
    explained_variance_score,
    mean_squared_error,
    normalized_mutual_info_score,
    r2_score,
    roc_auc_score,
)
from sklearn.model_selection import train_test_split

from ..metrics.ranking import features_rank_fnc
from ..utils import get_feature_importances

# Package author metadata.
__author__ = "Kynon J Benjamin"

# Names exported by a star-import of this module. ``_regr_fe`` is listed
# explicitly even though its leading underscore marks it as non-public;
# ``_regr_fe_step`` is intentionally not exported.
__all__ = [
    "_regr_fe",
    "dev_score_r2",
    "dev_score_roc",
    "dev_score_mse",
    "dev_score_nmi",
    "dev_score_evar",
    "dev_score_accuracy",
]


def dev_score_roc(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Area under the ROC curve for development predictions.

    Parameters
    ----------
    estimator
        Fitted classifier. ``predict_proba`` is required when ``Y`` holds
        more than two distinct labels; otherwise only ``predict`` is used.
    X, Y
        Development-set features and labels.

    Returns
    -------
    float
        Weighted ROC-AUC score.
    """
    # Weighted averaging is requested in both branches so the result matches
    # the documented "weighted" contract regardless of the number of classes.
    # Previously the multiclass branch silently fell back to the library's
    # default averaging.
    kwargs: Dict[str, str] = {"average": "weighted"}
    if len(np.unique(Y)) > 2:
        # Multiclass scoring needs per-class probabilities (one-vs-rest).
        labels_pred = estimator.predict_proba(X)
        kwargs["multi_class"] = "ovr"
    else:
        # NOTE(review): the binary branch scores hard label predictions, not
        # probabilities or decision values. Kept as-is so reported scores do
        # not change -- confirm this is intended.
        labels_pred = estimator.predict(X)
    return roc_auc_score(Y, labels_pred, **kwargs)
def dev_score_nmi(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Score development predictions with normalized mutual information.

    Parameters
    ----------
    estimator
        Fitted estimator exposing ``predict``.
    X, Y
        Development-set features and labels.

    Returns
    -------
    float
        Normalized mutual information between ``Y`` and the predicted
        labels, computed with the ``"arithmetic"`` averaging method.
    """
    return normalized_mutual_info_score(
        Y,
        estimator.predict(X),
        average_method="arithmetic",
    )
def dev_score_accuracy(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Score development predictions with classification accuracy.

    Parameters
    ----------
    estimator
        Fitted estimator exposing ``predict``.
    X, Y
        Development-set features and labels.

    Returns
    -------
    float
        Accuracy of the predicted labels against ``Y``.
    """
    return accuracy_score(Y, estimator.predict(X))
def dev_score_r2(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Score development predictions with the coefficient of determination.

    Parameters
    ----------
    estimator
        Fitted estimator exposing ``predict``.
    X, Y
        Development-set features and targets.

    Returns
    -------
    float
        R^2 of the predictions against ``Y``.
    """
    return r2_score(Y, estimator.predict(X))
def dev_score_mse(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Score development predictions with mean squared error.

    Parameters
    ----------
    estimator
        Fitted estimator exposing ``predict``.
    X, Y
        Development-set features and targets.

    Returns
    -------
    float
        Mean squared error of the predictions against ``Y``.
    """
    return mean_squared_error(Y, estimator.predict(X))
def dev_score_evar(estimator, X: ArrayLike, Y: ArrayLike) -> float:
    """Score development predictions with explained variance.

    Parameters
    ----------
    estimator
        Fitted estimator exposing ``predict``.
    X, Y
        Development-set features and targets.

    Returns
    -------
    float
        Explained variance of the predictions against ``Y``, aggregated
        across outputs with ``"uniform_average"``.
    """
    return explained_variance_score(
        Y,
        estimator.predict(X),
        multioutput="uniform_average",
    )
def _regr_fe_step( estimator, X: ArrayLike, Y: ArrayLike, n_features_to_keep: int, features: ArrayLike, fold: int, out_dir: str, dev_size: float, random_state: int | None, rank_features: bool, ) -> Dict[str, ArrayLike | Dict[str, float]]: """Perform a single feature-elimination step using a dev split. Parameters ---------- estimator Any scikit-learn compatible estimator. X, Y Training features and labels. n_features_to_keep Number of features to retain after ranking. features Feature names aligned to ``X`` columns. fold Current cross-validation fold number. out_dir Output directory for ranking artifacts. dev_size Proportion of the development split. random_state Optional random seed for reproducibility. rank_features Whether to persist ranking artifacts. Returns ------- dict Feature elimination payload for this step. """ if n_features_to_keep > X.shape[1]: raise ValueError( "n_features_to_keep cannot be greater than the number of features in X" ) X_train, X_dev, y_train, y_dev = train_test_split( X, Y, test_size=dev_size, random_state=random_state ) estimator.fit(X_train, y_train) estimator.feature_importances_ = get_feature_importances(estimator) rank = np.argsort(estimator.feature_importances_)[::-1] selected = rank[:n_features_to_keep] features_rank_fnc(features, rank, n_features_to_keep, fold, out_dir, rank_features) metrics: Dict[str, float] = {} if is_classifier(estimator): metrics.update( { "nmi_score": dev_score_nmi(estimator, X_dev, y_dev), "accuracy_score": dev_score_accuracy(estimator, X_dev, y_dev), "roc_auc_score": dev_score_roc(estimator, X_dev, y_dev), } ) else: metrics.update( { "r2_score": dev_score_r2(estimator, X_dev, y_dev), "mse_score": dev_score_mse(estimator, X_dev, y_dev), "explain_var": dev_score_evar(estimator, X_dev, y_dev), } ) return { "n_features": X_train.shape[1], "selected": selected, "metrics": metrics, } def _regr_fe( estimator, X: ArrayLike, Y: ArrayLike, n_features_iter: Iterable[int], features: ArrayLike, fold: int, out_dir: 
str, dev_size: float, random_state: int | None, rank_features: bool, ): """Iterate over decreasing feature sets using development scoring.""" if X.shape[1] != len(features): raise ValueError("Number of columns in X must match the length of features") indices = np.arange(X.shape[1]) for nf in chain(n_features_iter, [1]): payload = _regr_fe_step( estimator, X, Y, nf, features, fold, out_dir, dev_size, random_state, rank_features, ) yield { "n_features": payload["n_features"], "metrics": payload["metrics"], "indices": indices.copy(), "selected": payload["selected"], } indices = indices[payload["selected"]] features = features[payload["selected"]] X = ( X[:, payload["selected"]] if isinstance(X, np.ndarray) else X.iloc[:, payload["selected"]] )