Spark Java Basic Functions
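The example below loads a comma-separated file into an RDD of string arrays and computes three things: the total number of records, the number of distinct users (column 0), and the most frequent value in column 1. The input file is assumed to hold one purchase per line, e.g. a user id followed by an item (these sample rows are purely illustrative):

user1,apples
user2,oranges
user1,oranges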

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.*;
import java.util.stream.Collectors;

public class SparkRun {

    public void execute() {

        SparkConf conf = new SparkConf().setAppName("SQL Component");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load the CSV file and split each line into its fields.
        JavaRDD<String[]> data = sc.textFile("/tmp/datafile.csv")
                .map(s -> s.split(","));

        // Total number of records, and the number of distinct users (column 0).
        long lines = data.count();
        long unique = data.map(strings -> strings[0]).distinct().count();

        // Classic word count over column 1: map every record to (value, 1)
        // and sum the counts per key.
        List<Tuple2<String, Integer>> pairs = data.mapToPair(strings -> new Tuple2<>(strings[1], 1))
                .reduceByKey((i1, i2) -> i1 + i2)
                .collect();

        // Copy the collected pairs into a map keyed by the column value.
        // The HashMap itself is unordered; the sorting happens below.
        Map<String, Integer> counts = new HashMap<>();
        for (Tuple2<String, Integer> pair : pairs) {
            counts.put(pair._1, pair._2);
        }

        // Order the keys by their counts, highest first.
        List<String> sorted = counts.entrySet().stream()
                .sorted(Comparator.comparing((Map.Entry<String, Integer> entry) -> entry.getValue()).reversed())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());

        String mostPopular = sorted.get(0);
        int purchases = counts.get(mostPopular);

        System.out.println("+----------------------------------------------------------");
        System.out.println("| Total purchases : " + lines);
        System.out.println("| Unique users    : " + unique);
        System.out.println(String.format(
                           "| Frequent word   : %s = %d ",
                mostPopular, purchases));
        System.out.println("+----------------------------------------------------------");

    }


}
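
SparkRun has no entry point of its own. A minimal launcher sketch (the launcher class name and jar name below are illustrative, not part of the original example):

public class SparkRunApp {

    public static void main(String[] args) {
        // Run the example; the master URL and other settings are supplied
        // by the spark-submit command line rather than hard-coded here.
        new SparkRun().execute();
    }
}

Packaged into a jar (here called spark-run.jar), it can then be run locally with: spark-submit --class SparkRunApp --master local[*] spark-run.jar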