How to ingest from RDBMS to Hadoop using Spark?

TjMan 27/Jan/2019 Spark

Sqoop is the usual tool for ingesting data from an RDBMS into Hadoop, but its feature set is small, and for most big organizations it is not a good choice: it is CLI-based, it is not secure, and it offers little for tracking which data is going where. As a result, most organizations build their own ingestion framework. Let's discuss how we can build our own ingestion program without Sqoop, writing the code from scratch to ingest some data from MySQL into Hive using Spark.

Basic requirements:
  • MySQL JDBC driver
  • MySQL DB - one table to ingest
  • IDE - to write and compile the Spark code
  • Cluster - to run the job and check the output in Hive
Source data creation:
CREATE TABLE datatbl (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(20),PRIMARY KEY (`id`));

DELIMITER $$
CREATE PROCEDURE InsertRand(IN NumRows INT, IN MinVal INT, IN MaxVal INT)
    BEGIN
        DECLARE i INT;
        SET i = 1;
        START TRANSACTION;
        WHILE i <= NumRows DO
            INSERT INTO datatbl (`name`) VALUES (MinVal + CEIL(RAND() * (MaxVal - MinVal)));
            SET i = i + 1;
        END WHILE;
        COMMIT;
    END$$
DELIMITER ;

CALL InsertRand(1111, 2222, 5555);

Here we simply create a table in MySQL and use a stored procedure to insert some random values to play with.

Source data:
mysql> SELECT * FROM datatbl limit 10;
Current database: rnd

+----+------+
| id | name |
+----+------+
|  1 | 4324 |
|  2 | 2414 |
|  3 | 3542 |
|  4 | 4912 |
|  5 | 5045 |
|  6 | 4932 |
|  7 | 3969 |
|  8 | 2830 |
|  9 | 3351 |
| 10 | 2708 |
+----+------+
10 rows in set (1.06 sec)

mysql> select max(id),min(id),count(*) from datatbl;
+---------+---------+----------+
| max(id) | min(id) | count(*) |
+---------+---------+----------+
|    1111 |       1 |     1111 |
+---------+---------+----------+
1 row in set (0.27 sec)
Spark Code:
package com.tjman.exp1

import org.apache.spark.sql.SparkSession

object IngestionFromMysql {
  def main(args: Array[String]): Unit = {

    // Hive support is needed so that saveAsTable registers the table in the Hive metastore
    val spark = SparkSession.builder().appName("MysqlIngester").enableHiveSupport().getOrCreate()

    // Read the MySQL table over JDBC as 10 parallel partitions, split on the primary key
    val data = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://ip:port/rnd?zeroDateTimeBehavior=convertToNull")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "datatbl")
      .option("user", "rnd")
      .option("password", "password")
      .option("numPartitions", 10)
      .option("partitionColumn", "id")
      .option("lowerBound", 1)
      .option("upperBound", 1111)
      .load()

    // Save the DataFrame as a Hive table in the tjman database
    data.write.saveAsTable("tjman.datatbl")

    spark.stop()
  }
}

As you can see, the program is simple. First, a Spark session is created with Hive support so that the data can be written to Hive when it is saved. For the read, we use the MySQL JDBC driver to connect to the database and supply the DB details. The most important part is the "numPartitions" option, which splits the total data into several batches. Here we ask for 10, so when Spark reads the table it makes ten partitions of the data. The split is based on "partitionColumn", which here is the primary key, with "lowerBound" and "upperBound" set to its min and max values, 1 and 1111. Spark uses the bounds only to work out the stride, roughly (1111-1)/10 = 111 ids per partition; they do not filter rows. Each task then reads its own range, effectively: select * from datatbl where id < 112 or id is null; next: select * from datatbl where id >= 112 and id < 223; and so on. The last range is open-ended (id >= 1000), so with this stride the last partition gets 112 rows rather than 111.

At the end, the DataFrame is saved as a Hive table, and we can query the data from Hive as shown below. Also note that ten files are created in HDFS, one for each partition.
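The partition ranges described above can be reproduced in a few lines of plain Scala. This is only a minimal sketch, modeled on the stride arithmetic that Spark 2.x applies to "lowerBound", "upperBound" and "numPartitions"; it is not Spark's own code, the object and method names are hypothetical, and other Spark versions may place the boundaries slightly differently.

```scala
object JdbcPartitionSketch {

  /** WHERE clauses for each partition, approximating Spark 2.x stride logic.
    * Hypothetical helper for illustration; Spark builds these internally.
    */
  def predicates(column: String, lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[String] = {
    // Simplification: assume at least two partitions and a range wide enough to split
    require(numPartitions >= 2 && upperBound - lowerBound >= numPartitions)

    // Integer division on each bound, then subtract: 1111/10 - 1/10 = 111 - 0 = 111
    val stride = upperBound / numPartitions - lowerBound / numPartitions

    (0 until numPartitions).map { i =>
      val lo = lowerBound + i * stride
      val hi = lo + stride
      if (i == 0) s"$column < $hi OR $column IS NULL"   // first range has no lower limit
      else if (i == numPartitions - 1) s"$column >= $lo" // last range has no upper limit
      else s"$column >= $lo AND $column < $hi"
    }
  }

  def main(args: Array[String]): Unit =
    predicates("id", 1L, 1111L, 10).foreach(println)
}
```

For the values used in this post it prints ten clauses, starting with id < 112 OR id IS NULL and ending with id >= 1000. Because the first and last ranges are open-ended, rows that fall outside the bounds are still read; they simply land in the first or last partition.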

Check the result:

Spark Submit:

spark-submit --master yarn --num-executors 4 --jars /usr/share/java/mysql-connector.jar --class com.tjman.exp1.IngestionFromMysql IngestionWithSpark.jar 10
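Note that the command passes a trailing 10, but the program above never reads args, and the password is hardcoded in the source. A minimal sketch of one way to address both follows, assuming the trailing argument is meant to be the partition count. The object name, the helper names and the MYSQL_USER / MYSQL_PASSWORD environment variables are all hypothetical, not part of the original program.

```scala
import scala.util.Try

object IngestConfigSketch {

  /** Partition count from the first CLI argument; falls back to a default
    * when the argument is missing, not a number, or not positive.
    */
  def numPartitions(args: Array[String], default: Int = 10): Int =
    args.headOption
      .flatMap(a => Try(a.trim.toInt).toOption)
      .filter(_ > 0)
      .getOrElse(default)

  /** JDBC options matching the program above, with credentials taken from the
    * environment instead of the source code (variable names are assumptions).
    */
  def jdbcOptions(args: Array[String], env: Map[String, String] = sys.env): Map[String, String] = Map(
    "url"             -> "jdbc:mysql://ip:port/rnd?zeroDateTimeBehavior=convertToNull",
    "driver"          -> "com.mysql.jdbc.Driver",
    "dbtable"         -> "datatbl",
    "user"            -> env.getOrElse("MYSQL_USER", "rnd"),
    // Fail fast if no password is provided rather than falling back to a literal
    "password"        -> env.getOrElse("MYSQL_PASSWORD", sys.error("MYSQL_PASSWORD is not set")),
    "numPartitions"   -> numPartitions(args).toString,
    "partitionColumn" -> "id",
    "lowerBound"      -> "1",
    "upperBound"      -> "1111"
  )
}
```

The resulting map could then be handed to the reader in one call, for example spark.read.format("jdbc").options(IngestConfigSketch.jdbcOptions(args)).load(), in place of the chain of individual .option(...) calls.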

Hive:

hive (tjman)> select max(id),min(id),count(*) from datatbl;
Total MapReduce CPU Time Spent: 20 seconds 480 msec
OK
1111    1       1111
Time taken: 22.224 seconds, Fetched: 1 row(s)

hive (tjman)> select * from datatbl limit 10;
OK
1       4324
2       2414
3       3542
4       4912
5       5045
6       4932
7       3969
8       2830
9       3351
10      2708
Time taken: 0.409 seconds, Fetched: 10 row(s)

HDFS:

[tjman@n1 ~]$ hdfs dfs -ls /apps/hive/warehouse/tjman.db/datatbl                                
Found 11 items
-rw-r--r--   2 tjman hdfs          0 2019-01-27 14:29 /apps/hive/warehouse/tjman.db/datatbl/_SUCCESS
-rw-r--r--   2 tjman hdfs       1591 2019-01-27 14:29 /apps/hive/warehouse/tjman.db/datatbl/part-00000-b6bc5bb3-fa01-4f44-8fd1-315ce9c84ed5-c000.snappy.parquet
-rw-r--r--   2 tjman hdfs       1595 2019-01-27 14:29 /apps/hive/warehouse/tjman.db/datatbl/part-00001-b6bc5bb3-fa01-4f44-8fd1-315ce9c84ed5-c000.snappy.parquet
-rw-r--r--   2 tjman hdfs       1591 2019-01-27 14:29 /apps/hive/warehouse/tjman.db/datatbl/part-00002-b6bc5bb3-fa01-4f44-8fd1-315ce9c84ed5-c000.snappy.parquet
-rw-r--r--   2 tjman hdfs       1605 2019-01-27 14:29 /apps/hive/warehouse/tjman.db/datatbl/part-00003-b6bc5bb3-fa01-4f44-8fd1-315ce9c84ed5-c000.snappy.parquet
-rw-r--r--   2 tjman hdfs       1601 2019-01-27 14:29 /apps/hive/warehouse/tjman.db/datatbl/part-00004-b6bc5bb3-fa01-4f44-8fd1-315ce9c84ed5-c000.snappy.parquet
-rw-r--r--   2 tjman hdfs       1586 2019-01-27 14:29 /apps/hive/warehouse/tjman.db/datatbl/part-00005-b6bc5bb3-fa01-4f44-8fd1-315ce9c84ed5-c000.snappy.parquet
-rw-r--r--   2 tjman hdfs       1605 2019-01-27 14:29 /apps/hive/warehouse/tjman.db/datatbl/part-00006-b6bc5bb3-fa01-4f44-8fd1-315ce9c84ed5-c000.snappy.parquet
-rw-r--r--   2 tjman hdfs       1581 2019-01-27 14:29 /apps/hive/warehouse/tjman.db/datatbl/part-00007-b6bc5bb3-fa01-4f44-8fd1-315ce9c84ed5-c000.snappy.parquet
-rw-r--r--   2 tjman hdfs       1602 2019-01-27 14:29 /apps/hive/warehouse/tjman.db/datatbl/part-00008-b6bc5bb3-fa01-4f44-8fd1-315ce9c84ed5-c000.snappy.parquet
-rw-r--r--   2 tjman hdfs       1621 2019-01-27 14:29 /apps/hive/warehouse/tjman.db/datatbl/part-00009-b6bc5bb3-fa01-4f44-8fd1-315ce9c84ed5-c000.snappy.parquet
[tjman@n1 ~]$
Conclusion:

So we have finally built our own ingestion program using Spark. Next, I will come back with a more advanced program that does incremental backups of MySQL tables, along with features like refreshing updated source rows. Stay tuned to TuneToTech.
