# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable, Optional, Union
import numpy as np
import pandas as pd
from pandas.api.types import is_bool_dtype, is_list_like
from pandas_select import iterutils
from pandas_select.base import LogicalOp, PrettyPrinter, Selector
# A condition: called with one column (a Series) and expected to return one
# boolean per element of that column.
Cond = Callable[[pd.Series], Iterable[bool]]
# A column subset: a single label, an iterable of labels, or a callable
# (callables are stored unchanged and handed to ``df[...]`` at call time).
Columns = Union[str, Iterable[str], Callable]
class _BoolIndexerMixin(PrettyPrinter, ABC):
    """Base class for selectors that filter rows by value.

    The condition is evaluated column by column; subclasses implement
    `_join` to collapse the per-column results into a single row mask.
    """

    def __init__(self, cond: Cond, columns: Optional[Columns] = None):
        """Store the condition and the optional column subset.

        Parameters
        ----------
        cond:
            Callable applied to each column through `DataFrame.apply`.
        columns: optional
            Columns on which to evaluate `cond`. A callable is kept
            unchanged; any other truthy value is converted with
            `iterutils.to_set`; a falsy value (including ``None``) is kept
            as given.
        """
        self.cond = cond
        # Only concrete, non-empty column specifications are normalised.
        if not callable(columns) and columns:
            columns = iterutils.to_set(columns)  # type:ignore
        self.columns = columns

    def __call__(self, df: pd.DataFrame) -> Iterable[bool]:
        """Evaluate the condition on every selected column and join the results.

        `_join` implementations are expected to return a boolean array with
        size `df.shape[0]`.
        """
        subset = df if self.columns is None else df[self.columns]
        return self._join(subset.apply(self.cond))

    @abstractmethod
    def _join(self, df: pd.DataFrame) -> Iterable[bool]:  # pragma: no cover
        """Reduce the per-column boolean results in `df` to one row mask."""
        raise NotImplementedError()
class _BoolOpsMixin:
    """Common logical operators mixin.

    Adds ``&``, ``|``, ``^`` and ``~`` (plus the reflected binary forms) to a
    selector. Each operation builds a `BoolOp` from the matching NumPy
    function and the operand(s).
    """

    def intersection(self, other: _BoolIndexerMixin) -> "BoolOp":
        """Select elements in both selectors."""
        return BoolOp(np.logical_and, "&", self, other)  # type: ignore

    def union(self, other: _BoolIndexerMixin) -> "BoolOp":
        """Select elements in the left side, the right side, or both."""
        return BoolOp(np.logical_or, "|", self, other)  # type: ignore

    def symmetric_difference(self, other: Any) -> "BoolOp":
        """Select elements either in the left side or the right side but not in both."""
        return BoolOp(np.logical_xor, "^", self, other)  # type: ignore

    def __and__(self, other: _BoolIndexerMixin) -> "BoolOp":
        """``self & other``: same as `intersection`."""
        return self.intersection(other)

    def __rand__(self, other: _BoolIndexerMixin) -> "BoolOp":
        """``other & self``: `other` becomes the left operand."""
        return BoolOp(np.logical_and, "&", other, self)  # type: ignore

    def __or__(self, other: _BoolIndexerMixin) -> "BoolOp":
        """``self | other``: same as `union`."""
        return self.union(other)

    def __ror__(self, other: _BoolIndexerMixin) -> "BoolOp":
        """``other | self``: `other` becomes the left operand."""
        return BoolOp(np.logical_or, "|", other, self)  # type: ignore

    def __xor__(self, other: _BoolIndexerMixin) -> "BoolOp":
        """``self ^ other``: same as `symmetric_difference`."""
        return self.symmetric_difference(other)

    def __rxor__(self, other: _BoolIndexerMixin) -> "BoolOp":
        """``other ^ self``: `other` becomes the left operand."""
        return BoolOp(np.logical_xor, "^", other, self)  # type: ignore

    def __invert__(self) -> "BoolOp":
        """``~self``: unary operation built on `np.invert`, with no right operand."""
        return BoolOp(np.invert, "~", self)  # type: ignore
class _BoolMask(PrettyPrinter, _BoolOpsMixin):
    """Selector that wraps an already-computed mask."""

    def __init__(self, mask: Iterable[bool]):
        """Convert `mask` to a NumPy array and keep it on the instance."""
        as_array = np.asarray(mask)
        self.mask = as_array

    def __call__(self, df: pd.DataFrame) -> Iterable[bool]:
        """Return the stored mask; `df` is not inspected."""
        return self.mask
# Signature of the array operation wrapped by a BoolOp: a left array, an
# optional right array, and an array result.
BoolOperation = Callable[[np.ndarray, Optional[np.ndarray]], np.ndarray]
# An operand of a BoolOp: either a selector or an iterable of booleans.
BoolOperand = Union[Selector, Iterable[bool]]
class BoolOp(LogicalOp, _BoolOpsMixin):
    """A logical operation on boolean selectors.

    `right` may be omitted, in which case ``None`` is forwarded as the right
    operand. Operands are validated before being handed to `LogicalOp`.
    """

    def __init__(
        self,
        op: BoolOperation,
        op_name: str,
        left: BoolOperand,
        right: Optional[BoolOperand] = None,
    ):
        """Validate both operands, left first, and initialise the operation.

        Parameters
        ----------
        op:
            Function implementing the operation.
        op_name:
            Symbol of the operation, e.g. ``"&"``.
        left:
            Left operand.
        right: optional
            Right operand.

        Raises
        ------
        TypeError
            If an operand is neither ``None``, a callable, nor a boolean
            list-like.
        """
        checked_left = self._validate_operand(left)
        checked_right = self._validate_operand(right)
        super().__init__(op, op_name, checked_left, checked_right)  # type:ignore

    def _validate_operand(self, sel: Any) -> Union[Selector, Iterable[bool]]:
        """Return `sel` in a form usable as an operand.

        ``None`` and callables are returned unchanged. Anything else must be
        list-like and have a boolean dtype once converted to a NumPy array;
        it is then wrapped in a `_BoolMask`.

        Raises
        ------
        TypeError
            If `sel` is not list-like, or is list-like but not boolean.
        """
        passes_through = sel is None or callable(sel)
        if passes_through:
            return sel

        if not is_list_like(sel):
            raise TypeError("Operand does not support logical operations.")

        mask = np.asarray(sel)
        if not is_bool_dtype(mask):
            raise TypeError("Operand is not boolean dtype.")
        return _BoolMask(mask)
class BoolIndexer(_BoolIndexerMixin, _BoolOpsMixin, ABC):
    """Base class for boolean indexers that support logical operations.

    Combines the per-column condition evaluation of `_BoolIndexerMixin` with
    the operators of `_BoolOpsMixin`; it adds no behaviour of its own.
    """
class Anywhere(BoolIndexer):
    """Filter rows where *any* column matches a condition.

    Parameters
    ----------
    cond: callable
        Select labels where `cond` is True. `cond` is computed on each column
        and should return a boolean array.
    columns: optional
        Subset of columns on which to apply `cond`.

    Examples
    --------
    >>> df = pd.DataFrame({'A': [1, 1, 4], 'B': [2, -3, 1]}, index=["a", "b", "c"])
    >>> df
       A  B
    a  1  2
    b  1 -3
    c  4  1
    >>> df.loc[Anywhere(lambda x: x % 2 == 0)]
       A  B
    a  1  2
    c  4  1
    >>> df.loc[Anywhere(lambda x: x % 2 == 0, columns="A")]
       A  B
    c  4  1
    """

    def _join(self, df: pd.DataFrame) -> np.ndarray:
        """Return a NumPy mask that is True where any column of `df` is True."""
        return df.any(axis="columns").to_numpy()
class Everywhere(BoolIndexer):
    """Filter rows where *all* columns match a condition.

    Parameters
    ----------
    cond: callable
        Select labels where `cond` is True. `cond` is computed on each column
        and should return a boolean array.
    columns: optional
        Subset of columns on which to apply `cond`.

    Examples
    --------
    >>> df = pd.DataFrame({'A': [1, 1, 4], 'B': [2, -3, 1]}, index=["a", "b", "c"])
    >>> df
       A  B
    a  1  2
    b  1 -3
    c  4  1
    >>> df.loc[Everywhere(lambda x: x > 0)]
       A  B
    a  1  2
    c  4  1
    >>> df.loc[Everywhere(lambda x: x == 1, columns="A")]
       A  B
    a  1  2
    b  1 -3
    """

    def _join(self, df: pd.DataFrame) -> np.ndarray:
        """Return a NumPy mask that is True where every column of `df` is True."""
        return df.all(axis="columns").to_numpy()