from collections.abc import Callable
from typing import Literal
import geopandas as gpd
import numpy as np
import pandas as pd
from scipy.spatial.distance import pdist
from sklearn import metrics
[docs]
class BandwidthSearch:
"""Optimal bandwidth search for geographically weighted estimators.
Reports information criteria and (optionally) other scores from multiple models with
varying bandwidth. When using golden section search, it minimizes (or maximizes)
the chosen ``criterion``.
When using classification models with a defined ``min_proportion``, keep in mind
that some locations may be excluded from the final model. In such a case, the
information criteria are typically not comparable across models with different
bandwidths and shall not be used to determine the optimal one.
Parameters
----------
model : type
A geographically weighted estimator class (e.g.
:class:`gwlearn.linear_model.GWLogisticRegression`) that can be instantiated as
``model(bandwidth=..., geometry=..., fixed=..., kernel=..., n_jobs=..., ...)``
and exposes information criteria attributes like ``aicc_``/``aic_``/``bic_``.
geometry : gpd.GeoSeries, optional
Geographic location of the observations in the sample. Used to determine the
spatial interaction weight based on specification by ``bandwidth``, ``fixed``,
``kernel``, and ``include_focal`` keywords.
fixed : bool, optional
True for distance based bandwidth and False for adaptive (nearest neighbor)
bandwidth, by default ``False``
kernel : str | Callable, optional
Type of kernel function used to weight observations, by default ``"bisquare"``
n_jobs : int, optional
The number of jobs to run in parallel. ``-1`` means using all processors,
by default ``-1``
search_method : {"golden_section", "interval"}, optional
Method used to search for optimal bandwidth. When using ``"golden_section"``,
the Golden section optimization is used to find the optimal bandwidth while
attempting to minimize or maximise ``criterion``. When using ``"interval"``,
fits all models within the specified bandwidths at a set interval without any
attempt to optimize the selection. By default ``"golden_section"``.
criterion : str, optional
Criterion used to select the optimal bandwidth.
Supported values include
``{"aicc", "aic", "bic", "prediction_rate", "log_loss"}``.
If you pass another string, it is interpreted as an attribute name ``m`` and
retrieved from the fitted model as ``getattr(model, m + "_")``.
By default ``"aicc"``.
metrics : list[str] | None, optional
Additional metrics to report for each bandwidth. Metrics follow the same
conventions as ``criterion`` (including special cases ``"log_loss"`` and
``"prediction_rate"``). By default ``None``.
minimize : bool, optional
Minimize or maximize the ``criterion``. When using information criterions,
like AICc, the optimal solution is the lowest value. When using other metrics,
the optimal may be the highest value. By default True, assuming lower is
better.
min_bandwidth : int | float | None, optional
Minimum bandwidth to consider, by default ``None``
max_bandwidth : int | float | None, optional
Maximum bandwidth to consider, by default ``None``
interval : int | float | None, optional
Interval for bandwidth search when using "interval" method, by default ``None``
max_iterations : int, optional
Maximum number of iterations for golden section search, by default ``100``
tolerance : float, optional
Tolerance for convergence in golden section search, by default ``1e-2``
verbose : bool | int, optional
Verbosity level, by default False
**kwargs
Additional keyword arguments passed to ``model`` initialization
Attributes
----------
scores_ : pd.Series
Series of criterion scores for each bandwidth tested (index is bandwidth).
metrics_ : pd.DataFrame
DataFrame of additional metrics for each bandwidth tested.
optimal_bandwidth_ : int | float
The optimal bandwidth found by the search method.
Examples
--------
Interval search over a small set of candidate bandwidths:
>>> import geopandas as gpd
>>> from geodatasets import get_path
>>> from gwlearn.linear_model import GWLogisticRegression
>>> from gwlearn.search import BandwidthSearch
>>> gdf = gpd.read_file(get_path('geoda.guerry'))
>>> X = gdf[['Crm_prp', 'Litercy', 'Donatns', 'Lottery']]
>>> y = gdf["Region"] == 'E'
>>> search = BandwidthSearch(
... GWLogisticRegression,
... geometry=gdf.representative_point(),
... fixed=False,
... search_method="interval",
... criterion="aicc",
... min_bandwidth=20,
... max_bandwidth=80,
... interval=10,
... max_iter=200,
... ).fit(X, y)
>>> search.optimal_bandwidth_
np.int64(40)
"""
def __init__(
self,
model,
*,
geometry: gpd.GeoSeries,
fixed: bool = False,
kernel: Literal[
"triangular",
"parabolic",
# "gaussian",
"bisquare",
"tricube",
"cosine",
"boxcar",
# "exponential",
]
| Callable = "bisquare",
n_jobs: int = -1,
search_method: Literal["golden_section", "interval"] = "golden_section",
criterion: str = "aicc",
metrics: list | None = None,
minimize: bool = True,
min_bandwidth: int | float | None = None,
max_bandwidth: int | float | None = None,
interval: int | float | None = None,
max_iterations: int = 100,
tolerance: float = 1e-2,
verbose: bool | int = False,
**kwargs,
) -> None:
self.model = model
self.kernel = kernel
self.fixed = fixed
self._model_kwargs = kwargs
self.geometry = geometry
self.n_jobs = n_jobs
self.search_method = search_method
self.criterion = criterion
self.minimize = minimize
self.min_bandwidth = min_bandwidth
self.max_bandwidth = max_bandwidth
self.interval = interval
self.max_iterations = max_iterations
self.tolerance = tolerance
self.metrics = metrics
self.verbose = verbose
[docs]
def fit(self, X: pd.DataFrame, y: pd.Series) -> "BandwidthSearch":
"""
Fit the searcher by evaluating candidate bandwidths on the provided data.
Parameters
----------
X : pd.DataFrame
Feature matrix used to evaluate candidate bandwidths (rows are samples).
y : pd.Series
Target values corresponding to X.
Returns
-------
self
The fitted instance.
Notes
-----
The optimal bandwidth is selected as the index of the minimum score if
``minimize=True``, otherwise as the index of the maximum score.
"""
if self.search_method == "interval":
self._interval(X=X, y=y)
elif self.search_method == "golden_section":
self._golden_section(X=X, y=y, tolerance=self.tolerance)
self.optimal_bandwidth_ = (
self.scores_.idxmin() if self.minimize else self.scores_.idxmax()
)
return self
def _score(
self, X: pd.DataFrame, y: pd.Series, bw: int | float
) -> tuple[float, list[float]]:
"""Fit the model and report criterion score.
In case of invariant y in a local model, returns np.inf
"""
if len(np.unique(y)) == 1:
return (np.inf, [])
gwm = self.model(
bandwidth=bw,
geometry=self.geometry,
fixed=self.fixed,
kernel=self.kernel,
n_jobs=self.n_jobs,
fit_global_model=False,
strict=False,
verbose=self.verbose == 2,
**self._model_kwargs,
).fit(X=X, y=y)
met = ["aicc", "aic", "bic"]
if self.metrics is not None:
met += self.metrics
if hasattr(gwm, "prediction_rate_") and gwm.prediction_rate_ == 0:
# prediction rate should report 0, everything else is undefined
score = (
gwm.prediction_rate_ if self.criterion == "prediction_rate" else np.nan
)
all_metrics = [
gwm.prediction_rate_ if m == "prediction_rate" else np.nan for m in met
]
return score, all_metrics
all_metrics = []
for m in met:
if m == "log_loss":
mask = gwm.proba_.isna().any(axis=1)
all_metrics.append(metrics.log_loss(y[~mask], gwm.proba_[~mask]))
else:
all_metrics.append(getattr(gwm, m + "_"))
return all_metrics[met.index(self.criterion)], all_metrics
def _interval(self, X: pd.DataFrame, y: pd.Series) -> None:
"""Fit models using the equal interval search.
Parameters
----------
X : pd.DataFrame
Independent variables
y : pd.Series
Dependent variable
"""
if not (
isinstance(self.min_bandwidth, float | int)
and isinstance(self.max_bandwidth, float | int)
and isinstance(self.interval, float | int)
):
raise ValueError(
"All 'min_bandwidth', 'max_bandwidth' and 'interval' need "
"to be set when using interval search method."
)
scores = {}
metrics = {}
bw = self.min_bandwidth
while bw <= self.max_bandwidth:
score, metric = self._score(X=X, y=y, bw=bw)
scores[bw] = score
metrics[bw] = metric
if self.verbose:
print(f"Bandwidth: {bw:.2f}, {self.criterion}: {scores[bw]:.3f}")
bw += self.interval
self.scores_ = pd.Series(scores, name=self.criterion)
self.metrics_ = pd.DataFrame(
metrics,
index=pd.Index(
["aicc", "aic", "bic"] + self.metrics
if self.metrics
else ["aicc", "aic", "bic"]
),
).T
def _golden_section(self, X: pd.DataFrame, y: pd.Series, tolerance: float) -> None:
delta = 0.38197
if self.fixed:
pairwise_distance = pdist(self.geometry.get_coordinates())
min_dist = np.min(pairwise_distance)
max_dist = np.max(pairwise_distance)
a = min_dist / 2.0
c = max_dist * 2.0
else:
a = 40 + 2 * X.shape[1]
c = len(self.geometry)
if self.min_bandwidth:
a = self.min_bandwidth
if self.max_bandwidth:
c = self.max_bandwidth
b = a + delta * np.abs(c - a)
d = c - delta * np.abs(c - a)
diff = 1.0e9
iters = 0
scores = {}
metrics = {}
while diff > tolerance and iters < self.max_iterations and a != np.inf:
if not self.fixed: # ensure we use int as possible bandwidth
b = int(b)
d = int(d)
if b in scores:
score_b = scores[b]
else:
score_b, metric_b = self._score(X=X, y=y, bw=b)
if self.verbose:
print(
f"Bandwidth: {f'{b:.2f}'.rstrip('0').rstrip('.')}, "
f"score: {score_b:.3f}"
)
scores[b] = score_b
metrics[b] = metric_b
if d in scores:
score_d = scores[d]
else:
score_d, metric_d = self._score(X=X, y=y, bw=d)
if self.verbose:
print(
f"Bandwidth: {f'{d:.2f}'.rstrip('0').rstrip('.')}, "
f"score: {score_d:.3f}"
)
scores[d] = score_d
metrics[d] = metric_d
if self.minimize:
if score_b <= score_d:
c = d
d = b
b = a + delta * np.abs(c - a)
else:
a = b
b = d
d = c - delta * np.abs(c - a)
diff = np.abs(score_b - score_d)
else:
if score_b >= score_d:
c = d
d = b
b = a + delta * np.abs(c - a)
else:
a = b
b = d
d = c - delta * np.abs(c - a)
diff = np.abs(score_b - score_d)
iters += 1
self.scores_ = pd.Series(scores)
self.metrics_ = pd.DataFrame(
metrics,
index=pd.Index(
["aicc", "aic", "bic"] + self.metrics
if self.metrics
else ["aicc", "aic", "bic"]
),
).T