The following example programs showcase different applications of Stratosphere from graph processing to data mining problems and relational queries.
Each example consists of a task description and an in-depth discussion of the program that solves it. Sample data for the jobs is provided either by data set generators or by links to external data sets or generators. The source code of all example jobs can be found in the pact-examples module (https://github.com/stratosphere/stratosphere/blob/master/pact/pact-examples/src/main/java/eu/stratosphere/pact/example).
Please note: The goal of these programs is to assist you in understanding the programming model. Hence, the code is often written in an understandable rather than the most efficient way. If you want to learn how to write efficient and fast PACT programs, have a look at advanced PACT programming.
Overview:
Word Count
Counting words is the classic example to introduce parallelized data processing with MapReduce. It demonstrates how the frequencies of words in a document collection are computed.
The source code of this example can be found here.
The WordCount algorithm works as follows:
A detailed description of the implementation of the WordCount algorithm for Hadoop MapReduce can be found at Hadoop's Map/Reduce Tutorial.
The example PACT program implements the above described algorithm.
The Map contract is called for each line of the input text. It splits the line at word-separating characters (e.g., ' ', '.', ',', '\t') into single words. Each word is emitted in an independent key-value pair, where the key is the word itself and the value is an Integer "1".
The Reduce contract is annotated with @Combinable to use the combine method. The combine method computes partial sums over the "1"-values emitted by the Map contract; the reduce method sums these partial sums and obtains the final count for each word.

Three arguments must be provided to the getPlan() method of the example job:

- int noSubTasks: Degree of parallelism of all tasks.
- String inputPath: Path to the input data.
- String outputPath: Destination path for the result.

Any plain text file can be used as input for the WordCount example. If you do not have a text file at hand, you can download some literature from the Gutenberg Project. For example
wget -O ~/hamlet.txt http://www.gutenberg.org/cache/epub/1787/pg1787.txt
will download Hamlet and put the text into a file called hamlet.txt
in your home directory.
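The map/combine/reduce logic described above can be sketched in plain Java, independent of the PACT API (class and method names here are ours, for illustration only):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the WordCount logic, not the actual PACT stubs.
public class WordCountSketch {

    // "map" + "combine": split a line at whitespace and build partial counts
    static Map<String, Integer> mapAndCombine(String line) {
        Map<String, Integer> partial = new HashMap<>();
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                partial.merge(word, 1, Integer::sum); // partial sum per word
            }
        }
        return partial;
    }

    // "reduce": sum the partial counts to obtain the final count per word
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> counts = new HashMap<>();
        for (Map<String, Integer> p : partials) {
            p.forEach((w, c) -> counts.merge(w, c, Integer::sum));
        }
        return counts;
    }
}
```

In the actual PACT program the same split happens per key-value pair in parallel instances of the Map contract, and the partial sums are what the combiner ships to the Reduce contract.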
TPCH - Query 3
The source code of this example can be found here.
The TPC-H benchmark is a decision support benchmark on relational data (see http://www.tpc.org/tpch/). The example below shows a PACT program realizing a modified version of Query 3 from TPC-H including one join, some filtering and an aggregation. The original query contains another join with the Customer relation, which we omitted to reduce the size of the example. The picture below shows the schema of the data to be analyzed and the corresponding SQL query.
The example PACT program implements the SQL Query given above in the Java class eu.stratosphere.pact.example.relational.TPCHQuery3 in the pact-examples module.
For this example job, four arguments must be provided to the getPlan() method:

- int noSubTasks: Degree of parallelism for all tasks.
- String orders: Path to the Orders relation.
- String lineitem: Path to the Lineitem relation.
- String outputPath: Destination path for the result output.

See !ExecutePactProgram for details on how to specify paths for Nephele.
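The dataflow of the modified query (filter Orders, join with Lineitem on the order key, aggregate revenue per order) can be sketched in plain Java. The field names and the filter predicate below are illustrative assumptions, not the exact TPC-H schema or predicates:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hedged sketch of the modified Q3 dataflow: a Map filters Orders, a join
// on the order key combines them with Lineitem, and a Reduce sums the
// revenue per order. Field names and the predicate are illustrative only.
public class TpchQ3Sketch {
    record Order(long orderKey, String orderStatus) {}
    record LineItem(long orderKey, double extendedPrice) {}

    static Map<Long, Double> revenuePerOrder(List<Order> orders, List<LineItem> lineitems) {
        // Map contract: filter the Orders relation
        Set<Long> selected = new HashSet<>();
        for (Order o : orders) {
            if ("F".equals(o.orderStatus())) {
                selected.add(o.orderKey());
            }
        }
        // Equi-join on orderKey, fused here with the SUM aggregation
        Map<Long, Double> revenue = new HashMap<>();
        for (LineItem li : lineitems) {
            if (selected.contains(li.orderKey())) {
                revenue.merge(li.orderKey(), li.extendedPrice(), Double::sum);
            }
        }
        return revenue;
    }
}
```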
The TPC-H benchmark suite provides a data generator tool (DBGEN) for generating sample input data (see http://www.tpc.org/tpch/). To use it together with PACT, take the following steps:

1. Download and unpack DBGEN.
2. Make a copy of makefile.suffix called Makefile and perform the following changes in it:

# PACT program was tested with the DB2 data format
DATABASE = DB2
MACHINE  = LINUX
WORKLOAD = TPCH
# set CC according to your compiler, mostly gcc
CC       = gcc

3. Build DBGEN by running make.
4. Generate the Orders and Lineitem relations, for example with scale factor 1:

./dbgen -T o -s 1
Weblog Analysis
The Weblog Analysis example shows how relational data can be analyzed with the help of the PACT Programming Model.
In this case, a scenario is considered where the log files of different webpages are analyzed. These log files contain information about the page ranks and the page visits of the different webpages. The picture below gives the schema of the data to be analyzed and the corresponding SQL query. The data is split into three relations:
The analysis task computes the ranking information of all documents that contain a certain set of keywords, have at least a certain rank, and have not been visited in a certain year.
The weblog analysis example PACT program is implemented in the following Java class: eu.stratosphere.pact.example.relational.WebLogAnalysis.java in the pact-examples module.
Five arguments must be provided to the getPlan() method of the example job:

- int noSubTasks: Degree of parallelism for all tasks.
- String docs: Path to the docs relation.
- String ranks: Path to the ranks relation.
- String visits: Path to the visits relation.
- String outputPath: Destination path for the result.

There are two generators which can provide data for the Weblog Analysis PACT example. Both generators produce identically structured test data.
We provide a data set generator to generate the docs, ranks, and visits relations. The generator is implemented as Java class eu.stratosphere.pact.example.relational.generator.WebLogGenerator.java in the pact-examples module.
The parameters of the main method of the generator are:
The data generated by our stand-alone generator only follows the schema of the distributed generator's data; attribute values, distributions, and correlations are not the same.
Please consult the in-line JavaDocs for further information on the generator.
For generating larger data sets in a distributed environment, you can use a generator provided by Brown University (see Section Analysis Benchmarks Data Sets).
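The analysis task stated above (keyword filter, rank filter, join, and anti-join against the visits of a given year) can be sketched in plain Java. The relation layouts and parameter names below are illustrative assumptions, not the exact schema of the generated data:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hedged sketch of the Weblog Analysis dataflow: two filters, an equi-join
// on the URL, and an anti-join against the visits of a given year.
public class WebLogSketch {
    record Rank(int rank, String url) {}
    record Visit(String url, int year) {}

    static List<Rank> analyze(Map<String, String> docs,   // url -> document text
                              List<Rank> ranks,
                              List<Visit> visits,
                              Set<String> keywords, int minRank, int year) {
        // Filter docs: keep URLs whose text contains all keywords
        Set<String> matchingUrls = new HashSet<>();
        for (Map.Entry<String, String> d : docs.entrySet()) {
            boolean all = true;
            for (String kw : keywords) {
                if (!d.getValue().contains(kw)) { all = false; break; }
            }
            if (all) matchingUrls.add(d.getKey());
        }
        // Anti-join input: URLs visited in the given year
        Set<String> visitedUrls = new HashSet<>();
        for (Visit v : visits) {
            if (v.year() == year) visitedUrls.add(v.url());
        }
        // Filter ranks, join with matching docs, drop visited URLs
        List<Rank> result = new ArrayList<>();
        for (Rank r : ranks) {
            if (r.rank() >= minRank && matchingUrls.contains(r.url())
                    && !visitedUrls.contains(r.url())) {
                result.add(r);
            }
        }
        return result;
    }
}
```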
K-Means Iteration
The source code of this example can be found here.
The k-means clustering algorithm is a well-known algorithm for grouping data points into clusters of similar points. In contrast to hierarchical clustering algorithms, the number of cluster centers must be provided before the algorithm is started. K-means is an iterative algorithm, which means that certain steps are repeatedly performed until a termination criterion is fulfilled.
The algorithm operates in five steps (see figure below):
A detailed discussion of the K-Means clustering algorithm can be found here: http://en.wikipedia.org/wiki/K-means_clustering
The example PACT program implements one iteration step (steps 2,3, and 4) of the k-means algorithm. The implementation resides in the following Java class: eu.stratosphere.pact.example.kmeans.KMeansSingleStep.java in the pact-examples module. All required classes (data types, data formats, PACT stubs, etc.) are contained in this class as static inline classes.
The PACT program that implements the k-means iteration is shown in the figure below. The implementation follows the same steps as presented above. We discuss the program top-down in “flow direction” of the data.
Since IDs are assumed to be unique in both files, both data
sources are annotated with UniqueKey
OutputContracts.
A Cross PACT is used to compute the distances of all data points to
all cluster centers. The user code (highlighted as leaf green box)
implements the distance computation of a single data point to a
single cluster center. The Cross PACT takes care of enumerating all
combinations of data points and cluster centers and calls the user
code with each combination. The user code emits key-value pairs
where the ID of the data point is the key (hence the SameKeyLeft
OutputContract is attached). The value consists of a record that
contains the coordinates of the data point, the ID of the cluster
center, and the computed distance.
We identify the cluster center that is closest to each data point with a Reduce PACT. The output key-value pairs of the Cross PACT have the ID of the data points as key. Hence, all records (with distance and cluster center ID) that belong to a data point are grouped together. These records are handed to the user code where the record with the minimal distance is identified. This is a minimum aggregation and offers the potential use of a Combiner which performs partial aggregations. The key-value pairs that are generated by the user code have the ID of the closest cluster center as key and the coordinates of the data point as value.
The computation of the new cluster center positions is done with another Reduce PACT. Since its input has cluster center IDs as keys, all pairs that belong to the same center are grouped and handed to the user code. Here the average of all data point coordinates is computed. Again this average computation can be improved by providing a partially aggregating Combiner that computes a count and a coordinate sum. The user code emits for each cluster center a single record that contains its ID and its new position.
Finally, the DataSink (new ClusterCenters) writes the results back into the HDFS.
As already mentioned, the PACT program only covers steps 2, 3, and 4. The evaluation of the termination criterion and the iterative call of the PACT program can be done by an external control program.
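The covered steps (distance computation, nearest-center selection, and center recomputation) can be sketched as one function in plain Java; the names below are ours, for illustration only:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of one k-means iteration (steps 2-4): the Cross computes
// all point/center distances, the first Reduce keeps the nearest center
// per point, and the second Reduce averages the points per center.
public class KMeansStepSketch {

    static double dist(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            s += d * d;
        }
        return Math.sqrt(s);
    }

    // One iteration: returns centerId -> new center position
    static Map<Integer, double[]> step(List<double[]> points, Map<Integer, double[]> centers) {
        Map<Integer, double[]> sums = new HashMap<>();
        Map<Integer, Integer> counts = new HashMap<>();
        for (double[] p : points) {
            // "Cross" + first "Reduce": find the nearest center for p
            int best = -1;
            double bestDist = Double.POSITIVE_INFINITY;
            for (Map.Entry<Integer, double[]> c : centers.entrySet()) {
                double d = dist(p, c.getValue());
                if (d < bestDist) { bestDist = d; best = c.getKey(); }
            }
            // second "Reduce" (combiner-style): accumulate sum and count
            double[] sum = sums.computeIfAbsent(best, k -> new double[p.length]);
            for (int i = 0; i < p.length; i++) sum[i] += p[i];
            counts.merge(best, 1, Integer::sum);
        }
        // Finish the average per cluster center
        Map<Integer, double[]> newCenters = new HashMap<>();
        for (Map.Entry<Integer, double[]> e : sums.entrySet()) {
            int n = counts.get(e.getKey());
            double[] avg = new double[e.getValue().length];
            for (int i = 0; i < avg.length; i++) avg[i] = e.getValue()[i] / n;
            newCenters.put(e.getKey(), avg);
        }
        return newCenters;
    }
}
```

An external control program would call this step repeatedly and check the termination criterion between calls.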
Four arguments must be provided to the getPlan() method of the example job:

- int noSubTasks: Degree of parallelism of all tasks.
- String dataPoints: Path to the input data points.
- String clusterCenters: Path to the input cluster centers.
- String outputPath: Destination path for the result.

We provide a data set generator to generate data points and cluster centers input files. The generator is implemented as Java class eu.stratosphere.pact.example.kmeans.KMeansSampleDataGenerator.java in the pact-examples module.
The parameters of the main method of the generator are:
Please consult the in-line JavaDocs for further information on the generator.
Pairwise Shortest Paths
The following graph analysis algorithm, known as the Floyd-Warshall algorithm, finds the shortest paths between all pairs of vertices in a weighted graph.
A detailed description of the Floyd-Warshall algorithm can be found on Wikipedia.
Consider a directed graph G(V,E), where V is a set of vertices and E is a set of edges. The algorithm takes the adjacency matrix D of G and compares all possible path combinations between every two vertices in the graph G.
(Pseudocode)
for m := 1 to n
  for i := 1 to n
    for j := 1 to n
      D[i][j] = min( D[i][j], D[i][m] + D[m][j] );
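The pseudocode above translates directly into runnable Java, using infinity to mark missing edges:

```java
// Sequential Floyd-Warshall over an adjacency matrix: d[i][j] holds the
// edge weight from i to j, or INF if there is no edge (d[i][i] == 0).
public class FloydWarshall {
    static final double INF = Double.POSITIVE_INFINITY;

    static double[][] shortestPaths(double[][] d) {
        int n = d.length;
        double[][] dist = new double[n][n];
        for (int i = 0; i < n; i++) dist[i] = d[i].clone();
        // Try every vertex m as an intermediate hop
        for (int m = 0; m < n; m++)
            for (int i = 0; i < n; i++)
                for (int j = 0; j < n; j++)
                    dist[i][j] = Math.min(dist[i][j], dist[i][m] + dist[m][j]);
        return dist;
    }
}
```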
To parallelize the shortest path algorithm, the following steps are performed in each iteration. Assume that I(k) is the set of the shortest paths known after the k-th iteration.
Generate two key/value sets from the I(k) set:
Perform an equi-join on the two sets of pairs:
Union this J joined set with the intermediate result of the previous iteration I(k):
For all pairwise distances from U set, only the shortest will remain in the I(k+1) set:
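One such iteration can be sketched in plain Java; the join on the intermediate node, the union with I(k), and the minimum aggregation below mirror the steps listed above (the Pair record and method names are ours):

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of one parallel iteration: join paths (i,m) and (m,j) on
// the intermediate node m, union the new candidates with I(k), and keep
// the minimum distance per (i,j) pair.
public class PathIterationSketch {
    record Pair(String from, String to) {}

    static Map<Pair, Integer> iterate(Map<Pair, Integer> paths) {
        // Start the union U with the previous result I(k)
        Map<Pair, Integer> next = new HashMap<>(paths);
        // Equi-join: a path keyed by its end node matches a path keyed by
        // its start node
        for (Map.Entry<Pair, Integer> left : paths.entrySet()) {
            for (Map.Entry<Pair, Integer> right : paths.entrySet()) {
                if (left.getKey().to().equals(right.getKey().from())) {
                    Pair joined = new Pair(left.getKey().from(), right.getKey().to());
                    int d = left.getValue() + right.getValue();
                    // Reduce: keep only the shortest distance per pair
                    next.merge(joined, d, Math::min);
                }
            }
        }
        return next; // I(k+1)
    }
}
```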
The example PACT program implements one iteration step of the all pairs
shortest path algorithm.
The implementation resides in the following Java class:
eu.stratosphere.pact.example.graph.PairwiseSP.java
in the pact-examples module.
Four arguments must be provided to the getPlan()
method of the example
job:
int noSubStasks
: Degree of parallelism of all tasks.String inputPaths
: Path to the input paths.String outputPaths
: Destination path for the result paths.boolean RDFInputFlag
: Input format flag. If set to true, RDF input
must be provided. Otherwise, the custom path format is required.We provide a small RDF test data set which is a subset of a Billion-Triple-Challenge data set RDFDataSet.tar.gz (77KB, 1.8MB uncompressed). If you want to run the example with (a lot) more test data, you can download a larger subset (or the whole) Billion-Triple-Challenge data set from http://km.aifb.kit.edu/projects/btc-2009/.
Triangle Enumeration
The triangle enumeration PACT example program works on undirected graphs. It identifies all triples of nodes that are pairwise connected with each other, i.e., whose edges form a triangle. This is a common preprocessing step for methods that identify highly connected subgraphs or cliques within larger graphs. Such methods are often used for the analysis of social networks.
A MapReduce variant of this task was published by J. Cohen in “Graph Twiddling in a MapReduce World”, Computing in Science and Engineering, 2009.
The goal of the triangle enumeration algorithm is to identify all triples of nodes that are pairwise connected with each other.
The figure above shows how the algorithm works to achieve that:
The triangle enumeration example PACT program is implemented in the following Java class: eu.stratosphere.pact.example.triangles.EnumTrianglesRdfFoaf.java in the pact-examples module.
The program expects RDF triples with <http://xmlns.com/foaf/0.1/knows> predicates as data input format. RDF triples with other predicates are simply filtered out. The triples must be separated by the line-break character ('\n').
Each RDF triple is interpreted as an edge between its subject and object. The edge is forwarded as key, with the lexicographically smaller node being the first part of the edge and the greater node the second. The value is set to NULL because all relevant information is contained in the key.

Three arguments must be provided to the getPlan() method of the example job:

- int noSubTasks: Degree of parallelism of all tasks.
- String inputRDFTriples: Path to the input RDF triples.
- String outputTriangles: Destination path for the output of result triangles.

We provide a small RDF test data set, which is a subset of a Billion-Triple-Challenge data set: RDFDataSet.tar.gz (77 KB, 1.8 MB uncompressed). If you want to run the example with (a lot) more test data, you can download a larger subset (or the whole) Billion-Triple-Challenge data set from http://km.aifb.kit.edu/projects/btc-2009/.
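The enumeration strategy, grouping the normalized edges by their smaller node, building open triads from each group, and closing a triad into a triangle when the missing edge exists, can be sketched in plain Java (the edge representation and names below are ours, not the PACT program's):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hedged sketch of triangle enumeration on an undirected graph.
public class TriangleSketch {

    // Edges are given as string pairs; each is normalized so that the
    // lexicographically smaller node comes first (as in the PACT program).
    static List<String[]> triangles(List<String[]> rawEdges) {
        Set<String> edgeSet = new HashSet<>();
        Map<String, List<String>> bySmaller = new HashMap<>();
        for (String[] e : rawEdges) {
            String a = e[0].compareTo(e[1]) < 0 ? e[0] : e[1];
            String b = e[0].compareTo(e[1]) < 0 ? e[1] : e[0];
            if (edgeSet.add(a + "|" + b)) {
                // Reduce-style grouping on the smaller node
                bySmaller.computeIfAbsent(a, k -> new ArrayList<>()).add(b);
            }
        }
        List<String[]> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> g : bySmaller.entrySet()) {
            List<String> nbrs = g.getValue();
            // Build open triads from every neighbor pair of the group
            for (int i = 0; i < nbrs.size(); i++) {
                for (int j = i + 1; j < nbrs.size(); j++) {
                    String v = nbrs.get(i), w = nbrs.get(j);
                    String x = v.compareTo(w) < 0 ? v : w;
                    String y = v.compareTo(w) < 0 ? w : v;
                    // Close the triad if the missing edge exists
                    if (edgeSet.contains(x + "|" + y)) {
                        result.add(new String[]{g.getKey(), x, y});
                    }
                }
            }
        }
        return result;
    }
}
```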