
USING KAFKA TO FACILITATE MONGODB DATA REPLICATION TO ACHIEVE AN RPO OF ZERO


In this blog I will explain how to resolve a data loss challenge where Cloud Service Providers (CSPs) like Ali-Cloud only provide a highly available replicated dataset for MongoDB within a single Availability Zone (AZ) rather than across multiple AZs. As a result, in the event of a disaster where the primary AZ (hosting the MongoDB cluster) is unavailable, all of the data on the MongoDB cluster will also be unavailable, creating a Single Point of Failure (SPOF). Below is a summary of the example architecture:


As you can see from the architecture on the left, failure of AZ1 will result in downtime, as there is no secondary MongoDB cluster to continue business operations.


 

About Taweh Ruhle


Experienced techie who still believes having my own network and rack at home is a necessity. I am a full stack developer with extensive experience of #DevOps, #IAM, #Kubernetes, and #Cloud.


For any queries or feedback, please contact me at taweh@midships.io

 

Solution Options

  1. Take regular backups of the primary MongoDB Cluster and restore from the backup in the second AZ. Whilst the Recovery Time Objective (RTO) could be low (though not zero), the Recovery Point Objective (RPO) will depend on when the last backup was taken and is unlikely to be zero (typically greater than 60 minutes).

  2. Leverage Kafka to provide near-real-time replication of the MongoDB Cluster across AZs. Kafka is a messaging system for storing, reading, and analysing data. It will be used to facilitate event-driven data replication. This solution could deliver an RTO and RPO close to zero.

Solution 2, leveraging Kafka, is discussed in the remainder of this blog post.


Solution Architecture Overview


The components in green are the ones added to facilitate the near-real-time MongoDB data replication using Kafka.

An additional active MongoDB cluster is created in the alternative AZ (AZ2) to hold the replicated data.

A distributed cluster of Kafka Brokers and Kafka Connect instances will be required to handle the capture and replay of change events across the MongoDB clusters.


Components required to facilitate the proposed solution:

  • MongoDB Clusters in both AZ1 and AZ2

  • Kafka Broker cluster: This cluster will hold the Topics used to store and stream the MongoDB change events from the source MongoDB instances in AZ1 to the destination instance in AZ2. A Topic here refers to the common name used to store and publish a particular stream of data. You can get this as a managed service from a CSP like AWS, or, if you want to run and manage it in-house, you can use the guide here. An official Docker version also exists here. This blog will not cover setting up Kafka Brokers.

  • Distributed Kafka Connect instances: These instances will be responsible for managing the retrieval of change events from the source MongoDB cluster and replaying them on the destination MongoDB instance via the Kafka Broker(s). This will be done using the two connectors below:

o Debezium MongoDB Source Connector - Retrieves the database changes from the active MongoDB cluster's oplog and sends them to the configured Topics on the Kafka Brokers (an illustrative change event is shown after this list).


o MongoDB Kafka Sink Connector - Retrieves the database changes from the configured Topics on the Kafka Brokers and replays them on the passive MongoDB cluster.
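
For illustration, below is a simplified sketch of the kind of change event the Debezium MongoDB Source Connector publishes for an insert. The envelope fields (after, patch, source, op, ts_ms) follow Debezium's MongoDB event format, but the exact payload depends on the connector version and converter settings, and all values shown here are placeholders:

 {
  "after" : "{\"_id\": {\"$oid\": \"5f2d00000000000000000000\"}, \"name\": \"TestUser\"}",
  "patch" : null,
  "source" : { "rs" : "rs0", "db" : "src_db", "collection" : "names" },
  "op" : "c",
  "ts_ms" : 1596000000000
 }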


Data Replication Flow


A. Real-time data changes on the active MongoDB instance are collected from the oplog by the Kafka Connect Debezium Source Connector.


B. Kafka Connect can be configured to transform and/or convert the structure of these changes before sending them to the configured Kafka Broker service.


C. The Kafka Broker service stores the MongoDB changes under pre-defined Topics for later consumption.


D. The Kafka Connect MongoDB Sink connector will pull the available changes from the pre-configured Topics on the Kafka Broker(s). The MongoDB Sink Connector can be configured to transform and/or convert the structure of the changes before sending them to the destination/secondary MongoDB instance.


E. The passive MongoDB instance receives the change playback instructions from the Kafka Connect MongoDB Sink connector, replicating the state of the active instance.


Data Replication Scenario

Below I am going to take you through the steps required to use Kafka to replicate data from a MongoDB cluster in one AZ (AZ1) into a separate MongoDB cluster in another AZ (AZ2).

Note:

  • All of the components mentioned below must be accessible to one another over the network.

  • This scenario tutorial is a proof-of-concept and will require additional security and hardening amendments to be production ready.

1. Create a MongoDB cluster that will serve as your active MongoDB instance. Also create a database called src_db and a Collection called names with _id and name fields (a sketch of the Mongo Shell commands is shown after the notes below).


Take note of the below:

  • AZ under which it is installed

  • If the cluster type is Sharded, note the Configuration Server hostnames and port. Note that for Ali-Cloud I found that the Configuration Server did not work as expected and had to use the Shard hostnames and ports.

  • If a Replica-set cluster, note the Mongos hostnames and ports

  • User account username and password. The user should have the following permissions: read access to the admin database; read access to the config database; ability to list databases; read access to all the databases managed by lfs.
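
As a rough sketch, the database and collection from step 1 can be created from the Mongo Shell as below. The seeded document is purely illustrative (MongoDB generates the _id field automatically):

 use src_db
 db.createCollection("names")
 // optional: seed an illustrative document; _id is generated automatically
 db.names.insertOne({ name: "InitialUser" })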

2. Create another MongoDB cluster that will serve as your passive MongoDB instance. Ensure the AZ in which it is installed is different from the AZ noted in #1.

Take note of the below:

  • AZ under which it is installed

  • Mongos connection URL including Hostname and port

  • User account username and password. The user should have the following permissions: read and write access to all databases; ability to create databases, collections, and documents; ability to create indexes and keys; ability to list databases. A sketch of creating such a user is shown after this list.
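
As an illustrative sketch of creating such a user on the passive cluster, using the example root/Password2020 credentials from this blog and MongoDB's built-in roles (adjust the roles to your own security policies):

 use admin
 db.createUser({
   user: "root",
   pwd: "Password2020",
   roles: [
     { role: "readWriteAnyDatabase", db: "admin" },  // read/write and create collections, documents, and indexes
     { role: "dbAdminAnyDatabase", db: "admin" }     // list and administer databases
   ]
 })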

3. Setup the Kafka Broker service and ensure it is running. You can use a managed Kafka Broker service like those provided by AWS (here). I suggest you enable automatic creation of Topics by setting auto.create.topics.enable to true. This is helpful when there are numerous databases and/or numerous collections per database. If this is set to false, Topics will need to be created manually. See section 2 here for details. A sketch of the relevant broker setting is shown below.
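
If you run the Kafka Brokers yourself, the setting lives in each broker's server.properties; Topics can alternatively be created manually with the standard kafka-topics tool (the Topic name below follows the <prefix>.<database>.<collection> convention used later in this blog):

 # config/server.properties on each Kafka Broker
 auto.create.topics.enable=true

 # alternative: create a Topic manually
 bin/kafka-topics.sh --create --bootstrap-server kafkahost-1:9092 \
   --topic replication_test.src_db.names --partitions 3 --replication-factor 3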


4. Download the Debezium MongoDB source connector from here and follow the instructions here to install it. Basically you need to download the binary and extract the contents to the plugin.path directory. I am using /usr/local/share/kafka/plugins as the plugin path. If using a Docker Kafka Connect, you can install the connector using the command confluent-hub install --no-prompt --component-dir /usr/local/share/kafka/plugins debezium/debezium-connector-mongodb:1.2.1. A manual download sketch is shown below.
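
As a sketch of the manual install, the connector archive can be pulled from Maven Central and extracted into the plugin path. The exact URL and version below are assumptions based on how Debezium publishes its connector plugin archives, so verify them before use:

 # download and extract the Debezium MongoDB connector into the Kafka Connect plugin path
 PLUGIN_PATH=/usr/local/share/kafka/plugins
 mkdir -p "${PLUGIN_PATH}"
 curl -fsSLO https://repo1.maven.org/maven2/io/debezium/debezium-connector-mongodb/1.2.1.Final/debezium-connector-mongodb-1.2.1.Final-plugin.tar.gz
 tar -xzf debezium-connector-mongodb-1.2.1.Final-plugin.tar.gz -C "${PLUGIN_PATH}"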


5. Download the MongoDB Kafka Sink Connector from here and copy the .jar file to the plugin.path directory. I am using /usr/local/share/kafka/plugins as the plugin path. If using a Docker Kafka Connect, you can install the connector using the command confluent-hub install --no-prompt --component-dir ${CONNECT_PLUGIN_PATH} mongodb/kafka-connect-mongodb:latest


6. Setup a distributed worker using the below properties file (worker.properties) and start it up. If you are running Kafka Connect from the binary, you can execute the command bin/connect-distributed.sh worker.properties to start Kafka Connect in distributed mode. In this scenario:
- bootstrap.servers is the hostnames and ports of the Kafka Brokers from #3 above. I am using kafkahost-1:9092,kafkahost-2:9092,kafkahost-3:9092 in this scenario.
- security.protocol is set to PLAINTEXT for simplicity. This allows connectivity to the Kafka Brokers without TLS. This is usually the default setting.

worker.properties


 group.id=mongo-kc-grpID
 config.storage.topic=__mongo_kc_configStorage
 offset.storage.topic=__mongo_kc_offsetStorage
 status.storage.topic=__mongo_kc_statusStorage
 bootstrap.servers=kafkahost-1:9092,kafkahost-2:9092,kafkahost-3:9092
 security.protocol=PLAINTEXT
 key.converter=org.apache.kafka.connect.json.JsonConverter
 value.converter=org.apache.kafka.connect.json.JsonConverter
 value.converter.schemas.enable=false
 key.converter.schemas.enable=false
 internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 plugin.path=/usr/local/share/kafka/plugins
 rest.port=28083

If you are running the Kafka Connect Docker solution from here, you can set the below Environment Variables and start the Docker container to get the same effect (a docker run sketch follows the list):

· CONNECT_BOOTSTRAP_SERVERS="kafkahost-1:9092,kafkahost-2:9092,kafkahost-3:9092"
· CONNECT_GROUP_ID="mongo-kc-grpID"
· CONNECT_CONFIG_STORAGE_TOPIC="__mongo_kc_configStorage"
· CONNECT_OFFSET_STORAGE_TOPIC="__mongo_kc_offsetStorage"
· CONNECT_STATUS_STORAGE_TOPIC="__mongo_kc_statusStorage"
· CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
· CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
· CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
· CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
· CONNECT_SECURITY_PROTOCOL="PLAINTEXT"
· CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
· CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
· CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false
· CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE=false
· CONNECT_PLUGIN_PATH="/usr/local/share/kafka/plugins"
· CONNECT_REST_PORT="28083"
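
As a minimal sketch, assuming the Confluent cp-kafka-connect image (the image name, tag, and the CONNECT_REST_ADVERTISED_HOST_NAME variable it additionally requires are assumptions; adapt them to the Kafka Connect image you actually use):

 # start a distributed Kafka Connect worker in Docker with the environment variables above
 docker run -d --name kafka-connect -p 28083:28083 \
   -e CONNECT_BOOTSTRAP_SERVERS="kafkahost-1:9092,kafkahost-2:9092,kafkahost-3:9092" \
   -e CONNECT_GROUP_ID="mongo-kc-grpID" \
   -e CONNECT_CONFIG_STORAGE_TOPIC="__mongo_kc_configStorage" \
   -e CONNECT_OFFSET_STORAGE_TOPIC="__mongo_kc_offsetStorage" \
   -e CONNECT_STATUS_STORAGE_TOPIC="__mongo_kc_statusStorage" \
   -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
   -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
   -e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false \
   -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
   -e CONNECT_SECURITY_PROTOCOL="PLAINTEXT" \
   -e CONNECT_PLUGIN_PATH="/usr/local/share/kafka/plugins" \
   -e CONNECT_REST_PORT="28083" \
   -e CONNECT_REST_ADVERTISED_HOST_NAME="kafka-connect" \
   confluentinc/cp-kafka-connect:5.5.1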

7. With the Kafka Connect Worker successfully running, use the following steps to add the Debezium MongoDB source connector. It will pull all database change events into the specified topics on the Kafka Brokers.


  • Create the below json file (debe_srcDB.json). In this scenario:
- the active MongoDB hostnames are az1-mongo-host-1:3717 and az1-mongo-host-2:3717
- the username used is root and the password is Password2020
- mongodb.name is the prefix of the Topics to be created on the Kafka Broker(s)
- 28083 is the Kafka Connect REST port

 {
  "name" : "debe_src_db",
  "config" : {
   "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
   "mongodb.hosts" : "az1-mongo-host-1:3717,az1-mongo-host-2:3717",
   "mongodb.user" : "root",
   "mongodb.password" : "Password2020",
   "mongodb.ssl.enabled" : false,
   "mongodb.name" : "replication_test",
   "database.blacklist" : "admin,config",
   "collection.blacklist" : ".*[.]system.profile"
  }
 }
 
  • Execute the below curl command on the Kafka Connect host to create the Debezium MongoDB Source connector. You can execute the command from any VM/server with access to the Kafka Connect server but replacing localhost with the Kafka Connect hostname in the below command. curl -s -X POST -H "Cont ent-Type: application/json" -d @debe_srcDB.json “http://localhost:28083/ connectors”

8. With the Kafka Connect Worker successfully running, use the following steps to add the MongoDB Sink connector. This connector will pull all database change events from the Kafka Broker Topics into the passive MongoDB cluster.

  • Create the below json file (mongo_sinkDB.json). In this scenario:
- topics.regex is in the format <topic-prefix>.<database-name>.*
- the topic.override settings determine which collection is populated from which Kafka Topic; topic.override.replication_test.src_db.names.collection maps the Topic replication_test.src_db.names to the names collection
- the username used is root and the password is Password2020
- 3717 is the MongoDB port

 {
  "name" : "mongo_sinkDB",
  "config" : {
  "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector",
  "topics.regex" : "replication_test.src_db.*",
  "database" : "src_db",
  "connection.uri" : "mongodb://root:Password2020@az2-mongo-host-1:3717/admin",
  "topic.override.replication_test.src_db.names.collection" : "names",
  "change.data.capture.handler" : "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
  "document.id.strategy" : "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
  "writemodel.strategy" : "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy",
  "document.id.strategy.overwrite.existing" : true
  }
 }
 

  • Execute the below curl command on the Kafka Connect host to create the Debezium MongoDB Source connector. You can execute the command from any VM/server with access to the Kafka Connect server but replacing localhost with the Kafka Connect hostname in the below command. curl -s -X POST -H "Cont ent-Type: application/json" -d @mongo_sinkDB.json http://localhost:28083/ connectors

9. Connect to the active MongoDB cluster in AZ1 using the Mongo Shell or a MongoDB GUI like MongoDB Compass. The username to use is root and the password is Password2020. Connect to the src_db database and add a document to the names collection. I suggest adding a document with name TestUser (a sketch is shown below).
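
A minimal Mongo Shell sketch of this step, assuming the example hostnames and credentials used earlier in this blog:

 mongo "mongodb://root:Password2020@az1-mongo-host-1:3717/admin"
 use src_db
 db.names.insertOne({ name: "TestUser" })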

10. Connect to the passive MongoDB cluster in AZ2 using the Mongo Shell or a MongoDB GUI like MongoDB Compass. The username to use is root and the password is Password2020. If everything has been set up properly and is working, you should see the new document in the names collection under the src_db database that you created in step #9 (a sketch is shown below).
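
The equivalent check on the passive cluster, again assuming the example hostname and credentials:

 mongo "mongodb://root:Password2020@az2-mongo-host-1:3717/admin"
 use src_db
 db.names.find({ name: "TestUser" })   // should return the document replicated from AZ1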

Additional Recommendations

  • Given the criticality of the Kafka components, I suggest they are integrated into any existing logging and monitoring solution, or that one be set up if none exists. This will ensure any issue with the Kafka Connect connectors is logged and identified so that it can be resolved and replication can continue.

  • At the very least, the Kafka Connect connectors' status should be checked daily to confirm all connectors are operational and to identify any connector requiring action before it prevents replication from continuing. The below command can be used to check connector status on the Kafka Connect host (a check-and-restart sketch follows this list).

 curl -s "http://localhost:28083/connectors?expand=info&expand=status"
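
As a rough sketch of automating that daily check (assumes jq is installed; the status and restart endpoints are standard Kafka Connect REST API calls):

 #!/usr/bin/env bash
 # List all connectors, print their connector/task states, and restart any connector reporting FAILED.
 KC_URL="http://localhost:28083"
 for connector in $(curl -s "${KC_URL}/connectors" | jq -r '.[]'); do
   state=$(curl -s "${KC_URL}/connectors/${connector}/status" | jq -r '[.connector.state] + [.tasks[].state] | join(",")')
   echo "${connector}: ${state}"
   if echo "${state}" | grep -q "FAILED"; then
     curl -s -X POST "${KC_URL}/connectors/${connector}/restart"
   fi
 done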

 

I hope this blog post has been useful. If you have any questions or feedback, or want to learn more about how Midships can help support your cloud delivery, contact me at Taweh@midships.io
