Hi,
Interestingly (and related to this post), I noticed that when I use different setups to compare forecast quality, taking advantage of the following backtesting methods you provide, I get exactly the same Prediction Interval (PI) results on all PI-related metrics:
- Backtesting without refit (proposed here)
- Backtesting with refit and fixed training size (rolling origin)
- Backtesting with refit and increasing training size (fixed origin)
- Backtesting with intermittent refit
Running Without Refit...
******************************
100%
2/2 [00:00<00:00, 2.42it/s]
Without Refit - MAE: mean_absolute_error
0 6.794017
Empirical coverage_sk: 0.92
picp_sk: 91.77%
******************************
Running Fixed Backtesting...
******************************
100%
2/2 [00:01<00:00, 1.21it/s]
Fixed Backtesting - MAE: mean_absolute_error
0 7.342417
Empirical coverage_sk: 0.92
picp_sk: 91.77%
******************************
Running Intermittent Refitting...
******************************
100%
2/2 [00:00<00:00, 2.27it/s]
Intermittent Refitting - MAE: mean_absolute_error
0 6.794017
Empirical coverage_sk: 0.92
picp_sk: 91.77%
******************************
Running Refit Increasing Training Size...
******************************
100%
2/2 [00:01<00:00, 1.87it/s]
Refit Increasing Training Size - MAE: mean_absolute_error
0 6.876937
Empirical coverage_sk: 0.92
picp_sk: 91.77%
******************************
I ran the experiment on the following data, partitioned as:
Dates train : 0 --- 4966 (n=4967)
Dates validation : 4967 --- 6622 (n=1656)
Dates test : 6623 --- 8639 (n=2017)
Following is a Minimal Reproducible Example (MRE):
def empirical_coverage(y, lower_bound, upper_bound):
    """
    Calculate the empirical coverage of a given interval.

    Parameters:
    - y: array of true values.
    - lower_bound, upper_bound: arrays with the interval bounds.

    Returns:
    - Fraction of observations that fall inside the interval.
    """
    return np.mean(np.logical_and(y >= lower_bound, y <= upper_bound))


def calculate_picp_sk(y, lower_bound, upper_bound):
    """
    Calculate the Prediction Interval Coverage Probability (PICP).

    Parameters:
    - y: array of true values.
    - lower_bound, upper_bound: arrays with the interval bounds.

    Returns:
    - PICP: Prediction Interval Coverage Probability in percentage.
    """
    c_values = np.where((y >= lower_bound) & (y <= upper_bound), 1, 0)
    PICP = np.mean(c_values) * 100
    return PICP
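# --- Side note (mine, not part of the original run) ---------------------------
# Quick sanity check of the two helpers on made-up toy values: it just shows
# that empirical_coverage and calculate_picp_sk measure the same thing, as a
# fraction and as a percentage respectively.
import numpy as np  # imported again below; repeated here so this check is self-contained

y_toy     = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
lower_toy = np.array([0.5, 1.5, 3.5, 3.5, 4.5])
upper_toy = np.array([1.5, 2.5, 4.5, 4.5, 5.5])

print(empirical_coverage(y_toy, lower_toy, upper_toy))  # 0.8  -> 4 of 5 points inside the interval
print(calculate_picp_sk(y_toy, lower_toy, upper_toy))   # 80.0 -> the same quantity in percent
# ------------------------------------------------------------------------------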
#-----------------------------------------------------------
# import libs
#-----------------------------------------------------------
# !pip install skforecast==0.14.0
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from skforecast.model_selection import backtesting_forecaster, TimeSeriesFold
from skforecast.recursive import ForecasterRecursive
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import MinMaxScaler
import warnings
warnings.filterwarnings("ignore")
#-----------------------------------------------------------
# LOAD THE DATASET
#-----------------------------------------------------------
df = pd.read_csv('https://raw.githubusercontent.com/amcs1729/Predicting-cloud-CPU-usage-on-Azure-data/master/azure.csv')
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.rename(columns={'min cpu': 'mincpu', 'max cpu': 'maxcpu', 'avg cpu': 'avgcpu'})
# Data preparation
# ==============================================================================
sliced_df = df[['timestamp', 'avgcpu']].sort_values("timestamp")
# Normalize 'avgcpu' column between 0 and 100
scaler = MinMaxScaler(feature_range=(0, 100))
sliced_df['avgcpu'] = scaler.fit_transform(sliced_df[['avgcpu']])
# Ensure the 'avgcpu' column is numeric
sliced_df['avgcpu'] = pd.to_numeric(sliced_df['avgcpu'], errors='coerce')
# Remove NaN and invalid values
sliced_df = sliced_df.dropna(subset=['avgcpu'])
# Sort the data by the 'timestamp' column
sliced_df = sliced_df.sort_values(by='timestamp')
# Reset the index
sliced_df.reset_index(drop=True, inplace=True)
# Configuration
# ==============================================================================
steps = 2017
lags = 288
name_columns = "avgcpu"
end_train = 4966
end_validation = 6622
# Splitting dataset
data = sliced_df.copy()
data_train = data[data['timestamp'] <= data['timestamp'][end_train]]
data_val = data[(data['timestamp'] > data['timestamp'][end_train]) & (data['timestamp'] <= data['timestamp'][end_validation])]
data_test = data[data['timestamp'] > data['timestamp'][end_validation]]
# Print dataset partitions
print("Train size:", data_train.shape, "\nVal size: ", data_val.shape, "\nTest size: ", data_test.shape)
# Plot partitions
# ==============================================================================
fig, ax = plt.subplots(figsize=(8, 3))
ax.plot(data_train['avgcpu'], label='Train', color='blue')
ax.plot(data_val['avgcpu'], label='Validation', color='orange')
ax.plot(data_test['avgcpu'], label='Test', color='tomato')
ax.set_title('avgcpu')
ax.legend()
plt.show()
# Define forecasting model
forecaster = ForecasterRecursive(
    regressor=LinearRegression(n_jobs=-1),
    lags=lags
)
# Train the model
forecaster.fit(y=data_train['avgcpu'])
# Different configurations for backtesting
configs = {
    # Backtesting without refit
    "Without Refit": TimeSeriesFold(
        initial_train_size=len(data_train), steps=steps,
        refit=False, fixed_train_size=False
    ),
    # Backtesting with refit and fixed training size (rolling origin)
    "Fixed Backtesting": TimeSeriesFold(
        initial_train_size=len(data_train), steps=steps,
        refit=True, fixed_train_size=True
    ),
    # Backtesting with intermittent refit (refit every 3rd fold)
    "Intermittent Refitting": TimeSeriesFold(
        initial_train_size=len(data_train), steps=steps,
        refit=3, fixed_train_size=False
    ),
    # Backtesting with refit and increasing training size (fixed origin)
    "Refit Increasing Training Size": TimeSeriesFold(
        initial_train_size=len(data_train), steps=steps,
        refit=True, fixed_train_size=False
    ),
}
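# --- Side note (mine): why the progress bars above show 2/2 folds -------------
# With initial_train_size = 4967 and steps = 2017, the backtest has to cover the
# remaining 8640 - 4967 = 3673 observations, which (allowing an incomplete last
# fold, which appears to be the default) gives two folds.
import math
n_total = len(data)                        # 8640 observations in total
remaining = n_total - len(data_train)      # 3673 observations left to backtest
n_folds = math.ceil(remaining / steps)     # ceil(3673 / 2017) = 2
print(f"observations to backtest: {remaining}, folds: {n_folds}")
# ------------------------------------------------------------------------------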
# Run backtesting for each configuration
for config_name, cv in configs.items():
    print(f"\nRunning {config_name}...\n")
    print("*" * 30)

    # Backtest point forecasts with the current fold configuration
    metric_val, predictions_val = backtesting_forecaster(
        forecaster    = forecaster,
        y             = data["avgcpu"],
        cv            = cv,
        metric        = 'mean_absolute_error',
        n_jobs        = 'auto',
        verbose       = False,
        show_progress = True
    )

    # Predict intervals for the test horizon, using the last `steps`
    # observations of the full series as last_window
    last_window = data["avgcpu"].tail(steps)
    y_test_pred = forecaster.predict_interval(
        steps=steps,
        interval=[1, 99],
        n_boot=100,
        last_window=last_window
    )

    # Reset indices so the true values and the interval bounds align positionally
    y_test_lower = y_test_pred['lower_bound'].reset_index(drop=True)
    y_test_upper = y_test_pred['upper_bound'].reset_index(drop=True)
    y_true_test = data_test[name_columns].reset_index(drop=True)

    print(f"{config_name} - MAE:", metric_val)

    coverage_sk = empirical_coverage(
        y=y_true_test,
        lower_bound=y_test_lower,
        upper_bound=y_test_upper
    )
    print(f"Empirical coverage_sk: {round(coverage_sk, 2)}")

    picp_sk = calculate_picp_sk(
        y=y_true_test,
        lower_bound=y_test_lower,
        upper_bound=y_test_upper
    )
    print(f"picp_sk: {round(picp_sk, 2)}%")
    print("*" * 30)
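For context on what I expected: I was hoping to compare the intervals produced during each backtest itself, so that the PI metrics could differ between configurations. Below is a minimal sketch of that idea; it assumes backtesting_forecaster accepts the interval / n_boot arguments described in the probabilistic forecasting docs and returns lower_bound / upper_bound columns (I have not verified this against 0.14, so please correct me if that assumption is wrong):

# Sketch only (my assumption about the API, not part of the MRE above)
metric_bt, predictions_bt = backtesting_forecaster(
    forecaster    = forecaster,
    y             = data["avgcpu"],
    cv            = configs["Fixed Backtesting"],
    metric        = 'mean_absolute_error',
    interval      = [1, 99],   # assumed to work like predict_interval
    n_boot        = 100,
    n_jobs        = 'auto',
    verbose       = False,
    show_progress = True
)

# Coverage computed on the intervals generated during the backtest
y_true_bt = data["avgcpu"].loc[predictions_bt.index]
coverage_bt = empirical_coverage(
    y=y_true_bt.reset_index(drop=True),
    lower_bound=predictions_bt['lower_bound'].reset_index(drop=True),
    upper_bound=predictions_bt['upper_bound'].reset_index(drop=True)
)
print(f"Backtest empirical coverage: {round(coverage_bt, 2)}")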
It would be great if you could help me find the problem in the MRE above.
Happy New Year! 🍀