"""
All scales have a means by which the values that are mapped
onto the scale are interpreted. Numeric digital scales put
out numbers for direct interpretation, but most scales
cannot do this. What they offer is named markers/ticks that
aid in assessing the values e.g. the common odometer will
have ticks and values to help gauge the speed of the vehicle.
The named markers are what we call breaks. Properly calculated
breaks make interpretation straightforward. These functions
provide ways to calculate good (hopefully) breaks.
"""
from __future__ import annotations
import sys
from dataclasses import KW_ONLY, dataclass, field
from datetime import date, datetime, timedelta
from itertools import product
from typing import TYPE_CHECKING
from warnings import warn
import numpy as np
import pandas as pd
from mizani._core.date_utils import as_datetime
from mizani._core.dates import (
calculate_date_breaks_auto,
calculate_date_breaks_byunits,
)
from .utils import NANOSECONDS, SECONDS, log, min_max
if TYPE_CHECKING:
from typing import Literal, Sequence
from mizani.typing import (
DatetimeBreaksUnits,
DurationUnit,
FloatArrayLike,
NDArrayFloat,
Timedelta,
TimedeltaArrayLike,
Trans,
)
__all__ = [
"breaks_log",
"breaks_symlog",
"minor_breaks",
"minor_breaks_trans",
"breaks_date",
"breaks_timedelta",
"breaks_extended",
]
@dataclass
class breaks_log:
    """
    Integer breaks on log transformed scales

    Parameters
    ----------
    n : int
        Desired number of breaks
    base : float
        Base of logarithm

    Examples
    --------
    >>> x = np.logspace(3, 6)
    >>> limits = min(x), max(x)
    >>> breaks_log()(limits)
    array([ 1000, 10000, 100000, 1000000])
    >>> breaks_log(2)(limits)
    array([ 1000, 100000])
    >>> breaks_log()([0.1, 1])
    array([0.1, 0.3, 1. , 3. ])
    """

    n: int = 5
    base: float = 10

    def __call__(self, limits: tuple[float, float]) -> NDArrayFloat:
        """
        Compute breaks

        Parameters
        ----------
        limits : tuple
            Minimum and maximum values

        Returns
        -------
        out : array_like
            Sequence of breaks points
        """
        if any(np.isinf(limits)):
            return np.array([])

        n = self.n
        base = self.base
        rng = log(limits, base)
        _min = int(np.floor(rng[0]))
        _max = int(np.ceil(rng[1]))

        # Prevent overflow: int ** int can exceed sys.maxsize, so
        # switch to float arithmetic for very large powers
        if float(base) ** _max > sys.maxsize:
            base = float(base)

        if _max == _min:
            return np.array([base**_min])

        # Try getting breaks at the integer powers of the base
        # e.g [1, 100, 10000, 1000000]
        # If there are too few breaks, try other points using
        # _breaks_log_sub
        by = int(np.floor((_max - _min) / n)) + 1
        for step in range(by, 0, -1):
            breaks = np.array([base**i for i in range(_min, _max + 1, step)])
            relevant_breaks = (limits[0] <= breaks) & (breaks <= limits[1])
            if np.sum(relevant_breaks) >= n - 2:
                return breaks

        return _breaks_log_sub(n=n, base=base)(limits)
@dataclass
class _breaks_log_sub:
    """
    Breaks for log transformed scales

    Calculate breaks that do not fall on integer powers of
    the base.

    Parameters
    ----------
    n : int
        Desired number of breaks
    base : int | float
        Base of logarithm

    Notes
    -----
    Credit: Thierry Onkelinx (thierry.onkelinx@inbo.be) for the original
    algorithm in the r-scales package.
    """

    n: int = 5
    base: float = 10

    def __call__(self, limits: tuple[float, float]) -> NDArrayFloat:
        base = self.base
        n = self.n
        rng = log(limits, base)
        # Integer powers of the base that bracket the limits
        _min = int(np.floor(rng[0]))
        _max = int(np.ceil(rng[1]))
        # Multipliers selected so far; 1 means "the powers themselves"
        steps = [1]

        # Prevent overflow
        if float(base) ** _max > sys.maxsize:
            base = float(base)

        def delta(x):
            """
            Calculates the smallest distance in the log scale between the
            currently selected breaks and a new candidate 'x'
            """
            arr = np.sort(np.hstack([x, steps, base]))
            log_arr = log(arr, base)
            return np.min(np.diff(log_arr))

        if self.base == 2:
            # For base 2 there are no useful intermediate multipliers;
            # just return the integer powers
            return np.array([base**i for i in range(_min, _max + 1)])

        # Candidate multipliers are the integers strictly between
        # 1 and the base
        candidate = np.arange(base + 1)
        candidate = np.compress(
            (candidate > 1) & (candidate < base), candidate
        )

        # Greedily add the candidate that is farthest (in log space)
        # from the breaks chosen so far, until enough of the resulting
        # breaks fall inside the limits
        while len(candidate):
            best = np.argmax([delta(x) for x in candidate])
            steps.append(candidate[best])
            candidate = np.delete(candidate, best)
            _factors = [base**i for i in range(_min, _max + 1)]
            breaks = np.array([f * s for f, s in product(_factors, steps)])
            relevant_breaks = (limits[0] <= breaks) & (breaks <= limits[1])
            if np.sum(relevant_breaks) >= n - 2:
                breaks = np.sort(breaks)
                # Keep one break just beyond each limit (when available)
                lower_end = np.max(
                    [
                        np.min(np.where(limits[0] <= breaks)) - 1,
                        0,  # type: ignore
                    ]
                )
                upper_end = np.min(
                    [
                        np.max(np.where(breaks <= limits[1])) + 1,
                        len(breaks),  # type: ignore
                    ]
                )
                return breaks[lower_end : upper_end + 1]
        else:
            # Ran out of candidates without finding enough breaks;
            # fall back to the general purpose algorithm
            return breaks_extended(n=n)(limits)
@dataclass
class minor_breaks:
    """
    Compute minor breaks

    This is the naive method. It does not take into account
    the transformation.

    Parameters
    ----------
    n : int
        Number of minor breaks between the major
        breaks.

    Examples
    --------
    >>> major = [1, 2, 3, 4]
    >>> limits = [0, 5]
    >>> minor_breaks()(major, limits)
    array([0.5, 1.5, 2.5, 3.5, 4.5])
    >>> minor_breaks()([1, 2], (1, 2))
    array([1.5])

    More than 1 minor break.

    >>> minor_breaks(3)([1, 2], (1, 2))
    array([1.25, 1.5 , 1.75])
    >>> minor_breaks()([1, 2], (1, 2), 3)
    array([1.25, 1.5 , 1.75])
    """

    n: int = 1

    def __call__(
        self,
        major: FloatArrayLike,
        limits: tuple[float, float] | None = None,
        n: int | None = None,
    ) -> NDArrayFloat:
        """
        Minor breaks

        Parameters
        ----------
        major : array_like
            Major breaks
        limits : array_like | None
            Limits of the scale. If *array_like*, must be
            of size 2. If **None**, then the minimum and
            maximum of the major breaks are used.
        n : int
            Number of minor breaks between the major
            breaks. If **None**, then *self.n* is used.

        Returns
        -------
        out : array_like
            Minor breaks
        """
        # Need at least two major breaks to define an interval
        if len(major) < 2:
            return np.array([])

        if limits is None:
            low, high = min_max(major)
        else:
            low, high = min_max(limits)

        if n is None:
            n = self.n

        # Try to infer additional major breaks so that
        # minor breaks can be generated beyond the first
        # and last major breaks
        diff = np.diff(major)
        step = diff[0]
        if len(diff) > 1 and all(diff == step):
            major = np.hstack([major[0] - step, major, major[-1] + step])

        # n evenly spaced points in every major interval
        mbreaks = []
        factors = np.arange(1, n + 1)
        for lhs, rhs in zip(major[:-1], major[1:]):
            sep = (rhs - lhs) / (n + 1)
            mbreaks.append(lhs + factors * sep)

        minor = np.hstack(mbreaks)
        # Drop minor breaks that fall outside the limits
        minor = minor.compress((low <= minor) & (minor <= high))
        return minor
@dataclass
class minor_breaks_trans:
    """
    Compute minor breaks for transformed scales

    The minor breaks are computed in data space.
    This together with major breaks computed in
    transform space reveals the non linearity of
    a scale. See the log transforms created
    with :func:`log_trans` like :class:`log10_trans`.

    Parameters
    ----------
    trans : trans or type
        Trans object or trans class.
    n : int
        Number of minor breaks between the major
        breaks.

    Examples
    --------
    >>> from mizani.transforms import sqrt_trans
    >>> major = [1, 2, 3, 4]
    >>> limits = [0, 5]
    >>> t1 = sqrt_trans()
    >>> t1.minor_breaks(major, limits)
    array([1.58113883, 2.54950976, 3.53553391])

    # Changing the regular `minor_breaks` method

    >>> t2 = sqrt_trans()
    >>> t2.minor_breaks = minor_breaks()
    >>> t2.minor_breaks(major, limits)
    array([0.5, 1.5, 2.5, 3.5, 4.5])

    More than 1 minor break

    >>> major = [1, 10]
    >>> limits = [1, 10]
    >>> t2.minor_breaks(major, limits, 4)
    array([2.8, 4.6, 6.4, 8.2])
    """

    trans: Trans
    n: int = 1

    def __call__(
        self,
        major: FloatArrayLike,
        limits: tuple[float, float] | None = None,
        n: int | None = None,
    ) -> NDArrayFloat:
        """
        Minor breaks for transformed scales

        Parameters
        ----------
        major : array_like
            Major breaks
        limits : array_like | None
            Limits of the scale. If *array_like*, must be
            of size 2. If **None**, then the minimum and
            maximum of the major breaks are used.
        n : int
            Number of minor breaks between the major
            breaks. If **None**, then *self.n* is used.

        Returns
        -------
        out : array_like
            Minor breaks
        """
        if limits is None:
            limits = min_max(major)

        if n is None:
            n = self.n

        # Compute the minor breaks in data space and return
        # them in transformed space
        major = self._extend_breaks(major)
        major = self.trans.inverse(major)
        limits = self.trans.inverse(limits)
        minor = minor_breaks(n)(major, limits)
        return self.trans.transform(minor)

    def _extend_breaks(self, major: FloatArrayLike) -> FloatArrayLike:
        """
        Append 2 extra breaks at either end of major

        If breaks of transform space are non-equidistant,
        :func:`minor_breaks` add minor breaks beyond the first
        and last major breaks. The solution is to extend those
        breaks (in transformed space) before the minor break call
        is made. How the breaks are extended depends on the type
        of transform.
        """
        trans = self.trans
        trans = trans if isinstance(trans, type) else trans.__class__
        # so far we are only certain about this extending stuff
        # making sense for log transform
        is_log = trans.__name__.startswith("log")
        diff = np.diff(major)
        step = diff[0]
        if is_log and all(diff == step):
            major = np.hstack([major[0] - step, major, major[-1] + step])
        return major
@dataclass
class breaks_date:
    """
    Regularly spaced dates

    Parameters
    ----------
    n :
        Desired number of breaks.
    width : str | None
        An interval specification. Must be one of
        [second, minute, hour, day, week, month, year]
        If ``None``, the interval is determined automatically.

    Examples
    --------
    >>> from datetime import datetime
    >>> limits = (datetime(2010, 1, 1), datetime(2026, 1, 1))

    Default breaks will be regularly spaced but the spacing
    is automatically determined

    >>> breaks = breaks_date(9)
    >>> [d.year for d in breaks(limits)]
    [2010, 2012, 2014, 2016, 2018, 2020, 2022, 2024, 2026]

    Breaks at 4 year intervals

    >>> breaks = breaks_date(width='4 year')
    >>> [d.year for d in breaks(limits)]
    [2010, 2014, 2018, 2022, 2026]
    """

    n: int = 5
    _: KW_ONLY
    width: str | None = None
    # Parsed form of width, e.g. "4 years" -> _width=4, _units="year"
    _width: int | None = field(init=False, default=None)
    _units: DatetimeBreaksUnits | None = field(init=False, default=None)

    def __post_init__(self):
        # For backwards compatibility
        if isinstance(self.n, str) and self.width is None:
            warn(
                "Passing the width as the parameter has been deprecated "
                "and will not work in a future version. "
                'Use breaks_date(width="4 years")',
                FutureWarning,
            )
            self.width = self.n

        if self.width:
            # Parse the width specification
            # e.g. '10 months' => (10, month)
            _w, units = self.width.strip().lower().split()
            self._width = int(_w)
            self._units = units.rstrip("s")  # type: ignore

    def __call__(
        self, limits: tuple[datetime, datetime] | tuple[date, date]
    ) -> Sequence[datetime]:
        """
        Compute breaks

        Parameters
        ----------
        limits : tuple
            Minimum and maximum :class:`datetime.datetime` values.

        Returns
        -------
        out : array_like
            Sequence of break points.
        """
        if any(pd.isna(x) for x in limits):
            return []

        # np.datetime64 limits are converted to datetime.datetime
        if isinstance(limits[0], np.datetime64) and isinstance(
            limits[1], np.datetime64
        ):
            limits = limits[0].astype(object), limits[1].astype(object)

        limits = as_datetime(limits)
        if self._units and self._width:
            return calculate_date_breaks_byunits(
                limits, self._units, self._width
            )
        else:
            return calculate_date_breaks_auto(limits, self.n)
@dataclass
class breaks_timedelta:
    """
    Timedelta breaks

    Returns
    -------
    out : callable ``f(limits)``
        A function that takes a sequence of two
        :class:`datetime.timedelta` values and returns
        a sequence of break points.

    Examples
    --------
    >>> from datetime import timedelta
    >>> breaks = breaks_timedelta()
    >>> x = [timedelta(days=i*365) for i in range(25)]
    >>> limits = min(x), max(x)
    >>> major = breaks(limits)
    >>> [val.total_seconds()/(365*24*60*60)for val in major]
    [0.0, 5.0, 10.0, 15.0, 20.0, 25.0]
    """

    # Desired number of breaks
    n: int = 5
    # Nice numbers passed on to the extended breaks algorithm
    Q: Sequence[float] = (1, 2, 5, 10)

    def __post_init__(self):
        self._calculate_breaks = breaks_extended(n=self.n, Q=self.Q)

    def __call__(
        self, limits: tuple[Timedelta, Timedelta]
    ) -> TimedeltaArrayLike:
        """
        Compute breaks

        Parameters
        ----------
        limits : tuple
            Minimum and maximum :class:`datetime.timedelta` values.

        Returns
        -------
        out : array_like
            Sequence of break points.
        """
        if any(pd.isna(x) for x in limits):
            return []

        # Convert limits to plain numbers (in good units), compute
        # numeric breaks, then convert the breaks back to timedelta
        helper = timedelta_helper(limits)
        scaled_limits = helper.scaled_limits()
        scaled_breaks = self._calculate_breaks(scaled_limits)
        breaks = helper.numeric_to_timedelta(scaled_breaks)
        return breaks
# This could be cleaned up, state overload?
@dataclass
class timedelta_helper:
    """
    Helper for computing timedelta breaks
    and labels.

    How to use - breaks?

    1. Initialise with a timedelta sequence/limits.
    2. Get the scaled limits and use those to calculate
       breaks using a general purpose breaks calculating
       routine. The scaled limits are in numerical format.
    3. Convert the computed breaks from numeric into timedelta.

    See, :class:`breaks_timedelta`

    How to use - formatting?

    1. Call :meth:`format_info` with the timedelta values to be
       formatted and get back a tuple of numeric values and
       the units for those values.
    2. Format the values with a general purpose formatting
       routine.

    See, :class:`~mizani.labels.label_timedelta`
    """

    # Sequence of timedelta values (pandas or cpython)
    x: TimedeltaArrayLike
    # Units to represent the values in; if None, good units are
    # determined from the range of x
    units: DurationUnit | None = None

    def __post_init__(self):
        l, h = min(self.x), max(self.x)
        # "pandas" or "cpython"; decides the numeric representation
        # (nanoseconds for pandas, seconds for cpython)
        self.package = self.determine_package(self.x[0])
        self.limits = self.value(l), self.value(h)
        self._units: DurationUnit = self.units or self.best_units((l, h))
        self.factor = self.get_scaling_factor(self._units)

    @classmethod
    def determine_package(cls, td: Timedelta) -> Literal["pandas", "cpython"]:
        # Duck-typed detection: pd.Timedelta has `.components`,
        # datetime.timedelta only has `.total_seconds`
        if hasattr(td, "components"):
            package = "pandas"
        elif hasattr(td, "total_seconds"):
            package = "cpython"
        else:
            msg = f"{td.__class__} format not yet supported."
            raise ValueError(msg)
        return package

    @classmethod
    def format_info(
        cls, x: TimedeltaArrayLike, units: DurationUnit | None = None
    ) -> tuple[NDArrayFloat, DurationUnit]:
        # Numeric values of x together with the units they are in
        helper = cls(x, units)
        return helper.timedelta_to_numeric(x), helper._units

    def best_units(self, x: TimedeltaArrayLike) -> DurationUnit:
        """
        Determine good units for representing a sequence of timedeltas
        """
        # Read
        # [(0.9, 's'),
        #  (9, 'm)]
        # as, break ranges between 0.9 seconds (inclusive)
        # and 9 minutes are represented in seconds. And so on.
        ts_range = self.value(max(x)) - self.value(min(x))
        package = self.determine_package(x[0])
        if package == "pandas":
            cuts: list[tuple[float, DurationUnit]] = [
                (0.9, "us"),
                (0.9, "ms"),
                (0.9, "s"),
                (9, "min"),
                (6, "h"),
                (4, "day"),
                (4, "week"),
                (4, "month"),
                (3, "year"),
            ]
            # pandas values are in nanoseconds
            denomination = NANOSECONDS
            base_units = "ns"
        else:
            cuts = [
                (0.9, "s"),
                (9, "min"),
                (6, "h"),
                (4, "day"),
                (4, "week"),
                (4, "month"),
                (3, "year"),
            ]
            # cpython values are in seconds
            denomination = SECONDS
            # NOTE(review): sub-second cpython ranges fall through to
            # "ms" while pandas uses "ns" — confirm this asymmetry is
            # intended
            base_units = "ms"

        # Largest unit whose threshold the range exceeds
        for size, units in reversed(cuts):
            if ts_range >= size * denomination[units]:
                return units

        return base_units

    @staticmethod
    def value(td: Timedelta) -> float:
        """
        Return the numeric value representation on a timedelta
        """
        if isinstance(td, pd.Timedelta):
            # pd.Timedelta.value is in nanoseconds
            return td.value
        else:
            return td.total_seconds()

    def scaled_limits(self) -> tuple[float, float]:
        """
        Minimum and Maximum to use for computing breaks
        """
        _min = self.limits[0] / self.factor
        _max = self.limits[1] / self.factor
        return _min, _max

    def timedelta_to_numeric(
        self, timedeltas: TimedeltaArrayLike
    ) -> NDArrayFloat:
        """
        Convert sequence of timedelta to numerics
        """
        return np.array([self.to_numeric(td) for td in timedeltas])

    def numeric_to_timedelta(self, values: NDArrayFloat) -> TimedeltaArrayLike:
        """
        Convert sequence of numerical values to timedelta
        """
        if self.package == "pandas":
            return [
                pd.Timedelta(int(x * self.factor), unit="ns") for x in values
            ]
        else:
            return [timedelta(seconds=x * self.factor) for x in values]

    def get_scaling_factor(self, units: DurationUnit) -> float:
        # Factor that converts a base-unit value into `units`
        if self.package == "pandas":
            return NANOSECONDS[units]
        else:
            return SECONDS[units]

    def to_numeric(self, td: Timedelta) -> float:
        """
        Convert timedelta to a number corresponding to the
        appropriate units. The appropriate units are those
        determined when the object is initialised.
        """
        if isinstance(td, pd.Timedelta):
            return td.value / NANOSECONDS[self._units]
        else:
            return td.total_seconds() / SECONDS[self._units]
@dataclass
class breaks_extended:
    """
    An extension of Wilkinson's tick position algorithm

    Parameters
    ----------
    n : int
        Desired number of breaks
    Q : list
        List of nice numbers
    only_inside : bool
        If ``True``, then all the breaks will be within the given
        range.
    w : list
        Weights applied to the four optimization components
        (simplicity, coverage, density, and legibility). They
        should add up to 1.

    Examples
    --------
    >>> limits = (0, 9)
    >>> breaks_extended()(limits)
    array([ 0. , 2.5, 5. , 7.5, 10. ])
    >>> breaks_extended(n=6)(limits)
    array([ 0., 2., 4., 6., 8., 10.])

    References
    ----------
    - Talbot, J., Lin, S., Hanrahan, P. (2010) An Extension of
      Wilkinson's Algorithm for Positioning Tick Labels on Axes,
      InfoVis 2010.

    Additional Credit to Justin Talbot on whose code this
    implementation is almost entirely based.
    """

    n: int = 5
    Q: Sequence[float] = (1, 5, 2, 2.5, 4, 3)
    only_inside: bool = False
    w: Sequence[float] = (0.25, 0.2, 0.5, 0.05)

    def __post_init__(self):
        # Used for lookups during the computations
        self.Q_index = {q: i for i, q in enumerate(self.Q)}

    def coverage(
        self, dmin: float, dmax: float, lmin: float, lmax: float
    ) -> float:
        # How well the break range [lmin, lmax] matches the data
        # range [dmin, dmax]
        p1 = (dmax - lmax) ** 2
        p2 = (dmin - lmin) ** 2
        p3 = (0.1 * (dmax - dmin)) ** 2
        return 1 - 0.5 * (p1 + p2) / p3

    def coverage_max(self, dmin: float, dmax: float, span: float) -> float:
        # Upper bound on the coverage score achievable with breaks
        # spanning `span`; used to prune the search
        rng = dmax - dmin
        if span > rng:
            half = (span - rng) / 2.0
            return 1 - (half**2) / (0.1 * rng) ** 2
        else:
            return 1

    def density(
        self, k: float, dmin: float, dmax: float, lmin: float, lmax: float
    ) -> float:
        # How close the break density is to the requested density
        r = (k - 1.0) / (lmax - lmin)
        rt = (self.n - 1) / (max(lmax, dmax) - min(lmin, dmin))
        return 2 - max(r / rt, rt / r)

    def density_max(self, k: float) -> float:
        # Upper bound on the density score for k breaks
        if k >= self.n:
            return 2 - (k - 1.0) / (self.n - 1.0)
        else:
            return 1

    def simplicity(
        self, q: float, j: float, lmin: float, lmax: float, lstep: float
    ) -> float:
        # How "nice" the chosen step size is; earlier entries of Q
        # are nicer, and including zero as a break earns a bonus (v)
        eps = 1e-10
        n = len(self.Q)
        i = self.Q_index[q] + 1
        if (
            (lmin % lstep < eps or (lstep - lmin % lstep) < eps)
            and lmin <= 0
            and lmax >= 0
        ):
            v = 1
        else:
            v = 0
        return (n - i) / (n - 1.0) + v - j

    def simplicity_max(self, q: float, j: float) -> float:
        # Upper bound on the simplicity score (assumes the zero bonus)
        n = len(self.Q)
        i = self.Q_index[q] + 1
        v = 1
        return (n - i) / (n - 1.0) + v - j

    def legibility(self, lmin: float, lmax: float, lstep: float) -> float:
        # Legibility depends on fontsize, rotation, overlap ... i.e.
        # it requires drawing or simulating drawn breaks then calculating
        # a score. Return 1 ignores all that.
        return 1

    def __call__(self, limits: tuple[float, float]) -> NDArrayFloat:
        """
        Calculate the breaks

        Parameters
        ----------
        limits : array
            Minimum and maximum values.

        Returns
        -------
        out : array_like
            Sequence of break points.
        """
        Q = self.Q
        w = self.w
        only_inside = self.only_inside
        simplicity_max = self.simplicity_max
        density_max = self.density_max
        coverage_max = self.coverage_max
        simplicity = self.simplicity
        coverage = self.coverage
        density = self.density
        legibility = self.legibility
        log10 = np.log10
        ceil = np.ceil
        floor = np.floor

        # casting prevents the typechecker from mixing
        # float & np.float32
        dmin, dmax = float(limits[0]), float(limits[1])
        if dmin > dmax:
            dmin, dmax = dmax, dmin
        elif dmin == dmax:
            return np.array([dmin])

        best_score = -2.0
        best = (0, 0, 0, 0, 0)  # Gives Empty breaks

        # Branch-and-bound search over
        #   j - skip amount for the nice numbers,
        #   q - nice number,
        #   k - number of breaks,
        #   z - power of 10 scaling the step size.
        # The *_max upper bounds prune branches that cannot beat the
        # best score found so far.
        j = 1.0
        while j < float("inf"):
            for q in Q:
                sm = simplicity_max(q, j)

                if w[0] * sm + w[1] + w[2] + w[3] < best_score:
                    j = float("inf")
                    break

                k = 2.0
                while k < float("inf"):
                    dm = density_max(k)

                    if w[0] * sm + w[1] + w[2] * dm + w[3] < best_score:
                        break

                    delta = (dmax - dmin) / (k + 1) / j / q
                    z: float = ceil(log10(delta))
                    while z < float("inf"):
                        step = j * q * (10**z)
                        cm = coverage_max(dmin, dmax, step * (k - 1))

                        if (
                            w[0] * sm + w[1] * cm + w[2] * dm + w[3]
                            < best_score
                        ):
                            break

                        min_start = int(floor(dmax / step) * j - (k - 1) * j)
                        max_start = int(ceil(dmin / step) * j)

                        if min_start > max_start:
                            z = z + 1
                            break

                        for start in range(min_start, max_start + 1):
                            lmin = start * (step / j)
                            lmax = lmin + step * (k - 1)
                            lstep = step
                            s = simplicity(q, j, lmin, lmax, lstep)
                            c = coverage(dmin, dmax, lmin, lmax)
                            d = density(k, dmin, dmax, lmin, lmax)
                            leg = legibility(lmin, lmax, lstep)
                            score = (
                                w[0] * s + w[1] * c + w[2] * d + w[3] * leg
                            )
                            if score > best_score and (
                                not only_inside
                                or (lmin >= dmin and lmax <= dmax)
                            ):
                                best_score = score
                                best = (lmin, lmax, lstep, q, k)
                        z = z + 1
                    k = k + 1
            j = j + 1

        # best = (lmin, lmax, lstep, q, k)
        locs = best[0] + np.arange(best[4]) * best[2]
        return locs
class breaks_symlog:
    """
    Breaks for the Symmetric Logarithm Transform

    Examples
    --------
    >>> limits = (-100, 100)
    >>> breaks_symlog()(limits)
    array([-100, -10, 0, 10, 100])
    """

    def __call__(self, limits: tuple[float, float]) -> NDArrayFloat:
        """
        Compute breaks

        Parameters
        ----------
        limits : tuple
            Minimum and maximum values

        Returns
        -------
        out : array_like
            Sequence of break points at signed integer powers of 10
        """
        def _signed_log10(x):
            # sign(x) * log10(|x|), rounded to the nearest integer
            return np.round(np.sign(x) * np.log10(np.sign(x) * x)).astype(int)

        # Range of signed exponents covered by the limits
        l, h = _signed_log10(limits)
        exps = np.arange(l, h + 1, 1)
        # sign(e) * 10**|e|; a zero exponent maps to a break at 0
        return np.sign(exps) * (10 ** np.abs(exps))
# Deprecated aliases, kept for backwards compatibility with the
# old `*_breaks` naming scheme
log_breaks = breaks_log
trans_minor_breaks = minor_breaks_trans
date_breaks = breaks_date
timedelta_breaks = breaks_timedelta
extended_breaks = breaks_extended