Hadoop Quick Start
Cluster Architecture
The CTIT Hadoop cluster consists of 48 Hadoop nodes. Access to the cluster is provided by two gateway systems, so-called head nodes. These are
- ctithead1.ewi.utwente.nl
- ctithead2.ewi.utwente.nl
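Logging in to a head node is typically done with ssh; a minimal sketch (the user name is a placeholder for your own account):
# log in to a head node (replace 'yourusername' with your own account name)
ssh yourusername@ctithead1.ewi.utwente.nl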
To give an idea of the computing power of the cluster: it consists of 47 compute nodes (ctit001 - ctit047) and one dispatcher node (ctit048). These nodes run several services, each named after its function. For debugging, it is sometimes useful to know these names:
compute nodes:
- datanode (hadoop file system),
- yarn-node-manager (yarn),
- regionserver (hbase),
- supervisor (storm),
- kafka-server (kafka)
dispatcher node:
- namenode (hadoop file system),
- yarn-resource-manager (yarn),
- hbase-master (hbase),
- nimbus server (storm),
- zookeeper server (zookeeper).
For more info see: YARN
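If you want to check which of these services are actually running on a particular node, the jps tool that ships with the JDK lists the running Java daemons by class name. A minimal sketch, assuming you have a shell on the node in question (the output shown is illustrative, process ids will differ):
# list the Java daemon processes running on this node
jps
# example output on a compute node (illustrative):
#   4201 DataNode
#   4530 NodeManager
#   4873 HRegionServer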
Running the software
An automated test set has been created for the cluster. The software can be checked out from GitHub:
git clone https://github.com/utwente-db/cluster.git
The scripts are located in the directory test. For ease of use the individual scripts are listed here:
How to create files and directories in HDFS
- test01_hdfs.sh
echo "HDFS" echo "------------------" echo "The hadoop file system is accessed using the 'hdfs dfs' program" echo "The program accepts commands similar to unix directory commands" echo "" echo "For example:" echo "List directory '/' form hdfs" hdfs dfs -ls / echo "" echo "Create directory test under the user's home directory" hdfs dfs -mkidr test echo "" echo "Remove directory test from the user's home directory" hdfs dfs -rm -r test echo "" echo "Write to file testfile to $IN (points to /tmp/test)" hdfs dfs -put testfile $IN echo "" echo "Output file from hdfs" hdfs dfs -cat $IN | head -n 2
Example of running a simple mapreduce script
- test02_mapreduce.sh
echo "Map Reduce" echo "------------------" echo "This section explains creates a minimalistic map reduce job." echo "" echo "First set required variables" export PATH=$JAVA_HOME/bin:$PATH export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar echo "" echo "Compile the accompanying wordcount example." echo "We use the hadoop command instead of an direct call to javac because" echo "the former sets adds the necessary jar files to the classpath." hadoop com.sun.tools.javac.Main WordCount.java echo "Package the example into a jar file" jar cf wc.jar WordCount*.class echo "Set output destination" OUT=/tmp/fileout-$RANDOM echo "Run the map reduce job invoking the main class of WordCount in the wc.jar file" hadoop jar wc.jar WordCount $IN $OUT echo "The output of a map reduce job is a directory with potentially many part-r-xxxx files" echo "Each of these files contains the output of a reducer." hdfs dfs -cat $OUT/part-r-00000 | tail -n 2 echo "Remove the test output directory" hdfs dfs -rm -r $OUT echo "Remove the test input file" hdfs dfs -rm $IN
- WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
- testfile
Computer science is the scientific and practical approach to computation and its applications. It is the systematic study of the feasibility, structure, expression, and mechanization of the methodical procedures (or algorithms) that underlie the acquisition, representation, processing, storage, communication of, and access to information, whether such information is encoded as bits in a computer memory or transcribed in genes and protein structures in a biological cell.[1] An alternate, more succinct definition of computer science is the study of automating algorithmic processes that scale. A computer scientist specializes in the theory of computation and the design of computational systems
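Because the output of a map reduce job is spread over several part-r-xxxxx files, it is often convenient to merge them into a single local file before inspecting the result. A small sketch using the standard getmerge subcommand (the output directory name follows the /tmp/fileout-$RANDOM convention of the script above and is an example):
# merge all part files of the job output into one local file
hdfs dfs -getmerge /tmp/fileout-12345 wordcount-result.txt
# show the most frequent words (the count is the second column)
sort -k2 -n -r wordcount-result.txt | head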
Example of a simple piglatin script
- test03_piglatin.sh
echo "Piglatin" echo "------------------" echo "This script explains how to run pig latin scripts. " echo "Pig latin scripts are run as follows" pig -x mapreduce wordcount.pig | grep -v INFO echo "The following command should output one line:" hdfs dfs -cat /tmp/pigout/part* | grep sendmail echo "Removing temporary direcory" hdfs dfs -rm -r /tmp/pigout
- wordcount.pig
A = load '/tmp/passwd';
B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;
C = group B by word;
D = foreach C generate COUNT(B), group;
store D into '/tmp/pigout';
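When developing a pig script it can be convenient to try it in local mode first, which runs on the head node without submitting a map reduce job; note that in local mode the load and store paths refer to the local file system. A minimal sketch:
# run the script in local mode for quick debugging
pig -x local wordcount.pig
# or start an interactive grunt shell in local mode
pig -x local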
An hbase example
- test04_hbase.sh
echo "HBase" echo "------------------" echo "This script explains the base commands for using hbase." TABLE=test-$RANDOM echo "To create a table" echo "create '$TABLE', 'test'" | hbase shell echo "To list all tables" echo "list" | hbase shell echo "To put a record into the temporary table" echo "put '$TABLE','test','test','value'" | hbase shell echo "To get a column value from the temporary table" echo "get '$TABLE','test','test'" | hbase shell echo "You should see" echo " test: timestamp=1429172811893, value=value " echo "To delete a table you first have to disable it and the to drop it." echo "disable '$TABLE'" | hbase shell echo "drop '$TABLE'" | hbase shell
Monitoring the system
To have a look at the state of the cluster, a number of web interfaces are available on the cluster machines. These ports are only accessible from within the ewi.utwente.nl domain. The state of the distributed file system can be found under:
http://ctit048.ewi.utwente.nl:50070/
The state of running, failed and retired jobs using YARN can be found under:
http://ctit048.ewi.utwente.nl:8088/cluster
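Both web interfaces also expose their information in machine readable form, which is convenient for scripting; a sketch assuming the ports above serve the standard YARN REST and NameNode JMX endpoints:
# list applications known to the YARN resource manager as JSON
curl http://ctit048.ewi.utwente.nl:8088/ws/v1/cluster/apps
# overall cluster metrics (nodes, memory, running containers)
curl http://ctit048.ewi.utwente.nl:8088/ws/v1/cluster/metrics
# NameNode metrics, including file system usage
curl http://ctit048.ewi.utwente.nl:50070/jmx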
If you want to obtain the logs from the worker nodes for a particular job, you can also run the following command on a head node:
yarn logs -applicationId <APPID>
where you have to replace <APPID> with the application id of your job.
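The application id can be found in the console output of the job, in the YARN web interface mentioned above, or by listing applications from the command line; a small sketch (the application id in the last command is a made-up example):
# list finished applications together with their application ids
yarn application -list -appStates FINISHED
# fetch the aggregated worker logs for one of them
yarn logs -applicationId application_1429172811893_0001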
The state of storm topologies can be accessed under: