PageRank on Flights Dataset


By Sweet Song and Daniel Alabi, MongoDB Summer Interns for 2013

This is the second of three blog posts from this summer internship project showing how to answer questions concerning big datasets stored in MongoDB using MongoDB’s frameworks and connectors.

Having done some basic analysis on the Flights dataset (mostly using the MongoDB aggregation framework), we moved on to do some more advanced analysis on this dataset. We settled on computing the PageRank of all airports in the Flights dataset. The PageRank of nodes in a network is often computed iteratively. This process can easily be parallelized and often is. We can utilize MongoDB to compute the PageRank of nodes in a network in several ways. Here are the two options we considered:

  1. We can use the MongoDB MapReduce framework, which since version 2.4 uses the V8 JavaScript engine. Furthermore, since MongoDB is known for its robust sharding capabilities, we can increase the performance of query operations by setting up a MongoDB sharded cluster for our dataset. This is essential for really large working datasets.

  2. The Hadoop open-source framework is well-known for its robust distributed data processing features. MongoDB interfaces with Hadoop via the Mongo-Hadoop connector.

For this particular dataset, we opted for (1) since the Flights dataset has only 319 airports. Even so, there were 4,601 weighted edges among USA commercial airports. The weight of an edge between any two airports was calculated using the 6,155,752 recorded trips in the flights collection.

Making a Graph of Airports

The airports dataset is fairly well connected, with only one airport receiving flights in the past year without any domestic departures. Most flights out of Puerto Rico are considered international flights; as a result, our dataset had no recorded domestic departures from that airport in that year. Such an airport would act as a black hole for any PageRank that flows into it. A more thorough explanation can be found here. Therefore, we removed that single airport in Puerto Rico from our airports graph.
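
To make the "black hole" concrete, here is a minimal sketch, in plain JavaScript rather than against the database, of how one might spot airports that receive flights but never originate any. The helper name findDanglingAirports and the airport ID 99999 are invented for illustration; only the origAirportId and destAirportId field names come from the flights collection.

```javascript
// Hypothetical helper: returns the IDs of airports that appear as a
// destination but never as an origin (dangling nodes).
function findDanglingAirports(flights) {
    var origins = {};
    var destinations = {};
    flights.forEach(function (f) {
        origins[f.origAirportId] = true;
        destinations[f.destAirportId] = true;
    });
    // Object keys are strings, so the returned IDs are strings too.
    return Object.keys(destinations).filter(function (id) {
        return !origins[id];
    });
}

// Toy data; 99999 is an invented ID for an airport with no departures.
var sampleFlights = [
    { origAirportId: 12478, destAirportId: 12892 },
    { origAirportId: 12892, destAirportId: 12478 },
    { origAirportId: 12478, destAirportId: 99999 }
];
var dangling = findDanglingAirports(sampleFlights);
```

Any ID returned by such a check would need to be dropped (or otherwise handled) before running PageRank, which is what we did with the one airport in Puerto Rico.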

From the previous analysis, we had put the Flights dataset in a flights collection in the flying database. An entry looks like this:

{
  "_id" : ObjectId("51bf..."),
  ...
  "origAirportId" : 12478,
  "origStateId" : "NY",
  ...
  "destAirportId" : 12892,
  "destStateId" : "CA",
  ...
}

For each document, we create (or modify) at least one node that keeps track of this “edge”:

{
  "_id" : "12478",
  "value" : {
    "pg" : (1 / NUM_OF_AIRPORTS_IN_DB),
    "prs" : {
      "12892" : (NUM_OF_FLIGHTS_FROM_12478_TO_12892 / NUM_OF_FLIGHTS_FROM_12478),
      ...
   } 
 }
}

where NUM_OF_AIRPORTS_IN_DB is the total number of airports in the Flights dataset which corresponds to the number of nodes in the network. NUM_OF_FLIGHTS_FROM_12478 is the total number of flights leaving from airport with airportId=12478. NUM_OF_FLIGHTS_FROM_12478_TO_12892 is the number of flights that leave the airport with airportId=12478 and arrive at the airport with airportId=12892. pg is the current PageRank of an airport; prs is a Map of <aId, pr> where pr is the probability of a flight going from the airport specified by _id to an airport identified by aId. For example, NUM_OF_FLIGHTS_FROM_12478_TO_12892/NUM_OF_FLIGHTS_FROM_12478 is the probability of transitioning from airport with airportId=12478 to airport with airportId=12892.

We wrote preformat.py to create the graph that contains information about the probability of every node in the graph transitioning to another. The resulting graph was stored in an fpg_0 collection (Flights PageRank 0) with 318 nodes.
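
The node format above can be illustrated with a small in-memory sketch. This is not preformat.py (which is a Python script working against the database); buildGraph is a hypothetical name, the ID 11111 is invented, and the sketch assumes that airports without departures have already been removed, so the number of origin airports stands in for NUM_OF_AIRPORTS_IN_DB.

```javascript
// Hypothetical stand-in for the graph-building step (not preformat.py itself).
// Input: flight documents with origAirportId and destAirportId.
// Output: one node per origin airport, in the fpg_0 document shape.
function buildGraph(flights) {
    var counts = {};   // counts[orig][dest] = number of flights on that edge
    var totals = {};   // totals[orig] = number of flights leaving orig
    flights.forEach(function (f) {
        var o = String(f.origAirportId), d = String(f.destAirportId);
        counts[o] = counts[o] || {};
        counts[o][d] = (counts[o][d] || 0) + 1;
        totals[o] = (totals[o] || 0) + 1;
    });
    var ids = Object.keys(counts);
    return ids.map(function (o) {
        var prs = {};
        Object.keys(counts[o]).forEach(function (d) {
            prs[d] = counts[o][d] / totals[o];   // transition probability
        });
        // Every node starts with the same share of PageRank.
        return { _id: o, value: { pg: 1 / ids.length, prs: prs } };
    });
}

// Toy data: three flights 12478 -> 12892, one 12478 -> 11111, and return legs.
var toyFlights = [
    { origAirportId: 12478, destAirportId: 12892 },
    { origAirportId: 12478, destAirportId: 12892 },
    { origAirportId: 12478, destAirportId: 12892 },
    { origAirportId: 12478, destAirportId: 11111 },
    { origAirportId: 12892, destAirportId: 12478 },
    { origAirportId: 11111, destAirportId: 12478 }
];
var toyGraph = buildGraph(toyFlights);
```

In this toy input, three of the four flights leaving 12478 go to 12892, so the prs entry for "12892" on that node is 3/4.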

MongoDB MapReduce

Next, we wrote some JavaScript code to calculate PageRank on the graph stored in the database. The goal was to create a new collection fpg_i for every ith iteration of PageRank. Every iteration is a call to oneiteration() in iteration.js, which consists of a map and a reduce function. The PageRank algorithm stops once the average percentage change of the PageRank values for all nodes drops below 0.1%. The map function looks like this:

var map = function() {
    // For each node that is reachable from this node, give it the 
    // appropriate portion of my pagerank
    for (var toNode in this["value"]["prs"]) {
        emit(toNode, {totalNodes : 0.0
                  , pg : this["value"]["prs"][toNode] * this["value"]["pg"]
                  , prs : {}
                  , diff : 0.0
                  , prevpg : 0.0});
    }

    // Pass the previous pagerank and the probability matrix to myself
    emit(this["_id"], {totalNodes: this["value"]["totalNodes"]
                   , pg: 0.0
                   , prs: this["value"]["prs"]
                   , diff: 0.0
                   , prevpg: this["value"]["pg"]});

    // Reduce won't be called on a key unless there's more than one value for that key
    // So this is just an extra emit to make sure that reduce is called
    emit(this["_id"], {totalNodes: 0.0
                   , pg : 0.0
                   , prs : {}
                   , diff : 0.0
                   , prevpg : 0.0});
};

The map function considers every document (corresponding to an airport) in the current fpg_i. For each airport (call this x), it emits its airport ID (stored in _id) and passes the prs and prevpg (previous pg) information, for use in the next iteration of PageRank. Then, it passes a portion of x’s PageRank to every airport that x links to.

The reduce function looks like this:

var reduce = function(airportId, values) {
    var pg = 0
    , diff = 0
    , prs = {} 
    , prevpg = 0 
    , beta = 0.9
    , totalNodes = 0;

    for (var i in values) {
        // Retrieve the previous pagerank and the probability matrix
        var prevPRS = values[i]["prs"];
        for (var key in prevPRS) {
            prs[key] = prevPRS[key];
        }
        prevpg += values[i]["prevpg"];
        // Summation of the pagerank
        pg += values[i]["pg"];
        totalNodes += values[i]["totalNodes"];
    }

    diff = Math.abs(prevpg - pg) / prevpg;
    return {"totalNodes" : totalNodes
        , "pg" : pg
        , "prs" : prs
        , "diff" : diff
        , "prevpg" : prevpg};
};

The reduce function has two duties:

  1. Collect the prs and prevpg information for each node;

  2. Accumulate the total PageRank score sent to each node.

Finally, db["fpg_"+i].mapReduce(map, reduce, {out: "fpg_"+(i+1)}); runs MapReduce on the fpg_i collection using the map and reduce functions defined above and stores the result (in the same format as fpg_i) into fpg_(i+1).
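
To show how data flows through one such pass, here is a rough in-memory emulation in plain JavaScript. It is only an illustration, not the mongo shell mapReduce call: emulateIteration and the three-node graph are made up, the totalNodes field is left out, and no damping factor is applied, since the reduce excerpt above declares beta without showing where it is used.

```javascript
// In-memory emulation of one map/reduce pass over fpg_i-style documents.
function emulateIteration(nodes) {
    var emitted = {};  // key -> list of values emitted for that key
    function emit(key, value) {
        (emitted[key] = emitted[key] || []).push(value);
    }
    // "Map": give each neighbour its share, and pass my own state to myself.
    nodes.forEach(function (node) {
        var prs = node.value.prs;
        for (var toNode in prs) {
            emit(toNode, { pg: prs[toNode] * node.value.pg, prs: {}, prevpg: 0 });
        }
        emit(node._id, { pg: 0, prs: prs, prevpg: node.value.pg });
    });
    // "Reduce": sum the incoming shares and recover prs and prevpg per key.
    return Object.keys(emitted).map(function (id) {
        var pg = 0, prevpg = 0, prs = {};
        emitted[id].forEach(function (v) {
            pg += v.pg;
            prevpg += v.prevpg;
            for (var key in v.prs) { prs[key] = v.prs[key]; }
        });
        return { _id: id, value: { pg: pg, prs: prs, prevpg: prevpg,
                                   diff: Math.abs(prevpg - pg) / prevpg } };
    });
}

// Toy graph: A links to B and C; B and C both link back to A.
var fpg0 = [
    { _id: "A", value: { pg: 1 / 3, prs: { B: 0.5, C: 0.5 } } },
    { _id: "B", value: { pg: 1 / 3, prs: { A: 1 } } },
    { _id: "C", value: { pg: 1 / 3, prs: { A: 1 } } }
];
var fpg1 = emulateIteration(fpg0);
```

After this single pass, A holds the PageRank that B and C sent it, while B and C each hold half of what A had, and the total across all nodes is unchanged.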

We keep applying the MapReduce operations until the PageRank of the nodes eventually converges. This happens when the average percentage change of pg for each node is less than a certain threshold (0.1% in our case). The execution of our implementation of the PageRank algorithm took 6.203 seconds, having converged after 20 iterations.
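
The stopping rule can be sketched in plain JavaScript as follows. This is a simplified illustration under stated assumptions, not our iteration.js: the graph is a hypothetical { id: { pg, prs } } object rather than a MongoDB collection, the function names are invented, no damping factor is applied, and a maxIterations guard is added so the loop always terminates.

```javascript
// One PageRank step on a plain { id: { pg, prs } } graph.
function step(graph) {
    var next = {};
    Object.keys(graph).forEach(function (id) {
        next[id] = { pg: 0, prs: graph[id].prs };
    });
    Object.keys(graph).forEach(function (id) {
        var prs = graph[id].prs;
        for (var to in prs) { next[to].pg += prs[to] * graph[id].pg; }
    });
    return next;
}

// Average relative change of pg between two consecutive iterations.
function averageChange(prev, next) {
    var ids = Object.keys(prev), sum = 0;
    ids.forEach(function (id) {
        sum += Math.abs(prev[id].pg - next[id].pg) / prev[id].pg;
    });
    return sum / ids.length;
}

// Iterate until the average change drops below the threshold (0.1% = 0.001).
function runUntilConverged(graph, threshold, maxIterations) {
    var iterations = 0, change = Infinity;
    while (change >= threshold && iterations < maxIterations) {
        var next = step(graph);
        change = averageChange(graph, next);
        graph = next;
        iterations += 1;
    }
    return { graph: graph, iterations: iterations, change: change };
}

// Toy graph: A -> B, C; B -> A, C; C -> A.
var toy = {
    A: { pg: 1 / 3, prs: { B: 0.5, C: 0.5 } },
    B: { pg: 1 / 3, prs: { A: 0.5, C: 0.5 } },
    C: { pg: 1 / 3, prs: { A: 1 } }
};
var result = runUntilConverged(toy, 0.001, 100);
```

On this toy graph the values settle near 4/9, 2/9, and 3/9 for A, B, and C. A graph in which PageRank merely oscillates between two groups of nodes would never meet the threshold, which is one reason the iteration cap is worth having.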

PageRank Result and Interpretation

The 10 airports with the most PageRank are:

1.{ pg: 0.06370586088275128,
    airportCode: "ATL",
    airportState: "Georgia",
    airportStateId: "GA",
    airportCity: "Atlanta, GA" },
2.{ pg: 0.04987817077679942,
    airportCode: "ORD",
    airportState: "Illinois",
    airportStateId: "IL",
    airportCity: "Chicago, IL" },
3.{ pg: 0.04484114423869301,
    airportCode: "DFW",
    airportState: "Texas",
    airportStateId: "TX",
    airportCity: "Dallas/Fort Worth, TX" },
4.{ pg: 0.0375874819401995,
    airportCode: "DEN",
    airportState: "Colorado",
    airportStateId: "CO",
    airportCity: "Denver, CO" },
5.{ pg: 0.035847669686020475,
    airportCode: "LAX",
    airportState: "California",
    airportStateId: "CA",
    airportCity: "Los Angeles, CA" },
6.{ pg: 0.029359141715724606,
    airportCode: "IAH",
    airportState: "Texas",
    airportStateId: "TX",
    airportCity: "Houston, TX" },
7.{ pg: 0.029269624393415964,
    airportCode: "PHX",
    airportState: "Arizona",
    airportStateId: "AZ",
    airportCity: "Phoenix, AZ" },
8.{ pg: 0.027586105077479,
    airportCode: "SFO",
    airportState: "California",
    airportStateId: "CA",
    airportCity: "San Francisco, CA" },
9.{ pg: 0.022826269022159618,
    airportCode: "LAS",
    airportState: "Nevada",
    airportStateId: "NV",
    airportCity: "Las Vegas, NV" },
10.{ pg: 0.022075486537264547,
    airportCode: "CLT",
    airportState: "North Carolina",
    airportStateId: "NC",
    airportCity: "Charlotte, NC" }

The outcome matches our intuition that the airports with the most flights would accumulate most of the PageRank. In general, the nodes in a weighted graph with the most PageRank will be the ones with a greater ratio of incoming weight to outgoing weight.
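
For readers who want to reproduce a ranking like the one above, here is a small sketch of sorting airports by pg and computing each airport's share of the total. The helper name topByPageRank is hypothetical, and the sample uses only three of the values listed above, so the percentages it yields are relative to that sample rather than to the full set of 318 airports.

```javascript
// Hypothetical helper: rank airports by pg and attach each one's share
// of the total PageRank (of the given input) as a percentage.
function topByPageRank(airports, n) {
    var total = airports.reduce(function (sum, a) { return sum + a.pg; }, 0);
    return airports
        .slice()                                        // leave the input order untouched
        .sort(function (a, b) { return b.pg - a.pg; })  // highest pg first
        .slice(0, n)
        .map(function (a) {
            return { airportCode: a.airportCode, pg: a.pg,
                     percent: 100 * a.pg / total };
        });
}

// Three pg values taken from the list above, deliberately out of order.
var sampleAirports = [
    { airportCode: "DFW", pg: 0.04484114423869301 },
    { airportCode: "ATL", pg: 0.06370586088275128 },
    { airportCode: "ORD", pg: 0.04987817077679942 }
];
var top2 = topByPageRank(sampleAirports, 2);
```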

Below is a map of the USA that illustrates the PageRank of all airports in the Flights dataset. Click on the image below to see the interactive map. The bigger the circle on the airport, the larger its PageRank. Hover over a circle to see the full name of an airport, its airport code, and the percentage of the total PageRank the airport accumulated.

Challenges/Lessons Learned

Insert vs. Update

Initially, we envisioned iterations.js merely updating the pg and prevpg fields of the PageRank collection instead of outputting to a new collection. However, updates were significantly slower than inserts into a new collection, even though we already had indexes on the pg and prevpg fields. We learned that, in general, updates in really large collections are significantly slower than insertions into a new collection. This preference for inserts over updates recurred in our other attempts.

Flights Dataset has no information on International Flights

Only domestic flights are present in our Flights dataset. Perhaps, if international flights were included, JFK, O'Hare, and San Francisco airports would have the most PageRank. Also, our map does not show the USA states and territories of Alaska, Hawaii, and Guam. If they were included, then the continental USA would have been too small to distinguish between individual airports.

Relatively small number of nodes in graph

Even though our initial Flights dataset contained 6,155,748 documents (corresponding to domestic flights), the resulting airports graph had only 318 documents (corresponding to airports/nodes). This is why the MongoDB MapReduce framework was very fast, converging within a few seconds and after only 20 iterations. It might take longer to converge if run on a dataset with more nodes (more airports).

The next dataset we’ll use is the Twitter Memes dataset. This dataset will have at least 1 million nodes (after pre-processing) that correspond to web pages on the Internet. Performance analysis based on the PageRank algorithm is more easily done on datasets with more nodes.