"""Regression + summary helpers mirroring the R ``moderndive`` package.
Built on ``statsmodels`` (which, unlike scikit-learn, reports standard errors,
test statistics, p-values, and confidence intervals — the inferential output the
book teaches). Both ``ols()`` (linear) and ``glm()`` (e.g. logistic) fitted
models are supported. Inputs may be polars or pandas frames; outputs are polars.
- :func:`get_regression_table` ~ ``broom::tidy`` + CIs
- :func:`get_regression_points` ~ ``broom::augment`` (fitted values + residuals)
- :func:`get_regression_summaries` ~ ``broom::glance``
- :func:`tidy_summary` ~ per-variable summary statistics
"""
from __future__ import annotations
import re
import numpy as np
import polars as pl
from ._messaging import helpful_error
__all__ = [
"get_regression_table",
"get_regression_points",
"get_regression_summaries",
"tidy_summary",
"count_missing",
]
def _to_pandas(data):
if isinstance(data, pl.DataFrame):
return data.to_pandas()
return data
def _check_model(model) -> None:
"""Beginner-friendly guard: ``model`` must be a fitted statsmodels result."""
if not (hasattr(model, "params") and hasattr(model, "model")):
raise TypeError(
helpful_error(
f"Expected a fitted statsmodels model, got {type(model).__name__}.",
'Fit one first, e.g. smf.ols("y ~ x", data).fit() or smf.glm(...).fit().',
)
)
def _is_glm(model) -> bool:
"""A fitted statsmodels GLM result (logistic, Poisson, …) exposes ``family``."""
return hasattr(model, "family")
def _clean_term(term: str) -> str:
"""Turn patsy term labels into the tidy names moderndive uses.
Examples:
``Intercept`` / ``const`` -> ``intercept``
``income[T.Lower middle income]`` -> ``income: Lower middle income``
``life_exp:income[T.High income]`` -> ``life_exp:income: High income``
``C(income, levels=[...])[T.High income]`` -> ``income: High income``
"""
if term == "const": # the constant column from sm.add_constant (array API)
return "intercept"
term = term.replace("Intercept", "intercept")
# Drop any C(col, ...) wrapper down to just the column name.
term = re.sub(r"C\(\s*([A-Za-z_]\w*)[^)]*\)", r"\1", term)
# Turn a categorical level marker [T.level] into ": level".
term = re.sub(r"\[T\.(.*?)\]", r": \1", term)
return term
def _term_names(model) -> list[str]:
"""Coefficient names for the table. Works for formula/pandas (``params`` is a
Series with an index) and array-API numpy fits (``params`` is a bare array,
so fall back to ``exog_names``)."""
index = getattr(model.params, "index", None)
return list(index) if index is not None else list(model.model.exog_names)
def _has_formula_frame(model) -> bool:
"""Whether the model carries the original data + formula (the formula API).
True for ``smf.ols("y ~ x", data).fit()``; False for an array-API fit like
``sm.OLS(y, X).fit()``, which exposes only the design matrix.
"""
frame = getattr(getattr(model.model, "data", None), "frame", None)
return frame is not None and getattr(model.model, "formula", None) is not None
def _clean_name(expr: str) -> str:
"""Sanitize a transformed term into a tidy column name.
``np.log(mpg)`` -> ``log_mpg``; ``np.sqrt(price)`` -> ``sqrt_price``.
"""
expr = expr.replace("np.", "")
return re.sub(r"[^0-9A-Za-z]+", "_", expr).strip("_").lower()
def _outcome_info(model) -> tuple[str, str, bool]:
"""Resolve the outcome's tidy column name (and whether the LHS is transformed).
An untransformed LHS like ``mpg`` keeps its name; a transformed LHS like
``np.log(mpg)`` becomes ``log_mpg`` (with ``log_mpg_hat`` for fitted values).
"""
endog = model.model.endog_names
if endog in model.model.data.frame.columns:
return endog, f"{endog}_hat", False
name = _clean_name(endog)
return name, f"{name}_hat", True
def _predictor_vars(model) -> list[str]:
"""Original predictor variable names from the RHS, in formula order.
For ``y ~ poly(hp, 2) + wt`` this returns ``["hp", "wt"]`` — the original
columns, never the patsy basis/transform columns.
"""
frame_cols = list(model.model.data.frame.columns)
rhs = model.model.formula.split("~", 1)[1]
ordered: list[str] = []
for token in re.findall(r"[A-Za-z_]\w*", rhs):
if token in frame_cols and token not in ordered:
ordered.append(token)
return ordered
def _intercept_only_term(term: str) -> str:
"""Map only the intercept term to ``intercept``; leave everything else raw."""
return "intercept" if term in ("Intercept", "const") else term
[docs]
def get_regression_table(
model,
digits: int = 3,
conf_level: float = 0.95,
exponentiate: bool = False,
default_categorical_levels: bool = False,
) -> pl.DataFrame:
"""Tidy regression table: term, estimate, std_error, statistic, p_value, lower/upper_ci.
``model`` is a fitted ``statsmodels`` results object — either OLS
(``smf.ols("y ~ x", data).fit()``) or GLM (``smf.glm(...).fit()``).
For GLMs with a log or logit link, pass ``exponentiate=True`` to report the
coefficient estimate and its confidence interval as rate / odds ratios
(``std_error``, ``statistic``, and ``p_value`` stay on the model's link scale,
matching ``broom::tidy``).
By default, categorical-predictor terms are prettified (e.g.
``income[T.High income]`` → ``income: High income``). Pass
``default_categorical_levels=True`` to keep the raw statsmodels term names.
"""
_check_model(model)
conf = np.asarray(model.conf_int(alpha=1 - conf_level), dtype=float)
estimate = np.asarray(model.params, dtype=float)
lower, upper = conf[:, 0], conf[:, 1]
if exponentiate:
estimate, lower, upper = np.exp(estimate), np.exp(lower), np.exp(upper)
name_fn = _intercept_only_term if default_categorical_levels else _clean_term
table = pl.DataFrame(
{
"term": [name_fn(t) for t in _term_names(model)],
"estimate": estimate,
"std_error": np.asarray(model.bse, dtype=float),
"statistic": np.asarray(model.tvalues, dtype=float),
"p_value": np.asarray(model.pvalues, dtype=float),
"lower_ci": lower,
"upper_ci": upper,
}
)
numeric = [c for c in table.columns if c != "term"]
return table.with_columns(pl.col(numeric).round(digits))
[docs]
def get_regression_points(
model, digits: int = 3, *, newdata=None, ID: str | None = None
) -> pl.DataFrame:
"""Fitted values + residuals per observation (~ ``broom::augment``).
Columns: ``ID``, the outcome, each original predictor, ``<outcome>_hat``,
``residual``. In-formula transformations are handled gracefully: a
transformed outcome (``np.log(mpg)``) is shown on the model's scale under a
sanitized name (``log_mpg`` / ``log_mpg_hat``), and transformed predictors
(``poly()``, ``scale()``, ``I()``) are shown as their original columns rather
than leaking basis matrices. For GLMs, fitted values and residuals are on the
response scale (e.g. probabilities for logistic regression).
Pass ``newdata`` (a polars/pandas frame) to apply the model to new
observations: predictions are returned, plus a ``residual`` if the outcome is
present in ``newdata``. ``ID`` names a column to use as the identifier (placed
first); without it, ``ID`` is ``1..n``.
"""
_check_model(model)
if newdata is not None:
return _points_newdata(model, newdata, digits, ID)
name, name_hat, out = (
_points_columns_formula(model)
if _has_formula_frame(model)
else _points_columns_array(model)
)
fitted = np.asarray(model.fittedvalues, dtype=float)
if _is_glm(model):
residual = np.asarray(model.resid_response, dtype=float)
else:
residual = np.asarray(out[name], dtype=float) - fitted
out[name_hat] = fitted
out["residual"] = residual
if ID is not None:
out = {ID: _id_values(model, ID), **out}
df = pl.DataFrame(out).drop_nulls()
if ID is None:
df = df.with_row_index("ID", offset=1).with_columns(pl.col("ID").cast(pl.Int64))
df = df.select("ID", *[c for c in df.columns if c != "ID"])
float_cols = [c for c, dt in df.schema.items() if dt.is_float()]
return df.with_columns(pl.col(float_cols).round(digits))
def _id_values(model, ID: str):
"""Pull the ID column from the model's source frame (formula API only)."""
if not _has_formula_frame(model):
raise TypeError(
helpful_error(
f"ID={ID!r} needs a formula-API model so the source data is available.",
'Refit with smf.ols("y ~ x", data).fit(), or pass newdata=.',
)
)
frame = model.model.data.frame
if ID not in frame.columns:
raise ValueError(
helpful_error(
f"ID column {ID!r} is not in the model's data.",
f"Available columns: {', '.join(map(str, frame.columns))}.",
)
)
return np.asarray(frame[ID])
def _points_newdata(model, newdata, digits: int, ID: str | None) -> pl.DataFrame:
"""Apply ``model`` to ``newdata``: predictions (+ residual if outcome present)."""
nd = _to_pandas(newdata)
name, name_hat, _ = _outcome_info(model)
pvars = _predictor_vars(model)
missing = [c for c in pvars if c not in nd.columns]
if missing:
raise ValueError(
helpful_error(
f"newdata is missing predictor column(s): {', '.join(missing)}.",
f"newdata has: {', '.join(map(str, nd.columns))}.",
)
)
fitted = np.asarray(model.predict(nd), dtype=float)
out = {}
if ID is not None:
if ID not in nd.columns:
raise ValueError(
helpful_error(
f"ID column {ID!r} is not in newdata.",
f"newdata has: {', '.join(map(str, nd.columns))}.",
)
)
out[ID] = np.asarray(nd[ID])
has_outcome = name in nd.columns
if has_outcome:
out[name] = np.asarray(nd[name])
for predictor in pvars:
out[predictor] = np.asarray(nd[predictor])
out[name_hat] = fitted
if has_outcome:
out["residual"] = np.asarray(out[name], dtype=float) - fitted
df = pl.DataFrame(out).drop_nulls()
if ID is None:
df = df.with_row_index("ID", offset=1).with_columns(pl.col("ID").cast(pl.Int64))
df = df.select("ID", *[c for c in df.columns if c != "ID"])
float_cols = [c for c, dt in df.schema.items() if dt.is_float()]
return df.with_columns(pl.col(float_cols).round(digits))
def _points_columns_formula(model):
"""Outcome + original predictor columns for a formula-API model (transform-aware)."""
frame = _to_pandas(model.model.data.frame)
name, name_hat, transformed = _outcome_info(model)
outcome_vals = (
np.asarray(model.model.endog, dtype=float) if transformed else np.asarray(frame[name])
)
out = {name: outcome_vals}
for predictor in _predictor_vars(model):
out[predictor] = np.asarray(frame[predictor])
return name, name_hat, out
def _points_columns_array(model):
"""Outcome + predictor columns for an array-API model (e.g. ``sm.OLS(y, X)``).
There are no in-formula transforms here, so each non-constant design-matrix
column is a predictor and the outcome is the endog array.
"""
name = model.model.endog_names or "y"
out = {name: np.asarray(model.model.endog, dtype=float)}
exog = np.asarray(model.model.exog, dtype=float)
for j, raw in enumerate(model.model.exog_names):
if raw in ("const", "Intercept"):
continue
out[raw] = exog[:, j]
return name, f"{name}_hat", out
[docs]
def get_regression_summaries(model, digits: int = 3) -> pl.DataFrame:
"""Model-fit summaries as a tidy 1-row frame (~ ``broom::glance``).
For an **OLS** model: ``r_squared``, ``adj_r_squared``, ``mse``, ``rmse``,
``sigma``, ``statistic`` (overall F), ``p_value``, ``df``, ``nobs``.
For a **GLM** (no R² applies): ``mse``, ``rmse``, ``deviance``,
``null_deviance``, ``aic``, ``bic``, ``log_lik``, ``df_residual``,
``df_null``, ``nobs``. ``mse``/``rmse`` use response-scale residuals.
``mse`` is the mean squared residual using ``n`` in the denominator (so
``rmse = sqrt(mse)``); for OLS ``sigma`` is the residual standard error
using ``n - p`` — matching the R package.
"""
_check_model(model)
nobs = int(model.nobs)
if _is_glm(model):
res = np.asarray(model.resid_response, dtype=float)
mse = float(np.mean(res**2))
table = pl.DataFrame(
{
"mse": [mse],
"rmse": [float(np.sqrt(mse))],
"deviance": [float(model.deviance)],
"null_deviance": [float(model.null_deviance)],
"aic": [float(model.aic)],
"bic": [float(model.bic_llf)],
"log_lik": [float(model.llf)],
"df_residual": [int(model.df_resid)],
"df_null": [nobs - 1],
"nobs": [nobs],
}
)
else:
mse = float(model.ssr) / nobs
table = pl.DataFrame(
{
"r_squared": [float(model.rsquared)],
"adj_r_squared": [float(model.rsquared_adj)],
"mse": [mse],
"rmse": [float(np.sqrt(mse))],
"sigma": [float(np.sqrt(model.mse_resid))],
"statistic": [float(model.fvalue)],
"p_value": [float(model.f_pvalue)],
"df": [int(model.df_model)],
"nobs": [nobs],
}
)
float_cols = [c for c, dt in table.schema.items() if dt.is_float()]
return table.with_columns(pl.col(float_cols).round(digits))
[docs]
def tidy_summary(data, columns: list[str] | None = None, digits: int = 3) -> pl.DataFrame:
"""Per-variable summary statistics for the selected columns.
Mirrors the R ``moderndive::tidy_summary`` column layout:
``column, n, group, type, min, Q1, mean, median, Q3, max, sd``.
Numeric columns get the five-number summary + mean/sd; non-numeric columns
report ``n`` and ``type`` with the numeric fields left null.
"""
df = data if isinstance(data, pl.DataFrame) else pl.from_pandas(data)
columns = columns or df.columns
rows = []
for col in columns:
series = df[col]
is_num = series.dtype.is_numeric()
n = int(series.len() - series.null_count())
row = {
"column": col,
"n": n,
"group": None,
"type": "numeric" if is_num else "categorical",
"min": None,
"Q1": None,
"mean": None,
"median": None,
"Q3": None,
"max": None,
"sd": None,
}
if is_num:
s = series.drop_nulls()
row.update(
min=round(float(s.min()), digits),
Q1=round(float(s.quantile(0.25)), digits),
mean=round(float(s.mean()), digits),
median=round(float(s.median()), digits),
Q3=round(float(s.quantile(0.75)), digits),
max=round(float(s.max()), digits),
sd=round(float(s.std()), digits),
)
rows.append(row)
schema = {
"column": pl.Utf8,
"n": pl.Int64,
"group": pl.Utf8,
"type": pl.Utf8,
"min": pl.Float64,
"Q1": pl.Float64,
"mean": pl.Float64,
"median": pl.Float64,
"Q3": pl.Float64,
"max": pl.Float64,
"sd": pl.Float64,
}
return pl.DataFrame(rows, schema=schema)
[docs]
def count_missing(data, columns: list[str] | None = None) -> pl.DataFrame:
"""Count missing (``null``) values in each column.
A beginner-friendly alternative to ``df.select(pl.all().is_null().sum())``:
it returns a tidy two-column data frame with one row per column
(``column``, ``n_missing``), sorted from most to fewest missing values so the
columns needing attention surface first.
Parameters
----------
data:
A polars (or pandas) data frame.
columns:
Optional list of column names to check; defaults to every column.
"""
df = data if isinstance(data, pl.DataFrame) else pl.from_pandas(data)
columns = columns or df.columns
return pl.DataFrame(
{
"column": columns,
"n_missing": [int(df[col].null_count()) for col in columns],
},
schema={"column": pl.Utf8, "n_missing": pl.Int64},
).sort("n_missing", descending=True)