Source code for dRFEtools.scoring.random_forest

"""
Random-forest specific scoring and feature elimination helpers.

The routines here wrap out-of-bag (OOB) metrics and recursive feature
elimination for both classification and regression random forest models.
"""

from __future__ import annotations

from itertools import chain
from typing import Dict, Iterable

import numpy as np
from numpy.typing import ArrayLike
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import (
    accuracy_score,
    explained_variance_score,
    mean_squared_error,
    normalized_mutual_info_score,
    r2_score,
    roc_auc_score,
)

from ..metrics.ranking import features_rank_fnc

__author__ = "Kynon Jade Benjamin"

__all__ = [
    "_rf_fe",
    "oob_score_r2",
    "oob_score_nmi",
    "oob_score_roc",
    "oob_score_mse",
    "oob_score_evar",
    "oob_score_accuracy",
]


def _oob_predictions(estimator: RandomForestClassifier | RandomForestRegressor) -> np.ndarray:
    """Extract hard out-of-bag predictions from a fitted random forest.

    For a ``RandomForestClassifier`` the result is derived from
    ``oob_decision_function_``: a 1-D array is rounded and cast to ``int``;
    otherwise the position of the largest entry along axis 1 is returned.
    These are column positions, not values looked up in ``classes_``.

    For a ``RandomForestRegressor`` the ``oob_prediction_`` attribute is
    returned unchanged.

    Raises
    ------
    ValueError
        If ``estimator`` is neither of the two supported forest types.
    """

    if isinstance(estimator, RandomForestClassifier):
        oob_scores = estimator.oob_decision_function_
        if oob_scores.ndim != 1:
            # One column per class: pick the position of the top-scoring column.
            return np.argmax(oob_scores, axis=1)
        return oob_scores.round().astype(int)

    if not isinstance(estimator, RandomForestRegressor):
        raise ValueError(
            "Estimator must be either RandomForestClassifier or RandomForestRegressor"
        )
    return estimator.oob_prediction_


def oob_score_roc(estimator: RandomForestClassifier, Y: ArrayLike) -> float:
    """Area under the ROC curve for out-of-bag predictions.

    When ``Y`` holds more than two distinct values, the full
    ``oob_decision_function_`` matrix is scored one-vs-rest
    (``multi_class="ovr"``). Otherwise the hard predictions returned by
    ``_oob_predictions`` are scored with ``average="weighted"``.
    """
    # NOTE(review): the two-class branch scores thresholded predictions, not
    # class probabilities -- confirm this is the intended AUC definition.
    if len(np.unique(Y)) > 2:
        return roc_auc_score(Y, estimator.oob_decision_function_, multi_class="ovr")
    return roc_auc_score(Y, _oob_predictions(estimator), average="weighted")
def oob_score_nmi(estimator: RandomForestClassifier, Y: ArrayLike) -> float:
    """Normalized mutual information between ``Y`` and the OOB predictions.

    The predictions come from ``_oob_predictions`` and the score is computed
    with ``average_method="arithmetic"``.
    """
    return normalized_mutual_info_score(
        Y,
        _oob_predictions(estimator),
        average_method="arithmetic",
    )
def oob_score_accuracy(estimator: RandomForestClassifier, Y: ArrayLike) -> float:
    """Accuracy of the out-of-bag predictions against ``Y``.

    Parameters
    ----------
    estimator
        A fitted classifier exposing ``oob_decision_function_`` and
        ``classes_``.
    Y
        True labels, in the same label space the estimator was fitted on.

    Returns
    -------
    float
        Result of ``accuracy_score`` on ``Y`` and the predicted labels.
    """
    # `_oob_predictions` yields column positions of the decision function, not
    # labels. Translate them through `classes_` so the comparison with `Y` is
    # label-to-label; for labels that already are 0..k-1 this is a no-op.
    # NOTE(review): assumes a single-output classifier (1-D `classes_`).
    class_positions = _oob_predictions(estimator)
    labels_pred = np.asarray(estimator.classes_)[class_positions]
    return accuracy_score(Y, labels_pred)
def oob_score_r2(estimator: RandomForestRegressor, Y: ArrayLike) -> float:
    """Coefficient of determination between ``Y`` and the OOB predictions.

    The predictions are obtained from ``_oob_predictions``.
    """
    return r2_score(Y, _oob_predictions(estimator))
def oob_score_mse(estimator: RandomForestRegressor, Y: ArrayLike) -> float:
    """Mean squared error between ``Y`` and the OOB predictions.

    The predictions are obtained from ``_oob_predictions``.
    """
    return mean_squared_error(Y, _oob_predictions(estimator))
def oob_score_evar(estimator: RandomForestRegressor, Y: ArrayLike) -> float:
    """Explained variance between ``Y`` and the OOB predictions.

    The predictions are obtained from ``_oob_predictions`` and multiple
    outputs are combined with ``multioutput="uniform_average"``.
    """
    return explained_variance_score(
        Y,
        _oob_predictions(estimator),
        multioutput="uniform_average",
    )
def _rf_fe_step(
    estimator: RandomForestClassifier | RandomForestRegressor,
    X: ArrayLike,
    Y: ArrayLike,
    n_features_to_keep: int,
    features: ArrayLike,
    fold: int,
    out_dir: str,
    rank_features: bool,
) -> Dict[str, ArrayLike | Dict[str, float]]:
    """Run one elimination step: fit, rank features, and collect OOB metrics.

    Parameters
    ----------
    estimator
        Random forest to fit on ``X`` / ``Y``; it is refitted in place.
    X
        Feature matrix; must expose ``shape``.
    Y
        Targets passed to ``estimator.fit`` and to the OOB scorers.
    n_features_to_keep
        How many of the top-ranked columns to report in ``"selected"``.
    features, fold, out_dir, rank_features
        Forwarded unchanged to ``features_rank_fnc`` together with the
        ranking (a project helper; its effects are not visible here).

    Returns
    -------
    dict
        ``"n_features"`` (columns in ``X``), ``"selected"`` (column positions
        of the kept features, most important first) and ``"metrics"``
        (classification: ``nmi_score``, ``accuracy_score``, ``roc_auc_score``;
        regression: ``r2_score``, ``mse_score``, ``explain_var``).

    Raises
    ------
    ValueError
        If ``n_features_to_keep`` exceeds the number of columns in ``X``.
    """
    if n_features_to_keep > X.shape[1]:
        raise ValueError(
            "n_features_to_keep cannot be greater than the number of features in X"
        )

    estimator.fit(X, Y)
    feature_importances = estimator.feature_importances_
    # Column positions ordered from most to least important.
    rank = np.argsort(feature_importances)[::-1]
    selected = rank[:n_features_to_keep]
    features_rank_fnc(features, rank, n_features_to_keep, fold, out_dir, rank_features)

    metrics: Dict[str, float]
    if isinstance(estimator, RandomForestClassifier):
        metrics = {
            "nmi_score": oob_score_nmi(estimator, Y),
            "accuracy_score": oob_score_accuracy(estimator, Y),
            "roc_auc_score": oob_score_roc(estimator, Y),
        }
    else:
        metrics = {
            "r2_score": oob_score_r2(estimator, Y),
            "mse_score": oob_score_mse(estimator, Y),
            "explain_var": oob_score_evar(estimator, Y),
        }

    return {
        "n_features": X.shape[1],
        "selected": selected,
        "metrics": metrics,
    }


def _rf_fe(
    estimator: RandomForestClassifier | RandomForestRegressor,
    X: ArrayLike,
    Y: ArrayLike,
    n_features_iter: Iterable[int],
    features: ArrayLike,
    fold: int,
    out_dir: str,
    rank_features: bool,
):
    """Generate one result per elimination step, shrinking ``X`` between steps.

    For every value in ``n_features_iter``, followed by a final step that
    keeps a single feature, ``_rf_fe_step`` is run on the current columns.
    After each yield, ``X``, ``features`` and the index map are reduced to
    the selected columns before the next step.

    Because this is a generator, the input validation below only runs when
    the first item is requested.

    Yields
    ------
    dict
        ``"n_features"`` and ``"metrics"`` from the step, ``"indices"`` (a
        copy of the positions in the *original* ``X`` of the columns used for
        this step) and ``"selected"`` (positions, relative to the current
        columns, of the features kept for the next step).

    Raises
    ------
    ValueError
        If the number of columns in ``X`` differs from ``len(features)``.
    """
    if X.shape[1] != len(features):
        raise ValueError("Number of columns in X must match the length of features")

    # Plain sequences cannot be indexed with the integer array in `selected`;
    # promote them so the subsetting after each step works.
    if isinstance(features, (list, tuple)):
        features = np.asarray(features)

    indices = np.arange(X.shape[1])
    for nf in chain(n_features_iter, [1]):
        payload = _rf_fe_step(estimator, X, Y, nf, features, fold, out_dir, rank_features)
        selected = payload["selected"]
        yield {
            "n_features": payload["n_features"],
            "metrics": payload["metrics"],
            "indices": indices.copy(),
            "selected": selected,
        }
        indices = indices[selected]
        features = features[selected]
        # Objects with `.iloc` need positional indexing through it; anything
        # else is sliced positionally like a NumPy array.
        X = X.iloc[:, selected] if hasattr(X, "iloc") else X[:, selected]