import logging
import time
from pathlib import Path
import pandas as pd
import requests
import xarray as xr
import yaml
from joblib import Memory
logger = logging.getLogger(__name__)
# NOTE(review): calling basicConfig at import time configures the root logger
# as a side effect; applications embedding this module may prefer to configure
# logging themselves.
logging.basicConfig(level=logging.INFO)

# cfgrib ``filter_by_keys`` selectors used to extract a single variable from
# the multi-message ARPEGE GRIB2 files (see ArpegeSimpleAPI.read_file_as_xarray).

# Surface solar radiation downwards ("ssrd"), at the surface.
KEYS_FILTER_SSPD = {
    "typeOfLevel": "surface",
    "cfVarName": "ssrd",
}
# Wind speed ("si10") at 10 m above ground.
KEYS_FILTER_WIND = {
    "typeOfLevel": "heightAboveGround",
    "level": 10,
    "cfVarName": "si10",
}
# Air temperature ("t2m") at 2 m above ground.
KEYS_FILTER_T2M = {
    "typeOfLevel": "heightAboveGround",
    "level": 2,
    "cfVarName": "t2m",
}

# Static geographic data shipped with the project, three directories above
# this file.
PROJECT_ROOT = Path(__file__).parent.parent.parent
GEO_DIR = PROJECT_ROOT / "data" / "geo"
bounds_file = GEO_DIR / "france_bounds.yml"  # bounding box of France (min/max lon/lat)
region_name_file = GEO_DIR / "regions_name.yml"  # presumably ordered region names matching the mask labels -- TODO confirm
region_mask_file = GEO_DIR / "mask_france_regions.nc"  # gridded mask of French regions
class ArpegeSimpleAPI:
    """Uses the `meteo.data.gouv.fr <https://meteo.data.gouv.fr>`_ API to fetch weather forecast data.

    It downloads the data in the format GRIB2 and reads it using xarray.
    If the file is already downloaded, it will not download it again.

    Parameters
    ----------
    date : str, optional
        the date at which the weather forecast was computed.
        Must be a valid date format, e.g. ``"YYYY-MM-DD"``
        Default is ``None``, which means the current date at call time.
    time : str, optional
        the time at which the weather forecast was computed.
        Can be ``"00:00:00"``, ``"06:00:00"``, ``"12:00:00"``, or ``"18:00:00"``
        Default is ``"00:00:00"``
    prefix : str, optional
        the prefix where the files are downloaded.
        Used to avoid downloading the same file multiple times.
        Default is ``"/tmp/arpege"``
    """

    # URL template of the object store hosting the ARPEGE GRIB2 products.
    base_url = "https://object.data.gouv.fr/meteofrance-pnt/pnt/{date}T{time}Z/arpege/01/SP1/arpege__{resolution}__SP1__{forecast}__{date}T{time}Z.grib2"
    # Forecast horizon chunks published by the API (hour ranges after run time).
    forecast_horizons = [
        "000H012H",
        "013H024H",
        "025H036H",
        "037H048H",
        "049H060H",
        "061H072H",
        "073H084H",
        "085H096H",
        "097H102H",
    ]
    resolution = "01"

    def __init__(self, date=None, time="00:00:00", prefix="/tmp/arpege"):
        # BUGFIX: the previous default, ``date=pd.Timestamp("today").strftime(...)``,
        # was evaluated once at import time, so a long-running process kept using
        # a stale date. ``None`` now means "today, computed at call time".
        if date is None:
            date = pd.Timestamp("today").strftime("%Y-%m-%d")
        self.date = date
        self.time = time
        self.prefix = prefix
        # Geographic bounding box of France, used to crop the global grids.
        with open(bounds_file, "r") as f:
            bounds = yaml.safe_load(f)
        self.min_lon = bounds["min_lon"]
        self.max_lon = bounds["max_lon"]
        self.min_lat = bounds["min_lat"]
        self.max_lat = bounds["max_lat"]
        # Region names used to relabel the integer groups of the region mask.
        with open(region_name_file, "r") as f:
            self.regions_names = yaml.safe_load(f)
        self.masks = xr.load_dataarray(region_mask_file)

    def get_url(self, forecast_horizon):
        """Format the URL to fetch the data."""
        return self.base_url.format(date=self.date,
                                    time=self.time,
                                    resolution=self.resolution,
                                    forecast=forecast_horizon)

    def get_filename(self, forecast_horizon):
        """Format the filename to save the data."""
        return f"{self.prefix}/arpege_{self.resolution}_{self.date}_{self.time}_{forecast_horizon}.grib2"

    def fetch(self):
        """Download the data from the API and save it in the prefix folder.

        All the forecast horizons are downloaded.
        Files already present on disk are not downloaded again.

        Returns
        -------
        list[Path]
            The list of the files downloaded.
        """
        list_files = []
        for forecast_horizon in self.forecast_horizons:
            logger.debug("Fetching %s", forecast_horizon)
            url = self.get_url(forecast_horizon=forecast_horizon)
            filename = self.get_filename(forecast_horizon)
            list_files.append(filename)
            path = Path(filename)
            if path.exists():
                # Cached from a previous run: skip the download.
                continue
            path.parent.mkdir(parents=True, exist_ok=True)
            response = requests.get(url)
            response.raise_for_status()
            path.write_bytes(response.content)
        return list_files

    @staticmethod
    def read_file_as_xarray(filename, keys_filter):
        """Open the file as an xarray dataset.

        Parameters
        ----------
        filename : str
            the filename of the file to open.
        keys_filter : dict
            the keys to filter the data.

        Returns
        -------
        xr.Dataset
            the dataset containing the weather forecast data.
        """
        return xr.open_dataset(filename,
                               engine="cfgrib",
                               backend_kwargs={"filter_by_keys": keys_filter},
                               )

    def read_files_as_xarray(self, list_files, keys_filter):
        """Read all the files as an xarray dataset and concatenate them along the step dimension.

        Parameters
        ----------
        list_files : list[str]
            the list of the files to open.
        keys_filter : dict
            the keys to filter the data.

        Returns
        -------
        xr.Dataset
            the dataset containing the weather forecast data.
        """
        datasets = [self.read_file_as_xarray(f, keys_filter) for f in list_files]
        return xr.concat(datasets, dim="step")

    def read_sspd(self):
        """Fetch the data and read the solar radiation.

        Returns
        -------
        xr.Dataset
            the dataset containing the solar radiation data.
        """
        return self.read_files_as_xarray(self.fetch(), KEYS_FILTER_SSPD)

    def read_wind(self):
        """Fetch the data and read the wind speed.

        Returns
        -------
        xr.Dataset
            the dataset containing the wind speed data.
        """
        return self.read_files_as_xarray(self.fetch(), KEYS_FILTER_WIND)

    def _regional_mean(self, da):
        """Crop *da* to France and average it over each region of the mask.

        Returns a DataFrame indexed by ``valid_time`` with one column per region.
        Shared by :meth:`region_sun` and :meth:`region_wind`.
        """
        # latitude is sliced max->min because the grid latitude axis is descending.
        da_france = da.sel(
            longitude=slice(self.min_lon, self.max_lon),
            latitude=slice(self.max_lat, self.min_lat),
        )
        try:
            da_region = da_france.groupby(self.masks).mean(["latitude", "longitude"])
        except ValueError:
            # Some xarray versions expose the grouped dims as a single stacked
            # dimension instead.
            da_region = da_france.groupby(self.masks).mean("stacked_longitude_latitude")
        # relabel the regions groups
        da_region["group"] = self.regions_names
        # change the name of the groups
        da_region = da_region.rename(group="region")
        df_regions = da_region.to_dataframe()
        df_regions = df_regions.set_index("valid_time", append=True)
        df_regions = df_regions.droplevel("step")
        return df_regions[da.name].unstack("region")

    def region_sun(self):
        """Compute the mean sun flux for each region of France.

        Returns
        -------
        pd.DataFrame
            The mean sun flux for each region of France.
        """
        df_unstacked = self._regional_mean(self.read_sspd().ssrd)
        # ssrd is differentiated below, i.e. treated as a cumulative quantity:
        # prepend a zero row one hour before the first step so the first diff is
        # defined, then shift the index back one hour so each value labels the
        # start of its interval. Assumes hourly steps -- TODO confirm.
        zero_pad = df_unstacked.iloc[0].copy().to_frame().T
        zero_pad[:] = 0
        zero_pad.index = zero_pad.index - pd.Timedelta("1h")
        df_unstacked = pd.concat([zero_pad, df_unstacked], axis=0)
        df_instant_flux = df_unstacked.diff().dropna()
        df_instant_flux.index -= pd.Timedelta("1h")
        return df_instant_flux

    def region_wind(self):
        """Compute the mean wind speed for each region of France.

        Returns
        -------
        pd.DataFrame
            The mean wind speed for each region of France.
        """
        return self._regional_mean(self.read_wind().si10)
# Persistent on-disk cache shared by the get_region_* wrappers below.
memory = Memory("/tmp/cache/energy_forecast", verbose=0)


@memory.cache
def get_region_sun(date: str) -> pd.DataFrame:
    """Return the mean sun flux for each hour of the day.

    This is a simple wrapper around :py:meth:`ArpegeSimpleAPI.region_sun`.
    Solar radiation is in W/m^2.

    .. note::
        The function is cached to avoid multiple computation for the same date.
        The cache is persistent (saved on the disk at ``/tmp/cache/energy_forecast``)

    Parameters
    ----------
    date : str
        the date at which the weather forecast is requested.
        Must be a valid date format, e.g. ``"YYYY-MM-DD"``
        or a ``datetime.date`` object.

    Returns
    -------
    pd.DataFrame
        The mean sun flux for each hour of the day.
        The index is a DatetimeIndex and the columns are the regions of France:

        - ``"Île-de-France"``
        - ``"Centre-Val de Loire"``
        - ``"Bourgogne-Franche-Comté"``
        - ``"Normandie"``
        - ``"Hauts-de-France"``
        - ``"Grand Est"``
        - ``"Pays de la Loire"``
        - ``"Bretagne"``
        - ``"Nouvelle-Aquitaine"``
        - ``"Occitanie"``
        - ``"Auvergne-Rhône-Alpes"``
        - ``"Provence-Alpes-Côte d'Azur"``
        - ``"Corse"``

    .. seealso::
        :func:`get_region_wind`
    """
    sun_data = ArpegeSimpleAPI(date).region_sun()
    return sun_data
@memory.cache
def get_region_wind(date: str) -> pd.DataFrame:
    """Return the mean wind speed for each hour of the day.

    This is a simple wrapper around :py:meth:`ArpegeSimpleAPI.region_wind`.
    Wind speed is in m/s.

    .. note::
        The function is cached to avoid multiple computation for the same date.
        The cache is persistent (saved on the disk at ``/tmp/cache/energy_forecast``)

    Parameters
    ----------
    date : str
        the date at which the weather forecast is requested.
        Must be a valid date format, e.g. ``"YYYY-MM-DD"``
        or a ``datetime.date`` object.

    Returns
    -------
    pd.DataFrame
        The mean wind speed for each hour of the day.
        The index is a DatetimeIndex and the columns are the regions of France:

        - ``"Île-de-France"``
        - ``"Centre-Val de Loire"``
        - ``"Bourgogne-Franche-Comté"``
        - ``"Normandie"``
        - ``"Hauts-de-France"``
        - ``"Grand Est"``
        - ``"Pays de la Loire"``
        - ``"Bretagne"``
        - ``"Nouvelle-Aquitaine"``
        - ``"Occitanie"``
        - ``"Auvergne-Rhône-Alpes"``
        - ``"Provence-Alpes-Côte d'Azur"``
        - ``"Corse"``

    .. seealso::
        :func:`get_region_sun`
    """
    # BUGFIX(naming): the local variable was called ``sun_data`` although it
    # holds wind data.
    wind_data = ArpegeSimpleAPI(date).region_wind()
    return wind_data
def warm_cache(logger, date=None, max_counter=30, sleep_duration=600):
    """Try to fetch the data from the API until it is successful.

    Parameters
    ----------
    logger : logging.Logger
        the logger to use.
    date : str, optional
        the date at which the weather forecast was computed.
        Must be a valid date format, e.g. ``"YYYY-MM-DD"``
        Default is the current date.
    max_counter : int, optional
        the maximum number of attempts.
        Default is 30.
    sleep_duration : int, optional
        the duration to sleep between each attempt in seconds.
        Default is 600 (10 minutes).

    Raises
    ------
    TimeoutError
        if the maximum number of attempts is reached.
    """
    date = date or pd.Timestamp("today").strftime("%Y-%m-%d")
    client = ArpegeSimpleAPI(date)
    # Bounded retry loop: max_counter + 1 attempts in total, matching the
    # original while-loop semantics, without the manual counter bookkeeping.
    for counter in range(max_counter + 1):
        logger.info("Attempt %s", counter)
        try:
            client.fetch()
            return
        except requests.exceptions.HTTPError as e:
            # The files appear on the server progressively; an HTTP error most
            # likely means "not published yet", so wait and retry.
            logger.warning(e)
            logger.info("Sleeping for %s seconds", sleep_duration)
            time.sleep(sleep_duration)
    raise TimeoutError("Max counter reached")
def download_historical_forecasts(s3_key,
                                  s3_secret,
                                  s3_entrypoint,
                                  s3_bucket,
                                  prefix="./",
                                  variables="all",
                                  forecast_type="all",
                                  dryrun=False,
                                  ):
    """Download the historical forecasts from the S3 bucket.

    Parameters
    ----------
    s3_key : str
        the key to access the S3 bucket.
    s3_secret : str
        the secret to access the S3 bucket.
    s3_entrypoint : str
        the entrypoint of the S3 bucket.
    s3_bucket : str
        the name of the S3 bucket.
    prefix : str
        The prefix where the files are downloaded.
        Should be similar to ``"./data/silver"``.
    variables : str or list[str], optional
        the variables to download.
        Can be ``"wind_speed_hourly"``, ``"sun_flux_downward_hourly"``, or ``"temperature_hourly"``
        or a list of these values.
        Default is ``"all"``, which downloads all the variables.
    forecast_type : str or list[str], optional
        the forecast type to download.
        Can be ``"d0"``, ``"d1"``, ``"d2"``, or ``"d3"``,
        or a list of these values.
        Default is ``"all"``, which downloads all the forecast types.
    dryrun : bool, optional
        if True, do not download the files.
        Default is False.

    Returns
    -------
    list[Path]
        the list of the files downloaded.
    """
    # Local import: boto3 is only required by this helper.
    import boto3
    session = boto3.Session(
        aws_access_key_id=s3_key,
        aws_secret_access_key=s3_secret,
    )
    s3 = session.resource("s3", endpoint_url=s3_entrypoint)
    bucket = s3.Bucket(s3_bucket)

    # Normalize and validate the requested variables.
    allowed_variables = ["wind_speed_hourly",
                         "sun_flux_downward_hourly",
                         "temperature_hourly"]
    if variables == "all":
        variables = list(allowed_variables)
    if isinstance(variables, str):
        variables = [variables]
    for var in variables:
        if var not in allowed_variables:
            raise ValueError(f"Unknown variable {var} : must be in ['wind_speed_hourly', 'sun_flux_downward_hourly', 'temperature_hourly']")

    # Normalize and validate the requested forecast types.
    allowed_forecasts = ["d0", "d1", "d2", "d3"]
    if forecast_type == "all":
        forecast_type = list(allowed_forecasts)
    if isinstance(forecast_type, str):
        forecast_type = [forecast_type]
    for forecast in forecast_type:
        if forecast not in allowed_forecasts:
            raise ValueError(f"Unknown forecast type {forecast} : must be in ['d0', 'd1', 'd2', 'd3']")

    key_prefix = "weather_forecasts"
    list_files = []
    for var in variables:
        for forecast in forecast_type:
            key = f"{key_prefix}/{var}_{forecast}.nc"
            filename = Path(prefix) / key
            if filename.exists():
                # BUGFIX: the message was garbled and did not interpolate the path.
                print(f"{filename} already downloaded, skipping")
                continue
            filename.parent.mkdir(parents=True, exist_ok=True)
            if dryrun:
                print(f"DRY RUN : would Download {key} to {filename}")
                # test if the key exists without downloading it
                try:
                    s3.Object(s3_bucket, key).load()
                except Exception as e:
                    # Best-effort existence probe: report and keep going.
                    print(e)
            else:
                bucket.download_file(key, str(filename))
                list_files.append(filename)
    return list_files
if __name__ == "__main__":
    # Script entry point: keep retrying today's download until it succeeds
    # (or the retry budget in warm_cache is exhausted).
    logger.info("Fetching data for today")
    warm_cache(logger)