This small example shows how to set up a simple Celos installation and run two MapReduce wordcount workflows. It is intended to show you the ropes of working with Celos, and not as an example of creating a production-ready Celos installation or workflow. For that, please check out the rest of the Celos documentation.
- src/
  - main/
    - celos/
      - workflow.js: The main Celos workflow file, which defines two workflows, wordcount-nyc and wordcount-lax.
      - defaults.js: Settings that can be imported by this or other workflows.
    - java/com/example/
      - WordCount.java: The Java MapReduce main class we’re going to use.
    - oozie/
      - workflow.xml: The Oozie workflow XML file.
  - test/
    - resources/
      - input/: Some hourly-bucketed input files we’re going to process.
- build.gradle: Simple build file that builds a JAR of the WordCount class.
In the Celos repository root directory, do the following to build Celos:
scripts/build.sh
Then switch to the samples/quickstart directory:
cd samples/quickstart
In this example, we’re going to do MapReduce wordcounts of hourly buckets of text files coming from two different data centers, nyc and lax, that are written into HDFS (say by Flume or Kafka), with this directory layout:
input/
  nyc/
    2015-09-01/
      0000/
        _READY
        file1.txt
        file2.txt
      0100/
        _READY
        file1.txt
        file2.txt
        file3.txt
      ...
    ...
  lax/
    2015-09-01/
      0000/
        _READY
        file1.txt
      0100/
        _READY
        file1.txt
        file2.txt
        file3.txt
        file4.txt
      ...
    ...
You can look at the sample files here: src/test/resources/input.
A _READY file is placed into each hourly bucket after all files in the bucket have been completely written, so we will tell our Celos workflow to wait for that file. (Of course, since this is an example, we’ve manually placed the _READY files there.)
We will produce corresponding wordcount outputs by MapReducing the files in each hourly bucket, and placing the outputs into an output directory:
output/
  nyc/
    2015-09-01/
      0000/
        _SUCCESS
        part-00000
      0100/
        _SUCCESS
        part-00000
        part-00001
      ...
    ...
  lax/
    2015-09-01/
      0000/
        _SUCCESS
        part-00000
        part-00001
      0100/
        _SUCCESS
        part-00000
        part-00001
        part-00002
      ...
    ...
(The part-XXXXX files are created by MapReduce, as are the _SUCCESS files once processing is complete.)
We will define two workflows, wordcount-nyc and wordcount-lax, for processing the files in the input/nyc and input/lax directories, respectively.
The first thing we’re going to do is copy the input files into HDFS. We’ll put them into a root directory, under your HDFS home directory, at celos/quickstart:
export USER=manuel  # Change to your Hadoop username
export HDFS_ROOT=/user/$USER/celos/quickstart
hadoop fs -mkdir -p $HDFS_ROOT
hadoop fs -put src/test/resources/input $HDFS_ROOT/input
The file src/main/java/com/example/WordCount.java is a simple Java class with a main method that takes an input directory and an output directory as arguments. It reads all text files in the input directory, and writes tab-separated files containing word counts into the output directory.
The following command builds the class and puts it into build/libs/wordcount-1.0.jar:
./gradlew jar
Celos uses Oozie to do the actual execution of jobs; Celos itself only schedules them.
Oozie requires a small XML file, src/main/oozie/workflow.xml, that tells it what Java class to run. Note that the file refers to two variables, ${inputPath} and ${outputPath}. We’re going to set them from our Celos workflows.
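To make this concrete, here is a minimal sketch of what one of these workflow definitions can look like, based on the helper functions of the Celos JavaScript API as we understand them (celos.defineWorkflow, celos.hourlySchedule, celos.hdfsCheckTrigger, celos.oozieExternalService). The paths and property names here are assumptions; src/main/celos/workflow.js is the authoritative version.
// Minimal sketch of a workflow definition; see src/main/celos/workflow.js
// for the real one. HDFS_ROOT is assumed to come from the imported defaults.
celos.defineWorkflow({
    "id": "wordcount-nyc",
    "schedule": celos.hourlySchedule(),
    "schedulingStrategy": celos.serialSchedulingStrategy(),
    // Wait for the _READY file in each hourly input bucket.
    "trigger": celos.hdfsCheckTrigger(HDFS_ROOT + "/input/nyc/${year}-${month}-${day}/${hour}00/_READY"),
    "externalService": celos.oozieExternalService({
        // The Oozie workflow directory we set up in HDFS below.
        "oozie.wf.application.path": HDFS_ROOT + "/wordcount",
        // These become ${inputPath} and ${outputPath} in workflow.xml.
        "inputPath": HDFS_ROOT + "/input/nyc/${year}-${month}-${day}/${hour}00",
        "outputPath": HDFS_ROOT + "/output/nyc/${year}-${month}-${day}/${hour}00"
    })
});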
The workflow.xml and the wordcount-1.0.jar must be stored together in HDFS (with the JAR in a lib subdirectory), from where Oozie will read and execute them. We’re going to use $HDFS_ROOT/wordcount as the Oozie workflow directory containing these files:
hadoop fs -mkdir -p $HDFS_ROOT/wordcount/lib
hadoop fs -put -f src/main/oozie/workflow.xml $HDFS_ROOT/wordcount
hadoop fs -put -f build/libs/wordcount-1.0.jar $HDFS_ROOT/wordcount/lib
Now we have our inputs at $HDFS_ROOT/input and our Oozie workflow directory at $HDFS_ROOT/wordcount, so we can turn to setting up Celos. We’ll create a samples/quickstart/celos.d directory that holds all the directories required by Celos to run:
- A workflows directory containing the JavaScript workflow files.
- A defaults directory containing the JavaScript defaults files.
- A logs directory containing the Celos log outputs.
- A db directory containing Celos' state database.
mkdir celos.d
mkdir celos.d/workflows
mkdir celos.d/defaults
mkdir celos.d/logs
mkdir celos.d/db
On each scheduler step, Celos evaluates the JavaScript files in the workflows directory. These files define the workflows that Celos runs.
The defaults directory contains JavaScript files defining variables and utility functions that workflow files can import.
In the logs directory, you’ll find the celos.log file containing informative output from Celos, as well as rotated celos-YYYY-MM-DD.log files containing older output.
The db directory contains small JSON files that Celos uses to keep track of the execution state of each periodic invocation (slot) of a workflow.
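For illustration only, one of these state files is a small JSON document along the following lines; the exact schema is an internal detail of Celos, so the field names and values here are hypothetical:
// Hypothetical slot-state file (field names and values illustrative, not authoritative):
{ "status": "SUCCESS", "externalID": "0000123-150901-oozie-W" }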
Celos must be told about some settings, such as your Hadoop name node, job tracker, and Oozie API URL.
Edit src/main/celos/defaults.js and update the settings at the top for your Hadoop and Oozie installation.
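As an illustration, a defaults file is plain JavaScript that defines variables (and optionally functions) for workflows to import. The variable names and URLs below are hypothetical stand-ins for whatever src/main/celos/defaults.js actually defines:
// Hypothetical defaults; adapt the names and URLs to your cluster.
var HDFS_ROOT = "/user/manuel/celos/quickstart";           // HDFS root used in this example
var NAME_NODE = "hdfs://namenode.example.com:8020";        // Hadoop name node
var JOB_TRACKER = "jobtracker.example.com:8032";           // Hadoop job tracker / resource manager
var OOZIE_URL = "http://oozie.example.com:11000/oozie";    // Oozie API URL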
Now we’ll copy the workflow.js and defaults.js from src/main/celos into the directories Celos will actually use:
cp src/main/celos/workflow.js celos.d/workflows/wordcount.js
cp src/main/celos/defaults.js celos.d/defaults/wordcount.js
Note that this is a common pattern in Celos: in src/main/celos we use the canonical names workflow.js and defaults.js for these files, but in the Celos workflows and defaults directories we rename them to the workflow (or project) name, wordcount.
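This naming matters because a workflow file imports its defaults by that name. Using the import helper from the Celos JavaScript API (celos.importDefaults, as we understand it), the workflow file would begin with something like:
// Brings the definitions from celos.d/defaults/wordcount.js into scope.
celos.importDefaults("wordcount");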
Note that we need to put /etc/hadoop/conf on the classpath, so Celos has access to the core-site.xml and hdfs-site.xml Hadoop configuration files.
Also note that we’re running Celos with --autoSchedule 5, which means that the scheduler will run automatically every 5 seconds. In production, we usually don’t use --autoSchedule, and instead call the scheduler from cron every minute.
export CELOS_PORT=11337  # Adapt if needed
export CLASSPATH=../../celos-server/build/libs/celos-server.jar:/etc/hadoop/conf
java -cp $CLASSPATH com.collective.celos.server.Main \
  --port $CELOS_PORT \
  --workflows celos.d/workflows \
  --defaults celos.d/defaults \
  --logs celos.d/logs \
  --db celos.d/db \
  --autoSchedule 5 \
  > /dev/null 2>&1 &
(Note that we’re redirecting stdout and stderr to /dev/null in order to keep the console clean. If you encounter an error in one of the following steps, you might want to run this command without the redirection.)
To check that Celos is up and has loaded the two workflows, do the following:
export CELOS=http://localhost:$CELOS_PORT
curl "$CELOS/workflow-list"
This should print:
{ "ids" : [ "wordcount-lax", "wordcount-nyc" ] }
By default, Celos only looks at slots within a 7-day sliding window before the current time. Our sample data is dated 2015-09-01, which lies outside that window, so to make Celos care about the input data we mark those slots for rerun:
curl -X POST "$CELOS/rerun?id=wordcount-lax&time=2015-09-01T00:00Z"
curl -X POST "$CELOS/rerun?id=wordcount-lax&time=2015-09-01T01:00Z"
curl -X POST "$CELOS/rerun?id=wordcount-lax&time=2015-09-01T02:00Z"
curl -X POST "$CELOS/rerun?id=wordcount-nyc&time=2015-09-01T00:00Z"
curl -X POST "$CELOS/rerun?id=wordcount-nyc&time=2015-09-01T01:00Z"
curl -X POST "$CELOS/rerun?id=wordcount-nyc&time=2015-09-01T02:00Z"
Next, start the Celos UI, pointing it at the Celos server and at your Oozie web interface:
export HUE=http://cldmgr001.ewr004.collective-media.net:8888/oozie  # Point to your Oozie UI
java -jar ../../celos-ui/build/libs/celos-ui.jar --port 11338 --celos $CELOS --hue $HUE
Now go to this URL in your browser:
http://localhost:11338/ui?time=2015-09-02T00:00Z
You should see two workflows, each of which has three ready or running slots. You can click on a running slot to see its Oozie information.
After a while, when all slots are green, you can look at the results in HDFS:
hadoop fs -cat $HDFS_ROOT/output/lax/2015-09-01/0000/part-00000
hadoop fs -cat $HDFS_ROOT/output/nyc/2015-09-01/0000/part-00000
So far, we’ve manually copied the JavaScript files into the workflows and defaults directories, and manually copied the workflow.xml and JAR to HDFS. But Celos comes with a tool to automate this, called Celos CI.
To run Celos CI, you create a deployment directory containing the JavaScript files and the artefacts for HDFS. The script ci/deploy.sh does that.
Celos CI requires a target file that describes the HDFS and Celos installation it should deploy to. By changing the target file, Celos CI can deploy to a different Hadoop cluster and/or different Celos installation.
Edit ci/target.json and make defaults.dir.uri and workflows.dir.uri point to the defaults and workflows directories in your celos.d.
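For illustration, the relevant part of the target file might look like this; the paths are placeholders, and any other settings already in ci/target.json should be left as they are:
{
    "defaults.dir.uri": "file:///path/to/samples/quickstart/celos.d/defaults",
    "workflows.dir.uri": "file:///path/to/samples/quickstart/celos.d/workflows"
}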
In production we set an environment variable that points to the target file on our CI server. The TARGET_FILE variable here simulates that:
export TARGET_FILE=file://`pwd`/ci/target.json
ci/deploy.sh $HDFS_ROOT
This command does essentially what we did manually before: it copies the workflow and defaults files into the workflows and defaults directories, respectively, and uploads the Oozie workflow directory to HDFS.
If you’re lucky, you have now installed Celos, run two workflows against Hadoop, and know how to continuously deliver workflows with Celos CI.
If it didn’t work, we’d love to help you make it work.