Skip to main content

Temporal Fusion Transformers

pytorch-forecasting 설치

!pip install pytorch-forecasting

임포트

import lightning.pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping, LearningRateMonitor
from lightning.pytorch.loggers import TensorBoardLogger
import numpy as np
import pandas as pd
import torch
from pytorch_forecasting import Baseline, TemporalFusionTransformer, TimeSeriesDataSet
from pytorch_forecasting.data import GroupNormalizer
from pytorch_forecasting.metrics import MAE, SMAPE, PoissonLoss, QuantileLoss

Stallion & Co.는 도매업체(agency)를 통해 소매업체에 유통되는 제품(sku)의 수요를 예측하기 위한 데이터

from pytorch_forecasting.data.examples import get_stallion_data
data = get_stallion_data()

인덱스 추가

data["time_idx"] = data["date"].dt.year * 12 + data["date"].dt.month
data["time_idx"] -= data["time_idx"].min()

추가 변수 설정

# 월
data["month"] = data.date.dt.month.astype(str).astype("category")
# 로그 볼륨
data["log_volume"] = np.log(data.volume + 1e-8)
# SKU별 평균 볼륨
data["avg_volume_by_sku"] = data.groupby(["time_idx", "sku"], observed=True).volume\
.transform("mean")
# 도매업체별 평균 볼륨
data["avg_volume_by_agency"] = data.groupby(["time_idx", "agency"], observed=True).volume\
.transform("mean")
# 공휴일 등 변환
special_days = [
"easter_day", "good_friday", "new_year", "christmas", "labor_day",
"independence_day", "revolution_day_memorial", "regional_games", "fifa_u_17_world_cup",
"football_gold_cup", "beer_capital", "music_fest"]
data[special_days] = data[special_days].apply(
lambda x: x.map({0: "-", 1: x.name})).astype("category")

데이터셋 만들기

max_prediction_length = 6  # 예측하려는 최대 시간 길이
max_encoder_length = 24 # 인코더에 사용할 최대 시간 길이
training_cutoff = data["time_idx"].max() - max_prediction_length
# 데이터를 훈련 및 검증 데이터로 나누기 위한 기준점

training = TimeSeriesDataSet(
data[lambda x: x.time_idx <= training_cutoff], # 기준점 이하만 훈련 데이터로 사용
time_idx="time_idx", # 시계열 인덱스
target="volume", # 예측하려는 값
group_ids=["agency", "sku"], # 데이터 그룹화에 사용할 열
min_encoder_length=max_encoder_length // 2, # 인코더에 사용할 최소 길이(24/2==12)
max_encoder_length=max_encoder_length, #
min_prediction_length=1, # 예측할 최소 길이
max_prediction_length=max_prediction_length, # 예측할 최대 길이
static_categoricals=["agency", "sku"], # 변하지 않는 범주형 변수
static_reals=[ # 변하지 않는 실수형 변수
"avg_population_2017","avg_yearly_household_income_2017"],
time_varying_known_categoricals=[ # 시간에 따라 변하는 미래에 알려진 범주형 변수
"special_days", "month"],
variable_groups={"special_days": special_days}, # 특정 범주형 변수 그룹을 하나의 변수로 취급
time_varying_known_reals=[ # 시간에 따라 변하는 미래에 알려진 실수형 변수
"time_idx", "price_regular", "discount_in_percent"],
time_varying_unknown_categoricals=[], # 시간에 따라 변하는 미래에 알 수 없는 범주형 변수
time_varying_unknown_reals=[ # 시간에 따라 변하는 미래에 알 수 없는 범주형 변수
"volume", "log_volume", "industry_volume", "soda_volume", "avg_max_temp",
"avg_volume_by_agency", "avg_volume_by_sku", ],
target_normalizer=GroupNormalizer( # 그룹별로 정규화하고, softplus 변환
groups=["agency", "sku"], transformation="softplus"),
add_relative_time_idx=True, # 상대적인 시간 인덱스를 추가
add_target_scales=True, # 대상 값의 스케일을 추가
add_encoder_length=True, # 인코더 길이를 추가
)

# 검증 데이터셋
validation = TimeSeriesDataSet.from_dataset(
training, data, predict=True, stop_randomization=True)

# 데이터 로더
batch_size = 128 # 배치(한 번에 모델에 입력하여 학습시킬 데이터셋) 크기
train_dataloader = training.to_dataloader(train=True, batch_size=batch_size, num_workers=0)
val_dataloader = validation.to_dataloader(train=False, batch_size=batch_size * 10, num_workers=0)

베이스라인 성능

baseline_predictions = Baseline().predict(val_dataloader, return_y=True)
MAE()(baseline_predictions.output, baseline_predictions.y)

모델 설정

tft = TemporalFusionTransformer.from_dataset(
training,
learning_rate=0.03, # 학습률
hidden_size=16, # 은닉층 크기
attention_head_size=2, # attention heads의 수. 클 수록 복잡한 패턴
dropout=0.1, # 과대적합 방지(0.1~0.3 사이)
hidden_continuous_size=8, # 은닉층 크기보다 작거나 같게
loss=QuantileLoss(), # TFT의 손실 함수
optimizer="Ranger" # 학습 알고리즘
)

학습 설정

early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=False, mode="min")
lr_logger = LearningRateMonitor() # 학습율 기록

trainer = pl.Trainer(
max_epochs=50, # 학습을 총 50회 반복
accelerator="gpu", # cpu로 바꾸면 CPU로 학습
enable_model_summary=True,
gradient_clip_val=0.1, # 경사 클리핑(폭발하는 경사 방지)
limit_train_batches=50, # 매 에포크 최대 50배치만 학습
callbacks=[lr_logger, early_stop_callback],
)

학습

trainer.fit(
tft,
train_dataloaders=train_dataloader,
val_dataloaders=val_dataloader,
)

성능평가

best_model_path = trainer.checkpoint_callback.best_model_path
best_tft = TemporalFusionTransformer.load_from_checkpoint(best_model_path)
predictions = best_tft.predict(val_dataloader, return_y=True,
trainer_kwargs=dict(accelerator="gpu"))
MAE()(predictions.output, predictions.y)

예측 및 시각화

raw_prediction = best_tft.predict(training.filter(lambda x: 
(x.agency == "Agency_01") & (x.sku == "SKU_01") & (x.time_idx_first_prediction == 15)),
mode="raw", return_x=True)

best_tft.plot_prediction(raw_prediction.x, raw_prediction.output, idx=0)