Streaming Data from MySQL into Apache Kafka with the Debezium Connector

 


Prerequisites

1.     Ubuntu 20.04 (recommended: 4-8 vCPU, 8 GB RAM, 50 GB disk)

2.     Apache Kafka 1.0.0 (Scala 2.11 build, kafka_2.11-1.0.0)

3.     MySQL Server 5.7 or higher

4.     Debezium MySQL connector 1.1.1.Final

5.     Java 1.8 or newer


1.     Update the system


$ sudo apt-get update

2.     Install required tools


// Install Java 1.8
$ sudo apt install openjdk-8-jdk
// Install MySQL
$ sudo apt-get install mysql-server # for Ubuntu, Debian
$ sudo yum install mysql-server && sudo systemctl start mysqld # for CentOS, RHEL
$ brew install mysql && mysql.server restart # for macOS
// Set JAVA_HOME
$ sudo nano ~/.bashrc
// Add the following at the bottom
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre/
$ source ~/.bashrc
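Before moving on, it can help to confirm that the JDK and JAVA_HOME are picked up; this quick check is an addition to the original steps:

// Optional: verify the Java installation and JAVA_HOME
$ java -version        # should report openjdk version "1.8.0_..."
$ echo $JAVA_HOME      # should print /usr/lib/jvm/java-8-openjdk-amd64/jre/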

3.     Set up MySQL


// Create testdb.sql
$ sudo nano testdb.sql
// Copy the following to the file
-- Create the test database
CREATE DATABASE testDB;
USE testDB;

-- Create some fruit
CREATE TABLE fruit (
id INTEGER ZEROFILL NOT NULL AUTO_INCREMENT,
fruit_name VARCHAR(255) NOT NULL,
num_sold INTEGER NOT NULL,
PRIMARY KEY(id)
);

-- Insert test values
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Peach', 20);

$ sudo chmod 777 testdb.sql
$ sudo mysql -u root -p < testdb.sql
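As an optional check (not part of the original script), confirm the test data was loaded:

// Optional: confirm the testDB.fruit table exists and has data
$ sudo mysql -u root -p -e "SELECT * FROM testDB.fruit;"
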
$ sudo nano /etc/mysql/mysql.conf.d/mysqld.cnf
// Add the following under the [mysqld] section
[mysqld]
... (existing settings)
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
expire_logs_days=10
gtid_mode=ON
enforce_gtid_consistency=ON

interactive_timeout=600
wait_timeout=600
binlog_rows_query_log_events=ON

// Log in to MySQL
$ sudo mysql -u root
// Create User
mysql> CREATE USER 'dbuser'@'%' IDENTIFIED BY 'Password1@';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbuser'@'%';
mysql> ALTER USER 'dbuser'@'%' IDENTIFIED WITH mysql_native_password BY 'Password1@';
mysql> FLUSH PRIVILEGES;
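If you want to double-check the replication privileges (an optional step, assuming the dbuser account created above), list its grants:

// Optional: verify dbuser's privileges
mysql> SHOW GRANTS FOR 'dbuser'@'%';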

// Set the GTID Consistency and the GTID Mode
mysql> SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = OFF;
mysql> SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = ON;
mysql> SET @@GLOBAL.GTID_MODE = OFF;
mysql> SET @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;
mysql> SET @@GLOBAL.GTID_MODE = ON_PERMISSIVE;

// Make sure the following status variable shows a value of 0
mysql> SHOW STATUS LIKE 'ONGOING_ANONYMOUS_TRANSACTION_COUNT';

mysql> SET @@GLOBAL.GTID_MODE = ON;
mysql> exit

// Restart MySQL server
$ sudo systemctl restart mysql

// Note the server ID
$ sudo mysql -u root
mysql> SELECT @@server_id AS SERVER_ID;
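Optionally, while still logged in, you can confirm that the binlog and GTID settings survived the restart; this verification is an addition to the original steps:

// Optional: confirm binlog and GTID settings after the restart
mysql> SHOW VARIABLES LIKE 'log_bin';         -- expect ON
mysql> SHOW VARIABLES LIKE 'binlog_format';   -- expect ROW
mysql> SHOW VARIABLES LIKE 'gtid_mode';       -- expect ON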

 4. Download and Install Kafka & Debezium


// Set up Kafka and ZooKeeper
$ wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
$ tar xvzf kafka_2.11-1.0.0.tgz
$ sudo mv kafka_2.11-1.0.0/ /opt/kafka

// Setup Debezium Files
$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.1.1.Final/debezium-connector-mysql-1.1.1.Final-plugin.tar.gz
$ tar -xvzf debezium-connector-mysql-1.1.1.Final-plugin.tar.gz
$ sudo mkdir /opt/kafka/conns
$ sudo cp -r debezium-connector-mysql /opt/kafka/conns/
$ sudo nano /opt/kafka/config/server.properties
// Uncomment this 
listeners=PLAINTEXT://:9092
// Add the Debezium plugin path to the Kafka Connect config
$ sudo nano /opt/kafka/config/connect-distributed.properties
// Add at the bottom
...
plugin.path=/opt/kafka/conns
// Or run the following command instead
$ echo "plugin.path=/opt/kafka/conns" >> /opt/kafka/config/connect-distributed.properties
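Optionally (not part of the original steps), sanity-check that the connector files and the plugin.path entry are in place before starting Kafka:

// Optional: verify the Debezium plugin installation
$ ls /opt/kafka/conns/debezium-connector-mysql
$ grep plugin.path /opt/kafka/config/connect-distributed.properties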

 5. Run Kafka in a tmux session (split into three shells). If you are using the OVA/OVF image, you can start directly from this step (OVA file here).


// Create tmux session
$ tmux new -s kafka

// Split into three shells
Press Ctrl+b, then Shift+' (a double quote); repeat once more to get three panes
 
// In the first shell, start ZooKeeper
$ /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties

// In the second shell, start the Kafka broker
$ /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

// In the third shell, start Kafka Connect
$ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
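Once Kafka Connect is listening on port 8083, you can optionally confirm that it picked up the Debezium MySQL connector plugin (this check is not in the original write-up):

// Optional: list the installed connector plugins
$ curl -s localhost:8083/connector-plugins
// The output should include "io.debezium.connector.mysql.MySqlConnector"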

6. Stream data: create the Debezium connector with a POST request


// Create the Debezium connector with a POST request
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "127.0.0.1",
    "database.port": "3306",
    "database.user": "dbuser",
    "database.password": "Password1@",
    "database.server.id": "1",
    "database.allowPublicKeyRetrieval": "true",
    "database.server.name": "fruitserver",
    "database.whitelist": "testDB",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.fulfillment",
    "include.schema.changes": "true"
  }
}'
// Ensure the fruitserver and dbhistory topics are available
$ /opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
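To verify the stream end to end, you can optionally check the connector status and consume the table's topic; with the configuration above, Debezium should name the topic serverName.databaseName.tableName, i.e. fruitserver.testDB.fruit (this verification is an addition to the original steps):

// Optional: check that the connector is running
$ curl -s localhost:8083/connectors/test-connector/status

// Optional: watch change events for the fruit table
$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fruitserver.testDB.fruit --from-beginning

// In another shell, insert a row and watch the event appear
$ sudo mysql -u root -p -e "INSERT INTO testDB.fruit(fruit_name, num_sold) VALUES ('Mango', 7);"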


 

 


