
Spark - Reading from Excel files, converting to parquet

posted Dec 19, 2019, 5:37 AM by Peter Henell   [ updated Dec 19, 2019, 5:38 AM ]
I am running my examples using the Jupyter Docker container:
docker run -d -p 8001:8888 --name notebook -e SPARK_OPTS="--driver-memory 4g --executor-memory 4g" jupyter/all-spark-notebook
Using the Apache Toree kernel, my first cell downloads the dependencies:

%ShowTypes on
%AddDeps com.crealytics spark-excel_2.11 0.12.2 --transitive
And my second cell configures our SparkSession:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
     .builder()
     .config("spark.executor.memory", "4g")
     .config("spark.driver.memory", "8g")
     .appName("Peter Henell Spark Snowflake Example")
     .getOrCreate()
We also create helper functions to load data from Excel files.
(Documentation for the Excel reader can be found at https://github.com/crealytics/spark-excel)
import org.apache.spark.sql.DataFrame

def readExcel(file: String, sheet: String): DataFrame =
    spark.read
        .format("com.crealytics.spark.excel")
        .option("dataAddress", sheet) // Optional, default: "A1"
        .option("useHeader", "true") // Required
        //.option("treatEmptyValuesAsNulls", "false") // Optional, default: true
        .option("inferSchema", "true") // Optional, default: false
        //.option("addColorColumns", "true") // Optional, default: false
        //.option("timestampFormat", "MM-dd-yyyy HH:mm:ss") // Optional, default: yyyy-mm-dd hh:mm:ss[.fffffffff]
        //.option("maxRowsInMemory", 20) // Optional, default: None. If set, uses a streaming reader which can help with big files
        .option("excerptSize", 10) // Optional, default: 10. If set and schema is inferred, number of rows to infer schema from
        //.option("workbookPassword", "pass") // Optional, default: None. Requires unlimited strength JCE for older JVMs
        //.schema(myCustomSchema) // Optional, default: either inferred schema, or all columns are Strings
        .load(file)
def excelAsTable(file: String, sheet: String, tableName: String): Unit =
    readExcel(file, sheet).createOrReplaceTempView(tableName)
def parquetAsTable(file: String, tableName: String): Unit =
    spark.read.parquet(file).createOrReplaceTempView(tableName)
To be able to store the results of any analysis done on the Excel data, we need to rename columns with invalid names, since Parquet does not allow spaces (or characters such as ",;{}()=") in column names.

From our Excel file called resultat.xlsx, take the data from the sheet "TestResults", starting from cell A1. The result will be stored in testresult.parquet:
import org.apache.spark.sql.SaveMode

readExcel("resultat.xlsx", "'TestResults'!A1")
    .withColumnRenamed("Duration Seconds", "DurationSeconds")
    .withColumnRenamed("Cost In EURO", "CostInEUR")
    .write
    .mode(SaveMode.Overwrite)
    .parquet("testresult.parquet")
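Renaming columns one by one works for a couple of columns, but for wider sheets a small helper can sanitize every column name in one pass. A minimal sketch (the helper name and the character class are my own, assuming we only need to strip the characters Parquet rejects):

```scala
import org.apache.spark.sql.DataFrame

// Sketch: remove the characters " ,;{}()=" (and tabs/newlines) from every column name
// so the DataFrame can be written to parquet without manual renames.
def sanitizeColumns(df: DataFrame): DataFrame =
    df.columns.foldLeft(df) { (acc, name) =>
        acc.withColumnRenamed(name, name.replaceAll("""[ ,;{}()\n\t=]""", ""))
    }
```

With this helper, "Duration Seconds" becomes "DurationSeconds" automatically, though you lose control over special cases such as mapping "Cost In EURO" to "CostInEUR".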
We can now consume the parquet file and store it as a temporary view for future queries:
parquetAsTable("testresult.parquet", "testresults")
Perform our first simple test of the data:
spark.sql("""
  SELECT avg(DurationSeconds) as avgDurationSeconds, count(1) as testsConducted
  FROM testresults
  order by 1 asc
""").show()
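Since spark.sql returns an ordinary DataFrame, the result of any such analysis can be written back to parquet the same way as the raw data. A sketch (the summary.parquet path is a hypothetical example):

```scala
import org.apache.spark.sql.SaveMode

// Sketch: persist the aggregate result as its own parquet file
// (the output path "summary.parquet" is an assumption, not from the original post).
spark.sql("SELECT avg(DurationSeconds) as avgDurationSeconds, count(1) as testsConducted FROM testresults")
    .write
    .mode(SaveMode.Overwrite)
    .parquet("summary.parquet")
```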