Core Spark with Scala

Learn how to build Spark based applications using Scala as the programming language
4.4 (59 ratings)
1,824 students enrolled
Last updated 1/2017
English
30-Day Money-Back Guarantee
Includes:
  • 7.5 hours on-demand video
  • Full lifetime access
  • Access on mobile and TV
  • Certificate of Completion
What Will I Learn?
  • Understand Scala and Spark to develop big data applications at scale
  • Read data from different file systems such as HDFS, the local file system and s3 into Spark RDDs
  • Read and write data using different file formats
  • Apply filter operations on RDDs
  • Perform aggregations such as sum, avg, min and max using reduceByKey and aggregateByKey
  • Perform joins on disparate data sets using core Spark
  • Sort data using the sortByKey operation
Requirements
  • Students need a modern laptop with a 64 bit OS and at least 8 GB RAM
  • If a good laptop is not available, build a single node lab using the Cloudera distribution on a cloud provider such as AWS
Description

Spark is an in-memory distributed computing framework in the Big Data ecosystem, and Scala is a programming language. Spark is one of the hottest technologies in Big Data as of today. This course covers how to use the Spark core APIs to develop applications using Scala as the programming language. The methodology of the course is hands on: students are expected to watch the content and practice along with it. Most of the content is in videos, and code snippets are shared wherever necessary. The material runs around 3 to 4 hours, but it might take students longer to finish the course as it includes downloading files, practicing etc.

Who is the target audience?
  • This course is for everyone who wants to build Spark based applications using Scala as the programming language
  • Basic programming skills are required
Curriculum For This Course
46 Lectures 07:16:25
Getting started
2 Lectures 25:26
Architecture and Concepts
6 Lectures 40:38
  • Architecture
  • Cluster Modes - Overview
  • Different modules in Spark
  • Accessing Documentation
  • Important concepts
Introduction
02:51

We will see the architecture of Spark

  • Driver Program
  • Cluster Manager
  • Worker Node
  • Executor
Architecture Overview
05:58

Following are the different cluster modes

  • Local Mode
  • YARN
  • Mesos
  • Standalone
Cluster Modes
05:19

Following are the different modules in Spark

  • Core Spark
  • Data Frames, Data Sets, SQL
  • Spark Streaming
  • MLLib
  • GraphX
  • Bagel
  • R
Modules Overview
09:33

These are the typical steps you need to follow to access the documentation

  • Google for "Spark Documentation <version>"
  • Go to the documentation
  • Go to "Spark Programming Guide" - for core API
  • Choose programming language of your choice
  • In top "Programming Guides" drop down menu, you can see other modules
Documentation Overview
06:40

Following are the important concepts you will learn in the course

  • Resilient Distributed Datasets (RDD)
  • Transformations and Actions
    • Filtering Data
    • Row level transformations
    • Joining data sets
    • Aggregations
    • Ranking and Sorting
  • Shared Variables (Broadcast Variables and Accumulators)
Concepts Overview
10:17
Setup Scala IDE for Eclipse
13 Lectures 02:08:34

Following are the installation steps to set up Eclipse with the Maven plugin

  • Make sure Java 1.7 or later is installed (1.8 recommended)
  • Set up Eclipse
  • Set up Maven
  • STS (Spring Tool Suite) comes as Eclipse bundled with the Maven plugin
  • We recommend STS
  • If you already have Eclipse, just add the Maven plugin from the marketplace
Setup Eclipse with Maven
08:07

Following are the steps to set up a simple Java application as a Maven project

  • Open Eclipse with the Maven plugin (STS)
  • For the first time, create a new workspace simpleapps
  • File -> New -> Maven Project
  • Give the artifact id and group id. Make sure you give the correct package name.
  • It will create a Maven project with App.java
  • Run the application and validate
Develop simple java application
08:49

Even though we have virtual machine images from Cloudera and Hortonworks with all the necessary tools installed, it is a good idea to set up a development environment on our PC along with an IDE. The following need to be installed to set up a development environment for building Scala based Spark applications

  • Java
  • Scala
  • Sbt
  • Eclipse with Scala IDE

Here is the development life cycle

  • Develop using an IDE (e.g. Eclipse with Scala IDE)
  • Use code versioning tools such as SVN or GitHub for team development
  • Use sbt to build the jar file
  • Ship the jar file (the application) to the remote servers
  • Schedule it through a scheduler

As part of this topic,

  • We will see how Scala and sbt are installed on the PC
  • We will treat the virtual machines as test or production servers
  • We will not be covering code versioning tools

Following topics will be covered to set up Scala and sbt

  • Make sure Java is installed
  • Download and install the Scala binaries
  • Launch the Scala CLI/interpreter and run a simple Scala program
  • Download and install sbt
  • Write a simple Scala application
  • Build it using sbt
  • Run the application using sbt
Setup Scala and sbt - Introduction
04:28

Here are the instructions to set up Scala

  • Download the Scala binaries
  • Install (untar) the Scala binaries
  • Update the PATH environment variable
  • Launch the Scala interpreter/CLI and run a simple Scala program
  • Copy the code snippet below and paste it into the Scala interpreter/CLI
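
The specific snippet is not preserved on this page; any small Scala expression works for validation, for example:

println("Hello World!")
val total = (1 to 100).sum
println(total)
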
Setup and Validate scala
14:19

Following are the steps to create a simple Scala application

  • Make sure you are in the right directory
  • Create src/main/scala: mkdir -p src/main/scala
  • Create the file hw.scala under src/main/scala
  • Paste the code shown below, save and exit
  • Run it using: scala src/main/scala/hw.scala
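
hw.scala (the same hello world object is used again later in the Scala IDE lectures):

object hw {
  def main(args: Array[String]) {
    println("Hello World!")
  }
}
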
Run simple scala application
05:50

Here are the instructions to set up sbt

  • Download sbt
  • Install sbt
  • Go to the directory where you have the Scala source code
  • Create build.sbt
  • Package and run using sbt

Make sure you are in the right directory (the application home directory) and copy the snippet below into build.sbt

name := "hw"
version := "1.0"
scalaVersion := "2.11.8"
  • Build by running: sbt package
  • A jar file will be created under target/scala-<major_version>/<application_name>_<scala_major_version>-<application_version>.jar
  • Example: target/scala-2.11/hw_2.11-1.0.jar
  • Run using sbt: sbt run
  • Run using scala: scala target/scala-2.11/hw_2.11-1.0.jar

Setup sbt and run scala application
10:54

We are in the process of setting up the development environment on our PC/Mac so that we can develop the modules assigned. The following tasks are completed

  • Make sure Java is installed
  • Setup Eclipse (as part of Setup Eclipse with Maven)
  • Setup Scala
  • Setup sbt
  • Validate all the components

As part of this topic we will see

  • Installation of Scala IDE for Eclipse
  • Developing a simple application using Scala IDE
  • Adding the Eclipse plugin for sbt (sbteclipse)
  • Validating the integration of Eclipse, Scala and sbt
Setup Scala IDE for Eclipse - Introduction
03:34

Setup Scala IDE for Eclipse

Before setting up Scala IDE let us understand the advantages of having an IDE

  • The Scala IDE for Eclipse project lets you edit Scala code in Eclipse.
  • Syntax highlighting
  • Code completion
  • Debugging, and many other features
  • It makes Scala development in Eclipse a pleasure.

Steps to install Scala IDE for Eclipse

  • Launch Eclipse
  • Go to "Help" in top menu -> "Eclipse Marketplace"
  • Search for Scala IDE
  • Click on Install
  • Once installed restart Eclipse
  • Go to File -> New -> and see whether "Scala Application" is available or not.

Validate Scala IDE for Eclipse

Now that Scala IDE is installed, it is time to validate that we can develop Scala based applications

  • Create new workspace "bigdata-spark-scala"
  • Launch "Scala Interpreter" and validate scala code snippets
  • Create simple "Scala Application"
  • Run simple "Scala Application" 

object hw {
  def main(args: Array[String]) {
    println("Hello World!")
  }
}

Install Scala IDE for Eclipse
11:00


Run simple scala application using Scala IDE
10:21

Now we will see how sbt can be integrated with Scala IDE for Eclipse

Steps to integrate sbt with Scala IDE for Eclipse

  • Check whether ~/.sbt/<version>/plugins/plugins.sbt exists; if not
  • Create the plugins directory under ~/.sbt/<version>: mkdir -p ~/.sbt/0.13/plugins
  • cd ~/.sbt/0.13/plugins
  • Add addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0") to plugins.sbt
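
For this setup, ~/.sbt/0.13/plugins/plugins.sbt ends up containing just the plugin declaration:

    addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")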

Advantages of integrating sbt with eclipse

  • Ability to generate eclipse projects
  • Build deployable jar file
  • Works well with scala

There are 2 ways to create the jar file with sbt and Scala IDE for Eclipse

  • First create the project in Eclipse and then integrate with sbt
    • Go to the project directory
    • Create build.sbt and add name, version and scalaVersion
    • Run sbt package
    • Now the jar file will be created
  • Use sbt to generate the Eclipse project
    • Go to the working directory and create a directory for the project
    • Run mkdir -p src/main/scala
    • Run sbt eclipse
    • Now the project layout is ready
    • Use Scala IDE to import the project
    • Write the code using Scala IDE
    • Test it and build it using sbt package
    • Now the jar file will be created
Integrate sbt with Scala IDE for Eclipse
17:56

As part of this topic we will see 

  • Create sbt project for Spark using Scala
  • Integrate with Eclipse
  • Develop simple Spark application using Scala
  • Run it on the cluster

To perform this, we need

  • Hadoop cluster or Virtual Machine
  • Scala IDE for Eclipse
  • Integration of sbt with Scala IDE (sbteclipse plugin)
Develop Spark applications using Scala IDE - Introduction
01:37

Here we will see how to set up a Spark Scala project using sbt

  • Create a working directory for the new project
  • Under the working directory create the src/main/scala directory structure
  • Create build.sbt with name, version and scalaVersion
  • Also update build.sbt with libraryDependencies for Spark
    • libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.6.2"
  • The complete build.sbt for Spark 1.6.2 and Scala 2.11 is provided below
  • Run sbt eclipse
  • Run sbt package

name := "simple-spark-scala"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.6.2"

Import into Scala IDE for Eclipse

Here we will see how to import the Spark project using Scala into Eclipse

  • Import the project in Scala IDE
  • Develop a simple Spark program
    • Go to "File -> New -> Scala Object"
    • Name it "SimpleApp"
  • Now copy and paste the Scala code below, which uses the Spark APIs

import org.apache.spark.SparkContext, org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("scala spark").setMaster(args(0))
    val sc = new SparkContext(conf)
    val i = List(1, 2, 3, 4, 5)
    val dataRDD = sc.parallelize(i)
    dataRDD.saveAsTextFile(args(1))
  }
}

Develop Spark applications using Scala IDE and sbt
15:16

Run the Spark Application using Scala IDE

Here we will see how to run Spark Application using Scala IDE

  • Right Click on the program, go to "Run As" -> "Run Configurations"
  • Choose Project and Main Class
  • Click on Arguments and update program arguments
  • SimpleApp main function expects 2 parameters
    • Cluster mode (pass local)
    • Output path (pass output directory)
  • Click on Run
  • Make sure there are no errors in the logs and validate the output path
  • It should contain the data from the collection created by the application

Run Spark Scala application on Cluster

As the program is successfully developed, we will see how we can run it on the cluster. If you do not have a remote host or virtual machine, you can run spark-submit in local mode on your Mac or PC

  • Build the jar file using sbt: sbt package
  • Make sure you have the environment ready with a VM or cluster
  • If not, set up a single node environment on your PC or AWS (see the Requirements section)
  • scp the jar file to the target environment (a VM in this case)
  • Create a directory in HDFS for the OS user
  • Here is the sample for my VM (OS user: root)
  • Make sure to change root to the OS user you used to log in

sudo -u hdfs hadoop fs -mkdir /user/root
sudo -u hdfs hadoop fs -chown root /user/root
hadoop fs -ls /user/root
  • Run with spark-submit command in YARN mode
  • Make sure /user/root/simpleappoutput does not exist
  • Run hadoop fs -rm -R /user/root/simpleappoutput to remove directory, if exists
spark-submit --class "SimpleApp" \
--master yarn \
--executor-memory 512m \
--total-executor-cores 1 \
simple-spark-scala_2.11-1.0.jar yarn-client /user/root/simpleappoutput
  • Run with spark-submit command in local mode
  • Make sure /user/root/simpleappoutput does not exist
  • Run hadoop fs -rm -R /user/root/simpleappoutput to remove the directory, if it exists

spark-submit --class "SimpleApp" \
simple-spark-scala_2.11-1.0.jar local /user/root/simpleappoutput

  • Validate results by querying hadoop fs -ls /user/root/simpleappoutput


Run Spark applications on cluster
16:23
Spark Context, Configuration and Externalizing Parameters
5 Lectures 54:21

As part of this topic we will understand what SparkConf and SparkContext are.

  • SparkConf - configuration information of the application
  • SparkContext - tells Spark how to access the cluster (YARN, Mesos or local)
  • Application parameters and configurations
Introduction
02:58

As part of this topic we will see details about SparkConf and how it is used while submitting Spark applications.

  • As part of the cluster setup we will have a configuration directory which in turn has files such as spark-env.sh and spark-defaults.conf
  • spark-env.sh and spark-defaults.conf will be accessible using SparkConf
  • As part of application development, first we need to create a SparkConf object
  • There are several set and get methods to override parameter values or environment variables
  • setAppName can be used to give a readable name to the application, making it easy to identify in the logs
  • setMaster can be used to run the application in YARN mode, local mode or Mesos mode
  • setExecutorEnv can be used to override an environment variable of the executor (such as memory)
  • set is a generic method to override any configuration parameter

Here is the code snippet to create SparkConf

    val conf = new SparkConf().
      setAppName("Word Count).
      setMaster("local")

Spark Configuration
10:39

As part of this topic we will see details about SparkContext and how it is used while submitting spark applications.

  • SparkContext tells Spark how the application should be run (based on the defaults or SparkConf setMaster)
  • Following are the different contexts in which Spark applications can run
    • local
    • yarn-client
    • mesos URL
    • spark URL
  • Once the SparkContext object is created we can invoke functions such as
    • textFile - to read text files from local, s3, HDFS etc
    • sequenceFile - to read Hadoop's sequence file
    • parallelize - to parallelize collection
    • and many more
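
A minimal sketch (the collection and the path are illustrative) of creating a SparkContext and invoking some of these functions:

    import org.apache.spark.SparkContext, org.apache.spark.SparkConf

    val conf = new SparkConf().setAppName("Context Demo").setMaster("local")
    val sc = new SparkContext(conf)
    // read a text file (local file system, HDFS or s3 depending on the path and configuration)
    val lines = sc.textFile("/path/to/input")
    // parallelize a local collection into an RDD
    val numbers = sc.parallelize(List(1, 2, 3, 4, 5))
    println(numbers.count())
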
Spark Context
08:50

As part of this topic we will see how a word count program can be developed using Scala IDE. Here are the steps to develop the program

  • Program takes 2 parameters - input path and output path
  • Initialize application name and master (using SparkConf)
  • Create spark context object
  • Read data from input path using textFile
  • Apply flatMap and map function to tokenize the data as well as assign 1 to each of the token
  • Perform the word count using reduceByKey
  • Write output to outputPath
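
A minimal sketch of these steps (the master is hard coded here; the next lecture externalizes it along with other parameters):

import org.apache.spark.SparkContext, org.apache.spark.SparkConf

object WordCount {
  def main(args: Array[String]) {
    // initialize application name and master
    val conf = new SparkConf().setAppName("Word Count").setMaster("local")
    val sc = new SparkContext(conf)
    // read data from the input path, tokenize, assign 1 to each token and count using reduceByKey
    val wc = sc.textFile(args(0)).
      flatMap(rec => rec.split(" ")).
      map(rec => (rec, 1)).
      reduceByKey((acc, value) => acc + value)
    // write the output to the output path
    wc.saveAsTextFile(args(1))
  }
}
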
Develop word count program
13:30

In this topic we will see application properties and parameters in detail. Following are the ways we can pass parameters to an application.

  • Arguments
  • Properties files (application.properties, application.conf etc)
  • We typically develop the program with a main function
  • It takes an array of strings as parameters
  • We can pass as many parameters as we want
  • It is not good practice to pass all the parameters as arguments (e.g. username, password etc)
  • At the same time, username, password etc should not be hard coded in the code
  • We need to externalize those using properties files
  • There is a Scala library to load these parameter files (Typesafe Config)
    • libraryDependencies += "com.typesafe" % "config" % "1.3.0"

Code snippet to load configuration properties

    val appConf = ConfigFactory.load()
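
For example, application.properties could contain an entry for the deployment mode (the key name deploymentMode is the one used in the improved word count below; the value is an assumption):

    deploymentMode = local

which is then read with:

    val deploymentMode = appConf.getString("deploymentMode")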

Word Count Improved

In this topic we will see an improved word count program, which

  • Passes the input path and output path as arguments
  • Uses application.properties to externalize the deployment mode (local or yarn-client etc)
    • Create application.properties and add the parameter deploymentMode
    • Add the Typesafe Config dependency to build.sbt
    • Modify the code to load application.properties
    • Get deploymentMode and pass it to setMaster
  • Accesses all run time parameters
  • Validates the input path
    • Check whether the input path exists
  • Overrides the output path
    • Delete the output path if it already exists
  • Exercise - Compile it and run it on the cluster

build.sbt

name := "demo-spark-scala"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.6.2"
libraryDependencies += "com.typesafe" % "config" % "1.3.0"

WordCount.scala

package wordcount

import com.typesafe.config._
import org.apache.spark.SparkContext, org.apache.spark.SparkConf
import org.apache.hadoop.fs._


object WordCount {
  def main(args: Array[String]) {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Word Count").
      setMaster(appConf.getString("deploymentMode"))
      
    for(c <- conf.getAll)
      println(c._2)
    val sc = new SparkContext(conf)
    val inputPath = args(0)
    val outputPath = args(1)
    
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))
    
    if(!inputPathExists) {
      println("Invalid input path")
      return
    }
      
    if(outputPathExists)
      fs.delete(new Path(outputPath), true)
      
    val wc = sc.textFile(inputPath).
      flatMap(rec => rec.split(" ")).
      map(rec => (rec, 1)).
      reduceByKey((acc, value) => acc + value)
      
    wc.saveAsTextFile(outputPath)

  }
}

Externalizing Parameters
18:24
Develop Spark application using Scala
20 Lectures 03:07:26

Transformations and Actions form the Spark Core API. Here are the details

  • Reading and Writing from HDFS
  • Joining Disparate Data Sets
  • Aggregating data sets
  • Filtering data
  • Sorting and Ranking

As part of this, you will learn Spark APIs mentioned below

  • map, flatMap
  • filter
  • join
  • count, reduce
  • countByKey, groupByKey, reduceByKey, aggregateByKey
  • and few more

Lifecycle of an application

  • Read the data from file system (Create RDD)
  • Apply row level transformations
  • Join the data sets (if required)
  • Aggregate the data (if required)
  • Sort the data (if required)
  • Write the output back to file system
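
A skeleton of this lifecycle (the variable names mirror the revenue application developed in the following lectures; sc, inputPath and outputPath are assumed to be set up as shown earlier):

    // read the data from the file system (create RDDs)
    val ordersRDD = sc.textFile(inputPath + "/orders")
    val orderItemsRDD = sc.textFile(inputPath + "/order_items")
    // row level transformations into paired RDDs keyed by order_id
    val orders = ordersRDD.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
    val orderItems = orderItemsRDD.map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toFloat))
    // join, aggregate, sort and write the output back
    val revenuePerDay = orders.join(orderItems).
      map(rec => rec._2).
      reduceByKey((acc, value) => acc + value)
    revenuePerDay.sortByKey().saveAsTextFile(outputPath)
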
Transformations and Actions - Overview
04:53

As part of this topic we will try to understand the data set

  • Download the dataset from GitHub (retail_db)
  • retail_db is a well formed database which has 6 folders (6 tables)
    • departments
    • categories
    • products
    • order_items
    • orders
    • customers
Understanding dataset and data model
08:50

As part of this topic, we will see how to develop an end to end application using transformations and actions

  • Define problem statement
  • Design application
  • Implementation
  • Test and validate
  • Build and run on the remote cluster
Define problem statement - introduction
03:50

We have 6 tables in our data set. We would like to see the average revenue per day for all completed orders

  • Consider only "completed" orders
  • orders has order_date - extract order_id and order_date
  • order_items has order_item_subtotal - extract order_item_order_id and order_item_subtotal
  • Compute revenue for each order (use reduceByKey)
  • orders and order_items have a 1 to many relationship
    • one order to many order_items
  • Join orders and order_items
  • Get revenue and total number of orders for each day
  • Compute average revenue

  • orders is the parent table, for which order_id is the primary key. Each record stores order level information such as order status, order date etc.
+-------------------+-------------+------+-----+---------+
| Field             | Type        | Null | Key | Default |
+-------------------+-------------+------+-----+---------+
| order_id          | int(11)     | NO   | PRI | NULL    |
| order_date        | datetime    | NO   |     | NULL    |
| order_customer_id | int(11)     | NO   |     | NULL    |
| order_status      | varchar(45) | NO   |     | NULL    |
+-------------------+-------------+------+-----+---------+
  • order_items is child table to orders. order_item_id is primary key and order_item_order_id is foreign key to orders.order_id. There will be multiple records in order_items for each order_id in orders table (as we can typically check out multiple order items per order)
+--------------------------+------------+------+-----+---------+
| Field                    | Type       | Null | Key | Default |
+--------------------------+------------+------+-----+---------+
| order_item_id            | int(11)    | NO   | PRI | NULL    |
| order_item_order_id      | int(11)    | NO   |     | NULL    |
| order_item_product_id    | int(11)    | NO   |     | NULL    |
| order_item_quantity      | tinyint(4) | NO   |     | NULL    |
| order_item_subtotal      | float      | NO   |     | NULL    |
| order_item_product_price | float      | NO   |     | NULL    |
+--------------------------+------------+------+-----+---------+
  • Read the data from orders and order_items
  • Extract the key from orders and order_items (using map)
  • Get revenue per order item per day
  • Join the orders and order_items
  • Get the order count per date from order_items (aggregation). As there are orders which do not have corresponding records in order_items, we cannot get the count from the orders table alone. We need to join order_items with orders to get the total number of orders per day.
  • Get revenue per day from joined data

Define problem statement and design
09:27

As part of this topic we will see how to read data from file system 

  • There are several file systems and file formats Spark supports
  • File systems include local, HDFS, s3 and many more
  • File formats include all standard file formats
  • We will also see how we can use Hadoop APIs to use Hadoop supported file formats
Reading data from files - Introduction
02:22

As part of this topic we will see how we can read data from orders and order_items

  • SparkContext provides several options to read data
    • textFile
    • sequenceFile
    • objectFile
    • hadoopFile - old API Hadoop input formats
    • newAPIHadoopFile - new API Hadoop input formats
  • textFile - to read data from text files
    • The default file system is based on the configuration
    • It could be HDFS or the local file system
    • It also works on top of s3
  • In our case it is either HDFS or the local file system
    • textFile is the most appropriate one
  • The input path is passed as an argument to the program
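
A minimal sketch (assuming the retail_db folders sit under the input path passed as the argument):

    val inputPath = args(0)
    val ordersRDD = sc.textFile(inputPath + "/orders")
    val orderItemsRDD = sc.textFile(inputPath + "/order_items")
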
Reading data for orders and order_items
20:51

As part of this topic we will see how to perform necessary transformations before joining the data.

  • Filter for completed orders
  • Extract the key from orders and order_items (using map)
  • Get order_date along with order_id from orders
  • Get order_item_subtotal along with order_id from order_items
  • Get revenue per order from order_items
Applying simple transformations - Introduction
02:32

As part of this topic we will see how we can apply the filter transformation to get completed orders

  • orders has 4 columns
  • The 4th column is order_status
  • Data is comma separated
  • rec.split(",") will return an array
  • rec.split(",")(3) will get order_status
  • rec.split(",")(3).equals("COMPLETE") will check for completed orders
  • If it returns true the record will be passed on for downstream processing, else the record will be discarded
  • In this case rec as well as rec.split(",")(3) is of type String
  • Hence we can use any string operations, casting functions, conditional operators etc

Code snippet to filter completed orders

    val ordersCompleted = ordersRDD.
      filter(rec => (rec.split(",")(3) == "COMPLETE"))

Few more examples on filter

val ordersRDD = sc.textFile("")
ordersRDD.filter(line => line.split(",")(3).equals("COMPLETE")).take(5).foreach(println)
ordersRDD.filter(line => line.split(",")(3).contains("PENDING")).take(5).foreach(println)
ordersRDD.filter(line => line.split(",")(0).toInt > 100).take(5).foreach(println)
ordersRDD.filter(line => line.split(",")(0).toInt > 100 || line.split(",")(3).contains("PENDING")).take(5).foreach(println)
ordersRDD.filter(line => line.split(",")(0).toInt > 1000 && 
    (line.split(",")(3).contains("PENDING") || line.split(",")(3).equals("CANCELLED"))).
    take(5).
    foreach(println)
ordersRDD.filter(line => line.split(",")(0).toInt > 1000 && 
    !line.split(",")(3).equals("COMPLETE")).
    take(5).
    foreach(println)

Filter for completed orders - filter transformation
09:15

As part of this topic we will see how to use the map function and create paired RDDs with order_id as the key

  • orders has 4 columns and order_items has 6 columns
  • The first column in orders is order_id
  • From orders we need order_date (as we need daily average revenue)
  • Use map to generate a paired RDD of order_id and order_date
    • ordersCompleted.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
    • As order_id is of type int we are using the toInt function to cast it to an integer
  • The second column in order_items is order_item_order_id (the order_id)
  • From order_items we need order_item_subtotal (to compute revenue)
  • Use map to generate a paired RDD of order_id and order_item_subtotal
    • orderItemsRDD.map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toFloat))
    • As order_id is of type int we are using the toInt function to cast it to an integer
    • As order_item_subtotal is of type float we are using toFloat to cast it to a float

Code snippet to extract required fields from orders and order_items

    val orders = ordersCompleted.
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
    val orderItemsMap = orderItemsRDD.
      map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toFloat))

Extract required fields - map transformation
12:39

As part of this topic we will see how to use aggregate functions such as reduceByKey to get revenue for each order

  • There are several by key aggregate functions
    • groupByKey
    • reduceByKey
    • aggregateByKey
  • groupByKey is a more generic function and does not use a combiner
  • It is more appropriate for complex group transformations such as sorting and ranking
  • aggregateByKey is used in scenarios where the combiner logic and reducer logic are different
  • reduceByKey is the more appropriate function to aggregate the data for the current scenario
  • Using new paired RDD of order_items, we can compute total revenue for each order
    • Remember each order will have multiple records in order_items
    • reduceByKey((acc, value) => acc + value) - function to generate revenue for each order_id
  • Now we are ready to join orders and order_items after extracting required fields and performing aggregations

Code snippet to compute revenue for each order using reduceByKey

    val orderItems = orderItemsMap.
      reduceByKey((acc, value) => acc + value)

Generate revenue per order - reduceByKey transformation
14:37

Joining data sets - Introduction
03:24

As part of this topic we will see how join can be used to get order_date and order revenue as output

  • We already filtered orders with completed state
  • We already got order_id as key and order_date as value from orders
  • Also we got order_id as key and order_item_subtotal as value from order_items
  • We also got order revenue for each order from order_items
  • Now we can apply join function to join both the data sets

    val ordersJoin = orders.join(orderItems)

Join orders and order_items
10:05

As part of this topic we will see the different aggregate functions

  • Understand the functionality of the combiner
  • Understand the difference between reduceByKey and aggregateByKey
  • groupByKey - not intended for aggregations (it does not use a combiner)
  • reduceByKey - when the combiner logic and reducer logic are the same
  • aggregateByKey - when the combiner logic and reducer logic are different
  • Implement aggregateByKey to get both revenue as well as number of orders
  • Compute average revenue for each day
  • Sort the data by date
  • Save the output to a file/directory
Compute average revenue - Introduction
10:29

As part of this topic we will compute the total revenue and total number of orders for each day

  • Apply the map function to discard order_id and create a paired RDD of order_date and per order revenue
  • The per order revenue was computed before joining the data sets by using reduceByKey
  • Implement aggregateByKey to get both revenue as well as number of orders
  • The input for aggregateByKey is the revenue for each order for each day
  • The output of aggregateByKey is one record per date
    • with order_date as the key
    • and total revenue and total number of orders as the value

    val ordersJoinMap = ordersJoin.map(rec => (rec._2._1, rec._2._2))

    val revenuePerDay = ordersJoinMap.aggregateByKey((0.0, 0))(
      (acc, value) => (acc._1 + value, acc._2 + 1),
      (total1, total2) => (total1._1 + total2._1, total1._2 + total2._2)
    )

Compute total revenue and total orders for each day - aggregateByKey
15:45

Let us compute the average revenue

  • We have order_date as the key and (revenue, total number of orders) as the value
  • Now we need to apply the map function
  • Divide the revenue by the total number of orders to get the average revenue

    val averageRevenuePerDay = revenuePerDay.
      map(rec => (rec._1, BigDecimal(rec._2._1 / rec._2._2).
          setScale(2, BigDecimal.RoundingMode.HALF_UP).toFloat))

Compute average revenue
09:20
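
The snippet for the sorting step is not preserved on this page; a minimal sketch using sortByKey on the averaged data (averageRevenuePerDaySorted is the RDD saved in the next lecture):

    val averageRevenuePerDaySorted = averageRevenuePerDay.
      sortByKey()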

Sorting data by order_date - sortByKey
07:57

Now let us see how we can store the output to the file system

  • RDD can be written to any of the file systems
    • local
    • HDFS
    • s3
  • It can be written in any file format
    • saveAsTextFile
    • saveAsSequenceFile
    • saveAsHadoopFile
    • saveAsNewAPIHadoopFile
  • In this case we will try to save as text file

    averageRevenuePerDaySorted.
      saveAsTextFile(outputPath)

Save output to file system
09:40

Let us see how we can run the application on the cluster

  • Make sure sbt and eclipse are integrated
  • Review the program logic
  • Test the program locally by passing
    • input path
    • output path
    • environment (dev or local)
  • Build the jar file
  • Ship the jar file to the remote cluster (virtual machine in this case)
  • Run the program on remote cluster in yarn mode
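
A sketch of the final submission (the class name, jar file name, paths and the environment argument are illustrative; the pattern mirrors the earlier SimpleApp run):

spark-submit --class "DailyRevenue" \
--master yarn \
daily-revenue_2.11-1.0.jar /user/root/retail_db /user/root/daily_revenue dev
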
Build and run the application - Introduction
02:00

Let us see how we can test the program locally using Scala IDE for Eclipse

  • Test the program locally by passing
    • input path
    • output path
    • environment (dev or local)
  • Review the output location
  • Make sure directories and files are created
  • Validate results
Run application using Scala IDE
08:49

Build and run application on remote cluster
20:41
About the Instructor
Durga Viswanatha Raju Gadiraju
4.5 Average rating
149 Reviews
8,125 Students
4 Courses
Technology Adviser and Evangelist

13+ years of experience in executing complex projects using a vast array of technologies including Big Data and Cloud.

I founded itversity, llc, a US-based startup that provides quality training for IT professionals as well as staffing and consulting solutions for enterprise clients. I have trained thousands of IT professionals in a vast array of technologies including Big Data and Cloud.

Building IT careers for people and providing quality services to clients is paramount to our organization.

As an entry strategy, itversity will be providing quality training in the areas of ABCD

* Application Development
* Big Data and Business Intelligence
* Cloud
* Data Warehousing, Databases