## Wednesday, January 26, 2011

### Graph Processing With Apache Pig

So, you're probably sick of seeing this airport data set by now (flight edges) but it's so awesome that I have to re-use it. Let's use Pig to do the same calculation as this post in a much more succinct way. We'll really get a feel for what Pig is better at than Wukong.

## Degree Distribution

Since the point here is to illustrate the difference between the wukong way and the pig way we're not going to introduce anything clever here. Here's the code for calculating the degree by month (both passengers and flights) for every domestic airport since 1990:

```pig
-- Calculates the monthly degree distributions for domestic airports from 1990 to 2009.

-- Load data (boring part)
flight_edges = LOAD '$FLIGHT_EDGES' AS (origin_code:chararray, destin_code:chararray, passengers:int, flights:int, month:int);

-- For every (airport, month) pair get passengers and flights out
edges_out = FOREACH flight_edges GENERATE origin_code AS airport, month AS month, passengers AS passengers_out, flights AS flights_out;

-- For every (airport, month) pair get passengers and flights in
edges_in = FOREACH flight_edges GENERATE destin_code AS airport, month AS month, passengers AS passengers_in, flights AS flights_in;

-- Group them together and sum
grouped_edges = COGROUP edges_in BY (airport, month), edges_out BY (airport, month);

degree_dist = FOREACH grouped_edges {
  passenger_degree = SUM(edges_in.passengers_in) + SUM(edges_out.passengers_out);
  flights_degree   = SUM(edges_in.flights_in)    + SUM(edges_out.flights_out);
  GENERATE
    FLATTEN(group)   AS (airport, month),
    passenger_degree AS passenger_degree,
    flights_degree   AS flights_degree
  ;
};

STORE degree_dist INTO '$DEG_DIST';
```

So, here's what's going on:

• FOREACH..GENERATE: this is called a 'projection' in Pig. Here we're really just cutting out the fields we don't want and rearranging our records. This is exactly what we did in the map phase of the wukong script, where we yielded two different types of records for the same input data, only a lot clearer.

• COGROUP: here we're simply joining our two data relations together (edges in and edges out) by a common key (airport code,month) and aggregating the values for that key. This is exactly the same as what we do in the 'accumulate' part of the wukong script.

• FOREACH..GENERATE (once more): here we run through our grouped records and sum the flights and passengers. This is exactly the same as the 'finalize' part of the wukong script.

So, basically, in 4 lines of very clear and concise code (not counting the LOAD and STORE or the prettification) we've done what took us ~70 lines of ruby. Win.
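If the COGROUP step is the unfamiliar one, here's a toy Ruby analogue of what it does for intuition. The sample edges below are hypothetical, and this is just a sketch, not how Pig actually executes it:

```ruby
# Toy Ruby analogue of Pig's COGROUP: for each (airport, month) key,
# collect the bag of matching records from BOTH relations.
# The sample edges below are made up.
edges_out = [
  { airport: 'EUG', month: 199011, passengers_out: 41, flights_out: 22 },
  { airport: 'EUG', month: 199011, passengers_out: 10, flights_out: 2 }
]
edges_in = [
  { airport: 'EUG', month: 199011, passengers_in: 7, flights_in: 1 }
]

def cogroup(ins, outs)
  keys = (ins + outs).map { |e| [e[:airport], e[:month]] }.uniq
  keys.map do |key|
    { group:     key,
      edges_in:  ins.select  { |e| [e[:airport], e[:month]] == key },
      edges_out: outs.select { |e| [e[:airport], e[:month]] == key } }
  end
end

grouped = cogroup(edges_in, edges_out)
# Each grouped record now carries both bags, ready for the SUMs in the
# FOREACH block.
passenger_degree = grouped.first[:edges_in].sum { |e| e[:passengers_in] } +
                   grouped.first[:edges_out].sum { |e| e[:passengers_out] }
```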

Here's the wukong one again for reference:

```ruby
#!/usr/bin/env ruby
require 'rubygems'
require 'wukong'

class EdgeMapper < Wukong::Streamer::RecordStreamer
  #
  # Yield both ways so we can sum (passengers in + passengers out) and (flights
  # in + flights out) individually in the reduce phase.
  #
  def process origin_code, destin_code, passengers, flights, month
    yield [origin_code, month, "OUT", passengers, flights]
    yield [destin_code, month, "IN",  passengers, flights]
  end
end

class DegreeCalculator < Wukong::Streamer::AccumulatingReducer
  #
  # What are we going to use as a key internally?
  #
  def get_key airport, month, in_or_out, passengers, flights
    [airport, month]
  end

  def start! airport, month, in_or_out, passengers, flights
    @out_degree = {:passengers => 0, :flights => 0}
    @in_degree  = {:passengers => 0, :flights => 0}
  end

  def accumulate airport, month, in_or_out, passengers, flights
    case in_or_out
    when "IN" then
      @in_degree[:passengers] += passengers.to_i
      @in_degree[:flights]    += flights.to_i
    when "OUT" then
      @out_degree[:passengers] += passengers.to_i
      @out_degree[:flights]    += flights.to_i
    end
  end

  #
  # For every airport and month, calculate passenger and flight degrees
  #
  def finalize
    # Passenger degrees (out, in, and total)
    passengers_out   = @out_degree[:passengers]
    passengers_in    = @in_degree[:passengers]
    passengers_total = passengers_in + passengers_out

    # Flight degrees (out, in, and total)
    flights_out      = @out_degree[:flights]
    flights_in       = @in_degree[:flights]
    flights_total    = flights_in + flights_out

    yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]
  end
end

#
# Need to use 2 fields for partition so every record with the same airport and
# month lands on the same reducer
#
Wukong::Script.new(
  EdgeMapper,
  DegreeCalculator,
  :partition_fields => 2 # use two fields to partition records
).run
```

## Plot Data

A very common workflow pattern with Hadoop is to use a tool like pig or wukong to process large scale data and generate some result data set. The last step in this process is (rather obviously) to summarize that data in some way. Here's a quick plot (using R and ggplot2) of the flights degree distribution after I further summarized by year:

That's funny...

Finally, here's the code for the plot:

```r
# include the ggplot2 library for nice plots
library(ggplot2);

# Read data in and take a subset
degrees        <- read.table('yearly_degrees.tsv', header=FALSE, sep='\t', colClasses=c('character', 'character', 'numeric', 'numeric'));
names(degrees) <- c('airport_code', 'year', 'passenger_degree', 'flights_degree');
select_degrees <- subset(degrees, year=='2000' | year=='2001' | year=='2002' | year=='2009' | year=='1990');

# Plotting with ggplot2
pdf('passenger_degrees.pdf', 12, 6, pointsize=10);
ggplot(select_degrees, aes(x=passenger_degree, fill=year)) +
  geom_density(colour='black', alpha=0.3) +
  scale_x_log10() +
  ylab('Probability') +
  xlab(expression(log[10]('Passengers in + Passengers out'))) +
  opts(title='Passenger Degree Distribution')
```

Hurray.

## Sunday, January 23, 2011

### Bulk Indexing With ElasticSearch and Hadoop

At Infochimps we recently indexed over 2.5 billion documents for a total indexed size of 4TB. This would not have been possible without ElasticSearch and the Hadoop bulk loader we wrote, wonderdog. I'll go into the technical details in a later post, but for now here's how you can get started with ElasticSearch and Hadoop:

## Getting Started with ElasticSearch

The first thing is to actually install elasticsearch:

```
$: wget http://github.com/downloads/elasticsearch/elasticsearch/elasticsearch-0.14.2.zip
$: unzip elasticsearch-0.14.2.zip
$: sudo mv elasticsearch-0.14.2 /usr/local/share/
$: sudo ln -s /usr/local/share/elasticsearch-0.14.2 /usr/local/share/elasticsearch
```

Next you'll want to make sure there is an 'elasticsearch' user and that there are suitable data, work, and log directories that 'elasticsearch' owns:

```
$: sudo useradd elasticsearch
$: sudo mkdir -p /var/log/elasticsearch /var/run/elasticsearch/{data,work}
$: sudo chown -R elasticsearch /var/{log,run}/elasticsearch
```

Then get wonderdog (you'll have to git clone it for now) and go ahead and copy the example configuration in wonderdog/config:

```
$: sudo mkdir -p /etc/elasticsearch
$: sudo cp config/elasticsearch-example.yml /etc/elasticsearch/elasticsearch.yml
$: sudo cp config/logging.yml /etc/elasticsearch/
$: sudo cp config/elasticsearch.in.sh /etc/elasticsearch/
```

Make changes to 'elasticsearch.yml' so that it points to the correct data, work, and log directories. Also, you'll want to change 'recovery_after_nodes' and 'expected_nodes' in elasticsearch.yml to however many nodes (machines) you actually expect to have in your cluster. You'll probably also want to do a quick once-over of elasticsearch.in.sh and make sure the JVM settings, etc., are sane for your particular setup. Finally, to start it up, run:

```
sudo -u elasticsearch /usr/local/share/elasticsearch/bin/elasticsearch -Des.config=/etc/elasticsearch/elasticsearch.yml
```

You should now have a happily running (reasonably configured) elasticsearch data node.
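For reference, the settings mentioned above live in elasticsearch.yml and look roughly like this. This is a sketch: the paths and node counts are examples, and the exact setting names may differ slightly by version, so check wonderdog/config/elasticsearch-example.yml for the authoritative layout:

```yaml
# Illustrative fragment of /etc/elasticsearch/elasticsearch.yml.
# Paths and node counts are examples only.
path:
  data: /var/run/elasticsearch/data
  work: /var/run/elasticsearch/work
  logs: /var/log/elasticsearch

gateway:
  recover_after_nodes: 3   # hold off recovery until this many nodes join
  expected_nodes: 4        # how many data nodes the cluster should have
```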

## Index Some Data

Prerequisites:

• You have a working hadoop cluster

• Elasticsearch data nodes are installed and running on all your machines and they have discovered each other. See the elasticsearch documentation for details on making that actually work.

• You've installed the following rubygems: 'configliere' and 'json'

### Get Data

As an example, let's index this UFO sightings data set from Infochimps here. (You should be familiar with this one by now...) It's mostly raw text and so it's a very reasonable thing to index. Once it's downloaded, go ahead and throw it on the HDFS:

```
$: hadoop fs -mkdir /data/domestic/ufo
$: hadoop fs -put chimps_16154-2010-10-20_14-33-35/ufo_awesome.tsv /data/domestic/ufo/
```

### Index Data

This is the easy part:

```
$: bin/wonderdog --rm --field_names=sighted_at,reported_at,location,shape,duration,description --id_field=-1 --index_name=ufo_sightings --object_type=ufo_sighting --es_config=/etc/elasticsearch/elasticsearch.yml /data/domestic/aliens/ufo_awesome.tsv /tmp/elasticsearch/aliens/out
```

Flags:

• '--rm' - Remove output on the HDFS if it exists

• '--field_names' - A comma-separated list of the field names in the tsv, in order

• '--id_field' - The field to use as the record id; -1 if the record has no inherent id

• '--index_name' - The index name to bulk load into

• '--object_type' - The type of objects we're indexing

• '--es_config' - Points to the elasticsearch config*

*The elasticsearch config must be on all the hadoop machines and have a 'hosts' entry listing the ips of all the elasticsearch data nodes (see wonderdog/config/elasticsearch-example.yml). This means we can run the hadoop job on a different cluster than the one the elasticsearch data nodes are running on.

The other two arguments are the input and output paths. The output path in this case only gets written to if one or more index requests fail. This way you can re-run the job on only those records that didn't make it the first time.

The indexing should go pretty quickly. Next, refresh the index so we can actually query our newly indexed data. There's a tool in wonderdog's bin directory for that:

```
$: bin/estool --host=hostname -i refresh_index
```

### Query Data

Once again, use estool:

```
$: bin/estool --host=hostname -i --index_name=ufo_sightings --query_string="ufo" query
```

Hurray.

## Saturday, January 22, 2011

### JRuby and Hadoop, Notes From a Non-Java Programmer

So I spent a fair deal of time playing with JRuby this weekend. Here are my notes and conclusions so far.

Disclaimer: My experience with Java overall is limited, so most of this might be obvious to a Java ninja but maybe not to a Ruby one...

Goal: The whole point here is that it sure would be nice to write only a few lines of ruby code into a file called something like 'rb_mapreduce.rb' and run it by saying: ./rb_mapreduce.rb. No compiling, no awkward syntax. Pure ruby, plain and simple.

Experiments/Notes: I created a 'WukongPlusMapper' that subclassed 'org.apache.hadoop.mapreduce.Mapper' and implemented the important methods, namely 'map'. Then I set up and launched a job from inside JRuby using this JRuby mapper ('WukongPlusMapper') as the map class. The job launched and ran just fine. But...

Problems and Lessons Learned:

• It is possible (in fact extremely easy) to set up and launch a Hadoop job with pure JRuby.

• It is not possible, as far as I can tell so far, to use an uncompiled JRuby class as either the mapper or the reducer for a Hadoop job. It doesn't throw an error (so long as you've subclassed a proper Java mapper) but actually just uses the superclass's definition instead. I believe the reason is that each map task must have access to the full class definition for its mapper (only sensible) and has no idea what to do with my transient 'WukongPlusMapper' class. Obviously the same applies to the reducer.

• It is possible to compile a JRuby class ahead of time, stuff it into the job jar, and then launch the job by ordinary means. There are a couple of somewhat obvious drawbacks with this method:

  • You've got to specify 'java_signatures' for each of your methods that are going to be called inside Java.

  • Messy logic and code for compiling thyself, stuffing thyself into a jar, and shipping thyself with the MR job. Might as well just write Java at this point. radoop has some logic for doing that pretty well laid out.

• It is possible to define and create an object in JRuby that subclasses a Java class or implements a Java interface. Then you can simply overwrite the methods you want to overwrite. It's possible to pass instances of this class to a Java runtime that only knows about the superclass, and the subclass's methods (at least the ones with signatures defined in the superclass) will work just fine. Unfortunately (and plainly obvious in hindsight), this does NOT work with Hadoop, since these instances all show up in Java as 'proxy classes' and are only accessible to the launching JVM.

• On another note, there is the option of using the scripting engine, which, as far as I can tell, is what both jruby-on-hadoop and radoop are using. Something of concern, though, is that neither of these two projects seems to have much traction. However, it may be that the scripting engine is the only way to reasonably make this work; at least 2 people vote yes...

So, implementation complexity aside, it looks like all one would have to do is come up with some way of making JRuby's in-memory class definitions available to all of the spawned mappers and reducers. Probably not something I want to delve into at the moment. Script engine it is.

## Friday, January 21, 2011

### Pig, Bringing Simplicity to Hadoop

In strong contrast to the seat-of-your-pants style of Wukong there is another high level language for Hadoop called Pig. See Apache Pig.

## Overview

At the top level, here's what Pig gets you:

• No java required. That is, use as little (zero) or as much (reams) of java code as you want.

• No boilerplate code

• Intuitive and easy to understand language (similar to SQL) with clean uniform syntax

• Separation of high level algorithm and low level map-reduce jobs

• Build your analysis as a set of operations acting on data

• Most algorithms are fewer than 5 human-readable lines of Pig

## Get Pig

Go here and download pig (version 0.8) from somewhere close to you. Unpack it and put it wherever you like. Then type 'pig' at the command line and see what happens. It's very likely that it won't pick up your existing Hadoop configuration. To fix that, set HADOOP_HOME to point to your hadoop installation and PIG_CLASSPATH to point to your hadoop configuration (probably /etc/hadoop/conf). Here's all that again:

```
$: wget http://mirrors.axint.net/apache//pig/pig-0.8.0/pig-0.8.0.tar.gz
$: tar -zxf pig-0.8.0.tar.gz
$: sudo mv pig-0.8.0 /usr/local/share/
$: sudo ln -s /usr/local/share/pig-0.8.0 /usr/local/share/pig
$: sudo ln -s /usr/local/share/pig/bin/pig /usr/local/bin/pig
$: hash -r
$: export HADOOP_HOME=/usr/lib/hadoop
$: export PIG_CLASSPATH=/etc/hadoop/conf
$: pig
2011-01-21 09:56:32,486 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/jacob/pig_1295625392480.log
2011-01-21 09:56:32,879 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://master:8020
2011-01-21 09:56:33,402 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: master:54311
grunt>
```

See how easy that was. Get it over with now.

## Firsties

People post the most interesting data on Infochimps. Let's get the first billion digits of pi here. Notice that it's arranged in groups of 10 digits with 10 groups per line. We're going to use pig to see how the digits are distributed.

This will allow us to visually (once we make a plot) spot any obvious deviation from a random series of numbers. That is, for a random series of numbers we'd expect each digit (0-9) to appear equally often. If this isn't true then we know we're dealing with a more complicated beast.
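The uniformity check we're after can be sketched in miniature with plain Ruby; the digit string below is just the first 50 digits of pi, typed in by hand for illustration:

```ruby
# Count how often each digit appears in a digit string and compare to
# the uniform expectation. These are the first 50 digits of pi; the
# real job runs over a billion of them.
digits = "14159265358979323846264338327950288419716939937510"

counts = Hash.new(0)
digits.each_char { |d| counts[d] += 1 }

expected  = digits.length / 10.0
deviation = counts.values.map { |c| (c - expected).abs }.max
# For a truly random digit stream, deviation/expected shrinks as the
# sample grows, which is exactly what the plot lets us eyeball.
```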

### Pre-Process

After catting the data file you'll notice the end of each line has crufty details attached that make this data more or less impossible to load into Pig as a table. Pig is terrible at data munging/parsing/cleaning. Thankfully we have wukong. Let's write a dead simple wukong script to fix the last field and create one digit per line:

```ruby
#!/usr/bin/env ruby
require 'rubygems'
require 'wukong'

class PiCleaner < Wukong::Streamer::LineStreamer
  def process line
    fields               = line.strip.split(' ', 10)
    hundred_digit_string = [fields[0..8], fields[9][0..9]].join rescue ""
    hundred_digit_string.each_char{|digit| yield digit}
  end
end

Wukong::Script.new(PiCleaner, nil).run
```
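To see what that does to a single input line, here's the same field logic pulled out standalone. The sample line is made up (the ":100" cruft is a stand-in for whatever the raw file actually appends), but the layout mimics the real thing: ten space-separated digit groups with cruft on the last:

```ruby
# The same field logic as PiCleaner#process, pulled out standalone.
# Nine clean 10-digit groups, then a tenth group with trailing cruft.
line = "1415926535 8979323846 2643383279 5028841971 6939937510 " \
       "5820974944 5923078164 0628620899 8628034825 3421170679:100"

fields               = line.strip.split(' ', 10)
hundred_digit_string = [fields[0..8], fields[9][0..9]].join rescue ""
# A clean run of 100 digits; the script then yields them one per line.
```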

Let's go ahead and run that right away (this will take a few minutes depending on your rig, it'll be a billion lines of output...):
```
$: hdp-mkdir /data/math/pi/
$: hdp-put pi-010.txt /data/math/pi/
$: ./pi_clean.rb --run /data/math/pi/pi-010.txt /data/math/pi/first_billion_digits.tsv
I, [2011-01-22T11:01:57.363704 #12489]  INFO -- : Launching hadoop!
I, [2011-01-22T11:01:57.363964 #12489]  INFO -- : Running

/usr/local/share/hadoop/bin/hadoop \
  jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar \
  -D mapred.reduce.tasks=0 \
  -D mapred.job.name='pi_clean.rb---/data/math/pi/pi-010.txt---/data/math/pi/first_billion_digits.tsv' \
  -mapper '/usr/bin/ruby1.8 pi_clean.rb --map ' \
  -reducer '' \
  -input '/data/math/pi/pi-010.txt' \
  -output '/data/math/pi/first_billion_digits.tsv' \
  -file '/home/jacob/Programming/projects/data_recipes/examples/pi_clean.rb' \
  -cmdenv 'RUBYLIB=~/.rubylib'
11/01/22 11:01:59 INFO mapred.FileInputFormat: Total input paths to process : 1
11/01/22 11:01:59 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]
11/01/22 11:01:59 INFO streaming.StreamJob: Running job: job_201012031305_0251
11/01/22 11:01:59 INFO streaming.StreamJob: To kill this job, run:
11/01/22 11:01:59 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=master:54311 -kill job_201012031305_0251
11/01/22 11:01:59 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0251
11/01/22 11:02:00 INFO streaming.StreamJob:  map 0%  reduce 0%
11/01/22 11:05:11 INFO streaming.StreamJob:  map 100%  reduce 100%
11/01/22 11:05:11 INFO streaming.StreamJob: Job complete: job_201012031305_0251
11/01/22 11:05:11 INFO streaming.StreamJob: Output: /data/math/pi/first_billion_digits.tsv
packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/pi_clean.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar4401930660028806042/] [] /tmp/streamjob3153669001520547.jar tmpDir=null
```

Great. Now let's write some Pig:

### Analysis

This is what Pig is awesome at. Remember that accumulating reducer in wukong? We're about to do the identical thing in two lines of no-nonsense pig:

```pig
-- load
digits = LOAD '$PI_DIGITS' AS (digit:int);
groups = GROUP digits BY digit;
counts = FOREACH groups GENERATE group AS digit, COUNT(digits) AS num_digits;
-- store
STORE counts INTO '$OUT';
```

All we're doing here is reading in the data (one digit per line), accumulating all the digits with the same 'digit', and counting them up. Save it into a file called 'pi.pig' and run with the following:

```
$: pig -p PI_DIGITS=/data/math/pi/first_billion_digits.tsv -p OUT=/data/math/pi/digit_counts.tsv pi.pig
```

I'll skip the output since it's rather verbose. Now we can make a simple plot by hdp-catting our output data into a local file (it's super tiny by now) and plotting it with R:

```
$: hdp-catd /data/math/pi/digit_counts.tsv > digit_counts.tsv
$: R
> library(ggplot2)
> digit_counts <- read.table('digit_counts.tsv', header=FALSE, sep="\t")
> names(digit_counts) <- c('digit', 'count')
> p <- ggplot(digit_counts, aes(x=digit)) + geom_histogram(aes(y = ..density.., weight = count, binwidth=1), colour='black', fill='grey', alpha=0.7)
> p + scale_y_continuous(limits=c(0,0.5))
```

Will result in the following:

I'll leave it to you to make your own assumptions.

## Thursday, January 20, 2011

### Graph Processing With Wukong and Hadoop

As a last (for now) tutorial oriented post on Wukong, let's process a network graph.

## Get Data

This airport data (airport edges) from Infochimps is one such network graph with over 35 million edges. It represents the number of flights and passengers transported between two domestic airports in a given month. Go ahead and download it.

## Explore Data

We've got to actually look at the data before we can make any decisions about how to process it and what questions we'd like answered:

```
$: head data/flights_with_colnames.tsv | wu-lign
origin_airport  destin_airport  passengers  flights  month
MHK  AMW  21  1   200810
EUG  RDM  41  22  199011
EUG  RDM  88  19  199012
EUG  RDM  11  4   199010
MFR  RDM  0   1   199002
MFR  RDM  11  1   199003
MFR  RDM  2   4   199001
MFR  RDM  7   1   199009
MFR  RDM  7   2   199011
```

So it's exactly what you'd expect: an adjacency list with (origin node, destination node, weight_1, weight_2, timestamp). There are thousands of data sets with similar characteristics...

## Ask A Question

A simple question to ask (and probably the first question you should ask of a graph) is what the degree distribution is. Notice there are two flavors of degree in our graph:

1. Passenger Degree: For a given airport (node in the graph), the number of passengers in + the number of passengers out. Passengers in is called the 'in degree' and passengers out is (naturally) called the 'out degree'.

2. Flights Degree: For a given airport, the number of flights in + the number of flights out.

Let's write the question wukong style:

```ruby
#!/usr/bin/env ruby
require 'rubygems'
require 'wukong'

class EdgeMapper < Wukong::Streamer::RecordStreamer
  #
  # Yield both ways so we can sum (passengers in + passengers out) and (flights
  # in + flights out) individually in the reduce phase.
  #
  def process origin_code, destin_code, passengers, flights, month
    yield [origin_code, month, "OUT", passengers, flights]
    yield [destin_code, month, "IN",  passengers, flights]
  end
end

class DegreeCalculator < Wukong::Streamer::AccumulatingReducer
  #
  # What are we going to use as a key internally?
  #
  def get_key airport, month, in_or_out, passengers, flights
    [airport, month]
  end

  def start! airport, month, in_or_out, passengers, flights
    @out_degree = {:passengers => 0, :flights => 0}
    @in_degree  = {:passengers => 0, :flights => 0}
  end

  def accumulate airport, month, in_or_out, passengers, flights
    case in_or_out
    when "IN" then
      @in_degree[:passengers] += passengers.to_i
      @in_degree[:flights]    += flights.to_i
    when "OUT" then
      @out_degree[:passengers] += passengers.to_i
      @out_degree[:flights]    += flights.to_i
    end
  end

  #
  # For every airport and month, calculate passenger and flight degrees
  #
  def finalize
    # Passenger degrees (out, in, and total)
    passengers_out   = @out_degree[:passengers]
    passengers_in    = @in_degree[:passengers]
    passengers_total = passengers_in + passengers_out

    # Flight degrees (out, in, and total)
    flights_out      = @out_degree[:flights]
    flights_in       = @in_degree[:flights]
    flights_total    = flights_in + flights_out

    yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]
  end
end

#
# Need to use 2 fields for partition so every record with the same airport and
# month lands on the same reducer
#
Wukong::Script.new(
  EdgeMapper,
  DegreeCalculator,
  :partition_fields => 2 # use two fields to partition records
).run
```

Don't panic. There's a lot going on in this script, so here's the breakdown (real gentle like):

### Mapper

Here we're using wukong's RecordStreamer class, which reads lines from $stdin and splits on tabs for us already. That's how we know exactly what arguments the process method gets.

Next, as is often the case with low level map-reduce, we've got to be a bit clever in the way we yield data in the map. Here we yield the edge both ways and attach an extra piece of information ("OUT" or "IN") depending on whether the passengers and flights were going into the airport in a month or out. This way we can distinguish between these two pieces of data in the reducer and process them independently.
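Stripped of the Wukong machinery, the map-side trick is just this (toy edge, made-up numbers):

```ruby
# Emit each flight edge twice: once attributed to the origin ("OUT")
# and once to the destination ("IN"), so a single group-by on
# (airport, month) later sees both directions.
def map_edge(origin, destin, passengers, flights, month)
  [
    [origin, month, "OUT", passengers, flights],
    [destin, month, "IN",  passengers, flights]
  ]
end

records = map_edge("EUG", "RDM", 41, 22, 199011)
# One input edge, two keyed records out.
```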

Finally, we've carefully rearranged our records such that (airport,month) is always the first two fields. We'll partition on this as the key. (We have to say that explicitly at the bottom of the script)
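Hadoop streaming does the actual routing, but the idea behind partitioning on the first two fields can be sketched like so (the hash below is a stand-in for illustration, not Hadoop's real partitioner):

```ruby
# Illustrative only: route records to reducers by hashing just the
# first two fields. Hadoop's streaming partitioner does the real work.
NUM_REDUCERS = 4

def partition(record)
  record[0..1].join("\t").sum % NUM_REDUCERS
end

a = partition(["EUG", 199011, "OUT", 41, 22])
b = partition(["EUG", 199011, "IN",  7,  1])
# a == b: both directions of the same (airport, month) pair meet at
# the same reducer, which is what makes the accumulation work.
```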

### Reducer

We've seen all these methods before except for one. The reducer needs to know what fields to use as the key (it defaults to the first field). Here we've explicitly told it to use the airport and month as the key with the 'get_key' method.

* start! - Here we initialize the internal state of the reducer with two ruby hashes. One, the @out_degree will count up all the passengers and flights out. The @in_degree will do the same but for passengers and flights in. (Let's all take a moment and think about how awful and unreadable that would be in java...)

* accumulate - Here we simply look at each record and decide which counters to increment depending on whether it's "OUT" or "IN".

* finalize - All we're doing here is taking our accumulated counts, creating the record we care about, and yielding it out. Remember, the 'key' is just (airport,month).
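The three methods above can be simulated in plain Ruby on one pre-grouped batch of records (toy data, same field layout as the mapper output):

```ruby
# Simulate the reducer lifecycle on one (airport, month) group.
# Field layout matches the mapper output; the values are made up.
records = [
  ["EUG", 199011, "OUT", "41", "22"],
  ["EUG", 199011, "IN",  "7",  "1"]
]

# start!: fresh counters for the new key
out_degree = { :passengers => 0, :flights => 0 }
in_degree  = { :passengers => 0, :flights => 0 }

# accumulate: bump the right counters per record
records.each do |_airport, _month, in_or_out, passengers, flights|
  bucket = (in_or_out == "IN") ? in_degree : out_degree
  bucket[:passengers] += passengers.to_i
  bucket[:flights]    += flights.to_i
end

# finalize: combine and emit
passengers_total = in_degree[:passengers] + out_degree[:passengers]
flights_total    = in_degree[:flights]    + out_degree[:flights]
```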

## Get An Answer

We know how to put the data on the hdfs and run the script by now so we'll skip that part. Here's what the output looks like:

```
$: hdp-catd /data/domestic/flights/degree_distribution | head -n20 | wu-lign
1B1  200906  1  1    2    1  1   2
ABE  200705  0  83   83   0  3   3
ABE  199206  0  31   31   0  1   1
ABE  200708  0  904  904  0  20  20
ABE  200307  0  91   91   0  2   2
ABE  200703  0  36   36   0  1   1
ABE  199902  0  84   84   0  1   1
ABE  200611  0  753  753  0  18  18
ABE  199209  0  99   99   0  1   1
ABE  200702  0  54   54   0  1   1
ABE  200407  0  98   98   0  1   1
ABE  200705  0  647  647  0  15  15
ABE  200306  0  27   27   0  1   1
ABE  200703  0  473  473  0  11  11
ABE  200309  0  150  150  0  1   1
ABE  200702  0  313  313  0  8   8
ABE  200103  0  0    0    0  1   1
ABE  199807  0  105  105  0  1   1
ABE  199907  0  91   91   0  1   1
ABE  199501  0  50   50   0  1   1
```

At this point you might bring this back down to your local file system, crack open a program like R, make some plots, etc. And we're done for now. Hurray.

## Wednesday, January 19, 2011

### Apache Pig 0.8 with Cloudera cdh3

So it's January and Cloudera hasn't released pig 0.8 as a debian package yet. Too bad. Turns out for the particular project I'm working on it's important to have a custom partitioner, only available in pig 0.8. Also, I'd like to make use of the HbaseStorage load and store funcs, likewise only available in 0.8. Anyhow, here's how I got it working with my current install of Hadoop (cdh3):

## Get Pig

Go to the Pig releases page here and download the apache release for pig-0.8.

## Install Pig

Skip this part if you don't care (i.e. you're going to put it wherever you want and don't give a flip what my opinion is on where it should go). It's usually a good idea to put things you download and install yourself in /usr/local/share/ so they don't conflict with /usr/lib/ when you apt-get install them. So go ahead and unpack the downloaded archive into that directory.

As an example (for those of us just getting familiar):

```
$: wget http://apache.mesi.com.ar//pig/pig-0.8.0/pig-0.8.0.tar.gz
$: tar -zxvf pig-0.8.0.tar.gz
$: sudo mv pig-0.8.0 /usr/local/share/
$: sudo ln -s /usr/local/share/pig-0.8.0 /usr/local/share/pig
```

## Perform Pig Surgery

As it stands, your new pig install will not work with cloudera hadoop. Let's fix that.

1. Nuke the current pig jar and rebuild without hadoop:

```
$: sudo rm pig-0.8.0-core.jar
$: sudo ant jar-withouthadoop
```

2. Add these lines to bin/pig (I don't think it matters where; I put mine before PIG_CLASSPATH is set):

```
# Add installed version of Hadoop to classpath
HADOOP_HOME=${HADOOP_HOME:-/usr/lib/hadoop}
. $HADOOP_HOME/bin/hadoop-config.sh
for jar in $HADOOP_HOME/hadoop-core-*.jar $HADOOP_HOME/lib/* ; do
  CLASSPATH=$CLASSPATH:$jar
done
if [ ! -z "$HADOOP_CLASSPATH" ] ; then
  CLASSPATH=$CLASSPATH:$HADOOP_CLASSPATH
fi
if [ ! -z "$HADOOP_CONF_DIR" ] ; then
  CLASSPATH=$CLASSPATH:$HADOOP_CONF_DIR
fi
```

3. Nuke the build dir and rename pig-withouthadoop.jar:

```
$: sudo mv pig-withouthadoop.jar pig-0.8.0-core.jar
$: sudo rm -r build
```

4. Test it out:

```
$: bin/pig
2011-01-19 13:49:07,766 [main] INFO  org.apache.pig.Main - Logging error messages to: /usr/local/share/pig-0.8.0/pig_1295466547762.log
2011-01-19 13:49:07,959 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:8020
2011-01-19 13:49:08,163 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:8021
grunt>
```

You can try typing things like 'ls' in the grunt shell to make sure it sees your HDFS. Hurray.

## Monday, January 17, 2011

### Processing XML Records with Hadoop and Wukong

Another common pattern that Wukong addresses exceedingly well is liberating data from unwieldy formats (XML) into tsv. For example, let's consider the following Hacker News dataset: See RedMonk Analytics

A single record looks like this:

```xml
<row><ID>33</ID><ParentID>31</ParentID><Text>&lt;font color="#5a5a5a"&gt;winnar winnar chicken dinnar!&lt;/font&gt;</Text><Username>spez</Username><Points>0</Points><Type>2</Type><Timestamp>2006-10-10T21:11:18.093</Timestamp><CommentCount>0</CommentCount></row>
```

And here's a wukong example script that turns that into tsv:

```ruby
#!/usr/bin/env ruby
require 'rubygems'
require 'wukong'
require 'wukong/encoding'
require 'crack'

class HackernewsComment < Struct.new(:username, :url, :title, :text, :timestamp, :comment_id, :points, :comment_count, :type)
  def self.parse raw
    raw_hash = Crack::XML.parse(raw.strip)
    return unless raw_hash
    return unless raw_hash["row"]
    raw_hash                 = raw_hash["row"]
    raw_hash[:username]      = raw_hash["Username"].wukong_encode if raw_hash["Username"]
    raw_hash[:url]           = raw_hash["Url"].wukong_encode      if raw_hash["Url"]
    raw_hash[:title]         = raw_hash["Title"].wukong_encode    if raw_hash["Title"]
    raw_hash[:text]          = raw_hash["Text"].wukong_encode     if raw_hash["Text"]
    raw_hash[:feed_id]       = raw_hash["ID"].to_i                if raw_hash["ID"]
    raw_hash[:points]        = raw_hash["Points"].to_i            if raw_hash["Points"]
    raw_hash[:comment_count] = raw_hash["CommentCount"].to_i      if raw_hash["CommentCount"]
    raw_hash[:type]          = raw_hash["Type"].to_i              if raw_hash["Type"]
    # Eg. Map '2010-10-26T19:29:59.717' to easier to work with '20101027002959'
    raw_hash[:timestamp]     = Time.parse_and_flatten(raw_hash["Timestamp"]) if raw_hash["Timestamp"]
    self.from_hash(raw_hash, true)
  end
end

class XMLParser < Wukong::Streamer::LineStreamer
  def process line
    return unless line =~ /^\<row/
    comment = HackernewsComment.parse(line)
    yield comment if comment # guard against rows that fail to parse
  end
end

Wukong::Script.new(XMLParser, nil).run
```

Here's how it works. We're going to use the "StreamXmlRecordReader" for hadoop streaming. What this does is give the map task one row per map. That's our line variable. Additionally, we've defined a data model to read the row into called "HackernewsComment". This guy is responsible for parsing the xml record and creating a new instance of itself.

Inside the HackernewsComment's parse method we create clean fields that we'd like to use. Wukong has a method for strings called 'wukong_encode' which simply xml encodes the text so weird characters aren't an issue. You can imagine modifying the raw fields in other ways to construct and fill the fields of your output data model.
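I haven't pasted Wukong's implementation here, but 'wukong_encode' behaves roughly like a plain XML escape; in stdlib terms it's approximately this (an approximation for intuition, not Wukong's actual code):

```ruby
require 'cgi'

# Approximate what wukong_encode does to a raw Text field: escape the
# XML-significant characters so they can't break downstream parsing.
raw  = '<font color="#5a5a5a">winnar winnar chicken dinnar!</font>'
safe = CGI.escapeHTML(raw)
# The angle brackets and quotes come out as &lt;, &gt;, and &quot;.
```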

Finally, a new instance of HackernewsComment is created using the clean fields and emitted. Notice that we don't have to do anything special to the new comment once it's created. That's because Wukong will do the "right thing" and serialize out the class name as a flat field (hackernews_comment) along with the fields, in order, as a tsv record.
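The serialization described above amounts to something like this sketch. It mimics Wukong's behavior rather than reproducing the library's actual code, and the field list is trimmed down for the example:

```ruby
# Sketch of how a struct-backed model lands in the output: the
# snake-cased class name first, then the fields in order, tab-separated.
HackernewsComment = Struct.new(:username, :text, :points)

def to_flat(model)
  type = model.class.name.gsub(/([a-z])([A-Z])/, '\1_\2').downcase
  ([type] + model.to_a).join("\t")
end

comment = HackernewsComment.new("spez", "winnar winnar chicken dinnar!", 0)
line    = to_flat(comment)
# => "hackernews_comment\tspez\twinnar winnar chicken dinnar!\t0"
```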

Save this into a file called "process_xml.rb" and run with the following:

```
$: ./process_xml.rb --split_on_xml_tag=row --run /tmp/hn-sample.xml /tmp/xml_out
I, [2011-01-17T11:09:17.461643 #5519]  INFO -- : Launching hadoop!
I, [2011-01-17T11:09:17.461757 #5519]  INFO -- : Running

/usr/local/share/hadoop/bin/hadoop \
  jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar \
  -D mapred.reduce.tasks=0 \
  -D mapred.job.name='process_xml.rb---/tmp/hn-sample.xml---/tmp/xml_out' \
  -inputreader 'StreamXmlRecordReader,begin=<row>,end=</row>' \
  -mapper '/usr/bin/ruby1.8 process_xml.rb --map ' \
  -reducer '' \
  -input '/tmp/hn-sample.xml' \
  -output '/tmp/xml_out' \
  -file '/home/jacob/Programming/projects/data_recipes/examples/process_xml.rb' \
  -cmdenv 'RUBYLIB=$HOME/.rubylib'
11/01/17 11:09:18 INFO mapred.FileInputFormat: Total input paths to process : 1
11/01/17 11:09:19 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]
11/01/17 11:09:19 INFO streaming.StreamJob: Running job: job_201012031305_0243
11/01/17 11:09:19 INFO streaming.StreamJob: To kill this job, run:
11/01/17 11:09:19 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=master:54311 -kill job_201012031305_0243
11/01/17 11:09:19 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0243
11/01/17 11:09:20 INFO streaming.StreamJob:  map 0%  reduce 0%
11/01/17 11:09:34 INFO streaming.StreamJob:  map 100%  reduce 0%
11/01/17 11:09:40 INFO streaming.StreamJob:  map 87%  reduce 0%
11/01/17 11:09:43 INFO streaming.StreamJob:  map 87%  reduce 100%
11/01/17 11:09:43 INFO streaming.StreamJob: Job complete: job_201012031305_0243
11/01/17 11:09:43 INFO streaming.StreamJob: Output: /tmp/xml_out
packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/process_xml.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar902611811523431467/] [] /tmp/streamjob681918437315823836.jar tmpDir=null
```

Finally, let's take a look at our new, happily liberated, tsv records:

## Load Data

Now, 77M is small enough that you COULD process on a single machine with methods you already know. However, this example is about hadoop so let's go ahead and throw it on the hadoop distributed file system (HDFS) so we can process it in parallel:
```
hdp-mkdir /data/domestic/ufo
hdp-put chimps_16154-2010-10-20_14-33-35/ufo_awesome.json /data/domestic/ufo/
```

(I'm going to have to assume you already have an HDFS up and running, see this for a simple how-to.)
If all goes well you should see your file there:
```
hdp-ls /data/domestic/ufo/
Found 1 items
-rw-r--r--   1 jacob supergroup   80346460 2011-01-16 21:21 /data/domestic/ufo/ufo_awesome.json
```

## Process Data

Let's write a really simple wukong script to find the most popular ufo shapes:

```ruby
#!/usr/bin/env ruby
require 'rubygems'
require 'wukong'
require 'json'

class JSONMapper < Wukong::Streamer::LineStreamer
  def process record
    sighting = JSON.parse(record) rescue {}
    return unless sighting["shape"]
    yield sighting["shape"] unless sighting["shape"].empty?
  end
end

class ShapeReducer < Wukong::Streamer::AccumulatingReducer

  def start! shape
    @count = 0
  end

  def accumulate shape
    @count += 1
  end

  def finalize
    yield [key, @count]
  end

end

Wukong::Script.new(JSONMapper, ShapeReducer).run
```

### Mapper

Our mapper class, JSONMapper, is nearly identical to the one in the earlier post. All it does is read single json records from $stdin, parse out the "shape" field (with a rescue for handling data nastiness), and emit the "shape" field back to $stdout.
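The parse-with-rescue pattern at the heart of the mapper can be seen in isolation below (the sample records are made up; only the extraction logic mirrors the script above):

```ruby
require 'json'

# Mirrors the mapper's guard logic: parse a json line, fall back to an
# empty hash on malformed input, and only return non-empty shapes.
# Sample inputs below are invented for illustration.
def extract_shape(line)
  sighting = JSON.parse(line) rescue {}
  shape = sighting["shape"]
  return nil if shape.nil? || shape.empty?
  shape
end

p extract_shape('{"shape":"light","city":"Austin"}')  # "light"
p extract_shape('{{ not even json')                   # nil
p extract_shape('{"shape":""}')                       # nil
```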

### Reducer

The reducer, ShapeReducer, is about the simplest reducer in Wukong that still illustrates the major points:

* start! - the first method that is called on a new group of data. A group is, if you remember your map-reduce, all records with the same key. In this case it's all the "shape" fields with the same shape. All this method does is decide how to initialize any internal state a reducer has. In this case we simply initialize a counter to 0.

* accumulate - even simpler. This method operates on each record in the group. In our simple case we just increment the internal counter by 1.

* finalize - the final step in the reduce. We've processed all our records and so we yield the key corresponding to our group (that's just going to be the unique "shape") and the count so far.
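To make the lifecycle concrete, here's a pure-ruby simulation of what happens to a key-sorted stream of shapes: start! at each new key, accumulate per record, finalize at the group boundary. This mimics the AccumulatingReducer contract, not Wukong's internals:

```ruby
# Simulates the start!/accumulate/finalize lifecycle that the
# AccumulatingReducer runs over key-sorted input. Not Wukong's actual
# implementation -- just an illustration of the contract.
def reduce_groups(sorted_shapes)
  results = []
  count   = nil
  current = nil
  sorted_shapes.each do |shape|
    if shape != current                                 # group boundary
      results << [current, count] unless current.nil?   # finalize old group
      current = shape
      count   = 0                                       # start!
    end
    count += 1                                          # accumulate
  end
  results << [current, count] unless current.nil?       # finalize last group
  results
end

p reduce_groups(%w[circle circle disk light light light])
# [["circle", 2], ["disk", 1], ["light", 3]]
```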

And that's it. Let's save it into a file called "process_ufo.rb" and run it locally on 10000 lines:

```
$: cat chimps_16154-2010-10-20_14-33-35/ufo_awesome.json | head -n10000 | ./process_ufo.rb --map | sort | ./process_ufo.rb --reduce | sort -nk2 | wu-lign
changed     1
dome        1
flare       1
hexagon     1
pyramid     1
crescent    2
round       2
delta       8
cross       25
cone        41
teardrop    79
rectangle   112
egg         113
chevron     128
diamond     137
flash       138
cylinder    155
changing    204
cigar       255
formation   290
oval        333
unknown     491
sphere      529
circle      667
other       721
disk        727
fireball    799
triangle    868
light       1760
```

Notice when we run this locally we have to stick the "sort" program in there. This is to simulate what hadoop gives us for free. It looks like light is going to come out ahead. Let's see what happens when we run it with hadoop:

```
$: ./process_ufo.rb --run /data/domestic/ufo/ufo_awesome.json /data/domestic/ufo/shape_counts
I, [2011-01-16T21:51:43.431534 #11447]  INFO -- : Launching hadoop!
I, [2011-01-16T21:51:43.476626 #11447]  INFO -- : Running

/usr/local/share/hadoop/bin/hadoop \
  jar /usr/local/share/hadoop/contrib/streaming/hadoop-*streaming*.jar \
  -D mapred.job.name='process_ufo.rb---/data/domestic/ufo/ufo_awesome.json---/data/domestic/ufo/shape_counts' \
  -mapper  '/usr/bin/ruby1.8 process_ufo.rb --map ' \
  -reducer '/usr/bin/ruby1.8 /home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb --reduce ' \
  -input   '/data/domestic/ufo/ufo_awesome.json' \
  -output  '/data/domestic/ufo/shape_counts' \
  -file    '/home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb' \
  -cmdenv  'RUBYLIB=$HOME/.rubylib'
11/01/16 21:51:45 INFO mapred.FileInputFormat: Total input paths to process : 1
11/01/16 21:51:46 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-jacob/mapred/local]
11/01/16 21:51:46 INFO streaming.StreamJob: Running job: job_201012031305_0221
11/01/16 21:51:46 INFO streaming.StreamJob: To kill this job, run:
11/01/16 21:51:46 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=master:54311 -kill job_201012031305_0221
11/01/16 21:51:46 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201012031305_0221
11/01/16 21:51:47 INFO streaming.StreamJob:  map 0%  reduce 0%
11/01/16 21:51:59 INFO streaming.StreamJob:  map 100%  reduce 0%
11/01/16 21:52:12 INFO streaming.StreamJob:  map 100%  reduce 100%
11/01/16 21:52:15 INFO streaming.StreamJob: Job complete: job_201012031305_0221
11/01/16 21:52:15 INFO streaming.StreamJob: Output: /data/domestic/ufo/shape_counts
packageJobJar: [/home/jacob/Programming/projects/data_recipes/examples/process_ufo.rb, /usr/local/hadoop-datastore/hadoop-jacob/hadoop-unjar3466191386581838257/] [] /tmp/streamjob3112551829711880856.jar tmpDir=null
```

As one final step let's cat the output data and take a look at it:

```
$: hdp-catd /data/domestic/ufo/shape_counts | sort -nk2 | wu-lign
changed     1
dome        1
flare       1
hexagon     1
pyramid     1
crescent    2
round       2
delta       8
cross       177
cone        265
teardrop    592
egg         661
chevron     757
diamond     909
rectangle   957
cylinder    980
flash       988
changing    1532
cigar       1774
formation   1774
oval        2859
fireball    3436
sphere      3613
unknown     4458
other       4570
disk        4794
circle      5249
triangle    6036
light       12138
```

Pretty simple?

## Saturday, January 15, 2011

### Swineherd

Swineherd is a useful tool for combining multiple pig scripts, wukong scripts, and even R scripts into a workflow managed by rake.
Here though I'd like to save the actual workflow part for a later post and just illustrate its uniform interface to these scripts by showing how to launch a wukong script:

```ruby
#!/usr/bin/env ruby
require 'rake'
require 'swineherd'

task :wukong_job do
  script = WukongScript.new('/path/to/wukong_script')
  script.options = {:some_option => "123", :another_option => "foobar"}
  script.input  << '/path/to/input'
  script.output << '/path/to/output'
  script.run
end
```

You can save this into a file called "Rakefile" and run it by saying:

```
rake wukong_job
```

### Wukong's Hadoop Convenience Utilities

Wukong comes with a number of convenience command-line utilities for working with the hdfs as well as a few commands for basic hadoop streaming. All of them can be found in wukong's bin directory. Enumerating a few:

* hdp-put
* hdp-ls
* hdp-mkdir
* hdp-rm
* hdp-catd
* hdp-stream
* hdp-stream-flat
* and more...

### HDFS utilities

These are just wrappers around the hadoop fs utility to cut down on the amount of typing:

| Hadoop fs utility | Wukong convenience command |
| ----------------- | -------------------------- |
| hadoop fs -put    | hdp-put                    |
| hadoop fs -ls     | hdp-ls                     |
| hadoop fs -mkdir  | hdp-mkdir                  |
| hadoop fs -rm     | hdp-rm                     |

### hdp-catd

hdp-catd will take an arbitrary hdfs directory and cat its contents. It ignores those files that start with a "_" character. This means we can cat a whole directory of those awful part-xxxxx files.

### hdp-stream

hdp-stream allows you to run a generic streaming task without all the typing. You almost always only need to specify the input, the output, the number of partition key fields, the number of sort key fields, the number of reducers, and what scripts to use as the mapper and reducer. Here's an example of running a uniq:

```
hdp-stream /path/to/input /path/to/output /bin/cat /usr/bin/uniq 2 3 -Dmapred.reduce.tasks=10
```

This will launch a streaming job using 2 fields for the partition, 3 fields as sort keys, and 10 reduce tasks. See http://hadoop.apache.org/common/docs/r0.20.0/mapred-default.html for other options you can pass in with the "-D" flag.
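Under the hood, a wrapper like hdp-stream just assembles the long hadoop streaming invocation from those positional arguments. The sketch below shows the idea; the key-field flag names are standard hadoop streaming options, but the code itself is an illustration, not wukong's actual script:

```ruby
# Sketch of how an hdp-stream-style wrapper might expand its positional
# arguments into a hadoop streaming command line. Illustration only --
# the real wukong script differs in its details.
def streaming_command(input, output, mapper, reducer,
                      partition_fields, sort_fields, extra_args = [])
  ["hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar",
   "-D stream.num.map.output.key.fields=#{sort_fields}",
   "-D num.key.fields.for.partition=#{partition_fields}",
   "-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner",
   *extra_args,
   "-mapper  '#{mapper}'",
   "-reducer '#{reducer}'",
   "-input   '#{input}'",
   "-output  '#{output}'"].join(" \\\n  ")
end

puts streaming_command('/path/to/input', '/path/to/output',
                       '/bin/cat', '/usr/bin/uniq', 2, 3,
                       ['-D mapred.reduce.tasks=10'])
```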
### hdp-stream-flat

There's one other extremely useful case: when you don't need to specify anything about partitioners, either because you aren't running a reduce at all or because you don't care how your data is sent to individual reducers. In that case hdp-stream-flat is very useful. Here's how to cut out the first two fields of a large input file:

```
hdp-stream-flat /path/to/input /path/to/output "/usr/bin/cut -f1,2" "/bin/cat" -Dmapred.reduce.tasks=0
```

See wukong/bin for more useful command line utilities.

### Wukong, Bringing Ruby to Hadoop

Wukong is hands down the simplest (and probably the most fun) tool to use with hadoop. It especially excels at the following use case: you've got a huge amount of data (let that be whatever size you think is huge) and you want to perform a simple operation on each record. For example, parsing out fields with a regular expression, adding two fields together, stuffing those records into a data store, etc. These are called map only jobs; they do NOT require a reduce. Can you imagine writing a java map reduce program just to add two fields together?

Wukong gives you all the power of ruby backed by all the power (and parallelism) of hadoop streaming. Before we get into examples, and there will be plenty, let's make sure you've got wukong installed and running locally.

### Installing Wukong

First and foremost you've got to have ruby installed and running on your machine. Most of the time you already have it. Try checking the version in a terminal:

```
$: ruby --version
ruby 1.8.7 (2010-01-10 patchlevel 249) [x86_64-linux]
```

If that fails then I bet google can help you get ruby installed on whatever os you happen to be using.

Next, make sure you've got rubygems installed:
```
$: gem --version
1.3.7
```

Once again, google can help you get it installed if you don't have it. Wukong is a rubygem, so we can just install it that way:

```
sudo gem install wukong
sudo gem install json
sudo gem install configliere
```

Notice we also installed a couple of other libraries to help us out (the json gem and the configliere gem). If at any time you get weird errors (LoadError: no such file to load -- somelibraryname) then you probably just need to gem install somelibraryname.

### An example

Moving on. You should be ready to test out running wukong locally now. Here's the most minimal working wukong script I can come up with that illustrates a map only wukong script:

```ruby
#!/usr/bin/env ruby
require 'rubygems'
require 'wukong'

class LineMapper < Wukong::Streamer::LineStreamer
  def process line
    yield line
  end
end

Wukong::Script.new(LineMapper, nil).run
```

Save that into a file called wukong_test.rb and run it with the following:

```
cat wukong_test.rb | ./wukong_test.rb --map
```

If everything works as expected then you should see exactly the contents of your script dump onto your terminal. Let's examine what's actually going on here.

### Boiler plate ruby

First, we're letting the interpreter know we want to use ruby with the first line (somewhat obvious). Next, we're including the libraries we need.

### The guts

Then we define a ruby class for doing our map job called LineMapper. This guy subclasses from the wukong LineStreamer class. All the LineStreamer class does is read records from stdin and hand them as arguments to LineMapper's process method. The process method then does nothing more than yield the line back to the LineStreamer, which emits it to stdout.

### The runner

Finally, we have to let wukong know we intend to run our script. We create a new script object with LineMapper as the mapper class and nil as the reducer class. More succinctly, we've written our own cat program.
When we ran the above command we simply streamed our script, line by line, through the program. Try streaming some real data through the program and adding some more stuff to the process method. Perhaps parsing the line with a regular expression and yielding numbers? Yielding words? Yielding characters? The choice is yours. Have fun with it. Meatier examples to come.

### Real Deal Concrete Hadoop Examples

If you think you have to be a java programmer to use Hadoop then you've been lied to. Hadoop is not hard. What makes learning hadoop (or, more correctly, map reduce) tedious is the lack of concrete and useful examples. Word counts are f*ing boring. The next few posts will overview two of the most useful higher level abstractions on top of hadoop (Pig and Wukong) with copious examples.

## Thursday, January 13, 2011

### Convert TSV to JSON command line

So you've got some tsv data:

```
$: head foo.tsv
148	0.05	49.0530378784848
380	0.8	85.0345986160553
496	0.05	33.665653373865
612	0.15	58.1366745330187
728	0.1	60.8615655373785
844	0.3	69.4102235910563
960	0.2	74.4530218791248
1076	0.2	76.6129354825807
1192	2.25	99.0050397984081
1888	0.5	53.7328506859725
```

and you've got some field names (field_1,field_2,field_3). Try this:

```
$: export FIELDS=field_1,field_2,field_3
$: cat foo.tsv | ruby -rjson -ne 'puts ENV["FIELDS"].split(",").zip($_.strip.split("\t")).inject({}){|h,x| h[x[0]]=x[1];h}.to_json'
```

will give you something that looks like:

```
{"field_1":"148","field_2":"0.05","field_3":"49.0530378784848"}
{"field_1":"380","field_2":"0.8","field_3":"85.0345986160553"}
{"field_1":"496","field_2":"0.05","field_3":"33.665653373865"}
{"field_1":"612","field_2":"0.15","field_3":"58.1366745330187"}
{"field_1":"728","field_2":"0.1","field_3":"60.8615655373785"}
{"field_1":"844","field_2":"0.3","field_3":"69.4102235910563"}
{"field_1":"960","field_2":"0.2","field_3":"74.4530218791248"}
{"field_1":"1076","field_2":"0.2","field_3":"76.6129354825807"}
{"field_1":"1192","field_2":"2.25","field_3":"99.0050397984081"}
{"field_1":"1888","field_2":"0.5","field_3":"53.7328506859725"}
```

Hurray.

### Plot a FIFO in R

Recently discovered a really simple way to plot a fifo in rstats. Here's a simple example of plotting the output of your ifstat program. From one terminal do:

```
mkfifo ifstat_fifo
ifstat -n > ifstat_fifo
```

Then, in another terminal, open an R shell and do the following:

```r
# Plot the most recent 100 seconds of inbound network traffic.
x <- data.frame()  # x must exist before the first rbind
while(T){
  d <- read.table(fifo("ifstat_fifo", open="read"))
  x <- rbind(x, d)
  x <- tail(x, 100)
  plot(x$V1, type='l')
  Sys.sleep(1)
}
```

You may have to run it a couple times while the fifo fills with data. And here's what that looks like:

### JAVA_HOME on mac os X

As opposed to searching for and keeping in mind the JAVA_HOME path on a mac, there's a simple trick to remember: the /usr/libexec/java_home program prints the correct path for you, so just capture its output:

```
export JAVA_HOME=$(/usr/libexec/java_home)
```