The latest source code is available in our Git repository. Unfortunately, our SSL certificate does not have a valid signature chain at the moment, so you have to disable Git's certificate check. To check out the code, execute:
git clone git://github.com/stratosphere/stratosphere.git
cd stratosphere
-DskipTests
due to a dependency
to test code.
mvn clean package
The build resides in
./stratosphere-dist/target/stratosphere-dist-0.2-stratosphere-bin/stratosphere-0.2
./stratosphere-dist/target/stratosphere-dist-0.2-stratosphere-bin/stratosphere-0.2.zip
./stratosphere-dist/target/stratosphere-dist-0.2-stratosphere-bin/stratosphere-0.2.tar.gz
The source code of the Stratosphere project is structured in Maven modules. The project consists of four main modules:
In the following we describe the Maven modules of Nephele and PACT in detail. Both consist again of multiple Maven sub-modules.
Nephele is divided into several Maven modules:
nephele-common contains all classes to develop and submit a Nephele program. That includes
nephele-server contains basic components of the execution engine. This includes
nephele-hdfs is the Nephele binding to the HDFS file system.
nephele-clustermanager is the default instance manager Nephele uses in cluster mode. It is responsible for
nephele-ec2cloudmanager is the default instance manager Nephele uses in cloud mode. It is responsible for
nephele-queuescheduler contains a simple FIFO job scheduler. It's the default job scheduler for Nephele in cluster and cloud mode.
nephele-compression-* provides compression capabilities for Nephele's network and file channels. This includes
nephele-examples contains example Nephele programs. These examples illustrate
nephele-management contains a management API that goes beyond the capabilities of nephele-common. This includes an
nephele-profiling is an optional extension for profiling the execution of a Nephele job. This includes
Conversion of the individual profiling data to summarized profiling data
nephele-visualization is a graphical user interface. It allows you to
Similar to Nephele, the PACT project also consists of multiple Maven modules:
pact-common contains all classes required to write a PACT program. This includes:
pact-runtime contains the source code that is executed by the Nephele engine. This includes:
pact-compiler contains the sources of the PACT optimizer, Nephele job graph generator, and frontend. This includes:
pact-clients contains clients and interfaces to execute PACT jobs, including:
pact-examples contains example PACT programs.
pact-tests contains end-to-end tests for the Nephele/PACT system. This includes:
eu.stratosphere.pact.test.contracts
eu.stratosphere.pact.test.pactPrograms
Developing Stratosphere with the Eclipse IDE becomes more comfortable if a set of plugins is installed. The following plugins are helpful:
Please configure Eclipse so that your code complies with the project's code style.
The Eclipse code style settings of the Stratosphere project are collected in the file stratosphere_codestyle.pref, which can be imported with File→Import→General/Preferences.
The Eclipse source code import is done in two stages:
Clone the latest revision from our public GitHub repository:
git clone https://github.com/stratosphere/stratosphere.git
The source code will be placed in a directory called stratosphere. See the description of the Maven modules for information on the structure of the source code.
Import the Stratosphere source code using Maven's Import tool:
Next, you need to connect imported sources with the EGit plugin:
Now you are finished with the source code import.
You can use JGit to pull from and push to your repository.
Alternatively, you can issue pull and push commands using Git's default command line client. Sometimes this confuses Eclipse, so that you must refresh and manually clean your Eclipse projects. This is done as follows:
Running and debugging the Stratosphere system in local mode is quite easy. Follow these steps to run / debug Stratosphere directly from Eclipse:
nephele-server as the project.
eu.stratosphere.nephele.jobmanager.
pact-tests. That step is important, because the selected project determines how the class path will be constructed for the launch configuration, and we need to make sure that the PACT classes are in the class path as well. Directly choosing pact-tests as the project in the earlier step prevents the main class search from finding the JobManager class (this seems to be an Eclipse bug and may be fixed in later Eclipse versions).
-configDir "<path/to/config/dir/>" -executionMode "local". The configuration directory must include a valid configuration. The default configuration is good for local mode operation, so you can use the configuration in 'stratosphere-dist/src/main/stratosphere-bin/conf', which resides in the stratosphere-dist project.
-Djava.net.preferIPv4Stack=true. You may also configure your heap size here, for example by adding -Xmx768m -Xms768m to dedicate 768 megabytes of memory to the system.
When the JobManager is up and running, you can issue any Nephele job against the system, either as an original Nephele job or as the result of compiling a PACT program. The run / debug configuration needs to be set up only once.
Modern software engineering methodologies are based on iterative and incremental development. In test-driven development (TDD) the developer first writes an automated test case (unit test) that defines the desired contract or specification of a function, then writes the actual function code to pass that test. Inspired by TDD, we designed and implemented the means for effective unit testing with respect to:
A user-defined test case extends the provided TestBase class, which in turn is based on JUnit. The TestBase class acts as a “facade”: it reads the configuration, initializes a ClusterProvider instance, and runs the test case.
The central component ClusterProvider provides encapsulated access to two crucial components of an execution environment: the filesystem (local or HDFS) and the job execution engine (Nephele). Each component can be configured to run in different execution modes. The following execution modes are supported:
Filesystem
Nephele
To ease the configuration, we separated the settings into
Configurations are stored as *.properties files and thus can be easily overwritten (e.g. by Maven, depending on the current build profile) without modifying the underlying source code. Note that execution modes cannot be mixed arbitrarily, e.g., running the filesystem in local mode and Nephele in a distributed setup will fail.
The property files of the test configuration are located at:
As an example, the following configuration specifies the settings for the set of unit tests defined in the package 'eu.stratosphere.pact.test.contracts'.
cluster config - “ClusterConfigs/localConfig1TM.prop”
ClusterProvider#clusterId=local1TM
LocalClusterProvider#numTaskTrackers=1
ClusterProvider#clusterProviderType=LocalClusterProvider
FilesystemProvider#filesystemType=local_fs
test config - “TestConfigs/eu.stratosphere.pact.test.contracts.prop”
CoGroupTest=local1TM,local4TM
CrossTest=local1TM
MapTest=local1TM,local4TM
MatchTest=local1TM
ReduceTest=local1TM
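Because both files use the plain *.properties format, they can be read with the standard java.util.Properties class. The following is a minimal sketch of that idea, not the framework's actual loading code: the class TestConfigSketch and its methods are hypothetical names introduced for illustration, and the file contents are passed as strings so that the example is self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Stratosphere test framework.
final class TestConfigSketch {

    // Parses the content of a *.properties file given as a string.
    static Properties parse(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the cluster profile ids assigned to a test class,
    // e.g. "MapTest" -> [local1TM, local4TM]. Unknown tests yield an empty list.
    static List<String> clusterIdsFor(Properties testConfig, String testName) {
        String value = testConfig.getProperty(testName, "").trim();
        if (value.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(value.split("\\s*,\\s*"));
    }

    public static void main(String[] args) {
        Properties cluster = parse(
            "ClusterProvider#clusterId=local1TM\n"
            + "LocalClusterProvider#numTaskTrackers=1\n");
        Properties tests = parse("MapTest=local1TM,local4TM\nCrossTest=local1TM\n");

        System.out.println(cluster.getProperty("ClusterProvider#clusterId"));
        System.out.println(clusterIdsFor(tests, "MapTest"));
    }
}
```

Note that the '#' inside a key such as ClusterProvider#clusterId is part of the key; the properties format treats '#' as a comment marker only at the start of a line.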
The abstract base class “TestBase” consolidates the previously described functionality and provides means to externalize test-case specific configurations.
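public abstract class TestBase extends TestCase
{
    // Prepares the test, e.g. writes the input data.
    protected abstract void preSubmit() throws Exception;

    // Builds the job graph that is submitted to Nephele.
    protected abstract JobGraph getJobGraph() throws Exception;

    // Verifies the results after the job has finished.
    protected abstract void postSubmit() throws Exception;
}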
Essentially, the “TestBase” class performs the following four steps:
Hence, the user-defined test case has to override the stub methods “preSubmit”, “getJobGraph”, and “postSubmit”.
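The interplay between the base class and the three stub methods follows the template method pattern. The sketch below illustrates that pattern under stated assumptions: MiniTestBase, EchoTest, runTest, and submit are hypothetical stand-ins, a String takes the place of a JobGraph, and the call order (prepare, build the job, submit, verify) is inferred from the method names rather than taken from the TestBase source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TestBase. The real class additionally reads the
// configuration and initializes a ClusterProvider.
abstract class MiniTestBase {
    private final List<String> calls = new ArrayList<>();

    protected abstract void preSubmit() throws Exception;
    protected abstract String getJobGraph() throws Exception; // String stands in for JobGraph
    protected abstract void postSubmit() throws Exception;

    // Template method: fixes the order in which the user-defined hooks run.
    public final void runTest() {
        try {
            preSubmit();
            String job = getJobGraph();
            submit(job);
            postSubmit();
        } catch (Exception e) {
            throw new IllegalStateException("test run failed", e);
        }
    }

    // Placeholder for handing the job to the execution engine.
    private void submit(String job) {
        record("submit:" + job);
    }

    protected void record(String step) {
        calls.add(step);
    }

    public List<String> calls() {
        return calls;
    }
}

// A user-defined test case only fills in the three hooks.
final class EchoTest extends MiniTestBase {
    @Override protected void preSubmit() { record("preSubmit"); }
    @Override protected String getJobGraph() { record("getJobGraph"); return "echo-job"; }
    @Override protected void postSubmit() { record("postSubmit"); }

    public static void main(String[] args) {
        EchoTest test = new EchoTest();
        test.runTest();
        System.out.println(test.calls());
    }
}
```

Keeping runTest final means a test case cannot change the sequence of steps; it can only supply the test-specific parts.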
If the same test should be executed with different parameters, additional steps are required:
In detail, the test mechanism performs the following steps:
The function 'TestBase.toParameterList(…)' performs:
JUnit instantiates the test-class for each config combination
The constructor of the test-class passes the current configuration to TestBase
The TestBase class initializes the configured ClusterProvider (FileSystem / Nephele)
The TestBase performs the following user functions defined in the test-class
When writing your first test case you will follow one of two paths, depending on your requirements. If it is sufficient to run the test case once, follow the steps described in “single configuration”. If you need to run the test case multiple times with varying parameters, follow the steps described in “multiple configurations”.
Single configuration:
Multiple configurations:
The following WordCount example reads a string and counts how often words occur. The map task takes a line as input, breaks it into words, and emits each word as a key-value pair (word, 1). Subsequently, a reduce task aggregates all pairs with the same key (word), sums up the values for each word, and finally yields the “word counts”.
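To make the map and reduce steps concrete, here is a minimal sketch in plain Java that does not use the PACT API. The class WordCountSketch and its methods are hypothetical names chosen for illustration; the input is the same text that the test case uses.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the word count logic; not a PACT program.
final class WordCountSketch {

    // Map step: break a line into words and emit one (word, 1) pair per word.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new SimpleEntry<String, Integer>(word, 1));
            }
        }
        return pairs;
    }

    // Reduce step: sum up the values of all pairs that share the same key.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return counts;
    }

    // Runs the map step on every line and the reduce step on all emitted pairs.
    static Map<String, Integer> count(String text) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : text.split("\n")) {
            pairs.addAll(map(line));
        }
        return reduce(pairs);
    }

    public static void main(String[] args) {
        System.out.println(count("one\ntwo two\nthree three three\n"));
    }
}
```

In the real system the map and reduce steps run as parallel tasks and the grouping by key is done by the runtime; the sketch performs both sequentially in memory.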
Note that the WordCountTest class is annotated with @RunWith(Parameterized.class), which tells JUnit to instantiate the test case for each user-provided configuration. Therefore, the user has to define a method which returns a set of configurations. In our case this method is called “getConfigurations()” and is identified by the @Parameters annotation. The method returns 5 configurations, each of which overrides the “WordCountTest#NoSubtasks” parameter with a specific value.
Further, the test configuration “WordCountTest=local1TM” assigns the “local1TM” cluster profile to our WordCountTest class. Thus, JUnit will instantiate the WordCountTest five times. Remember that the Cartesian product of associated cluster profiles and the configurations returned by “getConfigurations()” is formed; the test configuration “WordCountTest=local1TM,local2TM,local3TM” would result in 15 WordCountTest instances.
Java source code:
package eu.stratosphere.pact.test;
...
@RunWith(Parameterized.class)
public class WordCountTest extends TestBase
{
    // Input text and the word counts expected for it.
    String TEXT = "one\n" + "two two\n" + "three three three\n";
    String COUNTS = "one 1\n" + "two 2\n" + "three 3\n";
    String textPath, resultPath;

    public WordCountTest(Configuration config)
    {
        super(config);
    }

    // Writes the input text, split into several parts, to the temp directory.
    @Override
    protected void preSubmit() throws Exception
    {
        textPath = getFilesystemProvider().getTempDirPath() + "/text";
        resultPath = getFilesystemProvider().getTempDirPath() + "/result";
        getFilesystemProvider().createDir(textPath);
        String[] splits = splitInputString(TEXT, '\n', 4);
        for (int i = 0; i < splits.length; i++)
        {
            getFilesystemProvider().createFile(textPath + "/part_" + i + ".txt", splits[i]);
            LOG.debug("Text Part " + (i+1) + ":\n>" + splits[i] + "<");
        }
    }

    // Compiles the WordCount PACT program into a Nephele job graph.
    @Override
    protected JobGraph getJobGraph() throws Exception
    {
        Plan plan = new WordCount().getPlan(
            config.getString("WordCountTest#NoSubtasks","1"),
            getFilesystemProvider().getURIPrefix() + textPath,
            getFilesystemProvider().getURIPrefix() + resultPath);
        OptimizedPlan op = new PactCompiler().compile(plan);
        return new JobGraphGenerator().compileJobGraph(op);
    }

    // Compares the job output with the expected word counts.
    @Override
    protected void postSubmit() throws Exception
    {
        compareResultsByLinesInMemory(COUNTS, resultPath);
    }

    // Returns five configurations, one for each number of subtasks from 1 to 5.
    @Parameters
    public static Collection<Object[]> getConfigurations()
    {
        LinkedList<Configuration> configs = new LinkedList<Configuration>();
        for (int subtasks = 1; subtasks <= 5; subtasks++)
        {
            Configuration config = new Configuration();
            config.setInteger("WordCountTest#NoSubtasks", subtasks);
            configs.add(config);
        }
        return toParameterList(configs);
    }
}
Cluster configuration:
“ClusterConfigs/localConfig1TM.prop”
ClusterProvider#clusterId=local1TM
LocalClusterProvider#numTaskTrackers=1
ClusterProvider#clusterProviderType=LocalClusterProvider
FilesystemProvider#filesystemType=local_fs
Test configuration:
“TestConfigs/eu.stratosphere.pact.test.prop”
WordCountTest=local1TM
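The number of WordCountTest instances follows from the Cartesian product of the assigned cluster profiles and the configurations returned by “getConfigurations()”. The sketch below only illustrates this counting; ConfigProductSketch and combine are hypothetical names, and the code is not the implementation of 'TestBase.toParameterList(…)'.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of how cluster profiles and test parameters combine.
final class ConfigProductSketch {

    // Pairs every cluster profile with every subtask setting.
    static List<String> combine(List<String> clusterIds, List<Integer> subtaskSettings) {
        List<String> combinations = new ArrayList<>();
        for (String clusterId : clusterIds) {
            for (Integer subtasks : subtaskSettings) {
                combinations.add(clusterId + "/NoSubtasks=" + subtasks);
            }
        }
        return combinations;
    }

    public static void main(String[] args) {
        List<Integer> subtasks = Arrays.asList(1, 2, 3, 4, 5);

        // One cluster profile and five configurations: five test instances.
        System.out.println(combine(Arrays.asList("local1TM"), subtasks).size());

        // Three cluster profiles and five configurations: fifteen test instances.
        System.out.println(
            combine(Arrays.asList("local1TM", "local2TM", "local3TM"), subtasks).size());
    }
}
```

Because the instance count grows multiplicatively, it is worth keeping the list of cluster profiles per test short when many parameter values are tested.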
Why do I get an “Insufficient java heap space …” error?
Increase the JVM heap size, e.g. set the option “-Xmx512m”.
Why do I get an “IPv4 stack required …” error?
Disable the IPv6 stack, e.g. set the option “-Djava.net.preferIPv4Stack=true”.
Unit tests have to meet the following criteria:
Unit tests should be executed continuously during coding, e.g. before each commit, and all unit tests should be run every time. It is therefore necessary to keep each unit test as narrow as possible (with respect to the code under test), so that the whole unit test phase executes fast (milliseconds!).
Each reported (and accepted) bug has to be transformed into a unit test that tests the faulty behaviour. The comment of the test class should state which bug the test relates to.
The guidelines cover code conventions, documentation requirements, and utilities for easing the development of Stratosphere in Java.
We strongly recommend using Eclipse, and we primarily specify the guidelines through Eclipse-specific configurations and settings. If Eclipse is not the IDE of your choice, you might have to infer the coding guidelines yourself. Please share the inferred configuration of your IDE with us.
We fully follow the standard Java code conventions. Following these conventions is straightforward with Eclipse and our predefined settings for formatter, warnings, and templates.
Be aware of the naming conventions:
class ClassNameInCamelCase implements InterfaceAlsoInCamelCase {
    private static final int CONSTANT_IN_ANSI_STYLE = 1;
    private int fieldStartsWithLowerCase;

    public void methodStartsWithLowerCase(Object parametersAsWell) {
        int soDoLocalVariables;
    }
}
All package names of the Stratosphere project start with eu.stratosphere. and follow the Java naming convention for packages.
The following section defines when which log level should be used.
Debug: This is the most verbose logging level (maximum volume setting). Everything may be logged here, but note that you must respect privacy concerns. Don't log sensitive information (this of course applies to all log levels). This log level is not intended to be switched on in a production system.
Trace: This is a special debug level. With this level turned on, a programmer may trace a specific error in a test or production system. This level is not as verbose as debug, but gives a programmer enough information to identify the source of a problem. For example, entering/exiting log messages for methods can be logged here.
Info: The information level is typically used to output information that is useful for running and managing your system. This level should be the normal log level of a production system. Info messages should be one-time or very occasional messages. For example, the start and stop of a specific component can be logged here (e.g. JobManager, TaskManager, etc.).
Warn: Warning is often used for handled 'exceptions' or other important log events. Warnings are exceptions from which the system can recover and where the part of the system that produced the exception can nevertheless give a reasonable result.
Example: Suppose a function iterates over a list of integer objects in order to sum them up, and one entry in this list is a null pointer. While summing up, the function sooner or later reaches the null pointer and produces a null pointer exception. This could be logged as a warning and “interpreted” as summing up a 0.
Error: Errors are exceptions from which the system can recover, but which prevent the system from fulfilling a service/task.
Fatal: Fatal is reserved for special exceptions/conditions where it is imperative that you can quickly pick out these events. Exceptions are fatal if the system cannot recover fully from them (e.g. a component is down because of the exception). This level should be used rarely.
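The warning example above (a null entry while summing up a list) can be sketched as follows. This is an illustration under assumptions, not project code: it uses java.util.logging because that ships with the JDK, whereas the project's own logging setup may differ, and it checks for null explicitly instead of catching the null pointer exception. The class name NullTolerantSum is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical example of a recoverable condition that is logged as a warning.
final class NullTolerantSum {
    // Assumption: java.util.logging is used here only to keep the sketch self-contained.
    private static final Logger LOG = Logger.getLogger(NullTolerantSum.class.getName());

    // Sums up the list and treats null entries as 0, logging a warning for each.
    static int sum(List<Integer> values) {
        int total = 0;
        for (int i = 0; i < values.size(); i++) {
            Integer value = values.get(i);
            if (value == null) {
                // The function can still return a reasonable result, so this is a warning.
                LOG.warning("Null entry at index " + i + ", treating it as 0");
                continue;
            }
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, null, 2)));
    }
}
```

If the null entry made it impossible to produce a meaningful sum, the same condition would instead fall under the Error level described above.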
We have defined a code style for the Stratosphere project with which all source code files should comply.
See the Eclipse Import page for style configuration files and Eclipse configuration instructions.
Stratosphere is released under the Apache License, Version 2.0.
We do not modify the binary builds of released versions.
Bug fixes and the latest changes are published in the public Git repository.
Please follow the build instructions to build the latest version of Stratosphere from the Git repository.
All major changes and bug fixes made to released versions are logged:
This page logs all important updates and bug fixes made to release 0.1 of the Stratosphere system.
Updates and Bug Fixes are only available in the source distribution available in our public Git repository (build instructions).
Of course, they will be included in the binary distributions of future releases.
2011/05/16: Improved shell scripts for MacOS compatibility. Stratosphere can be started on MacOS now.
2011/05/12: Fixed non-ASCII serialization bug in PactString. PactString serializes and deserializes Java (UTF-16) strings now.
This page shows the important changes of the Stratosphere system introduced by release 0.1.1. It also logs all important updates and bug fixes made to release 0.1.1.
Updates and Bug Fixes are only available in the source distribution available in our public Git repository (build instructions).
Of course, they will be included in the binary distributions of future releases.
This page shows the important changes of the Stratosphere system introduced by release 0.1.2. It also logs all important updates and bug fixes made to release 0.1.2.
Updates and Bug Fixes are only available in the source distribution available in our public Git repository (build instructions).
Of course, they will be included in the binary distributions of future releases.
Major performance optimizations to runtime engine:
Fault tolerance:
Support for plugins:
Support for application-layer multicast network transmissions:
Data model changed to PactRecord, a generic tuple model (see http://www.stratosphere.eu/wiki/doku.php/wiki:PactRecord)
Support for composite keys for Reduce / Match / CoGroup
Efficient Union of data streams
Generic User-Code annotations replace Output Contracts
Added range partitioning and global sorting for results
Added support for sorted input groups for Reduce and CoGroup (a.k.a. Secondary Sort)
Removed limitation in record length for sort/hash strategies
Various runtime performance improvements
Extended library of input/output formats
Introduction of NormalizedKeys for efficient sorting of custom data types
Support for multiple disks/directories for parallel I/O operations on temporary files (temp storage, sort / hash overflows)
Chaining support for Mappers/Combiners (elimination of intermediate serializing)
Changed Runtime to work on generic data types (currently only with manual plan composition).