Tuesday, April 26, 2011

A Lucene Text Tokenization UDF for Apache Pig

As much as I'm loath to admit it, sometimes Java is called for. One of those times is tokenizing raw text. You'll notice in the post about tf-idf how I used a Wukong script, written in Ruby, to accomplish the task of tokenizing a large text corpus with Hadoop and Pig. There are a couple of problems with this:

1. Ruby is slow at this.

2. All the gem dependencies (wukong itself, extlib, etc) must exist on all the machines in the cluster and be available in the RUBYLIB (yet another environment variable to manage).

There is a better way.

A Pig UDF



Pig UDFs (User Defined Functions) come in a variety of flavors. The simplest type is the EvalFunc, whose 'exec()' method plays essentially the same role as the Wukong 'process()' method or the Java Hadoop Mapper's 'map()' function. Here we're going to write an EvalFunc that takes a raw text string as input and outputs a Pig DataBag. Each Tuple in the DataBag will be a single token. Here's what it looks like as a whole:



import java.io.IOException;
import java.io.StringReader;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.util.Version;

public class TokenizeText extends EvalFunc<DataBag> {

    private static final TupleFactory tupleFactory = TupleFactory.getInstance();
    private static final BagFactory bagFactory = BagFactory.getInstance();
    // No field name is needed; we're only tokenizing, not indexing
    private static final String NOFIELD = "";
    private static final StandardAnalyzer analyzer = new StandardAnalyzer(Version.LUCENE_31);

    @Override
    public DataBag exec(Tuple input) throws IOException {
        // Nothing to tokenize: pass the null through
        if (input == null || input.size() < 1 || input.isNull(0))
            return null;

        // Output bag
        DataBag bagOfTokens = bagFactory.newDefaultBag();

        StringReader textInput = new StringReader(input.get(0).toString());
        TokenStream stream = analyzer.tokenStream(NOFIELD, textInput);
        // addAttribute() returns the existing attribute, or registers it if missing
        CharTermAttribute termAttribute = stream.addAttribute(CharTermAttribute.class);

        // Follow the TokenStream consumer workflow: reset, iterate, end, close
        stream.reset();
        while (stream.incrementToken()) {
            // toString() copies the term, so each Tuple owns its own String
            bagOfTokens.add(tupleFactory.newTuple(termAttribute.toString()));
        }
        stream.end();
        stream.close();
        return bagOfTokens;
    }
}


There's absolutely nothing special going on here. Remember, the 'exec' function gets called on every Pig Tuple of input. bagOfTokens will be the Pig DataBag returned. First, the Lucene library tokenizes the input string. Then all the tokens in the resulting stream are turned into Pig Tuples and added to the result DataBag. Finally, the resulting DataBag is returned. A document is truly a bag of words.
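If you want to poke at the shape of that output without a Pig or Lucene install handy, here's a rough stand-in using only the Java standard library. To be clear, this is a sketch and not the UDF: the class name BagOfTokensSketch is made up, it only approximates the analyzer by lowercasing and splitting on anything that isn't a letter or digit, it skips the stop-word filtering the real StandardAnalyzer applies, and it returns an empty list for null input where the UDF returns null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for TokenizeText.exec(): no Pig or Lucene required.
// NOTE: this only approximates the Lucene analyzer (lowercase, then split on
// runs of non-letter, non-digit characters). It removes no stop words.
class BagOfTokensSketch {

    // Returns the tokens in document order, duplicates included (a "bag").
    static List<String> tokenize(String text) {
        List<String> bag = new ArrayList<String>();
        if (text == null) {
            return bag; // the real UDF returns null here instead
        }
        String lowered = text.toLowerCase(Locale.ROOT);
        for (String piece : lowered.split("[^\\p{L}\\p{Nd}]+")) {
            // split() can yield a leading empty string; skip it
            if (!piece.isEmpty()) {
                bag.add(piece);
            }
        }
        return bag;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Pig eats anything; pig flies."));
    }
}
```

Note that "pig" shows up twice in the result. That's the point of a bag: duplicates are kept, which is exactly what the counting in tf-idf relies on.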

Example Pig Script



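And here's an example script to use that UDF (you'll need to REGISTER the jar containing TokenizeText, along with the Lucene jar, before these lines will run):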


documents = LOAD 'documents' AS (doc_id:chararray, text:chararray);
tokenized = FOREACH documents GENERATE doc_id AS doc_id, FLATTEN(TokenizeText(text)) AS (token:chararray);
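The FLATTEN in that second line is what turns one (doc_id, bag) row into one (doc_id, token) row per token. In plain Java terms it behaves roughly like the loop below. This is only an illustration of the semantics, with a made-up class and method name; it isn't how Pig implements it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java picture of what FLATTEN does to a (doc_id, bag) row.
// Class and method names are hypothetical, chosen for illustration.
class FlattenSketch {

    // One input row (docId plus its bag of tokens) becomes one
    // {docId, token} output row per token in the bag.
    static List<String[]> flatten(String docId, List<String> bagOfTokens) {
        List<String[]> rows = new ArrayList<String[]>();
        for (String token : bagOfTokens) {
            rows.add(new String[] { docId, token });
        }
        return rows;
    }

    public static void main(String[] args) {
        for (String[] row : flatten("doc1", Arrays.asList("pig", "eats", "pig"))) {
            System.out.println(row[0] + "\t" + row[1]);
        }
    }
}
```

One consequence worth knowing: a document whose bag is empty produces no rows at all, so it silently drops out of the tokenized relation.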



And that's it. It's blazing fast text tokenization for Apache Pig.

Hurray.

Saturday, April 23, 2011

TF-IDF With Apache Pig

Well, it's been a long time. I'm sure you understand. One of the things holding me back here has been the lack of a simple data set (of a sane downloadable size) for the example I wanted to do next. That is, tf-idf (term frequency-inverse document frequency).

Anyhow, let's get started. We're going to use Apache Pig to calculate the tf-idf weights for document-term pairs as a means of vectorizing raw text documents.
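For reference, here's the weight the Pig script further down ends up computing, in symbols. The notation is just shorthand picked for this note (the script doesn't define it): n_{t,d} is the number of times token t appears in document d, N is the total number of documents, and the logarithm is the natural log, since that's what Pig's LOG gives you.

$$
\mathrm{tfidf}(t, d) \;=\; \frac{n_{t,d}}{\sum_{t'} n_{t',d}} \cdot \ln\!\left(\frac{N}{\left|\{\, d' : n_{t,d'} > 0 \,\}\right|}\right)
$$

The first factor is the term frequency (count divided by document size) and the second is the inverse document frequency. A token that appears in every document gets ln(1) = 0, so it contributes nothing.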

One of the cool things about tf-idf is how damn simple it is. Take a look at the Wikipedia page. If it doesn't make sense to you after reading that then please, stop wasting your time, and start reading a different blog. Like always, I'm going to assume you've got a Hadoop cluster (at least a pseudo-distributed mode cluster) lying around, Apache Pig (>= 0.8) installed, and the Wukong rubygem.

Get Data


What would an example be without some data? Here's a link to the canonical 20 newsgroups data set that I've restructured slightly for your analysis pleasure.

Tokenization


Tokenization of the raw text documents is probably the worst part of the whole process. Now, I know there are a number of excellent tokenizers out there, but I've provided one in Wukong for the hell of it that you can find here. We're going to use that script next in the Pig script. If a Java UDF is more to your liking, feel free to write one and substitute in the relevant bits.

Pig, Step By Step



The first thing we need to do is let Pig know that we're going to be using an external script for the tokenization. Here's what that looks like:

DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');

This statement can be interpreted to mean:

"Define a new function called 'tokenize_docs' that is to be executed exactly the way it appears between the backticks, and lives on the local disk at 'tokenize_documents.rb' (relative path). Send this script file to every machine in the cluster."

Next up is to load and tokenize the data:


raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);
tokenized = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);


So, all tokenize_docs is doing is creating a clean set of (doc_id, token) pairs.

Then, we need to count the number of times each unique (doc_id, token) pair appears:


doc_tokens = GROUP tokenized BY (doc_id, token);
doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;


And, since we're after the token frequencies and not just the counts, we need to attach the document sizes:


doc_usage_bag = GROUP doc_token_counts BY doc_id;
doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE
    group AS doc_id,
    FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages),
    SUM(doc_token_counts.num_doc_tok_usages) AS doc_size
;


Then the term frequencies are just:


term_freqs = FOREACH doc_usage_bag_fg GENERATE
    doc_id AS doc_id,
    token AS token,
    ((double)num_doc_tok_usages / (double)doc_size) AS term_freq
;



For the 'document' part of tf-idf we need to find the number of documents that contain at least one occurrence of a term:


term_usage_bag = GROUP term_freqs BY token;
token_usages = FOREACH term_usage_bag GENERATE
    FLATTEN(term_freqs) AS (doc_id, token, term_freq),
    COUNT(term_freqs) AS num_docs_with_token
;



Finally, we can compute the tf-idf weight:


tfidf_all = FOREACH token_usages {
    idf = LOG((double)$NDOCS/(double)num_docs_with_token);
    tf_idf = (double)term_freq*idf;
    GENERATE
        doc_id AS doc_id,
        token AS token,
        tf_idf AS tf_idf
    ;
};


Where NDOCS is the total number of documents in the corpus.
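If you'd like to sanity check the arithmetic on something small before pointing it at a cluster, the same steps can be sketched in plain Java. Treat this as a toy under stated assumptions, not a replacement for the Pig script: the class name TfIdfSketch and the two-document corpus are invented for the example, everything lives in memory, and NDOCS is simply taken to be the number of documents passed in.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Toy, in-memory version of the Pig steps above. Names are hypothetical.
class TfIdfSketch {

    // docs: doc_id -> tokens. Returns doc_id -> (token -> tf-idf weight).
    static Map<String, Map<String, Double>> tfidf(Map<String, List<String>> docs) {
        int ndocs = docs.size(); // plays the role of $NDOCS

        // num_docs_with_token: count each token once per document
        Map<String, Integer> docsWithToken = new HashMap<String, Integer>();
        for (List<String> tokens : docs.values()) {
            for (String token : new HashSet<String>(tokens)) {
                docsWithToken.merge(token, 1, Integer::sum);
            }
        }

        Map<String, Map<String, Double>> weights = new HashMap<String, Map<String, Double>>();
        for (Map.Entry<String, List<String>> doc : docs.entrySet()) {
            List<String> tokens = doc.getValue();

            // num_doc_tok_usages: occurrences of each token in this document
            Map<String, Integer> counts = new HashMap<String, Integer>();
            for (String token : tokens) {
                counts.merge(token, 1, Integer::sum);
            }

            Map<String, Double> docWeights = new HashMap<String, Double>();
            for (Map.Entry<String, Integer> c : counts.entrySet()) {
                // term_freq = count / doc_size, where doc_size is the token total
                double termFreq = (double) c.getValue() / tokens.size();
                // idf uses the natural log, matching Pig's LOG
                double idf = Math.log((double) ndocs / docsWithToken.get(c.getKey()));
                docWeights.put(c.getKey(), termFreq * idf);
            }
            weights.put(doc.getKey(), docWeights);
        }
        return weights;
    }

    public static void main(String[] args) {
        Map<String, List<String>> docs = new HashMap<String, List<String>>();
        docs.put("d1", Arrays.asList("pig", "pig", "hadoop"));
        docs.put("d2", Arrays.asList("hadoop", "lucene"));
        System.out.println(tfidf(docs));
    }
}
```

With those two documents, "pig" in d1 gets (2/3) * ln(2), which is about 0.462, while "hadoop" gets exactly 0 in both documents because it appears everywhere.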

Pig, All Together Now



And here's the final script, all put together:


DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');

raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);
tokenized = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);

doc_tokens = GROUP tokenized BY (doc_id, token);
doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;

doc_usage_bag = GROUP doc_token_counts BY doc_id;
doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE
    group AS doc_id,
    FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages),
    SUM(doc_token_counts.num_doc_tok_usages) AS doc_size
;

term_freqs = FOREACH doc_usage_bag_fg GENERATE
    doc_id AS doc_id,
    token AS token,
    ((double)num_doc_tok_usages / (double)doc_size) AS term_freq
;

term_usage_bag = GROUP term_freqs BY token;
token_usages = FOREACH term_usage_bag GENERATE
    FLATTEN(term_freqs) AS (doc_id, token, term_freq),
    COUNT(term_freqs) AS num_docs_with_token
;

tfidf_all = FOREACH token_usages {
    idf = LOG((double)$NDOCS/(double)num_docs_with_token);
    tf_idf = (double)term_freq*idf;
    GENERATE
        doc_id AS doc_id,
        token AS token,
        tf_idf AS tf_idf
    ;
};

STORE tfidf_all INTO '$OUT';



Run It


Assuming your data is on HDFS at "/data/corpus/20_newsgroups/data", and you've named the Pig script "tfidf.pig", then you can run it with the following:


pig -p DOCS=/data/corpus/20_newsgroups/data -p NDOCS=18828 -p OUT=/data/corpus/20_newsgroups/tfidf tfidf.pig


Here's what the output of that looks like:


sci.crypt.15405 student 0.0187808247467920464
sci.crypt.16005 student 0.0541184292045718135
sci.med.58809 student 0.0839387881540297476
sci.med.59021 student 0.0027903667703849783
sci.space.60850 student 0.0377339506380500803
sci.crypt.15178 student 0.0010815147566519744
soc.religion.christian.21414 student 0.0587571517078208302
comp.sys.ibm.pc.hardware.60725 student 0.0685500103257909721
soc.religion.christian.21485 student 0.0232372916358613464
soc.religion.christian.21556 student 0.0790961657605280533


Hurray.

Next time let's look at how to take that output and recover the topics using clustering!