Tiered Storage Models in MongoDB: Optimizing Latency and Cost

May 14 • Posted 4 months ago

By Rohit Nijhawan, Senior Consulting Engineer at MongoDB with André Spiegel and Chad Tindel

For a user-facing application, speed and uptime are critical to success. There are a number of ways you can tune your application and hardware setup to provide the best experience for your customers — the trick is doing so at optimal cost. Here we provide an example for improving performance and lowering costs with MongoDB using Tiered Storage, a method of prioritizing data storage based on latency requirements.

In this example, we will be segmenting data by date: recent data is more frequently accessed and should exhibit lower latency than less recent data. However, the idea applies to other ways of segmenting data, such as location, user, source, size, or other criteria. This approach takes advantage of a powerful feature in MongoDB called tag-aware sharding that has been available since MongoDB 2.2.

Example Application: Insurance Claims

In many applications, low-latency access to data becomes less important as data ages. For example, an insurance company might prioritize access to claims from the last 12 months. Users should be able to view recent claims quickly, but once claims are more than a year old they tend to be accessed much less frequently, and the latency requirements tend to become less demanding.

By creating tiers of storage with different performance and cost profiles, the insurance company can provide a better experience for users while optimizing their costs. Older claims can be stored in a storage tier with more cost-effective hardware such as commodity hard drives. More recent data can be stored in a high-performance storage tier that provides lower latency such as SSD. Because the majority of the claims are more than a year old, storing older data in the lower-cost tier can provide significant cost advantages. The insurance company can optimize their hardware spread across the two tiers, providing a great user experience at an optimized cost point.

The requirements for this application can be summarized as:

  • The trailing 12 months of claims should reside on the faster storage tier
  • Claims over a year old should move to the slower storage tier
  • Over time new claims arrive, and older claims need to move from the faster tier to the slower tier

For simplicity, throughout this overview, we’ll distinguish the claims data by “current” and “tier-2” data.

Building Your Own Process: An Operational Headache

One approach to meeting these requirements is to use periodic batch jobs: selecting data, loading it into the archive, and erasing it from the faster storage. However, this is inherently complex:

  • The move process must be carefully coded to fail gracefully. In the event that a load fails, you don’t want to delete the original data!
  • If the data to be moved is large, you may wish to throttle the operations.
  • If moves succeed partially, you have to retry the unfinished data.
  • Unless you plan on halting your application during the move (generally unacceptable), your application needs custom code to find the data before, during, and after the move.
  • Your application needs to understand the physical location of the data, which unnecessarily couples your code to the partitioning logic.

Furthermore, introducing another custom component to your operations requires additional maintenance and monitoring.

It's an operational headache that many teams are forced to endure, but there is a simpler way: have MongoDB handle the work of migrating documents from the recent-storage machines to the tier-2 storage machines, transparently. As it turns out, you can easily implement this approach with a feature called tag-aware sharding.

The MongoDB Way: Tag-aware Sharding

MongoDB provides a feature called sharding to scale systems horizontally across multiple machines. Sharding is transparent to your application - whether you have 1 or 100 shards, your application code is the same. For a comprehensive description of sharding please see the Sharding Guide.

A key component of sharding is a process called the balancer. As collections grow, the balancer operates in the background to carefully move documents between shards. Normally the balancer works to achieve a uniform distribution of documents across shards. However, with tag-aware sharding we can create policies that affect where documents are stored. This feature can be applied in many use cases. One example is to keep user data in data centers that are near the user. In our application, we can use this feature to keep current data on our fast servers, and tier 2 data on cheaper, slower servers.

Here’s how it works:

  • Shards are assigned tags. A tag is an alphanumeric alias like “London-DC”.
  • Unique shard key ranges are ‘pinned’ to tags.
  • During normal balancing operations, chunks migrate only to shards whose tag is associated with a key range which contains the chunk’s key range*.
  • There are a few subtleties regarding what happens when a chunk's key range overlaps more than one tag range; please read the documentation carefully regarding this particular case.

This means that we can assign the “tier-2” tag to shards running on slow servers and “current” tags to shards running on fast servers, and the balancer will handle migrating the data between tiers automatically. What’s great is that we can keep all the data in one database, so our application code doesn’t need to change as data moves between storage tiers.

Determining the shard key

When you query a sharded collection, the query router will do its best to only inspect the shards holding your data, but it can only do this if you provide the shard key as part of your query. (See Sharded Cluster Query Routing for more information.)

So we need to make sure that we look up documents by the shard key. We also know that time is the basis for determining the location of documents in our two storage tiers. Accordingly, the shard key must contain an explicit timestamp. In our example, we'll be using Enron's email dataset, and we'll set the top-level "date" field as the shard key. Here's a sample document:
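(The document below is an illustrative stand-in: only the top-level date field matters for the shard key, and the remaining field names are placeholders rather than the exact Enron schema.)

    {
        "_id" : ObjectId("4f16fc97d1e2d32371003f87"),
        "date" : ISODate("2001-04-02T11:14:00Z"),
        "from" : "jane.doe@example.com",
        "to" : [ "john.roe@example.com" ],
        "subject" : "Quarterly projections",
        "body" : "Please see the attached spreadsheet."
    }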

Because the time is stored in the most significant digits of the date, messages from any given day will numerically precede messages from subsequent days.

Implementation

Here are the steps to set up this system:

  1. Set up an empty, sharded MongoDB cluster
  2. Create a target database to host the sharded collection
  3. Assign tags to different shards corresponding to the storage tiers
  4. Assign tag ranges to the shards
  5. Load data into the MongoDB cluster

Set up the cluster

The first thing you will want to do is set up your sharded cluster. You can see more information on how to set this up here.

In this case we will have a database called "enron" and a collection called "messages" which holds part of the Enron email corpus. In this example, we've set up a cluster with three shards. The first, shard0000, is optimized for low-latency access to data. The other two, shard0001 and shard0002, use more cost-effective hardware for data that is older than the identified cutoff date.

Here’s our sharded cluster. These are empty machines with no data:

Adding the tags

We can "tag" each of these shards to associate them with documents that should belong to our "current" tier or to "tier-2." In the absence of tags and tag ranges, the balancer simply tries to ensure that the number of chunks on each shard is equal, without regard to the data in the documents. Before we add the data to our collection, let's tag shard0000 as "current" and the other two as "tier-2".
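In the mongo shell this can be done with sh.addShardTag; a minimal sketch, assuming the default shard names described above:

    sh.addShardTag("shard0000", "current")
    sh.addShardTag("shard0001", "tier-2")
    sh.addShardTag("shard0002", "tier-2")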

Now we can verify our tags by calling sh.status():

Next, we need to set up a database and collection for the Enron emails. We'll set up a new database, 'enron', with a collection called 'messages', and enable sharding on the database.
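A minimal sketch of this step in the mongo shell (sh.enableSharding enables sharding for the database; the collection itself is sharded in the next step):

    sh.enableSharding("enron")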

Since we're going to shard the collection, we'll need to set up a shard key. We will use the 'date' field as our shard key since this is the field that will define how the documents are distributed across shards.
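A sketch of the corresponding command, using the ascending date field as the key:

    sh.shardCollection("enron.messages", { "date" : 1 })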

Defining the cutoff date between tiers

The cutoff point between "current" data and "tier-2" data is a point in time that we will update periodically to keep the most recent documents in our "current" shard. We will start with a cutoff of July 1, 2001, saved as an ISO Date: ISODate("2001-07-01"). Once we add the data to our collection, we will set this as the tag range. Going forward, when we add documents to the "messages" collection, any documents newer than July 1, 2001 will end up on the "current" shard, and documents older than that will end up on the "tier-2" shard.

It's important that the two ranges meet at exactly the same point in time. The lower bound of a tag range is inclusive, and the upper bound is exclusive. This means a document with a date of exactly ISODate("2001-07-01") will go on the "current" shard, not the "tier-2" shard.
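A sketch of how the two ranges can be defined, assuming the sh.addTagRange helper (on older shells the equivalent documents can be inserted into config.tags directly); MinKey and MaxKey bound the open ends of each range:

    sh.addTagRange("enron.messages", { "date" : MinKey }, { "date" : ISODate("2001-07-01") }, "tier-2")
    sh.addTagRange("enron.messages", { "date" : ISODate("2001-07-01") }, { "date" : MaxKey }, "current")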

Below you will see each shard's new tag ranges:

As a final check, look in the config database for the tag range definitions.
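For example, from the mongo shell:

    db.getSiblingDB("config").tags.find().pretty()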

Now that all the shards and ranges are defined, we are ready to load the message data into the cluster. The collection will follow the instructions given by the tag ranges, and the documents will land on the correct machines.

Now let's check the sharding status to see where the documents reside:

That's it! The mongos balancer automatically moves documents to comply with the tag ranges. In this example, it took all documents still on the "current" shard with a date older than ISODate("2001-07-01T00:00:00Z") and moved them to the "tier-2" shard.

The tag ranges must be updated on a regular basis to keep the cutoff point at the correct interval of time in the past (one year, in our case). Both ranges need to be updated, and the balancer should be temporarily disabled while making the change so that there is no point at which the ranges are inconsistent. Stopping the balancer temporarily is a safe operation: it will not affect the application or the experience of users.

If you wanted to move the cutoff forward another month, to August 1, 2001, you just need to follow these steps:

  1. Stop the balancer:

    sh.setBalancerState(false)

  2. Create a chunk split at August 1:

    sh.splitAt('enron.messages', {"date" : ISODate("2001-08-01")})

  3. Move the cutoff date to ISODate("2001-08-01T00:00:00Z"):

    var configdb = db.getSiblingDB("config");
    configdb.tags.update({tag:"tier-2"},{$set:{'max.date':ISODate("2001-08-01")}});
    configdb.tags.update({tag:"current"},{$set:{'min.date':ISODate("2001-08-01")}});

  4. Re-start the balancer:

    sh.setBalancerState(true)

  5. Verify the sharding status.

By splitting at August 1 and updating the tag ranges, we have migrated all the documents with a date after July 1 but before August 1 from the "current" shard to the "tier-2" shards. The good news is that we were able to perform this operation without changing our application code and with no database downtime. It would also be simple to schedule this process to run automatically through an external job.

From Operational Headache to Simplicity

The end result is one collection spread across three shards and two different storage systems. This solution allows you to lower your storage costs without adding complexity to the architecture of your system. Instead of a complex setup with different databases on different machines we have one database to query, and instead of a data migration we update some simple rules to control the location of data in the system.

Like what you see? Sign up for the MongoDB Newsletter

Introducing mtools

May 8 • Posted 4 months ago

By Thomas Rueckstiess, Kernel Program Manager at MongoDB

mtools is a collection of helper scripts, implemented in Python, to parse and filter MongoDB log files (both for mongod and mongos), to visualize information from log files and to quickly set up complex MongoDB test environments on a local machine.

I started working on mtools a year ago, when I realized I could automate and script most of my daily tasks as an Engineer at MongoDB. Since then, mtools has grown into a suite of flexible, useful command-line tools that are being used by many of our Engineers internally, as well as by MongoDB customers and users, to diagnose the root cause of system issues.

If you find yourself looking at MongoDB log files to identify system and performance issues, then I encourage you to try mtools as well.

What’s in the box?

mtools in its current version 1.1.4 consists of 5 individual scripts: mloginfo, mlogfilter, mplotqueries, mlogvis and mlaunch.

  • mloginfo should be your first stop in log file analysis. This script parses the file quickly and outputs general information about its contents, including start and end date and time, line numbers, version, and whether the file came from a mongos or mongod (if available in the file). In addition, you can request certain "sections" of additional information; currently those are "queries", "connections", "restarts" and "distinct".

  • mlogfilter helps to narrow down the search in log files. The script lets you filter on attributes of log messages, like their namespace (database and collection names), their type of operation (queries, inserts, updates, commands, etc.) or by individual connection. You can also search for slow operations by setting a threshold, identify collection scans (those are the queries not using an index) and other properties. Additional features include slicing the log files by time (with flexible date/time parsing), merging files, shifting them to different time zones or converting timestamp formats, and exporting them to JSON. The key property of mlogfilter is that the output format always remains the same (log lines), so you can pipe the output to another instance of mlogfilter, to the grep command or to other scripts like mplotqueries.

  • mplotqueries takes a log file (mlogfiltered or not) and presents the information visually in various ways. There are a number of options for graph types, such as scatter plots (showing all operations over time vs. their duration), histograms, event and range plots, and other more specialized graphs like connection churn or replica set changes. Independent of graph type, you can assign a specific color to different class categories.

  • mlogvis is mplotqueries' little brother. It is very similar in its functionality, but provides a web-based alternative using the d3.js JavaScript visualization library. This is particularly useful if the dependencies required by mplotqueries are not installed or available, or if you want to create a self-contained interactive graph that can be shared with others, such as customers or colleagues. mlogvis creates a single .html file that can be shared, since it loads the d3.js library dynamically.

  • mlaunch is a little different from the other scripts, and actually has nothing to do with log file parsing. mlaunch spins up any number of MongoDB nodes on your local machine, either stand-alone, as replica sets, or as sharded clusters. This is useful if you want to do testing or reproduce issues locally. Rather than setting this up manually, mlaunch will start the processes and connect the replica sets or shards together. Within a few seconds, you can have a complex environment running, like a 5-shard cluster, each shard consisting of a replica set, with authentication enabled, and any individual flags you want to pass on to the processes. mlaunch also has options to start and stop individual instances or groups, and to view which ones are running in the current environment and which ones are down.

How does it work?

Rather than going through all the features of each of the scripts, I’d just like to demonstrate two basic use cases. For a full list of features you can visit the mtools wiki, which contains the manual and many usage examples.

Use Case 1: Profiling your Queries with mloginfo

You have a number of slow queries running against MongoDB that are affecting the performance of the database. To get an idea of where MongoDB is slowing down, as a first step take a look at the “queries” section of mloginfo. Here is an example output, created with the following command:

Each line (from left to right) shows the namespace, the query pattern, and various statistics of this particular namespace/pattern combination. The rows are sorted by the "sum" column, descending. Sorting by sum is a good way to see where the database spent most of its time overall. In this example, we see that around half the total time is spent on $ne-type queries in the serverside.scrum_master collection. $ne queries are known to be inefficient since they cannot use an index, resulting in a high number of documents scanned. In fact, all of these queries took at least 15 seconds (the "min" column). The "count" column also shows that only 20 such queries were issued, yet they accounted for a large share of the total time spent, more than double the time of the 804 email queries on serverside.user.

When optimizing queries and indexes, starting from the top of this list is a good idea as these optimizations will result in the highest gains in terms of performance.

Use Case 2: Visualizing Log Files with mplotqueries

Another way of looking at query performance and other operations is to visualize them graphically. mplotqueries’ scatter plot (the default) shows the duration of any operation (y-axis) over time (x-axis) and makes it easy to spot long-running operations. The following plot is generated with

mplotqueries mongod.log

and then pressing L for a logarithmic y-axis view:

While most of the operations are sub-second (below the 10^3 ms mark), the blue dots immediately stand out, reaching up to the hundreds and thousands of seconds. Clicking on one of the blue dots prints out the relevant log line to stdout:

The getlasterror command is used for write concern. In this case, it blocked until the write was replicated to a majority of nodes in the replica set, which took 16 minutes. That is of course an issue, and because this is a command and not a query (or the query part of an update), it didn’t show up in the previous use case with mloginfo --queries.

To investigate this further, we can overlay the current plot with an "rsstate" plot, which shows replica set status changes over time. The following two commands create an overlay of the two plots:

This shows that each of the blocking "majority" getlasterrors coincides with replica set members being unavailable. The red vertical lines represent a node going DOWN, preceding the yellow lines for a node returning to SECONDARY state, at which point the getlasterror commands finally succeed.

From here, the next step would be to look at all the log files of the replica set at one of the incidents and investigate why the secondaries became unavailable:

This last command merges the log files of the three replica set members by time, each line prefixed with the filename, slices out a 5-minute window at the first instance of the issue and prints the lines back to stdout.

What’s Next?

This should give you a sense of how to use mtools for diagnosing and debugging issues affecting your MongoDB system. You can organize and visualize data in a number of ways, form a hypothesis, filter out noise and dig deeper into issues affecting your deployment, all from MongoDB log files.

mtools contains many more useful features that our Support team uses daily in working through customer cases. The best way to learn how you can leverage these scripts is to download and install mtools and follow some of the examples on the mtools wiki page. mtools is open source and available for download on GitHub. It is also in the PyPI package index and can be installed via pip. If you have any questions, bug reports or feature requests, simply go to the mtools GitHub issues page and open an issue.

My colleague Asya Kamsky (from askasya.com) will show some more examples on how mtools can be useful for diagnosing and troubleshooting in her talk Diagnostics and Debugging at MongoDB World. I’ll be in the “Ask the Experts” sessions, so if you have any questions you can come ask in the Ask the Experts room. You can use my discount code “25ThomasRueckstiess” for 25% off tickets.

MongoDB’s New Bulk API

May 6 • Posted 4 months ago

By Christian Kvalheim, Driver Lead and Node.js Driver Maintainer at MongoDB

The New Bulk API

One of the core new features in MongoDB 2.6 is the new bulk write operations. All the drivers include a new bulk API that allows applications to leverage these new operations using a fluent-style API. Let's explore the API and how it's implemented in the Node.js driver.

The API

The API has two core concepts: the ordered and the unordered bulk operation. The main difference is in the way the operations are executed. In an ordered bulk operation, every operation is executed in the order it was added to the bulk operation. In an unordered bulk operation, however, there is no guarantee of the order in which the operations are executed. Later we will look at how each is implemented.

Operations

You can initialize an ordered or unordered bulk operation in the following way.

    var ordered = db.collection('documents').initializeOrderedBulkOp();
    var unordered = db.collection('documents').initializeUnorderedBulkOp();

Both the ordered and unordered instances are bulk operation objects that we can add insert, update and remove operations to. The following operations are valid; a sketch of building and executing a full batch follows the list.

updateOne (update first matching document)

```ordered.find({ a : 1 }).updateOne({$inc : {x : 1}});```

update (update all matching documents)

```ordered.find({ a : 1 }).update({$inc : {x : 2}});```

replaceOne (replace entire document)

```ordered.find({ a : 1 }).replaceOne({ x : 2});```

updateOne or upsert (update first existing document or upsert)

```ordered.find({ a : 2 }).upsert().updateOne({ $inc : { x : 1}});```

update or upsert (update all or upsert)

```ordered.find({ a : 2 }).upsert().update({ $inc : { x : 2}});```

replace or upsert (replace first document or upsert)

```ordered.find({ a : 2 }).upsert().replaceOne({ x : 3 });```

removeOne (remove the first document matching)

```ordered.find({ a : 2 }).removeOne();```

remove (remove all documents matching)

```ordered.find({ a : 1 }).remove();```

insert

```ordered.insert({ a : 5});```
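Nothing is sent to the server until the batch is executed. A minimal sketch using the 2.6-era Node.js driver (the connection URL and collection name are illustrative):

    var MongoClient = require('mongodb').MongoClient;

    MongoClient.connect('mongodb://localhost:27017/test', function(err, db) {
      if (err) throw err;

      var ordered = db.collection('documents').initializeOrderedBulkOp();
      ordered.insert({ a : 1 });
      ordered.find({ a : 1 }).updateOne({ $inc : { x : 1 } });

      // The queued operations are only sent when execute() is called
      ordered.execute(function(err, result) {
        if (err) throw err;
        console.log(result.nInserted);        // aggregated count of successful inserts
        console.log(result.getWriteErrors()); // per-operation errors, if any
        db.close();
      });
    });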

What happens under the covers when you start adding operations to a bulk operation? Let’s take a look at the new write operations to see how it works.

The New Write Operations

MongoDB 2.6 introduces a completely new set of write operations. Before 2.6, all write operations were done using wire protocol messages at the socket level. From 2.6 this changes to using commands.

Insert Write Command

The insert write commands allow an application to insert batches of documents. Here’s an example:

    {
        insert: 'collection name'
      , documents: [{ a : 1}, ...]
      , writeConcern: {
        w: 1, j: true, wtimeout: 1000
      }
      , ordered: true/false
    }

A couple of things to note. The documents field contains an array of all the documents that are to be inserted. The writeConcern field specifies what would previously have been a getLastError command following a pre-2.6 write operation. In other words, there is always a response from a write operation in 2.6. This means that w:0 has different semantics than in pre-2.6 versions: in this context, w:0 basically means the server only returns an acknowledgment, without any information about the success or failure of the insert operations.
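From the mongo shell, such a command can be issued directly with db.runCommand; the collection and field names below are illustrative:

    db.runCommand({
        insert: "users",
        documents: [ { a : 1 }, { a : 2 } ],
        ordered: true,
        writeConcern: { w: 1 }
    })

A successful run returns a document along the lines of { "ok" : 1, "n" : 2 }.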

Let’s take a look at the update and remove write commands before seeing the results that are returned when executing these operations in 2.6.

Update Write Command

There are some slight differences in the update write command in comparison to the insert write command. Here’s an example:

    {
        update: 'collection name'
      , updates: [{ 
            q: { a : 1 }
          , u: { $inc : { x : 1}}
          , multi: true/false
          , upsert: true/false
        }, ...]
      , writeConcern: {
        w: 1, j: true, wtimeout: 1000
      }
      , ordered: true/false
    }

The main difference here is that the updates array is an array of update operations, where each entry contains a q field that specifies the selector for the update and a u field that contains the update operation. multi specifies whether the update applies to only the first matching document (multi: false) or to all matching documents (multi: true). Finally, upsert tells the server to perform an upsert if no document matches the selector.

Finally let’s look at the remove write command.

Remove Write Command

The remove write command is very similar to the update write command. Here’s an example:

    {
        delete: 'collection name'
      , deletes: [{ 
            q: { a : 1 }
          , limit: 0/1
        }, ...]
      , writeConcern: {
        w: 1, j: true, wtimeout: 1000
      }
      , ordered: true/false
    }

Similar to the update example, we can see that the entries in the deletes array contain documents with specific fields. The q field is the selector that determines which documents will be removed. The limit field sets the number of documents to be removed and currently supports only two values, 0 and 1. The value 0 removes all documents that match the selector, while a value of 1 removes only the first matching document.

Now let’s take a look at how results are returned for these new write commands.

Write Command Results

One of the best new aspects of the write commands is that they can return information about each individual operation error in the batch. Results are efficient: only information about errors is returned, along with the aggregated counts of successful operations. Here's an example of a comprehensive result:

    {
      "ok" : 1,
      "n" : 0,
      "nModified": 1, (Applies only to update)
      "nRemoved": 1, (Applies only to removes)
      "writeErrors" : [
        {
          "index" : 0,
          "code" : 11000,
          "errmsg" : "insertDocument :: caused by :: 11000 E11000 duplicate key error index: t1.t.$a_1  dup key: { : 1.0 }"
        }
      ],
      writeConcernError: {
        code : 22,
        errInfo: { wtimeout : true },
        errmsg: "Could not replicate operation within requested timeout"
      }      
    }

The two most interesting fields here are writeErrors and writeConcernError. If we take a look at writeErrors we can see that it's an array of objects, each including an index field as well as a code and errmsg. The index field references the position of the failing document in the original documents, updates or deletes array, allowing the application to identify the original batch document that failed.

The Effect of Ordered (true/false)

If ordered is set to true, the write operation will fail on the first write error (that is, the first operation that fails to apply). If ordered is set to false, the operation will continue until all operations have been executed (potentially in parallel) and then return all the results. A writeConcernError, on the other hand, does not stop the processing of a bulk operation; it indicates that the requested write concern could not be satisfied, not that the writes themselves failed.

It helps to think of writeErrors as hard errors and writeConcernError as a soft error.

The Special Case of w:0

As I mentioned previously, the semantics for w:0 changed for the write commands. The old style of write operations before 2.6 was a combination of a write wire message and a getLastError command. In the old style, w:0 meant that the driver would not send a getLastError command after the write operation.

In 2.6 the new insert/update/delete commands will always respond. While w:0 would not return a result in versions of MongoDB before 2.6, in 2.6 and above it will. However, the result is truncated: it only tells you whether the command ran, not whether the individual operations succeeded or failed.

As a result, if you execute:

    {
        insert: 'collection name'
      , documents: [{ a : 1}, ...]
      , writeConcern: {
        w: 0
      }
      , ordered: true/false
    }

All you receive from the server is the result:

    { ok : 1 }

The Implication For The Bulk API

There are some implications to the fact that write commands are not mixed-operation batches: a single command contains only inserts, only updates, or only removes. The Bulk API lets you mix operations and then merges the results back into a single result that simulates a mixed-operations command in MongoDB. What does that mean in practice? Let's look at how the Node.js driver implements ordered and unordered bulk operations, using examples to show what happens.

Ordered Operations

Let’s take the following set of operations:

    var ordered = db.collection('documents').initializeOrderedBulkOp();
    ordered.insert({ a : 1 });
    ordered.find({ a : 1 }).update({ $inc: { x : 1 }});
    ordered.insert({ a: 2 });
    ordered.find({ a : 2 }).remove();
    ordered.insert({ a: 3 });

When running in ordered mode the bulk API guarantees the ordering of the operations and thus will execute this as five separate write commands, one after the other:

    insert bulk operation
    update bulk operation
    insert bulk operation
    remove bulk operation
    insert bulk operation

We have now reduced the bulk API to performing single operations and your throughput suffers accordingly.

If we re-order our bulk operations in the following way:

    var ordered = db.collection('documents').initializeOrderedBulkOp();
    ordered.insert({ a : 1 });
    ordered.insert({ a: 2 });
    ordered.insert({ a: 3 });
    ordered.find({ a : 1 }).update({ $inc: { x : 1 }});
    ordered.find({ a : 2 }).remove();

The execution is reduced to the following operations one after the other:

    insert bulk operation
    update bulk operation
    remove bulk operation

Thus, for ordered bulk operations, the order of operations will impact the number of write commands that need to be executed, and thus the possible throughput.

Unordered Operations

Unordered operations do not guarantee the execution order of operations. Let's take the example from above, this time as an unordered bulk operation:

    var unordered = db.collection('documents').initializeUnorderedBulkOp();
    unordered.insert({ a : 1 });
    unordered.find({ a : 1 }).update({ $inc: { x : 1 }});
    unordered.insert({ a: 2 });
    unordered.find({ a : 2 }).remove();
    unordered.insert({ a: 3 });

The Node.js driver will collect the operations into separate, type-specific batches. So we get:

    insert bulk operation
    update bulk operation
    remove bulk operation

In contrast to the ordered case, these batches are all executed in parallel in Node.js, and the results are merged once they have all finished.

Takeaway

As of 2.6, MongoDB only allows batches of inserts, updates or removes, not a mixed batch containing all three operation types. When performing ordered bulk operations we need to keep this in mind to avoid the scenario above. For unordered bulk operations, however, the missing mixed batch type in 2.6 does not impact performance.

Note: Although the Bulk API actually supports down-conversion to 2.4, the performance impact is considerable as all operations are reduced to single write operations with a getLastError. It's recommended to leverage this API primarily with MongoDB 2.6 or higher.

Like what you see? Sign up to the MongoDB Newsletter and get monthly updates straight to your inbox

Betting the Farm on MongoDB

May 1 • Posted 4 months ago

This is a guest post by Jon Dokulil, VP of Engineering at Hudl. Hudl's CTO, Brian Kaiser, will be speaking at MongoDB World about migrating from SQL Server to MongoDB.

Hudl helps coaches win. We give sports teams from peewee to the pros online tools to make working with and analyzing video easy. Today we store well over 600 million video clips in MongoDB spread across seven shards. Our clips dataset has grown to over 350GB of data with over 70GB of indexes. From our first year of a dozen beta high schools we’ve grown to service the video needs of over 50,000 sports teams worldwide.

Why MongoDB

When we began hacking away on Hudl we chose SQL Server as our database. Our backend is written primarily in C#, so it was a natural choice. After a few years and solid company growth we realized SQL Server was quickly becoming a bottleneck. Because we run in EC2, vertically scaling our DB was not a great option. That’s when we began to look at NoSQL seriously and specifically MongoDB. We wanted something that was fast, flexible and developer-friendly.

After comparing a few alternative NoSQL databases and running our own benchmarks, we settled on MongoDB. Then came the task of moving our existing data from SQL Server to MongoDB. Video clips were not only our biggest dataset, they were also our most frequently accessed data. During our busy season we average 75 clip views per second but peak at over 800 per second. We wanted to migrate the dataset with zero downtime and zero data loss. We also wanted to have fail-safes ready during each step of the process so we could recover immediately from any unanticipated problems during the migration.

In this post we’ll take a look at our schema design choices, our migration plan and the performance we’ve seen with MongoDB.

Schema Design

In SQL Server we normalized our data model. Pulling together data from multiple tables is SQL’s bread-and-butter. In the NoSQL world joins are not an option and we knew that simply moving the SQL tables directly over to MongoDB and doing joins in code was a bad idea. So, we looked at how our application interacted with SQL and created an optimized schema in MongoDB.

Before I get into the schema we chose, I’ll try to provide context to Hudl’s product. Below is a screenshot of our ‘Library’ page. This is where coaches spend much of their time reviewing and analyzing video.

You see above a video playing and a kind of spreadsheet underneath. The video represents one angle of one clip (many of our teams film two or three angles each game). The spreadsheet contains rows of clips and columns of breakdown data. The breakdown data gives context to what happened in the clip. For example, the second clip was a defensive play from the 30 yard line. It was first and ten and was a run play to the left. This breakdown data is incredibly important for coaches to spot patterns and trends in their opponents' play (as well as to make sure they don't have obvious patterns that could be used against them).

When we translated this schema to MongoDB we wanted to optimize for the most-common operations. Watching video clips and editing clip metadata are our two highest frequency operations. To maximize performance we made a few important decisions.

  1. We chose to encapsulate an entire clip per document. Watching a clip would involve a single document lookup. Because MongoDB stores each document contiguously on disk, it would minimize the number of disk seeks when fetching a clip not in memory, which means faster clip loads.
  2. We denormalized our column names to speed up both writes and reads. Writes are faster because we no longer have to look up or track Column IDs. A write operation is as simple as:
    db.clips.update({teamId:205, _id:123}, 
    {$set: {'data.PLAY TYPE':'Pass'}}) 
    Reads are also faster because we no longer have to join on the ClipDataColumn table to get the column names. This comes at the cost of greater storage and memory requirements, since we store the same column names in multiple documents. Despite that, we felt the performance benefits were worth the cost; a sketch of the resulting document shape follows this list.
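To make these decisions concrete, here is a hypothetical clip document; the field and column names are illustrative rather than Hudl's actual schema, but they show how the breakdown data is embedded directly in the clip so that watching a clip is a single document lookup:

    {
        _id: 123,
        teamId: 205,
        data: {
            "YARD LN": 30,
            "DN": 1,
            "DIST": 10,
            "PLAY TYPE": "Run"
        }
    }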

One of the most important considerations when designing a schema in MongoDB is choosing a shard key. Having a good shard key is critical for effective horizontal scaling. Data is stored in shards (each shard is a replica set) and we can add new shards easily as our dataset grows. Replica sets don't need to know about each other; they are only concerned with their own data. The MongoDB router (mongos) is the piece that sees the whole picture. It knows which shard houses each document.

When you perform a query against a sharded collection, the shard key is not required. However, there is a cost penalty for not providing the shard key. The key is used to determine which shard contains the answer to your query. Without it, the query has to be sent to all shards in your cluster. To illustrate this, imagine a four-shard cluster where the shard key is TeamId (the property is named 't'): clips belonging to teams 1-100 live on Shard 1, teams 101-200 live on Shard 2, and so on. Given the query to find clip '123' without a TeamId, only Shard 3 will respond with results, but Shards 1, 2 and 4 must also process and execute the query. This is known as a scatter/gather query. At low volume this is OK, but you won't see the benefits of horizontal scalability if every query has to be sent to all shards. Only when the shard key is provided can the query be sent directly to Shard 3. This is known as a targeted query.
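As a rough illustration, using the property name from the description above ('t' for TeamId):

    // Scatter/gather: without the shard key, mongos must send the query to every shard
    db.clips.find({ _id: 123 })

    // Targeted: with the shard key included, mongos routes the query to a single shard
    db.clips.find({ t: 105, _id: 123 })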

For our Clips collection, we chose TeamId as our shard key. We looked at a few different possible shard keys:

  1. We considered sharding by clipId (_id) but decided against it because we let coaches organize clips into playlists (similar to a song playlist in iTunes or Spotify). While queries for all clips in a playlist are less common than grabbing an individual clip, they are common enough that we wanted them to use targeted queries.
  2. We also considered sharding by playlist Id, but we wanted clips to be able to belong to multiple playlists, and clips can be added to or removed from playlists at any time, whereas a document's shard key value, once set, is immutable.
  3. We finally settled on TeamId. TeamId is easily available to us when making the vast majority of our queries to the Clips collection. Only for a few infrequent operations would we need to use scatter/gather queries.

The Transition

As I mentioned, we needed to transition from SQL Server to MongoDB with zero downtime. In case anything went wrong, we needed fallbacks and fail-safes along the way. Our approach was two-fold. In the background we ran a process that ‘fork-lifted’ data from SQL Server to MongoDB. While that ran in the background, we created a multiplexed DAO (data access object, our db abstraction layer) that would only read from SQL but would write to both SQL and MongoDB. That allowed us to batch-move all clips without having to worry about stale data. Once the two databases were completely synced up, we switched over to perform all reads from MongoDB. We continued to dual-write so we could easily switch back to SQL Server if problems arose. After we felt confident in our MongoDB solution, we pulled the plug on SQL Server.

In step one we took a look at how we read and wrote clip data. That let us design an optimal MongoDB schema. We then refactored our existing database abstraction layer to use data-structures that matched the MongoDB schema. This gave us a chance to prove out the schema ahead of time.

Next we began sending write operations to both SQL and MongoDB. This was an important step because it allowed our data fork-lifting process to work through all clips one after another while protecting us from data corruption.

The data fork-lifting process took about a week to complete. The time was due to both the large size of the dataset and our own throttling logic. We throttled the rate of data migration to minimize the impact on normal operations. We didn’t want coaches to feel any pain during this migration.

After the data fork-lift was complete we began the process of reading from MongoDB. We built in the ability to progressively send more and more read traffic to MongoDB. That allowed us to gain confidence in our code and the MongoDB cluster without having to switch all-at-once. After a while with dual writes but all MongoDB reads, we turned off dual writes and dropped the tables in SQL Server. It was both a scary moment (sure, we had backups… but still!) and very satisfying. Our SQL database size was reduced by over 80GB. Of that total amount, 20GB was index data, which means our memory footprint was also greatly reduced.

Performance

We have been thrilled with the performance of MongoDB. MongoDB exceeded our average performance goal of 100ms and, just as important, is consistently performant. While it’s good to keep an eye on average times, it’s more important to watch the 90th and 99th percentile performance metrics. With MongoDB our average clip load time is around 18ms and our 99th percentile times are typically at or under 100ms.

Clip load times during the same time period during season

Conclusion

Our transition from SQL Server to MongoDB started with our largest and most critical dataset. After having gone through it, we are very happy with the performance and scalability of MongoDB and appreciate how developer-friendly it is to work with. Moving from a relational to a NoSQL database naturally has a learning curve. Now that we are over it we feel very good about our ability to scale well into the future. Perhaps most telling of all, most new feature development at Hudl is done in MongoDB. We feel MongoDB lets us spend more time writing features to help coaches win and less time crafting database scripts.

Sign up for the MongoDB Newsletter to get MongoDB updates right to your inbox

The MongoDB Open Source Hack Contest

Apr 28 • Posted 4 months ago

Some of the best MongoDB tools come from the Open Source community. Projects like the Node.js Driver, Mongoose and Meteor have become the backbone of many MongoDB apps and have helped support the developer community all over the world. We want to see more of what the community has built.

For the month of May, we’ll be hosting a worldwide hack contest for Open Source tools built on or connected to MongoDB. The winner of the contest will receive a ticket to OSCON, furnished by O’Reilly.

Guidelines:

  • All projects must be built with the MongoDB source code or on top of a MongoDB API, community or MongoDB supported driver
  • Any new drivers created should abide by the driver requirements listed in the MongoDB Manual
  • All hacks will be judged by MongoDB engineers

All entries can be submitted to community@MongoDB.com before May 31 with the following information:

  • Github or Bitbucket URL
  • Description of the project
  • How do users benefit from this application?
  • Why did you choose to contribute to MongoDB?

We’re looking forward to seeing your hacks come in!

Want to keep up-to-date on MongoDB news and events? Sign up for the MongoDB Monthly Newsletter

maxTimeMS() and Query Optimizer Introspection in MongoDB 2.6

Apr 23 • Posted 5 months ago

By Alex Komyagin, Technical Service Engineer in MongoDB’s New York Office

In this series of posts we cover some of the enhancements in MongoDB 2.6 and how they will be valuable for users. You can find a more comprehensive list of changes in our 2.6 release notes. These are changes I believe you will find helpful, especially for our advanced users.

This post introduces two new features: maxTimeMS and query optimizer introspection. We will also look at a specific support case where these features would have been helpful.

maxTimeMS: SERVER-2212

The socket timeout option (e.g. MongoOptions.socketTimeout in the Java driver) specifies how long to wait for responses from the server. This timeout governs all types of requests (queries, writes, commands, authentication, etc.). The default value is “no timeout” but users tend to set the socket timeout to a relatively small value with the intention of limiting how long the database will service a single request. It is thus surprising to many users that while a socket timeout will close the connection, it does not affect the underlying database operation. The server will continue to process the request and consume resources even after the connection has closed.

Many applications are configured to retry queries when the driver reports a failed connection. If a query whose connection was killed by a socket timeout continues to consume resources on the server, a retry can give rise to a cascading effect: the application may retry the same resource-intensive query multiple times, increasing the load on the system and leading to severe performance degradation. Before 2.6 the only way to cancel these queries was to issue db.killOp(), which requires manual intervention.

MongoDB 2.6 introduces a new query option, maxTimeMS, which limits how long an operation can run in the database. When the time limit is reached, MongoDB automatically cancels the query. maxTimeMS can be used to efficiently prevent unexpectedly slow queries from overloading MongoDB. Following the same logic, maxTimeMS can also be used to identify slow or unoptimized queries.

There are a few important notes about using maxTimeMS:

  • maxTimeMS can be set on a per-operation basis — long running aggregation jobs can have a different setting than simple CRUD operations.
  • requests can block behind locking operations on the server, and that blocking time is not counted, i.e. the timer starts ticking only after the actual start of the query when it initially acquires the appropriate lock;
  • operations are interrupted only at interrupt points where an operation can be safely aborted — the total execution time may exceed the specified maxTimeMS value;
  • maxTimeMS can be applied to both CRUD operations and commands, but not all commands are interruptible;
  • if a query opens a cursor, subsequent getmore requests will be included in the total time (but network roundtrips will not be counted);
  • maxTimeMS does not override the inactive cursor timeout for idle cursors (default is 10 min).

All MongoDB drivers support maxTimeMS in their releases for MongoDB 2.6. For example, the Java driver starts supporting it in 2.12.0 and 3.0.0, and the Python driver in version 2.7.
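For example, in the mongo shell (the collection and field names are illustrative):

    // Fail the query if it runs for more than 100 ms on the server
    db.user_comments.find({ user_id: 42 }).maxTimeMS(100)

    // maxTimeMS can also be passed to commands that support it
    db.runCommand({
        aggregate: "user_comments",
        pipeline: [ { $group: { _id: "$user_id", count: { $sum: 1 } } } ],
        maxTimeMS: 30000
    })

When the limit is exceeded, the operation fails with an "operation exceeded time limit" error instead of continuing to consume resources on the server.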

To illustrate how maxTimeMS can come in handy, let's talk about a recent case that the MongoDB support team worked on. The case involved performance degradation of a MongoDB cluster powering a popular site with millions of users and heavy traffic. The degradation was caused by a huge number of read queries performing full scans of a collection. The collection in question stores user comment data, so it has billions of records. Some of these collection scans were running for 15 minutes or so, and they were slowing down the whole system.

Normally, these queries would use an index, but in this case the query optimizer was choosing an unindexed plan. While the solution to this kind of problem is to hint the appropriate index, maxTimeMS can be used to prevent unintentional runaway queries by controlling the maximum execution time. Queries that exceed the maxTimeMS threshold will error out on the application side (in the Java driver it’s the MongoExecutionTimeoutException exception). maxTimeMS will help users to prevent unexpected performance degradation, and to gain better visibility into the operation of their systems.

In the next section we’ll take a look at another feature which would help in diagnosing the case we just discussed.

Query Optimizer Introspection: SERVER-666

To troubleshoot the support case described above and to understand why the query optimizer was not picking up the correct index for queries, it would have been very helpful to know a few things, such as whether the query plan had changed and to see the query plan cache.

MongoDB determines the best execution plan for a query and stores this plan in a cache for reuse. The cached plan is refreshed periodically and under certain operational conditions (which are discussed in detail below). Prior to 2.6 this caching behavior was opaque to the client. Version 2.6 provides a new query subsystem and query plan cache. Now users have visibility and control over the plan cache using a set of new methods for query plan introspection.

Each collection contains at most one plan cache. The cache is cleared every time a change is made to the indexes for the collection. To determine the best plan, multiple candidate plans are executed in parallel and the winner is selected based on the number of results retrieved within a fixed number of steps, where each step is basically one "advance" of the query cursor. When a query has successfully passed the planning process, it is added to the cache along with the related index information.

A very interesting feature of the new query execution framework is the plan cache feedback mechanism. MongoDB stores runtime statistics for each cached plan in order to remove plans that are determined to be the cause of performance degradation. In practice, we don’t see these degradations often, but if they happen it is usually a consequence of a change in the composition of the data. For example, with new records being inserted, an indexed field may become less selective, leading to slower index performance. These degradations are extremely hard to manually diagnose, and we expect the feedback mechanism to automatically handle this change if a better alternative index is present.

The following events will result in a cached plan removal:

  • Performance degradation
  • Index add/drop
  • No more space in the cache (the total number of plans stored in the collection cache is currently limited to 200; this is subject to change in future releases)
  • Explicit commands to mutate cache
  • After the number of write operations on a collection exceeds the built-in limit, the whole collection plan cache is dropped (data distribution has changed)

MongoDB 2.6 supports a set of commands to view and manipulate the cache: you can list all known query shapes (planCacheListQueryShapes), display the cached plans for a query shape (planCacheListPlans), and remove a query shape from the cache or empty the whole cache (planCacheClear).

Here is an example invocation of the planCacheListQueryShapes command, which lists the shape of the query db.test.find({first_name:"john", last_name:"galt"}, {_id:0, first_name:1, last_name:1}).sort({age:1}):

> db.runCommand({planCacheListQueryShapes: "test"})
{
    "shapes" : [
        {
            "query" : {
                "first_name" : "alex",
                "last_name" : "komyagin"
            },
            "sort" : {
                "age" : 1
            },
            "projection" : {
                "_id" : 0,
                "first_name" : 1,
                "last_name" : 1
            }
        }
    ],
    "ok" : 1
}
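The other plan cache commands follow the same pattern; a sketch using the query shape shown above:

    // Show the cached plans for a specific query shape
    db.runCommand({
        planCacheListPlans: "test",
        query: { first_name: "alex", last_name: "komyagin" },
        sort: { age: 1 },
        projection: { _id: 0, first_name: 1, last_name: 1 }
    })

    // Clear the entire plan cache for the collection
    db.runCommand({ planCacheClear: "test" })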

The exact values in the query predicate are insignificant in determining the query shape. For example, the query predicate {first_name:"john", last_name:"galt"} is equivalent to the query predicate {first_name:"alex", last_name:"komyagin"}.

Additionally, with log level set to 1 or greater MongoDB will log plan cache changes caused by the events listed above. You can set the log level using the following command:

use admin
db.runCommand( { setParameter: 1, logLevel: 1 } )

Please don’t forget to change it back to 0 afterwards, since log level 1 logs all query operations and it can negatively affect the performance of your system.

Together, the new plan cache and the new query optimizer should make related operations more transparent and help users to have the visibility and control necessary for maintaining predictable response times for their queries.

You can see how these new features work for yourself by trying our latest release, MongoDB 2.6, available for download here. I hope you will find these features helpful. We look forward to hearing your feedback; please post your questions in the mongodb-user Google group.

Like what you see? Get MongoDB updates straight to your inbox

MongoDB 2.6: Our Biggest Release Ever

Apr 8 • Posted 5 months ago

By Eliot Horowitz, CTO and Co-founder, MongoDB

Discuss on Hacker News

In the five years since the initial release of MongoDB, and after hundreds of thousands of deployments, we have learned a lot. The time has come to take everything we have learned and create a basis for continued innovation over the next ten years.

Today I’m pleased to announce that, with the release of MongoDB 2.6, we have achieved that goal. With comprehensive core server enhancements, a groundbreaking new automation tool, and critical enterprise features, MongoDB 2.6 is by far our biggest release ever.

You’ll see the benefits in better performance and new innovations. We re-wrote the entire query execution engine to improve scalability, and took our first step in building a sophisticated query planner by introducing index intersection. We’ve made the codebase easier to maintain, and made it easier to implement new features. Finally, MongoDB 2.6 lays the foundation for massive improvements to concurrency in MongoDB 2.8, including document-level locking.

From the very beginning, MongoDB has offered developers a simple and elegant way to manage their data. Now we’re bringing that same simplicity and elegance to managing MongoDB. MongoDB Management Service (MMS), which already provides 35,000 MongoDB customers with monitoring and alerting, now provides backup and point-in-time restore functionality, in the cloud and on-premises.

We are also announcing a game-changing feature coming later this year: automation, also with hosted and on-premises options. Automation will allow you to provision and manage MongoDB replica sets and sharded clusters via a simple yet sophisticated interface.

MongoDB 2.6 brings security, integration and analytics enhancements to ease deployment in enterprise environments. LDAP, x.509 and Kerberos authentication are critical enhancements for organizations that require a single authentication mechanism across their entire infrastructure. To enhance security, MongoDB 2.6 implements TLS encryption, user-defined roles, auditing and field-level redaction, a critical building block for trusted systems. IBM Guardium also now offers integration with MongoDB, providing more extensive auditing abilities.

These are only a few of the key improvements; read the full official release notes for more details.

MongoDB 2.6 was a major endeavor and bringing it to fruition required hard work and coordination across a rapidly growing team. Over the past few years we have built and invested in that team, and I can proudly say we have the experience, drive and determination to deliver on this and future releases. There is much still to be done, and with MongoDB 2.6, we have a foundation for the next decade of database innovation.

Like what you see? Get MongoDB updates straight to your inbox

MongoDB Innovation Awards: Call for Nominations

Apr 2 • Posted 5 months ago

Fortune 500 enterprises, startups, hospitals, governments and organizations of all kinds use MongoDB because it is the best database for modern applications. The MongoDB Innovation Awards recognize organizations and individuals who are changing the world with MongoDB.

Nominations are open to any individual representing an organization that runs MongoDB, including partner solutions built on or for MongoDB.

The deadline to submit is midnight eastern time on May 30. A panel of judges will review nominations. Winners will be announced at MongoDB World on June 23-25.

Each winner will receive:

  • MongoDB Innovation Award
  • Pass to MongoDB World
  • Invitation to award winners reception at MongoDB World
  • Inclusion in Innovation Awards press release, blog post and email newsletter
  • $2,500 Amazon Gift Card

Submit a nomination by completing this nomination form — you’ll receive a discount code for MongoDB World.

Running MongoDB Queries Concurrently With Go

Mar 24 • Posted 6 months ago

This is a guest post by William Kennedy, managing partner at Ardan Studios in Miami, FL, a mobile and web app development company. Bill is also the author of the blog GoingGo.Net and the organizer for the Go-Miami and Miami MongoDB meetups in Miami. Bill looked for a new language in 2013 that would allow him to develop back end systems in Linux and found Go. He has never looked back.

If you are attending GopherCon 2014 or plan to watch the videos once they are released, this article will prepare you for the talk by Gustavo Niemeyer and Steve Francia. It provides a beginners view for using the Go mgo driver against a MongoDB database.

Introduction

MongoDB supports many different programming languages thanks to a great set of drivers. One such driver is the MongoDB Go driver which is called mgo. This driver was developed by Gustavo Niemeyer from Canonical with some assistance from MongoDB Inc. Both Gustavo and Steve Francia, the head of the drivers team, will be talking at GopherCon 2014 in April about “Painless Data Storage With MongoDB and Go”. The talk describes the mgo driver and how MongoDB and Go work well together for building highly scalable and concurrent software.

MongoDB and Go let you build scalable software on many different operating systems and architectures, without the need to install frameworks or runtime environments. Go programs are native binaries and the Go tooling is constantly improving to create binaries that run as fast as equivalent C programs. That wouldn’t mean much if writing code in Go were as complicated and tedious as writing programs in C. This is where Go really shines, because once you get up to speed, writing programs in Go is fast and fun.

In this post I am going to show you how to write a Go program using the mgo driver to connect and run queries concurrently against a MongoDB database. I will break down the sample code and explain a few things that can be a bit confusing to those new to using MongoDB and Go together.

Sample Program

The sample program connects to a public MongoDB database I have hosted with MongoLab. If you have Go and Bazaar installed on your machine, you can run the program against my database. The program is very simple - it launches ten goroutines that individually query all the records from the buoy_stations collection inside the goinggo database. The records are unmarshaled into native Go types and each goroutine logs the number of documents returned:
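
What follows is a reconstructed sketch of that program, since the original listing is not reproduced here. The connection constants (host, credentials), the bson field tags and the import paths (the Bazaar-hosted labix.org/v2/mgo path of that era) are illustrative assumptions, not the actual values for the hosted database; substitute your own deployment details if you want to run it.

    package main

    import (
        "log"
        "sync"
        "time"

        "labix.org/v2/mgo"
        "labix.org/v2/mgo/bson"
    )

    const (
        // Placeholder connection details - substitute your own deployment.
        MongoDBHosts = "example.mongolab.com:27017"
        AuthDatabase = "goinggo"
        AuthUserName = "guest"
        AuthPassword = "welcome"
        TestDatabase = "goinggo"
    )

    type (
        // BuoyCondition is an embedded document of weather readings.
        BuoyCondition struct {
            WindSpeed     float64 `bson:"wind_speed_milehour"`
            WindDirection int     `bson:"wind_direction_degnorth"`
            WindGust      float64 `bson:"gust_wind_speed_milehour"`
        }

        // BuoyLocation is an embedded document describing the station position.
        BuoyLocation struct {
            Type        string    `bson:"type"`
            Coordinates []float64 `bson:"coordinates"`
        }

        // BuoyStation represents a document in the buoy_stations collection.
        BuoyStation struct {
            ID        bson.ObjectId `bson:"_id,omitempty"`
            StationID string        `bson:"station_id"`
            Name      string        `bson:"name"`
            LocDesc   string        `bson:"location_desc"`
            Condition BuoyCondition `bson:"condition"`
            Location  BuoyLocation  `bson:"location"`
        }
    )

    func main() {
        // DialInfo holds the addresses and credentials for the deployment.
        mongoDBDialInfo := &mgo.DialInfo{
            Addrs:    []string{MongoDBHosts},
            Timeout:  60 * time.Second,
            Database: AuthDatabase,
            Username: AuthUserName,
            Password: AuthPassword,
        }

        // Create a session that maintains a pool of socket connections.
        mongoSession, err := mgo.DialWithInfo(mongoDBDialInfo)
        if err != nil {
            log.Fatalf("CreateSession: %s\n", err)
        }

        // Reads go to secondaries when possible, the primary after a write.
        mongoSession.SetMode(mgo.Monotonic, true)

        // Track the ten goroutines we are about to launch.
        var waitGroup sync.WaitGroup
        waitGroup.Add(10)

        for query := 0; query < 10; query++ {
            go RunQuery(query, &waitGroup, mongoSession)
        }

        // Block until every goroutine has called Done.
        waitGroup.Wait()
        log.Println("All Queries Completed")
    }

    // RunQuery reads every document in buoy_stations and logs the count.
    func RunQuery(query int, waitGroup *sync.WaitGroup, mongoSession *mgo.Session) {
        // Decrement the wait group count when the function returns.
        defer waitGroup.Done()

        // Each goroutine copies the session to get its own socket from the pool.
        sessionCopy := mongoSession.Copy()
        defer sessionCopy.Close()

        collection := sessionCopy.DB(TestDatabase).C("buoy_stations")

        var buoyStations []BuoyStation
        if err := collection.Find(nil).All(&buoyStations); err != nil {
            log.Printf("RunQuery : ERROR : %s\n", err)
            return
        }

        log.Printf("RunQuery : %d : Count[%d]\n", query, len(buoyStations))
    }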

Now that you have seen the entire program, we can break it down. Let’s start with the type structures that are defined in the beginning:
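
(Excerpted from the sketch above; the field names and bson tags are illustrative, not the exact schema of the hosted collection.)

    type (
        BuoyCondition struct {
            WindSpeed     float64 `bson:"wind_speed_milehour"`
            WindDirection int     `bson:"wind_direction_degnorth"`
            WindGust      float64 `bson:"gust_wind_speed_milehour"`
        }

        BuoyLocation struct {
            Type        string    `bson:"type"`
            Coordinates []float64 `bson:"coordinates"`
        }

        BuoyStation struct {
            ID        bson.ObjectId `bson:"_id,omitempty"`
            StationID string        `bson:"station_id"`
            Name      string        `bson:"name"`
            LocDesc   string        `bson:"location_desc"`
            Condition BuoyCondition `bson:"condition"`
            Location  BuoyLocation  `bson:"location"`
        }
    )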

The structures represent the data that we are going to retrieve and unmarshal from our query. BuoyStation represents the main document and BuoyCondition and BuoyLocation are embedded documents. The mgo driver makes it easy to use native types that represent the documents stored in our collections by using tags. With the tags, we can control how the mgo driver unmarshals the returned documents into our native Go structures.

Now let’s look at how we connect to a MongoDB database using mgo:
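
(Excerpted from the sketch above; the host and credential constants are placeholders.)

    mongoDBDialInfo := &mgo.DialInfo{
        Addrs:    []string{MongoDBHosts},
        Timeout:  60 * time.Second,
        Database: AuthDatabase,
        Username: AuthUserName,
        Password: AuthPassword,
    }

    mongoSession, err := mgo.DialWithInfo(mongoDBDialInfo)
    if err != nil {
        log.Fatalf("CreateSession: %s\n", err)
    }

    mongoSession.SetMode(mgo.Monotonic, true)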

We start by creating a mgo.DialInfo object. Connecting to a replica set can be accomplished by providing multiple addresses in the Addrs field or with a single address. If we use a single host address to connect to a replica set, the mgo driver will learn about any remaining hosts from the replica set member we connect to. In our case we are connecting to a single host.

After providing the host, we specify the database, username and password we need for authentication. One thing to note is that the database we authenticate against may not necessarily be the database our application needs to access. Some applications authenticate against the admin database and then use other databases depending on their configuration. The mgo driver supports these types of configurations very well.

Next we use the mgo.DialWithInfo method to create a mgo.Session object. Each session specifies a Strong or Monotonic mode, and other settings such as write concern and read preference. The mgo.Session object maintains a pool of connections to MongoDB. We can create multiple sessions with different modes and settings to support different aspects of our applications.

The next line of code sets the mode for the session. There are three modes that can be set, Strong, Monotonic and Eventual. Each mode sets a specific consistency for how reads and writes are performed. For more information on the differences between each mode, check out the documentation for the mgo driver.

We are using Monotonic mode which provides reads that may not entirely be up to date, but the reads will always see the history of changes moving forward. In this mode reads occur against secondary members of our replica sets until a write happens. Once a write happens, the primary member is used. The benefit is some distribution of the reading load can take place against the secondaries when possible.

With the session set and ready to go, next we execute multiple queries concurrently:
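
(Excerpted from the sketch above.)

    var waitGroup sync.WaitGroup
    waitGroup.Add(10)

    for query := 0; query < 10; query++ {
        go RunQuery(query, &waitGroup, mongoSession)
    }

    waitGroup.Wait()
    log.Println("All Queries Completed")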

This code is classic Go concurrency in action. First we create a sync.WaitGroup object so we can keep track of all the goroutines we are going to launch as they complete their work. Then we immediately set the count of the sync.WaitGroup object to ten and use a for loop to launch ten goroutines using the RunQuery function. The keyword go is used to launch a function or method to run concurrently. The final line of code calls the Wait method on the sync.WaitGroup object, which blocks the main goroutine until everything is done processing.

To learn more about Go concurrency and better understand how this particular piece of code works, check out these posts on concurrency and channels.

Now let’s look at the RunQuery function and see how to properly use the mgo.Session object to acquire a connection and execute a query:
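
(Excerpted from the sketch above; the collection name matches the buoy_stations collection described earlier.)

    func RunQuery(query int, waitGroup *sync.WaitGroup, mongoSession *mgo.Session) {
        defer waitGroup.Done()

        sessionCopy := mongoSession.Copy()
        defer sessionCopy.Close()

        collection := sessionCopy.DB(TestDatabase).C("buoy_stations")

        var buoyStations []BuoyStation
        if err := collection.Find(nil).All(&buoyStations); err != nil {
            log.Printf("RunQuery : ERROR : %s\n", err)
            return
        }

        log.Printf("RunQuery : %d : Count[%d]\n", query, len(buoyStations))
    }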

The very first thing we do inside of the RunQuery function is to defer the execution of the Done method on the sync.WaitGroup object. The defer keyword postpones the execution of the Done method until the RunQuery function returns. This guarantees that the sync.WaitGroup object’s count will decrement even if the goroutine panics.

Next we make a copy of the session we created in the main goroutine. Each goroutine needs to create a copy of the session so they each obtain their own socket without serializing their calls with the other goroutines. Again, we use the defer keyword to postpone and guarantee the execution of the Close method on the session once the RunQuery function returns. Closing the session returns the socket back to the main pool, so this is very important.

To execute a query we need a mgo.Collection object. We can get a mgo.Collection object through the mgo.Session object by specifying the name of the database and then the collection. Using the mgo.Collection object, we can perform a Find and retrieve all the documents from the collection. The All function will unmarshal the response into our slice of BuoyStation objects. A slice is a dynamic array in Go. Be aware that the All method will load all the data in memory at once. For large collections it is better to use the Iter method instead. Finally, we just log the number of BuoyStation objects that are returned.

Conclusion

The example shows how to use Go concurrency to launch multiple goroutines that can execute queries against a MongoDB database independently. Once a session is established, the mgo driver exposes all of the MongoDB functionality and handles the unmarshaling of BSON documents into Go native types.

MongoDB can handle a large number of concurrent requests when you architect your MongoDB databases and collections with concurrency in mind. Go and the mgo driver are perfectly aligned to push MongoDB to its limits and build software that can take advantage of all the computing power that is available.

The mgo driver provides a safe way to leverage Go’s concurrency support and you have the flexibility to execute queries concurrently and in parallel. It is best to take the time to learn a bit about MongoDB replica sets and load balancer configuration. Then make sure the load balancer is behaving as expected under the different types of load your application can produce.

Now is a great time to see what MongoDB and Go can do for your software applications, web services and service platforms. Both technologies are being battle tested everyday by all types of companies, solving all types of business and computing problems.

Processing 2 Billion Documents A Day And 30TB A Month With MongoDB

Mar 14 • Posted 6 months ago

This is a guest post by David Mytton. He has been programming Python for over 10 years and founded his server and website monitoring company, Server Density, back in 2009.

Server Density processes over 30TB/month of incoming data points from the servers and web checks we monitor for our customers, ranging from simple Linux system load average to website response times from 18 different countries. All of this data goes into MongoDB in real time and is pulled out when customers need to view graphs, update dashboards and generate reports.

We’ve been using MongoDB in production since mid-2009 and have learned a lot over the years about scaling the database. We run multiple MongoDB clusters but the one storing the historical data does the most throughput and is the one I shall focus on in this article, going through some of the things we’ve done to scale it.

1. Use dedicated hardware, and SSDs

All our MongoDB instances run on dedicated servers across two data centers at Softlayer. We’ve had bad experiences with virtualisation because you have no control over the host, and databases need guaranteed performance from disk i/o. When running on shared storage (e.g., a SAN) this is difficult to achieve unless you can get guaranteed throughput from things like AWS’s Provisioned IOPS on EBS (which are backed by SSDs).

MongoDB doesn’t really have many bottlenecks when it comes to CPU because CPU-bound operations are rare (usually things like building indexes), but what really causes problems is CPU steal - when other guests on the host are competing for the CPU resources.

The way we have combated these problems is to eliminate the possibility of CPU steal and noisy neighbours by moving onto dedicated hardware. And we avoid problems with shared storage by deploying the dbpath onto locally mounted SSDs.

I’ll be speaking in-depth about managing MongoDB deployments in virtualized or dedicated hardware at MongoDB World this June.

2. Use multiple databases to benefit from improved concurrency

Running the dbpath on an SSD is a good first step but you can get better performance by splitting your data across multiple databases, and putting each database on a separate SSD with the journal on another.

Locking in MongoDB is managed at the database level, so moving collections into their own databases helps spread things out - this matters most when you are scaling writes while also trying to read data. If you keep databases on the same disk you’ll start hitting the throughput limitations of the disk itself. This is improved by putting each database on its own SSD using the directoryperdb option. SSDs help by significantly alleviating i/o latency, which is related to the number of IOPS and the latency for each operation, particularly when doing random reads/writes. This is even more visible in Windows environments, where the memory mapped data files are flushed serially and synchronously. Again, SSDs help with this.

The journal is always within a directory so you can mount this onto its own SSD as a first step. All writes go via the journal and are later flushed to disk so if your write concern is configured to return when the write is successfully written to the journal, making those writes faster by using an SSD will improve query times. Even so, enabling the directoryperdb option gives you the flexibility to optimise for different goals (e.g., put some databases on SSDs and some on other types of disk, or EBS PIOPS volumes, if you want to save cost).

It’s worth noting that filesystem-based snapshots taken while MongoDB is still running are no longer possible if you move the journal to a different disk (and therefore a different filesystem). You would instead need to shut down MongoDB (to prevent further writes) and then take the snapshot from all volumes.

3. Use hash-based sharding for uniform distribution

Every item we monitor (e.g., a server) has a unique MongoID and we use this as the shard key for storing the metrics data.

The query index is on the item ID (e.g. the server ID), the metric type (e.g. load average) and the time range; because every query always includes the item ID, it makes a good shard key. That said, it is important to ensure that there aren’t large numbers of documents under a single item ID, because this can lead to jumbo chunks which cannot be migrated. Jumbo chunks arise from failed splits: they’re already over the chunk size but cannot be split any further.

To ensure that the shard chunks are always evenly distributed, we’re using the hashed shard key functionality in MongoDB 2.4. Hashed shard keys are often a good choice for ensuring uniform distribution, but if you end up not using the hashed field in your queries, you could actually hurt performance because then a non-targeted scatter/gather query has to be used.
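
As a rough illustration (not Server Density’s actual setup), here is a minimal Go sketch using the mgo driver to shard a hypothetical metrics.values collection on a hashed itemId field; the database, collection, field and host names are all placeholder assumptions. The shell helper sh.shardCollection does the same thing.

    package main

    import (
        "log"

        "labix.org/v2/mgo"
        "labix.org/v2/mgo/bson"
    )

    func main() {
        // Connect to a mongos router (placeholder address).
        session, err := mgo.Dial("mongos.example.com:27017")
        if err != nil {
            log.Fatalf("Dial: %s", err)
        }
        defer session.Close()

        admin := session.DB("admin")

        // Enable sharding on the (hypothetical) metrics database.
        if err := admin.Run(bson.D{{"enableSharding", "metrics"}}, nil); err != nil {
            log.Fatalf("enableSharding: %s", err)
        }

        // Shard metrics.values on a hashed itemId so chunks distribute evenly.
        err = admin.Run(bson.D{
            {"shardCollection", "metrics.values"},
            {"key", bson.M{"itemId": "hashed"}},
        }, nil)
        if err != nil {
            log.Fatalf("shardCollection: %s", err)
        }
    }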

4. Let MongoDB delete data with TTL indexes

The majority of our users are only interested in the highest resolution data for a short period and more general trends over longer periods, so over time we average the time series data we collect then delete the original values. We actually insert the data twice - once as the actual value and once as part of a sum/count to allow us to calculate the average when we pull the data out later. Depending on the query time range we either read the average or the true values - if the query range is too long then we risk returning too many data points to be plotted. This method also avoids any batch processing so we can provide all the data in real time rather than waiting for a calculation to catch up at some point in the future.

Removal of the data after a period of time is done by using a TTL index. This is set based on surveying our customers to understand how long they want the high resolution data for. Using the TTL index to delete the data is much more efficient than doing our own batch removes and means we can rely on MongoDB to purge the data at the right time.

Inserting and deleting a lot of data can have implications for data fragmentation, but using a TTL index helps because it automatically activates the PowerOf2Sizes storage option for the collection, making disk usage more efficient. As of MongoDB 2.6, this storage option becomes the default.
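
For illustration only (not Server Density’s code), a TTL index can be created from Go with mgo’s EnsureIndex; the collection name, field name, retention window and address below are assumptions.

    package main

    import (
        "log"
        "time"

        "labix.org/v2/mgo"
    )

    func main() {
        session, err := mgo.Dial("localhost:27017") // placeholder address
        if err != nil {
            log.Fatalf("Dial: %s", err)
        }
        defer session.Close()

        // Hypothetical collection of raw, high-resolution data points.
        points := session.DB("metrics").C("raw_points")

        // Documents expire once their createdAt value is older than 30 days;
        // MongoDB's TTL monitor then deletes them in the background.
        index := mgo.Index{
            Key:         []string{"createdAt"},
            ExpireAfter: 30 * 24 * time.Hour,
            Background:  true,
        }
        if err := points.EnsureIndex(index); err != nil {
            log.Fatalf("EnsureIndex: %s", err)
        }
    }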

5. Take care over query and schema design

The biggest hit on performance I have seen is when documents grow, particularly when you are doing huge numbers of updates. If the document size increases after it has been written then the entire document has to be read and rewritten to another part of the data file with the indexes updated to point to the new location, which takes significantly more time than simply updating the existing document.

As such, it’s important to design your schema and queries to avoid this, and to use the right modifiers to minimise what has to be transmitted over the network and then applied as an update to the document. A good example of what you shouldn’t do when updating documents is to read the document into your application, update it, then write it back to the database. Instead, use the appropriate update operators - such as $set, $unset and $inc - to modify documents directly.
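
As a sketch of that principle (with a hypothetical counters collection and field names), an in-place update using modifiers sends only the change over the wire instead of rewriting the whole document:

    package main

    import (
        "log"

        "labix.org/v2/mgo"
        "labix.org/v2/mgo/bson"
    )

    func main() {
        session, err := mgo.Dial("localhost:27017") // placeholder address
        if err != nil {
            log.Fatalf("Dial: %s", err)
        }
        defer session.Close()

        // Hypothetical collection of per-item counters.
        counters := session.DB("metrics").C("counters")

        // Don't read-modify-write the whole document; let the server apply
        // the modifiers: $inc bumps the running totals, $set updates one field.
        err = counters.Update(
            bson.M{"itemId": "server-42"},
            bson.M{
                "$inc": bson.M{"sum": 12.5, "count": 1},
                "$set": bson.M{"lastSeen": bson.Now()},
            },
        )
        if err != nil {
            log.Printf("Update: %s", err)
        }
    }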

This also means paying attention to the BSON data types and pre-allocating documents, things I wrote about in MongoDB schema design pitfalls.

6. Consider network throughput & number of packets

Assuming 100Mbps networking is sufficient is likely to cause you problems, perhaps not during normal operations, but probably when you have some unusual event like needing to resync a secondary replica set member.

When cloning the database, MongoDB is going to use as much network capacity as it can to transfer the data over as quickly as possible before the oplog rolls over. If you’re doing 50-60Mbps of normal network traffic, there isn’t much spare capacity on a 100Mbps connection so that resync is going to be held up by hitting the throughput limits.

Also keep an eye on the number of packets being transmitted over the network - it’s not just the raw throughput that is important. A huge number of packets can overwhelm low quality network equipment - a problem we saw several years ago at our previous hosting provider. This will show up as packet loss and be very difficult to diagnose.

Conclusions

Scaling is an incremental process - there’s rarely one thing that will give you a big win. All of these tweaks and optimisations together help us to perform thousands of write operations per second and get response times within 10ms whilst using a write concern of 1.

Ultimately, all this ensures that our customers can load the graphs they want incredibly quickly. Behind the scenes we know that data is being written quickly, safely and that we can scale it as we continue to grow.

Bug Hunt Winners for MongoDB 2.6-rc0

Mar 13 • Posted 6 months ago

We would like to thank everyone who participated in MongoDB’s inaugural Bug Hunt. Your efforts have helped to improve the 2.6 release.

The MongoDB Bug Hunt for 2.6-rc0 uncovered a number of bugs. From this pool, we’ve selected 5 winning Bug Reporters based on three criteria: user impact, severity and prevalence.

The Winners

First Prize: Roman Kuzmin, SERVER-12846
  • 1 ticket to MongoDB World — with a reserved front-row seat for keynote sessions
  • $1000 Amazon Gift Card
  • MongoDB Contributor T-shirt
Second Prize: Mark Callaghan, SERVER-13060 & SERVER-13041
  • 1 ticket to MongoDB World — with a reserved front-row seat for keynote sessions
  • $500 Donation to Khan Academy (at Mark’s request)
  • MongoDB Contributor T-shirt
Honorable Mentions: Tim Callaghan (SERVER-12878), David Glasser (SERVER-12981) and Jeff Lee (SERVER-12925)
  • 1 ticket to MongoDB World — with a reserved front-row seat for keynote sessions
  • $250 Amazon Gift Card
  • MongoDB Contributor T-shirt

Congratulations to the five winners of the first MongoDB Bug Hunt and thanks to everyone who downloaded, tested and gave feedback on the release candidates.

-Eliot, Dan, Alvin and the MongoDB team

Upcoming Changes to the MongoDB C++ Driver

Mar 3 • Posted 6 months ago

By Andrew Morrow, Kernel Engineer at MongoDB

In coordination with the upcoming MongoDB 2.6 release, the Kernel team is changing how the MongoDB C++ driver is developed, licensed, released, and distributed.

What are we doing?

As of MongoDB 2.6.0-rc1, it will no longer be possible to build the C++ driver from the MongoDB server sources. The files required to build the driver have been forked to a new repository on GitHub, and will be maintained independently of the server sources.

Can I start using it now?

Yes. We are still fixing up some post-fork rough edges though, so please consider the current state of the new repository to be unstable. We should be announcing release candidates soon. Release candidates are usable for testing, but should not be used in production.

Why are we doing this?

By separating the development streams for the server and the C++ driver we will be able to respond to C++ driver issues without being tied to the server release schedule, as well as make changes that were previously difficult to apply in the context of the server repository. We want to make developing and using the C++ driver a better experience for everyone involved.

How will the new driver be versioned?

The new repository will initially offer two release streams: the ‘26compat’ release stream, and the ‘legacy’ release stream.

The ‘26compat’ branch and release stream will contain a minimally altered snapshot of the C++ driver files from the server repository as of MongoDB 2.6.0-rc0. It is intended to be a drop-in replacement for existing users of the C++ driver seeking minimal disruption to their current workflow and code. The build system, physical structure, C++ API, and other aspects of the driver will change minimally in this stream. We will backport selected fixes from the v2.6 server repository branch as needed, but conservatively. Enhancements and improvements will be minimal or non-existent.

The ‘legacy’ branch and release stream will more aggressively diverge from the server sources. While this release stream will retain the overall look and feel of the existing driver, we are likely to make small breaking changes to the physical structure, build system, API, and testing model for the driver. New driver features will only be made in this release stream. Our goal in this release stream is to provide meaningful improvements to the C++ driver, while requiring only modest changes on the part of users.

I see a ‘legacy’ branch in the new repository, but no ‘26compat’ branch. Why?

The initial release candidates for the 26compat release stream will be made from the ‘legacy’ branch. When MongoDB 2.6.0 is released, the 26compat branch will be forked from the legacy branch to track the MongoDB 2.6 point releases. Until that split, the legacy branch is effectively the 26compat branch. After the 26compat branch is formed, the legacy branch will begin to receive improvements and changes that are not compatible with the goals of the 26compat branch.

What license will apply to the C++ drivers in this repository?

Unlike the driver embedded within the server repository, which is a mix of Affero GPL and Apache 2.0 licensed files, all files in the new repository will be Apache 2.0 licensed.

How will existing users of the C++ driver be affected?

Our aim is to minimize disruption, but also to enable more rapid evolution and improvement of the driver. Since these goals are in conflict, there will be a period where there are several valid ways to obtain and build the C++ driver, all yielding slightly different results. We will be updating the documentation to reflect these alternatives in more detail, but as an overview, the initial options are to:

  1. Switch to the new C++ driver repository and use the 26compat release stream. This should offer as close to a drop-in replacement for your existing workflow and application code as can be managed. Your help in testing the release candidates of this release stream will help us ensure that our first production release of this stream is an effective replacement for the older mechanisms of obtaining the driver.
  2. Continue to use the server repository build of the driver. This process is valid for releases of the driver up to and including 2.6.0-rc0, but will not advance beyond that release. If you are using this process and wish to continue with minimal change, we encourage you to move to the 26compat stream.
  3. Continue to use the tarball versions of the driver. This process will remain valid for releases of the driver up to and including 2.5.2, but will not advance beyond that release. If you are using the C++ Client Driver tarball, we strongly encourage you to move to any of the other processes listed above.

Later, once the 26compat stream has been stabilized, there will be an additional option to track the legacy release stream. Our goal is that all C++ driver users will eventually be able to migrate to the legacy release stream.

What about JIRA, SERVER tickets, etc.?

There is a new JIRA project for the C++ driver: https://jira.mongodb.org/browse/CXX. We will be evaluating existing SERVER tickets filed under the “C++ Driver” component or label and migrating or copying them to the new project as appropriate.

New C++ driver issues should be filed in the CXX project.

Why is it called the ‘legacy’ driver or the ‘legacy’ branch?

We are saving the master branch of the repository for something new and exciting a bit further down the road. In the meantime, we want to make the experience with the existing C++ driver better, but it isn’t going to diverge radically from the ‘legacy’ of its origins in the server codebase.

Can I help?

Absolutely. We are very interested in feedback on the plan outlined above, and your feedback will help us solidify the details. We will be releasing an RC of the 26compat release stream sometime in the next week or two, and testing of that release candidate will be much appreciated.

Similarly, we will be releasing a series of unstable releases of the legacy driver leading to a stabilized version in the near future. Experimenting with these releases will give us the best opportunity to improve the driver for the community.

I have other questions…

Please post to the mongodb-user or mongodb-dev mailing lists with any questions about this announcement and the C++ Driver team will be happy to answer.

Thank You,

The C++ Driver Team

MongoDB Bug Hunt Extended to March 8

Feb 28 • Posted 6 months ago

On February 21, we launched the first ever MongoDB Bug Hunt. We have been impressed with the community’s enthusiasm during the first week and have decided to extend the hunt until March 8. This will allow more members of the community to get involved and help improve MongoDB for users worldwide.

As a reminder, you can download the latest release at www.MongoDB.org/downloads. If you find a bug, submit the issue to Jira (Core Server project) by March 8 at 12:00AM GMT. Bug reports will be judged on three criteria: user impact, severity and prevalence.

We will review all bugs submitted against 2.6.0-rc0. Winners will be announced on the MongoDB blog and user forum by March 13. There will be one first place winner, one second place winner and at least two honorable mentions.

For more info on the Bug Hunt see our announcement on the MongoDB Blog.

Thanks to everyone who has downloaded and tested the server so far. Keep on hunting!

Announcing the MongoDB Bug Hunt 2.6.0-rc0 

Feb 21 • Posted 7 months ago

The MongoDB team released MongoDB 2.6.0-rc0 today and is proud to announce the MongoDB Bug Hunt. The MongoDB Bug Hunt is a new initiative to reward our community members who contribute to improving this MongoDB release. We’ve put the release through rigorous correctness, performance and usability testing. Now it’s your turn. Over the next 10 days, we challenge you to test and uncover any lingering issues in MongoDB 2.6.0-rc0.

How it works

You can download this release at MongoDB.org/downloads. If you find a bug, submit the issue to Jira (Core Server project) by March 4 at 12:00AM GMT. Bug reports will be judged on three criteria: user impact, severity and prevalence.

We will review all bugs submitted against 2.6.0-rc0. Winners will be announced on the MongoDB blog and user forum by March 8. There will be one first place winner, one second place winner and at least two honorable mentions.

The Rewards
First Prize:
  • 1 ticket to MongoDB World — with a reserved front-row seat for keynote sessions
  • $1000 Amazon Gift Card
  • MongoDB Contributor T-shirt
Second Prize:
  • 1 ticket to MongoDB World — with a reserved front-row seat for keynote sessions
  • $500 Amazon Gift Card
  • MongoDB Contributor T-shirt
Honorable Mentions:
  • 1 ticket to MongoDB World — with a reserved front-row seat for keynote sessions
  • $250 Amazon Gift Card
  • MongoDB Contributor T-shirt

How to get started:

  • Deploy in your test environment: Software is best tested in a realistic environment. Help us see how 2.6 fares with your code and data so that others can build and run applications on MongoDB 2.6 successfully.
  • Test new features and improvements: Dozens of new features were added in 2.6. See the 2.6 Release Notes for a full list.
  • Log a ticket: If you find an issue, create a report in Jira. See the documentation for a guide to submitting well written bug reports.

If you are interested in doing this work full time, consider applying to join our engineering teams in New York City, Palo Alto and Austin, Texas.

Happy hunting!

Eliot, Dan and the MongoDB Team

Crittercism: Scaling To Billions Of Requests Per Day On MongoDB

Feb 20 • Posted 7 months ago

This is a guest post by Mike Chesnut, Director of Operations Engineering at Crittercism. This June, Mike will present at MongoDB World on how Crittercism scaled to 30,000 requests/second (and beyond) on MongoDB.

MongoDB is capable of scaling to meet your business needs — that is why its name is based on the word humongous. This doesn’t mean there aren’t some growing pains you’ll encounter along the way, of course. At Crittercism, we’ve had a huge amount of growth over the past 2 years and have hit some bumps in the road along the way, but we’ve also learned some important lessons that can hopefully be of use to others.

Background

Crittercism provides the world’s first and leading Mobile Application Performance Management (mAPM) solution. Our SDK is embedded in tens of thousands of applications, and used by nearly a billion users worldwide. We collect performance data such as error reporting, crash diagnostics details, network breadcrumbs, device/carrier/OS statistics, and user behavior. This data is largely unstructured and varies greatly among different applications, versions, devices, and usage patterns.

Storing all of this data in MongoDB allows us to collect raw information that may be of use in a variety of ways to our customers, while also providing the analytics necessary to summarize the data down to digestible, actionable metrics.

As our request volume has grown, so too has our data footprint; over the course of 18 months our daily request volume increased more than 40x.

Our primary MongoDB cluster now houses over 20TB of data, and getting to this point has helped us learn a few things along the way.

Routing

The MongoDB documentation suggests that the most common topology is to include a router — a mongos process — on each client system. We started doing this and it worked well for quite a while.

As the number of front-end application servers in production grew from the order of 10s to several 100s, we found that we were creating heavy load via hundreds (or sometimes thousands) of connections between our mongos routers and our mongod shard servers. This meant that whenever chunk balancing occurred — something that is an integral part of maintaining a well-balanced, sharded MongoDB cluster — the chunk location information that is stored in the config database took a long time to propagate. This is because every mongos router needs to have a full picture of where in the cluster all of the chunks reside.

So what did we learn? We found that we could alleviate this issue by consolidating mongos routers onto a few hosts. Our production infrastructure is in AWS, so we deployed 2 mongos servers per availability zone. This gave us redundancy per AZ, as well as offering the shortest possible network path from the clients to the mongos routers. We were concerned about adding an additional hop to the request path, but using Chef to configure all of our clients to only talk to the mongos routers in their own AZ helped minimize this issue.

Making this topology change greatly reduced the number of open connections to our mongod shards, which we were able to measure using MMS, without a noticeable reduction in application performance. At the same time, there were several improvements to MongoDB that made both the mongos updates and the internal consistency checks more efficient in general. Combined with the new infrastructure this meant that we could now balance chunks throughout our cluster without causing performance problems while doing so.

Shard Replacement

Another scenario we’ve encountered is the need to dynamically replace mongod servers in order to migrate to larger shards. Again following the recommended best deployment practice, we deploy MongoDB onto server instances utilizing large RAID10 arrays running xfs. We use m2.4xlarge instances in AWS with 16 disks. We’ve used basic Linux mdadm for performance, but at the expense of flexibility in disk configuration. As a result when we are ready to allocate more capacity to our shards, we need to perform a migration procedure that can sometimes take several days. This not only means that we need to plan ahead appropriately, but also that we need to be aware of the full process in order to monitor it and react when things go wrong.

We start with a replica set where all replicas are at approximately the same disk utilization. We first create a new server instance with more disk allocated to it, and add it to this replica set with rs.add().

The new replica will enter the STARTUP2 state and remain there for a long time (in our case, usually 2-3 days) while it first clones data, then catches up via oplog replication and builds indexes. During this time, index builds will often stop the replication process (note that this behavior is set to change in MongoDB 2.6), and so the replication lag time does not strictly decrease the whole time — it will steadily decrease for a while, then an index build will cause it to pause and start to fall further behind. Once the index build completes the replication will resume. It’s worth noting that while index builds occur, mongostat and anything else that requires a read lock will be blocked as well.
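
As a rough illustration (not Crittercism’s actual tooling), the catch-up can be watched programmatically by polling replSetGetStatus and logging each member’s state; this Go sketch with the mgo driver assumes placeholder host names and a one-minute polling interval.

    package main

    import (
        "log"
        "time"

        "labix.org/v2/mgo"
        "labix.org/v2/mgo/bson"
    )

    // replSetStatus captures the fields we care about from replSetGetStatus.
    type replSetStatus struct {
        Members []struct {
            Name     string `bson:"name"`
            StateStr string `bson:"stateStr"`
        } `bson:"members"`
    }

    func main() {
        // Connect to a member of the replica set (placeholder address).
        session, err := mgo.Dial("rs-member-1.example.com:27017")
        if err != nil {
            log.Fatalf("Dial: %s", err)
        }
        defer session.Close()

        // Log member states every minute; the new member should move from
        // STARTUP2 toward SECONDARY as the clone and index builds finish.
        for {
            var status replSetStatus
            if err := session.DB("admin").Run(bson.D{{"replSetGetStatus", 1}}, &status); err != nil {
                log.Printf("replSetGetStatus: %s", err)
            }
            for _, m := range status.Members {
                log.Printf("%s: %s", m.Name, m.StateStr)
            }
            time.Sleep(time.Minute)
        }
    }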

Eventually the replica will enter the SECONDARY state and will be fully functional. At this point we can rs.stepDown() one of the old replicas, shut down the mongod process running on it, and then remove it from the replica set via rs.remove(), making the server ready for retirement.

We then repeat the process for each member of the replica set, until all have been replaced with the new instances with larger disks.

While this process can be time-consuming and somewhat tedious, it allows for a graceful way to grow our database footprint without any customer-facing impact.

Conclusion

Operating MongoDB at scale — as with any other technology — requires some knowledge that you can gain from documentation, and some that you have to learn from experience. By being willing to experiment with different strategies such as those shown above, you can discover flexibility that may not have been previously obvious. Consolidating the mongos router tier was a big win for Crittercism’s Ops team in terms of both performance and manageability, and developing the above described migration procedure has enabled us to continue to grow to meet our needs without affecting our service or our customers.

See how Crittercism, Stripe, Carfax, Intuit, Parse and Sailthru are building the next generation of applications at MongoDB World. Register now and join the MongoDB Community in New York City this June.