In this post, I am sharing a code snippet that builds a multiplication table on Spark.
In addition, it demonstrates the following:
- How to create a custom Spark RDD
- How to create an Iterator/Generator
- How to work with matrices in Spark
Assumptions:
- You are using JDK 8.
- You have added the spark-core and spark-mllib libraries to the classpath. I assume you use a build tool such as Maven to pull in these dependencies; a sample is shown below.
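Here is a minimal sketch of the Maven dependencies, assuming a Spark release built against Scala 2.11 (the version number 1.6.2 below is an assumption; match it to whatever your cluster runs):

<!-- Spark core and MLlib; version 1.6.2 is assumed, adjust as needed -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>1.6.2</version>
</dependency>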
import org.apache.spark.Partition;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
import org.apache.spark.rdd.RDD;
import scala.collection.AbstractIterator;
import scala.collection.Iterator;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ClassManifestFactory$;

/**
 * This class illustrates a multiplication table on Spark!
 * @author Thamme Gowda N
 */
public class Main {

    /**
     * This RDD yields a sequence of numbers.
     * It can be used for illustrating math operations on Spark.
     */
    public static class SequenceRDD extends RDD<Long> {

        private long start;
        private long end;
        private Partition[] partitions;

        public SequenceRDD(SparkContext _sc, long start, long end) {
            super(_sc, new ArrayBuffer<>(), ClassManifestFactory$.MODULE$.fromClass(Long.class));
            this.start = start;
            this.end = end;
            this.partitions = new Partition[]{
                    () -> 0 // just one partition, at index 0
            };
        }

        @Override
        public Iterator<Long> compute(Partition split, TaskContext ctx) {
            // we have only one partition, so this is it
            return new SequenceIterator(start, end);
        }

        @Override
        public Partition[] getPartitions() {
            return partitions;
        }
    }

    /**
     * This iterator yields numbers so we can build an RDD on it.
     */
    private static class SequenceIterator extends AbstractIterator<Long>
            implements java.util.Iterator<Long> {

        private long nextStart;
        private long end;

        /**
         * Number generator for [start, end]
         * @param start the start
         * @param end the end, inclusive
         */
        public SequenceIterator(long start, long end) {
            this.nextStart = start;
            this.end = end;
            assert end >= start : "Invalid Args";
        }

        @Override
        public boolean hasNext() {
            return nextStart <= end;
        }

        @Override
        public Long next() {
            return nextStart++;
        }
    }

    public static void main(String[] args) {
        long n = 1_000;
        String outpath = "multiplication-matrix";
        long st = System.currentTimeMillis();

        SparkConf conf = new SparkConf()
                .setAppName("Large Matrix")
                .setMaster("local[2]");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        JavaRDD<Long> rdd = new SequenceRDD(ctx.sc(), 0, n)
                .toJavaRDD()
                .cache();

        // cartesian product gives all (i, j) pairs for 0 <= i, j <= n
        JavaPairRDD<Long, Long> pairs = rdd.cartesian(rdd);

        // each cell (i, j) of the matrix holds the product i * j
        JavaRDD<MatrixEntry> entries = pairs.map(tup ->
                new MatrixEntry(tup._1(), tup._2(), tup._1() * tup._2()));

        CoordinateMatrix matrix = new CoordinateMatrix(entries.rdd());
        matrix.toIndexedRowMatrix()
                .rows()
                .saveAsTextFile(outpath);

        System.out.printf("n=%d\t outpath=%s\nTime taken : %dms\n",
                n, outpath, System.currentTimeMillis() - st);
        ctx.stop();
    }
}
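As a point of comparison, the custom SequenceRDD is not strictly necessary for a range this small. Here is a minimal sketch (reusing the ctx and n from main() above) that builds the same sequence with JavaSparkContext.parallelize instead:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;

// Materialize [0, n] in the driver and let Spark distribute it
List<Long> numbers = new ArrayList<>();
for (long i = 0; i <= n; i++) {
    numbers.add(i);
}
JavaRDD<Long> rdd = ctx.parallelize(numbers).cache();

The difference is where the numbers live: parallelize holds all n+1 boxed values in driver memory before shipping them out, whereas SequenceRDD generates them lazily inside compute() on the executor. That laziness is the main reason to write an RDD/iterator pair like this, and it is what lets the approach scale to much longer sequences.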