I am trying to compute TF-IDF with Spark in Java, without using the MLlib library.
I calculate the TF for each (word, document) pair and the IDF separately, and then try to join the two RDDs, but I am getting "GC overhead limit exceeded" errors. Can someone suggest what I can do to avoid that?
/* links has (document,list(text)) */
// Tokenize each document's text on whitespace and emit one (word, document)
// pair per non-empty token occurrence.
JavaPairRDD<String,String> CommonKey = links
    .flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<String>>, String, String>() {
        @Override
        public Iterable<Tuple2<String, String>> call(Tuple2<String, Iterable<String>> docEntry) {
            final String document = docEntry._1;
            List<Tuple2<String, String>> pairs = new ArrayList<Tuple2<String, String>>();
            for (String text : docEntry._2) {
                for (String token : text.split("\\s+")) {
                    // Skip empty tokens, e.g. the leading one produced when
                    // the text starts with whitespace.
                    if (token == null || token.isEmpty()) {
                        continue;
                    }
                    pairs.add(new Tuple2<String, String>(token, document));
                }
            }
            return pairs;
        }
    });
// Seed the term-frequency count: one ((word, document), 1.0) record per
// token occurrence in CommonKey.
JavaPairRDD<Tuple2<String,String>, Double> freq = CommonKey
    .mapToPair(new PairFunction<Tuple2<String, String>, Tuple2<String, String>, Double>() {
        @Override
        public Tuple2<Tuple2<String, String>, Double> call(Tuple2<String, String> wordAndDoc) {
            // Tuple2 is immutable, so the incoming (word, document) tuple is
            // reused as the key instead of being copied into a fresh tuple;
            // this avoids one extra allocation per token.
            return new Tuple2<Tuple2<String, String>, Double>(wordAndDoc, 1.0);
        }
    });
// Sum the per-occurrence values so each (word, document) key carries its total count.
JavaPairRDD<Tuple2<String,String>, Double> wordCount = freq.reduceByKey(
    new Function2<Double, Double, Double>() {
        @Override
        public Double call(Double left, Double right) {
            return left + right;
        }
    });
/* (word,title),idf */
// For every word, count the distinct documents that contain it and emit
// ((word, document), idf) for each of those documents, where
// idf = log10(N / documentCount).
// NOTE(review): N is defined outside this snippet; presumably the total
// number of documents. Confirm its type and value at the declaration site.
// NOTE(review): groupByKey gathers every document of a word into a single
// Iterable, and the result list below holds them a second time; very common
// words may therefore be memory-heavy.
JavaPairRDD<Tuple2<String,String>,Double> idffreqCount = CommonKey.distinct().groupByKey()
    .flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<String>>, Tuple2<String, String>, Double>() {
        @Override
        public Iterable<Tuple2<Tuple2<String, String>, Double>> call(Tuple2<String, Iterable<String>> s) {
            final String word = s._1;
            final int dWord = Iterables.size(s._2);
            // Divide in floating point: with an integral N, N / dWord would
            // truncate before the logarithm is taken. The value is the same
            // for every document of this word, so compute and box it once.
            final Double idf = Math.log10(N / (double) dWord);
            List<Tuple2<Tuple2<String, String>, Double>> results =
                new ArrayList<Tuple2<Tuple2<String, String>, Double>>(dWord);
            for (String doc : s._2) {
                Tuple2<String, String> wordAndDoc = new Tuple2<String, String>(word, doc);
                results.add(new Tuple2<Tuple2<String, String>, Double>(wordAndDoc, idf));
            }
            return results;
        }
    });
// Join the per-(word, document) counts with the IDF values on the shared
// (word, document) key; each resulting value is a (count, idf) pair.
JavaPairRDD<Tuple2<String,String>,Tuple2<Double,Double>> finalIDF = wordCount.join(idffreqCount);
// Collapse each joined (count, idf) pair into its product, keeping the key unchanged.
JavaPairRDD<Tuple2<String,String>,Double> finalIDFcount = finalIDF.mapValues(
    new Function<Tuple2<Double, Double>, Double>() {
        @Override
        public Double call(Tuple2<Double, Double> countAndIdf) {
            final double count = countAndIdf._1;
            final double idf = countAndIdf._2;
            return count * idf;
        }
    });
Thanks!!
Aucun commentaire:
Enregistrer un commentaire