This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Enron Emails Example

Luke Lovett edited this page Feb 13, 2015 · 5 revisions

Enron E-mails

This sample can be run from the root directory with ./gradlew enronEmails. This will download the Enron email corpus and import it automatically into MongoDB. You can manually download a copy of the data set here.

Abbreviated code snippets are shown below, but you can also check out the full source.

Goal

Each document in the data set contains a single e-mail, including headers for sender and recipients. In this example we will build a list of the unique sender/recipient pairs, counting how many times each pair occurs.

MapReduce with Java

The mapper class gets the headers field from each document, parses the sender from the From field and the recipients from the To field, and constructs a MailPair object for each sender/recipient pair to act as the key. It then emits the value 1 for each key. MailPair is a simple POJO that holds Strings for the from and to values and implements WritableComparable so that it can be serialized across Hadoop nodes and sorted.

@Override
public void map(NullWritable key, BSONObject val, final Context context)
        throws IOException, InterruptedException {
    if (val.containsKey("headers")) {
        BSONObject headers = (BSONObject) val.get("headers");
        if (headers.containsKey("From") && headers.containsKey("To")) {
            String from = (String) headers.get("From");
            String to = (String) headers.get("To");
            // The To header may list several comma-separated recipients.
            String[] recips = to.split(",");
            for (int i = 0; i < recips.length; i++) {
                String recip = recips[i].trim();
                if (recip.length() > 0) {
                    // Emit one (sender, recipient) key per recipient.
                    context.write(new MailPair(from, recip), new IntWritable(1));
                }
            }
        }
    }
}
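
The MailPair class itself is not shown on this page. As a rough illustration of what such a key type involves, here is a minimal sketch that uses only java.io, so it compiles without Hadoop on the classpath. The class name MailPairSketch, the use of writeUTF/readUTF, and the roundTrip helper are assumptions made for this example; the actual class in the full source may differ. In a real job the class would declare that it implements Hadoop's WritableComparable, whose write(DataOutput) and readFields(DataInput) methods have the same signatures as the ones below.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for MailPair. A real Hadoop key would declare
// "implements WritableComparable<MailPair>" instead of plain Comparable.
class MailPairSketch implements Comparable<MailPairSketch> {
    String from;
    String to;

    // No-arg constructor so an empty instance can be created before readFields.
    MailPairSketch() { }

    MailPairSketch(String from, String to) {
        this.from = from;
        this.to = to;
    }

    // Same signature as Writable.write: serialize both fields in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(from);
        out.writeUTF(to);
    }

    // Same signature as Writable.readFields: restore the fields in that order.
    public void readFields(DataInput in) throws IOException {
        from = in.readUTF();
        to = in.readUTF();
    }

    // Order by sender first, then by recipient, so equal pairs sort together.
    @Override
    public int compareTo(MailPairSketch other) {
        int c = from.compareTo(other.from);
        return c != 0 ? c : to.compareTo(other.to);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MailPairSketch)) {
            return false;
        }
        MailPairSketch p = (MailPairSketch) o;
        return from.equals(p.from) && to.equals(p.to);
    }

    @Override
    public int hashCode() {
        return 31 * from.hashCode() + to.hashCode();
    }

    // Illustration only: pushes a pair through a byte array and reads it back.
    static MailPairSketch roundTrip(MailPairSketch in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            in.write(new DataOutputStream(bytes));
            MailPairSketch out = new MailPairSketch();
            out.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Overriding equals and hashCode alongside compareTo keeps the key consistent if it is ever used in hash-based collections or partitioning.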

The reduce class will take the collected values for each key, sum them together, and record the output.

    @Override
    public void reduce( final MailPair pKey,
                        final Iterable<IntWritable> pValues,
                        final Context pContext )
            throws IOException, InterruptedException{
        int sum = 0;
        for ( final IntWritable value : pValues ){
            sum += value.get();
        }
        BSONObject outDoc = BasicDBObjectBuilder.start().add( "f" , pKey.from ).add( "t" , pKey.to ).get();
        BSONWritable pkeyOut = new BSONWritable(outDoc);
        pContext.write( pkeyOut, new IntWritable(sum) );
    }
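
To check the map and reduce logic without a Hadoop cluster, the same steps can be sketched in plain Java. The class PairCountSketch, the " -> " key format, and the sample addresses are invented for this example and are not part of the mongo-hadoop source. The sketch splits the To header the same way the mapper does, then sums a 1 per pair the way the reducer does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PairCountSketch {
    // Each input element is a hypothetical {From, To} header pair.
    static Map<String, Integer> countPairs(List<String[]> fromToHeaders) {
        // TreeMap keeps the keys sorted, loosely mimicking the sort in the shuffle.
        Map<String, Integer> counts = new TreeMap<>();
        for (String[] header : fromToHeaders) {
            String from = header[0];
            // "Map" step: one key per non-empty, trimmed recipient.
            for (String recip : header[1].split(",")) {
                recip = recip.trim();
                if (recip.length() > 0) {
                    // "Reduce" step: add 1 to the running total for this key.
                    counts.merge(from + " -> " + recip, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> headers = Arrays.<String[]>asList(
                new String[] {"alice@example.com", "bob@example.com, carol@example.com"},
                new String[] {"alice@example.com", "bob@example.com, "});
        System.out.println(countPairs(headers));
    }
}
```

With the two sample messages above, the alice/bob pair is counted twice and the alice/carol pair once; the blank entry left by the trailing comma is skipped, just as in the mapper.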

Pig

The same result can be accomplished in Pig with much less work:

REGISTER ../mongo-2.10.1.jar;
REGISTER ../core/target/mongo-hadoop-core_cdh4.3.0-1.1.0.jar;
REGISTER ../pig/target/mongo-hadoop-pig_cdh4.3.0-1.1.0.jar;

raw = LOAD 'file:///Users/mike/dump/enron_mail/messages.bson' using com.mongodb.hadoop.pig.BSONLoader('','headers:[]') ; 
send_recip = FOREACH raw GENERATE $0#'From' as from, $0#'To' as to;
send_recip_filtered = FILTER send_recip BY to IS NOT NULL;
send_recip_split = FOREACH send_recip_filtered GENERATE from as from, FLATTEN(TOKENIZE(to)) as to;
send_recip_split_trimmed = FOREACH send_recip_split GENERATE from as from, TRIM(to) as to;
send_recip_grouped = GROUP send_recip_split_trimmed BY (from, to);
send_recip_counted = FOREACH send_recip_grouped GENERATE group, COUNT($1) as count;
STORE send_recip_counted INTO 'file:///tmp/enron_emailcounts.bson' using com.mongodb.hadoop.pig.BSONStorage;