Source code for cagey._internal.queries

import pkgutil
from collections.abc import Iterable, Iterator, Sequence
from dataclasses import asdict, astuple
from datetime import datetime
from sqlite3 import Connection

import polars as pl

from cagey._internal.types import (
    MassSpectrumId,
    MassSpectrumPeak,
    MassSpectrumTopologyAssignment,
    NmrSpectrum,
    NmrSpectrumId,
    Precursor,
    Precursors,
    Reaction,
    ReactionKey,
    Row,
    TurbidState,
)


[docs] class CreateTablesError(Exception): """Raised when the tables cannot be created."""
[docs] class InsertMassSpectrumError(Exception): """Raised when a mass spectrum cannot be inserted."""
[docs] class InsertNmrSpectrumError(Exception): """Raised when an NMR spectrum cannot be inserted."""
[docs] def create_tables(connection: Connection) -> None: """Create the tables in the database. Parameters: connection: A SQLite connection. """ script = pkgutil.get_data("cagey", "_internal/sql/create_tables.sql") if script is not None: connection.executescript(script.decode()) else: msg = "failed to load create_tables.sql" raise CreateTablesError(msg)
[docs] def precursors_df(connection: Connection) -> pl.DataFrame: """Return a DataFrame of precursors. Parameters: connection: A SQLite connection. Returns: A DataFrame of precursors. """ return pl.read_database( """ SELECT precursors.name, precursors.smiles FROM precursors ORDER BY precursors.name """, connection, )
[docs] def reactions_df(connection: Connection) -> pl.DataFrame: """Return a DataFrame of reactions. Parameters: connection: A SQLite connection. Returns: A DataFrame of reactions. """ return pl.read_database( """ SELECT reactions.experiment, reactions.plate, reactions.formulation_number, di.name AS di_name, tri.name AS tri_name FROM reactions LEFT JOIN precursors AS di ON reactions.di_name = di.name LEFT JOIN precursors AS tri ON reactions.tri_name = tri.name ORDER BY reactions.experiment, reactions.plate, reactions.formulation_number, di.name, tri.name """, connection, )
[docs] def aldehyde_peaks_df(connection: Connection) -> pl.DataFrame: """Return a DataFrame of aldehyde peaks. Parameters: connection: A SQLite connection. Returns: A DataFrame of aldehyde peaks. """ return pl.read_database( """ SELECT reactions.experiment, reactions.plate, reactions.formulation_number, di.name AS di_name, tri.name AS tri_name, nmr_aldehyde_peaks.ppm, nmr_aldehyde_peaks.amplitude FROM nmr_aldehyde_peaks LEFT JOIN nmr_spectra ON nmr_aldehyde_peaks.nmr_spectrum_id = nmr_spectra.id LEFT JOIN reactions ON nmr_spectra.reaction_id = reactions.id LEFT JOIN precursors AS di ON reactions.di_name = di.name LEFT JOIN precursors AS tri ON reactions.tri_name = tri.name ORDER BY reactions.experiment, reactions.plate, reactions.formulation_number, nmr_aldehyde_peaks.ppm """, connection, )
[docs] def imine_peaks_df(connection: Connection) -> pl.DataFrame: """Return a DataFrame of imine peaks. Parameters: connection: A SQLite connection. Returns: A DataFrame of imine peaks. """ return pl.read_database( """ SELECT reactions.experiment, reactions.plate, reactions.formulation_number, di.name AS di_name, tri.name AS tri_name, nmr_imine_peaks.ppm, nmr_imine_peaks.amplitude FROM nmr_imine_peaks LEFT JOIN nmr_spectra ON nmr_imine_peaks.nmr_spectrum_id = nmr_spectra.id LEFT JOIN reactions ON nmr_spectra.reaction_id = reactions.id LEFT JOIN precursors AS di ON reactions.di_name = di.name LEFT JOIN precursors AS tri ON reactions.tri_name = tri.name ORDER BY reactions.experiment, reactions.plate, reactions.formulation_number, nmr_imine_peaks.ppm """, connection, )
[docs] def mass_spectrum_peaks_df(connection: Connection) -> pl.DataFrame: """Return a DataFrame of mass spectrum peaks. Parameters: connection: A SQLite connection. Returns: A DataFrame of mass spectrum peaks. """ return pl.read_database( """ SELECT reactions.experiment, reactions.plate, reactions.formulation_number, di.name AS di_name, tri.name AS tri_name, mass_spectrum_peaks.tri_count, mass_spectrum_peaks.di_count, mass_spectrum_peaks.adduct, mass_spectrum_peaks.charge, mass_spectrum_peaks.calculated_mz, mass_spectrum_peaks.spectrum_mz, mass_spectrum_peaks.separation_mz, mass_spectrum_peaks.intensity FROM mass_spectrum_peaks LEFT JOIN mass_spectra ON mass_spectrum_peaks.mass_spectrum_id = mass_spectra.id LEFT JOIN reactions ON mass_spectra.reaction_id = reactions.id LEFT JOIN precursors AS di ON reactions.di_name = di.name LEFT JOIN precursors AS tri ON reactions.tri_name = tri.name ORDER BY reactions.experiment, reactions.plate, reactions.formulation_number, mass_spectrum_peaks.spectrum_mz """, connection, )
[docs] def mass_spectrum_topology_assignments_df( connection: Connection, ) -> pl.DataFrame: """Return a DataFrame of mass spectrum topology assignments. Parameters: connection: A SQLite connection. Returns: A DataFrame of mass spectrum topology assignments. """ return pl.read_database( """ SELECT reactions.experiment, reactions.plate, reactions.formulation_number, di.name AS di_name, tri.name AS tri_name, mass_spectrum_peaks.tri_count, mass_spectrum_peaks.di_count, mass_spectrum_peaks.adduct, mass_spectrum_peaks.charge, mass_spectrum_peaks.calculated_mz, mass_spectrum_peaks.spectrum_mz, mass_spectrum_peaks.separation_mz, mass_spectrum_peaks.intensity, mass_spectrum_topology_assignments.topology FROM mass_spectrum_topology_assignments LEFT JOIN mass_spectrum_peaks ON mass_spectrum_peaks.id = mass_spectrum_topology_assignments.mass_spectrum_peak_id LEFT JOIN mass_spectra ON mass_spectrum_peaks.mass_spectrum_id = mass_spectra.id LEFT JOIN reactions ON mass_spectra.reaction_id = reactions.id LEFT JOIN precursors AS di ON reactions.di_name = di.name LEFT JOIN precursors AS tri ON reactions.tri_name = tri.name ORDER BY reactions.experiment, reactions.plate, reactions.formulation_number, mass_spectrum_peaks.spectrum_mz """, connection, )
[docs] def turbidity_dissolved_references_df(connection: Connection) -> pl.DataFrame: """Return a DataFrame of turbidity dissolved references. Parameters: connection: A SQLite connection. Returns: A DataFrame of turbidity dissolved references. """ return pl.read_database( """ SELECT reactions.experiment, reactions.plate, reactions.formulation_number, di.name AS di_name, tri.name AS tri_name, turbidity_dissolved_references.dissolved_reference FROM turbidity_dissolved_references LEFT JOIN reactions ON turbidity_dissolved_references.reaction_id = reactions.id LEFT JOIN precursors AS di ON reactions.di_name = di.name LEFT JOIN precursors AS tri ON reactions.tri_name = tri.name ORDER BY reactions.experiment, reactions.plate, reactions.formulation_number """, connection, )
[docs] def turbidity_measurements_df(connection: Connection) -> pl.DataFrame: """Return a DataFrame of turbidity measurements. Parameters: connection: A SQLite connection. Returns: A DataFrame of turbidity measurements. """ return ( pl.read_database( """ SELECT reactions.experiment, reactions.plate, reactions.formulation_number, di.name AS di_name, tri.name AS tri_name, turbidity_measurements.time, turbidity_measurements.turbidity FROM turbidity_measurements LEFT JOIN reactions ON turbidity_measurements.reaction_id = reactions.id LEFT JOIN precursors AS di ON reactions.di_name = di.name LEFT JOIN precursors AS tri ON reactions.tri_name = tri.name ORDER BY reactions.experiment, reactions.plate, reactions.formulation_number, turbidity_measurements.time """, connection, ) .with_columns( pl.col("time").str.to_datetime(), ) .sort(["experiment", "plate", "formulation_number", "time"]) )
[docs] def turbidity_states_df(connection: Connection) -> pl.DataFrame: """Return a DataFrame of turbidity states. Parameters: connection: A SQLite connection. Returns: A DataFrame of turbidity states. """ return pl.read_database( """ SELECT reactions.experiment, reactions.plate, reactions.formulation_number, di.name AS di_name, tri.name AS tri_name, turbidities.state FROM turbidities LEFT JOIN reactions ON turbidities.reaction_id = reactions.id LEFT JOIN precursors AS di ON reactions.di_name = di.name LEFT JOIN precursors AS tri ON reactions.tri_name = tri.name ORDER BY reactions.experiment, reactions.plate, reactions.formulation_number """, connection, )
[docs] def insert_precursors( connection: Connection, precursors: Iterable[Precursor], *, commit: bool = True, ) -> None: """Insert precursors into the database. Parameters: connection: A SQLite connection. precursors: The precursors to insert. commit: Whether to commit the transaction. """ connection.executemany( "INSERT INTO precursors (name, smiles) VALUES (:name, :smiles)", map(asdict, precursors), ) if commit: connection.commit()
[docs] def insert_reactions( connection: Connection, reactions: Iterable[Reaction], *, commit: bool = True, ) -> None: """Insert reactions into the database. Parameters: connection: A SQLite connection. reactions: The reactions to insert. commit: Whether to commit the transaction. """ connection.executemany( """ INSERT INTO reactions ( experiment, plate, formulation_number, di_name, tri_name ) VALUES ( :experiment, :plate, :formulation_number, :di_name, :tri_name ) """, map(asdict, reactions), ) if commit: connection.commit()
[docs] def insert_mass_spectrum( connection: Connection, reaction_key: ReactionKey, peaks: Sequence[MassSpectrumPeak], *, commit: bool = True, ) -> None: """Insert a mass spectrum into the database. Parameters: connection: A SQLite connection. reaction_key: The reaction key. peaks: The mass spectrum peaks. commit: Whether to commit the transaction. """ cursor = connection.execute( """ INSERT INTO mass_spectra (reaction_id) SELECT id FROM reactions WHERE experiment = :experiment AND plate = :plate AND formulation_number = :formulation_number """, asdict(reaction_key), ) _mass_spectrum_id = cursor.lastrowid if isinstance(_mass_spectrum_id, int): mass_spectrum_id = MassSpectrumId(_mass_spectrum_id) else: msg = "failed to insert mass spectrum" raise InsertMassSpectrumError(msg) connection.executemany( f""" INSERT INTO mass_spectrum_peaks ( mass_spectrum_id, di_count, tri_count, adduct, charge, calculated_mz, spectrum_mz, separation_mz, intensity ) VALUES ( {mass_spectrum_id}, :di_count, :tri_count, :adduct, :charge, :calculated_mz, :spectrum_mz, :separation_mz, :intensity ) """, # noqa: S608 map(asdict, peaks), ) if commit: connection.commit()
[docs] def insert_mass_spectrum_topology_assignments( connection: Connection, assignments: Iterable[MassSpectrumTopologyAssignment], *, commit: bool = True, ) -> None: """Insert mass spectrum topology assignments into the database. Parameters: connection: A SQLite connection. assignments: The mass spectrum topology assignments. commit: Whether to commit the transaction. """ connection.executemany( """ INSERT INTO mass_spectrum_topology_assignments ( mass_spectrum_peak_id, topology ) VALUES ( :mass_spectrum_peak_id, :topology ) """, map(asdict, assignments), ) if commit: connection.commit()
[docs] def mass_spectrum_peaks( connection: Connection, reaction_key: ReactionKey, ) -> Iterator[Row[MassSpectrumPeak]]: """Get the mass spectrum peaks for a reaction. Parameters: connection: A SQLite connection. reaction_key: The reaction key. """ for ( id_, di_count, tri_count, adduct, charge, calculated_mz, spectrum_mz, separation_mz, intensity, ) in connection.execute( """ SELECT mass_spectrum_peaks.id, mass_spectrum_peaks.di_count, mass_spectrum_peaks.tri_count, mass_spectrum_peaks.adduct, mass_spectrum_peaks.charge, mass_spectrum_peaks.calculated_mz, mass_spectrum_peaks.spectrum_mz, mass_spectrum_peaks.separation_mz, mass_spectrum_peaks.intensity FROM mass_spectrum_peaks JOIN mass_spectra ON mass_spectra.id = mass_spectrum_peaks.mass_spectrum_id JOIN reactions ON mass_spectra.reaction_id = reactions.id WHERE reactions.experiment = :experiment AND reactions.plate = :plate AND reactions.formulation_number = :formulation_number """, asdict(reaction_key), ): yield Row( id_, MassSpectrumPeak( di_count=di_count, tri_count=tri_count, adduct=adduct, charge=charge, calculated_mz=calculated_mz, spectrum_mz=spectrum_mz, separation_mz=separation_mz, intensity=intensity, ), )
[docs] def reaction_precursors( connection: Connection, reactions: Sequence[ReactionKey], ) -> Iterator[tuple[ReactionKey, Precursors]]: """Get the precursors for a set of reactions. Parameters: connection: A SQLite connection. reactions: The reactions. """ q = ",".join("(?,?,?)" for _ in range(len(reactions))) for ( experiment, plate, formulation_number, di_smiles, tri_smiles, ) in connection.execute( f""" SELECT reactions.experiment, reactions.plate, reactions.formulation_number, di.smiles AS di_smiles, tri.smiles AS tri_smiles FROM reactions LEFT JOIN precursors AS di ON reactions.di_name = di.name LEFT JOIN precursors AS tri ON reactions.tri_name = tri.name WHERE ( reactions.experiment, reactions.plate, reactions.formulation_number ) IN ({q}) """, tuple(value for reaction in reactions for value in astuple(reaction)), ): yield ( ReactionKey(experiment, plate, formulation_number), Precursors(di_smiles, tri_smiles), )
[docs] def insert_nmr_spectrum( connection: Connection, reaction_key: ReactionKey, spectrum: NmrSpectrum, *, commit: bool = True, ) -> None: """Insert an NMR spectrum into the database. Parameters: connection: A SQLite connection. reaction_key: The reaction key. spectrum: The NMR spectrum. commit: Whether to commit the transaction. """ cursor = connection.execute( """ INSERT INTO nmr_spectra (reaction_id) SELECT id FROM reactions WHERE experiment = :experiment AND plate = :plate AND formulation_number = :formulation_number """, asdict(reaction_key), ) _nmr_spectrum_id = cursor.lastrowid if isinstance(_nmr_spectrum_id, int): nmr_spectrum_id = NmrSpectrumId(_nmr_spectrum_id) else: msg = "failed to insert nmr spectrum" raise InsertNmrSpectrumError(msg) connection.executemany( f""" INSERT INTO nmr_aldehyde_peaks (nmr_spectrum_id, ppm, amplitude) VALUES ({nmr_spectrum_id}, :ppm, :amplitude) """, # noqa: S608 map(asdict, spectrum.aldehyde_peaks), ) connection.executemany( f""" INSERT INTO nmr_imine_peaks (nmr_spectrum_id, ppm, amplitude) VALUES ({nmr_spectrum_id}, :ppm, :amplitude) """, # noqa: S608 map(asdict, spectrum.imine_peaks), ) if commit: connection.commit()
[docs] def insert_turbidity( # noqa: PLR0913 connection: Connection, reaction_key: ReactionKey, dissolved_reference: float, data: dict[str, float], turbidity_state: TurbidState, *, commit: bool = True, ) -> None: """Insert turbidity data into the database. Parameters: connection: A SQLite connection. reaction_key: The reaction key. dissolved_reference: The dissolved reference. data: A map of times to turbidity values. turbidity_state: The turbidity state. commit: Whether to commit the transaction. """ reaction = asdict(reaction_key) connection.execute( """ INSERT INTO turbidity_dissolved_references (reaction_id, dissolved_reference) SELECT id, :dissolved_reference FROM reactions WHERE experiment = :experiment AND plate = :plate AND formulation_number = :formulation_number """, reaction | {"dissolved_reference": dissolved_reference}, ) connection.executemany( """ INSERT INTO turbidity_measurements (reaction_id, time, turbidity) SELECT id, :time, :turbidity FROM reactions WHERE experiment = :experiment AND plate = :plate AND formulation_number = :formulation_number """, ( reaction | { "time": datetime.strptime( time, "%Y_%m_%d_%H_%M_%S_%f" ).astimezone(), "turbidity": turbidity, } for time, turbidity in data.items() ), ) connection.execute( """ INSERT INTO turbidities (reaction_id, state) SELECT id, :state FROM reactions WHERE experiment = :experiment AND plate = :plate AND formulation_number = :formulation_number """, reaction | {"state": turbidity_state.value}, ) if commit: connection.commit()