Detect anomalies in streaming data using Python and machine learning

Learn the simplest way to detect anomalies in streaming data

Redpanda Data
10 min read · May 8, 2024

Author: Artem Oppermann

Anomaly detection is a data analysis method that identifies patterns in data that do not conform to expected behavior. These inconsistencies are known as anomalies or outliers. Using statistical, machine learning, or data mining techniques, anomaly detection surfaces the data points that deviate from the bulk of the data. Such deviations can indicate exceptional events, such as errors or fraud.

Its broad practical applications span business and technology, including fraud detection to preempt unauthorized transactions and insider trading. Furthermore, anomaly detection plays a crucial role in network intrusion detection by identifying unauthorized access and abnormal traffic patterns, enhancing network security against cyber threats. In system monitoring, it can locate unexpected behavior, faults, or failures, helping maintain system integrity and reliability.

This post introduces anomaly detection and its significance across industries. You’ll also learn how to set up a real-time detection system using Redpanda and Python’s machine learning tools.

What is anomaly detection?

Anomaly detection identifies unusual patterns or behaviors within a data set. These irregularities are called anomalies or outliers, and they are distinguished from normal behavior using statistical or machine learning techniques.

For instance, the z-score measures how many standard deviations a data point lies from the mean. If the absolute value of the z-score exceeds a chosen threshold (3 is a common choice), the data point is considered an anomaly.
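
The z-score rule can be sketched in a few lines of standard-library Python. This is a minimal illustration rather than part of the tutorial's code: the function name zscore_anomalies, the default threshold of 3.0, and the sample readings are all assumptions made for this example.

```python
from statistics import mean, pstdev

def zscore_anomalies(values, threshold=3.0):
    """Return (index, value, z) for points whose |z-score| exceeds threshold."""
    mu = mean(values)
    sigma = pstdev(values)  # population standard deviation
    if sigma == 0:
        return []  # all values are identical, so nothing stands out
    flagged = []
    for i, v in enumerate(values):
        z = (v - mu) / sigma
        if abs(z) > threshold:
            flagged.append((i, v, z))
    return flagged

# Made-up readings: nineteen ordinary values and one obvious outlier
readings = [10.0] * 19 + [50.0]
result = zscore_anomalies(readings)
print(result)  # only the last reading (index 19) is flagged
```

Because a single extreme value also inflates the mean and the standard deviation, the z-score works best when anomalies are rare relative to the normal data.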

Anomalies in the data could represent errors, inefficiencies, or even significant opportunities, such as unique insights that may lead to innovation or growth. For example, a sudden spike in product sales in an unexpected demographic could signify a new market opportunity.

Anomaly detection can be applied to uncover hidden information, detect problems early, or highlight areas requiring further investigation. For instance, in the realm of fraud detection within the finance industry, algorithms like k-nearest neighbors (k-NN) are commonly used.

They can sift through vast amounts of transaction data and identify patterns that stand out as unusual. This might include strange purchasing behavior or irregular activities within an account. By spotting these inconsistencies, the algorithms can help to uncover potential fraudulent activities and, in this way, prevent financial crimes.
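
One common way to apply k-NN to anomaly detection is to score each point by its distance to its nearest neighbors: points that sit far from everything else get high scores. The sketch below assumes that approach and uses made-up, one-dimensional transaction amounts. The helper name knn_scores and the choice of k=3 are illustrative, and a real fraud system would use many more features.

```python
def knn_scores(values, k=3):
    """Score each point by its mean distance to its k nearest neighbors.

    Assumes len(values) > k.
    """
    scores = []
    for i, v in enumerate(values):
        # Distances from this point to every other point, smallest first
        distances = sorted(abs(v - other) for j, other in enumerate(values) if j != i)
        scores.append(sum(distances[:k]) / k)
    return scores

# Hypothetical transaction amounts; the last one is far from the rest
amounts = [20.0, 22.0, 19.0, 21.0, 23.0, 500.0]
scores = knn_scores(amounts, k=3)

# The point with the highest score is the most isolated one
most_unusual = max(range(len(amounts)), key=scores.__getitem__)
print(most_unusual, scores[most_unusual])
```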

For network intrusion detection, anomaly detection helps identify unauthorized access or strange traffic patterns. These abnormal patterns may signal an attempted breach or unauthorized usage that requires immediate attention.

Network administrators can quickly detect these anomalies and initiate countermeasures to halt potential intrusions. This contributes to the overall security and integrity of computer systems so sensitive information remains protected and system functionality is maintained at optimal levels.

Anomaly detection in system monitoring enables quick identification of unexpected system behaviors like CPU spikes or log errors. By detecting these deviations, organizations can quickly respond and mitigate issues before they escalate. This is not just about troubleshooting; it’s also an opportunity for proactive maintenance.

For example, noticing a trend in declining hard drive performance could prompt preemptive actions, averting a potential system failure. At the same time, spotting abnormal network latency could reveal a misconfiguration. This would allow for timely adjustments that improve overall system efficiency.

Tutorial: anomaly detection with Redpanda, Python, and machine learning

Applying machine learning to streaming data has traditionally been challenging. Python has made machine learning more accessible through libraries and frameworks such as TensorFlow and scikit-learn, which simplify building, training, and deploying models.

This tutorial uses scikit-learn, a popular choice for classical machine learning algorithms, offering simple and efficient data analysis and modeling tools. The library provides various algorithms for tasks like regression, classification, and clustering, making it suitable for beginners and professionals.

In the following tutorial, you’ll use Redpanda (an Apache Kafka®–compatible event streaming platform), along with scikit-learn, in Python to create a real-time anomaly detection system that identifies unusual patterns or outliers within a stream of data:

Schematic depiction of the anomaly detection use case

Synthetic data is generated to simulate a sine wave with occasional random spikes that represent anomalies. This data and its corresponding unique time-steps are produced into a Redpanda topic called raw-data. An Apache Kafka consumer then consumes the data from the raw-data topic, processes it, and converts it into a format suitable for training a machine learning model.

The scikit-learn library’s Isolation Forest model is then trained on the collected data to predict anomalies, identifying data points that deviate from the expected pattern. An Apache Kafka producer then sends the detected anomalies and their corresponding time-steps together as JSON strings to another Redpanda topic called anomalies.

Prerequisites

To complete the tutorial, you need the following:

  • Python 3.6 or higher: The code is written in Python, so you’ll need to install Python on your system. You can download it from Python’s official website.
  • Redpanda: Redpanda is a streamlined Apache Kafka–compatible event streaming platform. You’ll need it installed and running. Follow the instructions on the Redpanda website to install and start Redpanda.
  • kafka-python library: This Python client for Apache Kafka provides the KafkaConsumer and KafkaProducer classes used in the code. Install it using pip:
pip install kafka-python
  • scikit-learn: scikit-learn is a machine-learning library used for anomaly detection in the code. You can also install it using pip:
pip install scikit-learn
  • NumPy: NumPy is used for numerical operations in the code. Here’s how to install it using pip:
pip install numpy
  • Redpanda topics: Create the Redpanda topics raw-data and anomalies before running the code. Use the rpk command-line tool, which provides an easy way to interact with your Redpanda cluster. To create the required topics, execute the following rpk commands:
rpk topic create raw-data  
rpk topic create anomalies

These commands will establish the necessary infrastructure for your data streams, allowing you to focus on monitoring and detecting anomalies.

  • Redpanda configuration: Ensure that Redpanda is configured to listen on the correct address and port (e.g., localhost:9092). If you're running Redpanda inside a Docker container or on a remote machine, adjust the address and port in both the Redpanda configuration and the Python code.
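
If the broker address differs between environments (local, Docker, remote), one option is to read it from an environment variable instead of hard-coding it in each script. The sketch below assumes a hypothetical variable name, ANOMALY_DEMO_BROKERS, invented for this example. It is not a setting that Redpanda or kafka-python reads on its own.

```python
import os

def get_bootstrap_servers(default="localhost:9092"):
    """Read the broker address from the environment, falling back to a default.

    ANOMALY_DEMO_BROKERS is a made-up variable name used only in this sketch.
    """
    return os.environ.get("ANOMALY_DEMO_BROKERS", default)

BOOTSTRAP_SERVERS = get_bootstrap_servers()
print(BOOTSTRAP_SERVERS)

# The value can then be passed to the clients used later in the tutorial, e.g.:
# KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
```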

1. Generate synthetic data

Use the following code to create a script to generate synthetic data with occasional anomalies and produce it into the Redpanda topic raw-data:

# Import all required libraries
from kafka.producer import KafkaProducer
import numpy as np
import json

raw_data_producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Produce synthetic data and the occasional anomalies
for i in range(1000):
    value = np.sin(i * 0.1)

    # Introduce a random spike as an anomaly with 1% probability
    if np.random.rand() < 0.01:
        value += np.random.rand() * 5

    # Create a dictionary with both value and time-step
    data = {
        "value": value,
        "time_step": i
    }

    # Convert the dictionary to a JSON string and encode it
    raw_data_producer.send('raw-data', value=json.dumps(data).encode("utf-8"))

raw_data_producer.flush()
raw_data_producer.close()

The Kafka producer is initialized to connect to a Redpanda broker (or Kafka broker) running on localhost:9092. Then a loop runs 1,000 times, simulating 1,000 time-steps. For each time-step, a sine wave value is generated. With a probability of 1 percent, a random spike of up to 5 (the anomaly) is added to the sine wave value.

A dictionary is created containing both the generated value and its corresponding time-step. This dictionary is converted to a JSON string and encoded to bytes. The encoded JSON string is sent to the raw-data topic using the Kafka producer.

After all messages are generated and sent, the producer's flush() method ensures all buffered messages are transmitted to the broker. The producer is then closed to release its resources.
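
The encoding step can also be factored into a pair of small helpers, which makes the message format easy to check without a running broker. This is a sketch with assumed helper names (serialize, deserialize). The commented line relies on the value_serializer argument that kafka-python's KafkaProducer accepts, which applies the function to every value passed to send().

```python
import json

def serialize(record):
    """Turn a dictionary into UTF-8 encoded JSON bytes."""
    return json.dumps(record).encode("utf-8")

def deserialize(payload):
    """Turn UTF-8 encoded JSON bytes back into a dictionary."""
    return json.loads(payload.decode("utf-8"))

message = serialize({"value": 0.5, "time_step": 3})
print(message)
print(deserialize(message))

# With a serializer configured, send() can be given the dictionary directly:
# producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=serialize)
# producer.send('raw-data', value={"value": 0.5, "time_step": 3})
```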

2. Consume model data from a Redpanda topic

The following code provides a foundation for a real-time anomaly detection system, where data is consumed from a Kafka topic and processed for further analysis:

# Import all required libraries
import json
from kafka.consumer import KafkaConsumer

# Create a consumer to read raw data from the beginning of the topic
raw_data_consumer = KafkaConsumer(
    'raw-data',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',  # also read messages produced before this script started
)

# Collect data for training and prediction, along with time-steps
data = []
time_steps = []
for msg in raw_data_consumer:
    # Decode the JSON string to a Python dictionary
    decoded_msg = json.loads(msg.value.decode('utf-8'))

    data.append([decoded_msg["value"]])
    time_steps.append(decoded_msg["time_step"])  # time-step recorded by the producer

    # Stop collecting after 1000 data points
    if len(data) >= 1000:
        break

The code begins by importing the json module and the Kafka consumer.

Next, a Kafka consumer is created to read raw data from the raw-data topic. It is configured to connect to a Redpanda broker running on localhost:9092, and auto_offset_reset='earliest' makes it start from the beginning of the topic, so it also receives the messages that the producer script wrote before the consumer started.

The code then enters a loop where it consumes messages from the raw-data topic. Each message is expected to be a JSON string containing two fields: value, representing the data value, and time_step, representing the unique time-step when the data was created.

Inside the loop, the code decodes each message and parses the JSON string to extract both the value and the time-step. These are appended to the data and time_steps lists, respectively. The loop continues until 1,000 data points have been collected.
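
In a real stream, some messages may be malformed or missing a field. A defensive variant of the decoding step could look like the sketch below. The helper name parse_message is an assumption for this example, and skipping bad messages (rather than failing) is one possible policy, not the only one.

```python
import json

def parse_message(raw):
    """Return (value, time_step) from a raw message, or None if it is malformed."""
    try:
        record = json.loads(raw.decode("utf-8"))
        return float(record["value"]), int(record["time_step"])
    except (UnicodeDecodeError, json.JSONDecodeError, KeyError, TypeError, ValueError):
        return None

print(parse_message(b'{"value": 1.5, "time_step": 7}'))
print(parse_message(b'not json'))
print(parse_message(b'{"value": 1.5}'))
```

Inside the consumer loop, a result of None would simply be skipped before appending to the data and time_steps lists.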

3. Train an anomaly detection model

This part of the code focuses on the anomaly detection process using the Isolation Forest algorithm:

import numpy as np
from sklearn.ensemble import IsolationForest

# Convert to a NumPy array for scikit-learn
X = np.array(data)

# Train an Isolation Forest model
model = IsolationForest(contamination=0.01)
model.fit(X)

# Predict anomalies
pred = model.predict(X)

# Anomalies are represented by -1
anomalies = X[pred == -1]
anomaly_time_steps = [time_steps[i] for i in range(len(pred)) if pred[i] == -1]

The collected data, which was previously stored in a list, is converted into a NumPy array. Next, an Isolation Forest model is instantiated with the contamination parameter set to 0.01. This parameter specifies the expected proportion of outliers in the data. Here it's 1 percent, which matches the spike probability used by the data generator.

The model is then trained on the collected data using the fit method. This step involves the model learning the underlying structure of the data and determining what constitutes normal behavior. Once the model is trained, it's used to predict anomalies in the same data it was trained on. The predict method returns an array of predictions, where a prediction of −1 represents an anomaly and a prediction of 1 represents normal data.

The code then extracts the anomalies by selecting the data points where the prediction is −1. It also extracts the corresponding time-steps for these anomalies by selecting the time-steps at the same indices where the prediction is −1.

At the end of this segment, two variables, anomalies and anomaly_time_steps, contain the detected anomalous data values and their corresponding time-steps, respectively.
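
The effect of the contamination parameter can be illustrated without scikit-learn. Conceptually, the model assigns every point an anomaly score and then labels the lowest-scoring fraction as −1. The sketch below is a simplified stand-in for that idea, not scikit-learn's implementation (which derives its threshold from a percentile of the training scores). The function name flag_lowest_scores and the scores themselves are hypothetical.

```python
def flag_lowest_scores(scores, contamination=0.01):
    """Label the lowest-scoring fraction of points as -1 and the rest as 1."""
    n_flagged = round(contamination * len(scores))
    # Indices ordered from most anomalous (lowest score) to least
    ranked = sorted(range(len(scores)), key=lambda i: scores[i])
    flagged = set(ranked[:n_flagged])
    return [-1 if i in flagged else 1 for i in range(len(scores))]

# Hypothetical scores: lower means "easier to isolate", i.e. more anomalous
scores = [0.1] * 99 + [-0.4]
labels = flag_lowest_scores(scores, contamination=0.01)
print(labels.count(-1), labels.index(-1))
```

One consequence is that a fixed contamination value flags roughly that share of the training data whether or not every flagged point is a genuine anomaly.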

4. Produce the detected anomaly data into another Redpanda topic

This part of the code is responsible for sending the detected anomalies, along with their corresponding time-steps, to a Kafka topic named anomalies:

import json
from kafka.producer import KafkaProducer

# Create a producer to send anomalies
anomalies_producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Produce the detected anomalies and time-steps to the Redpanda topic
for anomaly, time_step in zip(anomalies, anomaly_time_steps):
    anomaly_data = {
        "value": float(anomaly[0]),
        "time_step": time_step
    }
    # kafka-python expects bytes unless a value_serializer is configured
    anomalies_producer.send('anomalies', value=json.dumps(anomaly_data).encode("utf-8"))

anomalies_producer.flush()

# Print the results along with corresponding time-steps
print("Detected anomalies:")
for anomaly, time_step in zip(anomalies, anomaly_time_steps):
    print(f"Value: {anomaly[0]}, Time-step: {time_step}")

anomalies_producer.close()

A Kafka producer is created to send messages to the anomalies topic. The producer is configured to connect to a Kafka or Redpanda broker running on localhost:9092. Next, the code enters a loop that iterates through the detected anomalies and their corresponding time-steps.

Inside the loop, a dictionary is created for each anomaly, containing two fields: value, representing the anomalous value, and time_step, representing the time-step at which that value occurred. The dictionary is then converted to a JSON string and encoded to bytes, because the producer has no serializer configured and therefore expects bytes. The encoded message is sent to the anomalies topic using the Kafka producer.

After all the anomalies have been sent, the producer's flush method ensures all the buffered messages are transmitted to the broker so no messages are left unsent before the program continues.

5. Observe the results

With the previous code executed, you can check to see what anomalies were observed and sent to the Redpanda topic anomalies. To do this, execute the following command:

rpk topic consume anomalies

This command reads the data from a given topic, in this case anomalies. The output should look similar to this (the exact values, timestamps, and offsets will differ from run to run because the spikes are random):

{
  "topic": "anomalies",
  "value": "{\"value\": -0.9999232575641008, \"time_step\": 47}",
  "timestamp": 1658154017025,
  "partition": 0,
  "offset": 257
}
{
  "topic": "anomalies",
  "value": "{\"value\": 5.6794386977017925, \"time_step\": 395}",
  "timestamp": 1658154017133,
  "partition": 0,
  "offset": 258
}

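Here, you see two anomalies sent to the topic anomalies in Redpanda. The first has a value of −0.9999232575641008 and occurred at time-step 47. The second has a value of 5.6794386977017925 and occurred at time-step 395. Note that the first value is not an injected spike: it is the sine wave near its minimum (sin(4.7) ≈ −0.99992). Because contamination is set to 0.01, the model flags roughly 1 percent of the points it was trained on, so extreme but legitimate values can be reported alongside genuine spikes.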

As a next step, you can take various actions based on these detected anomalies. You could set up automated alerts to notify system administrators of potential issues or integrate this data into a monitoring dashboard for real-time tracking and analysis. The goal is to use these anomalies as a basis for proactive maintenance and system optimization.
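
As a rough idea of what an alerting step might look like, the sketch below turns a message from the anomalies topic into an alert string when the value lies well outside the sine wave's normal range of −1 to 1. The function name build_alert, the threshold of 2.0, and the alert format are all assumptions for this example. It expects the value and time_step fields written by the producer code above.

```python
import json

def build_alert(payload, threshold=2.0):
    """Return an alert string for a severe anomaly, or None otherwise."""
    record = json.loads(payload.decode("utf-8"))
    value = record["value"]
    if abs(value) < threshold:
        return None  # within or close to the normal range, so no alert
    return f"ALERT: value {value:.2f} at time-step {record['time_step']}"

print(build_alert(b'{"value": 5.68, "time_step": 395}'))
print(build_alert(b'{"value": -0.99, "time_step": 47}'))
```

A consumer subscribed to the anomalies topic could call such a function for each message and forward non-empty results to a notification channel or dashboard.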

Conclusion

By generating synthetic data with occasional anomalies, consuming it through Kafka consumers, and applying the Isolation Forest algorithm, you can detect unusual patterns within a stream of data. Integrating Redpanda, scikit-learn and Python creates a robust and scalable solution that can be applied to various domains, from monitoring system health to detecting fraudulent activities.

You can find the complete code and further details in this GitHub repo. Feel free to clone, fork, or contribute to the project!

To keep exploring Redpanda, check the documentation and browse the blog for tutorials. If you have questions, join the Redpanda community on Slack.
