## Wednesday, March 21, 2012

### Topic Discovery With Apache Pig and Mallet

A common desire when working with natural language is topic discovery. That is, given a set of documents (e.g. tweets, blog posts, emails) you would like to discover the topics inherent in those documents. Often this method is used to summarize a large corpus of text so you can quickly understand what that text is 'about'. You can go further and use topic discovery as a way to classify new documents or to group and organize the documents you ran topic discovery on.

## Latent Dirichlet Allocation

One popular method for topic discovery in a corpus is Latent Dirichlet Allocation (LDA). I won't pretend to be an expert on LDA, but the main assumption is as follows. Each document is assumed to be a 'mixture' of topics. Going further, each topic is then assumed to be a distribution over terms. For example, say there is a topic in my corpus labeled 'apache hadoop'. It could be represented as a multinomial probability distribution with high probability of generating terms such as 'hadoop', 'data', 'apache', and 'map-reduce'. See the Wikipedia article on LDA for more.
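To make that assumption a bit more concrete, here's a minimal sketch of the generative story in plain Java. The topics, terms, and probabilities are all made up for illustration, and this is only the model LDA assumes, not the inference Mallet actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Toy illustration of LDA's generative assumption (not an inference algorithm).
// All topics, terms, and probabilities below are invented for the example.
final class LdaGenerativeSketch {

    // Draw an index from a discrete distribution whose entries sum to 1
    static int sample(double[] probs, Random rng) {
        double u = rng.nextDouble();
        double cumulative = 0.0;
        for (int i = 0; i < probs.length; i++) {
            cumulative += probs[i];
            if (u < cumulative) return i;
        }
        return probs.length - 1; // guard against floating point round-off
    }

    // Generate a document: for each word, pick a topic from the document's
    // mixture, then pick a term from that topic's distribution over terms.
    static List<String> generate(double[] topicMixture, String[][] terms,
                                 double[][] termProbs, int length, Random rng) {
        List<String> doc = new ArrayList<>();
        for (int n = 0; n < length; n++) {
            int topic = sample(topicMixture, rng);
            int term = sample(termProbs[topic], rng);
            doc.add(terms[topic][term]);
        }
        return doc;
    }

    public static void main(String[] args) {
        String[][] terms = {
            {"hadoop", "data", "apache", "map-reduce"}, // the 'apache hadoop' topic
            {"game", "team", "hockey", "season"}        // a sports topic
        };
        double[][] termProbs = {
            {0.4, 0.3, 0.2, 0.1},
            {0.4, 0.3, 0.2, 0.1}
        };
        double[] mixture = {0.8, 0.2}; // this document is mostly about hadoop
        System.out.println(generate(mixture, terms, termProbs, 10, new Random(42)));
    }
}
```

Topic discovery runs this story backwards: given only the generated words, it tries to recover plausible mixtures and term distributions.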

## Problem

I'm going to use Apache Pig and Mallet, a Java-based machine learning and natural language processing library, to discover topics in the 20 newsgroups data set. This corpus is nice since each document already belongs to a newsgroup (a topic), which gives us a way of checking how well our topic discovery is doing.

### The Data

The 20 newsgroups data set can be found on Infochimps here. Once you've got the data, go ahead and place it somewhere on your HDFS. I put mine in my home directory under '20newsgroups/data'.

Here's what a head of that data looks like:

alt.atheism.53536 From: kmr4@po.CWRU.edu (Keith M. Ryan) Subject: Re: Smullyanism for the day..... In article ... *snip*
alt.atheism.51164 From: mccullou@snake2.cs.wisc.edu (Mark McCullough) Subject: Re: Idle questions for fellow atheists In article ... *snip*
alt.atheism.53448 From: sandvik@newton.apple.com (Kent Sandvik) Subject: Age of Reason Was: ... *snip*
alt.atheism.53753 Subject: Re: The Inimitable Rushdie From: kmagnacca@eagle.wesleyan.edu In article ... *snip*
alt.atheism.53290 From: Andrew Newell Subject: Re: Christian Morality is In article ... *snip*

It's a little messy but it should get the point across.

So it's tab-separated: the first field is the document id (a concatenation of the newsgroup the document is coming from and an integer id) and the second field is the document text itself. Here's the Pig schema for that:

(doc_id:chararray, text:chararray)
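If you want to poke at a record outside of Pig, here's a small sketch of pulling one line apart in plain Java. The class and method names are mine, and I'm assuming the newsgroup is everything before the last dot of the document id, which is what the sample lines above suggest.

```java
// Minimal sketch: split one tab-separated record into (doc_id, text) and
// recover the newsgroup from the doc_id. Class and method names are illustrative.
final class NewsgroupRecord {
    final String docId;
    final String text;

    NewsgroupRecord(String docId, String text) {
        this.docId = docId;
        this.text = text;
    }

    // Parse "doc_id<TAB>text"; only the first tab separates the fields
    static NewsgroupRecord parse(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) throw new IllegalArgumentException("expected a tab-separated record");
        return new NewsgroupRecord(line.substring(0, tab), line.substring(tab + 1));
    }

    // "alt.atheism.53536" -> "alt.atheism" (everything before the last dot)
    String newsgroup() {
        int lastDot = docId.lastIndexOf('.');
        return lastDot < 0 ? docId : docId.substring(0, lastDot);
    }
}
```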

### Algorithm

LDA operates on a set of documents. Trivially, we could skip the Pig part, write a simple Java program that operates on the entire document set, and be done with it. But that's not the point. Typically, your input documents have metadata attached to them: for example, the region or user they're coming from, or even just the date they were generated. So we'll use Pig's GROUP BY statement to group the documents by this metadata and cluster the documents within each group independently. If the documents don't have this kind of metadata, we're stuck doing a GROUP ALL and dealing with all the documents at once. There are clever ways of parallelizing LDA in this case that I'm not going to go into. See here and here.

Here's a sketch of the algorithm:

• (1) Load the documents

• (2) Group the documents by appropriate metadata (or by all)

• (3) Run LDA on each group of documents

• (4) Profit!!!
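The grouping steps above can be sketched in plain Java, just to show the shape of the computation. In the real pipeline Pig's GROUP BY does the grouping and the udf does the modeling; `metadataKey` and `perGroup` here are placeholder names I made up, and the per-group function can be anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Sketch of "group, then model each group independently" in plain Java.
// In the real pipeline Pig's GROUP BY does the grouping and a UDF plays the
// role of perGroup; both parameter names here are placeholders.
final class GroupThenModel {

    static <D, K extends Comparable<K>, R> Map<K, R> run(
            List<D> docs, Function<D, K> metadataKey, Function<List<D>, R> perGroup) {
        // Group the documents by the chosen piece of metadata
        Map<K, List<D>> groups = new TreeMap<>();
        for (D doc : docs) {
            groups.computeIfAbsent(metadataKey.apply(doc), k -> new ArrayList<>()).add(doc);
        }
        // Run the (stand-in) topic model on each group on its own
        Map<K, R> results = new TreeMap<>();
        for (Map.Entry<K, List<D>> e : groups.entrySet()) {
            results.put(e.getKey(), perGroup.apply(e.getValue()));
        }
        return results;
    }
}
```

A GROUP ALL is the degenerate case where `metadataKey` returns the same key for every document, so a single call to `perGroup` sees everything.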

### Implementation

So it's clear that we're going to need a Java udf to do the actual topic clustering. Right? This udf will operate on a DataBag of documents and return a DataBag containing the discovered topics. Each topic will be represented by a Tuple with the following schema:

(topic:tuple(topic_num:int, terms:bag{t:tuple(term:chararray, weight:double)}))

Each topic has a DataBag of top terms associated with it as a way of characterizing the topic discovered.

Here's the simplest implementation of such a udf I could come up with using Mallet (it's a mouthful):

package varaha.topic;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.TreeSet;
import java.util.regex.Pattern;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.BagFactory;
import org.apache.pig.backend.executionengine.ExecException;

import cc.mallet.pipe.Pipe;
import cc.mallet.pipe.CharSequenceLowercase;
import cc.mallet.pipe.CharSequence2TokenSequence;
import cc.mallet.pipe.CharSequence2CharNGrams;
import cc.mallet.pipe.TokenSequenceNGrams;
import cc.mallet.pipe.TokenSequenceRemoveStopwords;
import cc.mallet.pipe.TokenSequence2FeatureSequence;
import cc.mallet.pipe.TokenSequence2FeatureSequenceWithBigrams;

import cc.mallet.types.TokenSequence;
import cc.mallet.types.Token;

import cc.mallet.pipe.SerialPipes;
import cc.mallet.types.InstanceList;
import cc.mallet.types.Instance;
import cc.mallet.types.Alphabet;
import cc.mallet.types.IDSorter;
import cc.mallet.types.LabelSequence;
import cc.mallet.topics.TopicAssignment;
import cc.mallet.topics.ParallelTopicModel;

public class LDATopics extends EvalFunc<DataBag> {

private Pipe pipe;
private static Long numKeywords = 50L; // Maximum number of keywords to use to describe a topic

public LDATopics() {
pipe = buildPipe();
}

public DataBag exec(Tuple input) throws IOException {
if (input == null || input.size() < 2 || input.isNull(0) || input.isNull(1))
return null;

Integer numTopics = (Integer)input.get(0); // Number of topics to discover
DataBag documents = (DataBag)input.get(1); // Documents, {(doc_id, text)}
DataBag result = BagFactory.getInstance().newDefaultBag();

InstanceList instances = new InstanceList(pipe);

// Add the input databag as source data and run it through the pipe built
// by the constructor.
instances.addThruPipe(new DataBagSourceIterator(documents));

// Create a model with numTopics topics, alphaSum = 1.0 (so alpha_t = 1.0 / numTopics)
// and beta_w = 0.01.
// Note that the first parameter is passed as the sum over topics, while
// the second is the parameter for a single dimension of the Dirichlet prior.
ParallelTopicModel model = new ParallelTopicModel(numTopics, 1.0, 0.01);
model.addInstances(instances); // hand the prepared documents to the model
model.setNumThreads(1); // Important, since this is being run in the reduce, just use one thread
model.setTopicDisplay(0,0);
model.setNumIterations(2000);
model.estimate();

// Get the results
Alphabet dataAlphabet = instances.getDataAlphabet();
ArrayList<TopicAssignment> assignments = model.getData();

// Convert the results into comprehensible topics
for (int topicNum = 0; topicNum < model.getNumTopics(); topicNum++) {
TreeSet<IDSorter> sortedWords = model.getSortedWords().get(topicNum);
Iterator<IDSorter> iterator = sortedWords.iterator();

DataBag topic = BagFactory.getInstance().newDefaultBag();

// Get the set of keywords with weights for this topic and add them as tuples
// to the databag used to represent this topic
while (iterator.hasNext() && topic.size() < numKeywords) {
IDSorter info = iterator.next();
Tuple weightedWord = TupleFactory.getInstance().newTuple(2);
String wordToken = model.alphabet.lookupObject(info.getID()).toString(); // get the actual term text
weightedWord.set(0, wordToken);
weightedWord.set(1, info.getWeight()); // the raw weight of the term
topic.add(weightedWord); // keep the keyword as part of this topic
}
Tuple topicTuple = TupleFactory.getInstance().newTuple(2);
topicTuple.set(0, topicNum);
topicTuple.set(1, topic);
result.add(topicTuple); // add the finished topic to the output bag
}

return result;
}

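// Instantiates a new pipe object for ingesting pig tuples
private Pipe buildPipe() {
Pattern tokenPattern = Pattern.compile("\\S[\\S]+\\S");
int[] sizes = {1,2};
ArrayList<Pipe> pipeList = new ArrayList<Pipe>();

pipeList.add(new CharSequence2TokenSequence(tokenPattern)); // split the raw text into tokens
pipeList.add(new TokenSequenceRemoveStopwords(false, false)); // we should use a real stop word list
pipeList.add(new TokenSequenceNGramsDelim(sizes, " ")); // unigrams and bigrams, space delimited
pipeList.add(new TokenSequence2FeatureSequence()); // map tokens to feature ids for the topic model
return new SerialPipes(pipeList);
}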

/**
A few minor updates to TokenSequenceNGrams:

(1) use delimiter that's passed in to delineate ngrams
*/
private class TokenSequenceNGramsDelim extends TokenSequenceNGrams {
int [] gramSizes = null;
String delim = null;

public TokenSequenceNGramsDelim(int [] sizes, String delim) {
super(sizes);
this.gramSizes = sizes;
this.delim = delim;
}

@Override
public Instance pipe (Instance carrier) {
String newTerm = null;
TokenSequence tmpTS = new TokenSequence();
TokenSequence ts = (TokenSequence) carrier.getData();

for (int i = 0; i < ts.size(); i++) {
Token t = ts.get(i);
for(int j = 0; j < gramSizes.length; j++) {
int len = gramSizes[j];
if (len <= 0 || len > (i+1)) continue;
if (len == 1) { tmpTS.add(t); continue; }
newTerm = new String(t.getText());
for(int k = 1; k < len; k++) {
newTerm = ts.get(i-k).getText() + delim + newTerm;
}
tmpTS.add(new Token(newTerm)); // keep the assembled n-gram
}
}
carrier.setData(tmpTS);
return carrier;
}
}

/**
Allow for a databag to be source data for mallet's clustering
*/
private class DataBagSourceIterator implements Iterator<Instance> {

private Iterator<Tuple> tupleItr;
private String currentId;
private String currentText;

public DataBagSourceIterator(DataBag bag) {
tupleItr = bag.iterator();
}

public boolean hasNext() {
// Advance to the next tuple with a non-empty id and text, skipping bad ones.
// Note that this moves the underlying iterator, so call it once per next().
while (tupleItr.hasNext()) {
Tuple t = tupleItr.next();
try {
if (!t.isNull(0) && !t.isNull(1)) {
currentId = t.get(0).toString();
currentText = t.get(1).toString();
if (!currentId.isEmpty() && !currentText.isEmpty()) {
return true;
}
}
} catch (ExecException e) {
throw new RuntimeException(e);
}
}
return false;
}

public Instance next() {
// Build an instance from the fields captured by the last call to hasNext()
Instance i = new Instance(currentText, "X", currentId, null);
return i;
}

public void remove() {
tupleItr.remove();
}
}
}

There are a few key things going on here. First, the udf operates on a bag that contains tuples with exactly two fields, doc_id and text. Mallet has the notion of pipes, where your input data flows through a set of 'pipes' as a way of preparing the data. The class DataBagSourceIterator is simply a convenient way of plugging a DataBag object into this flow.

One of the pipes our documents flow through turns the tokenized text into n-grams. TokenSequenceNGramsDelim does this work. Mallet has a built-in TokenSequenceNGrams that works nicely. Unfortunately, when building n-grams where n > 1, it uses an '_' by default to separate the terms in the n-gram. TokenSequenceNGramsDelim allows us to use our own delimiter, namely a ' ', instead.
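Here's a stand-alone sketch of that n-gram expansion using plain strings instead of Mallet's TokenSequence, in case the loop in the udf is hard to follow. It's meant to mirror the logic of TokenSequenceNGramsDelim, not replace it, and the class name is just something I picked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch of the n-gram expansion, using plain strings instead of
// Mallet's TokenSequence. It mirrors the loop in TokenSequenceNGramsDelim.
final class NGramSketch {

    static List<String> ngrams(List<String> tokens, int[] sizes, String delim) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < tokens.size(); i++) {
            for (int len : sizes) {
                // Skip sizes that would reach back past the start of the text
                if (len <= 0 || len > i + 1) continue;
                // Join the len tokens ending at position i with the delimiter
                out.add(String.join(delim, tokens.subList(i - len + 1, i + 1)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> tokens = Arrays.asList("semi", "auto", "revolver");
        System.out.println(ngrams(tokens, new int[] {1, 2}, " "));
        // prints [semi, auto, semi auto, revolver, auto revolver]
    }
}
```

With sizes {1,2} and a ' ' delimiter, every token shows up on its own and again glued to the token before it, which is why terms like 'semi auto' appear next to 'semi' and 'auto' in the output further down.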

Ultimately, all this udf does is read the input documents, prepare them for clustering, run Mallet's built-in LDA methods, and construct the output DataBag in the way we'd like it.

See varaha for the code itself plus a pom.xml to compile it.

### Pig

Now that we have our udf, let's write a pig script to use it. Since the documents I've chosen to use don't have any obvious (or at least easy to get at) additional metadata we're going to use a GROUP ALL. Here's the pig script:

define TokenizeText varaha.text.TokenizeText();
define LDATopics varaha.topic.LDATopics();
define RangeConcat org.pygmalion.udf.RangeBasedStringConcat('0', ' ');

--
--
raw_documents = load '$DOCS' as (doc_id:chararray, text:chararray);

--
-- Tokenize text to remove stopwords
--
tokenized = foreach raw_documents generate doc_id AS doc_id, flatten(TokenizeText(text)) as (token:chararray);

--
-- Concat the text for a given doc with spaces
--
documents = foreach (group tokenized by doc_id) generate group as doc_id, RangeConcat(tokenized.token) as text;

--
-- Ensure all our documents are sane
--
for_lda = filter documents by SIZE(doc_id) > 0 and SIZE(text) > 0;

--
-- Group the docs by all and find topics
--
-- WARNING: This is, in general, not appropriate in a production environment.
-- Instead it is best to group by some piece of metadata which partitions
-- the documents into smaller groups.
--
topics = foreach (group for_lda all) generate
  FLATTEN(LDATopics(20, for_lda)) as (
    topic_num:int,
    keywords:bag {t:tuple(keyword:chararray, weight:double)}
  );

store topics into '$OUT';

There are a few things worth pointing out here. First, we load our text as normal. There's a step there to tokenize text which seems like it might be spurious. It uses the lucene tokenization udf from here as a way to remove stopwords. You could skip this step if that isn't important for you. Next, the tokenized text is grouped by document id and concatenated back together to form cleaned documents. I'm using the nice udf from pygmalion to do the concatenation. Finally, the documents are grouped together and topics are discovered.
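As a rough plain-Java analogue of the tokenize, drop stopwords, and re-join steps, consider the sketch below. The tiny stopword list and the whitespace tokenizer are assumptions of mine; the lucene-based udf uses its own stopword list and a smarter tokenizer, so don't expect identical output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Rough plain-Java analogue of the tokenize -> drop stopwords -> re-join steps.
// The tiny stopword list is made up; the Lucene-based UDF uses its own list
// and a smarter tokenizer.
final class CleanDocumentSketch {

    static final Set<String> STOPWORDS =
        new HashSet<>(Arrays.asList("the", "a", "of", "is", "in", "for"));

    static String clean(String text) {
        List<String> kept = new ArrayList<>();
        for (String token : text.toLowerCase().split("\\s+")) {
            if (!token.isEmpty() && !STOPWORDS.contains(token)) {
                kept.add(token);
            }
        }
        return String.join(" ", kept); // same role as the concat udf with ' '
    }
}
```

A document made up entirely of stopwords cleans down to an empty string, which is the kind of thing the SIZE(text) > 0 filter in the script is there to catch.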

## Run it!

At this point we're ready to run our script. I named this script 'discover_topics_example.pig'. And here's how I ran it:

pig -p DOCS=20newsgroups/data -p OUT=20newsgroups/topics discover_topics_example.pig

And here's what the output looks like:

0 {(max,4523.0),(max max,3266.0),(g9v,1161.0),(b8f,1109.0),(a86,913.0),(g9v g9v,834.0),(145,740.0),(1d9,656.0),(a86 a86,643.0),(b8f b8f,599.0),(34u,512.0),(145 145,449.0),(75u,446.0),(bhj,445.0),(giz,430.0),(2di,414.0),(1d9 1d9,322.0),(2tm,300.0),(7ey,292.0),(2di 2di,247.0),(bxn,240.0),(6ei,215.0),(6um,189.0),(34u 34u,168.0),(75u 75u,162.0),(bhj bhj,150.0),(giz giz,142.0),(air,129.0),(qax,127.0),(b4q,120.0),(okz,116.0),(6um 6um,112.0),(nrhj,112.0),(b8e,109.0),(7kn,104.0),(1eq,102.0),(bxn bxn,99.0),(c8v,99.0),(rlk,99.0),(fyn,97.0),(2tm 2tm,96.0),(b9r,96.0),(3dy,96.0),(7ez,95.0),(1d9l,93.0),(b8g,92.0),(biz,91.0),(7ex,86.0),(7ey 7ey,84.0),(r8f,82.0)}
2 {(game,1010.0),(subject,831.0),(team,799.0),(hockey,725.0),(play,544.0),(games,466.0),(nhl,465.0),(writes,453.0),(season,390.0),(article,382.0),(win,372.0),(players,372.0),(period,303.0),(player,298.0),(goal,286.0),(teams,274.0),(cup,251.0),(league,248.0),(playoff,247.0),(pit,244.0),(detroit,236.0),(det,233.0),(espn,232.0),(leafs,228.0),(pittsburgh,228.0),(wings,223.0),(playoffs,220.0),(fans,214.0),(boston,207.0),(series,202.0),(toronto,200.0),(bos,198.0),(pens,198.0),(played,192.0),(blues,191.0),(chi,185.0),(montreal,184.0),(puck,181.0),(goals,178.0),(buffalo,176.0),(bruins,175.0),(devils,172.0),(maynard,171.0),(penguins,170.0),(april,169.0),(division,169.0),(power,168.0),(ice,165.0),(tor,161.0),(flyers,160.0)}
3 {(subject,1528.0),(drive,1442.0),(scsi,1181.0),(card,1178.0),(disk,682.0),(windows,644.0),(system,603.0),(ide,571.0),(bus,542.0),(dos,512.0),(hard,490.0),(modem,481.0),(software,460.0),(mac,450.0),(writes,450.0),(drives,447.0),(controller,420.0),(drivers,420.0),(article,394.0),(video,393.0),(bit,382.0),(memory,373.0),(board,365.0),(computer,356.0),(cards,349.0),(port,345.0),(ram,326.0),(driver,317.0),(disks,317.0),(data,313.0),(sale,311.0),(floppy,301.0),(i'm,294.0),(motherboard,289.0),(speed,286.0),(isa,277.0),(mode,252.0),(chip,250.0),(machine,240.0),(486,237.0),(bios,234.0),(ibm,226.0),(serial,221.0),(gateway,215.0),(run,214.0),(hardware,213.0),(set,211.0),(cache,209.0),(diamond,207.0),(mhz,200.0)}
4 {(gun,1095.0),(writes,942.0),(article,838.0),(subject,756.0),(fbi,647.0),(government,545.0),(fire,510.0),(guns,505.0),(batf,464.0),(waco,456.0),(koresh,437.0),(children,400.0),(atf,371.0),(weapons,345.0),(compound,288.0),(people,277.0),(cdt,251.0),(police,248.0),(clinton,248.0),(firearms,240.0),(control,234.0),(gas,233.0),(crime,228.0),(law,219.0),(federal,212.0),(killed,201.0),(david,193.0),(constitution,187.0),(survivors,180.0),(house,162.0),(hallam,158.0),(assault,155.0),(ranch,154.0),(agents,153.0),(criminals,150.0),(deaths,150.0),(arms,150.0),(davidians,145.0),(warrant,143.0),(started,143.0),(amendment,141.0),(roby,138.0),(burns,138.0),(tanks,137.0),(texas,136.0),(veal,131.0),(armed,131.0),(murder,130.0),(cult,129.0),(rights,129.0)}
5 {(andy,110.0),(kratz,91.0),(semi,84.0),(water,77.0),(uicvm.uic.edu,74.0),(revolver,73.0),(gun,72.0),(auto,70.0),(safety,64.0),(freeman,62.0),(jason,59.0),(ndet_loop.c,54.0),(u28037,50.0),(weapon,50.0),(gang,50.0),(cops,49.0),(ole.cdac.com,49.0),(02p,48.0),(expose,48.0),(mydisplay,43.0),(glock,42.0),(mahan,41.0),(sail.stanford.edu andy,39.0),(sail.stanford.edu,39.0),(andy sail.stanford.edu,37.0),(section,37.0),(firearm,36.0),(mwra,36.0),(ssave,35.0),(phil,35.0),(military,35.0),(trigger,34.0),(cement,33.0),(auto semi,31.0),(shooting,31.0),(jason kratz,30.0),(garrett,29.0),(silence,29.0),(autos,28.0),(dominance,28.0),(semi auto,27.0),(water dept,27.0),(concealed,26.0),(revolvers,25.0),(u28037 uicvm.uic.edu,25.0),(moment silence,25.0),(ordnance,25.0),(atlantic,25.0),(ingres.com,25.0),(item,25.0)}
6 {(god,2686.0),(jesus,1457.0),(bible,909.0),(christian,823.0),(christ,809.0),(church,781.0),(christians,739.0),(subject,719.0),(sin,589.0),(lord,504.0),(god's,494.0),(faith,478.0),(people,464.0),(sandvik,425.0),(life,411.0),(christianity,405.0),(writes,405.0),(paul,367.0),(love,358.0),(law,357.0),(hell,344.0),(heaven,323.0),(truth,313.0),(word,311.0),(athos.rutgers.edu,295.0),(article,295.0),(john,283.0),(catholic,280.0),(scripture,278.0),(father,257.0),(holy,254.0),(spirit,253.0),(son,248.0),(kent,234.0),(true,233.0),(eternal,231.0),(doctrine,231.0),(death,226.0),(brian,223.0),(day,214.0),(apr,211.0),(newton.apple.com,210.0),(jehovah,204.0),(children,204.0),(biblical,203.0),(words,200.0),(book,198.0),(earth,191.0),(jews,190.0),(matthew,190.0)}
12 {(windows,1782.0),(subject,1779.0),(file,1209.0),(program,863.0),(files,838.0),(window,809.0),(dos,715.0),(image,676.0),(writes,586.0),(graphics,541.0),(article,497.0),(software,441.0),(run,425.0),(display,424.0),(version,422.0),(server,404.0),(code,386.0),(data,371.0),(ftp,369.0),(images,366.0),(unix,364.0),(color,349.0),(bit,347.0),(application,339.0),(i'm,324.0),(manager,323.0),(motif,321.0),(directory,313.0),(format,312.0),(user,302.0),(mail,302.0),(system,296.0),(gif,292.0),(running,288.0),(microsoft,287.0),(screen,284.0),(package,271.0),(faq,264.0),(jpeg,259.0),(i've,251.0),(line,250.0),(memory,247.0),(programs,244.0),(3.1,236.0),(applications,226.0),(set,218.0),(support,201.0),(information,199.0),(source,198.0),(text,195.0)}
16 {(god,963.0),(writes,892.0),(subject,674.0),(article,625.0),(atheists,525.0),(morality,509.0),(religion,497.0),(moral,449.0),(evidence,432.0),(keith,415.0),(science,380.0),(atheism,365.0),(objective,365.0),(belief,319.0),(christian,317.0),(livesey,312.0),(islam,305.0),(argument,288.0),(atheist,287.0),(exist,287.0),(faith,277.0),(existence,274.0),(religious,272.0),(islamic,258.0),(frank,247.0),(mathew,242.0),(jon,235.0),(beliefs,198.0),(reason,191.0),(claim,188.0),(true,187.0),(exists,179.0),(statement,173.0),(christianity,169.0),(bible,169.0),(wrong,166.0),(values,162.0),(theism,160.0),(system,158.0),(universe,157.0),(truth,155.0),(solntze.wpd.sgi.com,152.0),(cobb,149.0),(jaeger,148.0),(scientific,145.0),(rushdie,142.0),(muslim,142.0),(question,140.0),(agree,139.0),(wrote,133.0)}
19 {(subject,866.0),(writes,741.0),(article,596.0),(game,578.0),(baseball,516.0),(team,462.0),(players,402.0),(games,385.0),(hit,286.0),(runs,264.0),(season,241.0),(morris,227.0),(win,223.0),(league,223.0),(braves,214.0),(michael,201.0),(ball,192.0),(pitching,181.0),(he's,179.0),(player,176.0),(pitcher,171.0),(hitter,170.0),(play,162.0),(georgia,160.0),(average,158.0),(run,155.0),(roger,150.0),(jays,149.0),(hitting,147.0),(sox,146.0),(david,146.0),(cubs,137.0),(home,135.0),(stats,134.0),(fans,133.0),(mike,131.0),(giants,128.0),(fan,127.0),(base,126.0),(batting,125.0),(pitch,125.0),(ai.uga.edu,124.0),(bonds,123.0),(covington,122.0),(jewish,121.0),(time,121.0),(career,120.0),(teams,119.0),(mcovingt,119.0),(smith,117.0)}

### Labeling

Now, we'd like to see how well we did. Here are the 20 topics we _know_ should exist:

• alt.atheism

• comp.graphics

• comp.os.ms-windows.misc

• comp.sys.ibm.pc.hardware

• comp.sys.mac.hardware

• comp.windows.x

• misc.forsale

• rec.autos

• rec.motorcycles

• rec.sport.baseball

• rec.sport.hockey

• sci.crypt

• sci.electronics

• sci.med

• sci.space

• soc.religion.christian

• talk.politics.guns

• talk.politics.mideast

• talk.politics.misc

• talk.religion.misc

There are a number of methods for labeling topics discovered in this way (see here), but in the interest of time I'm going to manually match the topics above to the ones discovered. Obviously, 'eyeballing' it isn't appropriate for a production environment...

• alt.atheism,16

• comp.graphics,12

• comp.os.ms-windows.misc,9

• comp.sys.ibm.pc.hardware,3

• comp.sys.mac.hardware,11

• comp.windows.x,0

• misc.forsale,10

• rec.autos,14

• rec.motorcycles,14

• rec.sport.baseball,19

• rec.sport.hockey,2

• sci.crypt,15

• sci.electronics

• sci.med,1

• sci.space,13

• soc.religion.christian,6

• talk.politics.guns,5

• talk.politics.mideast,18,7

• talk.politics.misc,4

• talk.religion.misc,8

So, as far as I can tell, some newsgroups map to more than one of the discovered topics and some don't seem to map to a discovered topic at all. It's clear there's room for improvement (look at the parameters alpha and beta I'm hardcoding in the topic model, for example). But all in all it's pretty good as a first pass. Now go away and find some topics.
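If you'd rather not check a hand-made mapping like the one above by eye, here's a small sketch that does the bookkeeping. It assumes the mapping is written as 'newsgroup,topic[,topic...]' lines exactly as listed, and the class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: sanity-check a hand-made "newsgroup,topic[,topic...]" mapping like
// the one above. Names are illustrative; nothing here is part of the UDF.
final class LabelMappingCheck {

    // Parse lines such as "talk.politics.mideast,18,7" or "sci.electronics"
    static Map<String, List<Integer>> parse(List<String> lines) {
        Map<String, List<Integer>> mapping = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split(",");
            List<Integer> topics = new ArrayList<>();
            for (int i = 1; i < parts.length; i++) {
                topics.add(Integer.parseInt(parts[i].trim()));
            }
            mapping.put(parts[0], topics);
        }
        return mapping;
    }

    // Newsgroups that were not matched to any discovered topic
    static List<String> unmatchedLabels(Map<String, List<Integer>> mapping) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : mapping.entrySet()) {
            if (e.getValue().isEmpty()) result.add(e.getKey());
        }
        return result;
    }

    // Discovered topics (0..numTopics-1) that no newsgroup was matched to
    static List<Integer> unusedTopics(Map<String, List<Integer>> mapping, int numTopics) {
        boolean[] used = new boolean[numTopics];
        for (List<Integer> topics : mapping.values()) {
            for (int t : topics) used[t] = true;
        }
        List<Integer> result = new ArrayList<>();
        for (int t = 0; t < numTopics; t++) {
            if (!used[t]) result.add(t);
        }
        return result;
    }
}
```

Fed the 20 lines above, `unmatchedLabels` picks out sci.electronics and `unusedTopics(mapping, 20)` picks out topic 17, the one discovered topic I didn't assign to any newsgroup.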

Hurray!