How to Access Relational Data with Apache Spark

Apache Spark, a framework for large-scale data processing, integrates with relational databases through JDBC (Java Database Connectivity). This lets you use Spark’s distributed processing to analyze and manipulate data stored in traditional database systems alongside other data sources.

Prerequisites

  • A Spark environment set up (local or cluster mode).
  • The appropriate JDBC driver for your target database.

Connecting to a Database

Spark offers a convenient way to interact with relational databases through its DataFrameReader class. This class provides the jdbc() method to establish a connection.

JDBC (Java Database Connectivity) is a widely adopted standard that allows applications to connect to various database systems. As long as you have the appropriate JDBC driver library (JAR file) and configure the connection URL with the correct port and database schema, you can connect to your target database.

Since I am working with MySQL, I need the following information to establish the connection:

  • JDBC Driver JAR
  • Server IP or Host name and Port
  • Database name
  • Table name
  • User and Password.

By providing these details, Spark can effectively connect to your MySQL database and enable you to work with the data using DataFrames.
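
As a minimal sketch of how these details fit together, the snippet below assembles a MySQL JDBC URL from a host, port, and database name. The MySqlTarget case class and the jdbcUrl helper are hypothetical names introduced for illustration; they are not part of Spark or the MySQL driver.

```scala
// Hypothetical helper (not part of Spark or MySQL Connector/J): builds a
// MySQL JDBC URL from the connection details listed above.
object JdbcUrlSketch {
  // Host, port, and database name of the MySQL instance to connect to.
  final case class MySqlTarget(host: String, port: Int, database: String)

  // MySQL JDBC URLs follow the pattern jdbc:mysql://host:port/database.
  def jdbcUrl(target: MySqlTarget): String =
    s"jdbc:mysql://${target.host}:${target.port}/${target.database}"

  def main(args: Array[String]): Unit = {
    val target = MySqlTarget("localhost", 3306, "customer")
    println(jdbcUrl(target)) // prints jdbc:mysql://localhost:3306/customer
  }
}
```

The user name and password are deliberately kept out of the URL here; they are usually passed separately as connection properties.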

1. MySQL Data

This article will focus on data stored in a MySQL database. I’ll be working with a schema named customer that contains a table called cust_info. The cust_info table has several columns:

id, first_name, last_name, email, gender

A sample of the data appears in the output in section 3.

2. Dependencies

Below are the dependencies used in this project (pom.xml).

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <scala.version>2.12</scala.version>
    <spark.version>3.3.2</spark.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.version}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.version}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.21</version>
    </dependency>
</dependencies>

3. Read data using JDBC

I’m using the jdbc() method of DataFrameReader to load a table from a relational database into a Spark DataFrame.

import org.apache.spark.sql.SparkSession
import java.util.Properties

object JdbcRead {
  def main(args: Array[String]): Unit = {
    // Start a local Spark session that uses all available cores.
    val spark = SparkSession.builder()
      .appName("JDBC Connection")
      .master("local[*]")
      .getOrCreate()

    // Connection details for the local MySQL instance.
    val databaseName = "customer"
    val tableName = "cust_info"
    val userName = "root"
    val pass = "root"
    val jdbcUrl = s"jdbc:mysql://localhost:3306/$databaseName"

    // Driver class and credentials handed to the JDBC driver.
    val connectionProperties = new Properties()
    connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    connectionProperties.setProperty("user", userName)
    connectionProperties.setProperty("password", pass)

    // Load the table into a DataFrame and print its first rows.
    val mysqlDF = spark.read.jdbc(jdbcUrl, s"$databaseName.$tableName", connectionProperties)
    mysqlDF.show()

    spark.stop()
  }
}

Output

+---+----------+------------+--------------------+----------+
| id|first_name|   last_name|               email|    gender|
+---+----------+------------+--------------------+----------+
|  1|      Rhea|     Harling|rharling0@telegra...|    Female|
|  2|    Hewett|        Clee|  hclee1@blogger.com|      Male|
|  3|     Orrin|   Wickenden|owickenden2@uol.c...|      Male|
|  4| Winifield|     Le Surf|wlesurf3@pcworld.com|      Male|
|  5|    Gerald|       Buche|     gbuche4@pbs.org|      Male|
|  6|      Cale|    Patshull|  cpatshull5@mtv.com|      Male|
|  7|      Carr|   DAmbrosio|cdambrosio6@reddi...|      Male|
+---+----------+------------+--------------------+----------+

4. Read Data using format

The same read can be expressed with the format("jdbc") method of DataFrameReader, passing the connection details as options. Below is the example code.

val mysqlDF = spark.read.format("jdbc")
      .option("driver","com.mysql.cj.jdbc.Driver")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "customer.cust_info")
      .option("user", "root")
      .option("password", "root").load()
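
When the same connection settings are reused across several reads, it can help to collect them in one place. The sketch below gathers the options from the example above into a Map; the readOptions helper and the JdbcOptionsSketch object are hypothetical names used only for illustration.

```scala
// Hypothetical helper: collects the JDBC read options used in this article
// into a single immutable Map so they can be reused across reads.
object JdbcOptionsSketch {
  def readOptions(url: String, table: String, user: String, password: String): Map[String, String] =
    Map(
      "driver"   -> "com.mysql.cj.jdbc.Driver", // MySQL Connector/J 8.x driver class
      "url"      -> url,
      "dbtable"  -> table,
      "user"     -> user,
      "password" -> password
    )

  def main(args: Array[String]): Unit = {
    val opts = readOptions("jdbc:mysql://localhost:3306", "customer.cust_info", "root", "root")
    // Print every option except the password.
    opts.filter { case (key, _) => key != "password" }.foreach(println)
  }
}
```

Given a SparkSession named spark, the map can be passed in a single call with spark.read.format("jdbc").options(opts).load(), which should behave the same as chaining the individual option calls.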

5. Read JDBC Table in Parallel

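Spark can read a JDBC table in parallel. The numPartitions option sets the maximum number of partitions, and therefore the maximum number of concurrent JDBC connections, used when reading from or writing to a database. On its own it does not split a read: to read in parallel you must also set partitionColumn (a numeric, date, or timestamp column), lowerBound, and upperBound, which Spark uses to work out the range of values each partition fetches. The bounds only determine the partition stride; they do not filter rows. When writing, if the DataFrame has more partitions than numPartitions, Spark reduces it to that limit by calling coalesce(numPartitions) before writing the data.

// Assumption: id is a numeric column with values from about 1 to 1000; adjust the bounds to your data.
val mysqlDfNumPartition = spark.read.format("jdbc")
      .option("driver","com.mysql.cj.jdbc.Driver")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "customer.cust_info")
      .option("partitionColumn", "id")
      .option("lowerBound", 1)
      .option("upperBound", 1000)
      .option("numPartitions",5)
      .option("user", "root")
      .option("password", "root").load()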
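
To make the partitioning idea concrete: Spark's JDBC source splits a read by dividing the range between the lowerBound and upperBound options of a partitionColumn into numPartitions slices and issuing one query per slice. The sketch below is a simplified illustration of that idea in plain Scala, not Spark's actual implementation; the predicates function and the bounds used are assumptions chosen for the example.

```scala
// Simplified illustration (not Spark's real code) of how a column range can
// be divided into per-partition WHERE clauses.
object PartitionSketch {
  def predicates(column: String, lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[String] = {
    require(numPartitions >= 2, "a single partition needs no predicate")
    require(upperBound - lowerBound >= numPartitions, "range must cover at least one value per partition")

    // Width of the value range assigned to each partition.
    val stride = (upperBound - lowerBound) / numPartitions

    (0 until numPartitions).map { i =>
      val lo = lowerBound + i * stride
      val hi = lo + stride
      // The first and last slices are open-ended, so rows outside the bounds
      // (and NULLs) are still read: the bounds shape the split, not the result.
      if (i == 0) s"$column < $hi OR $column IS NULL"
      else if (i == numPartitions - 1) s"$column >= $lo"
      else s"$column >= $lo AND $column < $hi"
    }
  }

  def main(args: Array[String]): Unit =
    predicates("id", 1L, 1001L, 5).foreach(println)
}
```

With these hypothetical bounds, each of the five queries covers 200 id values, so a heavily skewed id column would still produce uneven partitions.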

6. Control the number of rows read at once

The JDBC fetchsize option is a key factor in optimizing database access. It controls how many rows are transferred from the database to your application in each round trip. Raising it can significantly improve performance, especially with large datasets and drivers that have a low default fetch size.

val mysqlDfFetchSize = spark.read.format("jdbc")
      .option("driver","com.mysql.cj.jdbc.Driver")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "customer.cust_info")
      .option("fetchsize",100)
      .option("user", "root")
      .option("password", "root").load()
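
As a rough back-of-the-envelope model of why the fetch size matters, the sketch below estimates how many round trips are needed to transfer a given number of rows. The roundTrips function and the row counts are hypothetical, and the model assumes the driver honors the fetch size exactly, which depends on the driver and its configuration.

```scala
// Rough model: number of round trips needed to fetch rowCount rows when the
// driver returns at most fetchSize rows per trip.
object FetchSizeSketch {
  def roundTrips(rowCount: Long, fetchSize: Int): Long = {
    require(rowCount >= 0 && fetchSize > 0)
    // Ceiling division: a final partial batch still costs one round trip.
    (rowCount + fetchSize - 1) / fetchSize
  }

  def main(args: Array[String]): Unit = {
    println(roundTrips(1000000L, 10))  // prints 100000
    println(roundTrips(1000000L, 100)) // prints 10000
  }
}
```

Larger fetch sizes cut the number of round trips but hold more rows in memory at once, so the right value is a trade-off between network latency and memory use.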
