# Lightning Template

# Load Data

```python
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, random_split
from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau, LambdaLR
import pytorch_lightning as pl
from pytorch_lightning import Trainer, seed_everything
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks import ModelCheckpoint

import random
import logging
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import pickle
import math
from sklearn.metrics import f1_score
from sklearn.model_selection import KFold, train_test_split

from models.models import Enformer, EnformerConfig
from models.dataset import CREembeddingDataset, get_class

# settings
seed_everything(42, workers=True)
input_length = 40000
num_epochs = 50   # number of training epochs
batch_size = 36   # batch size

# logging
log_pfx = 'enformer_SL4long_20k_class_all'
logging.basicConfig(
    filename=f'./logs/{log_pfx}.log',                    # log file name
    filemode='w',                                        # overwrite on each run
    format='%(asctime)s - %(levelname)s - %(message)s',  # log format
    level=logging.INFO                                   # log level
)

# load model
device = "cuda" if torch.cuda.is_available() else "cpu"

def init_enformer_model():
    model = Enformer.from_hparams(
        dim=1536 // 2,
        depth=11,
        heads=4,
        num_downsamples=7,
        dim_divisible_by=128
    )
    return model

# load data
data_df = pd.read_pickle('./data/tpm_embedding_SL4long_20k.pkl')
data_df['class'] = get_class([np.log(x + 1) for x in data_df['max'].to_list()])
class_weights = data_df['class'].value_counts().reindex([0, 1, 2], fill_value=0)
class_weights = class_weights / class_weights.max()
test_pickle = "./data/tpm_embedding_SL4long_20k_test_cv5.pkl"  # reused for later testing
embeddings = np.load('./data/tpm_seq_embedding_SL4long_20k.npz')['embeddings']

# split data_df row indices 9:1 into train and test sets with train_test_split
train_indices, test_indices = train_test_split(range(len(data_df)), test_size=0.1, random_state=42)

# 5-fold cross-validation over the training indices
kf = KFold(n_splits=5, shuffle=True, random_state=42)
fold_indices = []
for fold, (train_index, val_index) in enumerate(kf.split(train_indices)):
    # KFold yields positions within train_indices; map them back to data_df rows
    train_fold_indices = [train_indices[i] for i in train_index]
    val_fold_indices = [train_indices[i] for i in val_index]
    fold_indices.append((train_fold_indices, val_fold_indices))

# persist the held-out test split for later evaluation
test_df = data_df.iloc[test_indices]
test_df.to_pickle(test_pickle)
```
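
The double indexing in the fold loop above is easy to get wrong: KFold splits *positions* within `train_indices`, not `data_df` rows. A toy standalone sketch (made-up indices, not the real data) makes the mapping explicit:

```python
from sklearn.model_selection import KFold

toy_train_indices = [3, 7, 8, 12, 15, 20]   # pretend these are data_df row positions
kf_toy = KFold(n_splits=3, shuffle=True, random_state=0)
for pos_train, pos_val in kf_toy.split(toy_train_indices):
    # KFold yields positions 0..5; map them back to the row positions
    rows_train = [toy_train_indices[i] for i in pos_train]
    rows_val = [toy_train_indices[i] for i in pos_val]
    print(rows_train, rows_val)
```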

# Lightning Template

Lightning requires:

  • a pl.LightningDataModule, which implements the dataloaders

  • a pl.LightningModule, which wraps the model

At a minimum, implement:

  • forward: defines the forward pass
  • configure_optimizers: the optimizer and scheduler
  • training_step
  • validation_step
  • test_step

Optional hooks define behavior at the start and end of each epoch (a minimal skeleton follows this list):

  • on_train_epoch_start
  • on_train_epoch_end
  • on_validation_epoch_start
  • on_validation_epoch_end
  • on_test_epoch_start
  • on_test_epoch_end
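
Before the full implementation below, here is a minimal illustrative skeleton of these hooks (a toy linear classifier, not the Enformer setup used in this post):

```python
import pytorch_lightning as pl
from torch import nn, optim

class MinimalModule(pl.LightningModule):
    """Toy skeleton showing the minimal LightningModule surface."""

    def __init__(self):
        super().__init__()
        self.net = nn.Linear(16, 3)          # stand-in for the real model
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, x):
        return self.net(x)

    def configure_optimizers(self):
        return optim.AdamW(self.parameters(), lr=1e-4)

    def training_step(self, batch, batch_idx):
        x, y = batch
        return self.loss_fn(self(x), y)      # Lightning backprops the returned loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        self.log("val_loss", self.loss_fn(self(x), y))

    def test_step(self, batch, batch_idx):
        x, y = batch
        acc = (self(x).argmax(dim=1) == y).float().mean()
        self.log("test_acc", acc)
```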
```python
class MyDataModule(pl.LightningDataModule):

    def __init__(self, train_indices, val_indices, test_indices, batch_size):
        super().__init__()
        self.batch_size = batch_size
        self.train_indices = train_indices
        self.val_indices = val_indices
        self.test_indices = test_indices

    def train_dataloader(self):
        self.train_dataset = CREembeddingDataset(data_df.iloc[self.train_indices], embeddings, target='class')
        return DataLoader(self.train_dataset, batch_size=self.batch_size)

    def val_dataloader(self):
        self.val_dataset = CREembeddingDataset(data_df.iloc[self.val_indices], embeddings, target='class')
        return DataLoader(self.val_dataset, batch_size=self.batch_size)

    def test_dataloader(self):
        self.test_dataset = CREembeddingDataset(data_df.iloc[self.test_indices], embeddings, target='class')
        return DataLoader(self.test_dataset, batch_size=self.batch_size)

# pytorch_lightning module
class MyModule(pl.LightningModule):
    def __init__(self, foldnum, model, num_epochs, batch_size=batch_size, class_weights=class_weights):
        """
        foldnum: fold index (0-based)
        model: the model to train
        batch_size: batch size
        class_weights: per-class loss weights
        """
        super().__init__()
        self.mymodel = model
        self.batch_size = batch_size
        self.class_weights = class_weights

        # metric functions
        self.loss_fn = nn.CrossEntropyLoss(weight=torch.tensor(class_weights.values, dtype=torch.bfloat16))
        self.f1 = f1_score

        # init metric accumulators
        self.init_metric_target(foldnum, num_epochs)

    def init_metric_target(self, foldnum, num_epochs):

        # metrics
        self.val_outputs = []
        self.val_targets = []
        self.test_outputs = []
        self.test_targets = []

        # logging
        self.running_loss = 0
        self.val_loss = 0
        self.val_batches = 0
        self.test_loss = 0
        self.test_batches = 0
        self.fold = foldnum
        self.num_epochs = num_epochs

    def forward(self, x):
        return self.mymodel(x)

    def configure_optimizers(self):
        # optimizer and LR scheduler
        base_lr = 1e-4
        optimizer = optim.AdamW(self.mymodel.parameters(), lr=base_lr, weight_decay=1e-3)

        # linear warmup followed by cosine decay; the lambda returns a
        # multiplier on base_lr
        warm_up_iter = 5
        T_max = 50     # cosine period
        lr_max = 1e-4  # peak learning rate
        lr_min = 0     # final learning rate
        lambda0 = lambda cur_iter: (lr_max - lr_min) / warm_up_iter * cur_iter / base_lr if cur_iter <= warm_up_iter else \
            (lr_min + 0.5 * (lr_max - lr_min) * (1.0 + math.cos((cur_iter - warm_up_iter) / (T_max - warm_up_iter) * math.pi))) / base_lr
        scheduler = LambdaLR(optimizer, lr_lambda=lambda0)

        return {"optimizer": optimizer, "lr_scheduler": scheduler}

    def training_step(self, batch, batch_idx):

        embeddings, targets = batch
        outputs = self(embeddings)
        loss = self.loss_fn(outputs, targets)

        self.running_loss += loss.item()
        if batch_idx % 2 == 1:  # log the running training loss every 2 batches
            avg_train_batch_loss = self.running_loss / (batch_idx + 1)
            logging.info(f'Fold {self.fold+1}, Epoch {self.current_epoch + 1}/{self.num_epochs}, Batch {batch_idx + 1}, Train Loss(CrossEntropy): {avg_train_batch_loss:.4f}')

        return loss

    def validation_step(self, batch, batch_idx):

        embeddings, targets = batch
        outputs = self(embeddings)
        loss = self.loss_fn(outputs, targets)
        self.val_loss += loss.item()
        self.val_batches += 1

        _, preds = torch.max(outputs, 1)
        _, decode_target = torch.max(targets, 1)  # decode one-hot ground truth
        self.val_outputs.extend(preds.cpu().numpy())
        self.val_targets.extend(decode_target.cpu().numpy())
        return loss

    def test_step(self, batch, batch_idx):

        embeddings, targets = batch
        outputs = self(embeddings)
        loss = self.loss_fn(outputs, targets)
        self.test_loss += loss.item()
        self.test_batches += 1

        _, preds = torch.max(outputs, 1)
        _, decode_target = torch.max(targets, 1)  # decode one-hot ground truth
        self.test_outputs.extend(preds.cpu().numpy())
        self.test_targets.extend(decode_target.cpu().numpy())

        return loss

    def on_train_epoch_start(self):
        self.running_loss = 0
        # read the live learning rate from the trainer's optimizer; calling
        # self.configure_optimizers() here would build a fresh optimizer and
        # always report the initial LR
        current_lr = self.trainer.optimizers[0].param_groups[0]['lr']
        logging.info(f'Epoch {self.current_epoch + 1}/{self.num_epochs}, Learning Rate: {current_lr:.5f}')

    def on_train_epoch_end(self):
        avg_train_loss = self.running_loss / self.trainer.num_training_batches
        logging.info(f'Fold {self.fold+1}, Epoch {self.current_epoch+1}/{self.num_epochs}, Train Loss(CrossEntropy): {avg_train_loss:.4f}')
        self.log('train_loss', avg_train_loss, prog_bar=True)

    def on_validation_epoch_start(self):
        self.val_loss = 0
        self.val_batches = 0
        self.val_outputs = []
        self.val_targets = []

    def on_validation_epoch_end(self):
        val_f1 = self.f1(self.val_targets, self.val_outputs, average='macro')
        avg_val_loss = self.val_loss / max(self.val_batches, 1)  # mean of per-batch losses
        logging.info(f'Fold {self.fold+1}, Epoch {self.current_epoch+1}/{self.num_epochs}, Val Loss(CrossEntropy): {avg_val_loss:.4f}, Val F1: {val_f1:.4f}')
        self.log('val_f1', val_f1, prog_bar=True)
        self.log('val_loss', avg_val_loss, prog_bar=True)

    def on_test_epoch_start(self):
        self.test_loss = 0
        self.test_batches = 0
        self.test_outputs = []
        self.test_targets = []

    def on_test_epoch_end(self):
        test_f1 = self.f1(self.test_targets, self.test_outputs, average='macro')
        avg_test_loss = self.test_loss / max(self.test_batches, 1)  # mean of per-batch losses
        logging.info(f'Fold {self.fold+1}, Epoch {self.current_epoch+1}/{self.num_epochs}, Test Loss(CrossEntropy): {avg_test_loss:.4f}, Test F1: {test_f1:.4f}')
        self.log('test_f1', test_f1, prog_bar=True)
        self.log('test_loss', avg_test_loss, prog_bar=True)
```
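
The warmup-plus-cosine lambda in `configure_optimizers` returns a multiplier on the base learning rate, which is easy to misread. A standalone sanity-check sketch, using the same constants as above, prints the effective LR per epoch:

```python
import math

base_lr, warm_up_iter, T_max = 1e-4, 5, 50
lr_max, lr_min = 1e-4, 0.0

def lr_factor(cur_iter):
    # same formula as in configure_optimizers: linear warmup, then cosine decay
    if cur_iter <= warm_up_iter:
        return (lr_max - lr_min) / warm_up_iter * cur_iter / base_lr
    return (lr_min + 0.5 * (lr_max - lr_min)
            * (1.0 + math.cos((cur_iter - warm_up_iter) / (T_max - warm_up_iter) * math.pi))) / base_lr

for epoch in range(0, T_max + 1, 5):
    print(f"epoch {epoch:2d}: lr = {base_lr * lr_factor(epoch):.2e}")
# ramps from 0 to 1e-4 over the first 5 epochs, then decays to 0 by epoch 50
```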

# Lightning training normal

```python
# hold out a validation split from the training indices (the normal run has
# no folds, so val_indices must be created here)
train_idx, val_idx = train_test_split(train_indices, test_size=0.1, random_state=42)

# initialize the model
pl_model = MyModule(
    foldnum=0,  # single run, so fold 0
    model=init_enformer_model(),
    num_epochs=num_epochs,
    class_weights=class_weights,
    batch_size=batch_size
)
# create the data module
dm = MyDataModule(
    train_indices=train_idx,
    val_indices=val_idx,
    test_indices=test_indices,
    batch_size=batch_size
)
# callbacks
# early_stop_callback = EarlyStopping(monitor="val_f1", min_delta=0.00, patience=10, verbose=False, mode="max")
checkpoint_callback = ModelCheckpoint(
    monitor="val_f1",        # metric to monitor
    mode="max",              # higher is better
    save_top_k=3,            # keep the three best checkpoints
    filename=f"{log_pfx}_" + "-{epoch}-{val_f1:.2f}",
    dirpath="./checkpoints"  # save directory
)
# build the Trainer
trainer = Trainer(
    precision="bf16-mixed",
    accelerator="gpu",
    devices=1,
    max_epochs=num_epochs,
    logger=pl.loggers.TensorBoardLogger(save_dir='./logs', name=log_pfx),
    num_sanity_val_steps=0,
    callbacks=[checkpoint_callback]
)

# Training.....
trainer.fit(pl_model, datamodule=dm)   # train
trainer.test(pl_model, datamodule=dm)  # test

# save the final model weights
torch.save(pl_model.mymodel.state_dict(), f"./weights/{log_pfx}_final.pth")
```
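
To reuse the saved weights, rebuild the architecture and load the state dict. A minimal inference sketch, assuming (as the training steps above imply) that CREembeddingDataset yields (embedding tensor, one-hot target) pairs:

```python
# restore the final weights saved above
model = init_enformer_model()
model.load_state_dict(torch.load(f"./weights/{log_pfx}_final.pth", map_location="cpu"))
model.eval()

# assumption: the dataset yields (embedding_tensor, one_hot_target) pairs
test_dataset = CREembeddingDataset(data_df.iloc[test_indices], embeddings, target='class')
with torch.no_grad():
    x, y = test_dataset[0]
    logits = model(x.unsqueeze(0))            # add a batch dimension
    pred_class = logits.argmax(dim=1).item()  # predicted class index
```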

# Lightning training k-fold

```python
# training loop over the five folds
for fold, (train_idx, val_idx) in enumerate(fold_indices):
    print(f"\n----------- Training Fold {fold + 1}/5 -----------")

    model = init_enformer_model()

    # initialize the Lightning module
    pl_model = MyModule(
        foldnum=fold,
        model=model,
        num_epochs=num_epochs,
        class_weights=class_weights,
        batch_size=batch_size
    )

    # create the data module
    dm = MyDataModule(
        train_indices=train_idx,
        val_indices=val_idx,
        test_indices=test_indices,
        batch_size=batch_size
    )

    # callbacks
    early_stop_callback = EarlyStopping(
        monitor="val_f1", min_delta=0.00,
        patience=10, verbose=False, mode="max"
    )
    checkpoint_callback = ModelCheckpoint(
        monitor="val_f1",        # metric to monitor
        mode="max",              # higher is better
        save_top_k=1,            # keep only the best checkpoint
        filename=f"{log_pfx}_fold{fold}" + "-{epoch}-{val_f1:.2f}",
        dirpath="./checkpoints"  # save directory
    )

    # build the Trainer
    trainer = Trainer(
        precision="bf16-mixed",
        accelerator="gpu",
        devices=1,
        max_epochs=num_epochs,
        logger=pl.loggers.TensorBoardLogger(
            save_dir='./logs',
            name=log_pfx
        ),  # TensorBoard logging driven by the self.log calls
        num_sanity_val_steps=0,
        callbacks=[
            early_stop_callback,  # early stopping also reads the self.log metrics
            checkpoint_callback
        ]
    )

    # Training.....
    trainer.fit(pl_model, datamodule=dm)   # train
    trainer.test(pl_model, datamodule=dm)  # test

    # save this fold's weights before freeing the model
    torch.save(
        pl_model.mymodel.state_dict(),
        f"./weights/{log_pfx}_best_fold{fold}.pth"
    )
    if fold + 1 < 5:
        del pl_model  # keep the last fold's model for the final save below

# save the last fold's weights as the final model
torch.save(pl_model.mymodel.state_dict(), f"./weights/{log_pfx}_final.pth")
```
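
Since each fold leaves a weight file behind, one option at inference time (not part of the template above) is a simple logit-averaging ensemble over the five fold models; a sketch under the same file-naming assumptions:

```python
# load all five fold models (assumes the per-fold weight files saved above exist)
fold_models = []
for fold in range(5):
    m = init_enformer_model()
    m.load_state_dict(torch.load(f"./weights/{log_pfx}_best_fold{fold}.pth", map_location="cpu"))
    m.eval()
    fold_models.append(m)

def ensemble_predict(x):
    # average the logits of the fold models, then take the argmax
    with torch.no_grad():
        logits = torch.stack([m(x) for m in fold_models]).mean(dim=0)
    return logits.argmax(dim=1)
```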