
Spark - Naive Bayes

// Build a classification model with Naive Bayes
// Run with: spark-shell -i naive-bayes.scala

import org.apache.spark.mllib.classification.NaiveBayes

/*---------------------------------------------------------------------
                    Load Data 
-----------------------------------------------------------------------*/
// Capital Bikeshare (CaBi) trip history for Q1 2014; the CSV includes a header row
val raw_trips = sc.textFile("2014-Q1-cabi-trip-history-data.csv")
raw_trips.take(4)

// Convert a duration string such as "0h 14m 26s" into total seconds
def convertDur(dur: String): Long = {
  val dur_regex = """(\d+)h\s(\d+)m\s(\d+)s""".r
  val dur_regex(hour, minute, second) = dur
  (hour.toLong * 3600L) + (minute.toLong * 60L) + second.toLong
}
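
A quick sanity check of the parser (the "0h 14m 26s" duration format is an assumption carried over from the regex above):

convertDur("0h 14m 26s")   // expected: 866 seconds (14*60 + 26)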

// One record per trip; column positions follow the 2014-Q1 CSV layout
case class Trip(id: String, dur: Long, s0: String, s1: String, reg: String)
// Split the CSV, skip the header row, and keep only the columns used below
val bike_trips = raw_trips.map(_.split(",")).filter(_(0) != "Duration").map(r => Trip(r(5), convertDur(r(0)), r(2), r(4), r(6)))
bike_trips.cache()
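
A look at the first parsed record helps confirm the column positions assumed above (bike id in column 5, duration in 0, stations in 2 and 4, member type in 6):

bike_trips.first()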

/*---------------------------------------------------------------------
                   DataFrame Model 
-----------------------------------------------------------------------*/
val bike_df = bike_trips.toDF()
bike_df.registerTempTable("bikeshare")   // expose the trips to Spark SQL (Spark 1.x API)
sql("SELECT * FROM bikeshare LIMIT 10").show()

// Ten most frequent start/end station pairs
val query = """
  SELECT COUNT(*) AS num, s0, s1
  FROM bikeshare
  GROUP BY s0, s1
  ORDER BY num DESC LIMIT 10
"""

sql(query).show()
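
For reference, the same aggregation can also be written with the DataFrame API (a sketch assuming Spark 1.3+, where groupBy, count, and orderBy are available on DataFrames):

import org.apache.spark.sql.functions.desc

bike_df.groupBy("s0", "s1")
  .count()
  .orderBy(desc("count"))
  .show(10)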

/*---------------------------------------------------------------------
                  Compute NaiveBayes
-----------------------------------------------------------------------*/
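The training step below uses train_set, test_set, and n, which the original listing never defines. A minimal sketch of one way to build them follows, assuming the goal is to predict the rider type (reg, Registered vs. Casual) from the trip duration and integer-encoded station names; this label choice and feature encoding are illustrative assumptions, not part of the original page.

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Map each station name to a non-negative index so it can serve as a feature
// (MLlib's multinomial Naive Bayes requires non-negative feature values)
val station_idx = bike_trips.flatMap(t => Seq(t.s0, t.s1)).distinct().zipWithIndex().collectAsMap()
val idx_bc = sc.broadcast(station_idx)

// Assumed label: 1.0 for registered riders, 0.0 otherwise;
// assumed features: duration in seconds plus the two station indices
val points = bike_trips.map { t =>
  val label = if (t.reg == "Registered") 1.0 else 0.0
  LabeledPoint(label, Vectors.dense(t.dur.toDouble, idx_bc.value(t.s0).toDouble, idx_bc.value(t.s1).toDouble))
}

// 70/30 random split into training and test sets
val Array(train_set, test_set) = points.randomSplit(Array(0.7, 0.3), seed = 42L)
val n = test_set.count()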
// Train multinomial Naive Bayes with additive (Laplace) smoothing, lambda = 1.0
val model = NaiveBayes.train(train_set, lambda = 1.0)
// Score the test set as (predicted label, true label) pairs
val pred  = test_set.map(t => (model.predict(t.features), t.label))
// Confusion matrix entries as fractions of the test set
val cm    = sc.parallelize(pred.countByValue().toSeq)
val cm_nb = cm.map(x => (x._1, 1.0 * x._2 / n)).sortBy(_._1, true)
cm_nb.foreach(println)

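An overall accuracy figure can be read off the same predictions (again a sketch building on the assumed split above, not part of the original page):

val accuracy = pred.filter { case (p, l) => p == l }.count().toDouble / n
println(s"test accuracy = $accuracy")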

sys.exit(0)
