Migrate from Apache Kafka in a single command
Move your workloads from any Kafka system to Redpanda with just one command
Author: Mihai Todor
Moving existing workloads to a new system is just as important as its deployment, operation, and management. At Redpanda, we believe there’s power in simplicity, so we’re excited to share how we’re making end-to-end migration drastically easier.
Introducing Redpanda Migrator, a tool designed to simplify migrations from any Apache Kafka® system to Redpanda. With fewer components to manage, this new tool has major benefits:
- Single Go binary with no complex deployment or JVM tuning.
- Single process to migrate topics, schemas, consumer groups, ACLs. No herding connectors or manual coordination.
- Single metrics endpoint with all you need to know about progress and health. No second guessing.
Redpanda Migrator is currently available in Redpanda Connect. Existing customers can try it with any Redpanda distribution: Cloud or Self-Managed. New users can try Redpanda Migrator for free as part of Redpanda Serverless.
In this post, we’ll briefly cover what Redpanda Migrator brings to the table and demonstrate a simple example of transferring data to Redpanda.
Why use Redpanda Migrator?
With Redpanda Migrator, you can move your workloads from any Kafka system to Redpanda using just one command.
While you can continue using MirrorMaker2 (MM2) and the Kafka Connect ecosystem with Redpanda clusters, we designed our new Redpanda Migrator for Redpanda Connect to address common concerns voiced by customers and internal Redpanda operators alike.
- Requires complex deployment and setup: Running MM2 in distributed mode involves the complexities and infrastructure overhead of running a second distributed system alongside Kafka.
- Prone to misconfiguration: Config is spread across multiple connectors, worker nodes, and the Kafka Connect runtime itself, making it hard to use for developers without deep Kafka expertise.
- Incomplete metrics experience: The default log level in Kafka Connect makes it difficult to debug issues with MM2 and other connectors. Still, increasing log verbosity can flood users with noise, making it difficult to diagnose problems.
- Difficult to tune performance: Between JVM garbage collection and years of bolt-on fixes, it’s hard to tune MM2 to keep up with high-volume data flows while maintaining minimal replication lag.
Redpanda Migrator in action
Here’s a quick demo of transferring data from a Kafka cluster to a Redpanda cluster using Redpanda Migrator. You can also check the Redpanda Connect Docs on GitHub for help getting started.
Note that the Redpanda Connect components that enable Redpanda Migrator’s functionality are:
kafka_migrator
inputkafka_migrator
outputkafka_franz
inputkafka_migrator_offsets
outputschema_registry
inputschema_registry
output
For convenience, these are bundled together in the kafka_migrator_bundle
input and kafka_migrator_bundle
output templates. For those who prefer to read, here's a step-by-step breakdown of the demo.
1. Create the Docker containers
First, you’ll need two clusters. To keep it simple, you can run the Bitnami Kafka and Schema Registry Docker containers for the source
cluster and a Redpanda Docker container for the destination
cluster via Docker Compose.
services:
source:
image: bitnami/kafka
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT
KAFKA_CFG_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_DOCKER://0.0.0.0:19092,CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_DOCKER://source:19092
KAFKA_CFG_AUTHORIZER_CLASS_NAME: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
KAFKA_CFG_SUPER_USERS: User:redpanda;User:ANONYMOUS
ports:
- 9092:9092
- 19092:19092
healthcheck:
test: [ "CMD", "kafka-topics.sh", "--bootstrap-server=localhost:9092", "--list" ]
start_period: 5s
interval: 3s
init_source:
image: bitnami/kafka
working_dir: /opt/bitnami/kafka/bin
entrypoint: /bin/bash
depends_on:
source:
condition: service_healthy
command: >
-c "kafka-topics.sh --create --if-not-exists --topic foo --replication-factor=1 --partitions=2 --bootstrap-server source:19092 &&
kafka-topics.sh --create --if-not-exists --topic bar --replication-factor=1 --partitions=2 --bootstrap-server source:19092 &&
echo 'Created topics:' &&
kafka-topics.sh --list --exclude-internal --bootstrap-server source:19092 &&
kafka-acls.sh --bootstrap-server source:19092 --add --allow-principal User:redpanda --operation Read --topic foo &&
kafka-acls.sh --bootstrap-server source:19092 --add --deny-principal User:redpanda --operation Read --topic bar
echo 'Created ACLs:' &&
kafka-acls.sh --bootstrap-server source:19092 --list"
source_schema_registry:
image: bitnami/schema-registry
environment:
SCHEMA_REGISTRY_KAFKA_BROKERS: PLAINTEXT://source:19092
ports:
- 8081:8081
depends_on:
source:
condition: service_healthy
destination:
image: redpandadata/redpanda
command:
- redpanda
- start
- --node-id 0
- --mode dev-container
- --set rpk.additional_start_flags=[--reactor-backend=epoll]
- --set redpanda.auto_create_topics_enabled=false
- --kafka-addr 0.0.0.0:9093
- --advertise-kafka-addr localhost:9093
- --schema-registry-addr 0.0.0.0:8081
ports:
- 8082:8081
- 9093:9093
- 9645:9644
$ docker-compose -f docker-compose.yml up --force-recreate -V
Note: We used an
init
container above to create two topics,foo
andbar
, each with two partitions and an associated ACL.
2. Create schemas
Once the demo clusters are up and running, use curl to create a schema for each topic in the source
cluster.
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Foo\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/foo/versions
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Bar\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/bar/versions
3. Generate messages
Let’s simulate an application with a producer and consumer actively publishing and reading messages on the source
cluster. Use Redpanda Connect to generate some Avro-encoded messages and push them to the two topics you just created.
# generate_data.yaml
http:
enabled: false
input:
sequence:
inputs:
- generate:
mapping: |
let msg = counter()
root.data = $msg
meta kafka_topic = match $msg % 2 {
0 => "foo"
1 => "bar"
}
interval: 1s
count: 0
batch_size: 1
processors:
- schema_registry_encode:
url: "http://localhost:8081"
subject: ${! metadata("kafka_topic") }
avro_raw_json: true
output:
kafka_franz:
seed_brokers: [ "localhost:9092" ]
topic: ${! @kafka_topic }
partitioner: manual
partition: ${! random_int(min:0, max:1) }
You can start this pipeline and leave it running:
$ redpanda-connect run generate_data.yaml
You can also start a Redpanda Connect consumer, which reads messages from the source
cluster topics and also leaves it running. (This consumer uses the foobar
consumer group, which will be important later.)
# kafka_migrator_bundle.yaml
input:
kafka_migrator_bundle:
kafka_migrator:
seed_brokers: [ "localhost:9092" ]
topics:
- '^[^_]' # Skip internal topics which start with `_`
regexp_topics: true
consumer_group: migrator_bundle
start_from_oldest: true
schema_registry:
url: http://localhost:8081
include_deleted: true
subject_filter: ""
output:
kafka_migrator_bundle:
kafka_migrator:
seed_brokers: [ "localhost:9093" ]
max_in_flight: 1
schema_registry:
url: http://localhost:8082
metrics:
prometheus: {}
mapping: |
meta label = if this == "input_kafka_migrator_lag" { "source" }
Note: The max_in_flight setting is important to preserve message ordering at the partition level. Please refer to the documentation for details.
Next, launch the migrator bundle with the example configuration:
$ redpanda-connect run kafka_migrator_bundle.yaml
5. Check the status of migrated topics
You’re ready to check which topics and ACLs have been migrated to the destination
cluster.
Note: Roles are specific to Redpanda. For now, they have to be migrated manually.
$ rpk -X brokers=localhost:9093 -X admin.hosts=localhost:9645 topic list
NAME PARTITIONS REPLICAS
_schemas 1 1
bar 2 1
foo 2 1
$ rpk -X brokers=localhost:9093 -X admin.hosts=localhost:9645 security acl list
PRINCIPAL HOST RESOURCE-TYPE RESOURCE-NAME RESOURCE-PATTERN-TYPE OPERATION PERMISSION ERROR
User:redpanda * TOPIC bar LITERAL READ DENY
User:redpanda * TOPIC foo LITERAL READ ALLOW
6. Check metrics to monitor progress
Redpanda Connect emits Prometheus metrics for monitoring and trending with your observability stack. Besides the standard Redpanda Connect metrics, the kafka_migrator
input also emits an input_kafka_migrator_lag
metric for each topic and partition, which can be used to monitor its progress.
$ curl http://localhost:4195/metrics
...
# HELP input_kafka_migrator_lag Benthos Gauge metric
# TYPE input_kafka_migrator_lag gauge
input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="bar"} 0
input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="bar"} 1
input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
...
7. Read from the migrated topics
Finally, we can stop the read_data_source.yaml
consumer we started previously and then start a similar consumer running against the destination
cluster. Before starting the consumer up on the destination
cluster, make sure to give the migrator bundle time to finish replicating the translated offset.
# read_data_destination.yaml
http:
enabled: false
input:
kafka_franz:
seed_brokers: [ "localhost:9093" ]
topics:
- '^[^_]' # Skip topics which start with `_`
regexp_topics: true
start_from_oldest: true
consumer_group: foobar
processors:
- schema_registry_decode:
url: "http://localhost:8082"
avro_raw_json: true
output:
stdout: {}
processors:
- mapping: |
root = this.merge({"count": counter(), "topic": @kafka_topic, "partition": @kafka_partition})
$ redpanda-connect run read_data_destination.yaml
And you’re all done!
It’s worth clarifying that the source
cluster consumer uses the foobar
consumer group. We started the destination
cluster consumer using the same consumer group; as you can see, it resumes reading messages from where the source
consumer left off.
Due to the mechanics of the Kafka protocol, we need to perform offset remapping when migrating consumer group offsets to the destination
cluster. While more sophisticated approaches are possible, we used a simple timestamp-based approach.
So, for each migrated offset, we first query the destination
cluster to find the latest offset before the received offset timestamp. We then use that as the destination
consumer group offset for the corresponding topic and partition pair.
Although the timestamp-based approach doesn’t guarantee exactly-once delivery, it minimizes the likelihood of message duplication and avoids the need for complex and error-prone offset remapping logic.
Get started with Redpanda Migrator
Redpanda Migrator is a convenient new tool that simplifies migrations from any Kafka system to Redpanda. It allows developers to easily move Kafka messages, schemas, and ACLs without digging into Kafka or Redpanda internals. We’ve designed Redpanda Migrator as part of Redpanda Connect to make it simple to use and powerful to run at scale.
Ready to migrate without the migraine? Try Redpanda Migrator yourself in Redpanda Cloud!