import inspect
import warnings
from collections.abc import Callable, Hashable
from pathlib import Path
from time import time
from typing import Literal
import geopandas as gpd
import numpy as np
import pandas as pd
from joblib import Parallel, delayed, dump, load
from libpysal import graph
from scipy.spatial import KDTree
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.model_selection import train_test_split
# TODO: summary
# TODO: formal documentation
# TODO: comments in code
__all__ = ["BaseClassifier", "BaseRegressor"]
def _triangular(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = np.clip(distances / bandwidth, 0, 1)
return 1 - u
def _parabolic(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = np.clip(distances / bandwidth, 0, 1)
return 1 - u**2
def _gaussian(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = distances / bandwidth
return np.exp(-((u / 2) ** 2))
def _bisquare(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = np.clip(distances / bandwidth, 0, 1)
return (1 - u**2) ** 2
def _cosine(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = np.clip(distances / bandwidth, 0, 1)
return np.cos(np.pi / 2 * u)
def _exponential(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = distances / bandwidth
return np.exp(-u)
def _boxcar(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
r = (distances < bandwidth).astype(int)
return r
def _tricube(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = np.clip(distances / bandwidth, 0, 1)
return (1 - u**3) ** 3
_kernel_functions = {
"triangular": _triangular,
"parabolic": _parabolic,
# "gaussian": _gaussian,
"bisquare": _bisquare,
"tricube": _tricube,
"cosine": _cosine,
"boxcar": _boxcar,
# "exponential": _exponential,
}
class _BaseModel(BaseEstimator):
"""Base class for geographically weighted models"""
def __init__(
self,
model,
*,
bandwidth: float | None = None,
fixed: bool = False,
kernel: Literal[
"triangular",
"parabolic",
# "gaussian",
"bisquare",
"tricube",
"cosine",
"boxcar",
# "exponential",
]
| Callable = "bisquare",
include_focal: bool = False,
geometry: gpd.GeoSeries | None = None,
graph: graph.Graph | None = None,
n_jobs: int = -1,
fit_global_model: bool = True,
strict: bool | None = False,
keep_models: bool | str | Path = False,
temp_folder: str | None = None,
batch_size: int | None = None,
verbose: bool = False,
**kwargs,
):
self.model = model
self.bandwidth = bandwidth
self.kernel = kernel
self.include_focal = include_focal
self.geometry = geometry
self.graph = graph
self.fixed = fixed
self._model_kwargs = kwargs
self.n_jobs = n_jobs
self.fit_global_model = fit_global_model
self.strict = strict
if isinstance(keep_models, str):
keep_models = Path(keep_models)
self.keep_models = keep_models
self.temp_folder = temp_folder
self.batch_size = batch_size
self.verbose = verbose
self._model_type = None
def _validate_geometry(self, geometry):
"""Validate that geometry contains only Point geometries"""
if not isinstance(geometry, gpd.GeoSeries):
raise ValueError(
f"geometry needs to be geopandas.GeoSeries. Got {type(geometry)}."
)
if geometry is not None and not (geometry.geom_type == "Point").all():
raise ValueError(
"Unsupported geometry type. Only point geometry is allowed."
)
def _build_weights(self) -> graph.Graph:
"""Build spatial weights graph"""
if not isinstance(self.bandwidth, float | int):
raise ValueError(
"Bandwidth is not a valid value. Needs to be float or int, "
f"got {self.bandwidth}."
)
if self.fixed: # fixed distance
weights = graph.Graph.build_kernel(
self.geometry,
kernel=_kernel_functions[self.kernel],
bandwidth=self.bandwidth,
)
else: # adaptive KNN
weights = graph.Graph.build_kernel(
self.geometry,
kernel="identity",
k=self.bandwidth - 1 if self.include_focal else self.bandwidth,
)
# post-process identity weights by the selected kernel
# and kernel bandwidth derived from each neighborhood
# the epsilon comes from MGWR to avoid division by zero
bandwidth = weights._adjacency.groupby(level=0).transform("max") * 1.0000001
weights = graph.Graph(
adjacency=_kernel_functions[self.kernel](weights._adjacency, bandwidth),
is_sorted=True,
)
if self.include_focal:
weights = weights.assign_self_weight(1)
return weights
def _setup_model_storage(self):
"""Setup model storage directory if needed"""
if isinstance(self.keep_models, Path):
self.keep_models.mkdir(exist_ok=True)
def _fit_models_batch(
self,
X: pd.DataFrame,
y: pd.Series,
weights: graph.Graph,
) -> list:
"""Fit models in batches or all at once"""
if self.batch_size:
training_output = []
num_groups = len(y)
indices = np.arange(num_groups)
for i in range(0, num_groups, self.batch_size):
if self.verbose:
print(
f"Processing batch {i // self.batch_size + 1} "
f"out of {(num_groups // self.batch_size) + 1}."
)
batch_indices = indices[i : i + self.batch_size]
subset_weights = weights._adjacency.loc[batch_indices, :]
index = subset_weights.index
_weight = subset_weights.values
X_focals = X.values[batch_indices]
batch_training_output = self._batch_fit(X, y, index, _weight, X_focals)
training_output.extend(batch_training_output)
else:
index = weights._adjacency.index
_weight = weights._adjacency.values
X_focals = X.values
training_output = self._batch_fit(X, y, index, _weight, X_focals)
return training_output
def _batch_fit(
self,
X: pd.DataFrame,
y: pd.Series,
index: pd.MultiIndex,
_weight: np.ndarray,
X_focals: np.ndarray,
) -> list:
"""Fit a batch of local models"""
data = X.copy()
data["_y"] = y
data = data.loc[index.get_level_values(1)]
data["_weight"] = _weight
grouper = data.groupby(index.get_level_values(0), sort=False)
invariant = grouper["_y"].nunique() == 1
if invariant.any():
if self.strict:
raise ValueError(
f"y at locations {invariant.index[invariant]} is invariant."
)
elif self.strict is None:
warnings.warn(
f"y at locations {invariant.index[invariant]} is invariant.",
stacklevel=3,
)
return Parallel(n_jobs=self.n_jobs, temp_folder=self.temp_folder)(
delayed(self._fit_local)(
self.model,
group,
name,
focal_x,
self._model_kwargs,
)
for (name, group), focal_x in zip(grouper, X_focals, strict=False)
)
def _fit_global_model(self, X: pd.DataFrame, y: pd.Series):
"""Fit global baseline model"""
if self._model_type == "random_forest":
self._model_kwargs["oob_score"] = True
# fit global model as a baseline
if "n_jobs" in inspect.signature(self.model).parameters:
self.global_model = self.model(n_jobs=self.n_jobs, **self._model_kwargs)
else:
self.global_model = self.model(**self._model_kwargs)
# see gh#44 - remove filter once oldest sklearn is 1.10
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message="'n_jobs' has no effect since 1.8 and will be removed in 1.10.",
category=FutureWarning,
)
self.global_model.fit(X=X, y=y)
def _store_model(self, local_model, name: Hashable):
"""Store or serialize local model"""
if self.keep_models is True: # if True, models are kept in memory
return local_model
elif isinstance(self.keep_models, Path): # if Path, models are saved to disk
p = f"{self.keep_models.joinpath(f'{name}.joblib')}"
with open(p, "wb") as f:
dump(local_model, f, protocol=5)
del local_model
return p
else:
del local_model
return None
def _compute_hat_value(
self, X: pd.DataFrame, weights: np.ndarray, focal_x: np.ndarray
) -> float:
"""
Compute the hat value (leverage) for the focal point.
For classification problems, this is an approximation rather than an ideal
solution but it should be good enough for bandwidth search.
Parameters:
-----------
X : pd.DataFrame
Design matrix of the local neighborhood
weights : numpy.ndarray
Spatial weights for the neighborhood
focal_x : numpy.ndarray
Feature vector of the focal point
Returns:
--------
float : hat value for the focal point
"""
try:
# Add intercept if not present
if not (X.iloc[:, 0] == 1).all():
X_with_intercept = np.column_stack([np.ones(len(X)), X.values])
focal_with_intercept = np.concatenate([[1], focal_x.flatten()])
else:
X_with_intercept = X.values
focal_with_intercept = focal_x.flatten()
# Compute (X^T W X)^(-1)
XtWX = X_with_intercept.T @ np.diag(weights) @ X_with_intercept
XtWX_inv = np.linalg.pinv(
XtWX
) # Use pseudo-inverse for numerical stability
# Hat value: h_ii = x_i^T (X^T W X)^(-1) x_i * w_i
hat_value = focal_with_intercept.T @ XtWX_inv @ focal_with_intercept
return hat_value
except (np.linalg.LinAlgError, ValueError):
# Return NaN if computation fails (singular matrix, etc.)
return np.nan
def _compute_information_criteria(self):
"""Compute AIC, BIC, and AICc using the global log likelihood"""
n = (
self._n_fitted_models
if hasattr(self, "_n_fitted_models")
else len(self.pred_)
)
# Use effective degrees of freedom as the number of parameters
k = self.effective_df_
if not np.isnan(self.log_likelihood_) and not np.isnan(k):
# Akaike Information Criterion
self.aic_ = 2 * (k + 1) - 2 * self.log_likelihood_
# Bayesian Information Criterion
# Cast n to float to avoid overload resolution issues with numpy.log
self.bic_ = np.log(n) * (k + 1) - 2 * self.log_likelihood_
# Corrected AIC for small samples
if n - k - 1 > 0:
self.aicc_ = self.aic_ + (2 * k * (k + 1)) / (n - k - 1)
# the implementation below matches MGWR but the formula above is
# typical AICc implementation. The difference is minor.
# self.aicc_ = -2.0 * self.log_likelihood_ + 2.0 * n * (k + 1.0) / (
# n - k - 2.0
# )
else:
self.aicc_ = np.nan
else:
self.aic_ = np.nan
self.bic_ = np.nan
self.aicc_ = np.nan
def _prepare_prediction_neighborhoods(
self, geometry: gpd.GeoSeries
) -> tuple[list, list]:
"""
Prepare neighborhood information for prediction on new observations.
Parameters
----------
geometry : geopandas.GeoSeries
Point geometries for new observations.
Returns
-------
tuple
- local_model_ids: list of arrays with local model indices per observation
- distances: list of arrays with kernel weights per observation
"""
self._validate_geometry(geometry)
if not (
isinstance(self.bandwidth, float | int)
and isinstance(self.geometry, gpd.GeoSeries)
):
raise ValueError(
"Bandwidth and geometry need to be specified to enable prediction."
)
if self.fixed:
input_ids, local_ids = self.geometry.sindex.query(
geometry, predicate="dwithin", distance=self.bandwidth
)
distance = _kernel_functions[self.kernel](
self.geometry.iloc[local_ids].distance(
geometry.iloc[input_ids], align=False
),
self.bandwidth,
)
else:
training_coords = self.geometry.get_coordinates()
tree = KDTree(training_coords)
query_coords = geometry.get_coordinates()
distances, indices_array = tree.query(query_coords, k=self.bandwidth)
# Flatten arrays for consistent format
input_ids = np.repeat(np.arange(len(geometry)), self.bandwidth)
local_ids = self._local_models.index[indices_array.flatten()].to_numpy()
distances = distances.flatten()
# For adaptive KNN, determine the bandwidth for each neighborhood
# by finding the max distance in each neighborhood
kernel_bandwidth = (
pd.Series(distances).groupby(input_ids).transform("max") + 1e-6
) # can't have 0
distance = _kernel_functions[self.kernel](distances, kernel_bandwidth)
split_indices = np.where(np.diff(input_ids))[0] + 1
local_model_ids = np.split(local_ids, split_indices)
distances = np.split(np.asarray(distance), split_indices)
return local_model_ids, distances
def _predict_local(
self,
x_: pd.DataFrame,
models_: np.ndarray,
distances_: np.ndarray,
):
"""
Make prediction for a single observation using local models.
Must be implemented by subclasses.
Parameters
----------
x_ : pd.DataFrame
Single-row DataFrame with features for the observation.
models_ : np.ndarray
Array of local model indices to use for prediction.
distances_ : np.ndarray
Array of kernel weights for each local model.
Returns
-------
Prediction result (type depends on subclass implementation).
"""
raise NotImplementedError("Subclasses must implement _predict_local")
# Abstract methods that subclasses must implement
def _fit_local(
self,
model,
data: pd.DataFrame,
name: Hashable,
focal_x: np.ndarray,
model_kwargs: dict,
) -> list[Hashable]:
raise NotImplementedError("Subclasses must implement _fit_local")
def fit(self, X: pd.DataFrame, y: pd.Series):
raise NotImplementedError("Subclasses must implement fit")
def _get_score_data(self, local_model, X, y): # noqa: ARG002
"""Subclasses should implement custom function"""
return np.nan
# def __sklearn_tags__(self):
# tags = super().__sklearn_tags__(self)
# return tags
[docs]
class BaseClassifier(ClassifierMixin, _BaseModel):
"""Generic geographically weighted classification meta-estimator.
This class wraps a scikit-learn-compatible *classifier class* and fits one local
model per focal observation using spatially varying sample weights.
The spatial interaction is defined either by (a) ``geometry`` + bandwidth/kernel
settings or (b) a precomputed :class:`libpysal.graph.Graph` passed via ``graph``.
Notes
-----
- ``y`` must be binary (``{0, 1}`` or boolean).
- To enable prediction on new data via :meth:`predict`/:meth:`predict_proba`, you
must set ``keep_models=True`` (store in memory) or ``keep_models=Path(...)``
(serialize to disk).
- Only point geometries are supported.
Parameters
----------
model : ClassifierMixin
Class implementing the scikit-learn classifier API (e.g.
:class:`sklearn.linear_model.LogisticRegression`). The class (not an instance)
is instantiated internally for each local model.
bandwidth : float | int | None
Bandwidth for defining neighborhoods.
- If ``fixed=True``, this is a distance threshold.
- If ``fixed=False``, this is the number of nearest neighbors used to form the
local neighborhood.
If ``graph`` is provided, ``bandwidth`` is ignored.
fixed : bool, optional
True for distance based bandwidth and False for adaptive (nearest neighbor)
bandwidth, by default False
kernel : str | Callable, optional
Type of kernel function used to weight observations, by default "bisquare"
include_focal : bool, optional
Include focal in the local model training. Excluding it allows assessment of
geographically weighted metrics on unseen data without a need for train/test
split, hence providing value for all samples. This is needed for further spatial
analysis of the model performance (and generalises to models that do not support
OOB scoring). However, it leaves out the most representative sample. By default
False
geometry : gpd.GeoSeries, optional
Geographic location of the observations in the sample. Used to determine the
spatial interaction weight based on specification by ``bandwidth``, ``fixed``,
``kernel``, and ``include_focal`` keywords. Either ``geometry`` or ``graph``
need to be specified. To allow prediction, it is required to specify
``geometry``.
graph : Graph, optional
Custom libpysal.graph.Graph object encoding the spatial interaction between
observations in the sample. If given, it is used directly and ``bandwidth``,
``fixed``, ``kernel``, and ``include_focal`` keywords are ignored. Either
``geometry`` or ``graph`` need to be specified. To allow prediction, it is
required to specify ``geometry``. Potentially, both can be specified where
``graph`` encodes spatial interaction between observations in ``geometry``.
n_jobs : int, optional
The number of jobs to run in parallel. ``-1`` means using all processors by
default ``-1``
fit_global_model : bool, optional
Determines if the global baseline model shall be fitted alongside the
geographically weighted, by default True
strict : bool | None, optional
Do not fit any models if at least one neighborhood has invariant ``y``, by
default False. None is treated as False but provides a warning if there are
invariant models.
keep_models : bool | str | Path, optional
Keep all local models (required for prediction), by default False. Note that for
some models, like random forests, the objects can be large. If string or Path is
provided, the local models are not held in memory but serialized to the disk
from which they are loaded in prediction.
temp_folder : str | None, optional
Folder to be used by the pool for memmapping large arrays for sharing memory
with worker processes, e.g., ``/tmp``. Passed to ``joblib.Parallel``, by default
None
batch_size : int | None, optional
Number of models to process in each batch. Specify batch_size if your models do
not fit into memory. By default None
min_proportion : float, optional
Minimum proportion of minority class for a model to be fitted, by default 0.2
undersample : bool | float, optional
Whether to apply random undersampling to balance classes.
If ``True``, undersample the majority class to match the minority class
(i.e., minority/majority ratio = 1.0).
If a float ``alpha > 0``, target a minority/majority ratio of ``alpha`` after
resampling, i.e. ``alpha = N_min / N_resampled_majority``.
By default False
leave_out : float | int, optional
Leave out a fraction (when float) or a set number (when int) of random
observations from each local model to be used to measure out-of-sample log loss
based on pooled samples from all the models. This is useful for bandwidth
selection for cases where some local models are not fitted due to local
invariance and resulting information criteria are not comparable.
random_state : int | None, optional
Random seed for reproducibility, by default None
verbose : bool, optional
Whether to print progress information, by default False
**kwargs
Additional keyword arguments passed to ``model`` initialisation
Attributes
----------
proba_ : pd.DataFrame
Probability predictions for focal locations based on a local model trained
around the point itself.
pred_ : pd.Series
Binary predictions for focal locations based on a local model trained around the
location itself.
hat_values_ : pd.Series
Hat values for each location (diagonal elements of hat matrix)
effective_df_ : float
Effective degrees of freedom (sum of hat values)
log_likelihood_ : float
Global log likelihood of the model
aic_ : float
Akaike information criterion of the model
aicc_ : float
Corrected Akaike information criterion to account for model complexity (smaller
bandwidths)
bic_ : float
Bayesian information criterion
prediction_rate_ : float
Proportion of models that are fitted, where the rest are skipped due to not
fulfilling ``min_proportion``.
left_out_y_ : numpy.ndarray
Array of ``y`` values left out when ``leave_out`` is set.
left_out_proba_ : numpy.ndarray
Array of probabilites on left out observations in local models when
``leave_out`` is set.
left_out_w_ : numpy.ndarray
Array of weights on left out observations in local models when
``leave_out`` is set.
Examples
--------
Fit a geographically weighted logistic regression by passing a scikit-learn
classifier class.
>>> import geopandas as gpd
>>> from geodatasets import get_path
>>> from sklearn.linear_model import LogisticRegression
>>> from gwlearn.base import BaseClassifier
>>> gdf = gpd.read_file(get_path('geoda.guerry'))
>>> X = gdf[['Crm_prp', 'Litercy', 'Donatns', 'Lottery']]
>>> y = gdf["Region"] == 'E'
>>> gw = BaseClassifier(
... LogisticRegression,
... bandwidth=30,
... fixed=False,
... geometry=gdf.representative_point(),
... include_focal=True,
... keep_models=True,
... max_iter=200,
... ).fit(X, y)
>>> gw.pred_.head()
0 True
1 False
2 False
3 True
4 True
dtype: boolean
"""
def __init__(
self,
model,
*,
bandwidth: float | None = None,
fixed: bool = False,
kernel: Literal[
"triangular",
"parabolic",
# "gaussian",
"bisquare",
"tricube",
"cosine",
"boxcar",
# "exponential",
]
| Callable = "bisquare",
include_focal: bool = False,
geometry: gpd.GeoSeries | None = None,
graph: graph.Graph | None = None,
n_jobs: int = -1,
fit_global_model: bool = True,
strict: bool | None = False,
keep_models: bool | str | Path = False,
temp_folder: str | None = None,
batch_size: int | None = None,
min_proportion: float = 0.2,
undersample: bool | float = False,
leave_out: float | int | None = None,
random_state: int | None = None,
verbose: bool = False,
**kwargs,
):
super().__init__(
model=model,
bandwidth=bandwidth,
fixed=fixed,
kernel=kernel,
include_focal=include_focal,
geometry=geometry,
graph=graph,
n_jobs=n_jobs,
fit_global_model=fit_global_model,
strict=strict,
keep_models=keep_models,
temp_folder=temp_folder,
batch_size=batch_size,
verbose=verbose,
**kwargs,
)
self.min_proportion = min_proportion
self.undersample = undersample
self.random_state = random_state
self.leave_out = leave_out
self._empty_score_data = None
self._empty_feature_imp = None
[docs]
def fit(self, X: pd.DataFrame, y: pd.Series) -> "BaseClassifier":
"""Fit geographically weighted local classification models.
This fits one local model per focal observation (subject to local invariance
checks and ``min_proportion``) and stores focal predictions.
Parameters
----------
X : pandas.DataFrame
Feature matrix.
y : pandas.Series
Binary target encoded as boolean or ``{0, 1}``.
Returns
-------
self
Fitted estimator.
Notes
-----
The neighborhood definition comes from either ``self.graph`` or from
``self.geometry`` + (``bandwidth``, ``fixed``, ``kernel``, ``include_focal``).
"""
self._start = time()
def _is_binary(series: pd.Series) -> bool:
"""Check if a pandas Series encodes a binary variable (bool or 0/1)."""
unique_values = set(np.unique(series))
# Check for boolean type
if series.dtype == bool or unique_values.issubset({True, False}):
return True
# Check for 0, 1 encoding
return bool(unique_values.issubset({0, 1}))
if not _is_binary(y):
raise ValueError("Only binary dependent variable is supported.")
if self.verbose:
print(f"{(time() - self._start):.2f}s: Building weights")
if self.graph is not None:
weights = self.graph
else:
self._validate_geometry(self.geometry)
weights = self._build_weights()
if self.verbose:
print(f"{(time() - self._start):.2f}s: Weights ready")
self._setup_model_storage()
self._global_classes = np.unique(y)
if isinstance(X, pd.DataFrame):
self.feature_names_in_ = X.columns.to_numpy()
else:
self.feature_names_in_ = np.arange(X.shape[1])
# fit the models
if self.verbose:
print(f"{(time() - self._start):.2f}s: Fitting the models")
training_output = self._fit_models_batch(X, y, weights)
if self.verbose:
print(f"{(time() - self._start):.2f}s: Models fitted")
if self.keep_models:
(
self._names,
self._n_labels,
self._score_data,
self._feature_importances,
focal_proba,
hat_values,
left_out_proba,
models,
) = zip(*training_output, strict=False)
self._local_models = pd.Series(models, index=self._names)
else:
(
self._names,
self._n_labels,
self._score_data,
self._feature_importances,
focal_proba,
hat_values,
left_out_proba,
) = zip(*training_output, strict=False)
self._n_labels = pd.Series(self._n_labels, index=self._names)
self.proba_ = pd.DataFrame(focal_proba, index=self._names)
# Store hat values and compute effective degrees of freedom
self.hat_values_ = pd.Series(hat_values, index=self._names)
self.effective_df_ = np.nansum(self.hat_values_)
# support both bool and 0, 1 encoding of binary variable
col = True if True in self.proba_.columns else 1
# global GW accuracy
nan_mask = self.proba_[col].isna()
self.pred_ = pd.Series(pd.NA, index=y.index, dtype="boolean")
self.pred_.loc[~nan_mask] = self.proba_[col][~nan_mask] > 0.5
self._n_fitted_models = (~self.proba_[col].isna()).sum()
self.prediction_rate_ = self._n_fitted_models / nan_mask.shape[0]
if self.leave_out and self.prediction_rate_ > 0:
self.left_out_y_ = np.concatenate([arr[1] for arr in left_out_proba])
self.left_out_proba_ = np.concatenate([arr[0] for arr in left_out_proba])
self.left_out_w_ = np.concatenate([arr[2] for arr in left_out_proba])
if self.fit_global_model:
if self.verbose:
print(f"{(time() - self._start):.2f}s: Fitting global model")
self._fit_global_model(X, y)
# Compute global log likelihood and information criteria
if self.verbose:
print(f"{(time() - self._start):.2f}s: Computing global likelihood")
self.log_likelihood_ = self._compute_global_log_likelihood(y)
if self.verbose:
print(f"{(time() - self._start):.2f}s: Computing information criteria")
self._compute_information_criteria()
return self
def _fit_local(
self,
model,
data: pd.DataFrame,
name: Hashable,
focal_x: np.ndarray,
model_kwargs: dict,
) -> list[Hashable]:
"""Fit individual local model"""
if self.undersample:
from .undersample import BinaryRandomUnderSampler
vc = data["_y"].value_counts()
n_labels = len(vc)
skip = n_labels == 1
if n_labels > 1:
skip = (vc.iloc[1] / vc.iloc[0]) < self.min_proportion
# empty data for skipped models
score_data = self._empty_score_data
feature_imp = self._empty_feature_imp
output = [
name,
n_labels,
score_data,
feature_imp,
pd.Series(np.nan, index=self._global_classes),
np.nan,
(np.zeros(shape=(0, 2)), data["_y"].iloc[:0], data["_weight"].iloc[:0]),
]
if self.keep_models:
output.append(None)
if skip:
return output
local_model = model(random_state=self.random_state, **model_kwargs)
if self.undersample:
if isinstance(self.undersample, float):
rus = BinaryRandomUnderSampler(
sampling_strategy=self.undersample, random_state=self.random_state
)
else:
rus = BinaryRandomUnderSampler(random_state=self.random_state)
data, _ = rus.fit_resample(data, data["_y"])
if self.leave_out:
try:
data, left_out_data = train_test_split(
data, test_size=self.leave_out, stratify=data["_y"]
)
except ValueError: # only 1 observation of True
return output
if len(data["_y"].value_counts()) == 1:
return output
X = data.drop(columns=["_y", "_weight"])
y = data["_y"]
local_model.fit(
X=X,
y=y,
sample_weight=data["_weight"],
)
focal_x_df = pd.DataFrame(focal_x.reshape(1, -1), columns=X.columns)
focal_proba = pd.Series(
local_model.predict_proba(focal_x_df).flatten(), index=local_model.classes_
)
hat_value = self._compute_hat_value(X, data["_weight"], focal_x)
if self.leave_out:
left_out_proba = local_model.predict_proba(
left_out_data.drop(columns=["_y", "_weight"])
)
left_out_proba = (
left_out_proba,
left_out_data["_y"],
left_out_data["_weight"],
)
else:
left_out_proba = None
output = [
name,
n_labels,
self._get_score_data(local_model, X, y),
getattr(local_model, "feature_importances_", None),
focal_proba,
hat_value,
left_out_proba,
]
if self.keep_models:
output.append(self._store_model(local_model, name))
else:
del local_model
return output
def _compute_global_log_likelihood(self, y: pd.Series) -> float:
"""Compute a pooled (global) log-likelihood from focal probabilities."""
# Get valid predictions (non-NaN)
mask = ~self.proba_.isna().any(axis=1)
if not mask.any():
return np.nan
y_valid = y[mask]
proba_valid = self.proba_[mask]
# Handle both boolean and 0/1 encoding
if True in proba_valid.columns:
p = proba_valid[True]
y_binary = y_valid.astype(int) if y_valid.dtype == bool else y_valid
else:
p = proba_valid[1]
y_binary = y_valid
# Clip probabilities to avoid log(0)
p = np.clip(p, 1e-15, 1 - 1e-15)
log_likelihood = np.sum(y_binary * np.log(p) + (1 - y_binary) * np.log(1 - p))
return log_likelihood
[docs]
def predict_proba(self, X: pd.DataFrame, geometry: gpd.GeoSeries) -> pd.DataFrame:
"""Predict class probabilities for new observations.
Parameters
----------
X : pandas.DataFrame
Feature matrix for new observations.
geometry : geopandas.GeoSeries
Point geometries for new observations.
Returns
-------
pandas.DataFrame
Predicted probabilities with columns equal to the global classes observed
during fit.
Notes
-----
Requires the estimator to have been fit with ``keep_models=True`` (or a
``Path``) so local models can be used at prediction time.
"""
local_model_ids, distances = self._prepare_prediction_neighborhoods(geometry)
data = [X.iloc[[i]] for i in range(len(X))]
probabilities = []
for x_, models_, distances_ in zip(
data, local_model_ids, distances, strict=True
):
probabilities.append(self._predict_local(x_, models_, distances_))
return pd.DataFrame(probabilities, columns=self._global_classes, index=X.index)
def _predict_local(
self,
x_: pd.DataFrame,
models_: np.ndarray,
distances_: np.ndarray,
) -> pd.Series:
pred = []
for i in models_:
local_model = self._local_models[i]
if isinstance(local_model, str):
with open(local_model, "rb") as f:
local_model = load(f)
if local_model is not None:
pred.append(
pd.Series(
local_model.predict_proba(x_).flatten(),
index=local_model.classes_,
)
)
else:
pred.append(
pd.Series(
np.nan,
index=self._global_classes,
)
)
pred = pd.DataFrame(pred)
mask = pred.isna().any(axis=1)
if mask.all():
return pd.Series(np.nan, index=pred.columns)
weighted = np.average(pred[~mask], axis=0, weights=distances_[~mask])
# normalize
weighted = weighted / weighted.sum()
return pd.Series(weighted, index=pred.columns)
[docs]
def predict(self, X: pd.DataFrame, geometry: gpd.GeoSeries) -> pd.Series:
"""Predict classes for new observations.
This is equivalent to ``predict_proba(...).idxmax(axis=1)``.
Parameters
----------
X : pandas.DataFrame
Feature matrix for new observations.
geometry : geopandas.GeoSeries
Point geometries for new observations.
Returns
-------
pandas.Series
Predicted class.
Notes
-----
Requires the estimator to have been fit with ``keep_models=True`` (or a
``Path``) so local models can be used at prediction time.
"""
proba = self.predict_proba(X, geometry)
return proba.idxmax(axis=1)
[docs]
def local_metric(self, func: Callable, *args, **kwargs) -> np.ndarray:
"""Compute a metric per fitted local model.
Parameters
----------
func : callable
Callable with a signature ``func(y_true, y_pred, *args, **kwargs)``.
Returns
-------
numpy.ndarray
One value per focal location (NaN for skipped / unfitted local models).
"""
results = []
for y, y_pred in zip(self._y_local, self._pred_local, strict=True):
if y.shape[0] == 0:
results.append(np.nan)
else:
results.append(func(y, y_pred, *args, **kwargs))
return np.array(results)
[docs]
class BaseRegressor(_BaseModel, RegressorMixin):
"""Generic geographically weighted regression meta-estimator.
This class wraps a scikit-learn-compatible *regressor class* and fits one local
model per focal observation using spatially varying sample weights.
The fitted object exposes focal predictions (``pred_``, in-sample if
``include_focal=True``) and local goodness-of-fit summaries.
Prediction for new (out-of-sample) observations is not currently implemented for
regressors.
Notes
-----
- Only point geometries are supported.
Parameters
----------
model : RegressorMixin
Class implementing the scikit-learn regressor API (e.g.
:class:`sklearn.linear_model.LinearRegression`). The class (not an instance)
is instantiated internally for each local model.
bandwidth : float | int | None
Bandwidth for defining neighborhoods.
- If ``fixed=True``, this is a distance threshold.
- If ``fixed=False``, this is the number of nearest neighbors used to form the
local neighborhood.
If ``graph`` is provided, ``bandwidth`` is ignored.
fixed : bool, optional
True for distance based bandwidth and False for adaptive (nearest neighbor)
bandwidth, by default False
kernel : str | Callable, optional
Type of kernel function used to weight observations, by default "bisquare"
include_focal : bool, optional
Include focal in the local model training. Excluding it allows
assessment of geographically weighted metrics on unseen data without a need for
train/test split, hence providing value for all samples. This is needed for
further spatial analysis of the model performance (and generalises to models
that do not support OOB scoring). However, it leaves out the most representative
sample. By default False
geometry : gpd.GeoSeries, optional
Geographic location of the observations in the sample. Used to determine the
spatial interaction weight based on specification by ``bandwidth``, ``fixed``,
``kernel``, and ``include_focal`` keywords. Either ``geometry`` or ``graph``
need to be specified. To allow prediction, it is required to specify
``geometry``.
graph : Graph, optional
Custom libpysal.graph.Graph object encoding the spatial interaction between
observations in the sample. If given, it is used directly and ``bandwidth``,
``fixed``, ``kernel``, and ``include_focal`` keywords are ignored. Either
``geometry`` or ``graph`` need to be specified. To allow prediction, it is
required to specify ``geometry``. Potentially, both can be specified where
``graph`` encodes spatial interaction between observations in ``geometry``.
n_jobs : int, optional
The number of jobs to run in parallel. ``-1`` means using all processors by
default ``-1``
fit_global_model : bool, optional
Determines if the global baseline model shall be fitted alongside
the geographically weighted, by default True
strict : bool | None, optional
Do not fit any models if at least one neighborhood has invariant ``y``,
by default False. None is treated as False but provides a warning if there are
invariant models.
keep_models : bool | str | Path, optional
Keep all local models (required for prediction), by default False. Note that
for some models, like random forests, the objects can be large. If string or
Path is provided, the local models are not held in memory but serialized to
the disk from which they are loaded in prediction.
temp_folder : str | None, optional
Folder to be used by the pool for memmapping large arrays for sharing memory
with worker processes, e.g., ``/tmp``. Passed to ``joblib.Parallel``, by default
None
batch_size : int | None, optional
Number of models to process in each batch. Specify batch_size if your models do
not fit into memory. By default None
verbose : bool, optional
Whether to print progress information, by default False
**kwargs
Additional keyword arguments passed to ``model`` initialisation
Attributes
----------
pred_ : pd.Series
Focal predictions for each location.
resid_ : pd.Series
Residuals for each location (``y`` - ``pred_``).
RSS_ : pd.Series
Residual sum of squares for each location.
TSS_ : pd.Series
Total sum of squares for each location.
y_bar_ : pd.Series
Weighted mean of y for each location.
local_r2_ : pd.Series
Local R2 for each location.
hat_values_ : pd.Series
Hat values for each location (diagonal elements of hat matrix).
effective_df_ : float
Effective degrees of freedom (sum of hat values).
log_likelihood_ : float
Global log likelihood of the model.
aic_ : float
Akaike information criterion of the model.
aicc_ : float
Corrected Akaike information criterion to account for model
complexity (smaller bandwidths).
bic_ : float
Bayesian information criterion.
Examples
--------
>>> import geopandas as gpd
>>> from geodatasets import get_path
>>> from sklearn.linear_model import LinearRegression
>>> from gwlearn.base import BaseRegressor
>>> gdf = gpd.read_file(get_path('geoda.guerry'))
>>> X = gdf[['Crm_prp', 'Litercy', 'Donatns', 'Lottery']]
>>> y = gdf["Suicids"]
>>> gwr = BaseRegressor(
... LinearRegression,
... bandwidth=30,
... fixed=False,
... include_focal=True,
... geometry=gdf.representative_point(),
... ).fit(X, y)
>>> gwr.local_r2_.head()
0 0.614715
1 0.488495
2 0.599862
3 0.662435
4 0.662276
dtype: float64
"""
[docs]
def fit(self, X: pd.DataFrame, y: pd.Series) -> "BaseRegressor":
"""Fit geographically weighted local regression models.
Fits one local model per focal observation and stores focal (in-sample if
``include_focal=True``) predictions in ``pred_``.
Parameters
----------
X : pandas.DataFrame
Feature matrix.
y : pandas.Series
Target values.
Returns
-------
self
Fitted estimator.
Notes
-----
The neighborhood definition comes from either ``self.graph`` or from
``self.geometry`` + (``bandwidth``, ``fixed``, ``kernel``, ``include_focal``).
"""
self._validate_geometry(self.geometry)
weights = self.graph if self.graph is not None else self._build_weights()
self._setup_model_storage()
# fit the models
training_output = self._fit_models_batch(X, y, weights)
if self.keep_models:
(
self._names,
focal_pred,
y_bar,
tss,
hat_values, # Add this
self._score_data,
models,
) = zip(*training_output, strict=False)
self._local_models = pd.Series(models, index=self._names)
else:
(
self._names,
focal_pred,
y_bar,
tss,
hat_values, # Add this
self._score_data,
) = zip(*training_output, strict=False)
self.pred_ = pd.Series(np.nan, index=y.index)
self.pred_.loc[np.array(self._names)] = focal_pred
self.resid_ = y - self.pred_
resids_ = (
weights.adjacency.values
* self.resid_.loc[weights.adjacency.index.get_level_values(1)] ** 2
)
self.RSS_ = resids_.groupby(weights.adjacency.index.get_level_values(0)).sum()
self.TSS_ = pd.Series(tss, index=self._names)
self.y_bar_ = pd.Series(y_bar, index=self._names)
self.local_r2_ = (self.TSS_ - self.RSS_) / self.TSS_
if self.fit_global_model:
self._fit_global_model(X, y)
# Store hat values
self.hat_values_ = pd.Series(hat_values, index=self._names)
# Compute effective degrees of freedom (trace of hat matrix)
self.effective_df_ = np.nansum(self.hat_values_)
# adjusted R2
# n = len(self.pred_)
# if not np.isnan(self.focal_r2_) and not np.isnan(self.effective_df_):
# if n - self.effective_df_ - 1 > 0:
# self.focal_adj_r2_ = 1 - (
# (1 - self.focal_r2_) * (n - 1) / (n - self.effective_df_ - 1)
# )
# else:
# self.focal_adj_r2_ = np.nan
self.log_likelihood_ = self._compute_global_log_likelihood()
self._compute_information_criteria()
return self
[docs]
def predict(self, X: pd.DataFrame, geometry: gpd.GeoSeries) -> pd.Series:
"""Predict target values for new observations.
Parameters
----------
X : pandas.DataFrame
Feature matrix for new observations.
geometry : geopandas.GeoSeries
Point geometries for new observations.
Returns
-------
pandas.Series
Predicted values.
Notes
-----
Requires the estimator to have been fit with ``keep_models=True`` (or a
``Path``) so local models can be used at prediction time.
"""
local_model_ids, distances = self._prepare_prediction_neighborhoods(geometry)
data = [X.iloc[[i]] for i in range(len(X))]
predictions = []
for x_, models_, distances_ in zip(
data, local_model_ids, distances, strict=True
):
predictions.append(self._predict_local(x_, models_, distances_))
return pd.Series(predictions, index=X.index)
def _predict_local(
self,
x_: pd.DataFrame,
models_: np.ndarray,
distances_: np.ndarray,
) -> float:
pred = []
for i in models_:
local_model = self._local_models[i]
if isinstance(local_model, str):
with open(local_model, "rb") as f:
local_model = load(f)
if local_model is not None:
pred.append(local_model.predict(x_).flatten()[0])
else:
pred.append(np.nan)
pred = np.array(pred)
mask = np.isnan(pred)
if mask.all():
return np.nan
weighted = np.average(pred[~mask], weights=distances_[~mask])
return weighted
def _fit_local(
self,
model,
data: pd.DataFrame,
name: Hashable,
focal_x: np.ndarray,
model_kwargs: dict,
) -> list[Hashable]:
local_model = model(**model_kwargs)
X = data.drop(columns=["_y", "_weight"])
y = data["_y"]
local_model.fit(
X=X,
y=y,
sample_weight=data["_weight"],
)
focal_x_df = pd.DataFrame(focal_x.reshape(1, -1), columns=X.columns)
focal_pred = local_model.predict(focal_x_df).flatten()[0]
y_bar = self._y_bar(y, data["_weight"])
tss = self._tss(y, y_bar, data["_weight"])
# Compute hat value for this location
hat_value = self._compute_hat_value(X, data["_weight"], focal_x)
output = [
name,
focal_pred,
y_bar,
tss,
hat_value, # Add hat value to output
self._get_score_data(local_model, X, y),
]
if self.keep_models:
output.append(self._store_model(local_model, name))
else:
del local_model
return output
def _compute_global_log_likelihood(self) -> float:
"""
Compute the global log likelihood for the entire GWR model
Parameters:
-----------
y : pd.Series
Original target values
Returns:
--------
float : Global log likelihood value for the entire GWR model
"""
residuals = self.resid_
n = len(residuals)
# Estimate sigma from the residuals
sigma = np.sqrt(np.sum(residuals**2) / n)
if sigma <= 0:
return np.nan
# Global log likelihood assuming Gaussian errors
log_likelihood = (
-n / 2 * np.log(2 * np.pi)
- n * np.log(sigma)
- np.sum(residuals**2) / (2 * sigma**2)
)
return log_likelihood
def _y_bar(self, y, w_i):
"""weighted mean of y"""
sum_yw = np.sum(y * w_i)
return sum_yw / np.sum(w_i)
def _tss(self, y, y_bar, w_i):
"""geographically weighted total sum of squares"""
return np.sum(w_i * (y - y_bar) ** 2)