Posts tagged: operations

Powering Social Insights with MongoDB at uberVU

May 19 • Posted 3 months ago

This is a guest post from the uberVU team.

Today, more than ever, marketers are being asked to show how their financial investments are driving tangible business results. We help them accomplish that goal. uberVU is a real-time social media marketing platform that allows organizations to leverage their social media data to better understand, connect with, and grow their online communities. We have an extensive client list including enterprise customers such as Heinz, NBC, World Bank, and Fujitsu.

We were recently acquired by HootSuite, and together our two products offer a complete and integrated feature set that addresses the entire social media lifecycle:

  • monitoring
  • metrics
  • reporting
  • engagement
  • collaboration

We have a five-year history in the social media monitoring market, and our evolving data storage architecture has played a key role in elevating our application’s value and user experience. For our data handling needs, we started with Tokyo Cabinet, SimpleDB, and MySQL and now use MongoDB, DynamoDB, S3, Glacier, and ElasticSearch.

Originally our team intended to use MongoDB as a secondary data store, but after a short implementation and adoption period of 3 months in which it quickly gained traction internally, MongoDB was promoted to our primary data store.

Challenges

We collect and store social media content such as tweets, Facebook posts, blog posts, blog comments, etc. Each item is stored in the database as a separate document.

A stored tweet might look something like this:

{
    generator: 'twitter',
    content: 'This is a tweet example for ubervu',
    author: 'Vladimir Oane',
    gender: 'male',
    language: 'english',
    sentiment: 'positive',
    search: 'ubervu',
    published: 1391767879,
    ...
}

For our clients, relevant social media content must contain or match a predefined expression of interest in the designated ‘search’ field. In the example above, the tweet is collected and stored because it contains the string ‘ubervu’ in the content body.

Unique Index Structure

Our most common use case with MongoDB is performing a range query over a time frame for a fixed expression. For example, we might want to retrieve social media content that contains or matches the expression ‘ubervu’ between October 1st and November 23rd.

We constructed the _id field, on which MongoDB maintains a unique index by default, so that this range query is served directly by that index. For space considerations, we opted for a 64-bit integer and divided it into three parts:

  1. A hash of the search expression
  2. The entire published timestamp, in seconds
  3. An item id, which together with the timestamp should uniquely identify a document

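To conduct a search for all “ubervu” mentions between timestamp1 and timestamp2, we simply run a range query on “_id” between:

[hash of “ubervu”] [timestamp1] [item id: all 0s]

and:

[hash of “ubervu”] [timestamp2] [item id: all 1s]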

Note above how the lowest and highest bounds have the item id portion filled entirely with 0’s and 1’s, respectively. This allows us to cover edge cases of items that fall between timestamp1 and timestamp2.
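The bound construction can be sketched as follows. The post does not state how many bits each part occupies, so the 16/32/16-bit split, the packId and idRange names, the example hash value, and the example timestamps are all assumptions made for illustration. BigInt is used because plain JavaScript numbers cannot represent a full 64-bit integer, so this sketch is meant for Node.js rather than the mongo shell.

```javascript
// Assumed layout (not stated in the post):
// 16-bit search hash | 32-bit timestamp in seconds | 16-bit item id.
var TS_BITS = 32n;
var ITEM_BITS = 16n;
var ITEM_MAX = (1n << ITEM_BITS) - 1n; // item id portion filled with 1s

// Pack the three parts into one integer, most significant part first.
function packId(hash, timestamp, itemId) {
  return (BigInt(hash) << (TS_BITS + ITEM_BITS)) |
         (BigInt(timestamp) << ITEM_BITS) |
         BigInt(itemId);
}

// Inclusive _id bounds covering every item for `hash` between ts1 and ts2.
function idRange(hash, ts1, ts2) {
  return {
    lower: packId(hash, ts1, 0),        // item id all 0s
    upper: packId(hash, ts2, ITEM_MAX)  // item id all 1s
  };
}

// Hypothetical hash value and example epoch-second timestamps.
var range = idRange(0x1a2b, 1380585600, 1385164800);
console.log(range.lower.toString(16), range.upper.toString(16));
```

The two values would then be used as the $gte and $lte bounds of a range query on _id. Because the hash occupies the most significant bits, every document for one search expression sits in a contiguous, time-ordered slice of the index.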

Efficient Filtering

Another very common use case is retrieving all the data that matches a set of criteria. Within our application, the fields we can filter on are predefined (generator, language, sentiment, gender).

Efficient filtering is a challenge because the most obvious approach - creating indexes for every combination of filters - is not scalable as every added index costs storage space and has the potential to adversely affect write performance.

To improve query efficiency, we added an ‘attributes’ field into each document that consists of an encoded array of all the field values that can be used in a query. It looks like this:

attributes: [2041, 15, 178, 23 …]

Each numeric code in the array represents a property, such as “sentiment: positive” or “language: English”. We added an index over the ‘attributes’ field to speed up queries.

To retrieve all items matching a criteria set using the ‘attributes’ field, queries are run using the $all operator:

collection.find({attributes: {$all: [...]}})

A shortcoming of using the $all operator is that prior to MongoDB 2.6 the index is only used to match the first code in the ‘attributes’ array; all other codes must be retrieved from disk and matched with the rest of query criteria.

In an effort to reduce the number of documents that need to be checked from disk for each query, we developed a system that first ranks all numeric codes by how frequently they appear in the data store and then orders the elements in the ‘attributes’ fields according to that ranking.

For example, the property “generator: Twitter” (representing all tweets) is more prevalent in the data store than the property “language: Romanian” (representing all content in Romanian). If we wanted to obtain all tweets written in Romanian from the database, it would be more efficient to place the numeric code representing “language: Romanian” first in the ‘attributes’ array as it is faster to retrieve all Romanian content from disk and check if they are tweets than to retrieve all tweets and check if they are written in Romanian.

The solution described above dramatically improved our query response time. MongoDB’s dynamic schema and rich query model made this possible.
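A minimal sketch of the rarest-first ordering, applied here to the codes passed to $all. The frequency table, the meaning assigned to each code, and the orderByRarity and buildAttributesQuery names are hypothetical; the post does not describe how the ranking is stored or refreshed.

```javascript
// Hypothetical frequency table: attribute code -> number of documents carrying it.
var codeFrequency = {
  15: 9000000,   // e.g. "generator: twitter" (very common)
  178: 4000000,  // e.g. "sentiment: positive"
  2041: 12000    // e.g. "language: romanian" (rare)
};

// Return a copy of `codes` sorted rarest-first; unknown codes count as rarest.
function orderByRarity(codes, frequency) {
  return codes.slice().sort(function (a, b) {
    return (frequency[a] || 0) - (frequency[b] || 0);
  });
}

// Build the filter document that would be passed to collection.find().
function buildAttributesQuery(codes, frequency) {
  return { attributes: { $all: orderByRarity(codes, frequency) } };
}

var query = buildAttributesQuery([15, 2041], codeFrequency);
console.log(JSON.stringify(query)); // {"attributes":{"$all":[2041,15]}}
```

With the rare code first, the index narrows the candidates to the small set of Romanian items before the remaining codes are checked against documents read from disk.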

Saving Storage Space

After realizing the fields in our documents would be relatively small in number and mostly consistent across the database, our team decided to impose a two-character limit on all field names (“generator” became “g”, “sentiment” became “s”, etc.).

This small change saved us 16% of our storage space, without any loss in information.

Our Infrastructure Setup

We have taken full advantage of the cloud computing resources available to efficiently deploy and scale our offerings. Our current infrastructure resides entirely on the AWS stack.

We currently have 30+ instances deployed and over 30 terabytes of storage in permanent use. All EBS-backed production data stores currently reside on xfs RAID arrays.

This storage architecture provides us with not only volume redundancy, but also performance boosts, which were much needed in the beginning when provisioned IOPS was not yet available to ensure EBS performance.

Our MongoDB setup consists of six production clusters, each with its own unique scope and usage pattern.

Our MongoDB clusters all have the same topology:

  • Each shard (shardA, shardB … shardZ) is a three-member replica set
  • Three ‘config server’ processes are deployed on separate instances
  • ‘mongos’ routing processes are spread throughout the whole system (web nodes, API nodes, worker nodes, etc.)

Optimizing Performance and Redundancy with MongoDB

For us, relying on EBS-backed MongoDB clusters meant we had to familiarize ourselves with the concept of the working set, a number which represents the amount of data that is regularly accessed during day-to-day operations. In situations when the working set is larger than RAM, our application would be forced to read from disk, resulting in an immediate loss in performance due to EBS I/O latency. Now working sets can be estimated using the working set estimator, which was first introduced in MongoDB v2.4.
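As a rough illustration of reading the estimator's output, the sketch below converts the workingSet section of serverStatus into a size that can be compared with available RAM. It assumes MongoDB 2.4 (the estimator was removed in later releases) and 4 KiB memory pages; the summarizeWorkingSet name and the sample values are hypothetical and are not measurements from our clusters.

```javascript
// Assumption: 4 KiB memory pages, which is common on Linux x86-64 but not guaranteed.
var PAGE_SIZE_BYTES = 4096;

// Convert the workingSet section of serverStatus into an approximate size.
function summarizeWorkingSet(workingSet) {
  var bytes = workingSet.pagesInMemory * PAGE_SIZE_BYTES;
  return {
    gigabytes: bytes / Math.pow(1024, 3),
    overSeconds: workingSet.overSeconds // time span the estimate covers
  };
}

// In a MongoDB 2.4 mongo shell the input would come from:
//   db.serverStatus({ workingSet: 1 }).workingSet
// Stand-in values for illustration only:
var sample = {
  note: "thisIsAnEstimate",
  pagesInMemory: 2621440,
  computationTimeMicros: 5000,
  overSeconds: 900
};
console.log(summarizeWorkingSet(sample)); // { gigabytes: 10, overSeconds: 900 }
```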

To prevent the working set from exceeding RAM, we first viewed our data usage patterns in Graphite:

The graphic above represents the ‘write’ working set for one of the clusters.

The graphic above represents the ‘read’ working set generated by our API.

Using our data usage information, we defined the following access patterns:

                     RecentDB     HistoricalDB
Read Working Set     0-20 days    20-90 days
Write Working Set    0-20 days    0-90 days (data older than 20 days can be written directly here)

The architecture represented by the table above has been in place for more than three years and has proven itself on multiple occasions from both a performance and redundancy standpoint.

We currently use MongoDB Management Service (MMS) in addition to tools such as Graphite and Collectd for both monitoring and backup. These applications have been critical to managing our cloud-based cluster backups.

As our MongoDB-powered data store grew in size, the decision was made to implement an ‘inverted pyramid’ mechanism in an effort to provide the best possible response time while remaining cost efficient.

This mechanism relies on two main data stores, RecentDB and HistoricalDB, with the use of an in-house oplog replay tool that keeps the two clusters in sync.

The oplog - short for operations log - is a special capped collection that keeps a rolling record of all operations that modify the data stored in a MongoDB database, and is the basic mechanism that enables replication in MongoDB. Secondary nodes tail the oplog for new operations and replay them locally.

To implement the ‘inverted pyramid’ mechanism, we developed a process that connects source cluster shards (RecentDB) to a destination cluster mongos router instance, verifies the last written timestamp, tails the source oplog to that timestamp, and finally, replays all executed operations.

After taking into consideration our current settings and data volumes, we determined that an oplog replay timeframe of 72-96 hours worked best for our clusters, as it ensured there was enough time to counter any major failures at the cluster level (e.g. full replica set downtime, storage replacements, etc.).

In the current implementation, all 5 oplog processes (one per source shard) run on an administrative instance that is continually monitored for delays.

A key design step required in making our inverted pyramid possible was splitting our data store into five ‘segment’ databases, which are provisioned and depleted by two external jobs. This made it possible to drop data (at the db level) from the first two layers, RecentDB and HistoricalDB, in an orderly fashion without impacting any part of the application or compromising performance.

The last step of our data migration consists of offloading all data that passes the 90-day mark from the segment databases to S3. To accomplish this, each HistoricalDB secondary node is provisioned with two Python modules that parse through, collect, and export (in CSV format) all data older than 90 days. The legacy data is then uploaded into an S3 bucket and made available to other parts of our system.

An added benefit of our data architecture is the ability to use HistoricalDB on the off chance that a major issue impacts RecentDB. Although there is a storage space trade-off that comes with storing the data in the 0-20 day intervals on both clusters, having HistoricalDB on hand has proven useful for us in the past, with the AWS EAST-1 crash in the summer of 2012 being the most recent incident.

Conclusion

With MongoDB, we were able to quickly develop the query processes we needed to efficiently serve our customers, all on a flexible database architecture that stresses high performance and redundancy. MongoDB has been a partner that continues to deliver as we grow and tackle new challenges.

To learn more about how MongoDB can have a significant impact on your business, download our whitepaper How a Database Can Make Your Organization Faster, Better, Leaner.

Tiered Storage Models in MongoDB: Optimizing Latency and Cost

May 14 • Posted 3 months ago

By Rohit Nijhawan, Senior Consulting Engineer at MongoDB with André Spiegel and Chad Tindel

For a user-facing application, speed and uptime are critical to success. There are a number of ways you can tune your application and hardware setup to provide the best experience for your customers — the trick is doing so at optimal cost. Here we provide an example for improving performance and lowering costs with MongoDB using Tiered Storage, a method of prioritizing data storage based on latency requirements.

In this example, we will be segmenting data by date: recent data is more frequently accessed and should exhibit lower latency than less recent data. However, the idea applies to other ways of segmenting data, such as location, user, source, size, or other criteria. This approach takes advantage of a powerful feature in MongoDB called tag-aware sharding that has been available since MongoDB 2.2.

Example Application: Insurance Claims

In many applications, low-latency access to data becomes less important as data ages. For example, an insurance company might prioritize access to claims from the last 12 months. Users should be able to view recent claims quickly, but once claims are more than a year old they tend to be accessed much less frequently, and the latency requirements tend to become less demanding.

By creating tiers of storage with different performance and cost profiles, the insurance company can provide a better experience for users while optimizing their costs. Older claims can be stored in a storage tier with more cost-effective hardware such as commodity hard drives. More recent data can be stored in a high-performance storage tier that provides lower latency such as SSD. Because the majority of the claims are more than a year old, storing older data in the lower-cost tier can provide significant cost advantages. The insurance company can optimize their hardware spread across the two tiers, providing a great user experience at an optimized cost point.

The requirements for this application can be summarized as:

  • The trailing 12 months of claims should reside on the faster storage tier
  • Claims over a year old should move to the slower storage tier
  • Over time new claims arrive, and older claims need to move from the faster tier to the slower tier

For simplicity, throughout this overview, we’ll distinguish the claims data by “current” and “tier-2” data.

Building Your Own Process: An Operational Headache

One approach to these requirements is to use periodic batch jobs: selecting data, loading it into the archive, and erasing it from the faster storage. However, this is inherently complex:

  • The move process must be carefully coded to fail gracefully. In the event that a load fails, you don’t want to delete the original data!
  • If the data to be moved is large, you may wish to throttle the operations.
  • If moves succeed partially, you have to retry the unfinished data.
  • Unless you plan on halting your application during the move (generally unacceptable), your application needs custom code to find the data before, during, and after the move.
  • Your application needs to understand the physical location of the data, which unnecessarily couples your code to the partitioning logic.

Furthermore, introducing another custom component to your operations requires additional maintenance and monitoring.

It’s an operational headache that many teams are forced to endure, but there is a simpler way: have MongoDB handle the load of migrating documents from the recent storage machines to the tier 2 storage machines, transparently. As it turns out, you can easily implement this approach with a feature called Tag-Aware Sharding.

The MongoDB Way: Tag-aware Sharding

MongoDB provides a feature called sharding to scale systems horizontally across multiple machines. Sharding is transparent to your application - whether you have 1 or 100 shards, your application code is the same. For a comprehensive description of sharding please see the Sharding Guide.

A key component of sharding is a process called the balancer. As collections grow, the balancer operates in the background to carefully move documents between shards. Normally the balancer works to achieve a uniform distribution of documents across shards. However, with tag-aware sharding we can create policies that affect where documents are stored. This feature can be applied in many use cases. One example is to keep user data in data centers that are near the user. In our application, we can use this feature to keep current data on our fast servers, and tier 2 data on cheaper, slower servers.

Here’s how it works:

  • Shards are assigned tags. A tag is an alphanumeric alias like “London-DC”.
  • Unique shard key ranges are ‘pinned’ to tags.
  • During normal balancing operations, chunks migrate only to shards whose tag is associated with a key range which contains the chunk’s key range*.

* There are a few subtleties regarding what happens when a chunk’s key range overlaps more than one tag range. Please read the documentation carefully regarding this particular case.

This means that we can assign the “tier-2” tag to shards running on slow servers and “current” tags to shards running on fast servers, and the balancer will handle migrating the data between tiers automatically. What’s great is that we can keep all the data in one database, so our application code doesn’t need to change as data moves between storage tiers.

Determining the shard key

When you query a sharded collection, the query router will do its best to only inspect the shards holding your data, but it can only do this if you provide the shard key as part of your query. (See Sharded Cluster Query Routing for more information.)

So we need to make sure that we look up documents by the shard key. We also know that time is the basis for determining the location of documents in our two storage tiers. Accordingly, the shard key must contain an explicit timestamp. In our example, we’ll be using Enron’s email dataset, and we’ll set the top-level “date” as the shard key. Here’s a sample document:

Because the time is stored in the most significant digits of the date, messages from any given day will numerically precede messages from subsequent days.

Implementation

Here are the steps to set up this system:

  1. Set up an empty, sharded MongoDB cluster
  2. Create a target database to host the sharded collection
  3. Assign tags to different shards corresponding to the storage tiers
  4. Assign tag ranges to the shards
  5. Load data into the MongoDB cluster

Set up the cluster

The first thing you will want to do is set up your sharded cluster. You can see more information on how to set this up here.

In this case we will have a database called “enron” and a collection called “messages” which holds part of the Enron email corpus. In this example, we’ve set up a cluster with three shards. The first, shard0000, is optimized for low-latency access to data. The other two, shard0001 and shard0002, use more cost effective hardware for data that is older than the identified cutoff date.

Here’s our sharded cluster. These are empty machines with no data:

Adding the tags

We can “tag” each of these shards to associate them with documents that should belong to our “current” tier or those that should belong to “tier-2.” In the absence of tags and tag ranges, the balancer will try to ensure that the number of chunks on each shard is equal, without regard to the data in the fields. Before we add the data to our collection, let’s tag shard0000 as “current” and the other two as “tier-2”:
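A sketch of the tagging step, assuming the shard names given earlier. The shell's sh helper is passed in as a parameter purely so that the function (the tagShards name is ours) can also be exercised outside the shell; sh.addShardTag is the standard helper for attaching a tag to a shard.

```javascript
// Tag each shard with its storage tier.
// `sh` is the mongo shell's sharding helper object.
function tagShards(sh) {
  sh.addShardTag("shard0000", "current"); // low-latency hardware
  sh.addShardTag("shard0001", "tier-2");  // cost-effective hardware
  sh.addShardTag("shard0002", "tier-2");
}

// In a mongo shell connected to a mongos: tagShards(sh)
```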

Now we can verify our tags by calling sh.status():

Next, we need to set up a database and collection for the Enron emails. We’ll set up a new database ‘enron’ with a collection called ‘messages’ and enable sharding on that collection:

Since we’re going to shard the collection, we’ll need to set up a shard key. We will use the ‘date’ field as our shard key since this is the field that will define how the documents are distributed across shards:
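These two steps can be sketched with the standard shell helpers sh.enableSharding and sh.shardCollection. As before, sh is passed in as a parameter so the sketch does not depend on a live cluster, and the shardMessages name is ours.

```javascript
// Enable sharding for the database, then shard the collection on "date".
// `sh` is the mongo shell's sharding helper object.
function shardMessages(sh) {
  sh.enableSharding("enron");
  sh.shardCollection("enron.messages", { date: 1 }); // ascending key on date
}

// In a mongo shell connected to a mongos: shardMessages(sh)
```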

Defining the cutoff date between tiers

The cutoff point between “current” data and “tier-2” data is a point in time that we will update periodically to keep the most recent documents in our “current” shard. We will start with a cutoff of July 1, 2001, saved as an ISO Date ISODate("2001-07-01"). Once we add the data to our collection, we will set this as the tag range. Going forward, when we add documents to the “messages” collection, any documents newer than July 1, 2001 will end up on the “current” shard, and documents older than that will end up on the “tier-2” shards.

It’s important that the two ranges meet at exactly the same point in time, with no gap and no overlap. The lower bound of a tag range is inclusive, and the upper bound is exclusive. This means a document that has a date of exactly ISODate("2001-07-01") will go on the “current” shard, not the “tier-2” shards.
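A sketch of the two ranges and of the boundary rule. Using MinKey and MaxKey as the open ends of the ranges is an assumption on our part (the exact bounds are not shown in the text); sh.addTagRange is the standard shell helper. The tierFor and addTierRanges names are hypothetical, and the shell objects are passed in as parameters so the logic can be checked without a cluster.

```javascript
// Which tier a document belongs to, mirroring tag-range semantics:
// the lower bound of a range is inclusive, the upper bound exclusive.
function tierFor(date, cutoff) {
  return date >= cutoff ? "current" : "tier-2";
}

// Register two ranges that meet exactly at `cutoff`.
// minKey and maxKey stand for the shell's MinKey and MaxKey values.
function addTierRanges(sh, cutoff, minKey, maxKey) {
  sh.addTagRange("enron.messages", { date: minKey }, { date: cutoff }, "tier-2");
  sh.addTagRange("enron.messages", { date: cutoff }, { date: maxKey }, "current");
}

// In the mongo shell: addTierRanges(sh, ISODate("2001-07-01"), MinKey, MaxKey)
var cutoff = new Date("2001-07-01T00:00:00Z");
console.log(tierFor(new Date("2001-07-01T00:00:00Z"), cutoff)); // current
console.log(tierFor(new Date("2001-06-30T23:59:59Z"), cutoff)); // tier-2
```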

Below you will see each of the shard’s new tag ranges:

As a final check, look in the config database for the tag range definitions.

Now that all the shards and ranges are defined, we are ready to load the message data into the server. The documents will follow the instructions given by the tag ranges and land on the correct machines.

Now, let’s check the sharding status to see where the documents reside:

That’s it! The mongos process automatically moves documents to comply with the tag ranges. In this example, it took all documents still on the “current” shard with a date older than ISODate("2001-07-01T00:00:00Z") and moved them to the “tier-2” shards.

The tag ranges must be updated on a regular basis to keep the cutoff point at the correct interval of time in the past (1 year, in our case). In order to do this, both ranges need to be updated. To perform this change the balancer should temporarily be disabled, so there is no point where the ranges overlap. Stopping the balancer temporarily is a safe operation - it will not affect the application or the experience of users.

If you wanted to move the cutoff forward by a month, to August 1, 2001, you just need to follow these steps:

  1. Stop the balancer:

sh.setBalancerState(false)

  2. Create a chunk split at August 1:

sh.splitAt('enron.messages', {"date" : ISODate("2001-08-01")})

  3. Move the cutoff date to ISODate("2001-08-01T00:00:00Z"):

var configdb = db.getSiblingDB("config");
configdb.tags.update({tag:"tier-2"},{$set:{'max.date':ISODate("2001-08-01")}})
configdb.tags.update({tag:"current"},{$set:{'min.date':ISODate("2001-08-01")}})

  4. Re-start the balancer:

sh.setBalancerState(true)

  5. Verify the sharding status

By moving the cutoff to August 1, we have migrated all the documents dated on or after July 1 but before August 1 from the “current” shard to the “tier-2” shards. The good news is that we were able to perform this operation without changing our application code and with no database downtime. We can also see that it would be simple to schedule this process to run automatically through an external process.
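One way such a scheduled job might look is sketched below. It wraps the same shell calls in a single function; the moveCutoff name, the try/finally guard, and the extra ns filter on config.tags are our additions and are not part of the procedure described above.

```javascript
// Move the tier boundary for namespace `ns` to `newCutoff`.
// `sh` and `db` are the mongo shell's sharding helper and current database.
function moveCutoff(sh, db, ns, newCutoff) {
  var configdb = db.getSiblingDB("config");
  sh.setBalancerState(false); // avoid migrations while the ranges change
  try {
    sh.splitAt(ns, { date: newCutoff });
    configdb.tags.update({ ns: ns, tag: "tier-2" },
                         { $set: { "max.date": newCutoff } });
    configdb.tags.update({ ns: ns, tag: "current" },
                         { $set: { "min.date": newCutoff } });
  } finally {
    sh.setBalancerState(true); // re-enable even if a step above throws
  }
}

// In the mongo shell: moveCutoff(sh, db, "enron.messages", ISODate("2001-08-01"))
```

An external scheduler such as cron could invoke this through a shell script once a month, passing the new cutoff date.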

From Operational Headache to Simplicity

The end result is one collection spread across three shards and two different storage systems. This solution allows you to lower your storage costs without adding complexity to the architecture of your system. Instead of a complex setup with different databases on different machines we have one database to query, and instead of a data migration we update some simple rules to control the location of data in the system.

Crittercism: Scaling To Billions Of Requests Per Day On MongoDB

Feb 20 • Posted 6 months ago

This is a guest post by Mike Chesnut, Director of Operations Engineering at Crittercism. This June, Mike will present at MongoDB World on how Crittercism scaled to 30,000 requests/second (and beyond) on MongoDB.

MongoDB is capable of scaling to meet your business needs — that is why its name is based on the word humongous. This doesn’t mean there aren’t some growing pains you’ll encounter along the way, of course. At Crittercism, we’ve had a huge amount of growth over the past 2 years and have hit some bumps in the road along the way, but we’ve also learned some important lessons that can hopefully be of use to others.

Background

Crittercism provides the world’s first and leading Mobile Application Performance Management (mAPM) solution. Our SDK is embedded in tens of thousands of applications, and used by nearly a billion users worldwide. We collect performance data such as error reporting, crash diagnostics details, network breadcrumbs, device/carrier/OS statistics, and user behavior. This data is largely unstructured and varies greatly among different applications, versions, devices, and usage patterns.

Storing all of this data in MongoDB allows us to collect raw information that may be of use in a variety of ways to our customers, while also providing the analytics necessary to summarize the data down to digestible, actionable metrics.

As our request volume has grown, so too has our data footprint; over the course of 18 months our daily request volume increased over 40x:

Our primary MongoDB cluster now houses over 20TB of data, and getting to this point has helped us learn a few things along the way.

Routing

The MongoDB documentation suggests that the most common topology is to include a router — a mongos process — on each client system. We started doing this and it worked well for quite a while.

As the number of front-end application servers in production grew from the order of 10s to several 100s, we found that we were creating heavy load via hundreds (or sometimes thousands) of connections between our mongos routers and our mongod shard servers. This meant that whenever chunk balancing occurred — something that is an integral part of maintaining a well-balanced, sharded MongoDB cluster — the chunk location information that is stored in the config database took a long time to propagate. This is because every mongos router needs to have a full picture of where in the cluster all of the chunks reside.

So what did we learn? We found that we could alleviate this issue by consolidating mongos routers onto a few hosts. Our production infrastructure is in AWS, so we deployed 2 mongos servers per availability zone. This gave us redundancy per AZ, as well as offered the shortest network path possible from the clients to the mongos routers. We were concerned about adding an additional hop to the request path, but using Chef to configure all of our clients to only talk to the mongos routers in their own AZ helped minimize this issue.

Making this topology change greatly reduced the number of open connections to our mongod shards, which we were able to measure using MMS, without a noticeable reduction in application performance. At the same time, there were several improvements to MongoDB that made both the mongos updates and the internal consistency checks more efficient in general. Combined with the new infrastructure this meant that we could now balance chunks throughout our cluster without causing performance problems while doing so.

Shard Replacement

Another scenario we’ve encountered is the need to dynamically replace mongod servers in order to migrate to larger shards. Again following the recommended best deployment practice, we deploy MongoDB onto server instances utilizing large RAID10 arrays running xfs. We use m2.4xlarge instances in AWS with 16 disks. We’ve used basic Linux mdadm for performance, but at the expense of flexibility in disk configuration. As a result when we are ready to allocate more capacity to our shards, we need to perform a migration procedure that can sometimes take several days. This not only means that we need to plan ahead appropriately, but also that we need to be aware of the full process in order to monitor it and react when things go wrong.

We start with a replica set where all replicas are at approximately the same disk utilization. We first create a new server instance with more disk allocated to it, and add it to this replica set with rs.add().

The new replica will enter the STARTUP2 state and remain there for a long time (in our case, usually 2-3 days) while it first clones data, then catches up via oplog replication and builds indexes. During this time, index builds will often stop the replication process (note that this behavior is set to change in MongoDB 2.6), and so the replication lag time does not strictly decrease the whole time — it will steadily decrease for a while, then an index build will cause it to pause and start to fall further behind. Once the index build completes the replication will resume. It’s worth noting that while index builds occur, mongostat and anything else that requires a read lock will be blocked as well.

Eventually the replica will enter the SECONDARY state and will be fully functional. At this point we can rs.stepDown() one of the old replicas, shut down the mongod process running on it, and then remove it from the replica set via rs.remove(), making the server ready for retirement.

We then repeat the process for each member of the replica set, until all have been replaced with the new instances with larger disks.

While this process can be time-consuming and somewhat tedious, it allows for a graceful way to grow our database footprint without any customer-facing impact.

Conclusion

Operating MongoDB at scale, as with any other technology, requires some knowledge that you can gain from documentation, and some that you have to learn from experience. By being willing to experiment with different strategies such as those shown above, you can discover flexibility that may not have been previously obvious. Consolidating the mongos router tier was a big win for Crittercism’s Ops team in terms of both performance and manageability, and developing the above-described migration procedure has enabled us to continue to grow to meet our needs without affecting our service or our customers.

See how Crittercism, Stripe, Carfax, Intuit, Parse and Sailthru are building the next generation of applications at MongoDB World. Register now and join the MongoDB Community in New York City this June.

Background Indexing on Secondaries and Orphaned Document Cleanup in MongoDB 2.6

Jan 27 • Posted 6 months ago

By Alex Komyagin, Technical Services Engineer in MongoDB’s New York Office

The MongoDB Support Team has broad visibility into the community’s use of MongoDB, issues they encounter, feature requests, bug fixes and the work of the engineering team. This is the first of a series of posts to help explain, from our perspective, what is changing in 2.6 and why.

Many of these changes are available today for testing in the 2.5.4 Development Release, which is available as of November 18, 2013 (2.5.5 release, coming soon, will be feature complete). Development Releases have odd-numbered minor versions (e.g., 2.1, 2.3, 2.5), and Production Releases have even-numbered minor versions (e.g., 2.2, 2.4, 2.6). MongoDB 2.6 will become available a little later this year.

Community testing helps MongoDB improve. You can test the development of MongoDB 2.5.4 today. Downloads are available here, and you can log Server issues in Jira.

Background indexes on secondaries (SERVER-2771)

Suppose you have a production replica set with two secondary servers, and you have a large, 1TB collection. At some point, you may decide that you need a new index to reflect a recent change in your application, so you build one in the background:

db.col.ensureIndex({..},{background : true})

Let’s also suppose that your application uses secondary reads (users should take special care with this feature, especially in sharded systems; for an example of why, see the next section in this post). After some time you observe that some of your application reads have started to fail, and replication lag on both secondaries has started to grow. While you are searching Google Groups for answers, everything magically goes back to normal by itself. Secondaries have caught up, and application reads on your secondaries are working fine. What happened?

One would expect that building indexes in the background would allow the replica set to continue serving regular operations during the index build. However, in all MongoDB releases before 2.6, background index builds on primaries become foreground index builds on secondaries, as noted in the documentation. Foreground index building is resource intensive and it can also affect replication and read/write operations on the database (see the FAQ on the MongoDB Docs). The good news is that impact can be minimized if your indexed collections are small enough for index builds to be relatively fast (on the order of minutes to complete).

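In earlier versions of MongoDB, the only way to make sure that indexing operations did not affect the replica set was to build indexes in a rolling fashion, taking one member at a time out of the set and building the index on it. This works perfectly for most users, but not for everyone. For example, it wouldn’t work well for those who use a write concern of “w:all”, because writes cannot be acknowledged by every member while one of them is out of the set.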
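
As a rough illustration of what a rolling build involves, the sketch below only produces the order of operations; it does not talk to MongoDB. The `rolling_index_plan` helper and the member names are hypothetical, and the step descriptions follow the general procedure in the MongoDB documentation, so check the docs for your version before relying on them.

```python
def rolling_index_plan(primary, secondaries):
    """Return the ordered steps of a rolling index build.

    This is a plan only: nothing is executed. Each entry is a
    (member, action) pair, and only one member is out of the set at a time.
    """
    steps = []
    # Secondaries go first; the primary is handled last, after stepping down.
    for member in list(secondaries) + [primary]:
        if member == primary:
            steps.append((member, "step down so another member becomes primary"))
        steps.append((member, "restart as a standalone mongod (no --replSet, different port)"))
        steps.append((member, "build the index"))
        steps.append((member, "restart with --replSet and wait for it to catch up"))
    return steps


# Hypothetical three-member set.
for member, action in rolling_index_plan("db1", ["db2", "db3"]):
    print(member, "->", action)
```

Because every member leaves the set in turn, the plan makes the “w:all” limitation visible: during each build step one member cannot acknowledge writes.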

Starting with MongoDB 2.6, a background index build on the primary becomes a background index build on the secondaries. This behavior is much more intuitive and will improve replica set robustness. We feel this will be a welcome enhancement for many users.

Please note that background index building normally takes longer than foreground building, because it allows other operations on the database to run. Keep in mind that, like most database systems, indexing in MongoDB is resource intensive and will increase the load on your system, whether it is a foreground or background process. Accordingly, it is best to perform these operations during a maintenance window or during off-peak hours.

The actual time needed to build a background index varies with the active load on your system, the number of documents, the database size and your hardware specs. Therefore, for production systems with large collections, users can still build indexes in a rolling fashion, or build them in the foreground during maintenance windows, if they believe a background index build will require more time than is acceptable.

Orphaned documents cleanup command (SERVER-8598)

MongoDB provides horizontal scaling through a feature called sharding. If you’re unfamiliar with sharding and how it works, I encourage you to read the nice new introduction to this feature the documentation team added a few months ago. Let me try and summarize some of the key concepts:

  • MongoDB partitions documents across shards.
  • Periodically the system runs a balancing process to ensure documents are uniformly distributed across the shards.
  • Groups of documents, called chunks, are the unit of a balancing job.
  • In certain failure scenarios stale copies of documents may remain on shards, which we call “orphaned documents.”

Under normal circumstances, there will be no orphaned documents in your system. However, in some production systems, “normal circumstances” are a very rare event, and migrations can fail (e.g., due to network connectivity issues), thus leaving orphaned documents behind.

The presence of orphaned documents can produce incorrect results for some queries. While orphaned documents are safe to delete, in versions prior to 2.6 there was no simple way to do so. In MongoDB 2.6 we implemented a new administrative command for sharded clusters: cleanupOrphaned(). This command removes orphaned documents from the shard in a single range of data.

The scenario where users typically encounter issues related to orphaned documents is when issuing secondary reads. In a sharded cluster, primary replicas for each shard are aware of the chunk placements, while secondaries are not. If you query the primary (which is the default read preference), you will not see any issues as the primary will not return orphaned documents even if it has them. But if you are using secondary reads, the presence of orphaned documents can produce unexpected results, because secondaries are not aware of the chunk ownerships and they can’t filter out orphaned documents. This scenario does not affect targeted queries (those having the shard key included), as mongos automatically routes them to correct shards.

To illustrate this discussion with an example, one of our clients told us that after a series of failed migrations they noticed that their queries were returning duplicate documents. They were using secondary reads together with scatter-gather queries, meaning queries that did not contain the shard key and were therefore broadcast by mongos to all shards. With secondary reads, each shard returns all the documents matching the query (including orphaned documents), which in this situation led to duplicate entries in the final result set.
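
The effect can be reproduced with a toy model. The sketch below is not MongoDB code: the shard names, key ranges and documents are all made up, and the "primary" filter is a simplified stand-in for the chunk-ownership check described above.

```python
# Toy model of a two-shard cluster; every name and value here is hypothetical.
# Chunk ownership by shard-key range: shardA owns user_id in [0, 100),
# shardB owns user_id in [100, 200).
OWNER_RANGES = {"shardA": (0, 100), "shardB": (100, 200)}

# Documents physically present on each shard. user_id 150 was left behind on
# shardA by a failed migration, so shardA holds an orphaned copy of it.
SHARD_DATA = {
    "shardA": [{"user_id": 10, "tag": "x"}, {"user_id": 150, "tag": "x"}],
    "shardB": [{"user_id": 150, "tag": "x"}, {"user_id": 160, "tag": "y"}],
}


def owns(shard, doc):
    """True if the shard owns the chunk range that the document falls into."""
    low, high = OWNER_RANGES[shard]
    return low <= doc["user_id"] < high


def scatter_gather(tag, read_from_primary):
    """Broadcast a query without the shard key to every shard and merge results."""
    results = []
    for shard, docs in SHARD_DATA.items():
        for doc in docs:
            if doc["tag"] != tag:
                continue
            # Only a primary knows the chunk ranges and can drop orphans.
            if read_from_primary and not owns(shard, doc):
                continue
            results.append(doc["user_id"])
    return sorted(results)


def targeted(user_id):
    """Route a query that includes the shard key to the single owning shard."""
    for shard, (low, high) in OWNER_RANGES.items():
        if low <= user_id < high:
            return [d["user_id"] for d in SHARD_DATA[shard] if d["user_id"] == user_id]
    return []


print(scatter_gather("x", read_from_primary=True))   # [10, 150]
print(scatter_gather("x", read_from_primary=False))  # [10, 150, 150]
print(targeted(150))                                 # [150]
```

In this model the duplicate appears only when the query is both broadcast and served without ownership filtering, which mirrors the combination the client was using.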

A short-term solution was to remove the orphaned documents (we used to have a special script for this). The long-term fix for this particular client was to make their queries targeted by including the shard key in each query. This way, mongos could efficiently route each query to the correct shard, without hitting the orphaned data. Routed queries are a best practice in any system, as they also scale much better than scatter-gather queries.

Unfortunately, there are a few cases where there is no good way to make queries targeted, and you would need to either switch to primary reads or implement a regular process for removing orphaned documents.

The cleanupOrphaned() command is the first step on the path to automated cleanup of orphaned documents. This command should be run on the primary server and will clean up one unowned range on this shard. The idea is to run the command repeatedly, with a delay between calls to tune the cleanup rate.

In some configurations secondary servers might not be able to keep up with the rate of delete operations, resulting in replication lag. In order to control the lag, cleanupOrphaned() waits for the deletes to replicate to a majority of the replica set members once the range removal is complete. Additionally, you can use the secondaryThrottle option, so that each individual delete operation is made with write concern w:2 (it waits for one secondary). This may be useful for reducing the impact of removing orphaned documents on your regular operations.
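
A minimal sketch of the "run repeatedly, with a delay" idea follows. It assumes the command accepts a `startingFromKey` field and reports a `stoppedAtKey` in its reply, as described in the MongoDB documentation for the released command; the 2.5.4 Development Release may differ. The `cleanup_all_orphans` helper and the `run_command` callable are hypothetical, and the namespace in the usage note is a placeholder.

```python
import time


def cleanup_all_orphans(run_command, namespace, delay_seconds=1.0,
                        secondary_throttle=True):
    """Call cleanupOrphaned repeatedly until no range is left to clean.

    run_command is any callable that sends an admin command (a dict) to the
    shard's primary and returns the reply as a dict. Returns the number of
    calls that were made.
    """
    next_key = {}  # assumption: an empty document starts from the lowest key
    calls_made = 0
    while next_key is not None:
        reply = run_command({
            "cleanupOrphaned": namespace,  # command name must come first
            "startingFromKey": next_key,
            "secondaryThrottle": secondary_throttle,
        })
        calls_made += 1
        if reply.get("ok") != 1:
            raise RuntimeError("cleanupOrphaned failed: %r" % (reply,))
        # Assumption: stoppedAtKey is absent once nothing is left to clean.
        next_key = reply.get("stoppedAtKey")
        if next_key is not None:
            time.sleep(delay_seconds)  # the delay tunes the cleanup rate
    return calls_made
```

With a driver such as PyMongo, `run_command` could be something like the `command` method of the admin database on a direct connection to the shard primary, for example `cleanup_all_orphans(client.admin.command, "test.user")`. Passing the callable in keeps the loop itself testable without a running cluster.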

You can find command usage examples and more information about the command in the 2.6 documentation.

I hope you will find these features helpful, and we look forward to hearing your feedback on them. If you would like to test them out, download MongoDB 2.5.4, the most recent Development Release of MongoDB.

Performance Tuning MongoDB on SolidFire

Oct 24 • Posted 10 months ago

This is a guest post by Chris Merz & Garrett Clark, SolidFire

We recently had a large enterprise customer implement a MongoDB sharded cluster on SolidFire as the backend for a global e-commerce system. By leveraging solid-state drive technology with features like storage virtualization, Quality of Service (guaranteed IOPS per volume), and horizontal scaling, the customer was looking to combine the benefits of dedicated storage performance with the simplicity and scalability of a MongoDB environment.

During the implementation the customer reached out to us with some performance and tuning questions, requesting our assistance with the configuration. After meeting with the team and reviewing the available statistics, we discovered response times that were deemed out of range for the application’s performance requirements. Response times were ~13-20ms (with an average of 15-17ms). While this is considered acceptable latency in many implementations, the team was targeting < 5ms average query response times.

When troubleshooting any storage latency performance issue it is important to focus on two critical aspects of the end-to-end chain: potential i/o queue depth bottlenecks and the main contributors to the overall latency in the chain. A typical end-to-end sequence with attached storage can be described by:

MongoDB > OS > NIC > Network > Storage > Network > NIC > OS > MongoDB

First off, we looked for any i/o queue depth bottlenecks and found the first one at the operating system layer. MongoDB was periodically sending an i/o queue depth of >100 to the operating system, while iSCSI, by default, could only release a queue depth of 32 per iSCSI session. This drop from an i/o queue depth of >100 to 32 caused frames to stall at the operating system layer while they waited to continue down the chain.

We alleviated the issue by increasing the number of iSCSI sessions to the volume from 1 to 4, which proportionally increased the queue depth exiting the operating system to 128 (32*4). This enabled all frames coming off the application layer to immediately pass through the operating system and NIC, and decreased the overall latency from ~15ms to ~4ms. Although the average latency was now ~4ms, performance was still rather variable.
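
The session arithmetic can be written down as a small sketch. The per-session limit of 32 is the default quoted above; the helper names are made up for illustration, and a peak of 101 is used only as a stand-in for ">100".

```python
import math

ISCSI_QUEUE_DEPTH_PER_SESSION = 32  # default per-session limit quoted in the post


def total_queue_depth(sessions, per_session=ISCSI_QUEUE_DEPTH_PER_SESSION):
    """Aggregate queue depth the OS can release across all iSCSI sessions."""
    return sessions * per_session


def sessions_needed(peak_queue_depth, per_session=ISCSI_QUEUE_DEPTH_PER_SESSION):
    """Smallest number of sessions whose combined depth covers the peak."""
    return math.ceil(peak_queue_depth / per_session)


print(total_queue_depth(1))  # 32, below the >100 bursts from MongoDB
print(total_queue_depth(4))  # 128
print(sessions_needed(101))  # 4
```

Three sessions would give a depth of 96, still short of bursts above 100, which is consistent with the move to four.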

We then turned our focus to pinpointing the sources of the remaining end-to-end latency. We were able to determine the latency factors in the stack through the study of three latency loops:

First, the complete chain of: MongoDB > OS > NIC > Network > Storage > Network > NIC > OS > MongoDB. This loop took an average of 3.9ms to complete.

Secondly, the subset loop of: OS > NIC > Network > Storage > Network > NIC > OS. This loop took ~1.1ms to complete. We determined the latency of this loop from the output of “iostat -xk 1”, grepping for the corresponding volume.

The last loop segment, latency on the storage subsystem, was 0.7ms and was obtained through a polling API command issued to the SolidFire unit.

Our analysis pointed to the first layers of the stack contributing the most significant percent (>70%) of the end-to-end latency, so we decided to start there and continue downstream.
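
One way to read the three nested measurements is to subtract each inner loop from the one enclosing it. The sketch below does that with the figures quoted above; attributing each difference to a single layer is our simplification, and the helper names are hypothetical.

```python
def layer_latencies(full_loop_ms, os_loop_ms, storage_ms):
    """Split nested loop timings into the latency added by each layer."""
    return {
        "mongodb": full_loop_ms - os_loop_ms,       # time spent above the OS
        "os_nic_network": os_loop_ms - storage_ms,  # OS, NIC and network hops
        "storage": storage_ms,                      # storage subsystem itself
    }


def shares(layers):
    """Fraction of the total latency contributed by each layer."""
    total = sum(layers.values())
    return {name: ms / total for name, ms in layers.items()}


layers = layer_latencies(full_loop_ms=3.9, os_loop_ms=1.1, storage_ms=0.7)
for name, share in shares(layers).items():
    print("%-15s %.1f ms  %.0f%%" % (name, layers[name], share * 100))
```

Under this reading roughly 2.8ms of the 3.9ms sits above the operating system, which lines up with the >70% figure.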

We reviewed the OS configuration and tuning, with an eye towards both SolidFire/iSCSI best practices and MongoDB performance. Several OS-level tunables were found that could be tweaked to ensure optimal throughput for this type of deployment. Unfortunately, none of these resulted in any major reduction in the end-to-end latency for mongo.

Having eliminated the obvious, we were left with what remained: MongoDB itself. A phrase oft-quoted from the famous fictional detective Sherlock Holmes came to mind: “when you have eliminated the impossible, whatever remains, however improbable, must be the truth.”

Upon going over the collected statistics runs with a fine-toothed comb, we noticed that the latency spikes had intervals of almost exactly 60 seconds. That’s when the light bulb went on…

The MongoDB flush interval. The architecture of MongoDB was developed in the context of spinning disk, a vastly slower storage technology requiring batched file syncs to minimize query latency. The syncdelay setting defaults to 60 seconds for this very reason. In the documentation, it is clearly stated “In almost every situation you should not set this value and use the default setting”. ‘Almost’ was the key to our solution, in this particular case. It should be noted that changing syncdelay is an advanced tuning, and should be carefully evaluated and tested on a per-deployment basis.

Little’s Law (IOPS = Queue Depth / Latency) indicated that lowering the flush interval would reduce the variance in queue depth thereby smoothing the overall latency. In lab testing, we had found that, under maximum load, decreasing the syncdelay to 1 second would force a ‘continuous flush’ behavior usually repeating every 6-7 seconds, reducing i/o spikes in the storage path. We had seen this as a useful technique for controlling IOPS throughput variability, but had not typically viewed it as a latency reduction technique.
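
To make the relationship concrete, the sketch below rearranges Little’s Law to show how latency moves with queue depth when throughput stays fixed. The 5,000 IOPS figure is the upper end of the range quoted in this post; the queue depths of 8 and 100 are illustrative assumptions, not measurements from this deployment.

```python
def iops(queue_depth, latency_seconds):
    """Little's Law as stated in the post: IOPS = queue depth / latency."""
    return queue_depth / latency_seconds


def latency_seconds(queue_depth, iops_rate):
    """The same law rearranged: latency = queue depth / IOPS."""
    return queue_depth / iops_rate


# At a fixed throughput, a burst in queue depth shows up directly as latency.
steady = latency_seconds(queue_depth=8, iops_rate=5000)
burst = latency_seconds(queue_depth=100, iops_rate=5000)
print("steady: %.1f ms" % (steady * 1000))
print("burst:  %.1f ms" % (burst * 1000))
```

This is why smoothing the queue depth, rather than only raising throughput, can lower the latency an application observes.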

It worked!

After implementing the change, the customer excitedly reported that they were seeing average end-to-end MongoDB response times of 1.2ms, with a throughput of ~4-5k IOPS per mongod (normal for this application), and NO obvious increase in extraneous i/o.

By increasing the number of iSCSI sessions, normalizing the flush rate and removing the artificial 60s buffer, we reduced average latency by more than an order of magnitude, proving out the architecture at scale in a global production environment. Increasing the iSCSI sessions increased parallelism, and decreased the latency by 3.5-4x. The reduction in syncdelay had the effect of smoothing the average queue depth being sent to the storage system, decreasing latency by slightly more than 3x.
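
The quoted factors can be checked against the averages reported earlier in the post (~15ms before tuning, 3.9ms after adding iSCSI sessions, 1.2ms after lowering syncdelay). This is a quick arithmetic sketch on those rounded figures, not additional measurement data.

```python
def speedup(before_ms, after_ms):
    """How many times lower the latency is after a change."""
    return before_ms / after_ms


iscsi_gain = speedup(15.0, 3.9)      # more iSCSI sessions
syncdelay_gain = speedup(3.9, 1.2)   # lower syncdelay
overall_gain = speedup(15.0, 1.2)    # both changes together

print("iSCSI sessions: %.2fx" % iscsi_gain)
print("syncdelay:      %.2fx" % syncdelay_gain)
print("overall:        %.2fx" % overall_gain)
```

The two individual gains multiply to the overall figure, which is how two changes of roughly 3-4x each add up to more than an order of magnitude.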

This customer’s experience is a good example of how engaging the MongoDB team early on can ensure a smooth product launch. As of today, we’re excited to announce that SolidFire is a MongoDB partner. Learn more about the SolidFire and MongoDB integration on our Database Solutions page. To learn more about performance tuning MongoDB on SolidFire, register for our upcoming webinar on November 6 with MongoDB.

For more information on MongoDB performance, check out Performance Considerations for MongoDB, which covers other topics such as indexes and application patterns and offers insight into achieving MongoDB performance at scale.

Checking Disk Performance with the mongoperf Utility

Jan 17 • Posted 1 year ago

Note: while this blog post uses some Linux commands in its examples, mongoperf runs and is useful on just about all operating systems.

mongoperf is a utility for checking disk i/o performance of a server independent of MongoDB. It performs simple timed random disk i/o’s.

mongoperf has a couple of modes: mmf:false and mmf:true.

mmf:false mode is a completely generic random physical I/O test — there is effectively no MongoDB code involved.


How MongoDB’s Journaling Works

Oct 16 • Posted 1 year ago

This was originally posted to Kristina Chodorow’s blog, Snail in a Turtleneck

I was working on a section on the gooey innards of journaling for The Definitive Guide, but then I realized it’s an implementation detail that most people won’t care about. However, I had all of these nice diagrams just lying around.

image

 Good idea, Patrick! So, how does journaling work? Your disk has your data files and your journal files, which we’ll represent like this:

image

 
