historical data script #101

Open · wants to merge 2 commits into main
59 changes: 54 additions & 5 deletions pvliveconsumer/app.py
@@ -9,11 +9,13 @@
import logging
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import List, Optional

import click
import numpy as np
import pandas as pd
import pytz
import sentry_sdk
from nowcasting_datamodel.connection import DatabaseConnection
from nowcasting_datamodel.models.base import Base_Forecast
@@ -41,11 +43,16 @@
sentry_sdk.set_tag("version", pvliveconsumer.__version__)

pvlive_domain_url = os.getenv("PVLIVE_DOMAIN_URL", "api.pvlive.uk")
# ignore these gsp ids from PVLive as they are no longer used
ignore_gsp_ids = [5, 17, 53, 75, 139, 140, 143, 157, 163, 225, 310]


@click.command()
@click.group()
def cli():
"""PVLive Consumer CLI"""
pass


@cli.command()
@click.option(
"--db-url",
default=None,
@@ -78,7 +85,7 @@
"--uk-london-time-hour",
default=None,
envvar="UK_LONDON_HOUR",
help="Optionl to only run code if UK time hour matches code this value. "
help="Optional to only run code if UK time hour matches code this value. "
"This is to solve clock change issues when running with cron in UTC.",
type=click.INT,
)
@@ -96,7 +103,7 @@ def app(
:param regime: if its "in-day" or "day-after"
:param n_gsps: How many gsps of data to pull
:param include_national: optional if to get national data or not
:param uk_london_time_hour: Optionl to only run code if UK time hour matches code this value.
:param uk_london_time_hour: Optional to only run code if UK time hour matches code this value.
This is to solve clock change issues when running with cron in UTC.
"""

@@ -137,6 +144,48 @@ def app(
pull_data_and_save(gsps=gsps, session=session, regime=regime)


# ADD NEW COMMAND HERE
@cli.command("extract-historical")
@click.option("--start", type=click.DateTime(), required=True, help="Start date (YYYY-MM-DD)")
@click.option("--end", type=click.DateTime(), required=True, help="End date (YYYY-MM-DD)")
@click.option("--output", type=click.Path(), required=True, help="Output zarr file path")
@click.option("--gsp-ids", default=None, help="Comma-separated GSP IDs (default: all)")
@click.option(
"--db-url",
default=None,
envvar="DB_URL",
help="Database URL to get GSP list from",
type=click.STRING,
)
def extract_historical(start, end, output, gsp_ids, db_url):
"""Extract historical GSP data to Zarr format"""
from pvliveconsumer.scripts.extract_historical_gsp import HistoricalGSPExtractor

# Convert dates to UTC
start_utc = start.replace(tzinfo=pytz.UTC)
end_utc = end.replace(tzinfo=pytz.UTC)

# Parse GSP IDs if provided
gsp_id_list = None
if gsp_ids:
gsp_id_list = [int(x.strip()) for x in gsp_ids.split(",")]
elif db_url:
# Get GSP IDs from database
connection = DatabaseConnection(url=db_url, base=Base_Forecast, echo=False)
with connection.get_session() as session:
gsps = get_gsps(session=session, n_gsps=342, include_national=True)
gsp_id_list = [gsp.gsp_id for gsp in gsps]

# Extract data
extractor = HistoricalGSPExtractor()
extractor.extract_gsp_data(
start=start_utc, end=end_utc, output_path=Path(output), gsp_ids=gsp_id_list
)

click.echo(f"✅ Historical data extracted to {output}")


# Keep all existing functions unchanged
def pull_data_and_save(
gsps: List[LocationSQL],
session: Session,
@@ -297,4 +346,4 @@ def save_to_database(session: Session, gsp_yields: List[GSPYieldSQL]):


if __name__ == "__main__":
app()
cli() # Changed from app() to cli()
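
A minimal usage sketch for the new extract-historical subcommand, driven through click's built-in CliRunner; the dates, output path, and GSP ID subset below are illustrative examples, not values taken from this PR:

from click.testing import CliRunner

from pvliveconsumer.app import cli

runner = CliRunner()
result = runner.invoke(
    cli,
    [
        "extract-historical",
        "--start", "2021-01-01",             # click.DateTime() accepts YYYY-MM-DD
        "--end", "2021-02-01",
        "--output", "./data/jan_2021.zarr",  # illustrative output path
        "--gsp-ids", "0,1,2",                # illustrative subset; omit to extract all GSPs
    ],
)
assert result.exit_code == 0, result.output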
4 changes: 4 additions & 0 deletions requirements.txt
@@ -9,3 +9,7 @@ sentry-sdk==2.13.0
click
pvlib
pvlive-api==1.4.0
xarray>=2023.1.0
zarr>=2.13.0
numcodecs>=0.11.0
pytz>=2023.1
204 changes: 204 additions & 0 deletions scripts/extract_historical_data.py
@@ -0,0 +1,204 @@
"""
Extract Historical GSP Data to Zarr Format

Migrated from archived nowcasting_dataset repository
Author: Peter Dudfield (Original), Your Name (Migration)
Date: 2025-01-22

Pulls raw PV GSP data from Sheffield Solar API and saves to compressed Zarr format.
Data volume: ~1MB per month of data
"""
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Optional

import numcodecs
import pandas as pd
import pytz
import xarray as xr
import yaml
from pvlive_api import PVLive

# Use existing logging setup from pvlive-consumer
logger = logging.getLogger(__name__)


class HistoricalGSPExtractor:
"""Extract and process historical GSP data from PVLive API"""

def __init__(self, pvlive_domain: str = "api.pvlive.uk"):
"""
Initialize the extractor

Args:
pvlive_domain: PVLive API domain URL
"""
self.pvlive = PVLive(domain_url=pvlive_domain)
# Use the same ignored GSP IDs as the main app
self.ignore_gsp_ids = [5, 17, 53, 75, 139, 140, 143, 157, 163, 225, 310]

def extract_gsp_data(
self,
start: datetime,
end: datetime,
output_path: Path,
gsp_ids: Optional[List[int]] = None,
normalize_data: bool = False,
) -> None:
"""
Extract GSP data and save to Zarr format

Args:
start: Start datetime (UTC)
end: End datetime (UTC)
output_path: Path to save the zarr file
gsp_ids: List of GSP IDs to extract (if None, extracts all)
normalize_data: Whether to normalize the data
"""
logger.info(f"Extracting GSP data from {start} to {end}")

# Get GSP data
data_df = self._load_pv_gsp_raw_data(start, end, gsp_ids, normalize_data)

if data_df.empty:
logger.warning("No data retrieved from PVLive API")
return

# Process and convert to xarray
xarray_dataset = self._process_dataframe_to_xarray(data_df)

# Save to zarr with compression
self._save_to_zarr(xarray_dataset, output_path, start, end)

logger.info(f"Successfully saved {len(data_df)} records to {output_path}")

def _load_pv_gsp_raw_data(
self, start: datetime, end: datetime, gsp_ids: Optional[List[int]], normalize_data: bool
) -> pd.DataFrame:
"""Load raw GSP data from PVLive API"""

if gsp_ids is None:
# Use all GSP IDs except ignored ones
gsp_ids = [i for i in range(0, 343) if i not in self.ignore_gsp_ids]

all_data = []

for gsp_id in gsp_ids:
try:
logger.debug(f"Fetching data for GSP {gsp_id}")

gsp_data = self.pvlive.between(
start=start,
end=end,
entity_type="gsp",
entity_id=gsp_id,
dataframe=True,
extra_fields="installedcapacity_mwp,capacity_mwp,updated_gmt",
)

if not gsp_data.empty:
gsp_data["gsp_id"] = gsp_id
all_data.append(gsp_data)

except Exception as e:
logger.warning(f"Failed to fetch data for GSP {gsp_id}: {e}")
continue

if not all_data:
return pd.DataFrame()

combined_df = pd.concat(all_data, ignore_index=True)
logger.info(f"Retrieved data for {len(combined_df)} records across {len(all_data)} GSPs")

return combined_df

def _process_dataframe_to_xarray(self, data_df: pd.DataFrame) -> xr.Dataset:
"""Convert dataframe to xarray dataset with proper structure"""

logger.debug("Converting DataFrame to xarray Dataset")

# Pivot data to get datetime x gsp_id matrices
data_generation_df = data_df.pivot(
index="datetime_gmt", columns="gsp_id", values="generation_mw"
)
data_installedcapacity_df = data_df.pivot(
index="datetime_gmt", columns="gsp_id", values="installedcapacity_mwp"
)
data_capacity_df = data_df.pivot(
index="datetime_gmt", columns="gsp_id", values="capacity_mwp"
)
data_updated_gmt_df = data_df.pivot(
index="datetime_gmt", columns="gsp_id", values="updated_gmt"
)

# Create xarray dataset
dataset = xr.Dataset(
data_vars={
"generation_mw": (("datetime_gmt", "gsp_id"), data_generation_df),
"installedcapacity_mwp": (("datetime_gmt", "gsp_id"), data_installedcapacity_df),
"capacity_mwp": (("datetime_gmt", "gsp_id"), data_capacity_df),
"updated_gmt": (("datetime_gmt", "gsp_id"), data_updated_gmt_df),
},
coords={"datetime_gmt": data_generation_df.index, "gsp_id": data_generation_df.columns},
attrs={
"title": "Historical GSP PV Generation Data",
"source": "Sheffield Solar PVLive API",
"created_by": "pvlive-consumer historical extractor",
},
)

return dataset

def _save_to_zarr(
self, dataset: xr.Dataset, output_path: Path, start: datetime, end: datetime
) -> None:
"""Save xarray dataset to zarr with compression and metadata"""

# Create output directory
output_path.parent.mkdir(parents=True, exist_ok=True)

# Configure compression
encoding = {
var: {"compressor": numcodecs.Blosc(cname="zstd", clevel=5)}
for var in dataset.data_vars
}

# Save to zarr
dataset.to_zarr(output_path, mode="w", encoding=encoding)

# Save configuration metadata
config = {
"extraction_config": {
"start_date": start.isoformat(),
"end_date": end.isoformat(),
"created_at": datetime.now(pytz.UTC).isoformat(),
"n_gsps": len(dataset.gsp_id),
"n_timestamps": len(dataset.datetime_gmt),
"data_source": "Sheffield Solar PVLive API",
"extractor_version": "2.0",
}
}

config_path = output_path.parent / "extraction_metadata.yaml"
with open(config_path, "w") as f:
yaml.dump(config, f, default_flow_style=False)

logger.info(f"Saved metadata to {config_path}")


def main():
"""Main execution function for standalone use"""

# Configuration - adjust these as needed
start = datetime(2016, 1, 1, tzinfo=pytz.UTC)
end = datetime(2021, 10, 1, tzinfo=pytz.UTC)
output_path = Path("./data/historical_gsp_data.zarr")

# Initialize and run extractor
extractor = HistoricalGSPExtractor()
extractor.extract_gsp_data(start=start, end=end, output_path=output_path, normalize_data=False)


if __name__ == "__main__":
main()
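
A quick way to sanity-check the extractor's output is to open the Zarr store back up with xarray; this sketch assumes the default path used in main() and PVLive's half-hourly cadence (48 settlement periods per day), neither of which is enforced by the script itself:

import xarray as xr

# Open the Zarr store written by HistoricalGSPExtractor._save_to_zarr
ds = xr.open_zarr("./data/historical_gsp_data.zarr")

# Variables written by the extractor, indexed by (datetime_gmt, gsp_id)
print(list(ds.data_vars))  # ['generation_mw', 'installedcapacity_mwp', 'capacity_mwp', 'updated_gmt']

# Example: national total (gsp_id 0) over the first day of half-hourly data
national_first_day = ds["generation_mw"].sel(gsp_id=0).isel(datetime_gmt=slice(0, 48))
print(national_first_day.to_pandas().head())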