Posts tagged: MapReduce

PageRank on Flights Dataset

Sep 3 • Posted 10 months ago

By Sweet Song and Daniel Alabi, MongoDB Summer Interns for 2013

This is the second of three blog posts from this summer internship project showing how to answer questions concerning big datasets stored in MongoDB using MongoDB’s frameworks and connectors.

Having done some basic analysis on the Flights dataset (mostly using the MongoDB aggregation framework), we moved on to do some more advanced analysis on this dataset. We settled on computing the PageRank of all airports in the Flights dataset. The PageRank of nodes in a network is often computed iteratively. This process can easily be parallelized and often is. We can utilize MongoDB to compute the PageRank of nodes in a network in several ways. Here are the two options we considered:

  1. We can use the MongoDB MapReduce framework, which since version 2.4 uses the V8 JavaScript engine. Furthermore, since MongoDB is known for its robust sharding capabilities, we can increase the performance of query operations by setting up a MongoDB sharded cluster for our dataset. This is essential for really large working datasets.

  2. The open-source Hadoop framework is well known for its robust distributed data-processing features. MongoDB interfaces with Hadoop via the Mongo-Hadoop connector.

For this particular dataset, we opted for (1), since the Flights dataset has only 319 airports. Even so, there were 4,601 weighted edges among US commercial airports. The weight of an edge between any two airports was calculated from the 6,155,752 recorded trips in the flights collection.

Making a Graph of Airports

The airports graph is fairly well connected: only one airport received flights in the past year without having any domestic departures. Most flights out of Puerto Rico are considered international flights, so our dataset recorded no domestic flights leaving that airport during the year. Such a node would act as a black hole, absorbing any PageRank that flows into it. A more thorough explanation can be found here. We therefore removed that single Puerto Rico airport from our airports graph.

From the previous analysis, we had put the Flights dataset in a flights collection in the flying database. An entry looks like this:
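In trimmed-down form, an entry pairs an origin airport with a destination airport (the field names below are illustrative; the actual documents carry many more fields about each flight):

    {
        "originAirportId" : 12478,    // airport the flight departed from
        "destAirportId"   : 12892     // airport the flight arrived at
    }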

For each document, we create (or modify) at least one node that keeps track of this “edge”:
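In outline, a node looks something like the sketch below, where the uppercase names are the placeholders explained in the next paragraph:

    {
        "_id" : "12478",
        // initial PageRank: every airport starts with an equal share
        "pg" : 1 / NUM_OF_AIRPORTS_IN_DB,
        "prs" : {
            // probability of a flight from airport 12478 going to airport 12892
            "12892" : NUM_OF_FLIGHTS_FROM_12478_TO_12892 / NUM_OF_FLIGHTS_FROM_12478
        }
    }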

where NUM_OF_AIRPORTS_IN_DB is the total number of airports in the Flights dataset, which corresponds to the number of nodes in the network. NUM_OF_FLIGHTS_FROM_12478 is the total number of flights leaving the airport with airportId=12478, and NUM_OF_FLIGHTS_FROM_12478_TO_12892 is the number of flights that leave the airport with airportId=12478 and arrive at the airport with airportId=12892. pg is the current PageRank of an airport; prs is a map of <aId, pr> pairs, where pr is the probability of a flight going from the airport specified by _id to the airport identified by aId. For example, NUM_OF_FLIGHTS_FROM_12478_TO_12892/NUM_OF_FLIGHTS_FROM_12478 is the probability of transitioning from the airport with airportId=12478 to the airport with airportId=12892.

We wrote preformat.py to create the graph that contains information about the probability of every node in the graph transitioning to another. The resulting graph was stored in an fpg_0 collection (Flights PageRank 0) with 318 nodes.

MongoDB MapReduce

Next, we wrote some JavaScript code to calculate PageRank on the graph stored in the database. The goal was to create a new collection fpg_i for every ith iteration of PageRank. Every iteration is a call to oneiteration() in iteration.js and consists of a map function and a reduce function. The PageRank algorithm stops once the average percentage change of the PageRank values across all nodes drops below 0.1%. The map function looks like this:
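In outline (the exact code in iteration.js may differ in its details, but the structure follows the description below):

    var map = function () {
        // fpg_0 stores its fields at the top level, while later collections
        // (mapReduce output) nest them under "value".
        var node = this.value || this;

        // Re-emit this airport's outgoing transition probabilities and its
        // current PageRank (as prevpg) for use in the next iteration.
        emit(this._id, { pg: 0.0, prs: node.prs, prevpg: node.pg });

        // Pass a portion of this airport's PageRank to every airport it
        // links to, weighted by the transition probability of that edge.
        for (var aId in node.prs) {
            emit(aId, { pg: node.pg * node.prs[aId], prs: null, prevpg: null });
        }
    };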

The map function considers every document (corresponding to an airport) in the current fpg_i. For each airport (call this x), it emits its airport ID (stored in _id) and passes the prs and prevpg (previous pg) information, for use in the next iteration of PageRank. Then, it passes a portion of x's PageRank to every airport that x links to.

The reduce function looks like this:
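Again in outline, following the two duties listed below:

    var reduce = function (key, values) {
        var result = { pg: 0.0, prs: null, prevpg: null };
        values.forEach(function (v) {
            // Duty 1: keep the node's prs and prevpg information.
            if (v.prs !== null) {
                result.prs = v.prs;
                result.prevpg = v.prevpg;
            }
            // Duty 2: accumulate the PageRank contributions sent to this node.
            result.pg += v.pg;
        });
        return result;
    };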

The reduce function has two duties:

  1. Collect the prs and prevpg information for each node;

  2. Accumulate the total PageRank score sent to each node.

Finally, db["fpg_"+i].mapReduce(map, reduce, {out: "fpg_"+(i+1)}); runs MapReduce on the fpg_i collection using the map and reduce functions defined below and stores the result (in the same format as fpg_i) into fpg_(i+1).

We keep applying this MapReduce operation until the PageRank of the nodes converges, i.e. until the average percentage change of pg across all nodes is less than a certain threshold (0.1% in our case). Our implementation of the PageRank algorithm converged after 20 iterations and took 6.203 seconds to run.
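Putting it together, the driver loop looks roughly like the sketch below; the convergence bookkeeping shown here is one way to implement the 0.1% check, not necessarily the exact original code:

    var i = 0;
    while (true) {
        db["fpg_" + i].mapReduce(map, reduce, { out: "fpg_" + (i + 1) });
        i += 1;

        // Average relative change of pg across all nodes in the new collection.
        var totalChange = 0, count = 0;
        db["fpg_" + i].find().forEach(function (doc) {
            totalChange += Math.abs(doc.value.pg - doc.value.prevpg) / doc.value.prevpg;
            count += 1;
        });
        if (totalChange / count < 0.001) {    // below 0.1% on average: converged
            break;
        }
    }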

PageRank Result and Interpretation

The 10 airports with the most PageRank are:

The outcome matches our intuition that the airports with the most flights would accumulate most of the PageRank. In general, the nodes in a weighted graph with the most PageRank will be the ones with a greater ratio of incoming weight to outgoing weight.

Below is a map of the USA that illustrates the PageRank of all airports in the Flights dataset. Click on the image below to see the interactive map. The bigger the circle on the airport, the larger its PageRank. Hover around a circle to see the full name of an airport, its airport code, and the percentage of the total PageRank the airport accumulated.

Challenges/Lessons Learned

Insert vs. Update

Initially, we envisioned iterations.js merely updating the pg and prevpg fields of the PageRank collection in place instead of outputting to a new collection. However, updates were significantly slower than inserts into a new collection, even though we already had indexes on the pg and prevpg fields. We learned that, in general, updates in really large collections are significantly slower than insertions into a new collection. This preference for inserts over updates recurred in our other attempts.

Flights Dataset has no information on International Flights

Only domestic flights are present in our Flights dataset. Perhaps if international flights were included, the JFK, O’Hare, and San Francisco airports would have the most PageRank. Also, our map does not show the states and territories of Alaska, Hawaii, and Guam; if they were included, the continental USA would have been too small to distinguish individual airports.

Relatively small number of nodes in graph

Even though our initial Flights dataset contained 6,155,748 documents (corresponding to domestic flights), the resulting airports graph had only 318 documents (corresponding to airports/nodes). This is why the MongoDB MapReduce job was so fast, converging in a matter of seconds and in about 20 iterations. It would likely take longer to converge on a dataset with more nodes (more airports).

The next dataset we’ll use is the Twitter Memes dataset. This dataset will have at least 1 million nodes (after pre-processing) that correspond to web pages on the Internet. Performance analysis based on the PageRank algorithm is more easily done on datasets with more nodes.

MongoDB Connector for Hadoop

Aug 7 • Posted 11 months ago

by Mike O’Brien, MongoDB Kernel Tools Lead and maintainer of Mongo-Hadoop, the Hadoop Adapter for MongoDB

Hadoop is a powerful, JVM-based platform for running Map/Reduce jobs on clusters of many machines, and it excels at doing analytics and processing tasks on very large data sets.

Since MongoDB excels at storing large operational data sets for applications, it makes sense to explore using these together - MongoDB for storage and querying, and Hadoop for batch processing.

The MongoDB Connector for Hadoop

We recently released version 1.1 of the MongoDB Connector for Hadoop. The connector makes it easy to use MongoDB databases, or MongoDB backup files in .bson format, as the input source or output destination for Hadoop Map/Reduce jobs. It inspects the data and computes input splits so that Hadoop can process the data in parallel, allowing very large datasets to be processed quickly.


Hadoop Streaming Support for MongoDB

Jun 7 • Posted 2 years ago

MongoDB has some native data processing tools, such as the built-in JavaScript-oriented MapReduce framework and the new Aggregation Framework in MongoDB v2.2. That said, there will always be a need to decouple the persistence and computational layers when working with Big Data.

Enter MongoDB+Hadoop: an adapter that allows Apache’s Hadoop platform to integrate with MongoDB.

Using this adapter, it is possible to use MongoDB as a real-time datastore for your application while shifting large aggregation, batch processing, and ETL workloads to a platform better suited for the task.


Well, the engineers at 10gen have taken it one step further with the introduction of the streaming assembly for Mongo-Hadoop.

What does all that mean?

The streaming assembly lets you write MapReduce jobs in languages like Python, Ruby, and JavaScript instead of Java, making it easy for developers who are familiar with MongoDB and popular dynamic programming languages to leverage the power of Hadoop.


It works like this:

Once you have Java installed and Hadoop ready to rock, you download and build the adapter. With the adapter built, you compile the streaming assembly, load some data into Mongo, and get down to writing some MapReduce jobs.

The assembly streams data from MongoDB into Hadoop and back out again, running it through the mappers and reducers defined in a language you feel at home with. Cool right?

Ruby support was recently added and is particularly easy to get started with. Let’s take a look at an example where we analyze Twitter data.

Import some data into MongoDB from Twitter:
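A sketch of such a script (the stream URL and credentials are placeholders, and Twitter’s original streaming endpoint has since changed):

    # Pull the public Twitter status stream and pipe the JSON documents
    # straight into the "in" collection of the "twitter" database.
    curl -u "<user>:<password>" https://stream.twitter.com/1/statuses/sample.json | mongoimport -d twitter -c in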

This script curls the Twitter status stream and pipes the JSON into MongoDB using mongoimport. The mongoimport binary takes a couple of flags: -d, which specifies the database (“twitter”), and -c, which specifies the collection (“in”).

Next, write a Mapper and save it in a file called mapper.rb:
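A sketch of what mapper.rb contains, based on the description below (the require line and the exact shape of the emitted hash are assumptions):

    #!/usr/bin/env ruby
    require 'mongo-hadoop'

    # Emit one document per input tweet, keyed on the user's time zone.
    MongoHadoop.map do |document|
      { :_id => document['user']['time_zone'], :count => 1 }
    end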

The mapper calls the MongoHadoop.map function and passes it a block. This block takes a single argument, “document”, and emits a hash containing the user’s time zone and a count of 1.

Now, write a Reducer and save it in a file called reducer.rb:
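A sketch of reducer.rb along the same lines (again, how the counts are read off the value hashes is an assumption):

    #!/usr/bin/env ruby
    require 'mongo-hadoop'

    # Sum the counts emitted for each time zone and emit a single
    # aggregate document per key.
    MongoHadoop.reduce do |key, values|
      count = values.reduce(0) { |sum, v| sum + v['count'] }
      { :_id => key, :count => count }
    end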

The reducer calls the MongoHadoop.reduce function and passes it a block. This block takes two parameters, a key and an array of values for that key; it reduces the values into a single aggregate and emits a hash with the same key and the newly reduced value.

To run it all, create a shell script that executes hadoop with the streaming assembly jar and tells it how to find the mapper and reducer files as well as where to retrieve and store the data:
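A sketch of such a script; the jar name and flag names here are assumptions modeled on the Hadoop Streaming conventions of the time, so check the adapter’s documentation for the exact invocation:

    #!/bin/sh
    # twit.sh: run the streaming job, reading input from the twitter.in
    # collection and writing results to twitter.out in MongoDB.
    hadoop jar mongo-hadoop-streaming-assembly.jar \
        -mapper mapper.rb \
        -reducer reducer.rb \
        -inputURI mongodb://127.0.0.1/twitter.in \
        -outputURI mongodb://127.0.0.1/twitter.out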

Make them all executable by running chmod +x on all the scripts, then run twit.sh to have Hadoop process the job.

MongoDB Driver Releases: April

May 8 • Posted 2 years ago

We’ve had a big month with updates and improvements to our drivers.  Here’s a summary:

Operations in the New Aggregation Framework

Jan 17 • Posted 2 years ago

Available in the 2.1 development release; will be stable for production in the 2.2 release.

Built by Chris Westin (@cwestin63)

MongoDB has built-in MapReduce functionality that can be used for complex analytics tasks. However, we’ve found that most of the time, users need the kind of group-by functionality that SQL implementations have. This can be implemented using map/reduce, but doing so is more work than it was in SQL. In version 2.1, MongoDB is introducing a new aggregation framework that will make it much easier to obtain the kind of results SQL group-by is used for, without having to write custom JavaScript.
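For example, a SQL-style group-by collapses into a single $group stage (the collection and field names here are hypothetical):

    // Total order amounts per customer, roughly equivalent to
    // SELECT customerId, SUM(amount) FROM orders GROUP BY customerId
    db.orders.aggregate(
        { $group: { _id: "$customerId", total: { $sum: "$amount" } } }
    );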
