From 5b1229b54a80f9ce7fc14e38f998733cf7241fe9 Mon Sep 17 00:00:00 2001
From: Philipp Schmelter
Date: Tue, 20 May 2025 14:46:04 +0200
Subject: [PATCH] markov

---
 CHANGELOG.md                   |  3 ++
 src/main.py                    | 42 +++++++++++++++---
 src/markov/plothelper.py       | 31 +++++++++++++
 src/markov/transmats.py        | 57 ++++++++++++++++++++++++
 src/preprocessing/bucketing.py | 80 ++++++++++++++++++++++++++++++++++
 5 files changed, 206 insertions(+), 7 deletions(-)
 create mode 100644 src/markov/plothelper.py
 create mode 100644 src/markov/transmats.py
 create mode 100644 src/preprocessing/bucketing.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a49e8ea..b9a7d22 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,5 +6,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+- Added Markov model to simulate household load time series [#4](https://github.com/ie3-institute/simonaMarkovLoad/issues/4)
+
 ### Changed
 - Compute instantaneous kW from cumulative kWh via 15-minute differencing [#1](https://github.com/ie3-institute/simonaMarkovLoad/issues/1)
\ No newline at end of file
diff --git a/src/main.py b/src/main.py
index 8e867e3..5c74780 100644
--- a/src/main.py
+++ b/src/main.py
@@ -1,7 +1,16 @@
+from pathlib import Path
+
 import matplotlib.pyplot as plt
+import numpy as np
 
+from src.markov.plothelper import plot_transition_matrix
+from src.markov.transmats import build_transition_matrices_parallel
+from src.preprocessing.bucketing import add_bucket_columns
 from src.preprocessing.loader import load_timeseries
 
+PROCESSED_DIR = Path(__file__).resolve().parents[1] / "data" / "processed"
+PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
+
 
 
 def plot_state_distribution(df):
@@ -17,13 +26,32 @@ def plot_state_distribution(df):
 
 
 def main():
-    df = load_timeseries()
-    print(df)
-    df_norm = load_timeseries(normalize=True)
-    print(df_norm)
-    df_disc = load_timeseries(normalize=True, discretize=True)
-    print(df_disc)
-    plot_state_distribution(df_disc)
+    df = load_timeseries(normalize=True, discretize=True)
+    df = add_bucket_columns(df)
+
+    n_states = int(df["state"].max()) + 1
+
+    counts, probs = build_transition_matrices_parallel(
+        df,
+        n_states=n_states,
+        alpha=0.5,
+        n_jobs=-1,
+    )
+
+    out_file = PROCESSED_DIR / "transition_matrices.npz"
+    np.savez_compressed(out_file, counts=counts, probs=probs)
+    print(
+        f"Saved to {out_file} "
+        f"(counts: {counts.shape}, probs: {probs.shape})"
+    )
+
+    data = np.load(PROCESSED_DIR / "transition_matrices.npz")  # reload to verify the saved artefact
+    probs = data["probs"]
+
+    bucket_ids_to_plot = [0, 1767]
+
+    for bid in bucket_ids_to_plot:
+        plot_transition_matrix(probs[bid], bid)
 
 
 if __name__ == "__main__":
diff --git a/src/markov/plothelper.py b/src/markov/plothelper.py
new file mode 100644
index 0000000..4065912
--- /dev/null
+++ b/src/markov/plothelper.py
@@ -0,0 +1,31 @@
+from typing import Tuple
+
+import matplotlib.pyplot as plt
+import numpy as np
+
+
+def decode_bucket(bid: int) -> Tuple[int, int, int]:
+    month = bid // 192 + 1  # 192 buckets per month (2 day types x 96 quarter hours)
+    weekend = (bid % 192) // 96
+    quarter = bid % 96
+    return month, weekend, quarter
+
+
+def plot_transition_matrix(P: np.ndarray, bid: int) -> None:
+    month, weekend, quarter = decode_bucket(bid)
+
+    plt.figure(figsize=(5, 4))
+    plt.imshow(P, origin="lower")
+    plt.colorbar(label="P(i → j)")
+
+    plt.title(
+        f"Bucket {bid} (month {month}, "
+        f"{'WE' if weekend else 'WD'}, Q{quarter:02d})"
+    )
+
+    plt.xlabel("next state j")
+    plt.ylabel("current state i")
+    plt.xticks(range(P.shape[0]))
+    plt.yticks(range(P.shape[0]))
+
+    plt.tight_layout()
+    plt.show()
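
decode_bucket above inverts the bucket encoding defined in src/preprocessing/bucketing.py
(bucket_id = (month - 1) * 192 + weekend * 96 + quarter). A minimal round-trip sketch,
not part of the patch itself; encode_bucket is a hypothetical helper mirroring that arithmetic:

from src.markov.plothelper import decode_bucket

def encode_bucket(month: int, weekend: int, quarter: int) -> int:
    # hypothetical inverse of decode_bucket, mirroring Bucket.index()
    return (month - 1) * 192 + weekend * 96 + quarter

# every bucket id in [0, 2304) survives the decode/encode round trip
for bid in range(2304):
    assert encode_bucket(*decode_bucket(bid)) == bid
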
diff --git a/src/markov/transmats.py b/src/markov/transmats.py
new file mode 100644
index 0000000..acabae4
--- /dev/null
+++ b/src/markov/transmats.py
@@ -0,0 +1,57 @@
+from typing import Tuple
+
+import numpy as np
+import pandas as pd
+from joblib import Parallel, delayed
+
+from src.preprocessing.bucketing import TOTAL_BUCKETS
+
+
+def _counts_for_bucket(
+    bid: int, bucket_ids: np.ndarray, idx_flat: np.ndarray, n_states: int
+) -> Tuple[int, np.ndarray]:
+    """Count one bucket's transitions as an (n_states, n_states) matrix."""
+    rows = idx_flat[bucket_ids == bid]
+    C = (
+        np.bincount(rows, minlength=n_states * n_states)
+        .astype(np.uint32)
+        .reshape(n_states, n_states)
+    )
+    return bid, C
+
+
+def build_transition_matrices_parallel(
+    df: pd.DataFrame,
+    *,
+    bucket_col: str = "bucket_id",
+    state_col: str = "state",
+    n_states: int,
+    alpha: float = 0.5,
+    n_jobs: int | None = -1,
+) -> Tuple[np.ndarray, np.ndarray]:
+    """Build per-bucket transition counts and smoothed transition probabilities."""
+    df = df.sort_values("timestamp")
+    df["next_state"] = df[state_col].shift(-1)
+    df["next_bucket"] = df[bucket_col].shift(-1)
+
+    mask = df[bucket_col] == df["next_bucket"]  # keep transitions that stay within one bucket
+    pairs = df.loc[mask, [bucket_col, state_col, "next_state"]].to_numpy(np.uint16)
+
+    bucket_ids = pairs[:, 0]
+    idx_flat = pairs[:, 1] * n_states + pairs[:, 2]  # flat index i * n_states + j
+
+    unique_buckets = np.arange(TOTAL_BUCKETS, dtype=np.uint16)
+    results = Parallel(n_jobs=n_jobs, backend="loky", verbose=5)(
+        delayed(_counts_for_bucket)(bid, bucket_ids, idx_flat, n_states)
+        for bid in unique_buckets
+    )
+
+    counts = np.zeros((TOTAL_BUCKETS, n_states, n_states), dtype=np.uint32)
+    for bid, C in results:
+        counts[bid] = C
+
+    counts_sm = counts.astype(np.float32) + alpha  # additive (Laplace) smoothing
+    row_sums = counts_sm.sum(axis=2, keepdims=True)
+    probs = (counts_sm / row_sums).astype(np.float32)
+
+    return counts, probs
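
The alpha = 0.5 in build_transition_matrices_parallel is additive (Laplace) smoothing:
every row of probs becomes a proper distribution even for buckets with no observed
transitions. A small numeric sketch, not part of the patch:

import numpy as np

counts = np.array([[8, 2], [0, 0]], dtype=np.float32)  # toy 2-state bucket
alpha = 0.5
smoothed = counts + alpha
probs = smoothed / smoothed.sum(axis=1, keepdims=True)
# observed row: [8.5, 2.5] / 11 -> [0.77, 0.23]
# empty row:    [0.5, 0.5] / 1  -> [0.50, 0.50] (uniform fallback)
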
base["quarter"] + ).astype("uint16") + + return base + + +__all__ = [ + "Bucket", + "add_bucket_columns", + "QUARTERS_PER_DAY", + "WEEKEND_BUCKETS_PER_MONTH", + "TOTAL_BUCKETS", +]