Implementing MongoDB Map Reduce using Aggregation

Last updated: 19th Jan, 2022
Published: 19th Jan, 2022

Algorithms and applications in today's data-driven market collect data about people, processes, systems, and organisations 24 hours a day, seven days a week, resulting in massive amounts of data. The problem is figuring out how to process this massive amount of data efficiently without sacrificing valuable insights.


What is Map Reduce? 

The MapReduce programming model comes to the rescue here. MapReduce, which was first used by Google to analyse its search results, has grown in popularity due to its ability to split and process terabytes of data in parallel, generating results faster. 

A (Key,value) pair is the basic unit of information in MapReduce. Before feeding the data to the MapReduce model, all types of structured and unstructured data must be translated to this basic unit. The MapReduce model, as the name implies, consists of two distinct routines: the Map-function and the Reduce-function.  

MapReduce is a framework for handling parallelizable problems across huge datasets using a large number of machines (nodes), which are collectively referred to as a cluster (if all nodes are on the same local network and use similar hardware) or a grid (if the nodes are shared across geographically and administratively distributed systems, and use more heterogeneous hardware).  

When data stored in a filesystem (unstructured) or a database (structured) is processed, MapReduce can take advantage of data locality, processing the data close to where it is stored to reduce communication costs. 

Typically, a MapReduce framework (or system) consists of three operations: 

  • Map: Each worker node applies the map function to its local data and saves the result to temporary storage. A master node ensures that only one copy of any redundant input data is processed. 
  • Shuffle: Worker nodes redistribute data based on the output keys (produced by the map function), ensuring that all data associated with a single key ends up on the same worker node. 
  • Reduce: Worker nodes now process each group of output data, per key, in parallel. 
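The three phases can be illustrated with a minimal, single-process word-count sketch in plain JavaScript (a hypothetical toy, with no cluster or MongoDB involved): each input line is mapped to (word, 1) pairs, the pairs are shuffled into groups by key, and each group is reduced to a count.

```javascript
// Minimal single-process illustration of the three MapReduce phases.

// Map: turn each input line into (key, value) pairs.
function map(line) {
  return line.split(/\s+/).filter(Boolean).map(word => [word, 1]);
}

// Shuffle: group all emitted values by their key.
function shuffle(pairs) {
  const groups = new Map();
  for (const [key, value] of pairs) {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  }
  return groups;
}

// Reduce: collapse each group of values into a single result.
function reduce(key, values) {
  return values.reduce((a, b) => a + b, 0);
}

const input = ["big data big insights", "big wins"];
const mapped = input.flatMap(map);
const grouped = shuffle(mapped);
const counts = {};
for (const [key, values] of grouped) counts[key] = reduce(key, values);
console.log(counts); // { big: 3, data: 1, insights: 1, wins: 1 }
```

In a real cluster the shuffle step is a network redistribution between nodes; here it is just an in-memory grouping, which is enough to see how the phases compose.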

This article will walk you through the Map-Reduce model's functionality step by step. 

Map Reduce in MongoDB 

The map-reduce operation has been deprecated since MongoDB 5.0. 

An aggregation pipeline outperforms a map-reduce operation in terms of performance and usability. 

Aggregation pipeline operators like $group, $merge, and others can be used to rewrite map-reduce operations. 

Starting with version 4.4, MongoDB provides the $accumulator and $function aggregation operators for map-reduce operations that require custom functionality. In JavaScript, use these operators to create custom aggregation expressions. 
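A $accumulator is defined by JavaScript functions named init, accumulate, merge, and finalize. The sketch below is a plain Node.js mimic of that lifecycle (not mongosh, and independent of any server) showing how the pieces fit together for a running average; the shard split is a hypothetical illustration of why merge exists.

```javascript
// Plain-JavaScript mimic of the $accumulator lifecycle:
// init -> accumulate (per document) -> merge (across shards) -> finalize.
const avgAccumulator = {
  init: () => ({ count: 0, sum: 0 }),
  accumulate: (state, price) => ({ count: state.count + 1, sum: state.sum + price }),
  merge: (s1, s2) => ({ count: s1.count + s2.count, sum: s1.sum + s2.sum }),
  finalize: state => state.sum / state.count,
};

// Simulate two "shards", each accumulating part of the data.
const shard1 = [25, 70].reduce(avgAccumulator.accumulate, avgAccumulator.init());
const shard2 = [50, 55].reduce(avgAccumulator.accumulate, avgAccumulator.init());
const merged = avgAccumulator.merge(shard1, shard2);
console.log(avgAccumulator.finalize(merged)); // 50
```

The server calls these same four hooks itself when you pass them to $accumulator inside a $group stage; this sketch only shows the contract they must satisfy.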

The map and reduce functions are the two main functions here. 

As a result, the data is mapped and reduced independently in different spaces before being combined in the function's output and saved to the specified collection. The mapReduce() function was designed to work with large data sets. Map Reduce lets you perform aggregation operations such as max and avg on data, similar to GROUP BY in SQL, and it works on the data independently and in parallel. 

Implementing Map Reduce with mongosh (MongoDB Shell)  

The db.collection.mapReduce() method in mongosh is a wrapper for the mapReduce command. The examples that follow make use of db.collection.mapReduce(). 

Example: 

Create a collection orders with these documents: 

db.orders.insertMany([ 
   { _id: 1, cust_id: "Ishan Jain", ord_date: new Date("2021-11-01"), price: 25, items: [ { sku: "oranges", qty: 5, price: 2.5 }, { sku: "apples", qty: 5, price: 2.5 } ], status: "A" }, 
   { _id: 2, cust_id: "Ishan Jain", ord_date: new Date("2021-11-08"), price: 70, items: [ { sku: "oranges", qty: 8, price: 2.5 }, { sku: "chocolates", qty: 5, price: 10 } ], status: "A" }, 
   { _id: 3, cust_id: "Bhavesh Galav", ord_date: new Date("2021-11-08"), price: 50, items: [ { sku: "oranges", qty: 10, price: 2.5 }, { sku: "pears", qty: 10, price: 2.5 } ], status: "A" }, 
   { _id: 4, cust_id: "Bhavesh Galav", ord_date: new Date("2021-11-18"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }, 
   { _id: 5, cust_id: "Bhavesh Galav", ord_date: new Date("2021-11-19"), price: 50, items: [ { sku: "chocolates", qty: 5, price: 10 } ], status: "A"}, 
   { _id: 6, cust_id: "Madan Parmar", ord_date: new Date("2021-11-19"), price: 35, items: [ { sku: "carrots", qty: 10, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" }, 
   { _id: 7, cust_id: "Madan Parmar", ord_date: new Date("2021-11-20"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }, 
   { _id: 8, cust_id: "Abhresh", ord_date: new Date("2021-11-20"), price: 75, items: [ { sku: "chocolates", qty: 5, price: 10 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" }, 
   { _id: 9, cust_id: "Abhresh", ord_date: new Date("2021-11-20"), price: 55, items: [ { sku: "carrots", qty: 5, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 }, { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }, 
   { _id: 10, cust_id: "Abhresh", ord_date: new Date("2021-11-23"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" } 
]) 

Apply a map-reduce operation to the orders collection to group them by cust_id, then add the prices for each cust_id: 

To process each input document, define the map function: 

  • In the function, this refers to the document that the map-reduce operation is processing. 
  • For each document, the function maps the price to the cust_id and outputs the cust_id and price. 
var mapFunction1 = function() { emit(this.cust_id, this.price); }; 

With the two arguments keyCustId and valuesPrices, define the corresponding reduce function: 

  • The elements of the valuesPrices array are the price values emitted by the map function, grouped by keyCustId. 
  • This function reduces the valuesPrices array to the sum of its elements. 
var reduceFunction1 = function(keyCustId, valuesPrices) { return Array.sum(valuesPrices); };

Apply the mapFunction1 map function and the reduceFunction1 reduce function to all documents in the orders collection: 

db.orders.mapReduce(mapFunction1, reduceFunction1, { out: "map_reduce_example" }) 

The results of this operation are saved in the map_reduce_example collection. If the map_reduce_example collection already exists, the operation will overwrite its contents with the map-reduce operation's results. 


Check the map_reduce_example collection to verify: 

db.map_reduce_example.find().sort( { _id: 1 } ) 
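If you want to sanity-check these totals without a running MongoDB instance, the same emit/group/sum logic can be replayed in plain Node.js over the sample documents (a local simulation of mapFunction1 and reduceFunction1, not the server-side operation):

```javascript
// Replay the orders map-reduce in plain JavaScript: emit (cust_id, price),
// group by customer, then sum each group.
const orders = [
  { cust_id: "Ishan Jain", price: 25 },    { cust_id: "Ishan Jain", price: 70 },
  { cust_id: "Bhavesh Galav", price: 50 }, { cust_id: "Bhavesh Galav", price: 25 },
  { cust_id: "Bhavesh Galav", price: 50 }, { cust_id: "Madan Parmar", price: 35 },
  { cust_id: "Madan Parmar", price: 25 },  { cust_id: "Abhresh", price: 75 },
  { cust_id: "Abhresh", price: 55 },       { cust_id: "Abhresh", price: 25 },
];

// The "emit" step: collect every price under its customer key.
const groups = {};
for (const doc of orders) {
  (groups[doc.cust_id] ??= []).push(doc.price);
}

// The "reduce" step: sum each group, mirroring Array.sum(valuesPrices).
const totals = Object.fromEntries(
  Object.entries(groups).map(([k, v]) => [k, v.reduce((a, b) => a + b, 0)])
);
console.log(totals);
// { 'Ishan Jain': 95, 'Bhavesh Galav': 125, 'Madan Parmar': 60, Abhresh: 155 }
```

These are the same per-customer totals you should see in the map_reduce_example collection.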


Aggregation Alternative:

You can rewrite the map-reduce operation without defining custom functions by using the available aggregation pipeline operators: 

db.orders.aggregate([
   { $group: { _id: "$cust_id", value: { $sum: "$price" } } },
   { $out: "agg_alternative_1" }
]) 

Check the agg_alternative_1 collection to verify: 

db.agg_alternative_1.find().sort( { _id: 1 } )


Implementing Map Reduce with Java 

Consider the collection car and insert the following documents in it. 

db.car.insertMany([
   { car_id: "c1", name: "Audi", color: "Black", cno: "H110", mfdcountry: "Germany", speed: 72, price: 11.25 },
   { car_id: "c2", name: "Polo", color: "White", cno: "H111", mfdcountry: "Japan", speed: 65, price: 8.5 },
   { car_id: "c3", name: "Alto", color: "Silver", cno: "H112", mfdcountry: "India", speed: 53, price: 4.5 },
   { car_id: "c4", name: "Santro", color: "Grey", cno: "H113", mfdcountry: "Sweden", speed: 89, price: 3.5 },
   { car_id: "c5", name: "Zen", color: "Blue", cno: "H114", mfdcountry: "Denmark", speed: 94, price: 6.5 }
]) 

The shell will acknowledge the inserted documents. 

Let's now write the map reduce function on a collection of cars, grouping them by speed and classifying them as overspeed cars.  

var speedmap = function() { 
   var criteria; 
   if (this.speed > 70) { 
      criteria = 'overspeed'; 
      emit(criteria, this.speed); 
   } 
}; 

Based on its speed, this function classifies a vehicle as an overspeed vehicle. Here, this refers to the current document being processed by the map-reduce operation. 

var avgspeed_reducemap = function(key, speed) { 
   var total = 0; 
   for (var i = 0; i < speed.length; i++) { 
      total = total + speed[i]; 
   } 
   return total / speed.length; 
}; 

The loop adds up the speeds of all the emitted cars, and the average speed is calculated by dividing the total speed by the number of overspeed cars. 

Call the Map and Reduce functions on all the documents in the car collection to invoke the map reduce function: 

var ret = db.car.mapReduce(speedmap, avgspeed_reducemap, {out: "avgspeed"}); 

The output is saved in the avgspeed collection. If this collection does not already exist, it will be created; otherwise, its contents will be replaced with the new results. 

Check the documents with: 

db.avgspeed.find() 
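As a cross-check, the same filter-and-average logic can be run in plain Node.js against the sample speeds (a local simulation of speedmap and avgspeed_reducemap, not a server call):

```javascript
// Local simulation of the car example: only cars with speed > 70 are
// emitted under the 'overspeed' key, and their speeds are averaged.
const speeds = [72, 65, 53, 89, 94]; // Audi, Polo, Alto, Santro, Zen

const emitted = speeds.filter(s => s > 70);                       // map: [72, 89, 94]
const avg = emitted.reduce((a, b) => a + b, 0) / emitted.length;  // reduce
console.log({ _id: "overspeed", value: avg }); // { _id: 'overspeed', value: 85 }
```

This matches the single document the avgspeed collection should contain.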


The Java program for the above mongo shell example is shown below; note that it only demonstrates how the Map Reduce functions work.  

package com.journaldev.mongodb; 
 
import java.net.UnknownHostException; 
 
import com.mongodb.DB; 
import com.mongodb.DBCollection; 
import com.mongodb.DBObject; 
import com.mongodb.MapReduceCommand; 
import com.mongodb.MapReduceOutput; 
import com.mongodb.MongoClient; 
 
public class MongoDBMapReduce { 
 
    public static void main(String[] args) throws UnknownHostException { 
 
        // create an instance of the client and establish the connection 
        MongoClient m1 = new MongoClient(); 
 
        // get the test db; use your own 
        DB db = m1.getDB("journaldev"); 
 
        // get the car collection 
        DBCollection coll = db.getCollection("car"); 
 
        // map function to categorize overspeed cars 
        String carMap = "function (){" 
                + "var criteria;" 
                + "if ( this.speed > 70 ) {" 
                + "criteria = 'overspeed';" 
                + "emit(criteria,this.speed);" 
                + "}" + "};"; 
 
        // reduce function to add all the speeds and calculate the average speed 
        String carReduce = "function(key, speed) {" 
                + "var total = 0;" 
                + "for (var i = 0; i < speed.length; i++) {" 
                + "total = total + speed[i];" 
                + "}" 
                + "return total/speed.length;" 
                + "};"; 
 
        // create the map-reduce command from the map and reduce functions 
        MapReduceCommand mapcmd = new MapReduceCommand(coll, carMap, carReduce, 
                null, MapReduceCommand.OutputType.INLINE, null); 
 
        // invoke the map-reduce command 
        MapReduceOutput cars = coll.mapReduce(mapcmd); 
 
        // print the average speed of the overspeed cars 
        for (DBObject o : cars.results()) { 
            System.out.println(o.toString()); 
        } 
    } 
} 

Output: 

{ "_id" : "overspeed" , "value" : 85.0} 

Summary 

The map function groups the data by emitting key-value pairs, while the reduce function performs aggregation operations on the grouped values. 

We have studied MapReduce as an operation, gone through the implementation of mapReduce() with mongosh, and implemented map-reduce with Java. 

When specific results must be found in a large quantum of data in a short amount of time, the MapReduce operation has been a reliable tool. 

Over time, though, Google moved away from MapReduce in favour of Cloud Dataflow, which has emerged as an alternative to it. Cloud Dataflow was invented by Google and later open sourced for performing data analytics operations across many servers in real time.  


Abhresh Sugandhi

Author

Abhresh specializes as a corporate trainer. He has a decade of experience in technical training, blending virtual webinars with instructor-led sessions, and has created courses, tutorials, and articles for organizations. He is also the founder of Nikasio.com, which offers multiple services in technical training, project consulting, content development, etc.