Usage
There are a couple of ways to track the progress of a Stratosphere program, for example through Nephele's runtime visualization (see below) or by following the JobManager's log file:

tail -f ../log/nephele-<user>-jobmanager-<host>.log
The Stratosphere Development with Eclipse guide explains how to import the source code into Eclipse and run / debug Nephele in local mode directly from Eclipse.
Errors
Please have a look at the web frontend's log file (./log/nephele-<user>-pact-web-<host>.log). All exceptions are logged there.
The PACT execution framework relies on correct implementations of the methods java.lang.Object#hashCode() and java.lang.Object#equals(Object o). Because these methods always have default implementations (inherited from java.lang.Object), relying on the defaults causes the PACT framework to compute wrong results. Therefore, all keys must override hashCode() and equals(Object o). Unfortunately, IDEs and Java compilers will not force you to override these methods, since the default implementations of java.lang.Object are sufficient for compilation.
All data type classes must have a public nullary constructor (a constructor with no arguments). If it is missing, this exception is thrown. Furthermore, the classes must not be abstract. If the classes are inner classes, they must be public and static.
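As a minimal sketch, a hypothetical key type that meets these requirements could look as follows (the class name and field are made up; a real Stratosphere key type would additionally implement the framework's key interface and its serialization methods, which are omitted here):

// Hypothetical key type, for illustration only.
public class WordKey {

    private String word;

    // Public nullary constructor, required so the framework can instantiate the type.
    public WordKey() {
    }

    public WordKey(String word) {
        this.word = word;
    }

    // Keys must override hashCode(): equal keys must produce equal hash codes.
    @Override
    public int hashCode() {
        return (word == null) ? 0 : word.hashCode();
    }

    // Keys must override equals(Object o), consistently with hashCode().
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WordKey)) {
            return false;
        }
        WordKey other = (WordKey) o;
        return (word == null) ? (other.word == null) : word.equals(other.word);
    }
}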
Nephele's runtime visualization is an SWT application. It requires the native SWT (Standard Widget Toolkit) library that is specific to your platform. The one packaged with Stratosphere is for 64-bit Linux GTK systems. If you have a different system, you need a different SWT library.
To fix the problem, update the Maven dependency in /nephele/nephele-visualization/pom.xml to refer to your platform-specific library. You can find the list of available library versions under http://repo1.maven.org/maven2/org/eclipse/swt/.
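For example, on a 64-bit Windows machine the dependency could be changed as follows (artifact id and version are illustrative; pick the exact coordinates from the repository listing above):

<dependency>
    <groupId>org.eclipse.swt</groupId>
    <!-- e.g. the 64-bit Windows variant instead of the bundled Linux GTK one -->
    <artifactId>org.eclipse.swt.win32.win32.x86_64</artifactId>
    <!-- choose a version that exists in the repository listing -->
    <version>4.3</version>
</dependency>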
Those exceptions typically occur when the running HDFS version is incompatible with the version that Nephele uses to connect to HDFS. The fix is to recompile Stratosphere against the desired Hadoop version. Verify that your exception has a stack trace like the following:
Call to <host:port> failed on local exception: java.io.EOFException
at org.apache.hadoop.ipc.Client.wrapException(Client.java:775)
at org.apache.hadoop.ipc.Client.call(Client.java:743)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
at $Proxy0.getProtocolVersion(Unknown Source)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359)
at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:106)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:207)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:170)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82)
at eu.stratosphere.nephele.fs.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:117)
at eu.stratosphere.nephele.fs.FileSystem.get(FileSystem.java:224)
at eu.stratosphere.nephele.fs.Path.getFileSystem(Path.java:296)
...
To solve that problem, follow these steps:
1) Open the file pom.xml in the Stratosphere root directory. It should contain an entry like this:
<dependencyManagement>
    <!--
        this section defines the module versions that are used if nothing
        else is specified.
    -->
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>0.20.2</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
2) Change the version from 0.20.2 to the version that you are using.
3) Build the system as described in Building the System.
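For example, if your cluster runs Hadoop 1.0.4 (a version picked purely for illustration), the entry would become:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <!-- illustrative version; use the version your HDFS cluster runs -->
    <version>1.0.4</version>
    <type>jar</type>
    <scope>compile</scope>
</dependency>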
In some error cases it happens that the JobManager or TaskManager cannot be stopped with the provided stop-scripts (bin/stop-local.sh, bin/stop-cluster.sh, bin/stop-cloud.sh). You can kill their processes manually: use the jps command to find the process id (pid), then terminate the process with kill -9 <pid>, where <pid> is the process id of the affected JobManager or TaskManager process.
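For example (the process ids and listing below are made up for illustration; jps prints each JVM's pid and main class name):

jps
16822 JobManager
16931 TaskManager

kill -9 16931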
The memory of a TaskManager is managed and organized by an internal
component called MemoryManager. During start-up, the MemoryManager
allocates a certain amount of main memory and provides it to PACT
subtasks that requested it (e.g., for sorting, hash-tables, user code).
The PACT compiler is responsible for distributing the available memory
between tasks. A MemoryAllocationException is thrown if a task requests more memory than is available on its instance. This might happen if more subtasks are scheduled to an instance than the PACT compiler assumed.
OutOfMemoryExceptions in Java are tricky to diagnose. The exception is not necessarily thrown by the component that allocated most of the memory, but by the component that happened to request more memory than was still available, even if that request itself was small.
First, you should check whether your program consumes too much memory and try to minimize the memory footprint of your code. Since Stratosphere accumulates data internally in a byte representation in pre-reserved memory, it is mostly the user code that consumes too much memory.
If your user code simply has a large memory requirement, you can decrease the size of the memory managed by the MemoryManager using the parameter taskmanager.memory.size. This will leave more memory to the JVM heap. See the Configuration Reference for details.
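For example, to shrink the managed memory to 512 megabytes (the key/value syntax and the megabyte unit shown here are assumptions; verify both in the Configuration Reference):

taskmanager.memory.size: 512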
Check the logging behavior of your jobs. You should not emit log statements at the pair or tuple level. While this can be helpful for debugging jobs in small setups with tiny data sets, it becomes very inefficient and consumes lots of disk space when used on large input data.
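As an illustration, consider a hypothetical user function (the class and logger below are made up and are not Stratosphere API):

// Hypothetical user function, for illustration only.
public class TokenizeFunction {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(TokenizeFunction.class.getName());

    public void process(String record) {
        // Anti-pattern: one log statement per input record. On large inputs
        // this slows the job down and quickly fills the disk with log files.
        LOG.info("Processing record: " + record);

        // ... actual record processing ...
    }
}

Log at this granularity only when debugging small data sets, and remove or guard such statements before running on large inputs.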
Tuning
If you run Nephele in a massively parallel setting (100+ parallel threads), you should adapt the number of write buffers. As a rule of thumb, the number of write buffers should be (2 * numberOfNodes * numberOfTasksPerNode^2). See the Configuration Reference for details.
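For example, a hypothetical setup of 100 nodes with 8 parallel tasks per node (numbers chosen purely for illustration) would call for 2 * 100 * 8^2 = 12,800 write buffers.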
Features
In its current state, Stratosphere's fault tolerance is experimental and not enabled by default. In future releases we will stabilize the feature and enable it by default.
At the moment, neither is supported. If you used the distributed cache to replicate user data to the nodes, check whether the Cross Contract can solve the problem.