# Source code for seastersdb.connect
import logging
from typing import Any, Dict
import duckdb
from duckdb.typing import ( # from duckdb.sqltypes import TIMESTAMP, BOOLEAN
BOOLEAN,
TIMESTAMP,
)
from ._get_pathdb import get_pathdb
# Names exported by a star-import of this module.
__all__ = [
    "connect",
]

# Logger for this module, named after the module's import path.
log = logging.getLogger(name=__name__)
# [docs]
def connect(**read_parquet_kws: Any) -> duckdb.DuckDBPyConnection:
    """
    Establish a DuckDB connection and register macros and utility functions for
    accessing the SEASTERS database.

    A new connection is opened with ``duckdb.connect()`` (default arguments).
    Table macros wrapping DuckDB's ``read_parquet`` are then created over the
    files located under ``get_pathdb()``, and a custom scalar function is
    registered for time-range filtering within SQL queries.

    The created macros are:

    - ``ghcnd()``, ``ghcnh()``, ``gsdr()`` for bulk data access of each network
      (files ``<pathdb>/<network>/*.parquet``).
    - ``bsrn(dataset)`` for dataset-wise access to BSRN files (files
      ``<pathdb>/BSRN/station_id=*/*_<dataset>.parquet``).
    - ``<network>_stations()`` and ``<network>_inv()`` for metadata, for each of
      ``bsrn``, ``ghcnd``, ``ghcnh`` and ``gsdr``.
    - ``<network>_var()`` for variable definitions (every network except
      ``gsdr``).

    The data macros (``ghcnd``, ``ghcnh``, ``gsdr`` and ``bsrn``) always pass
    ``union_by_name=True`` to ``read_parquet`` so heterogeneous files are
    combined by column name. Metadata macros receive no extra options.

    The custom function ``overlaps(ta1, ta2, tb1, tb2)`` takes four
    ``TIMESTAMP`` arguments and returns ``tb1 <= ta2 AND tb2 >= ta1``. This is
    an inclusive overlap test of ``[ta1, ta2]`` and ``[tb1, tb2]``, assuming
    each interval is given as (start, end).

    Parameters
    ----------
    **read_parquet_kws
        Additional keyword arguments forwarded to DuckDB's ``read_parquet`` in
        the data macros (not in the metadata macros). Values are rendered as
        SQL literals:

        - ``None`` becomes ``NULL``.
        - Booleans and numbers are written as-is.
        - Strings are single-quoted with ``'`` escaped.
        - Lists and tuples become lists; dicts become structs.
        - Anything else becomes a quoted string.

        The option ``union_by_name=True`` is always enforced and cannot be
        overridden.

    Returns
    -------
    con : duckdb.DuckDBPyConnection
        A live DuckDB connection with all SEASTERS macros and custom functions
        registered.

    Raises
    ------
    Exception
        Any exception raised while registering the macros or the function is
        re-raised after the partially configured connection has been closed.
    """
    # Enforce union_by_name=True. It is placed last so it overrides any value
    # supplied by the caller.
    data_kws: Dict[str, Any] = {**read_parquet_kws, "union_by_name": True}
    data_opts = _format_read_parquet_options(data_kws)

    # Original DuckDB connection
    con = duckdb.connect()
    log.info("Completed connection to DuckDB's API.")
    try:
        pathdb = get_pathdb()

        def create_path_macro(name: str, path: str, opts: str = "") -> None:
            # ``opts`` is a pre-rendered ", key = value, ..." suffix (or "").
            # The path is quoted and escaped so a "'" in it cannot break the SQL.
            con.execute(
                f"CREATE MACRO {name}() AS TABLE "
                f"FROM read_parquet({_sql_literal(path)}{opts})"
            )

        for network in ("BSRN", "GHCNd", "GHCNh", "GSDR"):
            networkl = network.lower()
            # BSRN data takes a ``dataset`` argument; its macro is created below.
            if network != "BSRN":
                create_path_macro(
                    networkl, f"{pathdb}/{network}/*.parquet", data_opts
                )
            create_path_macro(
                f"{networkl}_stations",
                f"{pathdb}/metadata/stations-{networkl}.parquet",
            )
            create_path_macro(
                f"{networkl}_inv",
                f"{pathdb}/metadata/inventory-{networkl}.parquet",
            )
            # GSDR gets no variables-metadata macro.
            if network != "GSDR":
                create_path_macro(
                    f"{networkl}_var",
                    f"{pathdb}/metadata/variables-{networkl}.parquet",
                )

        # The BSRN glob is completed with the ``dataset`` macro argument.
        bsrn_prefix = _sql_literal(f"{pathdb}/BSRN/station_id=*/*_")
        con.execute(
            "CREATE MACRO bsrn(dataset) AS TABLE "
            f"FROM read_parquet({bsrn_prefix} || dataset || '.parquet'{data_opts})"
        )
        log.info("Created macros to access the SEASTERS database.")

        # Record functions
        con.create_function(
            "overlaps", _where_overlaps, 4 * [TIMESTAMP], BOOLEAN
        )
        log.info("Created custom functions ('overlaps').")
    except BaseException:
        # Do not leak a half-configured connection when setup fails.
        con.close()
        raise
    return con


def _sql_literal(value: Any) -> str:
    """Render a Python value as a DuckDB SQL literal."""
    if value is None:
        return "NULL"
    # bool must be checked before int, since bool is a subclass of int.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return repr(value)
    if isinstance(value, str):
        # Standard SQL escaping: double every embedded single quote.
        return "'" + value.replace("'", "''") + "'"
    if isinstance(value, (list, tuple)):
        return "[" + ", ".join(_sql_literal(item) for item in value) + "]"
    if isinstance(value, dict):
        fields = ", ".join(
            f"{_sql_literal(str(key))}: {_sql_literal(item)}"
            for key, item in value.items()
        )
        return "{" + fields + "}"
    # Other objects (e.g. pathlib.Path) are passed as quoted strings.
    return _sql_literal(str(value))


def _format_read_parquet_options(kws: Dict[str, Any]) -> str:
    """Render ``kws`` as a ``", key = value, ..."`` suffix ("" when empty)."""
    return "".join(f", {key} = {_sql_literal(val)}" for key, val in kws.items())


def _where_overlaps(ta1, ta2, tb1, tb2):
    """Return ``tb1 <= ta2 and tb2 >= ta1`` (inclusive interval overlap)."""
    return tb1 <= ta2 and tb2 >= ta1