Apache Spark

Apache Spark is an open-source big data processing framework built around speed, ease of use, and sophisticated analytics. It was originally developed in 2009 at UC Berkeley's AMPLab and open-sourced in 2010 as an Apache project.

groupByKey

/*--- GROUP BY KEY --- 
res4: Array[(String, Iterable[String])] = Array( 
(BRADEN,CompactBuffer(SUFFOLK, SARATOGA, SUFFOLK, ERIE, SUFFOLK, SUFFOLK, ERIE)),  
(MATTEO,CompactBuffer(NASSAU, WESTCHESTER, QUEENS, SUFFOLK, NEW YORK, WESTCHESTER)),  */ 

val text = sc.textFile(flatfile.csv") 
val rows = text.map(line => line.split(",")) 
val df = rows.map(col => (col(1), col(2))) 
df.groupByKey.collect 

reduceByKey

/*--- REDUCE BY KEY --- 
res5: Array[(String, Int)] = Array((BRADEN,39), (DERECK,6), (LEIBISH,11), (MATTEO,439),
(ELVIN,34), (NOEMI,5), (AMARA,22), (BODHI,10), (BELLA,1102), (DANTE,337), (MEGAN,675),
*/ 

val text = sc.textFile(flatfile.csv") 
val rows = text.map(line => line.split(",")) 
val df = rows.map(col => (col(1), col(2).toInt )) 
df.reduceByKey( (v1,v2) => v1 + v2 ).collect 

countByKey

// countByKey returns a local Map from each key to its number of occurrences, e.g.:
//res16: Map[String,Long] = Map(red wings -> 1, jets -> 1, oilers -> 2)

val text = sc.textFile("flatfile.csv")
val rows = text.map(line => line.split(","))
val pairs = rows.map(col => (col(1), 1))  // tag every key with a 1
pairs.countByKey                          // already a local Map, no collect needed

RDD - Resilient Distributed Dataset

  • Abstraction for data interaction (lazy, in-memory)
  • An RDD is an immutable, distributed collection of elements split into partitions
  • RDDs come in several specialized flavors, such as pair RDDs for key/value operations (see the sketch after this list)
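
A quick sketch of creating RDDs in spark-shell; flatfile.csv is the same hypothetical input file used above:

val fromCollection = sc.parallelize(1 to 100, 4)  // RDD from an in-memory collection, 4 partitions
val fromFile = sc.textFile("flatfile.csv")        // RDD from a text file, one element per line
fromCollection.partitions.length                  // Int = 4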

Spark Transformations

Transformations produce a new RDD, a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. Transformations are lazy; nothing is computed until an action is called (see the sketch after this list).
  • map
  • flatMap
  • filter
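
A minimal sketch of each transformation; the trailing collect is an action, added only to materialize the result:

val nums = sc.parallelize(1 to 5)
nums.map(x => x * 2).collect           // Array(2, 4, 6, 8, 10)
nums.flatMap(x => List(x, x)).collect  // Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5)
nums.filter(x => x % 2 == 0).collect   // Array(2, 4)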

Spark Actions

Actions execute parallel operations on the cluster and return a result to the driver (see the sketch after this list).
  • reduce
  • collect
  • count
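
The same RDD run through each action:

val nums = sc.parallelize(1 to 5)
nums.reduce((a, b) => a + b)  // Int = 15
nums.collect                  // Array(1, 2, 3, 4, 5)
nums.count                    // Long = 5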

Spark Driver

  • declares transformations and actions on RDDs, and submits them to the cluster (see the sketch below)
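
A minimal standalone driver sketch; the object name and input file are hypothetical, and in spark-shell the SparkContext sc is already created for you:

import org.apache.spark.{SparkConf, SparkContext}

object NameCountDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NameCountDriver")
    val sc = new SparkContext(conf)

    val counts = sc.textFile("flatfile.csv")  // transformations are declared lazily...
      .map(line => line.split(","))
      .map(col => (col(1), 1))
      .reduceByKey(_ + _)

    counts.take(10).foreach(println)          // ...and an action triggers the computation
    sc.stop()
  }
}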


Popular Apache Spark libraries:

  • Spark SQL:
    • Spark SQL exposes Spark datasets over the JDBC API and allows running SQL-like queries on Spark data from traditional BI and visualization tools. It also lets users ETL data from the formats it currently lives in (such as JSON, Parquet, or a database), transform it, and expose it for ad-hoc querying (see the sketch after this list).
  • Spark MLlib:
    • MLlib is Spark's scalable machine learning library, consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, and dimensionality reduction, as well as underlying optimization primitives.
  • Spark GraphX:
    • GraphX is the (alpha) Spark API for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD with the Resilient Distributed Property Graph: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.
  • Spark Streaming:
    • Spark Streaming processes real-time streaming data using a micro-batch style of computation. It is built on the DStream, which is essentially a series of RDDs.
  • Spark Cassandra Connector:
    • There are also integration adapters for other products, such as Cassandra (Spark Cassandra Connector) and R (SparkR). With the Cassandra Connector, Spark can access data stored in a Cassandra database and perform analytics on that data.
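
A minimal Spark SQL sketch against the 1.x API, where spark-shell predefines sqlContext; the input file people.json is hypothetical, with one JSON object per line:

val people = sqlContext.read.json("people.json")
people.registerTempTable("people")
sqlContext.sql("SELECT name, age FROM people WHERE age >= 21").show()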

The spark-shell script below combines the following operations to explore the Baby_Name__Beginning_2007.csv dataset:

  1. textFile
  2. map
  3. split
  4. filter
  5. reduceByKey
  6. sortBy

val babyNames = sc.textFile("Baby_Name__Beginning_2007.csv") 
babyNames.count 
babyNames.first() 
 
println("-" * 80) 
println("David Rows") 
val rows = babyNames.map(line => line.split(",")) 
val davidRows = rows.filter(row => row(1).contains("DAVID")) 
davidRows.count 
davidRows.filter(row => row(4).toInt > 10).count 
 
println("-" * 80) 
println("Names Rows") 
val names = rows.map(name => (name(1),1)) 
//names.reduceByKey((a,b) => a + b ).sortBy(_._2, false).collect.foreach(println) 
names.reduceByKey((a,b) => a + b ).sortBy(_._2, false).take(10).foreach(println) 
 
println("-" * 80) 
 
val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))  // drop the header row so toInt below cannot fail
//filteredRows.map( n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2, false).collect.foreach(println) 
filteredRows.map( n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2, false).take(10).foreach(println) 
 
 
println("-" * 80) 
sc.stop 
sys.exit() 

  1. groupByKey
  2. reduceByKey
  3. countByKey

val babyNames = sc.textFile("Baby_Name__Beginning_2007.csv") 
val rows = babyNames.map(line => line.split(",")) 
 
/*--- GROUP BY KEY --- 
res4: Array[(String, Iterable[String])] = Array( 
(BRADEN,CompactBuffer(SUFFOLK, SARATOGA, SUFFOLK, ERIE, SUFFOLK, SUFFOLK, ERIE)),  
(DERECK,CompactBuffer(Bronx, Kings)), 
(HAZEL,CompactBuffer(NEW YORK, MONROE, KINGS, KINGS, MONROE, ERIE, NEW YORK, KINGS)),  
(MATTEO,CompactBuffer(NASSAU, WESTCHESTER, QUEENS, SUFFOLK, NEW YORK, WESTCHESTER)),  
 
*/ 
println; println("-" * 80) 
println("valuesPerKey.groupByKey.collect") 
println("-" * 80) 
val valuesPerKey = rows.map(name => (name(1), name(2))) 
valuesPerKey.groupByKey.collect 
println 
 
/*--- REDUCE BY KEY --- 
res5: Array[(String, Int)] = Array((BRADEN,39), (DERECK,6), (LEIBISH,11), (MATTEO,439), (HAZEL,237),  
(RORY,46), (SKYE,109), (JOSUE,535), (NAHLA,26), (ASIA,6), (AMINAH,5), (HINDY,354), (MEGAN,675),  
(ELVIN,34), (NOEMI,5), (AMARA,22), (BODHI,10), (BELLA,1102), (DANTE,337), 
*/ 
 
// Filter out the header row so that the Count column can be parsed with toInt
val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))

println; println("-" * 80)
println("keysValueSum.reduceByKey((v1,v2) => v1 + v2).collect")
println("-" * 80)
val keysValueSum = filteredRows.map(n => (n(1), n(4).toInt))
keysValueSum.reduceByKey((v1, v2) => v1 + v2).collect
println 
 
 
//res16: Map[String,Long] = Map(red wings -> 1, jets -> 1, oilers -> 2) 
println; println("-" * 80) 
println("countKeys.map(k => (k,1)).countByKey") 
println("-" * 80) 
val countKeys = sc.parallelize(List("jets", "oilers", "red wings", "oilers"))  
countKeys.map(k => (k,1)).countByKey 
println 
 
println; println("-" * 80) 
sc.stop 
sys.exit() 

  1. union
  2. intersection

println("-" * 80) 
val parallel = sc.parallelize(1 to 9) 
val par2 = sc.parallelize(5 to 15) 
parallel.sample(true, 0.2).count  // sample with replacement, fraction 0.2
 
println("-" * 80) 
parallel.union(par2).collect 
 
println("-" * 80) 
parallel.intersection(par2).collect 
 
println("-" * 80) 
parallel.union(par2).distinct.collect 
 
println("-" * 80) 
sc.stop 
sys.exit() 

  1. mapPartitions
  2. mapPartitionsWithIndex

println("-" * 80) 
val babyNames = sc.textFile("Baby_Name__Beginning_2007.csv") 
babyNames.count 
 
println("-" * 80) 
val res0 = sc.parallelize(List(1,2,3)).flatMap(x => List(x,x,x)).collect
println(res0.mkString(", "))  // mkString prints the contents; println(res0) would print only the array reference
//res0: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
 
println("-" * 80) 
val res1 = sc.parallelize(List(1,2,3)).map(x => List(x,x,x)).collect
println(res1.mkString(", "))
//res1: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))
 
println("-" * 80) 
val parallel = sc.parallelize(1 to 9, 3) // 3 partitions
val res2 = parallel.mapPartitions(x => List(x.next).iterator).collect  // first element of each partition
println(res2.mkString(", "))
//res2: Array[Int] = Array(1, 4, 7)
 
println("-" * 80) 
val res6 = parallel
    .mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", " + x).iterator)
    .collect

println(res6.mkString("; "))
//res6: Array[String] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)
 
println("-" * 80) 
val parallel = sc.parallelize(1 to 9) 
parallel.sample(true,.2).count 
 
 
println("-" * 80) 
sc.stop 
sys.exit() 

  1. takeSample

println; println("-" * 80) 
val names1 = sc.parallelize(List("abe", "abby", "apple")) 
 
println; println("-" * 80) 
names1.reduce((t1, t2) => t1 + t2) 
//res3: String = abeappleabby                                                      
 
println; println("-" * 80) 
names1.flatMap(k => List(k.size) ).reduce((t1,t2) => t1 + t2) 
//res4: Int = 12 
 
println; println("-" * 80) 
val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, a.size)) 
names2.flatMap(t => Array(t._2)).reduce(_ + _) 
//res8: Int = 19 
 
println; println("-" * 80) 
names2.count 
//res9: Long = 3 
 
println; println("-" * 80) 
val teams = sc.parallelize(List("twins", "brewers", "cubs", "white sox", "indians", "bears")) 
teams.takeSample(true, 3) 
//res10: Array[String] = Array(indians, indians, brewers) 
 
println; println("-" * 80) 
sc.stop 
sys.exit() 
