Map-Reduce¶
Map-reduce operations can handle complex aggregation tasks. To perform
map-reduce operations, MongoDB provides the mapReduce
command and, in the mongo shell, the
db.collection.mapReduce() wrapper method.
For many simple aggregation tasks, see the aggregation framework.
Map-Reduce Examples¶
This section provides some map-reduce examples in the mongo
shell using the db.collection.mapReduce() method:
For more information on the parameters, see the
db.collection.mapReduce() reference page.
Consider the following map-reduce operations on a collection orders
that contains documents of the following prototype:
Return the Total Price Per Customer Id¶
Perform a map-reduce operation on the orders collection to group the
documents by cust_id and, for each cust_id, calculate the sum of the
price values:
1. Define the map function to process each input document:
   - In the function, this refers to the document that the map-reduce operation is processing.
   - The function maps the price to the cust_id for each document and emits the cust_id and price pair.
2. Define the corresponding reduce function with two arguments keyCustId and valuesPrices:
   - valuesPrices is an array whose elements are the price values emitted by the map function and grouped by keyCustId.
   - The function reduces the valuesPrices array to the sum of its elements.
3. Perform the map-reduce on all documents in the orders collection using the mapFunction1 map function and the reduceFunction1 reduce function. This operation outputs the results to a collection named map_reduce_example. If the map_reduce_example collection already exists, the operation replaces its contents with the results of this map-reduce operation.
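The steps above can be sketched in plain JavaScript. The sample orders documents below are assumptions for illustration, and the small driver only mimics what mapReduce does internally: collect the emitted pairs, group them by key, and reduce each group.

```javascript
// Hypothetical sample documents standing in for the orders collection.
var orders = [
  { cust_id: "A123", price: 250 },
  { cust_id: "A123", price: 250 },
  { cust_id: "B212", price: 200 }
];

// Map: emit a (cust_id, price) pair for each document.
var mapFunction1 = function () {
  emit(this.cust_id, this.price);
};

// Reduce: sum the prices grouped under one cust_id.
// (The mongo shell also offers Array.sum(valuesPrices) for this.)
var reduceFunction1 = function (keyCustId, valuesPrices) {
  return valuesPrices.reduce(function (a, b) { return a + b; }, 0);
};

// Minimal driver mimicking mapReduce: collect emits, group by key, reduce.
var emitted = {};
function emit(key, value) {
  (emitted[key] = emitted[key] || []).push(value);
}
orders.forEach(function (doc) { mapFunction1.call(doc); });

var results = {};
Object.keys(emitted).forEach(function (key) {
  var values = emitted[key];
  // As in MongoDB, reduce is not called for keys with a single value.
  results[key] = values.length === 1 ? values[0] : reduceFunction1(key, values);
});

console.log(results); // { A123: 500, B212: 200 }
```

In the mongo shell itself, the equivalent call is db.orders.mapReduce(mapFunction1, reduceFunction1, { out: "map_reduce_example" }).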
Calculate the Number of Orders, Total Quantity, and Average Quantity Per Item¶
In this example you will perform a map-reduce operation on the orders collection, for
all documents that have an ord_date value
greater than 01/01/2012. The operation groups by
the item.sku field, and for each sku calculates the number of orders and the
total quantity ordered. The operation concludes by calculating the average quantity per
order for each sku value:
1. Define the map function to process each input document:
   - In the function, this refers to the document that the map-reduce operation is processing.
   - For each item, the function associates the sku with a new object value that contains the count of 1 and the item qty for the order, and emits the sku and value pair.
2. Define the corresponding reduce function with two arguments keySKU and valuesCountObjects:
   - valuesCountObjects is an array whose elements are the objects mapped to the grouped keySKU values passed by the map function to the reducer function.
   - The function reduces the valuesCountObjects array to a single object reducedValue that contains the count and the qty fields.
   - In reducedValue, the count field contains the sum of the count fields from the individual array elements, and the qty field contains the sum of the qty fields from the individual array elements.
3. Define a finalize function with two arguments key and reducedValue. The function modifies the reducedValue object to add a computed field named average and returns the modified object.
4. Perform the map-reduce operation on the orders collection using the mapFunction2, reduceFunction2, and finalizeFunction2 functions. This operation uses the query field to select only those documents with ord_date greater than new Date(01/01/2012). It then outputs the results to a collection named map_reduce_example. If the map_reduce_example collection already exists, the operation merges the existing contents with the results of this map-reduce operation.
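A plain-JavaScript sketch of this example follows. The sample documents are assumptions; the query filter is simulated with Array.filter, and the driver again stands in for mapReduce itself.

```javascript
// Hypothetical sample orders; only the first two pass the date filter.
var orders = [
  { ord_date: new Date("2012-02-15"),
    items: [ { sku: "mmm", qty: 5 }, { sku: "nnn", qty: 10 } ] },
  { ord_date: new Date("2012-03-10"),
    items: [ { sku: "mmm", qty: 5 } ] },
  { ord_date: new Date("2011-12-01"),
    items: [ { sku: "nnn", qty: 20 } ] }
];

// Map: one (sku, { count, qty }) pair per order item.
var mapFunction2 = function () {
  for (var idx = 0; idx < this.items.length; idx++) {
    var key = this.items[idx].sku;
    var value = { count: 1, qty: this.items[idx].qty };
    emit(key, value);
  }
};

// Reduce: sum the count and qty fields across the grouped values.
var reduceFunction2 = function (keySKU, valuesCountObjects) {
  var reducedValue = { count: 0, qty: 0 };
  for (var idx = 0; idx < valuesCountObjects.length; idx++) {
    reducedValue.count += valuesCountObjects[idx].count;
    reducedValue.qty += valuesCountObjects[idx].qty;
  }
  return reducedValue;
};

// Finalize: derive the average quantity per order.
var finalizeFunction2 = function (key, reducedValue) {
  reducedValue.average = reducedValue.qty / reducedValue.count;
  return reducedValue;
};

// Driver mimicking mapReduce with a query filter on ord_date.
var emitted = {};
function emit(key, value) { (emitted[key] = emitted[key] || []).push(value); }
orders
  .filter(function (doc) { return doc.ord_date > new Date("2012-01-01"); })
  .forEach(function (doc) { mapFunction2.call(doc); });

var results = {};
Object.keys(emitted).forEach(function (key) {
  var values = emitted[key];
  var reduced = values.length === 1 ? values[0] : reduceFunction2(key, values);
  results[key] = finalizeFunction2(key, reduced);
});

console.log(results);
// mmm: { count: 2, qty: 10, average: 5 }, nnn: { count: 1, qty: 10, average: 10 }
```

In the shell, the call takes roughly this shape: db.orders.mapReduce(mapFunction2, reduceFunction2, { out: { merge: "map_reduce_example" }, query: { ord_date: { $gt: new Date('01/01/2012') } }, finalize: finalizeFunction2 }).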
Incremental Map-Reduce¶
If the map-reduce dataset is constantly growing, then rather than performing the map-reduce operation over the entire dataset each time you want to run map-reduce, you may want to perform an incremental map-reduce.
To perform incremental map-reduce:
- Run a map-reduce job over the current collection and output the result to a separate collection.
- When you have more data to process, run subsequent map-reduce jobs with:
  - the query parameter that specifies conditions matching only the new documents, and
  - the out parameter that specifies the reduce action to merge the new results into the existing output collection.
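In plain-JavaScript terms, the reduce output action amounts to re-running the reduce function over the pair of existing and new values for each key. The data and names below are assumptions for illustration.

```javascript
// Reduce function shared by the initial and the incremental job.
var reduceSum = function (key, values) {
  return values.reduce(function (a, b) { return a + b; }, 0);
};

// Hypothetical contents of the existing output collection after the first job.
var output = { A123: 500, B212: 200 };

// Hypothetical reduced results over only the new documents (selected by query).
var newResults = { A123: 50, C001: 75 };

// The reduce output action re-reduces [existing, new] per key;
// keys without an existing value are simply inserted.
Object.keys(newResults).forEach(function (key) {
  output[key] = key in output
    ? reduceSum(key, [ output[key], newResults[key] ])
    : newResults[key];
});

console.log(output); // { A123: 550, B212: 200, C001: 75 }
```

In the shell this corresponds to passing out: { reduce: "outputCollectionName" } along with a query that matches only the new documents.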
Consider the following example where you schedule a map-reduce
operation on a sessions collection to run at the end of each day.
Data Setup¶
The sessions collection contains documents that log users’ sessions
each day, for example:
Initial Map-Reduce of Current Collection¶
Run the first map-reduce operation as follows:
1. Define the map function that maps the userid to an object that contains the fields userid, total_time, count, and avg_time.
2. Define the corresponding reduce function with two arguments key and values to calculate the total time and the count. The key corresponds to the userid, and values is an array whose elements correspond to the individual objects mapped to the userid in the mapFunction.
3. Define the finalize function with two arguments key and reducedValue. The function modifies the reducedValue document to add another field average and returns the modified document.
4. Perform map-reduce on the sessions collection using the mapFunction, the reduceFunction, and the finalizeFunction functions. Output the results to a collection named session_stat. If the session_stat collection already exists, the operation replaces its contents.
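These steps can be sketched in plain JavaScript with assumed sample data, again using a small driver in place of the shell call.

```javascript
// Hypothetical session documents (userid and session length).
var sessions = [
  { userid: "a", ts: new Date("2011-11-03"), length: 95 },
  { userid: "a", ts: new Date("2011-11-04"), length: 110 },
  { userid: "b", ts: new Date("2011-11-03"), length: 120 }
];

// Map: emit an object carrying the running fields for each userid.
var mapFunction = function () {
  var key = this.userid;
  var value = { userid: this.userid, total_time: this.length, count: 1, avg_time: 0 };
  emit(key, value);
};

// Reduce: accumulate total_time and count for a userid.
var reduceFunction = function (key, values) {
  var reducedObject = { userid: key, total_time: 0, count: 0, avg_time: 0 };
  values.forEach(function (value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
  });
  return reducedObject;
};

// Finalize: compute the average session time.
var finalizeFunction = function (key, reducedValue) {
  if (reducedValue.count > 0) {
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
  }
  return reducedValue;
};

// Driver standing in for the shell's map-reduce into session_stat.
var emitted = {};
function emit(key, value) { (emitted[key] = emitted[key] || []).push(value); }
sessions.forEach(function (doc) { mapFunction.call(doc); });

var sessionStat = {};
Object.keys(emitted).forEach(function (key) {
  var values = emitted[key];
  var reduced = values.length === 1 ? values[0] : reduceFunction(key, values);
  sessionStat[key] = finalizeFunction(key, reduced);
});

console.log(sessionStat);
// a: avg_time 102.5 over 2 sessions; b: avg_time 120 over 1 session
```

In the mongo shell, the equivalent call is db.sessions.mapReduce(mapFunction, reduceFunction, { out: "session_stat", finalize: finalizeFunction }).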
Subsequent Incremental Map-Reduce¶
Later as the sessions collection grows, you can run additional
map-reduce operations. For example, add new documents to the
sessions collection:
At the end of the day, perform incremental map-reduce on the
sessions collection but use the query field to select only the
new documents. Output the results to the collection session_stat,
but reduce the contents with the results of the incremental
map-reduce:
Temporary Collection¶
The map-reduce operation uses a temporary collection during processing. At completion, the map-reduce operation renames the temporary collection. As a result, you can perform a map-reduce operation periodically with the same target collection name without affecting the intermediate states. Use this mode when generating statistical output collections on a regular basis.
Concurrency¶
The map-reduce operation is composed of many tasks, including:
- reads from the input collection,
- executions of the map function,
- executions of the reduce function,
- writes to the output collection.
These various tasks take the following locks:
The read phase takes a read lock. It yields every 100 documents.
The JavaScript code (i.e. the map, reduce, and finalize functions) is executed in a single thread, taking a JavaScript lock; however, most JavaScript tasks in map-reduce are very short and yield the lock frequently.
The insert into the temporary collection takes a write lock for a single write.
If the output collection does not exist, the creation of the output collection takes a write lock.
If the output collection exists, then the output actions (i.e. merge, replace, reduce) take a write lock.
Although single-threaded, the map-reduce tasks interleave and appear to run in parallel.
Note
The final write lock during post-processing makes the results appear
atomically. However, output actions merge and reduce may
take minutes to process. For the merge and reduce, the
nonAtomic flag is available. See the
db.collection.mapReduce() reference for more information.
Sharded Cluster¶
Sharded Input¶
When using a sharded collection as the input for a map-reduce operation,
mongos will automatically dispatch the map-reduce job to
each shard in parallel. There is no special option
required. mongos will wait for jobs on all shards to
finish.
Sharded Output¶
By default the output collection is not sharded. The process is:
- mongos dispatches a map-reduce finish job to the shard that will store the target collection.
- The target shard pulls results from all other shards, runs a final reduce/finalize operation, and writes to the output collection.
If using the sharded option to the out parameter, MongoDB shards the output using the _id field as the shard key.
Changed in version 2.2.
- If the output collection does not exist, MongoDB creates and shards the collection on the _id field. If the collection is empty, MongoDB creates chunks using the result of the first stage of the map-reduce operation.
- mongos dispatches, in parallel, a map-reduce finish job to every shard that owns a chunk.
- Each shard pulls the results it owns from all other shards, runs a final reduce/finalize, and writes to the output collection.
Note
- During later map-reduce jobs, MongoDB splits chunks as needed.
- Balancing of chunks for the output collection is automatically prevented during post-processing to avoid concurrency issues.
In MongoDB 2.0:
- mongos retrieves the results from each shard, performs a merge sort to order the results, and performs a reduce/finalize as needed. mongos then writes the result to the output collection in sharded mode.
- This model requires only a small amount of memory, even for large datasets.
- Shard chunks are not automatically split during insertion. This requires manual intervention until the chunks are granular and balanced.
Warning
For best results, only use the sharded output options for
mapReduce in version 2.2 or later.
Troubleshooting Map-Reduce Operations¶
You can troubleshoot the map function and the reduce function
in the mongo shell.
Troubleshoot the Map Function¶
You can verify the key and value pairs emitted by the map
function by writing your own emit function.
Consider a collection orders that contains documents of the
following prototype:
1. Define the map function that maps the price to the cust_id for each document and emits the cust_id and price pair.
2. Define the emit function to print the key and value.
3. Invoke the map function with a single document from the orders collection. Verify the key and value pair is as you expected.
4. Invoke the map function with multiple documents from the orders collection. Verify the key and value pairs are as you expected.
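The emit-override technique can be sketched as follows. The sample documents are assumptions; in the shell you would use real documents (e.g. from db.orders.findOne()) and print with tojson, while plain JSON.stringify works in any JavaScript runtime.

```javascript
// Custom emit: record and print each (key, value) pair instead of reducing.
var emitted = [];
function emit(key, value) {
  emitted.push({ key: key, value: value });
  console.log("key: " + key + "  value: " + JSON.stringify(value));
}

// The map function under test.
var mapFunction1 = function () {
  emit(this.cust_id, this.price);
};

// Invoke the map function with a single (hypothetical) document:
var doc = { cust_id: "abc123", ord_date: new Date("2012-10-04"), price: 25 };
mapFunction1.call(doc);

// Invoke it with multiple documents:
var docs = [
  { cust_id: "abc123", price: 250 },
  { cust_id: "xyz987", price: 150 }
];
docs.forEach(function (d) { mapFunction1.call(d); });
```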
Troubleshoot the Reduce Function¶
Confirm Output Type¶
You can test that the reduce function returns a value that is the
same type as the value emitted from the map function.
1. Define a reduceFunction1 function that takes the arguments keyCustId and valuesPrices. valuesPrices is an array of integers.
2. Define a sample array of integers.
3. Invoke reduceFunction1 with myTestValues. Verify that reduceFunction1 returned an integer.
4. Define a reduceFunction2 function that takes the arguments keySKU and valuesCountObjects. valuesCountObjects is an array of documents that contain the two fields count and qty.
5. Define a sample array of documents.
6. Invoke reduceFunction2 with myTestObjects. Verify that reduceFunction2 returned a document with exactly the count and the qty fields.
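A sketch of these checks, with assumed sample values (the mongo shell version of reduceFunction1 can use Array.sum(valuesPrices); plain reduce works everywhere):

```javascript
// reduceFunction1: sums an array of integers.
var reduceFunction1 = function (keyCustId, valuesPrices) {
  return valuesPrices.reduce(function (a, b) { return a + b; }, 0);
};

var myTestValues = [5, 5, 10];
var result1 = reduceFunction1("myKey", myTestValues);
console.log(result1, typeof result1); // 20 number

// reduceFunction2: sums the count and qty fields of an array of documents.
var reduceFunction2 = function (keySKU, valuesCountObjects) {
  var reducedValue = { count: 0, qty: 0 };
  for (var idx = 0; idx < valuesCountObjects.length; idx++) {
    reducedValue.count += valuesCountObjects[idx].count;
    reducedValue.qty += valuesCountObjects[idx].qty;
  }
  return reducedValue;
};

var myTestObjects = [
  { count: 1, qty: 5 },
  { count: 2, qty: 10 },
  { count: 3, qty: 15 }
];
var result2 = reduceFunction2("myKey", myTestObjects);
console.log(result2); // { count: 6, qty: 30 }
```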
Ensure Insensitivity to the Order of Mapped Values¶
The reduce function takes a key and a values array as its
argument. You can test that the result of the reduce function does
not depend on the order of the elements in the values array.
1. Define a sample values1 array and a sample values2 array that differ only in the order of the array elements.
2. Define a reduceFunction2 function that takes the arguments keySKU and valuesCountObjects. valuesCountObjects is an array of documents that contain the two fields count and qty.
3. Invoke reduceFunction2 first with values1 and then with values2. Verify that reduceFunction2 returned the same result.
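Sketched in plain JavaScript (sample values assumed), the order-insensitivity check looks like this:

```javascript
// Two arrays with the same elements in different orders.
var values1 = [
  { count: 1, qty: 5 },
  { count: 2, qty: 10 },
  { count: 3, qty: 15 }
];
var values2 = [
  { count: 3, qty: 15 },
  { count: 1, qty: 5 },
  { count: 2, qty: 10 }
];

// Sums count and qty, so element order cannot affect the result.
var reduceFunction2 = function (keySKU, valuesCountObjects) {
  var reducedValue = { count: 0, qty: 0 };
  for (var idx = 0; idx < valuesCountObjects.length; idx++) {
    reducedValue.count += valuesCountObjects[idx].count;
    reducedValue.qty += valuesCountObjects[idx].qty;
  }
  return reducedValue;
};

var resultA = reduceFunction2("myKey", values1);
var resultB = reduceFunction2("myKey", values2);
console.log(resultA, resultB); // both { count: 6, qty: 30 }
```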
Ensure Reduce Function Idempotency¶
Because the map-reduce operation may call the reduce function multiple times
for the same key, the reduce function must return a value of the same type
as the values emitted from the map function. You can test that the reduce
function can process already “reduced” values without affecting the final
value.
1. Define a reduceFunction2 function that takes the arguments keySKU and valuesCountObjects. valuesCountObjects is an array of documents that contain the two fields count and qty.
2. Define a sample key.
3. Define a sample valuesIdempotent array that contains an element that is a call to the reduceFunction2 function.
4. Define a sample values1 array that combines the values passed to reduceFunction2.
5. Invoke reduceFunction2 first with myKey and valuesIdempotent and then with myKey and values1. Verify that reduceFunction2 returned the same result.
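The idempotency check can be sketched as follows (sample values assumed): one input array contains an element that has itself already been through reduce, and the result must match reducing the raw values.

```javascript
// Sums count and qty across the grouped values.
var reduceFunction2 = function (keySKU, valuesCountObjects) {
  var reducedValue = { count: 0, qty: 0 };
  for (var idx = 0; idx < valuesCountObjects.length; idx++) {
    reducedValue.count += valuesCountObjects[idx].count;
    reducedValue.qty += valuesCountObjects[idx].qty;
  }
  return reducedValue;
};

var myKey = "myKey";

// One element is itself the output of a previous reduce pass.
var valuesIdempotent = [
  { count: 1, qty: 5 },
  { count: 2, qty: 10 },
  reduceFunction2(myKey, [ { count: 3, qty: 15 } ])
];

// The same values, never pre-reduced.
var values1 = [
  { count: 1, qty: 5 },
  { count: 2, qty: 10 },
  { count: 3, qty: 15 }
];

var resultA = reduceFunction2(myKey, valuesIdempotent);
var resultB = reduceFunction2(myKey, values1);
console.log(resultA, resultB); // both { count: 6, qty: 30 }
```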