Seamless Topic Migration: A Step-by-Step Guide from Confluent Kafka to MSK

While exploring options for migrating our Kafka clusters from Confluent to MSK, we found two candidate solutions: MirrorMaker and Confluent Replicator, which is built on Kafka Connect.

Because our clusters hold a large number of topics, we started with Replicator for its advanced features, including data filtering, transformation, and compatibility options. Which of the two fits best depends on how much control and flexibility your replication tasks require.

In this post, we walk through how we built a simple replicator service that copies topics from Confluent to MSK, and we highlight the key actions required at each step.

In our setup, we will cover the following steps:

  1. Cloning our pre-built replicator repository
  2. Building a Docker image with Amazon MSK Library
  3. Customizing the connector’s properties
  4. Kubernetes deployment
  5. Helm deployment
  6. Observing the topics in MSK

Cloning the replicator repository

Replicator is part of Confluent Platform and is built on Kafka Connect. It replicates data between two distinct Kafka clusters. We start by cloning our project, which contains the replicator setup along with its deployment configuration.

git clone git@github.com:naturalett/replicator-connector-confluent-to-msk.git
cd replicator-connector-confluent-to-msk

Building a Docker Image with Amazon MSK Library

We build the Docker image from the confluentinc/cp-enterprise-replicator-executable base image and add the Amazon MSK Library for AWS IAM (aws-msk-iam-auth). This library lets JVM-based Apache Kafka clients use AWS IAM for authentication and authorization against Amazon MSK clusters that have IAM enabled as their authentication mechanism.

You can either build the image yourself or use the pre-built image listed below.

cat <<EOF | docker build -t naturalett/cp-enterprise-replicator-executable:7.4.0 .  -f -
FROM confluentinc/cp-enterprise-replicator-executable:7.4.0
USER root
# -f makes curl fail on an HTTP error instead of saving the error page as the jar
RUN curl -fsSL https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.6/aws-msk-iam-auth-1.1.6-all.jar -o /usr/share/java/kafka/aws-msk-iam-auth-1.1.6-all.jar
USER appuser
EXPOSE 8083
CMD ["/etc/confluent/docker/run"]
EOF

Our pre-built image is: naturalett/cp-enterprise-replicator-executable:7.4.0
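
Whichever image you use, it can be worth confirming that the IAM jar really sits in the directory the Dockerfile writes to. The following is a minimal sketch, assuming Docker is available locally; the function name check_msk_jar and the DOCKER override variable are hypothetical helpers and not part of the repository.

```shell
#!/usr/bin/env bash
# Sketch: list /usr/share/java/kafka/ inside the image and look for the IAM jar.
# IMAGE and JAR mirror the values used in the Dockerfile above.
IMAGE="naturalett/cp-enterprise-replicator-executable:7.4.0"
JAR="aws-msk-iam-auth-1.1.6-all.jar"
# Hypothetical override so the docker binary can be swapped out (for example in CI).
DOCKER="${DOCKER:-docker}"

check_msk_jar() {
  # Replace the image entrypoint with ls so that no replicator process starts.
  # grep -F prints the jar name and exits non-zero when it is absent.
  "$DOCKER" run --rm --entrypoint ls "$IMAGE" /usr/share/java/kafka/ | grep -F "$JAR"
}

# Example (requires Docker and access to the image):
# check_msk_jar || echo "IAM jar not found in ${IMAGE}" >&2
```

The check only confirms that the file is present; it does not prove that the download was complete or that authentication works.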

Customizing the connector’s properties

In the replicator setup, there are three config files:

  • Consumer
  • Producer
  • Replication

This section covers the minimal configuration changes for each file. The properties files are generated from templates, so you first set a few environment variables whose values are then substituted into the templates.

The necessary environment variables for MSK configuration include your:

  • Topic name
  • MSK broker endpoint
  • MSK cluster name

Additionally, for Confluent Kafka configuration, you will need to set the environment variables for:

  • Bootstrap servers
  • Bootstrap username
  • Bootstrap password

Let’s export the following environment variables:

# Specify the desired replication topic
export TOPIC=<FILL IT UP>

# Specify the desired MSK broker endpoints
export MSK_BROKER_ENDPOINT_1=<FILL IT UP>
export MSK_BROKER_ENDPOINT_2=<FILL IT UP>
export MSK_BROKER_ENDPOINT_3=<FILL IT UP>

# Specify the desired MSK cluster name
export MSK_CLUSTER_NAME=<FILL IT UP>

# Specify the current bootstrap server
export BOOTSTRAP_SERVERS=<FILL IT UP>

# Specify the username and password (API Key) of the bootstrap server
export BOOTSTRAP_USERNAME=<FILL IT UP>
export BOOTSTRAP_PASSWORD=<FILL IT UP>
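
Before generating any files, it helps to confirm that none of these variables was left empty, since an empty value would silently produce a broken properties file. Here is a minimal sketch of such a pre-flight check; the variable names are the ones exported above, while the function name check_required_vars is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch: report every required variable that is unset or empty.
required_vars=(
  TOPIC
  MSK_BROKER_ENDPOINT_1 MSK_BROKER_ENDPOINT_2 MSK_BROKER_ENDPOINT_3
  MSK_CLUSTER_NAME
  BOOTSTRAP_SERVERS BOOTSTRAP_USERNAME BOOTSTRAP_PASSWORD
)

check_required_vars() {
  local missing=0 name
  for name in "${required_vars[@]}"; do
    # ${!name} is bash indirect expansion: the value of the variable named by $name.
    if [ -z "${!name:-}" ]; then
      echo "missing: ${name}" >&2
      missing=1
    fi
  done
  # Returns 0 when everything is set, 1 otherwise.
  return "$missing"
}

# Example:
# check_required_vars && echo "all variables set"
```

The function relies on bash-specific features (arrays and indirect expansion), so it will not work in a plain POSIX sh.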

Generate the replication.properties:

cat replicator/secrets-tpl/replication.properties.tpl | sed -e "s/TOPIC/${TOPIC}/" > replicator/secrets/replication.properties
cat replicator/secrets-tpl/replication.properties.tpl | sed -e "s/TOPIC/${TOPIC}/" > helm/replicator-conf/replication.properties

Generate the producer.properties:

cat replicator/secrets-tpl/producer.properties.tpl | sed -e "s/MSK_BROKER_ENDPOINT_1/${MSK_BROKER_ENDPOINT_1}/" -e "s/MSK_BROKER_ENDPOINT_2/${MSK_BROKER_ENDPOINT_2}/" -e "s/MSK_BROKER_ENDPOINT_3/${MSK_BROKER_ENDPOINT_3}/" -e "s/MSK_CLUSTER_NAME/${MSK_CLUSTER_NAME}/" > replicator/secrets/producer.properties
cat replicator/secrets-tpl/producer.properties.tpl | sed -e "s/MSK_BROKER_ENDPOINT_1/${MSK_BROKER_ENDPOINT_1}/" -e "s/MSK_BROKER_ENDPOINT_2/${MSK_BROKER_ENDPOINT_2}/" -e "s/MSK_BROKER_ENDPOINT_3/${MSK_BROKER_ENDPOINT_3}/" -e "s/MSK_CLUSTER_NAME/${MSK_CLUSTER_NAME}/" > helm/replicator-conf/producer.properties

Generate the consumer.properties. The sed expressions here use | as the delimiter, because API secrets often contain a / that would otherwise end the expression early:

cat replicator/secrets-tpl/consumer.properties.tpl | sed -e "s|BOOTSTRAP_SERVERS|${BOOTSTRAP_SERVERS}|" -e "s|BOOTSTRAP_USERNAME|${BOOTSTRAP_USERNAME}|" -e "s|BOOTSTRAP_PASSWORD|${BOOTSTRAP_PASSWORD}|" > replicator/secrets/consumer.properties
cat replicator/secrets-tpl/consumer.properties.tpl | sed -e "s|BOOTSTRAP_SERVERS|${BOOTSTRAP_SERVERS}|" -e "s|BOOTSTRAP_USERNAME|${BOOTSTRAP_USERNAME}|" -e "s|BOOTSTRAP_PASSWORD|${BOOTSTRAP_PASSWORD}|" > helm/replicator-conf/consumer.properties
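
The sed substitutions above insert each value verbatim, so a value that contains the sed delimiter, an &, or a backslash changes the meaning of the expression. One way to guard against that is to escape the value first. The sketch below illustrates the idea; escape_sed_replacement and render_placeholder are hypothetical helper names, and the repository itself does not ship them.

```shell
#!/usr/bin/env bash
# Sketch: substitute a placeholder with a value that sed treats literally.

escape_sed_replacement() {
  # Put a backslash in front of /, & and \ so they lose their special meaning
  # on the replacement side of an s/.../.../ expression.
  printf '%s' "$1" | sed -e 's|[/&\\]|\\&|g'
}

render_placeholder() {
  # usage: render_placeholder PLACEHOLDER VALUE < template > output
  local placeholder="$1" value
  value="$(escape_sed_replacement "$2")"
  sed -e "s/${placeholder}/${value}/"
}

# Example:
# render_placeholder BOOTSTRAP_PASSWORD "$BOOTSTRAP_PASSWORD" \
#   < replicator/secrets-tpl/consumer.properties.tpl
```

This handles single-line values only; a value containing a newline would still need different treatment.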

Kubernetes deployment

The first option is to use the properties generated earlier to create a Kubernetes deployment directly.

Create the configuration secrets:

kubectl create secret generic replicator-secret-props-${TOPIC} --from-file=replicator/secrets/ --namespace default
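
One caveat with embedding ${TOPIC} in object names: Kafka topic names may contain uppercase letters and underscores, while Kubernetes names for Secrets and Deployments must be lowercase and may only use alphanumerics, - and . characters. If your topic name does not already fit, a small normalization step can help. The following is a sketch under that assumption; k8s_safe_name is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch: derive a Kubernetes-friendly suffix from a Kafka topic name.

k8s_safe_name() {
  # Lowercase the name, then map every character outside [a-z0-9.-] to '-'.
  printf '%s' "$1" | tr '[:upper:]' '[:lower:]' | tr -c 'a-z0-9.-' '-'
}

# Example:
# SAFE_TOPIC="$(k8s_safe_name "$TOPIC")"
# kubectl create secret generic "replicator-secret-props-${SAFE_TOPIC}" \
#   --from-file=replicator/secrets/ --namespace default
```

Two limitations are worth keeping in mind: distinct topics such as a_b and a-b map to the same name, and a topic ending in an underscore yields a name ending in -, which Kubernetes rejects. If you use the normalized name, the deployment manifest must reference the same secret name.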

Create the Kubernetes deployment:

kubectl apply -f container/replicator-deployment.yaml

cat <<EOF | kubectl create -f -
# extensions/v1beta1 no longer serves Deployments (removed in Kubernetes 1.16); use apps/v1
apiVersion: apps/v1
kind: Deployment
metadata:
  name: repl-exec-connect-cluster-${TOPIC}
  namespace: default
spec:
  replicas: 1
  # apps/v1 requires a selector that matches the pod template labels
  selector:
    matchLabels:
      app: replicator-app
  template:
    metadata:
      labels:
        app: replicator-app
    spec:
      containers:
        - name: confluent-replicator
          imagePullPolicy: IfNotPresent
          image: naturalett/cp-enterprise-replicator-executable:7.4.0
          resources:
            limits:
              cpu: "2"
              memory: 5Gi
            requests:
              cpu: "2"
              memory: 5Gi
          env:
            - name: CLUSTER_ID
              value: "replicator-k8s"
            - name: CLUSTER_THREADS
              value: "1"
            - name: CONNECT_GROUP_ID
              value: "containerized-repl"
            - name: KAFKA_HEAP_OPTS
              value: "-Xms2048M -Xmx4096M"
            - name: KAFKA_OPTS
              value: "-Djava.security.auth.login.config=/etc/replicator-config/jaas.conf"
            # Note: This is to avoid overlay errors. You could use /etc/replicator/ here instead.
            - name: REPLICATION_CONFIG
              value: "/etc/replicator-config/replication.properties"
            - name: PRODUCER_CONFIG
              value: "/etc/replicator-config/producer.properties"
            - name: CONSUMER_CONFIG
              value: "/etc/replicator-config/consumer.properties"
          volumeMounts:
            - name: replicator-properties
              mountPath: /etc/replicator-config/
      volumes:
        - name: replicator-properties
          secret:
            secretName: "replicator-secret-props-${TOPIC}"
            defaultMode: 0666
EOF
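
After creating the deployment, we can wait for the pod to become ready and glance at its logs before looking at MSK. The following is a minimal sketch, assuming kubectl points at the right cluster; the function names and the KUBECTL and NAMESPACE override variables are hypothetical, while the deployment name follows the manifest above.

```shell
#!/usr/bin/env bash
# Sketch: wait for the replicator deployment and show its most recent log lines.
KUBECTL="${KUBECTL:-kubectl}"
NAMESPACE="${NAMESPACE:-default}"

wait_for_replicator() {
  # $1: the topic name used when the deployment was created
  "$KUBECTL" rollout status "deployment/repl-exec-connect-cluster-$1" \
    --namespace "$NAMESPACE" --timeout=120s
}

tail_replicator_logs() {
  # $1: the topic name used when the deployment was created
  "$KUBECTL" logs "deployment/repl-exec-connect-cluster-$1" \
    --namespace "$NAMESPACE" --tail=50
}

# Example:
# wait_for_replicator "$TOPIC" && tail_replicator_logs "$TOPIC"
```

A successful rollout only means the container started; authentication or connectivity problems with either cluster typically show up in the logs instead.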

Verify the replicated topic in your MSK cluster.

Helm deployment

The second option is to generate the properties as mentioned earlier and utilize them to create a Helm deployment.

Install the Helm chart:

helm upgrade -i cp-enterprise-replicator-executable --namespace=default -f helm/values.yaml ./helm

We can use port forwarding to check our connector:

kubectl port-forward -n default svc/cp-enterprise-replicator-executable 8083:8083

Once port forwarding is running, the connector is reachable locally on port 8083.
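
One way to inspect the connector through the forwarded port is the Kafka Connect REST API. The sketch below assumes the replicator exposes the standard Connect endpoints on port 8083; the function names and the CONNECT_URL variable are hypothetical, and the connector name should be taken from the list call and not guessed.

```shell
#!/usr/bin/env bash
# Sketch: query the Kafka Connect REST API through the forwarded port.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

list_connectors() {
  # GET /connectors returns a JSON array of connector names.
  curl -fsS "${CONNECT_URL}/connectors"
}

connector_status() {
  # $1: a connector name returned by list_connectors
  # GET /connectors/{name}/status reports the connector and task states.
  curl -fsS "${CONNECT_URL}/connectors/$1/status"
}

# Example (with the port-forward from above still running):
# list_connectors
```

A task reported as FAILED in the status output usually carries a stack trace that points at the cause.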

Verify the replicated topic in your MSK cluster.

Monitor Apache Kafka Cluster — Using Kadeck

Once the replicator is running in our infrastructure, topics should stay synchronized between Confluent and MSK. To validate this, we use Kadeck, which lets us conveniently manage and monitor our Kafka resources and observe events in real time.

Install the Helm chart:

helm upgrade -i my-kadeck-web kadeck/kadeck-web --set image.repository=xeotek/kadeckweb --set image.tag=4.2.9 --namespace=default --version 0.5.0

Add a connection:

Fill in the following fields:

  • Connection name: msk-demo
  • Bootstrap Servers: <MSK ENDPOINT>:9098
  • Security Protocol: SASL_SSL
  • Sasl Jaas Config (optional): software.amazon.msk.auth.iam.IAMLoginModule required;
  • Sasl Mechanism (optional): AWS_MSK_IAM

After adding the connection, you will be able to observe the newly created topic generated by the replicator, which is based on the earlier added replication configuration.
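
The same check can also be done from the command line with the stock Kafka tools. The sketch below writes the connection settings listed above into a client properties file and lists the topics on MSK. It assumes the aws-msk-iam-auth jar is on the tool's classpath and that AWS credentials are available in the environment; PROPS_FILE and list_msk_topics are hypothetical names, and in Confluent Platform the tool is called kafka-topics, without the .sh suffix.

```shell
#!/usr/bin/env bash
# Sketch: client settings for IAM authentication against MSK.
PROPS_FILE="${PROPS_FILE:-$(mktemp)}"

cat > "$PROPS_FILE" <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF

list_msk_topics() {
  # $1: an MSK bootstrap address, for example <MSK ENDPOINT>:9098
  kafka-topics.sh --bootstrap-server "$1" --command-config "$PROPS_FILE" --list
}

# Example:
# list_msk_topics "<MSK ENDPOINT>:9098" | grep -Fx "$TOPIC"
```

If the replicated topic appears in the output, the replicator has created it on the MSK side.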

To validate our setup, we connected to our Confluent Kafka cluster and produced a new message to our test-topic. The same message then showed up in MSK as well.

Summary

This article walked through migrating topics from Confluent Kafka to MSK with Replicator, which offers advanced capabilities such as data filtering, transformation, and compatibility options. We also used Kadeck to monitor Kafka cluster events. To validate the setup, we connected to Confluent Kafka, produced a new message to a test-topic, and confirmed that it arrived in MSK.

