
Create a new column with a function in a Spark DataFrame
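
The idea is to write an ordinary Scala function, wrap it with udf, and attach its result as a new column with withColumn. A minimal sketch of that pattern (the DataFrame df and its "value" column are placeholders, not part of the listing below):

  import org.apache.spark.sql.functions.{col, udf}

  // Plain Scala function: label an integer value.
  val label: (Int => String) = (n: Int) => if (n > 100) "high" else "low"

  // Wrap it as a UDF and derive a new column from an existing one.
  val labelUdf = udf(label)
  val labeled  = df.withColumn("value_label", labelUdf(col("value")))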


  import java.io.{BufferedWriter, File, FileWriter}
  import java.text.SimpleDateFormat
  import java.time.LocalDate
  import java.time.format.DateTimeFormatter
  import java.util.Date

  import scala.io.Source

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.functions.{col, udf}

  /*---------------------------------------------------------------------------------------
                                DataFrame New Column Function
    ---------------------------------------------------------------------------------------*/

  // Labels a timestamp string by whether its date (first 10 chars, yyyy-MM-dd) lies
  // more than three years in the past; unparsable input is returned unchanged.
  val duraj: (String => String) = (arg: String) => {

    var result = "no3years"

    try {
      val beginDate   = arg.take(10)
      val dateFormat  = new SimpleDateFormat("yyyy-MM-dd")
      val currentDate = dateFormat.format(new Date())
      //DEBUG println("currentDate=" + currentDate)

      val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
      val oldDate   = LocalDate.parse(beginDate, formatter)
      val newDate   = LocalDate.parse(currentDate, formatter)
      val period    = newDate.toEpochDay() - oldDate.toEpochDay()
      //DEBUG println("period=" + period)
    
      if (period > (365 * 3)) { result = "over3years" }
      else                    { result = "no3years"   }

    } catch {
      case _: Exception => result = arg
    }

    result
  }
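
Applied directly (outside Spark), the function can be sanity-checked in the REPL; the sample values below are illustrative, assuming the current date is well past 2015:

  duraj("2012-06-01T00:00:00.000Z")   // => "over3years"
  duraj("not-a-date")                 // => "not-a-date" (parse failure echoes the input)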

  /*---------------------------------------------------------------------------------------
                                  Transformation
    ---------------------------------------------------------------------------------------*/

  // Reads a list of s3:// Avro paths from `sourceAvroFile`, applies the duraj UDF to
  // each file's "published_at" column, and appends selected fields to a flat file.
  def index_transformation(sourceAvroFile: String, start: String): Unit = {

    val sc             = new SparkContext(new SparkConf().setAppName("ExportData"))
    val sqlContext     = new SQLContext(sc)
    val bufferedWriter = new BufferedWriter(new FileWriter(new File("/data/raw/data-index.dat")))

    for (line <- Source.fromFile(sourceAvroFile).getLines) {  

      // Rewrite "s3://bucket/key" as "@bucket/key", then prepend the s3n scheme and the
      // AWS credentials (the `aws` object holding the keys is defined elsewhere).
      val line2    = line.replaceAll("s3://", "@")
      val avroFile = "s3n://" + aws.access_key_id + ":" + aws.secret_access_key + line2
      println(avroFile)
     
      // Requires the com.databricks:spark-avro package on the classpath.
      val video   = sqlContext.read.format("com.databricks.spark.avro").load(avroFile)
      val sqlfunc = udf(duraj)                                       // wrap duraj as a Spark UDF
      val newDF   = video.withColumn("period", sqlfunc(col("published_at")))


      newDF.map(t =>
        "duration="             + t.getAs[String]("duration")                               + "\n" +
        "record_creation_time=" + t.getAs[String]("record_creation_time").toString.take(10) + "\n" +
        "relevancy_score="      + t.getAs[String]("relevancy_score")                        + "\n" +
        "period="               + t.getAs[String]("period")                                 + "\n"
      ).collect().map(_.trim).foreach(row => bufferedWriter.write(row + "\n\n"))
    
    }

    scala.util.Try(bufferedWriter.close())

  }
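
A driver entry point is not shown on this page. A minimal sketch of how the transformation might be launched, assuming duraj, aws, and index_transformation live in one object submitted with spark-submit (the argument positions are an assumption):

  object ExportData {
    def main(args: Array[String]): Unit = {
      // args(0): local file listing s3:// Avro paths, one per line
      // args(1): "start" marker passed through to the (currently unused) second parameter
      index_transformation(args(0), args(1))
    }
  }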