Source code for cagey._internal.turbidity

import polars as pl

from cagey._internal.queries import TurbidState


def get_turbid_state(
    turbidities: dict[str, float], dissolved_reference: float
) -> TurbidState:
    """Get the turbidity state of an experiment.

    The measurements are labeled with per-window stability, adjacent
    windows sharing a label are merged, and the earliest stable run that
    survives the merge decides the outcome.

    Parameters:
        turbidities:
            Maps a timestamp to a turbidity measurement. Timestamps are
            parsed with the format ``%Y_%m_%d_%H_%M_%S_%f``.
        dissolved_reference:
            The turbidity at which the solution is considered dissolved.

    Returns:
        ``TurbidState.UNSTABLE`` when there are no measurements or no
        stable run is found, ``TurbidState.DISSOLVED`` when the mean
        turbidity of the earliest stable run is below
        ``dissolved_reference + 1``, and ``TurbidState.TURBID`` otherwise.

    """
    # Without measurements there can be no stable run, so answer directly
    # instead of handing an empty, untyped column to the timestamp parser.
    if not turbidities:
        return TurbidState.UNSTABLE

    turbidity = _turbidity_from_json(turbidities)
    turbidity = get_stability_windows(turbidity)
    turbidity = get_aggregated_stability_windows(turbidity)
    # Group ids are a running count over time-sorted rows, so sorting on
    # them makes row 0 the earliest stable run no matter what order the
    # aggregation emitted its groups in.
    result = turbidity.sort("group").collect()
    if result.is_empty():
        return TurbidState.UNSTABLE
    if result.row(0, named=True)["mean_turbidity"] < dissolved_reference + 1:
        return TurbidState.DISSOLVED
    return TurbidState.TURBID
def get_stability_windows(
    turbidity: pl.LazyFrame,
) -> pl.LazyFrame:
    """Label every turbidity measurement as stable or unstable.

    Parameters:
        turbidity:
            A LazyFrame with columns time and turbidity.

    Returns:
        A new LazyFrame in which each measurement is paired with the
        statistics of the 1 minute window starting at it. Two columns are
        added: ``stable``, which is true when the measurement lies within
        three standard deviations of its window mean or the window holds
        a single measurement, and ``group``, a counter that increases
        every time ``stable`` differs from the previous row.

    """
    inside_bounds = pl.col("turbidity").is_between(
        pl.col("lower_bound"),
        pl.col("upper_bound"),
    )
    # A window with one sample has no spread to compare against, so it is
    # accepted as stable outright.
    lone_sample = pl.col("turbidities").list.len() == 1

    # The first row has no predecessor; the null comparison is treated as
    # a change so that numbering starts at 1.
    label_flipped = pl.col("stable") != pl.col("stable").shift(1)
    run_id = label_flipped.fill_null(value=True).cum_sum()

    windows = _average_turbidity(turbidity)
    labeled = windows.with_columns(stable=inside_bounds.or_(lone_sample))
    return labeled.with_columns(group=run_id)
def get_aggregated_stability_windows(
    turbidity: pl.LazyFrame,
) -> pl.LazyFrame:
    """Join adjacent windows with the same stability label.

    Parameters:
        turbidity:
            A LazyFrame with rows representing 1 minute long windows,
            each labeled according to stability. It must provide the
            columns group, stable, time and turbidity.

    Returns:
        A new LazyFrame with one row per run of adjacent windows sharing
        a stability label, holding the columns group, stable, time_delta
        (latest minus earliest time in the run) and mean_turbidity. Only
        stable runs spanning at least 1 minute are kept, and rows follow
        the order in which the groups first appear in the input.

    """
    return (
        # maintain_order keeps the groups in input order; without it the
        # row order is unspecified and callers reading the first row
        # would get an arbitrary run.
        turbidity.group_by("group", maintain_order=True)
        .agg(
            stable=pl.col("stable").first(),
            time_delta=pl.max("time") - pl.min("time"),
            mean_turbidity=pl.mean("turbidity"),
        )
        .filter(
            pl.col("stable").eq(other=True),
            pl.col("time_delta") >= pl.duration(minutes=1),
        )
    )
def _turbidity_from_json(turbidity_json: dict[str, float]) -> pl.LazyFrame:
    """Turn a timestamp-to-turbidity mapping into a time-sorted LazyFrame.

    Parameters:
        turbidity_json:
            Maps a timestamp string in the format
            ``%Y_%m_%d_%H_%M_%S_%f`` to a turbidity measurement.

    Returns:
        A LazyFrame with a parsed datetime column time and a column
        turbidity, sorted by time.

    """
    raw = pl.DataFrame(
        {
            "time": list(turbidity_json.keys()),
            "turbidity": list(turbidity_json.values()),
        }
    )
    parsed_time = pl.col("time").str.strptime(
        pl.Datetime, format="%Y_%m_%d_%H_%M_%S_%f"
    )
    return raw.with_columns(parsed_time).sort("time").lazy()


def _average_turbidity(turbidity: pl.LazyFrame) -> pl.LazyFrame:
    """Attach rolling 1 minute window statistics to every measurement.

    Parameters:
        turbidity:
            A LazyFrame with columns time and turbidity.

    Returns:
        A LazyFrame with, per measurement time, the window's measurements
        (turbidities), their mean and std, and lower_bound / upper_bound
        placed three standard deviations either side of the mean, joined
        with the original measurement on time.

    """
    window_mean = pl.mean("turbidity")
    window_std = pl.std("turbidity")
    # offset="0" with closed="both" makes each window start at the row's
    # own time and include both of its endpoints.
    per_window = turbidity.rolling(
        "time", period="1m", offset="0", closed="both"
    )
    stats = per_window.agg(
        turbidities=pl.col("turbidity"),
        mean=window_mean,
        std=window_std,
        lower_bound=window_mean - 3 * window_std,
        upper_bound=window_mean + 3 * window_std,
    )
    return stats.join(turbidity, on="time")