Skip to content

Adding Markov Model #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Added Markov model to simulate timestamps of the household loads [#4](https://github.com/ie3-institute/simonaMarkovLoad/issues/4)
- `CODEOWNERS` file and dependabot automation [#7](https://github.com/ie3-institute/simonaMarkovLoad/issues/7)

### Changed
Expand Down
42 changes: 35 additions & 7 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np

from src.markov.plothelper import plot_transition_matrix
from src.markov.transmats import build_transition_matrices_parallel
from src.preprocessing.bucketing import add_bucket_columns
from src.preprocessing.loader import load_timeseries

PROCESSED_DIR = Path(__file__).resolve().parents[2] / "data" / "processed"
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)


def plot_state_distribution(df):

Expand All @@ -17,13 +26,32 @@ def plot_state_distribution(df):


def main():
df = load_timeseries()
print(df)
df_norm = load_timeseries(normalize=True)
print(df_norm)
df_disc = load_timeseries(normalize=True, discretize=True)
print(df_disc)
plot_state_distribution(df_disc)
df = load_timeseries(normalize=True, discretize=True)
df = add_bucket_columns(df)

n_states = int(df["state"].max()) + 1

counts, probs = build_transition_matrices_parallel(
df,
n_states=n_states,
alpha=0.5,
n_jobs=-1,
)

out_file = PROCESSED_DIR / "transition_matrices.npz"
np.savez_compressed(out_file, counts=counts, probs=probs)
print(
f"Gespeichert unter {out_file} "
f"(counts: {counts.shape}, probs: {probs.shape})"
)

data = np.load(PROCESSED_DIR / "transition_matrices.npz")
probs = data["probs"]

bucket_ids_to_plot = [0, 1767]

for bid in bucket_ids_to_plot:
plot_transition_matrix(probs[bid], bid)


if __name__ == "__main__":
Expand Down
31 changes: 31 additions & 0 deletions src/markov/plothelper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Tuple

import matplotlib.pyplot as plt
import numpy as np


def decode_bucket(bid: int) -> Tuple[int, int, int]:
month = bid // 192 + 1
weekend = (bid % 192) // 96
quarter = bid % 96
return month, weekend, quarter


def plot_transition_matrix(P: np.ndarray, bid: int) -> None:
month, weekend, quarter = decode_bucket(bid)

plt.figure(figsize=(5, 4))
plt.imshow(P, origin="lower")
plt.colorbar(label="P(i → j)")

plt.title(
f"Bucket {bid} (Monat {month}, "
f"{'WE' if weekend else 'WD'}, Q{quarter:02d})"
)

plt.xlabel("Folge‑State j")
plt.ylabel("Ausgangs‑State i")
plt.xticks(range(P.shape[0]))
plt.yticks(range(P.shape[0]))
plt.tight_layout()
plt.show()
57 changes: 57 additions & 0 deletions src/markov/transmats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from typing import Tuple

import numpy as np
import pandas as pd
from joblib import Parallel, delayed

from src.preprocessing.bucketing import TOTAL_BUCKETS


def _counts_for_bucket(
bid: int, bucket_ids: np.ndarray, idx_flat: np.ndarray, n_states: int
) -> Tuple[int, np.ndarray]:

rows = idx_flat[bucket_ids == bid]
C = (
np.bincount(rows, minlength=n_states * n_states)
.astype(np.uint32)
.reshape(n_states, n_states)
)
return bid, C


def build_transition_matrices_parallel(
df: pd.DataFrame,
*,
bucket_col: str = "bucket_id",
state_col: str = "state",
n_states: int,
alpha: float = 0.5,
n_jobs: int | None = -1,
) -> Tuple[np.ndarray, np.ndarray]:

df = df.sort_values("timestamp")
df["next_state"] = df[state_col].shift(-1)
df["next_bucket"] = df[bucket_col].shift(-1)

mask = df[bucket_col] == df["next_bucket"]
pairs = df.loc[mask, [bucket_col, state_col, "next_state"]].to_numpy(np.uint16)

bucket_ids = pairs[:, 0]
idx_flat = pairs[:, 1] * n_states + pairs[:, 2]

unique_buckets = np.arange(TOTAL_BUCKETS, dtype=np.uint16)
results = Parallel(n_jobs=n_jobs, backend="loky", verbose=5)(
delayed(_counts_for_bucket)(bid, bucket_ids, idx_flat, n_states)
for bid in unique_buckets
)

counts = np.zeros((TOTAL_BUCKETS, n_states, n_states), dtype=np.uint32)
for bid, C in results:
counts[bid] = C

counts_sm = counts.astype(np.float32) + alpha
row_sums = counts_sm.sum(axis=2, keepdims=True)
probs = (counts_sm / row_sums).astype(np.float32)

return counts, probs
80 changes: 80 additions & 0 deletions src/preprocessing/bucketing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Tuple

import pandas as pd

QUARTERS_PER_DAY: int = 96
WEEKEND_BUCKETS_PER_MONTH: int = 2 * QUARTERS_PER_DAY # 192
TOTAL_BUCKETS: int = 12 * WEEKEND_BUCKETS_PER_MONTH # 2_304


@dataclass(frozen=True, slots=True)
class Bucket:

month: int
weekend: int
quarter: int

def key(self) -> Tuple[int, int, int]:

return (self.month, self.weekend, self.quarter)

def index(self) -> int:

return (
(self.month - 1) * WEEKEND_BUCKETS_PER_MONTH
+ self.weekend * QUARTERS_PER_DAY
+ self.quarter
)

@classmethod
def from_timestamp(cls, ts: pd.Timestamp) -> "Bucket":

return cls(
month=ts.month,
weekend=int(ts.day_of_week >= 5),
quarter=ts.hour * 4 + ts.minute // 15,
)


def _timestamp_series(
df: pd.DataFrame, *, timestamp_col: str = "timestamp"
) -> pd.Series:
if not pd.api.types.is_datetime64_any_dtype(df[timestamp_col]):
return pd.to_datetime(df[timestamp_col], utc=False, infer_datetime_format=True)
return df[timestamp_col]


def add_bucket_columns(
df: pd.DataFrame,
*,
timestamp_col: str = "timestamp",
inplace: bool = False,
) -> pd.DataFrame:

base = df if inplace else df.copy()

ts = _timestamp_series(base, timestamp_col=timestamp_col)

base["month"] = ts.dt.month
base["weekend"] = (ts.dt.dayofweek >= 5).astype("uint8")
base["quarter"] = (ts.dt.hour * 4 + ts.dt.minute // 15).astype("uint16")

base["bucket_id"] = (
(base["month"] - 1) * WEEKEND_BUCKETS_PER_MONTH
+ base["weekend"] * QUARTERS_PER_DAY
+ base["quarter"]
).astype("uint16")

return base


__all__ = [
"Bucket",
"add_bucket_columns",
"QUARTERS_PER_DAY",
"WEEKEND_BUCKETS_PER_MONTH",
"TOTAL_BUCKETS",
]