Handling mongodb connections

Mongodb is fast, and for medium to large setups it just works out of the box. But for setups bigger than that, you may run into a situation where the number of connections maxes out. So for extra large setups, let us look at how to increase the number of connections on a mongodb server.

Mongodb uses file descriptors to manage connections. In most unix-like operating systems, the default number of file descriptors available is set to 1024. This can be verified using the command ulimit -n, which should print 1024. Ulimit is the per-user limit on various resources, and it can be changed temporarily for the current shell by issuing ulimit -n <new limit>.

To change file descriptors system wide, change or add the following line to your /etc/sysctl.conf

fs.file-max = 64000

Now the system as a whole can open 64000 file descriptors instead of the earlier default. For a per user configuration, you will have to tweak the hard and soft limits in /etc/security/limits.conf.
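For example, assuming mongod runs as a user called mongodb (the user name here is an assumption, adjust it to your setup), lines like the following in /etc/security/limits.conf would raise its per-user limit:

mongodb soft nofile 64000
mongodb hard nofile 64000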

By default the mongodb configuration file (usually mongodb.conf) does not specify the maximum number of connections; it is limited only by the number of available file descriptors. But you can control it using the maxConns option. Suppose you want to cap the number of connections at 8000, you will have to put the following line in your mongodb.conf file.

maxConns = 8000

Remember: mongodb cannot use more than 20,000 connections on a server.
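You can check how close you are to the limit at runtime from the mongo shell; serverStatus reports the connections currently in use and the number still available under the current limits.

db.serverStatus().connections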

I recently came across a scenario where my mongodb, which was running around 7000 connections, maxed out. I had a replica set configured, with a single master and multiple replicas, and all reads and writes were happening on the master. The catch with replica sets is that if the master mongo is restarted, any one of the replica mongo servers may become the new master or primary. The TMC problem was caused by a missing index which made multiple update queries queue up and hold connections. To solve it, the missing index was applied first; next, all queries that were locking the collection had to be killed.

Here is a quick function that we were able to put together to first list all pending write operations and then kill them.

db.currentOp().inprog.forEach(
   function(d) {
     if (d.waitingForLock && d.lockType != "read") {
        printjson(d.opid);    // log the op id we are about to kill
        db.killOp(d.opid);    // kill the queued write operation
     }
   })
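Once the index was in place, a quick check like the following (a rough sketch) confirms that nothing is still stuck waiting for the lock:

var waiting = 0;
db.currentOp().inprog.forEach( function(d){ if (d.waitingForLock) waiting++; } );
print("ops still waiting for lock: " + waiting);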

How to clean up a huge mongodb collection?

As most mongodb users know, mongodb leans heavily on RAM. The more RAM you give the DB server, the happier mongodb is. But once the data/index size exceeds the available RAM, you see response times increasing for all your queries.

Recently we had an issue where the db size exceeded the RAM we had on our machine. Suddenly query response times increased to 10-20 times their original values. Luckily we had a cleanup strategy in place, but had never gotten the chance to execute it.

We were dealing with around 110 million entries and were expecting that after cleanup around 50% of entries would be removed. The problem was our setup.

We had multiple slaves in our replica set, so running a simple delete query on the master would replicate the deletes to the slaves as well. What we wanted to do was remove all entries older than "n" days, say 6 months for this example. The delete query for this would be

db.test1.remove( { ts : { $lt : ISODate("2012-09-27T00:00:00.000Z") } } )

This fires a single query on the master, but for every record deleted on the master a delete operation is written to the oplog, which then replicates to the slaves. So if this query is run on the master and we intend to remove 50 million of our existing 110 million entries, we end up with 50 million entries in the oplog, which is a lot of IO.
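Before firing a huge delete on a replica set, it is worth checking how much oplog headroom you have. A quick sketch, to be run on the primary (the output obviously varies per setup):

db.printReplicationInfo()                      // configured oplog size and the time window it currently covers
db.getSiblingDB("local").oplog.rs.count()      // number of entries currently sitting in the oplog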

Another solution that crossed our mind was to avoid the oplog altogether by running the delete query on a stand alone instance of mongodb. This should have worked in theory, but even with the oplog out of the picture the deletions were terribly slow. After firing the query and waiting for around 3 hours, we knew this was not going to work.

With that plan aborted, another small beam of light came through. Remember mysql and how we used to move data across tables?

insert into table2 select * from table1 where <condition>

We tried replicating this statement in mongo and were successful.

db.test1.find( { ts : { $gt : ISODate("2012-09-27T00:00:00.000Z") } } ).forEach( function(c){ db.test2.insert(c); } )


This query took approximately 30 minutes to execute, and we had a new collection test2 ready, containing only the data newer than the 6 month cutoff. Now all we needed to do was rename the collections. We preferred swapping the collections over dropping the old one, so that the existing data stayed around as a backup in case something went wrong.

db.test1.renameCollection("temp");
db.test2.renameCollection("test1");
db.temp.renameCollection("test2");
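A quick sanity check before and after the swap never hurts; a small sketch using the collection names from this example:

db.test1.count()   // should now contain only the recent entries
db.test2.count()   // holds the old data as a backup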


To keep the collection from growing unbounded again, we converted it into a ttl collection.

db.test1.ensureIndex( { "ts" : 1 }, { expireAfterSeconds : 15552000 } )

So any entry older than 6 months (15552000 seconds) will be deleted automatically.
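To confirm the TTL index is in place, list the indexes on the collection and look for expireAfterSeconds on the ts index:

db.test1.getIndexes()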

Scaling using mongodb : Map-Reduce on sharded collection (part 3)

The idea here is to create a sharded database and then run map-reduce on it to process the data. This is a very basic example. I created a sharded collection "posts" with the following structure, and the goal is to find the count of each tag across the complete sharded collection. I am using two machines named "241" and "243" as shards for mongodb. The mongos service is running on a third machine, "249".

Input collection structure :

mongos> db.posts.find()
{ “_id” : ObjectId(“4f4221149a6777895a000000”), “id” : “0”, “content” : “data for content 0”, “tags” : [ “tag1”, “tag2”, “tag3”, “tag4” ] }
{ “_id” : ObjectId(“4f4221149a6777895a000001”), “id” : “1”, “content” : “data for content 1”, “tags” : [ “tag2”, “tag4”, “tag5”, “tag7”, “tag9” ] }

Output collection structure :

mongos> db.tags.find()
{ “_id” : “tag1”, “value” : 14705 }
{ “_id” : “tag3”, “value” : 14418 }

Let's see the step by step process for creating the collection, populating test data and running map-reduce.

Let's create the posts collection by putting in a few records. If you print the collection stats, you will see that it is not sharded.

mongos> db.printCollectionStats()

posts
{
        “sharded” : false,
        “primary” : “shard241”,
        “ns” : “test.posts”,
        “count” : 2,
        “size” : 256,
        “avgObjSize” : 128,
        “storageSize” : 8192,
        “numExtents” : 1,
        “nindexes” : 1,
        “lastExtentSize” : 8192,
        “paddingFactor” : 1,
        “flags” : 1,
        “totalIndexSize” : 8176,
        “indexSizes” : {
                “_id_” : 8176
        },
        “ok” : 1
}

To shard the collection, you will first need to create an index on "id", and then shard the collection using "id" as the key.

mongos> db.posts.ensureIndex({id:1})
mongos> db.posts.getIndexes()
[
        {
                “v” : 1,
                “key” : {
                        “_id” : 1
                },
                “ns” : “test.posts”,
                “name” : “_id_”
        },
        {
                “v” : 1,
                “key” : {
                        “id” : 1
                },
                “ns” : “test.posts”,
                “name” : “id_1”
        }
]

mongos> use admin
switched to db admin
mongos> db.runCommand( { shardcollection : "test.posts" , key : { id : 1 } } )
{ "collectionsharded" : "test.posts", "ok" : 1 }





The collection "posts" is now sharded. Let's populate some test data into it. Here is the php script that I used to populate data into the collection.

<?php
$m = new Mongo( "mongodb://192.168.1.249:10003", array("persist" => "x") );
$db = $m->test;
$table = $db->posts;
$start = 0;
$end = 200000;
for($i=$start; $i<$end; $i++)
{
        $tags = getTag();
        $obj = array("id"=>"$i", "content"=>"data for content $i", "tags"=>$tags);
        $table->insert($obj);
        echo "$i:".implode(',',$tags)."\n";
}
$found = $table->count();
echo "Found : $found\n";

// returns 2 to 5 randomly picked tags for a post
function getTag()
{
        $tagArray = array('tag1','tag2','tag3','tag4','tag5','tag6','tag7','tag8','tag9','tag10','tag11','tag12','tag13','tag14','tag15','tag16','tag17','tag18','tag19','tag20','tag21','tag22','tag23','tag24','tag25','tag26','tag27','tag28','tag29','tag30','tag31','tag32','tag33','tag34','tag35','tag36','tag37','tag38','tag39','tag40','tag41','tag43');

        $tags = array();
        $tagcount = rand(2,5);

        $count = sizeof($tagArray);
        for($x=0; $x<$tagcount; $x++)
        {
                $tid = rand(0,$count-1);   // valid indexes run from 0 to count-1

                $tags[] = $tagArray[$tid];
        }
        return $tags;
}
?>



I pushed 200,000 records into the collection. Here is how the data was sharded between "241" and "243":

mongos> db.printCollectionStats()

posts
{
        “sharded” : true,
        “flags” : 1,
        “ns” : “test.posts”,
        “count” : 200000,
        “numExtents” : 10,
        “size” : 24430872,
        “storageSize” : 32743424,
        “totalIndexSize” : 15534400,
        “indexSizes” : {
                “_id_” : 6508096,
                “id_1” : 9026304
        },
        “avgObjSize” : 122.15436,
        “nindexes” : 2,
        “nchunks” : 4,
        “shards” : {
                “shard241” : {
                        “ns” : “test.posts”,
                        “count” : 109889,
                        “size” : 13423484,
                        “avgObjSize” : 122.15493598947415,
                        “storageSize” : 17978183,
                        “numExtents” : 8,
                        “nindexes” : 2,
                        “lastExtentSize” : 12083200,
                        “paddingFactor” : 1,
                        “flags” : 1,
                        “totalIndexSize” : 8531049,
                        “indexSizes” : {
                                “_id_” : 3573332,
                                “id_1” : 4957718
                        },
                        “ok” : 1
                },
                “shard243” : {
                        “ns” : “test.posts”,
                        “count” : 90111,
                        “size” : 10913985,
                        “avgObjSize” : 121.11711711711712,
                        “storageSize” : 33251771,
                        “numExtents” : 8,
                        “nindexes” : 2,
                        “lastExtentSize” : 12083200,
                        “paddingFactor” : 1,
                        “flags” : 1,
                        “totalIndexSize” : 13274730,
                        “indexSizes” : {
                                “_id_” : 6617370,
                                “id_1” : 6657360
                        },
                        “ok” : 1
                }
        },
        “ok” : 1
}


Now we will create the map and reduce functions. The map function looks at the tags array of each record in the posts collection and, for each element of the array, emits the tag with a count of 1. The reduce function counts the occurrences of each tag and returns the final count. The map function may call emit(key, value) any number of times to feed data to the reducer. The reduce function receives an array of emitted values for a key and reduces them to a single value. The structure of the value returned by the reduce function must be identical to the structure of the map function's emitted value.

mongos> map = function() {
… if(!this.tags) {
… return;
… }
… for ( index in this.tags) {
… emit(this.tags[index],1);
… }
… }
function () {
    if (!this.tags) {
        return;
    }
    for (index in this.tags) {
        emit(this.tags[index], 1);
    }
}
mongos> reduce = function(key, values) {
… var count = 0;
… for(index in values) {
… count += values[index];
… }
… return count;
… }
function (key, values) {
    var count = 0;
    for (index in values) {
        count += values[index];
    }
    return count;
}
To understand how it works, let's say that after some iterations map emits the value { "tag1", 1 }, and suppose at that point "tag1" already has a count of 50. That is, the intermediate document can be represented as:

{ “tag1”, 50 }

If map again emits { "tag1", 1 }, reduce will be called as follows:

reduce( “tag1”, [50,1] )

The result will be a simple combination of counts for tag1

{ “tag1”, 51 }

To invoke map-reduce, run the following command. It states that mapreduce is run on the "posts" collection, the map function is "map", the reduce function is "reduce", and the output is written to a collection named "tags".

mongos> result =  db.runCommand( {
… "mapreduce" : "posts",
… "map" : map, //name of map function
… "reduce" : reduce,  //name of reduce function
… "out" : "tags" } )
{
        “result” : “tags”,
        “shardCounts” : {
                “192.168.1.241:10000” : {
                        “input” : 109889,
                        “emit” : 499098,
                        “reduce” : 6400,
                        “output” : 43
                },
                “192.168.1.243:10000” : {
                        “input” : 90111,
                        “emit” : 200395,
                        “reduce” : 3094,
                        “output” : 43
                }
        },
        “counts” : {
                “emit” : NumberLong(699493),
                “input” : NumberLong(200000),
                “output” : NumberLong(43),
                “reduce” : NumberLong(9494)
        },
        “ok” : 1,
        “timeMillis” : 9199,
        “timing” : {
                “shards” : 9171,
                “final” : 28
        }
}

Look at the output documents: the result collection contains one document per distinct tag, in our case 43.

mongos> db.tags.find()
{ “_id” : “tag1”, “value” : 14643 }
{ “_id” : “tag2”, “value” : 14705 }
{ “_id” : “tag3”, “value” : 14418 }
{ “_id” : “tag4”, “value” : 14577 }
{ “_id” : “tag5”, “value” : 14642 }
{ “_id” : “tag6”, “value” : 14505 }
{ “_id” : “tag7”, “value” : 14623 }
{ “_id” : “tag8”, “value” : 14529 }
{ “_id” : “tag9”, “value” : 14767 }
{ “_id” : “tag10”, “value” : 14489 }
has more
 

mongos> db.tags.count()
43
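Since the output is a regular collection, normal queries work on it. For example, a quick sketch to pull out the five most frequently used tags:

mongos> db.tags.find().sort( { value : -1 } ).limit(5)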

References

http://cookbook.mongodb.org/patterns/count_tags/
http://www.mongodb.org/display/DOCS/MapReduce/

Scaling using mongodb : HA and Automatic sharding (part 2)

We discussed creating HA using replica sets in the earlier post, part 1. In this post we will create a sharded cluster out of multiple replica sets of mongodb servers. If you noticed the parameters passed while starting mongodb on the earlier servers 241 & 242, in addition to the db path, log path and replica set parameters I also added the parameter '--shardsvr'. This enables sharding on that instance of mongodb.

For creating a cluster of database servers using mongodb you need data nodes, config servers and router servers. Data nodes store the actual data. Config servers store the sharding metadata, which clients need in order to figure out where data resides on the data nodes. Router servers (mongos) are what clients connect to; they talk to the config servers and data nodes on the client's behalf, so the client interfaces with the database only through the router servers.

On each of the config servers, start mongodb with the parameter '--configsvr'.

230:/home/mongodb } ./bin/mongod --configsvr --logappend --logpath ./data/log --pidfilepath ./data/mongodb.pid --fork --dbpath ./data/config --directoryperdb --bind_ip 192.168.1.230 --port 10002
231:/home/mongodb } ./bin/mongod --configsvr --logappend --logpath ./data/log --pidfilepath ./data/mongodb.pid --fork --dbpath ./data/config --directoryperdb --bind_ip 192.168.1.231 --port 10002
And start routing service on the routing servers.

233:/home/mongodb } ./bin/mongos --port 10003 --configdb 192.168.1.230:10002,192.168.1.231:10002 --logappend --logpath ./data/route_log --fork --bind_ip 192.168.1.233 &
The same service can be started on routing server 234. To shard a database and add nodes to the shard, you need to connect to the routing server.

233:/home/mongodb } ./bin/mongo 192.168.1.233:10003
233:/home/mongodb } mongos> use admin
switched to db admin

To add the first replica set (set_a) to the cluster, run the following command.

233:/home/mongodb } mongos> db.runCommand( { addshard : "set_a/192.168.1.241:10000,192.168.1.242:10000,192.168.1.243:10000", name : "shard1" } )
{ "shardAdded" : "shard1", "ok" : 1 }

Replica sets for the other two shards can be added to the cluster in the same way; a sketch of the commands follows below. Once all the shards are added, you can fire a listShards command to see the shards in the cluster.
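The exact commands depend on how the other replica sets were named; assuming they are called set_b and set_c (the set names are an assumption on my part, while the hosts and shard names match the listShards output below), the calls would look like this:

233:/home/mongodb } mongos> db.runCommand( { addshard : "set_b/192.168.1.244:10000,192.168.1.245:10000,192.168.1.246:10000", name : "shard2" } )
233:/home/mongodb } mongos> db.runCommand( { addshard : "set_c/192.168.1.247:10000,192.168.1.248:10000,192.168.1.249:10000", name : "shard3" } )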

233:/home/mongodb } mongos > db.runCommand( {listshards:1} )
{
        “shards” : [
                {
                        “_id” : “shard1”,
                        “host” : “192.168.1.241:10000,192.168.1.242:10000,192.168.1.243:10000”
                },
                {
                        “_id” : “shard2”,
                        “host” : “192.168.1.244:10000,192.168.1.245:10000,192.168.1.246:10000”
                },
                {
                        “_id” : “shard3”,
                        “host” : “192.168.1.247:10000,192.168.1.248:10000,192.168.1.249:10000”
                }           
        ],                                                                                                                                                        
        “ok” : 1                    
}              

To enable sharding on a database say “test” run the following commands.

233:/home/mongodb } mongos > db.runCommand( { enablesharding : "test" } )
{ "ok" : 1 }

Next, shard the collection on a particular key. The test collection consists of the fields id, name and email; let's shard on id.

233:/home/mongodb } > db.runCommand( { shardcollection : "test.test", key : { id : 1 } } )
{ "collectionsharded" : "test.test", "ok" : 1 }

I used a php script to push more than 20,000 records to the sharded cluster. You can check the status of sharded cluster using the following commands.

233:/home/mongodb } mongos > db.printShardingStatus()
— Sharding Status —
  sharding version: { “_id” : 1, “version” : 3 }
  shards:
        {  “_id” : “shard1”,  “host” : “192.168.1.241:10000,192.168.1.242:10000,192.168.1.243:10000” }
        {  “_id” : “shard2”,  “host” : “192.168.1.244:10000,192.168.1.245:10000,192.168.1.246:10000” }
        {  “_id” : “shard3”,  “host” : “192.168.1.247:10000,192.168.1.248:10000,192.168.1.249:10000” }
  databases:
        {  “_id” : “admin”,  “partitioned” : false,  “primary” : “config” }
        {  “_id” : “test”,  “partitioned” : true,  “primary” : “shard1” }
                test.test chunks:
                                shard1        2
                                shard2        1
                        { “id” : { $minKey : 1 } } –>> { “id” : “1” } on : shard1 { “t” : 2000, “i” : 1 }
                        { “id” : “1” } –>> { “id” : “999” } on : shard2 { “t” : 1000, “i” : 3 }
                        { “id” : “999” } –>> { “id” : { $maxKey : 1 } } on : shard2 { “t” : 2000, “i” : 0 }

233:/home/mongodb } mongos > db.printCollectionStats()

test
{
        “sharded” : true,
        “flags” : 1,
        “ns” : “test.test”,
        “count” : 19998,
        “numExtents” : 6,
        “size” : 1719236,
        “storageSize” : 2801664,
        “totalIndexSize” : 1537088,
        “indexSizes” : {
                “_id_” : 662256,
                “id_1” : 874832
        },
        “avgObjSize” : 85.97039703970397,
        “nindexes” : 2,
        “nchunks” : 3,
        “shards” : {
                “shard1” : {
                        “ns” : “test.test”,
                        “count” : 19987,
                        “size” : 1718312,
                        “avgObjSize” : 85.97148146295092,
                        “storageSize” : 2793472,
                        “numExtents” : 5,
                        “nindexes” : 2,
                        “lastExtentSize” : 2097152,
                        “paddingFactor” : 1,
                        “flags” : 1,
                        “totalIndexSize” : 1520736,
                        “indexSizes” : {
                                “_id_” : 654080,
                                “id_1” : 866656
                        },
                        “ok” : 1
                },
                “shard2” : {
                        “ns” : “test.test”,
                        “count” : 11,
                        “size” : 924,
                        “avgObjSize” : 84,
                        “storageSize” : 8192,
                        “numExtents” : 1,
                        “nindexes” : 2,
                        “lastExtentSize” : 8192,
                        “paddingFactor” : 1,
                        “flags” : 1,
                        “totalIndexSize” : 16352,
                        “indexSizes” : {
                                “_id_” : 8176,
                                “id_1” : 8176
                        },
                        “ok” : 1
                }
        },
        “ok” : 1
}

To use multiple routing servers in a database connection string (for connecting to mongodb through a programming language, php for example), the following syntax can be used:

mongodb://[username:password@]host1[:port1][,host2[:port2],...]/db_name

This will connect to at least one host from the list of hosts provided. If none of the hosts are available, an exception will be thrown.

Scaling using mongodb : HA and Automatic sharding (part 1)

Mongodb introduces the concepts of automatic sharding and replica sets. Let's see how both can be used to create a highly available database cluster which is horizontally scalable.

First let's take a look at the HA capability of mongodb. HA is handled in mongodb by replica sets. A replica set is basically two or more mongodb nodes which are copies/replicas of each other. A replica set automatically elects a primary and provides automated failover and disaster recovery.

Let's create a replica set of 2 nodes first and proceed from there.

I have taken 2 machines, 192.168.1.241 and 192.168.1.242, for creating the replica set. Let's download the tarball of mongodb 2.0.2 and put it on 241 and 242. We will create separate directories for the database and the replica db. Here is how we go about it.

241:/home/mongodb } mkdir -p data/db
241:/home/mongodb } mkdir -p repl/db
242:/home/mongodb } mkdir -p data/db
242:/home/mongodb } mkdir -p repl/db

Create primary mongodb replica set server on 241

241:/home/mongodb } ./bin/mongod --shardsvr --replSet set_a --logappend --logpath ./data/log --pidfilepath ./data/mongodb.pid --fork --dbpath ./data/db --directoryperdb --bind_ip 192.168.1.241 --port 10000 &
Create the secondary replica set member on 242, running on port 10001.

242:/home/mongodb } ./bin/mongod --shardsvr --replSet set_a --logappend --logpath ./data/log --pidfilepath ./data/mongodb.pid --fork --dbpath ./data/db --directoryperdb --bind_ip 192.168.1.242 --port 10001 &
Check whether the replica set is working fine.

241:/home/mongodb } ./bin/mongo 192.168.1.241:10000
241: } > rs.status()
{
        “startupStatus” : 3,
        “info” : “run rs.initiate(…) if not yet done for the set”,
        “errmsg” : “can’t get local.system.replset config from self or any seed (EMPTYCONFIG)”,
        “ok” : 0
}
241: } > rs.initiate()
{
        “info2” : “no configuration explicitly specified — making one”,
        “me” : “192.168.1.243:10000”,
        “info” : “Config now saved locally.  Should come online in about a minute.”,
        “ok” : 1
}
241: } > rs.isMaster()
db.isMaster()
{
        “setName” : “set_a”,
        “ismaster” : true,
        “secondary” : false,
        “hosts” : [
                “192.168.1.241:10000”
        ],
        “primary” : “192.168.1.241:10000”,
        “me” : “192.168.1.241:10000”,
        “maxBsonObjectSize” : 16777216,
        “ok” : 1
}

We can see that 241 is the master, but 242 has not been added to the replica set yet. Let's add 242 as a secondary server of the replica set.

241: } >  rs.add("192.168.1.242:10001")
{ "ok" : 1 }

241: } > rs.conf()
{
        “_id” : “set_a”,
        “version” : 2,
        “members” : [
                {
                        “_id” : 0,
                        “host” : “192.168.1.241:10000”
                },
                {
                        “_id” : 1,
                        “host” : “192.168.1.242:10001”
                }
        ]
}
241: } > rs.status()
{
        “set” : “set_a”,
        “date” : ISODate(“2012-02-15T09:42:07Z”),
        “myState” : 1,
        “members” : [
                {
                        “_id” : 0,
                        “name” : “192.168.1.241:10000”,
                        “health” : 1,
                        “state” : 1,
                        “stateStr” : “PRIMARY”,
                        “optime” : {
                                “t” : 1329298881000,
                                “i” : 1
                        },
                        “optimeDate” : ISODate(“2012-02-15T09:41:21Z”),
                        “self” : true
                },
                {
                        “_id” : 1,
                        “name” : “192.168.1.242:10001”,
                        “health” : 1,
                        “state” : 3,
                        “stateStr” : “RECOVERING”,
                        “uptime” : 46,
                        “optime” : {
                                “t” : 0,
                                “i” : 0
                        },
                        “optimeDate” : ISODate(“1970-01-01T00:00:00Z”),
                        “lastHeartbeat” : ISODate(“2012-02-15T09:42:07Z”),
                        “pingMs” : 5339686
                }
        ],
        “ok” : 1
}
242 is in recovery mode – replicating the data that is already there on 241. Once the data is completely replicated, the second node starts acting as a secondary node.

241: } > rs.status()
{
        “set” : “set_a”,
        “date” : ISODate(“2012-02-15T10:33:32Z”),
        “myState” : 1,
        “members” : [
                {
                        “_id” : 0,
                        “name” : “192.168.1.241:10000”,
                        “health” : 1,
                        “state” : 1,
                        “stateStr” : “PRIMARY”,
                        “optime” : {
                                “t” : 1329298881000,
                                “i” : 1
                        },
                        “optimeDate” : ISODate(“2012-02-15T09:41:21Z”),
                        “self” : true
                },
                {
                        “_id” : 1,
                        “name” : “192.168.1.242:10001”,
                        “health” : 1,
                        “state” : 2,
                        “stateStr” : “SECONDARY”,
                        “uptime” : 3131,
                        “optime” : {
                                “t” : 1329298881000,
                                “i” : 1
                        },
                        “optimeDate” : ISODate(“2012-02-15T09:41:21Z”),
                        “lastHeartbeat” : ISODate(“2012-02-15T10:33:31Z”),
                        “pingMs” : 0
                }
        ],
        “ok” : 1
}

Now the replica set is working fine and the secondary node will act as primary if the primary goes out of service. Ideally there should be 3 or more nodes added to any replica set.
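If a third full data node is not available, an arbiter can be added so that elections always have a clear majority. A sketch, assuming an arbiter mongod has already been started with --replSet set_a on a third machine (the host and port below are placeholders):

241: } > rs.addArb("192.168.1.250:10000")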

Foursquare outage post mortem

As many of you are aware, Foursquare had a significant outage this week. The outage was caused by capacity problems on one of the machines hosting the MongoDB database used for check-ins. This is an account of what happened, why it happened, how it can be prevented, and how 10gen is working to improve MongoDB in light of this outage.

It’s important to note that throughout this week, 10gen and Foursquare engineers have been working together very closely to resolve the issue.

Some history
Foursquare has been hosting check-ins on a MongoDB database for some time now. The database was originally running on a single EC2 instance with 66GB of RAM. About 2 months ago, in response to
increased capacity requirements, Foursquare migrated that single instance to a two-shard cluster. Now, each shard was running on its own 66GB instance, and both shards were also replicating to a slave for redundancy. This was an important migration because it allowed Foursquare to keep all of their check-in data in RAM, which is essential for maintaining acceptable performance.

The data had been split into 200 evenly distributed chunks based on user id. The first half went to one server, and the other half to the other. Each shard had about 33GB of data in RAM at this point, and the whole system ran smoothly for two months.

What we missed in the interim
Over these two months, check-ins were being written continually to each shard. Unfortunately, these check-ins did not grow evenly across chunks. It’s easy to imagine how this might happen: assuming certain subsets of users are more active than others, it’s conceivable that their updates might all go to the same shard. That’s what occurred in this case, resulting in one shard growing to 66GB and the other only to 50GB. [1]

What went wrong
On Monday morning, the data on one shard (we’ll call it shard0) finally grew to about 67GB, surpassing the 66GB of RAM on the hosting machine. Whenever data size grows beyond physical RAM, it becomes necessary to read and write to disk, which is orders of magnitude slower than reading and writing RAM. Thus, certain queries started to become very slow, and this caused a backlog that brought the site down.

We first attempted to fix the problem by adding a third shard. We brought the third shard up and started migrating chunks. Queries were now being distributed to all three shards, but shard0 continued to hit disk very heavily. When this failed to correct itself, we ultimately
discovered that the problem was due to data fragmentation on shard0. In essence, although we had moved 5% of the data from shard0 to the new third shard, the data files, in their fragmented state, still needed the same amount of RAM. This can be explained by the fact that Foursquare check-in documents are small (around 300 bytes each), so many of them can fit on a 4KB page. Removing 5% of these just made each page a little more sparse, rather than removing pages altogether.[2]

After the first day's outage it had become clear that chunk migration, sans compaction, was not going to solve the immediate problem. By the time the second day's outage occurred, we had already moved 5% of the data off of shard0, so we decided to start an offline process to
compact the database using MongoDB’s repairDatabase() feature. This process took about 4 hours (partly due to the data size, and partly because of the slowness of EBS volumes at the time). At the end of the 4 hours, the RAM requirements for shard0 had in fact been reduced
by 5%, allowing us to bring the system back online.

Afterwards
Since repairing shard0 and adding a third shard, we’ve set up even more shards, and now the check-in data is evenly distributed and there is a good deal of extra capacity. Still, we had to address the fragmentation problem. We ran a repairDatabase() on the slaves, and
promoted the slaves to masters, further reducing the RAM needed on each shard to about 20GB.

How is this issue triggered?
Several conditions need to be met to trigger the issue that brought down Foursquare:

  • Systems are at or over capacity. How capacity is defined varies; in the case of Foursquare, all data needed to fit into RAM for acceptable performance. Other deployments may not have such strict RAM requirements.
  • Document size is less than 4k. Such documents, when moved, may be too small to free up pages and, thus, memory.
  • Shard key order and insertion order are different. This prevents data from being moved in contiguous chunks.

Most sharded deployments will not meet these criteria. Anyone whose documents are larger than 4KB will not suffer significant fragmentation because the pages that aren’t being used won’t be
cached.

Prevention
The main thing to remember here is that once you’re at max capacity, it’s difficult to add more capacity without some downtime when objects are small. However, if caught in advance, adding more shards on a live system can be done with no downtime.

For example, if we had notifications in place to alert us 12 hours earlier that we needed more capacity, we could have added a third shard, migrated data, and then compacted the slaves.

Another salient point: when you’re operating at or near capacity, realize that if things get slow at your hosting provider, you may find yourself all of a sudden effectively over capacity.

Final Thoughts
The 10gen tech team is working hard to correct the issues exposed by this outage. We will continue to work as hard as possible to ensure that everyone using MongoDB has the best possible experience. We are thankful for the support that we have received from Foursquare and our
community during this unfortunate episode. As always, please let us know if you have any questions or concerns.

[1] Chunks get split when they reach 200MB into 2 100MB halves. This means that even if the number of chunks on each shard is the same, the data size is not always so. This is something we are going to be addressing in MongoDB. We'll be making the splitting and balancing logic look for this imbalance so it can act upon it.

[2] The 10gen team is working on doing online incremental compaction of both data files and indexes. We know this has been a concern in non-sharded systems as well. More details about this will be coming in the next few weeks.

** copied from mongodb google group **

Document oriented data stores

A document oriented database or data store does not use tables for storing data. It stores each record as a document with its own set of fields, so multiple documents in the same store can have different structures: a different number of fields per record, and different fields altogether. The benefit is that if you are using a document oriented database to store a large number of records, a change in the fields does not require an alter on any table. You simply insert new documents with the new structure and they sit alongside the existing ones in the same datastore.
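For instance, in mongodb two documents with completely different fields can live in the same collection without any schema change. A quick sketch in the mongo shell (the collection and field names are purely illustrative):

> db.users.insert( { name : "alice", email : "alice@example.com" } )
> db.users.insert( { name : "bob", phone : "12345", tags : [ "admin", "beta" ] } )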

I went ahead and compared some document oriented data stores (Tokyo Tyrant, MongoDB and CouchDB) against Mysql 5.4 as well, to get an idea of the performance advantage.

I created 3 scripts in all: 2 for insert and 1 for select. And I created a 50,00,000 (5 million) record table with a very normal record structure:

field1 -> string, indexed
field2 -> string, indexed
date -> date, not indexed
n -> integer, indexed

The size of the original mysql table was 214 MB data & 205 MB index – total 419 MB for 50,00,000 records. The 3 scripts could be described as follows

  • insert script 1 : went from 1 to 25,00,000 pulling 1000 records in a single query from the source database and inserting the records one by one to the target data store.
  • insert script 2 : went from 50,00,000 to 24,99,999 pulling 1000 records in a single query from source database and inserting the records one by one to the target data store.
  • select script : picks records from the source database and makes 2 queries for both field1 & field2 and fires both the queries on the target data store.

Test Machine configuration:

CPU : Intel Xeon 1.6 GHz – Quad Core [4 MB L2 cache] 64 Bit
Memory : 8 GB RAM
OS : Centos 5.2 – Kernel 2.6.18 64 bit

Mysql

Used mysql version 5.4
Load during execution : 3.5
Time taken by Insert script 1 : 19682.673708916 sec for 2501000 records
Time taken by Insert script 2 : 19608.902073145 sec for 2499000 records
Time taken by Select script : 20465.380266905 sec for 8099782 records
Table engine : MyISAM
Total database size in mysql 5.4 : 215+233 = 448 MB

Of course I used MyISAM, which resulted in table locks, and I had indexes on three fields, which increased the locking times on these tables during inserts & selects.

MongoDB – www.mongodb.org

Used version for testing : 0.9.1
The current version available is 0.9.7 which has a lot of bug fixes and performance improvement features.MongoDB is written in C++ and stores data in BSON format which is Binary Serialization of JSON documents. It has a console (similar to mysql) which can be used to fire queries – so unlike bdb, you dont have to go and write programs to fire queries. MongoDB has full indexing support for different columns. You can also do query profiling (like explain in mysql) to check the execution path of the query and then make optimizations if possible. It provides replication support. It can be used to store large binary data like videos very efficiently.

The one good thing about the future of mongodb is that it would be providing auto sharding of data for cloud level scalability. This feature is in alpha stage now, but it should be mature in some time and could be used then.

Mongodb provides drivers/apis for a lot of languages including python, php, java, ruby, c++ and perl. I downloaded the php driver, compiled it and installed the extension mongo.so in the php extensions directory. Then I ran the same tests to check out the speed of mongodb.

Load during execution : 2.5
Time taken by Insert script 1 : 1006.6038348675 sec for 2501000 records
Time taken by Insert script 2 : 1435.0536739826 sec for 2499000 records
Time taken by Select script : 2942.2539789677 sec – 9999914 records
Total database size in mongodb : 4 GB (both data & index)

Wow, so Mongodb turns out to be approximately 16 times faster than simple mysql MyISAM tables.

But it takes up a huge amount of space. Why is that? Well, Mongodb creates data files with predefined sizes so that it does not have to grow or shrink files on demand. The first file it creates is 64MB, the next is 128MB and so on, up to 2GB. After 2GB the remaining files are 2GB each. So, even if we exceed the current 2GB file by a single byte, another 2GB file will be created, even if it ends up 98% empty.
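The preallocation shows up clearly when you compare the size of the documents with the size of the files on disk. A sketch from the mongo shell (the numbers will differ on every setup): dataSize is the size of the documents themselves, storageSize the space used inside the data files, and fileSize the total size of the preallocated files on disk.

> db.stats()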

Another important fact about mongodb is that its storage engine uses memory-mapped files for performance. This limits the total data size on a 32 bit machine to around 2GB. I had hit this limitation earlier, so I used a 64 bit machine for testing. But this data file architecture keeps the storage engine code plain and simple and lets it embrace the 64 bit world. Mongodb also does not do traditional locking, which is another reason it is fast.

More info / References :
32 bit limitation => blog.mongodb.org/post/137788967/32-bit-limitations
performance testing => www.mongodb.org/display/DOCS/Performance+Testing
regarding mongodb => www.mongodb.org/display/DOCS/Home
Quickstart => www.mongodb.org/display/DOCS/Quickstart
Production deployments => www.mongodb.org/display/DOCS/Production+Deployments
Locking in mongodb => www.mongodb.org/display/DOCS/Atomic+Operations

Tokyo Cabinet/Tyrant

Tokyo Tyrant is a set of 3 applications:
– tokyo cabinet : the embedded data store => tokyocabinet.sourceforge.net/spex-en.html
– tokyo tyrant : network api => tokyocabinet.sourceforge.net/tyrantdoc/
– tokyo dystopia : fulltext search system => tokyocabinet.sourceforge.net/dystopiadoc/

I explored tokyo cabinet-1.4.29 & tokyo tyrant-1.1.30. So after installing tokyo cabinet, I had to go ahead and install tokyo tyrant on top of tokyo cabinet for the networking support. Tokyo tyrant provides a set of binaries for talking to the tokyo cabinet embedded database. So you can use tokyo tyrant to insert and select data from a tokyo cabinet server.

I got an api for php to interact with the tokyo tyrant server. It is known as Php Tyrant and it can be obtained from mamasam.indefero.net/p/tyrant/. It is written completely in php and is not a C extension.

If you go through the documentation, you will see that Tokyo cabinet mainly supports 4 types of databases.

Hash database – a key value store which uses a hash algorithm to retrieve records. The time complexity in this case is constant [ O(1) ].
B+ tree database – slower than the hash database. Records of a B+Tree are sorted and arranged in logical pages, so the time complexity of record retrieval is O(log n). But the size of a B+Tree database is half that of a hash database.
Fixed length database – faster than a hash database, but with restrictions: each key has to be a natural number and the length of each value is limited. The whole region of the database is mapped to memory with the mmap call, which reduces the overhead of file I/O.
Table database – closer to a document oriented database than a plain key value pair. You can also create indexes on columns in the table to improve search and sorting performance. Indices are implemented as separate B+Tree database files.

So, I created a simple table database with the structure I wanted and created indexes on the required columns. And started running my scripts. I found that tokyo cabinet is fast in the beginning, but it suffers from file locks: any operation happening on the table locks the complete file. This caused a lot of problems with simultaneous inserts & selects. In fact I had cases where the server stopped responding totally, and it took 1 hr 17 minutes to push in just 661 inserts & fire 270 selects. So I stopped running the select script and focused on running multiple insert scripts, and then ran the select script separately.

This means that if you implement tokyo tyrant for a huge table, you should be aware of the file locks and should run the selects on a slave of the main database: a master can handle multiple inserts while a slave handles multiple selects. Alternatively, you can form a queue and fire queries sequentially instead of simultaneously.

I ran 2 insert scripts simultaneously and the results were even worse than mysql.

Load during execution : 2.9
Time taken by Insert script 1 : 65671.21945715 sec for 2501000 records
Time taken by Insert script 2 : 63807.51564312 sec for 2499000 records
Total database size : 1095 MB [715 MB data + 380 MB index]

And I ran 2 select scripts simultaneously and the results were comparable to those of mysql.

Load during execution : 2.1
Time taken by Select script 1 : 20269.342120886 sec for 9999914 records
Time taken by Select script 2 : 20115.848437071 sec for 9999914 records

What could be said about tokyo tyrant is that it should be used as a key-value store – maybe by using a hash database or a fixed length database for persistent session storage.

Couchdb
couchdb.apache.org/

The third database I looked into was couchdb, which I had heard a lot about. The good thing about couchdb is that it provides a RESTful JSON API that can be accessed from any environment that allows HTTP requests. Couchdb is written in erlang and is quite fast. Again, it does not have a C extension for php. There is a php api known as phpillow which wraps all functionalities of couchdb in its function calls.

You can download phpillow at arbitracker.org/phpillow.html

Also note that it works only with php 5.3; with php 5.2.x it still has a lot of bugs. There is no proper documentation, so it took me some time to figure out how to write insertion and select scripts and how to go about creating views (couchdb calls indexes views). When I ran the scripts I found that it was crashing a lot, so I did not go ahead with a proper test of couchdb. Couchdb also supports master-master replication (with developer supplied conflict resolution).

You could go ahead and read the comparison of couchdb & mongodb at mongodb’s page

www.mongodb.org/display/DOCS/Comparing+Mongo+DB+and+Couch+DB
www.mongodb.org/display/DOCS/MongoDB%2C+CouchDB%2C+MySQL+Compare+Grid