Building an Auto-Encoder from Scratch

Huahuatii · about 9 min · Deep Learning · Model Practice · MLP


When building the Auto-Encoder model, I went with a more structured approach:

  1. Build base_ae: this step pins down the basic structure that every AE model will share (some parts may go unused).
  2. Build the concrete AE model on top of the base class.
  3. Train the model, here using a grid search over the hyperparameters.
  4. Use the trained encoder for prediction.

1 Building the base_AE model

  • Initialization
  • Encoding function
  • Decoding function
  • Sampling function
  • Generation function
  • Forward pass (abstract method)
  • Loss function (abstract method)
'base_ae.py'
from torch import nn
from abc import abstractmethod  # for declaring abstract methods
from typing import TypeVar  # for defining generic type variables
from typing import List, Any
Tensor = TypeVar('torch.tensor')  # generic variable standing in for a PyTorch tensor


class BaseAE(nn.Module):
    def __init__(self) -> None:  # initialize the model's components
        super(BaseAE, self).__init__()

    def encode(self, input: Tensor) -> List[Tensor]:  # encoding step
        raise NotImplementedError

    def decode(self, input: Tensor) -> List[Tensor]:  # decoding step
        raise NotImplementedError

    def sample(self, batch_size: int, current_device: int, **kwargs) -> Tensor:  # sampling method
        raise NotImplementedError

    def generate(self, x: Tensor, **kwargs) -> Tensor:  # generation method
        raise NotImplementedError

    def initialize_weights(self) -> None:  # weight-initialization method
        raise NotImplementedError

    @abstractmethod  # decorator marking the method below as abstract
    def forward(self, *inputs: Tensor) -> Tensor:  # forward pass
        pass

    @abstractmethod
    def loss_function(self, *inputs: Any, **kwargs) -> Tensor:  # loss function
        pass

Additional details

sample: this method usually draws a random vector from the latent space and then runs it through the decoder to produce an output.

generate: this method produces an output for a given input; it maps the input through the model and returns the decoder's reconstruction.

abstractmethod: a class with abstract methods is an abstract class and cannot be instantiated. A method is marked abstract by adding the @abstractmethod decorator to its definition; the body in the parent class is usually just pass (otherwise a subclass that forgets to override it would silently inherit the implementation). A subclass must override every abstract method before it can be instantiated, as the sketch below shows.
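One caveat worth noting: @abstractmethod is only enforced when the class's metaclass is ABCMeta (e.g. by inheriting from abc.ABC; plain nn.Module subclasses are not blocked at instantiation). A minimal self-contained sketch with hypothetical class names:

from abc import ABC, abstractmethod

class Base(ABC):
    @abstractmethod
    def forward(self, x):
        pass

class Incomplete(Base):
    pass  # forward is not overridden

class Complete(Base):
    def forward(self, x):
        return x

# Incomplete()  -> TypeError: Can't instantiate abstract class Incomplete
print(Complete().forward(1))  # prints 1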

2 Building the concrete AE model

  • Inherit
  • Add layers
  • Initialize weights
'ae.py'
import torch
from torch import nn
from torch.nn import functional as F
from typing import List, TypeVar
from model.base_ae import BaseAE

Tensor = TypeVar('torch.tensor')


class AE(BaseAE):
    def __init__(self,  # AE constructor
                 input_dim: int,
                 latent_dim: int,
                 hidden_dims: List = None,
                 dop: float = 0.1,
                 noise_flag: bool = False,
                 **kwargs) -> None:
        super(AE, self).__init__()
        self.latent_dim = latent_dim
        self.noise_flag = noise_flag
        self.dop = dop

        if hidden_dims is None:
            hidden_dims = [512, 256]

        # build encoder
        modules = []

        # first hidden layer
        modules.append(
            nn.Sequential(nn.Linear(input_dim, hidden_dims[0], bias=True),
                          nn.BatchNorm1d(hidden_dims[0]), nn.LeakyReLU(),
                          nn.Dropout(self.dop)))

        # intermediate hidden layers
        for i in range(len(hidden_dims) - 1):
            modules.append(
                nn.Sequential(
                    nn.Linear(hidden_dims[i], hidden_dims[i + 1], bias=True),
                    nn.BatchNorm1d(hidden_dims[i + 1]), nn.LeakyReLU(),
                    nn.Dropout(self.dop)))
        # final layer, projecting down to latent_dim
        modules.append(nn.Dropout(self.dop))
        modules.append(nn.Linear(hidden_dims[-1], latent_dim, bias=True))

        # assemble the layers into the encoder
        self.encoder = nn.Sequential(*modules)

        # build decoder (mirrors the encoder)
        modules = []

        modules.append(
            nn.Sequential(nn.Linear(latent_dim, hidden_dims[-1], bias=True),
                          nn.BatchNorm1d(hidden_dims[-1]), nn.LeakyReLU(),
                          nn.Dropout(self.dop)))

        hidden_dims2 = hidden_dims[::-1]  # reversed so the decoder layers mirror the encoder

        for i in range(len(hidden_dims2) - 1):
            modules.append(
                nn.Sequential(
                    nn.Linear(hidden_dims2[i], hidden_dims2[i + 1], bias=True),
                    nn.BatchNorm1d(hidden_dims2[i + 1]), nn.LeakyReLU(),
                    nn.Dropout(self.dop)))
        self.decoder = nn.Sequential(*modules)

        self.final_layer = nn.Sequential(
            nn.Linear(hidden_dims2[-1], hidden_dims2[-1], bias=True),
            nn.BatchNorm1d(hidden_dims2[-1]), nn.LeakyReLU(), nn.Dropout(self.dop),
            nn.Linear(hidden_dims2[-1], input_dim))

        # initialize the weights
        self.initialize_weights()
    # encode: returns the latent code
    def encode(self, input: Tensor) -> Tensor:
        if self.noise_flag and self.training:  # add noise only when the flag is set and we are training
            latent_code = self.encoder(
                input + torch.randn_like(input) * 0.05)  # Gaussian noise scaled by 0.05 on the input
        else:
            latent_code = self.encoder(input)

        return latent_code

    # decode: returns the reconstruction
    def decode(self, z: Tensor) -> Tensor:
        embed = self.decoder(z)
        outputs = self.final_layer(embed)

        return outputs

    # forward pass, returning [input, reconstruction, latent code]
    def forward(self, input: Tensor, **kwargs) -> List[Tensor]:
        z = self.encode(input)
        return [input, self.decode(z), z]

    # loss function: MSE (mean squared error) between input and reconstruction
    def loss_function(self, *args, **kwargs) -> dict:
        input = args[0]
        recons = args[1]

        recons_loss = F.mse_loss(input, recons)
        loss = recons_loss

        return {'loss': loss, 'recons_loss': recons_loss}

    # sampling: draw random latent vectors and decode them
    def sample(self, num_samples: int, current_device: int,
               **kwargs) -> Tensor:
        z = torch.randn(num_samples, self.latent_dim)

        z = z.to(current_device)
        samples = self.decode(z)

        return samples

    # generate: return the reconstruction of x
    def generate(self, x: Tensor, **kwargs) -> Tensor:
        return self.forward(x)[1]

    # initialize weights using Xavier initialization
    def initialize_weights(self):
        for layer in self.modules():
            if isinstance(layer, nn.Linear):
                nn.init.xavier_uniform_(layer.weight)
                if layer.bias is not None:
                    nn.init.zeros_(layer.bias)

Additional details
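A minimal smoke test for the AE above (the batch is random stand-in data; the dimensions match the dataset used later):

import torch
from model.ae import AE

model = AE(input_dim=1126, latent_dim=128, hidden_dims=[512, 256])
model.eval()  # put BatchNorm/Dropout in eval mode for a deterministic check
x = torch.randn(8, 1126)  # 8 made-up samples with 1126 features
inputs, recons, z = model(x)
print(z.shape)       # torch.Size([8, 128])   latent codes
print(recons.shape)  # torch.Size([8, 1126])  reconstructions
print(model.loss_function(inputs, recons))  # {'loss': ..., 'recons_loss': ...}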

3 Training the model (using grid search over the hyperparameters as an example)

  • Pass in the model structure and parameters
  • Load the dataset
  • Instantiate the model
  • Visualize training with TensorBoard (optional; see the sketch after the training script)
  • Print model info (optional)
  • Training loop
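The script below drives the search with scikit-learn's ParameterGrid, which enumerates every combination of the listed values (here 2 × 3 × 3 × 2 × 3 × 2 = 216 combinations). A quick self-contained illustration with made-up parameters:

from sklearn.model_selection import ParameterGrid

grid = ParameterGrid({'lr': [0.001, 0.01], 'batch_size': [32, 64]})
for p in grid:
    print(p)  # each p is a plain dict, e.g. {'batch_size': 32, 'lr': 0.001}
print(len(grid))  # 4 combinations in total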
'train_ae.py'
import torch
import csv
from torch.utils.data import DataLoader, random_split
from torch.optim.lr_scheduler import StepLR

from math import ceil
from tqdm import tqdm
from sklearn.model_selection import ParameterGrid

from model.ae import AE
from route.route import set_seed, read_uq_arr, myDataset


# set the random seed
set_seed(2029)

# train on GPU if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


# parameter grid
param_grid = {
    'input_dim': [1126],
    'num_epochs': [502],
    'batch_size': [1024, 2048],
    'learning_rate': [0.001, 0.002, 0.0005],
    'latent_dim': [128],
    'hidden_dims': [[2048, 512, 256], [1024, 512, 256], [2048, 1024, 512]],
    'dop': [0.05, 0.1],
    'gamma': [0.4, 0.7, 1],
    'noise_flag': [True, False]
}


# load the data
BC32_uq1126 = read_uq_arr('BC32', gene_num=1126)  # read the data
data = torch.tensor(BC32_uq1126, dtype=torch.float32)  # convert the array to a tensor
data = data.to(device)  # move the data to the GPU
dataset = myDataset(data)  # build the dataset
train_size = ceil(len(dataset) * 0.8)  # training-set size
test_size = len(dataset) - train_size  # test-set size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])  # shuffle and split

# best loss and parameters found so far
best_loss = float('inf')
best_params = None

# grid search
for params in tqdm(ParameterGrid(param_grid), ncols=40):
    train_dataloader = DataLoader(train_dataset,
                                  batch_size=params['batch_size'],
                                  shuffle=True)
    test_dataloader = DataLoader(test_dataset,
                                 batch_size=params['batch_size'],
                                 shuffle=False)
    # build the model with the current parameters
    model = AE(input_dim=params['input_dim'], latent_dim=params['latent_dim'],
               hidden_dims=params['hidden_dims'], dop=params['dop'],
               noise_flag=params['noise_flag'])
    model = model.to(device)

    # print training info
    print("\t----------------------------- INFO -------------------------------")
    total_params = sum(p.numel() for p in model.parameters())
    print(f"Number of parameters: {total_params}")
    print(f"Parameters: {params}")
    print("\t----------------------------- END --------------------------------")

    # train the model with the current parameters
    optimizer = torch.optim.Adam(model.parameters(), lr=params['learning_rate'])
    scheduler = StepLR(optimizer, step_size=50, gamma=params['gamma'])
    for epoch in range(params['num_epochs']):
        # training
        model.train()
        train_loss = 0.0
        for batch_idx, inputs in enumerate(train_dataloader):
            inputs = inputs.to(device)
            optimizer.zero_grad()
            inputs, outputs, _ = model(inputs)
            loss = model.loss_function(inputs, outputs)['loss']
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)
        train_loss /= len(train_dataloader.dataset)

        # evaluation
        model.eval()
        test_loss = 0.0
        with torch.no_grad():
            for batch_idx, inputs in enumerate(test_dataloader):
                inputs = inputs.to(device)
                inputs, outputs, _ = model(inputs)
                loss = model.loss_function(inputs, outputs)['loss']
                test_loss += loss.item() * inputs.size(0)
            test_loss /= len(test_dataloader.dataset)

        # update the learning rate
        scheduler.step()
        # log progress
        print(
            f'Epoch {epoch+1}/{params["num_epochs"]}, Train Loss: {train_loss:.6f}, Test Loss: {test_loss:.6f}'
        )
        # append the results to a tsv file so parameters and losses can be reviewed later
        if (epoch + 1) % 100 == 0:
            with open('result.tsv', 'a', newline="") as f:
                fields = ['gamma', 'dop', 'noise_flag', 'input_dim', 'hidden_dims',
                          'learning_rate', 'num_epochs', 'test_loss', 'latent_dim',
                          'batch_size', 'train_loss']
                writer = csv.DictWriter(f, delimiter='\t', fieldnames=fields)
                # writer.writeheader()
                result = params.copy()
                result['num_epochs'] = epoch + 1
                result['train_loss'] = round(train_loss, 4)
                result['test_loss'] = round(test_loss, 4)
                writer.writerow(result)

    # keep the best parameter combination
    if test_loss < best_loss:
        best_loss = test_loss
        best_params = params
        torch.save(model.state_dict(), './saved/ae/ae_model.pt')

print("Best parameter combination:", best_params)
print("Best test loss:", best_loss)
Appendix: `route.py`
'route.py'
import scanpy as sc
import pandas as pd
import numpy as np
import torch
import os
import random
import pickle, json
from typing import TypeVar, List, Any
from torch.utils.data import Dataset

Array = TypeVar("numpy.ndarray")  # generic stand-in for a NumPy array
base_dir = "/home/hht/Myapps/Transfer_Project/data/"


# set the random seed
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.device_count() > 0:
        torch.cuda.manual_seed_all(seed)


# standardize an array column-wise (z-score)
def get_normed_array(arr: Array) -> Array:
    np.seterr(divide='ignore', invalid='ignore')
    means = np.mean(arr, axis=0)
    stds = np.std(arr, axis=0)
    stds = np.where(stds == 0, 1, stds)
    standardized_arr = (arr - means) / stds
    return standardized_arr


# compute the confusion matrix (rows = true class, columns = predicted class,
# matching the recall/precision definitions below)
def confusion_matrix(predicted_labels, true_labels, num_classes):
    cm = torch.zeros((num_classes, num_classes), dtype=torch.float32)
    num_samples = torch.zeros(num_classes, dtype=torch.float32)
    for i in range(len(predicted_labels)):
        cm[true_labels[i], predicted_labels[i]] += 1
        num_samples[true_labels[i]] += 1
    return cm, num_samples


# accuracy
def accuracy(cm):
    return torch.diag(cm).sum() / cm.sum()


# recall: per-class true positives / actual members of the class
def recall(cm):
    return torch.diag(cm) / cm.sum(dim=1)


def weighted_recall(cm, num_samples):
    r = recall(cm)
    w = num_samples / num_samples.sum()
    avg_recall = torch.dot(r, w)
    return avg_recall


# precision: per-class true positives / predicted members of the class
def precision(cm):
    return torch.diag(cm) / cm.sum(dim=0)


def weighted_precision(cm, num_samples):
    num_classes = cm.shape[0]
    precisions = torch.zeros(num_classes, dtype=torch.float32)
    for i in range(num_classes):
        if num_samples[i] > 0:
            precisions[i] = cm[i, i] / cm[:, i].sum()
    avg_precision = torch.sum(
        precisions * num_samples) / torch.sum(num_samples)
    return avg_precision


# F1 score
def f1_score(precision, recall):
    return 2 * precision * recall / (precision + recall)


def weighted_f1_score(avg_precision, avg_recall):
    return 2 * avg_precision * avg_recall / (avg_precision + avg_recall)


# select the required common_gene set
def choose_common_gene(gene_num: int) -> list:
    if gene_num == 1729:
        with open(base_dir + "Mart/common_uq_geneID_1729.pickle", "rb") as f:
            cols_to_keep = list(pickle.load(f))
    elif gene_num == 1126:
        with open(base_dir + "Mart/common_uq_geneID_1126.pickle", "rb") as f:
            cols_to_keep = list(pickle.load(f))
    elif gene_num == 1416:
        with open(base_dir + "Mart/common_uq_geneID_1416.pickle", "rb") as f:
            cols_to_keep = list(pickle.load(f))
    else:
        raise ValueError("gene_num must be one of 1729, 1126 or 1416")
    return cols_to_keep


# read the standardized array restricted to the selected genes
def read_uq_arr(dataset_name: str, gene_num: int) -> Array:
    cols_to_keep = choose_common_gene(gene_num)
    BC32_gex = sc.read_h5ad(base_dir + "BC32/BC32_gex.h5ad")
    BC32_uq1729_df = BC32_gex[:, BC32_gex.var.index.isin(cols_to_keep)].to_df()
    BC32_uq1729_arr = np.array(BC32_uq1729_df)
    BC32_uq1729 = get_normed_array(BC32_uq1729_arr)
    if dataset_name == 'BC32':
        return BC32_uq1729
    if dataset_name == 'BC15':
        BC11_gex = sc.read_h5ad(base_dir + "BC15/BC15_gex.h5ad")
        BC11_uq_x_df = BC11_gex[:,
                                BC11_gex.var.index.isin(cols_to_keep)].to_df()
        BC11_uq1729_df = BC11_uq_x_df.reindex(columns=BC32_uq1729_df.columns,
                                              fill_value=0)
        BC11_uq1729_arr = np.array(BC11_uq1729_df)
        BC11_uq1729 = get_normed_array(BC11_uq1729_arr)
        return BC11_uq1729

    if dataset_name == 'BC11':
        BC11_gex = sc.read_h5ad(base_dir + "BC15/BC11/BC11_gex.h5ad")
        BC11_uq_x_df = BC11_gex[:,
                                BC11_gex.var.index.isin(cols_to_keep)].to_df()
        BC11_uq1729_df = BC11_uq_x_df.reindex(columns=BC32_uq1729_df.columns,
                                              fill_value=0)
        BC11_uq1729_arr = np.array(BC11_uq1729_df)
        BC11_uq1729 = get_normed_array(BC11_uq1729_arr)
        return BC11_uq1729


# read the data labels
def read_label(dataset_name: str) -> tuple:
    if dataset_name == 'BC32':
        with open(base_dir + "BC32/lables.pickle", 'rb') as f:
            labels = pickle.load(f)
    if dataset_name == 'BC15':
        with open(base_dir + "BC15/lables.pickle", 'rb') as f:
            labels = pickle.load(f)
    if dataset_name == 'BC11':
        with open(base_dir + "BC15/BC11/lables.pickle", 'rb') as f:
            labels = pickle.load(f)
    labels = pd.Series(labels)
    labels, classes = pd.factorize(labels)
    return labels, classes


with open(base_dir + "BC32/lables.pickle", "rb") as f:
    BC32_labels = pd.Series(pickle.load(f))
    BC32_labels, BC32_classes = pd.factorize(BC32_labels)



class myDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return len(self.data)
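A quick worked check of the metric helpers above (assuming they are in scope, e.g. via from route.route import ...), with made-up labels over three classes:

import torch

pred = torch.tensor([0, 1, 1, 2, 2, 2])  # made-up predictions
true = torch.tensor([0, 1, 2, 2, 2, 1])  # made-up ground truth

cm, num_samples = confusion_matrix(pred, true, num_classes=3)
print(accuracy(cm))                         # tensor(0.6667): 4 of 6 correct
print(weighted_recall(cm, num_samples))     # recall averaged by class frequency
print(weighted_precision(cm, num_samples))  # precision averaged by class frequency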

4 Prediction with the encoder

  • Inherit
  • Add layers
  • Initialize weights
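A minimal sketch of what this step could look like. Instead of the inheritance route the bullets suggest, it simply loads the trained AE and reuses its frozen encoder as a feature extractor; the saved path comes from train_ae.py above, while the constructor arguments, the 32-class linear head, and the stand-in batch are illustrative assumptions:

import torch
from torch import nn
from model.ae import AE

# rebuild the AE with the same dimensions as the training run (assumed here)
ae = AE(input_dim=1126, latent_dim=128, hidden_dims=[2048, 512, 256])
ae.load_state_dict(torch.load('./saved/ae/ae_model.pt'))
ae.eval()

# freeze the encoder so only the head would be trained
for p in ae.encoder.parameters():
    p.requires_grad = False

# hypothetical classification head on top of the 128-dim latent code
classifier = nn.Sequential(nn.Linear(128, 64), nn.LeakyReLU(), nn.Linear(64, 32))

x = torch.randn(8, 1126)      # stand-in batch
with torch.no_grad():
    z = ae.encode(x)          # latent features from the frozen encoder
logits = classifier(z)
pred = logits.argmax(dim=1)   # predicted class per sample
print(pred.shape)             # torch.Size([8])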
