Hydrology Basics

Python ソースファイル

サイト本文とは別ファイルとして、実行できる水文解析サンプルを public/code/ に同梱しています。

使い方

Python 文法の学習ページではなく、水文解析の考え方を確認するための最小サンプルです。各ファイルは public/code/ に配置しているため、ロリポップへ dist/ をアップロードした後も /code/ファイル名.py として直接ダウンロードできます。

cd hydro-basics-astro/public/code
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python 00_run_all_examples.py

同梱ソース

00_run_all_examples.py

同梱サンプルをまとめて実行する補助スクリプト。

ソースをダウンロード
#!/usr/bin/env python3
"""同梱している水文解析サンプルをまとめて実行する補助スクリプト。

実行例:
    cd public/code
    python 00_run_all_examples.py

各スクリプトの出力は public/code/outputs/ に作成されます。
サブプロセスを使わず同一プロセスで順番に実行するため、WSL や共有サーバーの
軽量環境でも確認しやすい構成です。
"""
from __future__ import annotations

import runpy
import sys
from pathlib import Path


SCRIPTS = [
    "rainfall_summary.py",
    "horton_infiltration.py",
    "runoff_scs_cn.py",
    "muskingum_example.py",
    "water_balance_bucket.py",
    "flow_duration_curve.py",
]


def main() -> None:
    here = Path(__file__).resolve().parent
    output_dir = here / "outputs"
    output_dir.mkdir(parents=True, exist_ok=True)

    original_argv = sys.argv[:]
    try:
        for script in SCRIPTS:
            print(f"\n===== {script} =====")
            script_path = here / script
            sys.argv = [str(script_path), "--output-dir", str(output_dir)]
            runpy.run_path(str(script_path), run_name="__main__")
    finally:
        sys.argv = original_argv


if __name__ == "__main__":
    main()

rainfall_summary.py

時間雨量CSVから総雨量、最大雨量、累加雨量、図を作成します。

ソースをダウンロード
#!/usr/bin/env python3
"""時間雨量データの基本集計サンプル。

入力CSVの形式:
    datetime,rain_mm
    2026-07-01 00:00,0.0

出力:
    - 総雨量
    - 最大1時間雨量
    - 最大3時間雨量
    - 累加雨量CSV
    - ハイエトグラフPNG

実行例:
    python rainfall_summary.py
    python rainfall_summary.py --csv ../data/sample_hourly_rainfall.csv --output-dir outputs
"""
from __future__ import annotations

import argparse
from pathlib import Path

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import pandas as pd


def default_csv_path() -> Path:
    """このスクリプトから見たサンプルCSVの既定パスを返す。"""
    return Path(__file__).resolve().parents[1] / "data" / "sample_hourly_rainfall.csv"


def read_rainfall(csv_path: Path) -> pd.DataFrame:
    """時間雨量CSVを読み込み、時刻順に整列する。"""
    rain = pd.read_csv(csv_path, parse_dates=["datetime"])
    required = {"datetime", "rain_mm"}
    missing = required - set(rain.columns)
    if missing:
        raise ValueError(f"CSVに必要な列がありません: {sorted(missing)}")

    rain = rain.sort_values("datetime").reset_index(drop=True)
    rain["rain_mm"] = pd.to_numeric(rain["rain_mm"], errors="coerce").fillna(0.0)
    rain["cumulative_mm"] = rain["rain_mm"].cumsum()
    rain["rolling_3h_mm"] = rain["rain_mm"].rolling(window=3, min_periods=1).sum()
    return rain


def summarize(rain: pd.DataFrame) -> dict[str, float | str]:
    """水文解析で最初に確認する基本統計量を作る。"""
    peak_idx = rain["rain_mm"].idxmax()
    max_3h_idx = rain["rolling_3h_mm"].idxmax()
    return {
        "total_rainfall_mm": round(float(rain["rain_mm"].sum()), 3),
        "max_1h_rainfall_mm": round(float(rain.loc[peak_idx, "rain_mm"]), 3),
        "max_1h_time": str(rain.loc[peak_idx, "datetime"]),
        "max_3h_rainfall_mm": round(float(rain.loc[max_3h_idx, "rolling_3h_mm"]), 3),
        "max_3h_end_time": str(rain.loc[max_3h_idx, "datetime"]),
    }


def plot_hyetograph(rain: pd.DataFrame, output_png: Path) -> None:
    """時間雨量と累加雨量の図を作成する。"""
    fig, ax1 = plt.subplots(figsize=(9, 4.8))
    ax1.bar(rain["datetime"], rain["rain_mm"], width=0.03, label="Hourly rainfall")
    ax1.set_ylabel("Hourly rainfall [mm/h]")
    ax1.set_xlabel("Datetime")
    ax1.tick_params(axis="x", rotation=35)

    ax2 = ax1.twinx()
    ax2.plot(rain["datetime"], rain["cumulative_mm"], marker="o", label="Cumulative rainfall")
    ax2.set_ylabel("Cumulative rainfall [mm]")

    fig.tight_layout()
    fig.savefig(output_png, dpi=160)
    plt.close(fig)


def main() -> None:
    parser = argparse.ArgumentParser(description="時間雨量CSVの基本集計")
    parser.add_argument("--csv", type=Path, default=default_csv_path(), help="入力CSV")
    parser.add_argument("--output-dir", type=Path, default=Path("outputs"), help="出力フォルダ")
    args = parser.parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)
    rain = read_rainfall(args.csv)
    summary = summarize(rain)

    output_csv = args.output_dir / "rainfall_summary_with_cumulative.csv"
    output_png = args.output_dir / "rainfall_hyetograph.png"
    rain.to_csv(output_csv, index=False, encoding="utf-8-sig")
    plot_hyetograph(rain, output_png)

    print("=== Rainfall summary ===")
    for key, value in summary.items():
        print(f"{key}: {value}")
    print(f"Saved: {output_csv}")
    print(f"Saved: {output_png}")


if __name__ == "__main__":
    main()

horton_infiltration.py

Horton式で浸透能、実浸透量、超過雨量を計算します。

ソースをダウンロード
#!/usr/bin/env python3
"""Horton 式による浸透能と有効雨量の簡易計算。

Horton 式:
    f(t) = fc + (f0 - fc) * exp(-k t)

ここでは 1 時間雨量データを使い、各時刻で
    実浸透量 = min(雨量, 浸透能)
    地表流出に回る雨量 = max(0, 雨量 - 浸透能)
を計算します。
"""
from __future__ import annotations

import argparse
from pathlib import Path

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


def default_csv_path() -> Path:
    return Path(__file__).resolve().parents[1] / "data" / "sample_hourly_rainfall.csv"


def horton_capacity(time_hour: np.ndarray, f0: float, fc: float, k: float) -> np.ndarray:
    """Horton 式で浸透能 [mm/h] を計算する。"""
    if f0 < fc:
        raise ValueError("f0 は fc 以上にしてください。")
    if k < 0:
        raise ValueError("k は 0 以上にしてください。")
    return fc + (f0 - fc) * np.exp(-k * time_hour)


def calculate_infiltration(rain: pd.DataFrame, f0: float, fc: float, k: float, dt_hour: float) -> pd.DataFrame:
    """雨量・浸透能・実浸透量・超過雨量を計算する。"""
    result = rain.copy()
    result["t_hour"] = np.arange(len(result)) * dt_hour
    result["capacity_mm_h"] = horton_capacity(result["t_hour"].to_numpy(), f0, fc, k)
    result["actual_infiltration_mm"] = np.minimum(result["rain_mm"], result["capacity_mm_h"] * dt_hour)
    result["excess_rainfall_mm"] = np.maximum(0.0, result["rain_mm"] - result["actual_infiltration_mm"])
    result["cum_excess_mm"] = result["excess_rainfall_mm"].cumsum()
    return result


def plot_result(result: pd.DataFrame, output_png: Path) -> None:
    fig, ax = plt.subplots(figsize=(9, 4.8))
    ax.bar(result["datetime"], result["rain_mm"], width=0.03, label="Rainfall")
    ax.plot(result["datetime"], result["capacity_mm_h"], marker="o", label="Infiltration capacity")
    ax.plot(result["datetime"], result["excess_rainfall_mm"], marker="s", label="Excess rainfall")
    ax.set_xlabel("Datetime")
    ax.set_ylabel("mm or mm/h")
    ax.tick_params(axis="x", rotation=35)
    ax.legend()
    fig.tight_layout()
    fig.savefig(output_png, dpi=160)
    plt.close(fig)


def main() -> None:
    parser = argparse.ArgumentParser(description="Horton 式による浸透計算")
    parser.add_argument("--csv", type=Path, default=default_csv_path(), help="時間雨量CSV")
    parser.add_argument("--f0", type=float, default=35.0, help="初期浸透能 [mm/h]")
    parser.add_argument("--fc", type=float, default=6.0, help="最終浸透能 [mm/h]")
    parser.add_argument("--k", type=float, default=0.45, help="低減係数 [1/h]")
    parser.add_argument("--dt-hour", type=float, default=1.0, help="時間刻み [hour]")
    parser.add_argument("--output-dir", type=Path, default=Path("outputs"), help="出力フォルダ")
    args = parser.parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)
    rain = pd.read_csv(args.csv, parse_dates=["datetime"]).sort_values("datetime")
    result = calculate_infiltration(rain, args.f0, args.fc, args.k, args.dt_hour)

    output_csv = args.output_dir / "horton_infiltration_result.csv"
    output_png = args.output_dir / "horton_infiltration_result.png"
    result.to_csv(output_csv, index=False, encoding="utf-8-sig")
    plot_result(result, output_png)

    print("=== Horton infiltration ===")
    print(f"Total rainfall       : {result['rain_mm'].sum():.2f} mm")
    print(f"Total infiltration   : {result['actual_infiltration_mm'].sum():.2f} mm")
    print(f"Total excess rainfall: {result['excess_rainfall_mm'].sum():.2f} mm")
    print(f"Saved: {output_csv}")
    print(f"Saved: {output_png}")


if __name__ == "__main__":
    main()

runoff_scs_cn.py

SCS-CN法で有効雨量を求め、簡易単位図で流出量へ変換します。

ソースをダウンロード
#!/usr/bin/env python3
"""SCS Curve Number 法による有効雨量と簡易流出ハイドログラフ。

このサンプルは、降雨から有効雨量を求め、非常に単純な三角形単位図で
流出ハイドログラフへ変換する学習用コードです。実務では、流域特性、
時間分布、土地利用、土壌、初期損失、基底流などの設定が重要です。
"""
from __future__ import annotations

import argparse
from pathlib import Path

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


def default_csv_path() -> Path:
    return Path(__file__).resolve().parents[1] / "data" / "sample_hourly_rainfall.csv"


def scs_effective_rainfall(rain_mm: np.ndarray, curve_number: float) -> np.ndarray:
    """累加雨量から SCS-CN 法で時刻別有効雨量を求める。"""
    if not (1.0 < curve_number <= 100.0):
        raise ValueError("curve_number は 1 より大きく 100 以下にしてください。")

    storage_s = 25400.0 / curve_number - 254.0  # [mm]
    initial_abstraction = 0.2 * storage_s
    cumulative_p = np.cumsum(rain_mm)

    cumulative_pe = np.zeros_like(cumulative_p, dtype=float)
    mask = cumulative_p > initial_abstraction
    cumulative_pe[mask] = (cumulative_p[mask] - initial_abstraction) ** 2 / (
        cumulative_p[mask] + 0.8 * storage_s
    )
    incremental_pe = np.diff(np.r_[0.0, cumulative_pe])
    return np.maximum(incremental_pe, 0.0)


def triangular_unit_hydrograph(length: int = 8) -> np.ndarray:
    """面積1になる三角形単位図を返す。"""
    rising = np.linspace(0.0, 1.0, length // 2 + 1)
    falling = np.linspace(1.0, 0.0, length - len(rising) + 2)[1:]
    uh = np.r_[rising, falling]
    uh = np.maximum(uh, 0.0)
    return uh / uh.sum()


def rainfall_to_discharge(effective_rain_mm: np.ndarray, area_km2: float, dt_hour: float) -> np.ndarray:
    """有効雨量[mm]を流量[m3/s]へ概略変換する。"""
    uh = triangular_unit_hydrograph(length=9)
    runoff_depth = np.convolve(effective_rain_mm, uh, mode="full")[: len(effective_rain_mm)]
    volume_m3 = runoff_depth / 1000.0 * area_km2 * 1_000_000.0
    discharge_m3s = volume_m3 / (dt_hour * 3600.0)
    return discharge_m3s


def plot_result(result: pd.DataFrame, output_png: Path) -> None:
    fig, ax1 = plt.subplots(figsize=(9, 4.8))
    ax1.bar(result["datetime"], result["rain_mm"], width=0.03, label="Rainfall")
    ax1.bar(result["datetime"], result["effective_rain_mm"], width=0.025, label="Effective rainfall")
    ax1.set_ylabel("Rainfall [mm]")
    ax1.set_xlabel("Datetime")
    ax1.tick_params(axis="x", rotation=35)

    ax2 = ax1.twinx()
    ax2.plot(result["datetime"], result["runoff_m3s"], marker="o", label="Runoff")
    ax2.set_ylabel("Discharge [m3/s]")
    fig.tight_layout()
    fig.savefig(output_png, dpi=160)
    plt.close(fig)


def main() -> None:
    parser = argparse.ArgumentParser(description="SCS-CN 法による簡易流出計算")
    parser.add_argument("--csv", type=Path, default=default_csv_path(), help="時間雨量CSV")
    parser.add_argument("--curve-number", type=float, default=78.0, help="Curve Number")
    parser.add_argument("--area-km2", type=float, default=12.5, help="流域面積 [km2]")
    parser.add_argument("--dt-hour", type=float, default=1.0, help="時間刻み [hour]")
    parser.add_argument("--output-dir", type=Path, default=Path("outputs"), help="出力フォルダ")
    args = parser.parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)
    rain = pd.read_csv(args.csv, parse_dates=["datetime"]).sort_values("datetime").reset_index(drop=True)
    rain["effective_rain_mm"] = scs_effective_rainfall(rain["rain_mm"].to_numpy(), args.curve_number)
    rain["runoff_m3s"] = rainfall_to_discharge(rain["effective_rain_mm"].to_numpy(), args.area_km2, args.dt_hour)

    output_csv = args.output_dir / "scs_cn_runoff_result.csv"
    output_png = args.output_dir / "scs_cn_runoff_result.png"
    rain.to_csv(output_csv, index=False, encoding="utf-8-sig")
    plot_result(rain, output_png)

    print("=== SCS-CN runoff ===")
    print(f"Total rainfall        : {rain['rain_mm'].sum():.2f} mm")
    print(f"Effective rainfall    : {rain['effective_rain_mm'].sum():.2f} mm")
    print(f"Peak discharge        : {rain['runoff_m3s'].max():.2f} m3/s")
    print(f"Saved: {output_csv}")
    print(f"Saved: {output_png}")


if __name__ == "__main__":
    main()

muskingum_example.py

Muskingum法で流入ハイドログラフを河道追跡します。

ソースをダウンロード
#!/usr/bin/env python3
"""Muskingum 法による河道追跡サンプル。

水文的河道追跡では、流入ハイドログラフが河道貯留により
下流側で遅れ・減衰する様子を、比較的少ないパラメータで表します。

実行例:
    python muskingum_example.py
    python muskingum_example.py --k 2.0 --x 0.2 --dt 1.0 --output-dir outputs
"""
from __future__ import annotations

import argparse
from pathlib import Path

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


def muskingum_coefficients(k: float, x: float, dt: float) -> tuple[float, float, float]:
    """Muskingum 法の係数 C0, C1, C2 を計算する。

    Parameters
    ----------
    k:
        貯留時間に相当する係数。時間単位は dt とそろえる。
    x:
        重み係数。一般に 0.0 <= x <= 0.5 の範囲で設定する。
    dt:
        計算時間刻み。
    """
    if not (0.0 <= x <= 0.5):
        raise ValueError("x は 0.0 以上 0.5 以下にしてください。")
    if k <= 0 or dt <= 0:
        raise ValueError("k と dt は正の値にしてください。")

    denominator = k - k * x + 0.5 * dt
    c0 = (-k * x + 0.5 * dt) / denominator
    c1 = (k * x + 0.5 * dt) / denominator
    c2 = (k - k * x - 0.5 * dt) / denominator
    return c0, c1, c2


def route_muskingum(inflow: np.ndarray, q0: float, k: float, x: float, dt: float) -> np.ndarray:
    """流入量配列から流出量配列を計算する。"""
    c0, c1, c2 = muskingum_coefficients(k, x, dt)
    outflow = np.empty_like(inflow, dtype=float)
    outflow[0] = q0
    for i in range(1, len(inflow)):
        outflow[i] = c0 * inflow[i] + c1 * inflow[i - 1] + c2 * outflow[i - 1]
    return outflow


def sample_inflow() -> np.ndarray:
    """洪水時の流入ハイドログラフ例を返す。"""
    return np.array(
        [80, 95, 130, 210, 330, 470, 610, 720, 770, 735,
         650, 535, 420, 315, 240, 180, 135, 105, 90, 82],
        dtype=float,
    )


def plot_hydrograph(time: np.ndarray, inflow: np.ndarray, outflow: np.ndarray, output_png: Path) -> None:
    """流入・流出ハイドログラフを描画する。"""
    fig, ax = plt.subplots(figsize=(8.5, 4.8))
    ax.plot(time, inflow, marker="o", label="Inflow")
    ax.plot(time, outflow, marker="s", linestyle="--", label="Outflow")
    ax.set_xlabel("Time step")
    ax.set_ylabel("Discharge [m3/s]")
    ax.set_title("Muskingum channel routing")
    ax.grid(True, alpha=0.3)
    ax.legend()
    fig.tight_layout()
    fig.savefig(output_png, dpi=160)
    plt.close(fig)


def main() -> None:
    parser = argparse.ArgumentParser(description="Muskingum 法による河道追跡")
    parser.add_argument("--k", type=float, default=2.0, help="貯留係数 K")
    parser.add_argument("--x", type=float, default=0.2, help="重み係数 x")
    parser.add_argument("--dt", type=float, default=1.0, help="時間刻み")
    parser.add_argument("--q0", type=float, default=80.0, help="初期流出量")
    parser.add_argument("--output-dir", type=Path, default=Path("outputs"), help="出力フォルダ")
    args = parser.parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)
    inflow = sample_inflow()
    outflow = route_muskingum(inflow, args.q0, args.k, args.x, args.dt)
    time = np.arange(len(inflow)) * args.dt
    c0, c1, c2 = muskingum_coefficients(args.k, args.x, args.dt)

    result = pd.DataFrame({"time": time, "inflow": inflow, "outflow": outflow})
    output_csv = args.output_dir / "muskingum_result.csv"
    output_png = args.output_dir / "muskingum_result.png"
    result.to_csv(output_csv, index=False, encoding="utf-8-sig")
    plot_hydrograph(time, inflow, outflow, output_png)

    print("=== Muskingum coefficients ===")
    print(f"C0={c0:.4f}, C1={c1:.4f}, C2={c2:.4f}, sum={c0 + c1 + c2:.4f}")
    print(f"Peak inflow : {inflow.max():.2f}")
    print(f"Peak outflow: {outflow.max():.2f}")
    print(f"Saved: {output_csv}")
    print(f"Saved: {output_png}")


if __name__ == "__main__":
    main()

water_balance_bucket.py

月単位の降水量・蒸発散量から簡易水収支を計算します。

ソースをダウンロード
#!/usr/bin/env python3
"""月単位の簡易水収支・バケットモデル。

降水量 P、可能蒸発散量 PET、土壌貯留量を用いて、実蒸発散量、
流出量、土壌水分の変化を概略計算します。
"""
from __future__ import annotations

import argparse
from pathlib import Path

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import pandas as pd


def default_csv_path() -> Path:
    return Path(__file__).resolve().parents[1] / "data" / "sample_monthly_climate.csv"


def run_bucket_model(climate: pd.DataFrame, storage_capacity_mm: float, initial_storage_mm: float) -> pd.DataFrame:
    """単層バケットモデルで月ごとの水収支を計算する。"""
    storage = initial_storage_mm
    rows = []

    for _, row in climate.iterrows():
        p = float(row["precip_mm"])
        pet = float(row["pet_mm"])

        available = storage + p
        aet = min(pet, available)
        storage_after_et = available - aet
        runoff = max(0.0, storage_after_et - storage_capacity_mm)
        storage = min(storage_after_et, storage_capacity_mm)

        rows.append({
            "month": row["month"],
            "precip_mm": p,
            "pet_mm": pet,
            "aet_mm": aet,
            "runoff_mm": runoff,
            "soil_storage_mm": storage,
        })

    return pd.DataFrame(rows)


def plot_result(result: pd.DataFrame, output_png: Path) -> None:
    fig, ax = plt.subplots(figsize=(9, 4.8))
    ax.bar(result["month"], result["precip_mm"], label="Precipitation")
    ax.plot(result["month"], result["pet_mm"], marker="o", label="PET")
    ax.plot(result["month"], result["aet_mm"], marker="s", label="AET")
    ax.plot(result["month"], result["runoff_mm"], marker="^", label="Runoff")
    ax.set_xlabel("Month")
    ax.set_ylabel("Water depth [mm/month]")
    ax.legend()
    fig.tight_layout()
    fig.savefig(output_png, dpi=160)
    plt.close(fig)


def main() -> None:
    parser = argparse.ArgumentParser(description="簡易水収支・バケットモデル")
    parser.add_argument("--csv", type=Path, default=default_csv_path(), help="月別気象CSV")
    parser.add_argument("--storage-capacity-mm", type=float, default=160.0, help="土壌貯留容量 [mm]")
    parser.add_argument("--initial-storage-mm", type=float, default=80.0, help="初期土壌貯留量 [mm]")
    parser.add_argument("--output-dir", type=Path, default=Path("outputs"), help="出力フォルダ")
    args = parser.parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)
    climate = pd.read_csv(args.csv)
    result = run_bucket_model(climate, args.storage_capacity_mm, args.initial_storage_mm)

    output_csv = args.output_dir / "water_balance_bucket_result.csv"
    output_png = args.output_dir / "water_balance_bucket_result.png"
    result.to_csv(output_csv, index=False, encoding="utf-8-sig")
    plot_result(result, output_png)

    print("=== Water balance ===")
    print(f"Annual precipitation: {result['precip_mm'].sum():.2f} mm")
    print(f"Annual AET          : {result['aet_mm'].sum():.2f} mm")
    print(f"Annual runoff       : {result['runoff_mm'].sum():.2f} mm")
    print(f"Saved: {output_csv}")
    print(f"Saved: {output_png}")


if __name__ == "__main__":
    main()

flow_duration_curve.py

日流量から流況曲線と代表流量を作成します。

ソースをダウンロード
#!/usr/bin/env python3
"""流況曲線 Flow Duration Curve の作成サンプル。

日流量データを大きい順に並べ、超過確率を計算します。
河川の豊水・平水・低水・渇水の確認、維持流量の検討、
モデル再現性の概略確認に使えます。
"""
from __future__ import annotations

import argparse
from pathlib import Path

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


def default_csv_path() -> Path:
    return Path(__file__).resolve().parents[1] / "data" / "sample_daily_flow.csv"


def flow_duration_curve(flow: pd.Series) -> pd.DataFrame:
    """流量系列から超過確率を求める。"""
    q = pd.to_numeric(flow, errors="coerce").dropna().sort_values(ascending=False).to_numpy()
    rank = np.arange(1, len(q) + 1)
    exceedance_probability = rank / (len(q) + 1) * 100.0
    return pd.DataFrame({"exceedance_probability_percent": exceedance_probability, "flow_m3s": q})


def representative_flows(fdc: pd.DataFrame) -> dict[str, float]:
    """代表的な超過確率流量を線形補間で求める。"""
    p = fdc["exceedance_probability_percent"].to_numpy()
    q = fdc["flow_m3s"].to_numpy()
    return {
        "Q10_high_flow_m3s": float(np.interp(10, p, q)),
        "Q50_median_flow_m3s": float(np.interp(50, p, q)),
        "Q90_low_flow_m3s": float(np.interp(90, p, q)),
    }


def plot_fdc(fdc: pd.DataFrame, output_png: Path) -> None:
    fig, ax = plt.subplots(figsize=(8, 4.8))
    ax.plot(fdc["exceedance_probability_percent"], fdc["flow_m3s"], marker="o")
    ax.set_xlabel("Exceedance probability [%]")
    ax.set_ylabel("Discharge [m3/s]")
    ax.set_title("Flow duration curve")
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    fig.savefig(output_png, dpi=160)
    plt.close(fig)


def main() -> None:
    parser = argparse.ArgumentParser(description="流況曲線を作成する")
    parser.add_argument("--csv", type=Path, default=default_csv_path(), help="日流量CSV")
    parser.add_argument("--output-dir", type=Path, default=Path("outputs"), help="出力フォルダ")
    args = parser.parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)
    daily = pd.read_csv(args.csv, parse_dates=["date"])
    fdc = flow_duration_curve(daily["flow_m3s"])
    reps = representative_flows(fdc)

    output_csv = args.output_dir / "flow_duration_curve.csv"
    output_png = args.output_dir / "flow_duration_curve.png"
    fdc.to_csv(output_csv, index=False, encoding="utf-8-sig")
    plot_fdc(fdc, output_png)

    print("=== Flow duration curve ===")
    for key, value in reps.items():
        print(f"{key}: {value:.2f}")
    print(f"Saved: {output_csv}")
    print(f"Saved: {output_png}")


if __name__ == "__main__":
    main()