Pacific-Design.com

    
Home Index

1. Spark

2. Tutorial

Spark / Tutorial /

1-textFile-map-split-filter-sortBy.scala

1-textFile-map-split-filter-sortBy.scala
/*
    Most popular baby names
    Kevin Duraj

    spark-shell script: relies on the shell-provided SparkContext `sc`.
    Field indices used below: 1 = first name, 4 = count.
*/

//val babyNames = sc.textFile("baby_names.csv")
val babyNames = sc.textFile("Baby_Name__Beginning_2007.csv")
babyNames.count()
babyNames.first()

// Drop the CSV header once and reuse the parsed rows everywhere below.
// Previously only the last section filtered it out, so the header was
// counted as a name ("First Name") in the "Names Rows" section.
// NOTE(review): this also drops any data line that happens to contain
// the text "Count" -- confirm no such rows exist in the data set.
val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))

println("-" * 80)
println("David Rows")
val davidRows = filteredRows.filter(row => row(1).contains("DAVID"))
davidRows.count()
davidRows.filter(row => row(4).toInt > 10).count()

println("-" * 80)
println("Names Rows")
// Top 10 names by number of rows (occurrences) in the file.
val names = filteredRows.map(row => (row(1), 1))
//names.reduceByKey(_ + _).sortBy(_._2, ascending = false).collect().foreach(println)
names.reduceByKey(_ + _).sortBy(_._2, ascending = false).take(10).foreach(println)

println("-" * 80)

// Top 10 names by the sum of the count column.
//filteredRows.map(row => (row(1), row(4).toInt)).reduceByKey(_ + _).sortBy(_._2, ascending = false).collect().foreach(println)
filteredRows.map(row => (row(1), row(4).toInt)).reduceByKey(_ + _).sortBy(_._2, ascending = false).take(10).foreach(println)


println("-" * 80)
sc.stop()
sys.exit()
 
2-baby-names.scala
2-baby-names.scala
/*
    RDD transformation demos: flatMap, map, mapPartitions,
    mapPartitionsWithIndex and sample.
    Kevin Duraj

    spark-shell script: relies on the shell-provided SparkContext `sc`.
*/

println("-" * 80)
//val babyNames = sc.textFile("baby_names.csv")
val babyNames = sc.textFile("Baby_Name__Beginning_2007.csv")
babyNames.count()

// println on a raw Array prints only its JVM identity (e.g. [I@1b2c3d),
// so every result below is rendered with mkString to show the elements.

println("-" * 80)
val res0 = sc.parallelize(List(1,2,3)).flatMap(x => List(x,x,x)).collect()
println(res0.mkString("Array(", ", ", ")"))
//res0: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

println("-" * 80)
val res1 = sc.parallelize(List(1,2,3)).map(x => List(x,x,x)).collect()
println(res1.mkString("Array(", ", ", ")"))
//res1: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))

println("-" * 80)
val parallel = sc.parallelize(1 to 9, 3) // 3 slices
// take(1) yields the first element of each partition and, unlike calling
// next directly, does not throw when a partition is empty.
val res2 = parallel.mapPartitions(it => it.take(1)).collect()
println(res2.mkString("Array(", ", ", ")"))
//res2: Array[Int] = Array(1, 4, 7)

println("-" * 80)
// Tag each element with the index of the partition that holds it; mapping
// the iterator directly avoids copying the partition into a List first.
val res6 = parallel.mapPartitionsWithIndex((index: Int, it: Iterator[Int]) => it.map(x => s"$index, $x")).collect()
println(res6.mkString("Array(", ", ", ")"))
//res6: Array[String] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)

println("-" * 80)
// Distinct name: defining `parallel` a second time only works because the
// REPL allows redefinition; it is a compile error anywhere else.
val defaultSliced = sc.parallelize(1 to 9)
defaultSliced.sample(withReplacement = true, fraction = 0.2).count()


println("-" * 80)
sc.stop()
sys.exit()
 
3-union-intersection-distinct.scala
3-union-intersection-distinct.scala
/*
    Set-style RDD operations: union, intersection, distinct.
    Kevin Duraj

    spark-shell script: relies on the shell-provided SparkContext `sc`.
*/

println("-" * 80)
val parallel = sc.parallelize(1 to 9)
val par2 = sc.parallelize(5 to 15)
// Sample with replacement at a 0.2 fraction; the count varies per run.
parallel.sample(withReplacement = true, fraction = 0.2).count()

println("-" * 80)
// `++` is the operator form of union; duplicates (5 to 9) are kept.
(parallel ++ par2).collect()

println("-" * 80)
// Elements present in both RDDs.
parallel.intersection(par2).collect()

println("-" * 80)
// Union followed by de-duplication.
(parallel ++ par2).distinct().collect()

println("-" * 80)
sc.stop()
sys.exit()
 
4-reduce-takeSample.scala
4-reduce-takeSample.scala
/*
    RDD actions: reduce, count and takeSample.

    spark-shell script: relies on the shell-provided SparkContext `sc`.
*/

println(); println("-" * 80)
val names1 = sc.parallelize(List("abe", "abby", "apple"))

println(); println("-" * 80)
// String concatenation is not commutative, so the order of the pieces in
// the result may differ from the input order (see the sample output).
names1.reduce(_ + _)
//res3: String = abeappleabby

println(); println("-" * 80)
// Total number of characters across all names.
names1.map(_.length).reduce(_ + _)
//res4: Int = 12

println(); println("-" * 80)
// Pair every name with its length, then sum the lengths.
val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(name => (name, name.length))
names2.values.reduce(_ + _)
//res8: Int = 19

println(); println("-" * 80)
names2.count()
//res9: Long = 3

println(); println("-" * 80)
val teams = sc.parallelize(List("twins", "brewers", "cubs", "white sox", "indians", "bears"))
// Sampling with replacement: the same team can be drawn more than once.
teams.takeSample(withReplacement = true, num = 3)
//res10: Array[String] = Array(indians, indians, brewers)

println(); println("-" * 80)
sc.stop()
sys.exit()
 
5-groupByKey-reduceByKey-countByKey.scala
5-groupByKey-reduceByKey-countByKey.scala
/*
    groupByKey / reduceByKey / countByKey on the baby names data set
    Year,First Name,County,Sex,Count
    Kevin Duraj

    spark-shell script: relies on the shell-provided SparkContext `sc`.
*/

//val babyNames = sc.textFile("baby_names.csv")
val babyNames = sc.textFile("Baby_Name__Beginning_2007.csv")
// Skip the header line before parsing. Without this filter the header
// reaches n(4).toInt below ("Count".toInt) and the reduceByKey job fails
// with a NumberFormatException; it also added a bogus
// ("First Name", "County") pair to the groupByKey result.
// NOTE(review): this also drops any data line that happens to contain
// the text "Count" -- confirm no such rows exist in the data set.
val rows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))

/*--- GROUP BY KEY ---
res4: Array[(String, Iterable[String])] = Array(
(BRADEN,CompactBuffer(SUFFOLK, SARATOGA, SUFFOLK, ERIE, SUFFOLK, SUFFOLK, ERIE)),
(DERECK,CompactBuffer(Bronx, Kings)),
(HAZEL,CompactBuffer(NEW YORK, MONROE, KINGS, KINGS, MONROE, ERIE, NEW YORK, KINGS)),
(MATTEO,CompactBuffer(NASSAU, WESTCHESTER, QUEENS, SUFFOLK, NEW YORK, WESTCHESTER)),

*/
println(); println("-" * 80)
println("valuesPerKey.groupByKey.collect")
println("-" * 80)
// Key: first name (field 1); value: county (field 2).
val valuesPerKey = rows.map(name => (name(1), name(2)))
valuesPerKey.groupByKey().collect()
println()

/*--- REDUCE BY KEY ---
res5: Array[(String, Int)] = Array((BRADEN,39), (DERECK,6), (LEIBISH,11), (MATTEO,439), (HAZEL,237),
(RORY,46), (SKYE,109), (JOSUE,535), (NAHLA,26), (ASIA,6), (AMINAH,5), (HINDY,354), (MEGAN,675),
(ELVIN,34), (NOEMI,5), (AMARA,22), (BODHI,10), (BELLA,1102), (DANTE,337),
*/

println(); println("-" * 80)
println("keysValueSum.reduceByKey((v1,v2) => v1 + v2 ).collect")
println("-" * 80)
// Key: first name (field 1); value: count (field 4), summed per name.
val keysValueSum = rows.map(n => (n(1), n(4).toInt))
keysValueSum.reduceByKey((v1, v2) => v1 + v2).collect()
println()


//res16: Map[String,Long] = Map(red wings -> 1, jets -> 1, oilers -> 2)
println(); println("-" * 80)
println("countKeys.map(k => (k,1)).countByKey")
println("-" * 80)
val countKeys = sc.parallelize(List("jets", "oilers", "red wings", "oilers"))
// countByKey returns the number of occurrences of each key to the driver.
countKeys.map(k => (k, 1)).countByKey()
println()

println(); println("-" * 80)
sc.stop()
sys.exit()