Autoencoder For Anomaly Detection Using Tensorflow Keras

An autoencoder is an unsupervised neural network model that uses reconstruction error to detect anomalies or outliers. The reconstruction error is the difference between the input data and the reconstructed data.

The autoencoder is trained using only normal data and then makes predictions for all of the data. We therefore expect the outliers to have higher reconstruction errors, because they differ from the regular data the model learned to reconstruct.
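To make the reconstruction error concrete, here is a minimal sketch that computes it with the mean absolute error; x_input and x_reconstructed are hypothetical arrays standing in for a batch of inputs and the autoencoder's outputs:

import numpy as np

# Hypothetical input batch and its reconstruction from an autoencoder
x_input = np.array([[0.1, 0.9], [0.8, 0.2]])
x_reconstructed = np.array([[0.15, 0.85], [0.3, 0.7]])

# Per-sample reconstruction error: mean absolute difference across features
reconstruction_error = np.mean(np.abs(x_input - x_reconstructed), axis=1)
print(reconstruction_error)  # [0.05 0.5] - the second sample reconstructs poorly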

In this article, we will use the Python Tensorflow Keras library to illustrate the process of identifying outliers using an autoencoder. To be specific, we will cover:

  • What is the algorithm behind an autoencoder for anomaly detection?
  • How to train an autoencoder model?
  • How to set a threshold for autoencoder anomaly detection?
  • How to evaluate autoencoder anomaly detection performance?

Step 1: Import Libraries

# Synthetic dataset
from sklearn.datasets import make_classification

# Data processing
import pandas as pd
import numpy as np
from collections import Counter

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Model and performance
import tensorflow as tf
from tensorflow.keras import layers, losses
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

Step 2: Create Dataset With Anomalies

Using make_classification from the sklearn library, we created two classes with a 0.995:0.005 ratio between the majority class and the minority class. All 32 features were generated as informative predictors; we did not include any redundant or repeated features in this dataset.

# Create an imbalanced dataset
X, y = make_classification(n_samples=100000, n_features=32, n_informative=32,
                           n_redundant=0, n_repeated=0, n_classes=2,
                           n_clusters_per_class=1,
                           weights=[0.995, 0.005],
                           class_sep=0.5, random_state=0)

Step 3: Train Test Split

In this step, we split the dataset into 80% training data and 20% validation data. random_state ensures that we get the same train test split every time. The seed for random_state does not have to be 42; it can be any number.

# Train test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Check the number of records
print('The number of records in the training dataset is', X_train.shape[0])
print('The number of records in the test dataset is', X_test.shape[0])
print(f"The training dataset has {sorted(Counter(y_train).items())[0][1]} records for the majority class and {sorted(Counter(y_train).items())[1][1]} records for the minority class.")

The train test split gives us 80,000 records for the training dataset and 20,000 for the validation dataset. The training dataset has 79,200 data points from the majority class and 800 from the minority class; the minority class data points are the outliers or anomalies.
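Note that with such a rare minority class, a random split can produce slightly different class ratios in the training and testing datasets. If you want the ratio to be identical in both, one option (not used in this post) is to stratify the split on the labels:

# Optional alternative: a stratified split preserves the class ratio
# in both the training and the testing datasets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y)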

Step 4: Autoencoder Algorithm For Anomaly Detection

The autoencoder model for anomaly detection has six steps. The first three steps are for model training, and the last three steps are for model prediction; a short code sketch of the prediction steps follows the list.

  • Step 1 is the encoder step. In this step, a neural network extracts the essential information from the input data.
  • Step 2 is the decoder step. In this step, the model reconstructs the data using the extracted information.
  • Step 3 iterates steps 1 and 2, adjusting the model to minimize the difference between the input and the reconstructed output, until we get good reconstruction results for the training dataset.
  • Step 4 makes predictions on a dataset that includes outliers.
  • Step 5 sets a threshold for outliers/anomalies by comparing the differences between the autoencoder reconstruction values and the actual values.
  • Step 6 identifies the data points whose difference is higher than the threshold as outliers or anomalies.
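In code, steps 4 through 6 can be summarized in a few lines. This is a minimal sketch, assuming model is a Keras autoencoder already fitted on normal data only (steps 1 through 3); detect_anomalies and its percentile parameter are hypothetical names, not part of the post's code:

# A sketch of the prediction side of the algorithm; `model` is a fitted
# Keras autoencoder and `percentile` controls how much data gets flagged
def detect_anomalies(model, X_all, percentile=98):
    reconstruction = model.predict(X_all)                     # step 4: reconstruct all points
    errors = np.mean(np.abs(X_all - reconstruction), axis=1)  # per-point reconstruction error
    threshold = np.percentile(errors, percentile)             # step 5: set the threshold
    return errors > threshold                                 # step 6: True marks an anomaly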

Step 5: Autoencoder Model Training

The autoencoder model trains on the normal data only, so we must first separate the normal data from the anomaly data.

Then we create the input layer, the encoder layers, and the decoder layers.

In the input layer, we specify the shape of the dataset. Because the modeling dataset has 32 features, the shape is 32 here.

The encoder consists of 3 layers with 16, 8, and 4 neurons, respectively. Note that the number of neurons decreases from layer to layer in the encoder. The last layer of the encoder is the size of the encoded representation, and it is also called the bottleneck.

The decoder consists of 3 layers with 8, 16, and 32 neurons, respectively. Opposite to the encoder, the number of neurons increases from layer to layer in the decoder. The output layer of the decoder has the same size as the input layer.

The relu activation function is used for every layer except the decoder output layer, which uses sigmoid. relu is a popular activation function, but you can try other activation functions and compare the model performance.

After defining the input, encoder, and decoder layers, we create the autoencoder model to combine the layers.

# Keep only the normal data for the training dataset
X_train_normal = X_train[np.where(y_train == 0)]

# Input layer
input_layer = tf.keras.layers.Input(shape=(32,))

# Encoder layers
encoder = tf.keras.Sequential([
    layers.Dense(16, activation='relu'),
    layers.Dense(8, activation='relu'),
    layers.Dense(4, activation='relu')])(input_layer)

# Decoder layers
decoder = tf.keras.Sequential([
    layers.Dense(8, activation='relu'),
    layers.Dense(16, activation='relu'),
    layers.Dense(32, activation='sigmoid')])(encoder)

# Create the autoencoder
autoencoder = tf.keras.Model(inputs=input_layer, outputs=decoder)

After creating the autoencoder model, we compile it with the adam optimizer and the mae (Mean Absolute Error) loss.

When fitting the autoencoder model, notice that the input and the output datasets are the same: the dataset that contains only the normal data points.

The validation data is the testing dataset that contains both normal and anomaly data points.

The epochs of 20 and batch_size of 64 mean the model uses 64 data points to update the weights in each iteration, and the model goes through the whole training dataset 20 times. With 79,200 normal training records and a batch size of 64, each epoch runs ceil(79,200 / 64) = 1,238 batches, which matches the 1238/1238 in the training log below.

shuffle=True will shuffle the dataset before each epoch.

# Compile the autoencoder
autoencoder.compile(optimizer='adam', loss='mae')

# Fit the autoencoder
history = autoencoder.fit(X_train_normal, X_train_normal, 
          epochs=20, 
          batch_size=64,
          validation_data=(X_test, X_test),
          shuffle=True)
Epoch 1/20
1238/1238 [==============================] - 2s 1ms/step - loss: 2.5375 - val_loss: 2.5047
Epoch 2/20
1238/1238 [==============================] - 2s 1ms/step - loss: 2.4882 - val_loss: 2.4829
Epoch 3/20
.........
.........
Epoch 19/20
1238/1238 [==============================] - 2s 1ms/step - loss: 2.4568 - val_loss: 2.4602
Epoch 20/20
1238/1238 [==============================] - 2s 1ms/step - loss: 2.4560 - val_loss: 2.4593

This chart visualizes how the training and validation losses change during model fitting. The x-axis is the number of epochs, and the y-axis is the loss. We can see that both the training and validation losses decrease as the number of epochs increases.
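The chart itself is not produced by the code above. A minimal sketch to reproduce it from the history object returned by fit could look like this:

# Plot the training and validation loss per epoch (a sketch; not in the original code)
plt.plot(history.history['loss'], label='Training loss')
plt.plot(history.history['val_loss'], label='Validation loss')
plt.xlabel('Epoch')
plt.ylabel('Loss (MAE)')
plt.legend()
plt.show()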

Step 6: Autoencoder Anomaly Detection Threshold

Now that we have an autoencoder model, let’s use it to predict the outliers.

First, we use .predict to get the reconstruction values for the testing dataset, which contains both the normal data points and the outliers.

Then we calculate the loss between the actual values and the reconstructions using the mean absolute error.

After that, a threshold is set to identify the outliers. The threshold can be based on a percentile, the standard deviation, or other methods. In this example, we use the 98th percentile of the loss as the threshold, which labels 2% of the data as outliers.

# Predict anomalies/outliers in the testing dataset
prediction = autoencoder.predict(X_test)

# Get the mean absolute error between actual and reconstruction/prediction
prediction_loss = tf.keras.losses.mae(prediction, X_test)

# Check the prediction loss threshold for 2% of outliers
loss_threshold = np.percentile(prediction_loss, 98)
print(f'The prediction loss threshold for 2% of outliers is {loss_threshold:.2f}')

# Visualize the threshold
sns.histplot(prediction_loss, bins=30, alpha=0.8)
plt.axvline(x=loss_threshold, color='orange')

The visualization chart shows that the prediction loss is close to a normal distribution with a mean of around 2.5. The prediction loss threshold for 2% of outliers is about 3.5.
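Because the prediction loss is close to a normal distribution, a standard-deviation rule is a reasonable alternative to the percentile threshold. This is a sketch, not part of the original code:

# Alternative threshold: mean plus three standard deviations of the loss
mean_loss = np.mean(prediction_loss)
std_loss = np.std(prediction_loss)
std_threshold = mean_loss + 3 * std_loss
print(f'The mean + 3*std threshold is {std_threshold:.2f}')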

Step 7: Autoencoder Anomaly Detection Performance

Sometimes the dataset has ground truth labels for the anomalies, but often it does not. When the labels are available, we can evaluate the model performance.

Based on the threshold identified in the previous step, we predict a data point to be normal if its prediction loss is less than the threshold; otherwise, we predict it to be an outlier or anomaly. We label normal predictions 0 and outlier predictions 1 to be consistent with the ground truth labels.

# Check the model performance at 2% threshold
threshold_prediction = [0 if i < loss_threshold else 1 for i in prediction_loss]

# Check the prediction performance
print(classification_report(y_test, threshold_prediction))

The recall value of 0.01 shows that around 1% of the outliers were captured by the autoencoder.

              precision    recall  f1-score   support

           0       0.99      0.98      0.98     19803
           1       0.01      0.01      0.01       197

    accuracy                           0.97     20000
   macro avg       0.50      0.50      0.50     20000
weighted avg       0.98      0.97      0.98     20000
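The low recall suggests the 98th-percentile threshold may not be a good fit for this dataset, where about 1% of the test records are anomalies. One way to investigate, sketched below under the assumption that prediction_loss and y_test come from the earlier steps, is to sweep candidate percentiles and watch how the recall for the outlier class changes:

# Sweep threshold percentiles and report recall for the outlier class (a sketch)
from sklearn.metrics import recall_score

for pct in [90, 95, 98, 99]:
    threshold = np.percentile(prediction_loss, pct)
    predicted = (prediction_loss > threshold).numpy().astype(int)
    print(f'percentile={pct}: outlier recall = {recall_score(y_test, predicted):.2f}')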

Step 8: Put All Code Together

###### Step 1: Import Libraries

# Synthetic dataset
from sklearn.datasets import make_classification

# Data processing
import pandas as pd
import numpy as np
from collections import Counter

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Model and performance
import tensorflow as tf
from tensorflow.keras import layers, losses
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report


###### Step 2: Create Dataset With Anomalies

# Create an imbalanced dataset
X, y = make_classification(n_samples=100000, n_features=32, n_informative=32,
                           n_redundant=0, n_repeated=0, n_classes=2,
                           n_clusters_per_class=1,
                           weights=[0.995, 0.005],
                           class_sep=0.5, random_state=0)


###### Step 3: Train Test Split

# Train test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Check the number of records
print('The number of records in the training dataset is', X_train.shape[0])
print('The number of records in the test dataset is', X_test.shape[0])
print(f"The training dataset has {sorted(Counter(y_train).items())[0][1]} records for the majority class and {sorted(Counter(y_train).items())[1][1]} records for the minority class.")


###### Step 4: Autoencoder Algorithm For Anomaly Detection

# No code for this step


###### Step 5: Autoencoder Model Training

# Keep only the normal data for the training dataset
X_train_normal = X_train[np.where(y_train == 0)]

# Input layer
input_layer = tf.keras.layers.Input(shape=(32,))

# Encoder layers
encoder = tf.keras.Sequential([
    layers.Dense(16, activation='relu'),
    layers.Dense(8, activation='relu'),
    layers.Dense(4, activation='relu')])(input_layer)

# Decoder layers
decoder = tf.keras.Sequential([
    layers.Dense(8, activation='relu'),
    layers.Dense(16, activation='relu'),
    layers.Dense(32, activation='sigmoid')])(encoder)

# Create the autoencoder
autoencoder = tf.keras.Model(inputs=input_layer, outputs=decoder)

# Compile the autoencoder
autoencoder.compile(optimizer='adam', loss='mae')

# Fit the autoencoder on the normal data only
history = autoencoder.fit(X_train_normal, X_train_normal,
          epochs=20,
          batch_size=64,
          validation_data=(X_test, X_test),
          shuffle=True)


###### Step 6: Autoencoder Anomaly Detection Threshold

# Predict anomalies/outliers in the testing dataset
prediction = autoencoder.predict(X_test)

# Get the mean absolute error between actual and reconstruction/prediction
prediction_loss = tf.keras.losses.mae(prediction, X_test)

# Check the prediction loss threshold for 2% of outliers
loss_threshold = np.percentile(prediction_loss, 98)
print(f'The prediction loss threshold for 2% of outliers is {loss_threshold:.2f}')

# Visualize the threshold
sns.histplot(prediction_loss, bins=30, alpha=0.8)
plt.axvline(x=loss_threshold, color='orange')


###### Step 7: Autoencoder Anomaly Detection Performance

# Check the model performance at 2% threshold
threshold_prediction = [0 if i < loss_threshold else 1 for i in prediction_loss]

# Check the prediction performance
print(classification_report(y_test, threshold_prediction))

Summary

In this article, we went through the autoencoder neural network model for anomaly detection.

Using the Tensorflow Keras API in Python, we covered

  • What is an autoencoder?
  • What is the algorithm behind an autoencoder for anomaly detection?
  • How to train an autoencoder model?
  • How to set a threshold for autoencoder anomaly detection?
  • How to evaluate autoencoder anomaly detection performance?

For more information about data science and machine learning, please check out my YouTube channel and Medium Page or follow me on LinkedIn.
