Today, I will show you a very simple way to join two CSV files in Spark.

In one of our Big Data / Hadoop projects, we needed an easy way to join two CSV files in Spark. We explored a lot of techniques and finally settled on this one, which we found was the easiest.

This post will be helpful to folks who want to explore Spark SQL and DataFrame processing.

First, load the data with the following steps:

Step 1:

Copy the two CSV files to be joined into HDFS:

hadoop fs -put /root/workshop/hdp/emp_acc.csv /user/hdp/spark/
hadoop fs -put /root/workshop/hdp/emp_info.csv /user/hdp/spark/

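To confirm that both files landed in HDFS, you can list the target directory (same path as above):

hadoop fs -ls /user/hdp/spark/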

Step 2:

Processing CSV files in Spark requires a few extra JARs, which must be added before entering the spark-shell. Add the following JARs:

  • spark-core_2.11-1.5.1.jar

  • spark-csv_2.11-1.4.0.jar

  • spark-sql_2.11-1.5.1.jar

  • commons-csv-1.2.jar

    ADD_JARS="/root/workshop/hdp/spark-core_2.11-1.5.1.jar,/root/workshop/hdp/spark-csv_2.11-1.4.0.jar,/root/workshop/hdp/spark-sql_2.11-1.5.1.jar,/root/workshop/hdp/commons-csv-1.2.jar" spark-shell

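Alternatively, the same JARs can be passed with spark-shell's --jars flag, which later Spark 1.x releases favor over the ADD_JARS environment variable:

spark-shell --jars /root/workshop/hdp/spark-core_2.11-1.5.1.jar,/root/workshop/hdp/spark-csv_2.11-1.4.0.jar,/root/workshop/hdp/spark-sql_2.11-1.5.1.jar,/root/workshop/hdp/commons-csv-1.2.jar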

Step 3:

Import the required packages:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import sqlContext.implicits._


Step 4:

Create the SQLContext object:

val sqlc = new org.apache.spark.sql.SQLContext(sc)

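A small aside: the import sqlContext.implicits._ in Step 3 relies on the sqlContext instance that spark-shell pre-creates. If you would rather bind the implicits (toDF(), $"col" syntax) to your own instance, you can import them from it instead:

import sqlc.implicits._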

Step 5:

Store the HDFS paths of the two CSV files in Scala variables:

val emp_acc_CsvFile = "/user/hdp/spark/emp_acc.csv"
val emp_info_CsvFile = "/user/hdp/spark/emp_info.csv"


Step 6:

Create DataFrames by loading the CSV files with the com.databricks.spark.csv API, setting header to true so the first row is read as column names, and inferSchema to true so column types are detected from the data.

val emp_acc_LoadCsvDF = sqlc.read.format("com.databricks.spark.csv").
  option("header", "true").
  option("inferSchema", "true").
  load(emp_acc_CsvFile)

val emp_info_LoadCsvDF = sqlc.read.format("com.databricks.spark.csv").
  option("header", "true").
  option("inferSchema", "true").
  load(emp_info_CsvFile)
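For reference, with header and inferSchema enabled, the reader expects files shaped roughly like the following. These sample rows are purely hypothetical; only the column names are implied by the join later in this post:

emp_acc.csv
acc_id,name,salary,dept_id
101,John,50000,12

emp_info.csv
info_id,phone,address,email
101,9876543210,Bangalore,john@example.com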

Step 7:

Use the printSchema method to verify that the schema was inferred correctly:

emp_acc_LoadCsvDF.printSchema()

emp_info_LoadCsvDF.printSchema()
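With the hypothetical emp_acc.csv columns above, the output would look something like this (actual types depend on what inferSchema detects in your data):

root
 |-- acc_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- dept_id: integer (nullable = true)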
Step 8:

Use the show method to inspect the data in the console. To see only a limited number of rows, pass the row count as an argument. Note that show prints directly and returns Unit, so there is no need to wrap it in println.

emp_acc_LoadCsvDF.show(10)


emp_info_LoadCsvDF.show(10)

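By default, show truncates long cell values to 20 characters; if you want the full values, pass false as a second argument:

emp_acc_LoadCsvDF.show(10, false)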

Now, join the DataFrames:

Step 1:

Once the data has been loaded into the DataFrames correctly, join them. Here I am using an inner join, matching acc_id from the first DataFrame with info_id from the second.

val files_joinDF = emp_acc_LoadCsvDF.join(
    emp_info_LoadCsvDF,
    emp_acc_LoadCsvDF("acc_id").equalTo(emp_info_LoadCsvDF("info_id")),
    "inner"
  ).selectExpr("acc_id", "name", "salary", "dept_id", "phone", "address", "email")
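The same join can also be written with the === column operator, which is equivalent and a little more concise; this is purely a stylistic alternative:

val files_joinDF = emp_acc_LoadCsvDF.join(
    emp_info_LoadCsvDF,
    emp_acc_LoadCsvDF("acc_id") === emp_info_LoadCsvDF("info_id"),
    "inner"
  ).selectExpr("acc_id", "name", "salary", "dept_id", "phone", "address", "email")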


Step 2:

Once the join is complete, verify it using the printSchema method:

files_joinDF.printSchema()


Step 3:

Check the data in the combined DataFrame:

files_joinDF.show(10)
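If you also want to persist the joined result back to HDFS, the same spark-csv package can write DataFrames out; the output path below is just an example:

files_joinDF.write.
  format("com.databricks.spark.csv").
  option("header", "true").
  save("/user/hdp/spark/emp_joined")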


This is a simple use case; using the same technique, you can explore many other possibilities, such as joining on multiple columns or using different join modes.

Thank you for reading!