Source code for dRFEtools.dRFEtools

#!/usr/bin/env python
"""Dynamic recursive feature elimination interfaces."""

from __future__ import annotations

from pathlib import Path
from typing import Dict, Iterable, Tuple
from warnings import filterwarnings

from matplotlib import MatplotlibDeprecationWarning
from sklearn.base import is_classifier
from sklearn.ensemble import RandomForestClassifier

from .scoring import _regr_fe, _rf_fe
from .utils import normalize_rfe_result

filterwarnings("ignore", category=MatplotlibDeprecationWarning)
filterwarnings("ignore", category=UserWarning, module="plotnine.*")
filterwarnings("ignore", category=DeprecationWarning, module="plotnine.*")

__all__ = ["rf_rfe", "dev_rfe", "plot_metric", "plot_with_lowess_vline"]


def _n_features_iter(nf: int, keep_rate: float) -> Iterable[int]:
    """Yield the number of features to keep at each elimination step."""

    while nf != 1:
        nf = max(1, int(nf * keep_rate))
        yield nf


def _normalize_metrics(estimator, normalized: Dict) -> Dict:
    """Select task-appropriate metrics from a normalized payload."""

    metrics = normalized.get("metrics", {})
    if not isinstance(normalized, dict):
        return metrics

    if isinstance(estimator, RandomForestClassifier):
        return {
            key: metrics[key]
            for key in ("nmi_score", "accuracy_score", "roc_auc_score")
            if key in metrics
        }
    if is_classifier(estimator):
        return {
            key: metrics[key]
            for key in ("nmi_score", "accuracy_score", "roc_auc_score")
            if key in metrics
        }
    return {
        key: metrics[key]
        for key in ("r2_score", "mse_score", "explain_var")
        if key in metrics
    }


[docs] def rf_rfe( estimator, X, Y, features, fold: int, out_dir: str | Path = ".", elimination_rate: float = 0.2, RANK: bool = True, ) -> Tuple[Dict[int, Dict], Dict]: """Run random-forest feature elimination over an iterator process.""" if not 0 < elimination_rate < 1: raise ValueError("elimination_rate must be between 0 and 1") results: Dict[int, Dict] = {} first_pass: Dict | None = None keep_rate = 1 - elimination_rate out_dir = Path(out_dir) for payload in _rf_fe( estimator, X, Y, _n_features_iter(X.shape[1], keep_rate), features, fold, str(out_dir), RANK, ): normalized = normalize_rfe_result(payload) normalized["metrics"] = _normalize_metrics(estimator, normalized) if first_pass is None: first_pass = normalized results[normalized["n_features"]] = normalized return results, first_pass
[docs] def dev_rfe( estimator, X, Y, features, fold: int, out_dir: str | Path = ".", elimination_rate: float = 0.2, dev_size: float = 0.2, RANK: bool = True, SEED: bool = False, random_state: int | None = None, ) -> Tuple[Dict[int, Dict], Dict]: """Recursive feature elimination for estimators using a dev split.""" if not 0 < elimination_rate < 1 or not 0 < dev_size < 1: raise ValueError("elimination_rate and dev_size must be between 0 and 1") results: Dict[int, Dict] = {} first_pass: Dict | None = None keep_rate = 1 - elimination_rate out_dir = Path(out_dir) resolved_random_state = 13 if SEED else random_state for payload in _regr_fe( estimator, X, Y, _n_features_iter(X.shape[1], keep_rate), features, fold, str(out_dir), dev_size, resolved_random_state, RANK, ): normalized = normalize_rfe_result(payload) normalized["metrics"] = _normalize_metrics(estimator, normalized) if first_pass is None: first_pass = normalized results[normalized["n_features"]] = normalized return results, first_pass