Source code for dRFEtools.cli

"""Command-line interface for :mod:`dRFEtools`."""

from __future__ import annotations

import argparse
from pathlib import Path
from typing import Dict, Iterable, Tuple

import pandas as pd
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LinearRegression, LogisticRegression

from . import __version__, dev_rfe, rf_rfe

# Each table maps a metric name to its position inside a result record.
# Position 0 of a record is the feature count (see the column lists in
# ``_results_frame``), so metric positions start at 1.
CLASSIFICATION_METRICS = {
    name: position
    for position, name in enumerate(("nmi", "accuracy", "roc_auc"), start=1)
}
REGRESSION_METRICS = {
    name: position
    for position, name in enumerate(("r2", "mse", "explained_variance"), start=1)
}
# Metrics where a smaller value is better; every other metric is maximized.
MINIMIZE = {"mse"}


def _load_dataset(
    data_path: Path, target: str
) -> Tuple[pd.DataFrame, pd.Series, Iterable[str]]:
    df = pd.read_csv(data_path)
    if target not in df.columns:
        raise ValueError(f"Target column '{target}' not found in {data_path}")
    X = df.drop(columns=[target])
    y = df[target]
    return X, y, X.columns


def _resolve_metric(task: str, metric: str | None) -> str:
    """Return the metric name to use for *task*, applying the task default.

    Any task other than ``"classification"`` is treated as regression.
    When *metric* is ``None`` the default is ``"nmi"`` for classification and
    ``"r2"`` otherwise.

    Raises
    ------
    ValueError
        If *metric* is given but is not a key of the task's metric table.
    """
    is_classification = task == "classification"
    if metric is None:
        return "nmi" if is_classification else "r2"

    permitted = CLASSIFICATION_METRICS if is_classification else REGRESSION_METRICS
    if metric in permitted:
        return metric

    valid = ", ".join(sorted(permitted))
    raise ValueError(
        f"Metric '{metric}' is not valid for task '{task}'. Choose from: {valid}"
    )


def _metric_index(task: str, metric: str) -> int:
    """Return the position of *metric* inside a result record for *task*.

    Looks the name up in ``CLASSIFICATION_METRICS`` when *task* is
    ``"classification"`` and in ``REGRESSION_METRICS`` for any other task.
    An unknown metric name raises ``KeyError``.
    """
    if task == "classification":
        return CLASSIFICATION_METRICS[metric]
    return REGRESSION_METRICS[metric]


def _summarize_results(
    results: Dict[int, Tuple], task: str, metric: str
) -> Tuple[int, float]:
    """Pick the best record in *results* according to *metric*.

    Metrics listed in ``MINIMIZE`` are minimized; all others are maximized.
    On ties the first matching record in ``results`` iteration order wins
    (the behavior of the built-in ``min``/``max``).

    Returns
    -------
    tuple
        ``(record[0], record[position])`` of the winning record, i.e. its
        first element and its value for the chosen metric.
    """
    position = _metric_index(task, metric)

    def score_of(row: Tuple) -> float:
        return row[position]

    if metric in MINIMIZE:
        winner = min(results.values(), key=score_of)
    else:
        winner = max(results.values(), key=score_of)
    return winner[0], winner[position]


def _results_frame(results: Dict[int, Tuple], task: str) -> pd.DataFrame:
    if task == "classification":
        columns = ["n_features", "nmi", "accuracy", "roc_auc", "indices"]
    else:
        columns = ["n_features", "r2", "mse", "explained_variance", "indices"]
    ordered = sorted(results.values(), key=lambda record: record[0])
    return pd.DataFrame(ordered, columns=columns)


def _ensure_output_dir(path: Path) -> Path:
    path.mkdir(parents=True, exist_ok=True)
    return path


def run_rf_rfe(args: argparse.Namespace) -> None:
    """Handle the ``rf-rfe`` subcommand: run ``rf_rfe`` and report the best step.

    Parameters
    ----------
    args
        Parsed command-line namespace. The attributes read here are ``data``,
        ``target``, ``task``, ``metric``, ``n_estimators``, ``random_state``,
        ``n_jobs``, ``fold``, ``output_dir``, ``elimination_rate``, ``rank``
        and ``save_summary``.

    Raises
    ------
    ValueError
        If ``args.metric`` is not valid for ``args.task`` or the target
        column is missing from the CSV file.
    """
    # Validate the metric up front: the elimination run can be long and
    # writes artifacts, so a bad --metric must fail before any of that work
    # starts rather than after it has finished.
    metric = _resolve_metric(args.task, args.metric)

    X, y, features = _load_dataset(Path(args.data), args.target)

    estimator: RandomForestClassifier | RandomForestRegressor
    if args.task == "classification":
        estimator = RandomForestClassifier(
            n_estimators=args.n_estimators,
            random_state=args.random_state,
            oob_score=True,
            n_jobs=args.n_jobs,
        )
    else:
        estimator = RandomForestRegressor(
            n_estimators=args.n_estimators,
            random_state=args.random_state,
            oob_score=True,
            n_jobs=args.n_jobs,
        )

    results, first_step = rf_rfe(
        estimator,
        X,
        y,
        features,
        args.fold,
        out_dir=str(_ensure_output_dir(Path(args.output_dir))),
        elimination_rate=args.elimination_rate,
        RANK=args.rank,
    )

    best_n, best_score = _summarize_results(results, args.task, metric)
    print(f"First elimination step retained {first_step[0]} features.")
    direction = "lowest" if metric in MINIMIZE else "highest"
    print(
        f"Best {direction} {metric} achieved with {best_n} features: {best_score:.4f}"
    )

    if args.save_summary:
        summary_path = Path(args.save_summary)
        # The summary may live outside output_dir, so create its own parent.
        _ensure_output_dir(summary_path.parent)
        _results_frame(results, args.task).to_csv(summary_path, index=False)
        print(f"Saved summary metrics to {summary_path}")
def run_dev_rfe(args: argparse.Namespace) -> None:
    """Handle the ``dev-rfe`` subcommand: run ``dev_rfe`` and report the best step.

    Uses ``LogisticRegression(max_iter=1000)`` when ``args.task`` is
    ``"classification"`` and ``LinearRegression()`` otherwise.

    Parameters
    ----------
    args
        Parsed command-line namespace. The attributes read here are ``data``,
        ``target``, ``task``, ``metric``, ``fold``, ``output_dir``,
        ``elimination_rate``, ``dev_size``, ``rank``, ``seed`` and
        ``save_summary``.

    Raises
    ------
    ValueError
        If ``args.metric`` is not valid for ``args.task`` or the target
        column is missing from the CSV file.
    """
    # Validate the metric up front: the elimination run can be long and
    # writes artifacts, so a bad --metric must fail before any of that work
    # starts rather than after it has finished.
    metric = _resolve_metric(args.task, args.metric)

    X, y, features = _load_dataset(Path(args.data), args.target)

    estimator = (
        LogisticRegression(max_iter=1000)
        if args.task == "classification"
        else LinearRegression()
    )

    results, first_step = dev_rfe(
        estimator,
        X,
        y,
        features,
        args.fold,
        out_dir=str(_ensure_output_dir(Path(args.output_dir))),
        elimination_rate=args.elimination_rate,
        dev_size=args.dev_size,
        RANK=args.rank,
        SEED=args.seed,
    )

    best_n, best_score = _summarize_results(results, args.task, metric)
    print(f"First elimination step retained {first_step[0]} features.")
    direction = "lowest" if metric in MINIMIZE else "highest"
    print(
        f"Best {direction} {metric} achieved with {best_n} features: {best_score:.4f}"
    )

    if args.save_summary:
        summary_path = Path(args.save_summary)
        # The summary may live outside output_dir, so create its own parent.
        _ensure_output_dir(summary_path.parent)
        _results_frame(results, args.task).to_csv(summary_path, index=False)
        print(f"Saved summary metrics to {summary_path}")
def build_parser() -> argparse.ArgumentParser:
    """Create the top-level argument parser with its two subcommands.

    The parser exposes ``--version`` and requires one of the subcommands
    ``rf-rfe`` or ``dev-rfe``. Both subcommands share the same base options;
    each then adds its own. The chosen handler is stored on the parsed
    namespace as ``func``.
    """
    root = argparse.ArgumentParser(
        description="Run dynamic recursive feature elimination workflows."
    )
    root.add_argument(
        "--version", action="version", version=f"dRFEtools {__version__}"
    )
    commands = root.add_subparsers(dest="command", required=True)

    # Options registered, in this order, on every subcommand.
    shared_options = (
        (
            "--data",
            dict(
                required=True,
                help="Path to a CSV file containing features and target column.",
            ),
        ),
        (
            "--target",
            dict(required=True, help="Name of the target column to predict."),
        ),
        (
            "--task",
            dict(choices=["classification", "regression"], default="classification"),
        ),
        (
            "--output-dir",
            dict(default=".", help="Directory to write ranking artifacts."),
        ),
        (
            "--elimination-rate",
            dict(
                type=float,
                default=0.2,
                help="Fraction of features removed per iteration.",
            ),
        ),
        (
            "--metric",
            dict(default=None, help="Metric used to pick the best iteration."),
        ),
        (
            "--fold",
            dict(type=int, default=1, help="Fold identifier used in saved outputs."),
        ),
        (
            "--rank",
            dict(
                action="store_true",
                help="Persist feature ranking files during elimination.",
            ),
        ),
        (
            "--save-summary",
            dict(
                default=None,
                help="Optional path to write a CSV of iteration metrics.",
            ),
        ),
    )

    def _new_command(name, summary, handler):
        """Add a subcommand wired to *handler* and carrying the shared options."""
        sub = commands.add_parser(name, help=summary)
        sub.set_defaults(func=handler)
        for flag, options in shared_options:
            sub.add_argument(flag, **options)
        return sub

    forest = _new_command("rf-rfe", "Run random-forest-based dRFE.", run_rf_rfe)
    forest.add_argument(
        "--n-estimators",
        type=int,
        default=200,
        help="Number of trees in the random forest.",
    )
    forest.add_argument(
        "--n-jobs", type=int, default=-1, help="Number of jobs used by the estimator."
    )
    forest.add_argument(
        "--random-state",
        type=int,
        default=13,
        help="Random state for reproducibility.",
    )

    dev = _new_command("dev-rfe", "Run development-set-based dRFE.", run_dev_rfe)
    dev.add_argument(
        "--dev-size",
        type=float,
        default=0.2,
        help="Fraction reserved for the development split.",
    )
    dev.add_argument(
        "--seed", action="store_true", help="Use a deterministic train/dev split."
    )

    return root
def main(argv: list[str] | None = None) -> None:
    """Parse *argv* and dispatch to the handler of the selected subcommand.

    *argv* is passed unchanged to ``ArgumentParser.parse_args``; the handler
    is the ``func`` attribute stored on the parsed namespace.
    """
    namespace = build_parser().parse_args(argv)
    handler = namespace.func
    handler(namespace)


if __name__ == "__main__":
    main()