基于自编码器的心电信号异常检测(Pytorch)

代码较为简单,很容易读懂。

# Importing necessary libraries for TensorFlow, pandas, numpy, and matplotlib
import tensorflow as tf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import copy


# Importing the PyTorch library
import torch


# Importing additional libraries for data manipulation, visualization, and machine learning
import copy
import seaborn as sns
from pylab import rcParams
from matplotlib import rc
from sklearn.model_selection import train_test_split


# Importing PyTorch modules for neural network implementation
from torch import nn, optim
import torch.nn.functional as F
import torch.nn as nn


# Ignoring warnings to enhance code cleanliness
import warnings
warnings.filterwarnings('ignore')
df = pd.read_csv('http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv',header=None)
df.head().T

df.describe()

df.isna().sum()
0      0
1      0
2      0
3      0
4      0
      ..
136    0
137    0
138    0
139    0
140    0
Length: 141, dtype: int64
df.dtypes
0      float64
1      float64
2      float64
3      float64
4      float64
        ...   
136    float64
137    float64
138    float64
139    float64
140    float64
Length: 141, dtype: object
new_columns = list(df.columns)
new_columns[-1] = 'target'
df.columns = new_columns
df.target.value_counts()
1.0    2919
0.0    2079
Name: target, dtype: int64
value_counts = df['target'].value_counts()


# Plotting
plt.figure(figsize=(8, 6))
value_counts.plot(kind='bar', color='skyblue')
plt.title('Value Counts of Target Column')
plt.xlabel('Target Values')
plt.ylabel('Count')


# Display the count values on top of the bars
for i, count in enumerate(value_counts):
    plt.text(i, count + 0.1, str(count), ha='center', va='bottom')


plt.show()

classes = df.target.unique()


def plot_ecg(data, class_name, ax, n_steps=10):
    # Convert data to a DataFrame
    time_series_df = pd.DataFrame(data)


    # Apply a moving average for smoothing
    smooth_data = time_series_df.rolling(window=n_steps, min_periods=1).mean()


    # Calculate upper and lower bounds for confidence interval
    deviation = time_series_df.rolling(window=n_steps, min_periods=1).std()
    upper_bound = smooth_data + deviation
    lower_bound = smooth_data - deviation


    # Plot the smoothed data
    ax.plot(smooth_data, color='black', linewidth=2)


    # Plot the confidence interval
    ax.fill_between(time_series_df.index, lower_bound[0], upper_bound[0], color='black', alpha=0.2)


    # Set the title
    ax.set_title(class_name)
# Plotting setup
fig, axs = plt.subplots(
    nrows=len(classes) // 3 + 1,
    ncols=3,
    sharey=True,
    figsize=(14, 8)
)


# Plot for each class
for i, cls in enumerate(classes):
    ax = axs.flat[i]
    data = df[df.target == cls].drop(labels='target', axis=1).mean(axis=0).to_numpy()
    plot_ecg(data, cls, ax)  # Using 'cls' directly as class name


# Adjust layout and remove extra axes
fig.delaxes(axs.flat[-1])
fig.tight_layout()


plt.show()

normal_df = df[df.target == 1].drop(labels='target', axis=1)
normal_df.shape
(2919, 140)
anomaly_df = df[df.target != 1].drop(labels='target', axis=1)
anomaly_df.shape
(2079, 140)
# Splitting the Dataset


# Initial Train-Validation Split:
# The dataset 'normal_df' is divided into training and validation sets.
# 15% of the data is allocated to the validation set.
# The use of 'random_state=42' ensures reproducibility.


train_df, val_df = train_test_split(
  normal_df,
  test_size=0.15,
  random_state=42
)


# Further Splitting for Validation and Test:
# The validation set obtained in the previous step is further split into validation and test sets.
# 33% of the validation set is allocated to the test set.
# The same 'random_state=42' is used for consistency in randomization.


val_df, test_df = train_test_split(
  val_df,
  test_size=0.30,
  random_state=42
)
# Function to Create a Dataset
def create_dataset(df):
    # Convert DataFrame to a list of sequences, each represented as a list of floats
    sequences = df.astype(np.float32).to_numpy().tolist()


    # Convert sequences to PyTorch tensors, each with shape (sequence_length, 1, num_features)
    dataset = [torch.tensor(s).unsqueeze(1).float() for s in sequences]


    # Extract dimensions of the dataset
    n_seq, seq_len, n_features = torch.stack(dataset).shape


    # Return the dataset, sequence length, and number of features
    return dataset, seq_len, n_features
# Create the training dataset from train_df
train_dataset, seq_len, n_features = create_dataset(train_df)


# Create the validation dataset from val_df
val_dataset, _, _ = create_dataset(val_df)


# Create the test dataset for normal cases from test_df
test_normal_dataset, _, _ = create_dataset(test_df)


# Create the test dataset for anomalous cases from anomaly_df
test_anomaly_dataset, _, _ = create_dataset(anomaly_df)

Implementation of LSTM-Based Autoencoder for ECG Anomaly Detection

class Encoder(nn.Module):


  def __init__(self, seq_len, n_features, embedding_dim=64):
    super(Encoder, self).__init__()


    self.seq_len, self.n_features = seq_len, n_features
    self.embedding_dim, self.hidden_dim = embedding_dim, 2 * embedding_dim


    self.rnn1 = nn.LSTM(
      input_size=n_features,
      hidden_size=self.hidden_dim,
      num_layers=1,
      batch_first=True
    )


    self.rnn2 = nn.LSTM(
      input_size=self.hidden_dim,
      hidden_size=embedding_dim,
      num_layers=1,
      batch_first=True
    )


  def forward(self, x):
    x = x.reshape((1, self.seq_len, self.n_features))


    x, (_, _) = self.rnn1(x)
    x, (hidden_n, _) = self.rnn2(x)


    return hidden_n.reshape((self.n_features, self.embedding_dim))
class Decoder(nn.Module):


  def __init__(self, seq_len, input_dim=64, n_features=1):
    super(Decoder, self).__init__()


    self.seq_len, self.input_dim = seq_len, input_dim
    self.hidden_dim, self.n_features = 2 * input_dim, n_features


    self.rnn1 = nn.LSTM(
      input_size=input_dim,
      hidden_size=input_dim,
      num_layers=1,
      batch_first=True
    )


    self.rnn2 = nn.LSTM(
      input_size=input_dim,
      hidden_size=self.hidden_dim,
      num_layers=1,
      batch_first=True
    )


    self.output_layer = nn.Linear(self.hidden_dim, n_features)


  def forward(self, x):
    x = x.repeat(self.seq_len, self.n_features)
    x = x.reshape((self.n_features, self.seq_len, self.input_dim))


    x, (hidden_n, cell_n) = self.rnn1(x)
    x, (hidden_n, cell_n) = self.rnn2(x)
    x = x.reshape((self.seq_len, self.hidden_dim))


    return self.output_layer(x)
class Autoencoder(nn.Module):


  def __init__(self, seq_len, n_features, embedding_dim=64):
    super(Autoencoder, self).__init__()


    self.encoder = Encoder(seq_len, n_features, embedding_dim).to(device)
    self.decoder = Decoder(seq_len, embedding_dim, n_features).to(device)


  def forward(self, x):
    x = self.encoder(x)
    x = self.decoder(x)


    return x
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Autoencoder(seq_len, n_features, 128)
model = model.to(device)

Training and Visualization of ECG Autoencoder Model

def plot_input_reconstruction(model, dataset, epoch):
    model = model.eval()


    plt.figure(figsize=(10, 5))


    # Take the first sequence from the dataset
    seq_true = dataset[0].to(device)
    seq_pred = model(seq_true)


    with torch.no_grad():
        # Squeeze the sequences to ensure they are 1-dimensional
        input_sequence = seq_true.squeeze().cpu().numpy()
        reconstruction_sequence = seq_pred.squeeze().cpu().numpy()


        # Check the shape after squeezing
        if input_sequence.ndim != 1 or reconstruction_sequence.ndim != 1:
            raise ValueError("Input and reconstruction sequences must be 1-dimensional after squeezing.")


        # Plotting the sequences
        plt.plot(input_sequence, label='Input Sequence', color='black')
        plt.plot(reconstruction_sequence, label='Reconstruction Sequence', color='red')
        plt.fill_between(range(len(input_sequence)), input_sequence, reconstruction_sequence, color='gray', alpha=0.5)


        plt.title(f'Input vs Reconstruction - Epoch {epoch}')
        plt.legend()
        plt.show()






import torch
import numpy as np
import copy


def train_model(model, train_dataset, val_dataset, n_epochs, save_path):
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
    criterion = torch.nn.L1Loss(reduction='sum').to(device)
    history = {'train': [], 'val': []}


    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')


    for epoch in range(1, n_epochs + 1):
        model.train()


        train_losses = []
        for seq_true in train_dataset:
            optimizer.zero_grad()


            seq_true = seq_true.to(device)
            seq_pred = model(seq_true)


            loss = criterion(seq_pred, seq_true)


            loss.backward()
            optimizer.step()


            train_losses.append(loss.item())


        val_losses = []
        model.eval()
        with torch.no_grad():
            for seq_true in val_dataset:
                seq_true = seq_true.to(device)
                seq_pred = model(seq_true)


                loss = criterion(seq_pred, seq_true)
                val_losses.append(loss.item())


        train_loss = np.mean(train_losses)
        val_loss = np.mean(val_losses)


        history['train'].append(train_loss)
        history['val'].append(val_loss)


        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            # Save the best model weights
            print("Saving best model")
            torch.save(model.state_dict(), save_path)


        print(f'Epoch {epoch}: train loss {train_loss} val loss {val_loss}')


        if epoch == 1 or epoch % 5 == 0:
            plot_input_reconstruction(model, val_dataset, epoch)


    # Load the best model weights before returning
    model.load_state_dict(best_model_wts)
    return model.eval(), history
save_path = 'best_model.pth'  # Replace with your actual path
model, history = train_model(model, train_dataset, val_dataset, 100, save_path)

ax = plt.figure().gca()


ax.plot(history['train'],label='Train Loss', color='black')
ax.plot(history['val'],label='Val Loss', color='red')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['train', 'test'])
plt.title('Loss over training epochs')
plt.show();

ECG Anomaly Detection Model Evaluation and Visualization

model = Autoencoder(seq_len, n_features, 128)


model.load_state_dict(torch.load('best_model.pth'))


model = model.to(device)
model.eval()
Autoencoder(
  (encoder): Encoder(
    (rnn1): LSTM(1, 256, batch_first=True)
    (rnn2): LSTM(256, 128, batch_first=True)
  )
  (decoder): Decoder(
    (rnn1): LSTM(128, 128, batch_first=True)
    (rnn2): LSTM(128, 256, batch_first=True)
    (output_layer): Linear(in_features=256, out_features=1, bias=True)
  )
)
def predict(model, dataset):
  predictions, losses = [], []
  criterion = nn.L1Loss(reduction='sum').to(device)
  with torch.no_grad():
    model = model.eval()
    for seq_true in dataset:
      seq_true = seq_true.to(device)
      seq_pred = model(seq_true)


      loss = criterion(seq_pred, seq_true)


      predictions.append(seq_pred.cpu().numpy().flatten())
      losses.append(loss.item())
  return predictions, losses
_, losses = predict(model, train_dataset)


sns.distplot(losses, bins=50, kde=True, label='Train',color='black');


#Visualising train loss

Threshold = 25
predictions, pred_losses = predict(model, test_normal_dataset)
sns.distplot(pred_losses, bins=50, kde=True,color='black')

correct = sum(l <= 25 for l in pred_losses)
print(f'Correct normal predictions: {correct}/{len(test_normal_dataset)}')
Correct normal predictions: 141/145
anomaly_dataset = test_anomaly_dataset[:len(test_normal_dataset)]
predictions, pred_losses = predict(model, anomaly_dataset)
sns.distplot(pred_losses, bins=50, kde=True,color='red');

correct = sum(l > 25 for l in pred_losses)
print(f'Correct anomaly predictions: {correct}/{len(anomaly_dataset)}')

Correct anomaly predictions: 145/145

def plot_prediction(data, model, title, ax):
  predictions, pred_losses = predict(model, [data])


  ax.plot(data, label='true',color='black')
  ax.plot(predictions[0], label='reconstructed',color='red')
  ax.set_title(f'{title} (loss: {np.around(pred_losses[0], 2)})')
  ax.legend()
fig, axs = plt.subplots(
  nrows=2,
  ncols=4,
  sharey=True,
  sharex=True,
  figsize=(22, 8)
)


for i, data in enumerate(test_normal_dataset[:4]):
  plot_prediction(data, model, title='Normal', ax=axs[0, i])


for i, data in enumerate(test_anomaly_dataset[:4]):
  plot_prediction(data, model, title='Anomaly', ax=axs[1, i])


fig.tight_layout();

工学博士,担任《Mechanical System and Signal Processing》《中国电机工程学报》《控制与决策》等期刊审稿专家,擅长领域:现代信号处理,机器学习,深度学习,数字孪生,时间序列分析,设备缺陷检测、设备异常检测、设备智能故障诊断与健康管理PHM等。

最近更新

  1. TCP协议是安全的吗?

    2024-06-18 09:00:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-18 09:00:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-18 09:00:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-18 09:00:02       18 阅读

热门阅读

  1. 洛谷 AT_abc358_c [ABC358C] Popcorn 题解

    2024-06-18 09:00:02       12 阅读
  2. LINUX 精通 3.2

    2024-06-18 09:00:02       8 阅读
  3. 测试用例设计:提升测试覆盖率的策略与方法

    2024-06-18 09:00:02       10 阅读
  4. HTML页面定时刷新指南

    2024-06-18 09:00:02       7 阅读
  5. Docker的常见问题

    2024-06-18 09:00:02       8 阅读
  6. 1985H1 Maximize the Largest Component (Easy Version)

    2024-06-18 09:00:02       7 阅读
  7. sping怎么解决循环依赖

    2024-06-18 09:00:02       6 阅读
  8. Redis命令

    2024-06-18 09:00:02       7 阅读
  9. spi service实现类加载代码

    2024-06-18 09:00:02       7 阅读
  10. 浅谈配置元件之TCP取样器配置/TCP取样器

    2024-06-18 09:00:02       7 阅读
  11. 算法 Hw9

    2024-06-18 09:00:02       7 阅读
  12. 6月17号

    2024-06-18 09:00:02       10 阅读