Processing 2 Billion Documents A Day And 30TB A Month With MongoDB

Mar 14 • Posted 6 months ago

This is a guest post by David Mytton. He has been programming Python for over 10 years and founded his website and server monitoring company, Server Density, back in 2009.

Server Density processes over 30TB/month of incoming data points from the servers and web checks we monitor for our customers, ranging from simple Linux system load average to website response times from 18 different countries. All of this data goes into MongoDB in real time and is pulled out when customers need to view graphs, update dashboards and generate reports.

We’ve been using MongoDB in production since mid-2009 and have learned a lot over the years about scaling the database. We run multiple MongoDB clusters but the one storing the historical data does the most throughput and is the one I shall focus on in this article, going through some of the things we’ve done to scale it.

1. Use dedicated hardware, and SSDs

All our MongoDB instances run on dedicated servers across two data centers at Softlayer. We’ve had bad experiences with virtualisation because you have no control over the host, and databases need guaranteed performance from disk i/o. When running on shared storage (e.g., a SAN) this is difficult to achieve unless you can get guaranteed throughput from things like AWS’s Provisioned IOPS on EBS (which are backed by SSDs).

MongoDB doesn’t really have many bottlenecks when it comes to CPU because CPU-bound operations are rare (usually things like building indexes), but what really causes problems is CPU steal - when other guests on the host are competing for the CPU resources.

The way we have combated these problems is to eliminate the possibility of CPU steal and noisy neighbours by moving onto dedicated hardware. And we avoid problems with shared storage by deploying the dbpath onto locally mounted SSDs.

I’ll be speaking in depth about managing MongoDB deployments on virtualized or dedicated hardware at MongoDB World this June.

2. Use multiple databases to benefit from improved concurrency

Running the dbpath on an SSD is a good first step but you can get better performance by splitting your data across multiple databases, and putting each database on a separate SSD with the journal on another.

Locking in MongoDB is managed at the database level so moving collections into their own databases helps spread things out - mostly important for scaling writes when you are also trying to read data. If you keep databases on the same disk you’ll start hitting the throughput limitations of the disk itself. This is improved by putting each database on its own SSD by using the directoryperdb option. SSDs help by significantly alleviating i/o latency, which is related to the number of IOPS and the latency for each operation, particularly when doing random reads/writes. This is even more visible for Windows environments where the memory mapped data files are flushed serially and synchronously. Again, SSDs help with this.

The journal is always within a directory so you can mount this onto its own SSD as a first step. All writes go via the journal and are later flushed to disk so if your write concern is configured to return when the write is successfully written to the journal, making those writes faster by using an SSD will improve query times. Even so, enabling the directoryperdb option gives you the flexibility to optimise for different goals (e.g., put some databases on SSDs and some on other types of disk, or EBS PIOPS volumes, if you want to save cost).

It’s worth noting that filesystem based snapshots where MongoDB is still running are no longer possible if you move the journal to a different disk (and so different filesystem). You would instead need to shut down MongoDB (to prevent further writes) then take the snapshot from all volumes.

3. Use hash-based sharding for uniform distribution

Every item we monitor (e.g., a server) has a unique MongoID and we use this as the shard key for storing the metrics data.

The query index is on the item ID (e.g. the server ID), the metric type (e.g. load average) and the time range; but because every query always has the item ID, it makes it a good shard key. That said, it is important to ensure that there aren’t large numbers of documents under a single item ID because this can lead to jumbo chunks which cannot be migrated. Jumbo chunks arise from failed splits where they’re already over the chunk size but cannot be split any further.
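As a rough sketch of that query shape, the snippet below builds a compound index specification and a matching filter as plain Python structures. The field names (i, m, t) and the sample values are assumptions made for illustration, not Server Density's actual schema. With PyMongo, structures like these could be passed to create_index() and find().

```python
from datetime import datetime, timedelta, timezone

# Hypothetical field names: "i" (item ID), "m" (metric type), "t" (timestamp).
QUERY_INDEX = [("i", 1), ("m", 1), ("t", 1)]


def metrics_filter(item_id, metric, start, end):
    """Build a query filter that always includes the item ID (the shard key),
    followed by the metric type and a time range."""
    return {"i": item_id, "m": metric, "t": {"$gte": start, "$lt": end}}


end = datetime(2014, 3, 14, 12, 0, tzinfo=timezone.utc)
flt = metrics_filter("server-1", "loadAvg", end - timedelta(hours=1), end)
print(flt)
```

Because the filter fields appear in the same order as the index keys, and the item ID is always present, a query like this can be both index-backed and routed by shard key.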

To ensure that the shard chunks are always evenly distributed, we’re using the hashed shard key functionality in MongoDB 2.4. Hashed shard keys are often a good choice for ensuring uniform distribution, but if you end up not using the hashed field in your queries, you could actually hurt performance because then a non-targeted scatter/gather query has to be used.
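To illustrate why hashing evens out the distribution, here is a minimal sketch. It is not MongoDB's implementation: MD5 from the Python standard library stands in for the hash, a simple modulo stands in for chunk ranges, and the chunk count and ID format are made up. The point is only that consecutive IDs end up scattered across chunks rather than clustered together.

```python
import hashlib
from collections import Counter

NUM_CHUNKS = 4  # hypothetical number of chunks, for illustration only


def hashed_chunk(item_id, num_chunks=NUM_CHUNKS):
    """Map an item ID to a chunk using a stand-in hash (first 8 bytes of MD5)."""
    digest = hashlib.md5(item_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_chunks


def chunk_counts(item_ids):
    """Count how many item IDs land in each chunk."""
    return Counter(hashed_chunk(item_id) for item_id in item_ids)


# Sequential IDs, invented for this example.
ids = ["item-%06d" % n for n in range(10000)]
counts = chunk_counts(ids)
for chunk in sorted(counts):
    print(chunk, counts[chunk])
```

Each chunk should receive roughly a quarter of the IDs, even though the IDs themselves are strictly increasing.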

4. Let MongoDB delete data with TTL indexes

The majority of our users are only interested in the highest resolution data for a short period and more general trends over longer periods, so over time we average the time series data we collect then delete the original values. We actually insert the data twice - once as the actual value and once as part of a sum/count to allow us to calculate the average when we pull the data out later. Depending on the query time range we either read the average or the true values - if the query range is too long then we risk returning too many data points to be plotted. This method also avoids any batch processing so we can provide all the data in real time rather than waiting for a calculation to catch up at some point in the future.
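The double write can be sketched in memory as follows. This is an illustration under stated assumptions rather than the production code: the one-hour window, the field names and the in-memory list and dict (standing in for two MongoDB collections) are all hypothetical.

```python
BUCKET_SECONDS = 3600  # hypothetical averaging window: one hour

raw_points = []  # stand-in for the high-resolution collection
buckets = {}     # stand-in for the sum/count collection


def record(item_id, metric, timestamp, value):
    """Write each data point twice: once raw, once into a running sum/count."""
    raw_points.append({"i": item_id, "m": metric, "t": timestamp, "v": value})
    bucket_start = timestamp - (timestamp % BUCKET_SECONDS)
    bucket = buckets.setdefault((item_id, metric, bucket_start), {"sum": 0.0, "count": 0})
    bucket["sum"] += value
    bucket["count"] += 1


def bucket_average(item_id, metric, bucket_start):
    """Compute the average at read time; return None if nothing was recorded."""
    bucket = buckets.get((item_id, metric, bucket_start))
    if bucket is None:
        return None
    return bucket["sum"] / bucket["count"]


record("server-1", "loadAvg", 7200, 0.5)
record("server-1", "loadAvg", 7260, 1.5)
record("server-1", "loadAvg", 7320, 1.0)
print(bucket_average("server-1", "loadAvg", 7200))  # 1.0
```

Because the sum and count are maintained on every insert, the average is available immediately and no batch job has to catch up later.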

Removal of the data after a period of time is done by using a TTL index. This is set based on surveying our customers to understand how long they want the high resolution data for. Using the TTL index to delete the data is much more efficient than doing our own batch removes and means we can rely on MongoDB to purge the data at the right time.

Inserting and deleting a lot of data can have implications for data fragmentation, but using a TTL index helps because it automatically activates the usePowerOf2Sizes allocation strategy for the collection, making disk usage more efficient. As of MongoDB 2.6, this storage option will become the default anyway.
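A TTL index is an ordinary index on a date field with an expireAfterSeconds option. The sketch below builds the createIndexes command document for one as a plain dict, assuming a server version that supports that command. The collection name, field name and 30-day retention are hypothetical. With PyMongo the same index could be created with create_index(field, expireAfterSeconds=...).

```python
from datetime import datetime, timedelta, timezone

RETENTION_DAYS = 30  # hypothetical retention period, not Server Density's actual value


def ttl_index_command(collection, field, retention_days):
    """Build a createIndexes command document for a TTL index on a date field."""
    return {
        "createIndexes": collection,
        "indexes": [
            {
                "key": {field: 1},
                "name": field + "_ttl",
                "expireAfterSeconds": int(timedelta(days=retention_days).total_seconds()),
            }
        ],
    }


def is_expired(doc_time, now, retention_days):
    """Mirror the TTL rule: a document becomes eligible for deletion once its
    date field is older than the retention period."""
    return doc_time + timedelta(days=retention_days) <= now


cmd = ttl_index_command("metrics_raw", "t", RETENTION_DAYS)
print(cmd["indexes"][0]["expireAfterSeconds"])  # 2592000

now = datetime(2014, 3, 14, tzinfo=timezone.utc)
print(is_expired(now - timedelta(days=31), now, RETENTION_DAYS))  # True
```

Note that the server removes expired documents in a background task, so deletion happens some time after a document becomes eligible rather than at the exact moment.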

5. Take care over query and schema design

The biggest hit on performance I have seen is when documents grow, particularly when you are doing huge numbers of updates. If the document size increases after it has been written then the entire document has to be read and rewritten to another part of the data file with the indexes updated to point to the new location, which takes significantly more time than simply updating the existing document.

As such, it’s important to design your schema and queries to avoid this, and to use the right modifiers to minimise what has to be transmitted over the network and then applied as an update to the document. A good example of what you shouldn’t do when updating documents is to read the document into your application, update the document, then write it back to the database. Instead, use the appropriate update operators - such as set ($set), remove ($unset) and increment ($inc) - to modify documents directly.
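The difference between the two approaches can be sketched with plain Python dicts. The document and its field names are invented for this example, and JSON length is used only as a rough proxy for what would be sent over the wire as BSON. With PyMongo, the operator document would be the second argument to update_one(filter, update).

```python
import json

# A hypothetical stored document; the field names are made up for this example.
stored = {
    "_id": "server-1:loadAvg:2014-03-14T10",
    "sum": 42.5,
    "count": 60,
    "status": "ok",
    "legacy": "unused field",
    "values": [0.7] * 60,
}

# Anti-pattern: read the whole document, change it in the application,
# then send the whole document back as a replacement.
replacement = dict(stored, sum=stored["sum"] + 0.9, count=stored["count"] + 1)
del replacement["legacy"]

# Preferred: describe only the change with update operators, so the server
# applies it to the existing document.
update = {
    "$inc": {"sum": 0.9, "count": 1},
    "$set": {"status": "ok"},
    "$unset": {"legacy": ""},
}

print(len(json.dumps(replacement)), "characters as a full replacement")
print(len(json.dumps(update)), "characters as an operator update")
```

The operator update stays the same size however large the stored document becomes, and it avoids the extra round trip needed to read the document first.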

This also means paying attention to the BSON data types and pre-allocating documents, things I wrote about in MongoDB schema design pitfalls.

6. Consider network throughput & number of packets

Assuming 100Mbps networking is sufficient is likely to cause you problems, perhaps not during normal operations, but probably when you have some unusual event like needing to resync a secondary replica set member.

When cloning the database, MongoDB is going to use as much network capacity as it can to transfer the data over as quickly as possible before the oplog rolls over. If you’re doing 50-60Mbps of normal network traffic, there isn’t much spare capacity on a 100Mbps connection so that resync is going to be held up by hitting the throughput limits.
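A back-of-the-envelope calculation makes the point concrete. The sketch below estimates how long a resync would take using only the capacity left over after normal traffic. The 500GB data size is a made-up figure, and protocol overhead is ignored.

```python
def resync_hours(data_gb, link_mbps, baseline_mbps):
    """Estimate hours to copy data_gb over the capacity left after normal traffic."""
    spare_mbps = link_mbps - baseline_mbps
    if spare_mbps <= 0:
        raise ValueError("no spare capacity on this link")
    data_megabits = data_gb * 1000 * 8  # decimal GB -> megabits
    return data_megabits / spare_mbps / 3600


DATA_GB = 500  # hypothetical data set size

slow = resync_hours(DATA_GB, link_mbps=100, baseline_mbps=55)
fast = resync_hours(DATA_GB, link_mbps=1000, baseline_mbps=55)
print(round(slow, 1), "hours on 100Mbps")
print(round(fast, 1), "hours on 1Gbps")
```

With these assumed numbers the resync takes about a day on the 100Mbps link and a little over an hour on 1Gbps, which is the difference between finishing comfortably and racing the oplog.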

Also keep an eye on the number of packets being transmitted over the network - it’s not just the raw throughput that is important. A huge number of packets can overwhelm low quality network equipment - a problem we saw several years ago at our previous hosting provider. This will show up as packet loss and be very difficult to diagnose.

Conclusions

Scaling is an incremental process - there’s rarely one thing that will give you a big win. All of these tweaks and optimisations together help us to perform thousands of write operations per second and get response times within 10ms whilst using a write concern of 1.

Ultimately, all this ensures that our customers can load the graphs they want incredibly quickly. Behind the scenes we know that data is being written quickly and safely, and that we can scale as we continue to grow.
