
Apache Spark

Apache Spark is an open-source big data processing framework built around speed, ease of use, and sophisticated analytics. It was originally developed in 2009 at UC Berkeley's AMPLab, open sourced in 2010, and later became a top-level Apache project.
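
All of the snippets below assume an active SparkContext named sc, which spark-shell creates automatically. For a standalone application the context has to be built by hand; a minimal sketch (the app name and master URL are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// spark-shell provides an equivalent sc out of the box
val conf = new SparkConf()
  .setAppName("rdd-examples")  // placeholder app name
  .setMaster("local[*]")       // run locally on all cores; point at a cluster in production
val sc = new SparkContext(conf)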

groupByKey

/*--- GROUP BY KEY ---
res4: Array[(String, Iterable[String])] = Array(
(BRADEN,CompactBuffer(SUFFOLK, SARATOGA, SUFFOLK, ERIE, SUFFOLK, SUFFOLK, ERIE)),
(MATTEO,CompactBuffer(NASSAU, WESTCHESTER, QUEENS, SUFFOLK, NEW YORK, WESTCHESTER)),  */

val text = sc.textFile("flatfile.csv")        // one CSV record per line
val rows = text.map(line => line.split(","))  // split each record into columns
val pairs = rows.map(col => (col(1), col(2))) // (name, county) key/value pairs
pairs.groupByKey.collect
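
groupByKey leaves the values for each key as an Iterable, so a follow-up mapValues can aggregate them. A minimal sketch, counting the distinct counties per name from the same pairs:

pairs.groupByKey.mapValues(counties => counties.toSet.size).collect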

reduceByKey

/*--- REDUCE BY KEY ---
res5: Array[(String, Int)] = Array((BRADEN,39), (DERECK,6), (LEIBISH,11), (MATTEO,439),
(ELVIN,34), (NOEMI,5), (AMARA,22), (BODHI,10), (BELLA,1102), (DANTE,337), (MEGAN,675),
*/

val text = sc.textFile("flatfile.csv")
val rows = text.map(line => line.split(","))
val pairs = rows.map(col => (col(1), col(2).toInt)) // (name, count) pairs
pairs.reduceByKey((x, y) => x + y).collect          // sum the counts per name
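
The same sums could be produced by grouping first and summing afterwards, but reduceByKey combines values on the map side before the shuffle, so it moves far less data. A sketch of the equivalent (and more expensive) groupByKey version:

pairs.groupByKey.mapValues(counts => counts.sum).collect // shuffles every value; reduceByKey shuffles partial sums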

countByKey

// countByKey is an action: it returns a Map[String,Long] of row counts per key to the driver

val text = sc.textFile("flatfile.csv")
val rows = text.map(line => line.split(","))
val pairs = rows.map(col => (col(1), 1)) // (name, 1) pairs
pairs.countByKey // e.g. Map(BRADEN -> 7, MATTEO -> 6, ...)
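
Because countByKey collects the whole Map on the driver, it only suits key sets small enough to fit in driver memory. To keep the counts distributed, the reduceByKey form above does the same job and stays an RDD:

pairs.reduceByKey(_ + _) // RDD[(String, Int)] with the same counts, still on the cluster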

  1. textFile
  2. map
  3. split
  4. filter
  5. reduceByKey
  6. sortBy

val babyNames = sc.textFile("Baby_Name__Beginning_2007.csv")
babyNames.count
babyNames.first()

println("-" * 80)
println("David Rows")
// assumed column layout: Year,First Name,County,Sex,Count
val rows = babyNames.map(line => line.split(","))
val davidRows = rows.filter(row => row(1).contains("DAVID"))
davidRows.count
davidRows.filter(row => row(4).toInt > 10).count

println("-" * 80)
println("Names Rows")
val names = rows.map(name => (name(1), 1)) // note: still includes the header row as a "name"
//names.reduceByKey((a,b) => a + b).sortBy(_._2, false).collect.foreach(println) // print everything
names.reduceByKey((a,b) => a + b).sortBy(_._2, false).take(10).foreach(println)  // top 10 only

println("-" * 80)

// drop the header line before converting the Count column to Int
val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
//filteredRows.map(n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2, false).collect.foreach(println)
filteredRows.map(n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2, false).take(10).foreach(println)

println("-" * 80)
sc.stop
sys.exit()
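
sortBy followed by take(10) shuffles and sorts the entire RDD just to keep ten rows; takeOrdered fetches a top-N directly. A minimal sketch with a descending ordering on the count:

names.reduceByKey((a, b) => a + b).takeOrdered(10)(Ordering.by { case (_, n) => -n }).foreach(println)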

  1. groupByKey
  2. reduceByKey
  3. countByKey

val babyNames = sc.textFile("Baby_Name__Beginning_2007.csv") 
val rows = babyNames.map(line => line.split(",")) 
 
/*--- GROUP BY KEY --- 
res4: Array[(String, Iterable[String])] = Array( 
(BRADEN,CompactBuffer(SUFFOLK, SARATOGA, SUFFOLK, ERIE, SUFFOLK, SUFFOLK, ERIE)),  
(DERECK,CompactBuffer(Bronx, Kings)), 
(HAZEL,CompactBuffer(NEW YORK, MONROE, KINGS, KINGS, MONROE, ERIE, NEW YORK, KINGS)),  
(MATTEO,CompactBuffer(NASSAU, WESTCHESTER, QUEENS, SUFFOLK, NEW YORK, WESTCHESTER)),  
 
*/ 
println; println("-" * 80) 
println("valuesPerKey.groupByKey.collect") 
println("-" * 80) 
val valuesPerKey = rows.map(name => (name(1), name(2))) // (name, county) pairs
valuesPerKey.groupByKey.collect 
println 
 
/*--- REDUCE BY KEY --- 
res5: Array[(String, Int)] = Array((BRADEN,39), (DERECK,6), (LEIBISH,11), (MATTEO,439), (HAZEL,237),  
(RORY,46), (SKYE,109), (JOSUE,535), (NAHLA,26), (ASIA,6), (AMINAH,5), (HINDY,354), (MEGAN,675),  
(ELVIN,34), (NOEMI,5), (AMARA,22), (BODHI,10), (BELLA,1102), (DANTE,337), 
*/ 
 
println; println("-" * 80)
println("keysValueSum.reduceByKey((v1,v2) => v1 + v2 ).collect")
println("-" * 80)
// the header row would make toInt fail on the Count column, so drop it first
val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
val keysValueSum = filteredRows.map(n => (n(1), n(4).toInt))
keysValueSum.reduceByKey((v1, v2) => v1 + v2).collect
println 
 
 
//res16: Map[String,Long] = Map(red wings -> 1, jets -> 1, oilers -> 2) 
println; println("-" * 80) 
println("countKeys.map(k => (k,1)).countByKey") 
println("-" * 80) 
val countKeys = sc.parallelize(List("jets", "oilers", "red wings", "oilers"))  
countKeys.map(k => (k,1)).countByKey 
println 
 
println; println("-" * 80) 
sc.stop 
sys.exit() 
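
Since rows above feeds several separate actions, each action re-reads and re-splits the input file. Caching the RDD once avoids the repeated work; a minimal sketch of the change:

val rows = babyNames.map(line => line.split(",")).cache() // materialized on the first action, reused afterwards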

  1. union
  2. intersection

println("-" * 80) 
val parallel = sc.parallelize(1 to 9) 
val par2 = sc.parallelize(5 to 15) 
parallel.sample(true,.2).count 
 
println("-" * 80) 
parallel.union(par2).collect 
 
println("-" * 80) 
parallel.intersection(par2).collect 
 
println("-" * 80) 
parallel.union(par2).distinct.collect 
 
println("-" * 80) 
sc.stop 
sys.exit() 
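
A related set operation is subtract, which keeps the elements of the first RDD that do not appear in the second; a minimal sketch on the same two RDDs:

parallel.subtract(par2).collect // 1, 2, 3, 4 in some order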

  1. mapPartitions
  2. mapPartitionsWithIndex

println("-" * 80) 
val babyNames = sc.textFile("Baby_Name__Beginning_2007.csv") 
babyNames.count 
 
println("-" * 80) 
val res0 = sc.parallelize(List(1,2,3)).flatMap(x => List(x,x,x)).collect 
println(res0) 
//res0: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3) 
 
println("-" * 80) 
val res1 = sc.parallelize(List(1,2,3)).map(x => List(x,x,x)).collect 
println(res1) 
//res1: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3)) 
 
println("-" * 80) 
val parallel = sc.parallelize(1 to 9, 3) // 3 slices 
val res2 = parallel.mapPartitions(x => List(x.next).iterator).collect 
println(res2) 
//res2: Array[Int] = Array(1, 4, 7) 
 
println("-" * 80) 
val res6 = parallel
    .mapPartitionsWithIndex( ( index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator)
    .collect 

println(res6) 
//res6: Array[String] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9) 
 
println("-" * 80) 
val parallel = sc.parallelize(1 to 9) 
parallel.sample(true,.2).count 
 
 
println("-" * 80) 
sc.stop 
sys.exit() 
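
A more typical use of mapPartitions is a per-partition aggregate, where the function runs once per partition instead of once per element. A minimal sketch, assuming the same three evenly sized slices of 1 to 9:

parallel.mapPartitions(it => Iterator(it.sum)).collect // Array(6, 15, 24) for slices (1,2,3), (4,5,6), (7,8,9)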

  1. takeSample

println; println("-" * 80) 
val names1 = sc.parallelize(List("abe", "abby", "apple")) 
 
println; println("-" * 80)
names1.reduce((t1, t2) => t1 + t2) // reduce assumes a commutative op, so concatenation order is not guaranteed
//res3: String = abeappleabby
 
println; println("-" * 80)
names1.flatMap(k => List(k.size)).reduce((t1, t2) => t1 + t2) // total characters: 3 + 4 + 5
//res4: Int = 12
 
println; println("-" * 80) 
val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, a.size))
names2.flatMap(t => Array(t._2)).reduce(_ + _) // sum of the sizes: 5 + 6 + 8
//res8: Int = 19
 
println; println("-" * 80) 
names2.count 
//res9: Long = 3 
 
println; println("-" * 80) 
val teams = sc.parallelize(List("twins", "brewers", "cubs", "white sox", "indians", "bears"))
teams.takeSample(true, 3) // with replacement, so the same team can appear twice
//res10: Array[String] = Array(indians, indians, brewers)
 
println; println("-" * 80) 
sc.stop 
sys.exit()
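
takeSample also accepts a seed as a third argument, which makes the sample reproducible across runs; a minimal sketch, this time without replacement:

teams.takeSample(false, 3, seed = 42L) // the same three teams on every run with this seed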