samedi 25 avril 2015

Apache Spark TF-IDF Calculation


I am trying to do a TF-IDF calculation in Java using Spark, without the MLlib library.

I calculate the TF for each (word, document) pair and the IDF separately, and then try to join these two RDDs. But I am getting "GC overhead limit exceeded" errors. Can someone suggest what I can do to avoid that?

/* links has (document,list(text))  */

    /* Flatten each (document, lines) pair into one (word, document) pair
     * per word occurrence. Whitespace is the only token delimiter. */
    JavaPairRDD<String,String> CommonKey = links
            .flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<String>>, String,String>() {
                @Override
                public Iterable<Tuple2<String,String>> call(Tuple2<String, Iterable<String>> doc) {
                    List<Tuple2<String, String>> pairs = new ArrayList<Tuple2<String, String>>();
                    for (String line : doc._2) {
                        for (String word : line.split("\\s+")) {
                            // split("\\s+") can emit an empty leading token; drop it.
                            if (word != null && !word.isEmpty()) {
                                pairs.add(new Tuple2<String, String>(word, doc._1));
                            }
                        }
                    }
                    return pairs;
                }
            });


    /*
     * Term frequency: mark every (word, document) occurrence with 1.0, then
     * sum the marks per key. The incoming pair is reused as the composite key
     * directly instead of allocating a duplicate Tuple2 for every record —
     * Tuple2 compares by value, so reduceByKey groups identically.
     */
    JavaPairRDD<Tuple2<String,String>, Double> freq = CommonKey
            .mapToPair(new PairFunction<Tuple2<String, String>, Tuple2<String,String>, Double>() {
                @Override
                public Tuple2<Tuple2<String,String>, Double> call(Tuple2<String, String> s) {
                    // (word, document) -> ((word, document), 1.0)
                    return new Tuple2<Tuple2<String,String>, Double>(s, 1.0);
                }
            });
    // Sum the 1.0 markers per (word, document) key to get the raw term count.
    JavaPairRDD<Tuple2<String,String>, Double> wordCount = freq.reduceByKey(new Function2<Double, Double, Double>() {
        @Override
        public Double call(Double a, Double b) { return a + b; }
    });

    /* (word,title),idf */
    /*
     * IDF per (word, document) key. distinct() collapses repeated occurrences
     * of a word within a document, so after groupByKey() the Iterable per
     * word is the set of documents containing it; its size is the document
     * frequency dWord, and IDF = log10(N / dWord).
     *
     * Bug fix: N/dWord was *integer* division, which truncates the ratio
     * before Math.log10 (e.g. N=10, dWord=3 gave log10(3) instead of
     * log10(3.33...)) and produces 0.0 whenever dWord > N/2. Cast to double.
     *
     * NOTE(review): groupByKey() materializes the full document list of each
     * word in executor memory — for frequent words this is the likely source
     * of the GC overhead errors; counting document frequency with
     * reduceByKey and re-joining would avoid it.
     */
    JavaPairRDD<Tuple2<String,String>,Double> idffreqCount = CommonKey.distinct().groupByKey()
            .flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<String>>, Tuple2<String,String> ,Double>() {
                @Override
                public Iterable<Tuple2<Tuple2<String,String>,Double>> call(Tuple2<String, Iterable<String>> s) {
                    int dWord = Iterables.size(s._2);
                    // IDF is identical for every document of this word; compute it once
                    // instead of once per loop iteration.
                    double idf = Math.log10((double) N / dWord);

                    List<Tuple2<Tuple2<String, String>,Double>> results = new ArrayList<Tuple2<Tuple2<String, String>,Double>>();
                    for (String doc : s._2) {
                        results.add(new Tuple2<Tuple2<String, String>,Double>(new Tuple2<String,String>(s._1, doc), idf));
                    }
                    return results;
                }
            });



    /* TF-IDF: join the per-(word, document) term counts with the IDF values,
     * then multiply the two halves of each joined value. */
    JavaPairRDD<Tuple2<String,String>,Tuple2<Double,Double>> finalIDF = wordCount.join(idffreqCount);
    JavaPairRDD<Tuple2<String,String>,Double> finalIDFcount = finalIDF.mapValues(new Function<Tuple2<Double,Double>, Double>() {
        @Override
        public Double call(Tuple2<Double,Double> tfAndIdf) {
            // _1 = term count from wordCount, _2 = IDF from idffreqCount.
            return tfAndIdf._1 * tfAndIdf._2;
        }
    });

Thanks!!


Aucun commentaire:

Enregistrer un commentaire