Hadoop Interview Questions

Ready to face your next Hadoop interview? Be interview-ready with this list of Hadoop interview questions and answers, carefully curated by industry experts. Get ready to answer questions on Hadoop applications, how Hadoop is different from other parallel processing engines, and the difference between NameNode, Checkpoint NameNode, and Backup Node. We have put together a detailed list of big data Hadoop interview questions that will help you become a Hadoop developer, Java developer, or Big Data engineer the industry talks about.


Beginner

$ hadoop fs -copyToLocal <hdfs-src> <local-dst>
$ hadoop fs -copyFromLocal <local-src> <hdfs-dst>
$ hadoop fs -put <local-src> <hdfs-dst>
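The same copies can also be performed programmatically through the HDFS FileSystem Java API; below is a minimal sketch, with placeholder paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // equivalent of: hadoop fs -copyFromLocal /local/data.txt /user/pbibhu/data.txt
        fs.copyFromLocalFile(new Path("/local/data.txt"), new Path("/user/pbibhu/data.txt"));
        // equivalent of: hadoop fs -copyToLocal /user/pbibhu/data.txt /local/copy.txt
        fs.copyToLocalFile(new Path("/user/pbibhu/data.txt"), new Path("/local/copy.txt"));
        fs.close();
    }
}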

Below are the main tasks of JobTracker:

  • Accept jobs from the client.
  • Communicate with the NameNode to determine the location of the data.
  • Locate TaskTracker Nodes with available slots.
  • Submit the work to the chosen TaskTracker node and monitor the progress of each task.

Following are the three configuration files in Hadoop:

  • core-site.xml
  • mapred-site.xml
  • hdfs-site.xml

NameNode - It is also known as the Master node. It maintains the file system tree and the metadata for all the files and directories present in the system. The NameNode is a highly available server that manages the File System Namespace and controls access to files by clients. It records the metadata of all the files stored in the cluster, i.e. the location of stored blocks, the size of the files, the hierarchy, permissions, etc.

NameNode is the master daemon that manages and maintains all the DataNodes (slave nodes).

There are two files associated with the metadata:

FsImage: It is the snapshot of the file system when Name Node is started.

EditLogs: It is the sequence of changes made to the file system after the Name Node is started.

Checkpoint node - The Checkpoint Node is the newer implementation of the Secondary NameNode. It is used to create periodic checkpoints of the file system metadata by merging the edits file with the fsimage file, and finally it uploads the new image back to the active NameNode.

Its directory is structured in the same way as the NameNode's directory, and it stores the latest checkpoint.

Backup Node - Backup Node is an extended checkpoint node that performs checkpointing and also supports online streaming of file system edits.

Its main role is to act as the dynamic backup for the File System Namespace (metadata) of the primary NameNode in the Hadoop ecosystem.

The Backup node keeps an in-memory, up-to-date copy of the file system namespace which is always synchronized with the active NameNode state.

The Backup Node does not need to download the fsimage and edits files from the active NameNode to create a checkpoint, as it already has an up-to-date state of the namespace in its own main memory. So, creating a checkpoint on the Backup Node just means saving a copy of the file system metadata (namespace) from main memory to its local file system.

ubuntu@ubuntu-VirtualBox:~$ hadoop distcp hdfs://namenodeA/apache_hadoop hdfs://namenodeB/Hadoop

Linux file system

*****************

  1. A Linux file is stored on a single disk.
  2. You can store a small file on a disk, but if the file size is larger than the disk size then you cannot store it.
  3. Each block size is 4 KB.
  4. If the machine goes down, the data cannot be accessed, and the chances of failure are higher.

Hadoop Distributed File System

*****************************

  1. A distributed file system stores data in a logical layer which is created over one or more disks.
  2. You can store files as large as you need; you just add more disks to the logical layer.
  3. Each block size is 64 MB/128 MB/256 MB depending on the Hadoop version, and you can customize the size too.
  4. Data is replicated on different nodes, so clients are still able to read the data if any node fails. Failover risk is lower.

High Availability of the cluster was introduced in Hadoop 2 to solve the NameNode single point of failure problem in Hadoop 1.

The High Availability NameNode architecture allows a cluster to have two NameNodes, an Active NameNode and a Passive/Standby NameNode, so both NameNodes are running at the same time in a High Availability cluster.

Whenever the Active NameNode goes down, whether due to a server crash or a graceful failover during a maintenance period, control passes automatically to the Passive/Standby NameNode, which reduces the cluster downtime. There are two problems in maintaining consistency in the HDFS High Availability cluster:

  1. The Active and Passive/Standby NameNodes should always be in sync, because they refer to the same metadata. A group of daemons called JournalNodes helps achieve this. It allows restoring the Hadoop cluster to the same namespace state whenever it crashes or fails, and it gives us fast failover.
  2. Only one NameNode should be active at a time, because two active NameNodes would cause loss or corruption of the data. Such a scenario is known as a split-brain scenario, where the cluster gets divided into smaller clusters, each believing that it is the only active cluster. Fencing helps to avoid such scenarios. Fencing is a process that ensures that only one NameNode remains active at a particular time: if two NameNodes end up in the Active state, fencing will kill one of them.

As discussed above, there are two types of failover: A. Graceful Failover: here we manually initiate the failover for routine maintenance. B. Automatic Failover: here the failover is initiated automatically when the NameNode fails or crashes.

In either case of a NameNode failure, the Passive/Standby NameNode can take the exclusive lock in Zookeeper, indicating that it wants to become the next Active NameNode.

In an HDFS High Availability cluster, Apache Zookeeper is the service that provides automatic failover. While the NameNode is active, Zookeeper maintains a session with it. Whenever the Active NameNode fails, the session expires and Zookeeper informs the Passive/Standby NameNode to initiate the failover process.

The ZookeeperFailoverController (ZKFC) is a Zookeeper client that monitors and manages the NameNode status. Each NameNode also runs a ZKFC, which is responsible for periodically monitoring the health of its NameNode.

When Zookeeper is installed in your cluster, you should make sure that the processes or daemons listed below are running on the Active NameNode, Standby NameNode and DataNodes.

When you run JPS (Java Virtual Machine Process Status Tool) on the Active NameNode, you should see the following daemons:

  • Zookeeper
  • Zookeeper Fail Over controller
  • JournalNode
  • NameNode

When you run JPS (Java Virtual Machine Process Status Tool) on the Standby NameNode, you should see the following daemons:

  • Zookeeper
  • Zookeeper Fail Over controller
  • JournalNode
  • NameNode

When you run JPS (Java Virtual Machine Process Status Tool) on a DataNode, you should see the following daemons:

  • Zookeeper
  • JournalNode
  • DataNode

It is a facility provided by the Hadoop MapReduce framework to cache files needed by an application during its execution. These files are small, typically a few KBs to MBs in size, and are mainly text, archive or jar files. Because they are small, they are kept in cache memory, which is one of the faster memories. Applications which need to use the distributed cache to distribute a file should make sure that the file is available and can be accessed via URLs. The URLs can be either hdfs:// or http://.

Once the file is present on the mentioned URL, the Map-Reduce framework will copy the necessary files on all the nodes before initiation of the tasks on those nodes. In case the files provided are archives, these will be automatically unarchived on the nodes after transfer.

Example: In a Hadoop cluster, we have three DataNodes and 30 tasks to run in the cluster, so each node will get 10 tasks. Suppose the nature of the tasks is such that each one needs some reference information or a particular jar before it can execute. To fulfil this, we can cache the files that contain the information or the jar files. Before the job executes, the cache files are copied to each slave node by the application master. The application master then reads the files and starts the tasks. The tasks can be mappers or reducers, and the cached files are read-only. By default, the Hadoop distributed cache is 10 GB; if you want to change that, you have to modify the size in mapred-site.xml. You might wonder why cache memory is required to perform the tasks: why can't we keep the file in HDFS on each DataNode and have the application read it? Here there are a total of 30 tasks, and in real time it would be more than 100 or 1000 tasks. If we put the files in HDFS, then to perform 30 tasks the application has to access the HDFS location 30 times and read it, but HDFS is not very efficient for accessing small files that many times. This is why we use cache memory; it reduces the number of reads from HDFS locations.
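As an illustration, with the MapReduce v2 (YARN) Java API a file is typically added to the distributed cache in the driver and read back in the mapper's setup() method. The file name, symlink and class names below are hypothetical placeholders, not taken from the original answer:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CacheExample {

    // Mapper that loads the cached lookup file once, before any map() calls.
    public static class CacheMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // "lookup" is the symlink name given after the # in the cache URI below
            try (BufferedReader reader = new BufferedReader(new FileReader("lookup"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // load the reference data into an in-memory structure here
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "distributed-cache-example");
        job.setJarByClass(CacheExample.class);
        job.setMapperClass(CacheMapper.class);
        // The file is copied to every node before the tasks on that node start.
        job.addCacheFile(new URI("hdfs:///user/pbibhu/lookup.txt#lookup"));
        // ... input/output paths and the rest of the job configuration as usual ...
    }
}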

In a Hadoop cluster, the DataNode is where the actual data is kept. DataNodes send a heartbeat message to the NameNode every 3 seconds to confirm that they are active. If the NameNode does not receive a heartbeat from a particular DataNode for 10 minutes, it considers that DataNode to be dead. The NameNode then initiates the replication of the dead DataNode's blocks to other active DataNodes. DataNodes can talk to each other to rebalance the data, move and copy data around and keep the replication active in the cluster. You can get the block report using the HDFS commands below.

Example:

hadoop fsck / ==> Filesystem check on HDFS

# hadoop fsck /hadoop/container/pbibhu
 Total size:    16666944775310 B    <=== see here
 Total dirs:    3922
 Total files:   418464
 Total blocks (validated):      202705 (avg. block size 32953610 B)
 Minimally replicated blocks:   202705 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       0 (0.0 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    3
 Average block replication:     3.0
 Corrupt blocks:                0
 Missing replicas:              0 (0.0 %)
 Number of data-nodes:          18
 Number of racks:               1
FSCK ended at Thu Oct 20 20:49:59 CET 2011 in 7516 milliseconds

The filesystem under path '/hadoop/container/pbibhu' is HEALTHY

The NameNode is the node that stores the file system metadata. The metadata includes information such as the list of file names, owner, permissions, timestamps, size, replication factor, the list of blocks for each file, which file maps to which block locations, and which blocks are stored on which DataNode. When DataNodes store a block of information, they maintain a checksum for each block as well. When any data is written to HDFS, the checksum value is written simultaneously, and when the data is read, the checksum is verified by default. The DataNodes update the NameNode with the block information periodically, and before updating they verify the checksum value. If the checksum value is not correct for a particular block, that block is considered to have disk-level corruption, and the DataNode skips that block's information while reporting to the NameNode. In this way the NameNode learns about the disk-level corruption on that DataNode and takes the necessary steps, such as replicating the block from its alternate locations to other active DataNodes, to bring the replication factor back to the normal level. DataNodes can be listed in the dfs.hosts file, which contains a list of hosts that are permitted to connect to the NameNode.

Example:

Add this property to hdfs-site.xml:
<property>
  <name>dfs.hosts</name>
  <value>/home/hadoop/includes</value>
</property>
includes:
hostname1
hostname2
hostname3

If the includes file is empty, then all hosts are permitted, but it is not a definitive list of active DataNodes; the NameNode only considers those DataNodes from which it receives heartbeats.


LVM stands for Logical Volume Management. It is a system for managing logical volumes, or filesystems, that is much more advanced and flexible than the traditional method of partitioning a disk into one or more segments and formatting that partition with a filesystem. Today disks are huge (> 1 TB), and LVM is the right tool to dynamically allocate and resize partitions of these huge disks.

If you are using Linux to deploy Hadoop nodes, whether masters or slaves, it is strongly recommended that you do not use LVM, because of the points below:

  1.  Attempts to reduce the size of a logical volume or reallocate storage space used by it to another volume commonly result in corruption and data loss.
  2. The loss or removal of a disk from LVM will leave the entire storage together with all the files inaccessible.
  3. Other operating systems do not recognize LVM, therefore, LVM partitions cannot be accessed from Windows or macOS.

  1. If any job is submitted by a client, it goes to the Resource Manager, which has a scheduler. It is the duty of the Resource Manager to calculate the resources required to run the job.
  2. Once it has calculated the required resources, the Resource Manager launches an application-specific Application Master.
  3. The Application Master daemon remains available until the job is completed.
  4. The Application Master is responsible for negotiating resources with the Resource Manager; it asks the Resource Manager for more or fewer resources based on the requirement.
  5. The Application Master launches containers on the Node Managers where the data is available.
  6. A container works under the surveillance of the Node Manager, so the Node Manager is responsible for all containers available on that node. The containers give periodic updates to the Application Master about the job.
  7. Once the job is completed and the containers/resources are freed up, the Application Master informs the Resource Manager about the completion of the job, and the client gets the update from the Resource Manager.

HDFS data might not always be distributed uniformly across DataNodes, for different reasons: some DataNodes may have less disk space available for use by HDFS, disk utilization on the DataNode machines may become uneven during normal or heavy usage, or new DataNodes may be added to an existing cluster, in which case DataNode utilization is also uneven. To mitigate this problem, the balancer is required.

The balancer is a tool that balances disk space usage on an HDFS cluster; it analyzes block placement and balances data across the DataNodes. The balancer moves blocks until the cluster is deemed to be balanced, which means that the utilization of every DataNode is more or less equal. The balancer does not balance between individual volumes on a single DataNode.

hdfs balancer [-policy <policy>]

The two supported policies are blockpool and datanode. Setting the policy to blockpool means that the cluster is balanced if each block pool on each node is balanced, while datanode means that the cluster is balanced if each DataNode is balanced. The default policy is datanode.

hdfs balancer [-threshold <threshold>] specifies a number in [1.0, 100.0] representing the acceptable threshold, as a percentage of storage capacity, so that storage utilization outside the average +/- the threshold is considered over/under-utilized. The default threshold is 10.0.

When we talk about a rack, it is a collection of multiple servers based on your requirement. All these servers are connected using the same network switch, and if that switch goes down, all machines in that rack will be out of service; we say the rack is down.

To mitigate this, Rack Awareness was introduced in Hadoop by Apache. With Rack Awareness, the NameNode chooses DataNodes based on their rack, preferring a DataNode in the same rack or in a nearby rack. The NameNode maintains the rack ID of each DataNode to get the rack information, and based on the rack ID the NameNode communicates with the DataNode. In Hadoop, when we maintain racks we have to follow certain rules, as mentioned below.

  • All the replicas should not be stored on the same rack or on a single rack; this is how the Rack Awareness algorithm reduces latency and improves fault tolerance.
  • The default replication factor is 3, so according to the Rack Awareness algorithm the following rules are applied:
  1. The first replica of the block is stored on the local rack.
  2. The next replica is stored on another DataNode within the same rack.
  3. The third replica is stored on a different rack.

Below are the reasons why we follow Rack Awareness in Hadoop:

  • To improve data availability and consistency, as the same block will be available on different racks.
  • The performance of the cluster improves, as reads and writes are quicker because two of the DataNodes are in the same rack and the third DataNode is near that rack.
  • Network bandwidth usage improves because of the rack awareness rule. In particular, with rack awareness YARN is able to optimize MapReduce job performance, because YARN assigns tasks to DataNodes that are closer to each other based on the rack policy and where a replica is available for processing.
  • As per the rack policy, the NameNode assigns the 2nd and 3rd replicas of a block to DataNodes on a rack different from the DataNode holding the first replica. This provides data protection even against rack failure; it is possible only if Hadoop is configured with rack awareness.

There are two types of tables which HIVE supports.

  • Managed Table
  • External Table

Hive Managed Tables:
Hive Managed Table is also known as an internal table. When we create a table in Hive, by default a managed table is created, and Hive manages the data as well. This means that Hive stores the data in its warehouse directory. A managed table is stored under the hive.metastore.warehouse.dir path property, and the default location of the table is /apps/hive/warehouse/<db_name>.db/<table_name>. This path is modifiable. If a managed table or partition is dropped, then the data and corresponding metadata of the table or partition are deleted. If you do not specify the PURGE option, the data is moved to a trash folder for a certain period and permanently deleted after that.

Example:

1. Create Table

hive> create table univercity_db.school_table(name string, roll_no int) row format delimited fields terminated by ',';

OK

Time taken: 0.202 seconds

2. Describe table

hive> describe formatted univercity_db.school_table;

OK

You will get extra information, such as whether the table is managed or external, when the table was created, the file format, the location of the data path in HDFS, and whether the object is a table or a view.

3. Load the data to table from the local path

hive>load data local inpath '/home/pbibhu/Desktop/blog/school' into table univercity_db.school_table;

After loading from the local path, you can further use Hive commands such as select/count/describe.

Hive External Tables:
While creating an external table, the location of the data path is not the usual warehouse path; you have to provide an HDFS path outside of the warehouse directory, and the LOCATION clause is mandatory in the CREATE syntax. If the structure or partitioning of an external table is changed, an MSCK REPAIR TABLE table_name statement can be used to refresh the metadata information. Basically, for an external table we do not load the table from a local path; you load data from HDFS by mentioning the path.

Use external tables when files are present in the remote locations, and the files should remain even if the external table is dropped.

Example:

1. Create Table

CREATE EXTERNAL TABLE IF NOT EXISTS univercity_db.school_table( student_ID INT, FirstName STRING, LastName STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION 'hdfs/pbibhu/school';

2. Create partition Table

CREATE EXTERNAL TABLE IF NOT EXISTS univercity_db.school_table( FirstName STRING, LastName STRING) PARTITIONED BY (student_ID INT) STORED AS ORC LOCATION 'hdfs/pbibhu/school';

3. Insert the data into an internal table from the external table; the data structure should be the same for both tables.

hive> CREATE TABLE IF NOT EXISTS office(EmployeeID INT,FirstName STRING, Title STRING,
    State STRING, Laptop STRING)  STORED AS ORC;

OK

hive> CREATE EXTERNAL TABLE IF NOT EXISTS Office_text(
   EmployeeID INT,FirstName STRING, Title STRING,
   State STRING, Laptop STRING)
   ROW FORMAT DELIMITED
   FIELDS TERMINATED BY ','
   STORED AS TEXTFILE
   LOCATION '/user/pbibhu/office';

OK

hive> INSERT OVERWRITE TABLE office SELECT * FROM office_text;

Resource allocation within the queues is controlled separately. Within a queue:

FairScheduler can apply any of the FIFO policy, FairPolicy or DominantResourceFairnessPolicy.

CapacityScheduler can apply either the FIFO policy or the fair policy.

The Fair Scheduler can use different scheduling policies. The default scheduling policy is fair sharing, using memory as the resource. There is also a FIFO (first in, first out) policy, which is not of much use. It is quite common to use the third type of scheduling policy, DRF (Dominant Resource Fairness), which allocates both memory and CPU resources to applications. DRF is similar to fair scheduling, but it is important to keep in mind that it applies primarily to the allocation of resources among queues, an activity which is already dominated by queue weights. Thus, the most important thing about DRF is that it considers multiple resources, rather than that it attempts to provide equal resource allocation.

Initially, TCS and Wipro each have some resources allocated to jobs in their respective queues, and only 10 GB remains in the cluster. Each queue requests to run a map task requiring 20 GB, so 30 GB of memory is available in total and the rest of the required resource will be taken from CPU. WIPRO currently holds 15 GB of resources. Another 10 GB is required for its mapper task, so the fair scheduler will award a container with the requested 10 GB of memory to WIPRO. Now 5 GB of memory is available for TCS, which requires another 20 GB to run its mapper task. In this case there is not enough memory available for TCS, so DRF will use the 5 GB of memory and the remaining requirement will be taken from CPU.

Intermediate

Map-reduce jobs are limited by the bandwidth available on the cluster, hence it is beneficial if the data transferred between map and reduce tasks is minimized. This can be achieved using a Hadoop Combiner. A combiner runs on the map output, and its output forms the input to the reducer. It decreases the amount of data that needs to be transferred between the mapper and reducer and improves the performance of a map-reduce job. A combiner can, however, only be used for functions that are commutative and associative.
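For example, a word-count style job can reuse its sum reducer as the combiner, because addition is both commutative and associative. Below is a minimal sketch; the class and job names are illustrative, not from the original answer:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class CombinerExample {

    // Emits (word, 1) for every token in the input line.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Sums counts; safe to use as a combiner because addition is commutative and associative.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) sum += v.get();
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combiner-example");
        job.setJarByClass(CombinerExample.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);   // combiner runs on each map's output
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input/output paths omitted for brevity
    }
}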

The Partitioner controls which partition a given key-value pair will go to. Partitioning ensures that all the values for each key are grouped together and that values having the same key go to the same reducer. The total number of partitions produced in a Hadoop job is equal to the number of reducers.

The partition phase takes place after the map phase and before the reduce phase. A map-reduce job with both a partitioner and a reducer works as follows: the output of each mapper is written to a memory buffer and spilled to a local directory in case of overflow. The spilled data is partitioned according to the partitioner. Data in each partition is sorted and combined based on the logic in the combiner. The combined data is then sent to the reducer based on the partition key.
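As an illustration, a custom partitioner might send all keys that start with the same letter to the same reducer. The sketch below uses the org.apache.hadoop.mapreduce API; the class name and partitioning rule are hypothetical:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        if (numReduceTasks == 0 || key.getLength() == 0) {
            return 0; // single output partition when no reducers or an empty key
        }
        char first = Character.toLowerCase(key.toString().charAt(0));
        // same first letter -> same partition -> same reducer
        return first % numReduceTasks;
    }
}

Such a partitioner would be registered in the driver with job.setPartitionerClass(FirstLetterPartitioner.class).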

A job consists of the following components: the client, which submits the map-reduce job; the Resource Manager, which coordinates the allocation of compute resources; the Node Managers, which launch and monitor the compute containers; the Hadoop Distributed File System (HDFS), which is used for sharing resources between the above components; and the Application Master, which coordinates the tasks running in the map-reduce job.

The map-reduce job begins when the client/job submitter sends the request to the Resource Manager. It asks for a new application id to be allocated. It also checks whether the output directory specified exists or not, and computes input splits for the job as well. The resources needed to run the job including the application jar are copied to HDFS. Finally, the job submitter submits the job to Resource Manager.

The Resource Manager now allocates a container and launches the application master. The application master determines the number of mapper and reducer tasks that need to be launched and requests the Resource Manager to launch containers for them. The Resource Manager, in turn, directs Node Managers to launch the containers where the tasks get run. Once the tasks are initiated, the application master keeps track of them. In case any task fails or gets stuck, it relaunches the task on another container. Requests for map tasks are made first and with a higher priority than those for reduce tasks, since all the map tasks must complete before the sort phase of the reduce can start. Once a mapper task completes, its output undergoes sorting, shuffling and partitioning (in case of multiple reducers), is sent to the combiner (if any) and is finally sent to the reducer(s). The output of the reducer is written to HDFS.

The usual block size on HDFS is 128 MB. The size of the HDFS block is kept large enough to minimize the seek cost. When the block size is large enough, the time to transfer the data will be significantly longer than the time to seek to the start of a block. As the data transfer rate is much higher than the disk seek rate, it is optimal to keep the block size large. The seek time is usually kept at about 1% of the transfer time; e.g. if the seek time is around 10 ms and the data transfer rate is 100 MB/s, the block size comes to around 128 MB.

However, this doesn't mean that the block size can be made indefinitely large. Map tasks operate on one block at a time (assuming the split size is equal to the block size). Having a huge block size will result in fewer splits and hence fewer mappers, which reduces the advantage that can be gained by working on multiple blocks in parallel.
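For illustration, the block size can also be overridden for an individual file when it is created through the FileSystem API; the path and the 256 MB figure below are only examples:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockSizeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        long blockSize = 256L * 1024 * 1024;   // 256 MB instead of the cluster default
        short replication = 3;
        int bufferSize = 4096;
        try (FSDataOutputStream out = fs.create(
                new Path("/user/pbibhu/big-file.dat"), true, bufferSize, replication, blockSize)) {
            out.writeBytes("sample payload");  // in practice this would be the large dataset
        }
    }
}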

Having a block abstraction for a distributed file system has many benefits.

  • First, a file can be larger than any single disk in the network.
  • It simplifies the storage subsystem because blocks are a fixed size, and it is easy to calculate how many blocks can be stored on a given disk.
  • In addition, blocks fit well with the need for replication needed in a distributed file system.

High availability in HDFS implies that the system does not have any single point of failure, is available 24/7 so that there is no or limited impact on client applications and is able to self-recover from failure without any manual intervention.

For implementing High Availability in HDFS, a pair of NameNodes is set up in an active-standby configuration. The passive node is kept in sync with the active node. Both active and passive nodes have access to shared storage space. When any namespace modification is performed by the Active node, it logs a record of the modification to an edit log file stored in the shared directory. The Standby node is constantly watching this directory for edits, and as it sees the edits, it applies them to its own namespace thereby keeping in sync with Active node.

In case of a failure of the active NameNode, the standby node takes over and starts servicing client requests. The transition from the active to the standby node is managed by the Failover Controller. It uses Zookeeper to ensure that only one NameNode is active at a given time. Each NameNode runs a failover controller process that monitors its NameNode for failures using a heartbeat mechanism and triggers a failover in case of failure.

However, it needs to be ensured that only one NameNode is active at a given time. Two active NameNodes at the same time would cause corruption of data. To avoid such a scenario, fencing is done, which ensures that only one NameNode is active at a given time. The JournalNodes perform fencing by allowing only one NameNode to be the writer at a time. During a failover, the standby NameNode takes over the responsibility of writing to the JournalNodes, which forbids any other NameNode from remaining active.

In the case of large data, it’s advised to use more than one reducer. In the case of multiple reducers, the thread spilling map output to disk first divides the data into partitions corresponding to the number of reducers. Within each partition, an in-memory sort on the data is performed. A combiner, if any, is applied to the output of the sort. Finally, the data is sent to reducer based on the partitioning key.

Partitioning ensures that all the values for each key are grouped together and that the values having the same key go to the same reducer, thus allowing for an even distribution of the map output over the reducers. The default partitioner in a map-reduce job is the HashPartitioner, which computes a hash value for the key and assigns the partition based on this result.

However, care must be taken to ensure that partitioning logic is optimal and data gets sent evenly to the reducers. In the case of a sub-optimal design, some reducers will have more work to do than others, as a result, the entire job will wait for that one reducer to finish its extra load share.

Advanced

 The replication factor in HDFS can be modified /overwritten in 2 ways-

  • Using the Hadoop FS Shell, replication factor can be changed per file basis using the below command
$ hadoop fs -setrep -w 2 /my/sample.xml

 sample.xml is the filename whose replication factor will be set to 2

  • Using the Hadoop FS Shell, replication factor of all files under a given directory can be modified using the below command-
$ hadoop fs -setrep -w 6 /my/sample_dir

sample_dir is the name of the directory and all the files in this directory will have a replication factor set to 6.
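The same change can also be made programmatically; below is a hedged sketch using the FileSystem Java API (the path is a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicationExample {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // per-file equivalent of: hadoop fs -setrep 2 /my/sample.xml (without the -w wait)
        boolean changed = fs.setReplication(new Path("/my/sample.xml"), (short) 2);
        System.out.println("replication changed: " + changed);
    }
}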

ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -touchz /hadoop/sample
ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -ls /hadoop

Found 2 items

-rw-r--r--   2 ubuntu supergroup          0 2018-11-08 00:57 /hadoop/sample
-rw-r--r--   2 ubuntu supergroup         16 2018-11-08 00:45 /hadoop/test

fsck is a utility to check the health of the file system and to find missing files, over-replicated, under-replicated and corrupted blocks.

 Command for finding the blocks for a file: 

$ hadoop fsck <path> -files -blocks -racks

Hadoop Distributed File System (HDFS) is the primary storage system of Hadoop. HDFS stores very large files running on a cluster of commodity hardware. It works on the principle of storing a small number of large files rather than a huge number of small files.

 HDFS stores data reliably even in the case of hardware failure. It provides high throughput access to the application by accessing in parallel. Components of HDFS:

  • NameNode – It is also known as Master node. Namenode stores meta-data i.e. number of blocks, their replicas and other details.
  • DataNode – It is also known as Slave. In Hadoop HDFS, DataNode is responsible for storing actual data. DataNode performs read and write operation as per request for the clients in HDFS.

  • Update the network addresses in the dfs.include and mapred.include files.
  • Run $ hadoop dfsadmin -refreshNodes and $ hadoop mradmin -refreshNodes.
  • Update the slaves file.
  • Start the DataNode and NodeManager on the added node.

  • The client connects to the name node to register a new file in HDFS.
  • The name node creates some metadata about the file (either using the default block size, or a configured value for the file)
  • For each block of data to be written, the client queries the name node for a block ID and list of destination datanodes to write the data to. Data is then written to each of the datanodes.

By default, the HDFS block size is 64 MB in Hadoop 1 and 128 MB in Hadoop 2.

The default replication factor is 3.

It displays the Access Control Lists (ACLs) of files and directories. If a directory has a default ACL, then getfacl also displays the default ACL.

ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -getfacl /hadoop
# file: /hadoop
# owner: ubuntu
# group: supergroup

This exception means there is no communication between the NameNode and the DataNode, due to any of the reasons below:

  • The block size is negative in the hdfs-site.xml file.
  • Disk usage for the DataNode is 100% and there is no space available.
  • Due to poor network connectivity between the NameNode and DataNode, the primary DataNode is down while the write operation is in progress.

You can provide dfs.block.size on command line :

  • copying from HDFS to HDFS

hadoop fs -D dfs.block.size=<blocksizeinbytes> -cp /source /destination

  • copying from local to HDFS

hadoop fs -D dfs.block.size=<blocksizeinbytes> -put /source /destination 

Below command is used to enter Safe Mode manually – 

$ hdfs dfsadmin -safemode enter

Once the safe mode is entered manually, it should be removed manually.

Below command is used to leave Safe Mode manually – 

$ hdfs dfsadmin -safemode leave

The two popular utilities or commands to find HDFS space consumed are

  • hdfs dfs -du
  • hdfs dfsadmin -report

 HDFS provides reliable storage by copying data to multiple nodes. The number of copies it creates is usually referred to as the replication factor which is greater than one.

  • hdfs dfs -du – This command shows the space consumed by data without replication.
  • hdfs dfsadmin -report – This command shows the real disk usage, as it counts data replication too. Therefore, the value of hdfs dfsadmin -report will always be greater than the output of the hdfs dfs -du command.

Hadoop task log files are stored on the local disk of the slave node running the task. In general, the log-related configuration properties are yarn.nodemanager.log-dirs and yarn.log-aggregation-enable. The yarn.nodemanager.log-dirs property determines where the container logs are stored on the node while the containers are running; its default value is ${yarn.log.dir}/userlogs. An application's localized log directory will be found in ${yarn.nodemanager.log-dirs}/application_${application_id}, and individual containers' log directories will be in subdirectories named container_${container_id}.

For MapReduce application, each container directory will contain the files STDERR, STDOUT and SYSLOG generated by the container.

The yarn.log-aggregation-enable property specifies whether to enable or disable log aggregation. If this function is disabled, then the node manager will keep the logs locally and not aggregate them.

Following properties are in force when log aggregation is enabled.

yarn.nodemanager.remote-app-log-dir: This location is on the default file system (usually HDFS) and indicates where the node managers should aggregate logs. It should not be the local file system, otherwise a serving daemon such as the history server will not be able to serve the aggregated logs. The default value is /tmp/logs.

yarn.nodemanager.remote-app-log-dir-suffix: The remote log directory will be created at {yarn.nodemanager.remote-app-log-dir}/${user}/{suffix}. The default suffix value is "logs".

yarn.log-aggregation.retain-seconds: This property defines how long to wait before deleting aggregated logs; -1 or any other negative value disables the deletion of aggregated logs.

yarn.log-aggregation.retain-check-interval-seconds: This property determines how long to wait between aggregated log retention checks. If its value is set to 0 or a negative value, then the value is computed as one-tenth of the aggregated log retention time. The default value is -1.

yarn.log.server.url: Once an application is done, node managers redirect web UI users to this URL, where the aggregated logs are served. It points to the MapReduce-specific job history server.

The following properties are used when log aggregation is disabled:

yarn.nodemanager.log.retain-seconds: The time in seconds to retain user logs on the individual nodes when log aggregation is disabled. The default is 10800.

yarn.nodemanager.log.deletion-threads-count: The number of threads used by the node managers to clean up logs once the log retention time is hit for local log files when aggregation is disabled.    

YARN requires a staging directory for temporary files created by running jobs, as well as local directories for storing the various scripts that are generated to start up the job's containers (which run the map and reduce tasks).

Staging directory:

  1. When a user executes a MapReduce job, they usually invoke a job client to configure the job and launch it.
  2. As part of the job execution, the job client first checks whether there is a staging directory under the user's name in HDFS; if not, the staging directory is created under /user/<username>/.staging.
  3. In addition to job-related files, a file from the Hadoop distribution named hadoop-mapreduce-client-jobclient.jar is also placed in the .staging directory, after being renamed to job.jar.
  4. Once the staging directory is set up, the job client submits the job to the resource manager.
  5. The job client also reports the status of job progression back to the console (e.g. map 5%, reduce 0%).

Local directory:

  1. The resource manager service selects a node manager on one of the cluster's nodes to launch the Application Master process, which is always the very first container to be created in a YARN job.
  2. The resource manager chooses the node manager depending on the resources available at the time of launching the job. You cannot specify the node on which to start the job.
  3. The node manager service starts up and generates various scripts in the local cache directory to execute the Application Master container.
  4. The Application Master directories are stored in the location that you have specified for the node manager's local directories with the yarn.nodemanager.local-dirs configuration property in yarn-site.xml.
  5. The node manager stores its localized files in the following directory structure:

[yarn.nodemanager.local-dirs]/usercache/${user}/appcache/application_${app_id}

When a client launches an application, the corresponding application master container is launched with ID 000001. The default size for each application master container is 1 GB, but sometimes the data size is larger. In that case the application master reaches the limit of its memory, the application fails, and you will get a message similar to the one below.

Application application_1424873694018_3023 failed 2 times due to AM Container for appattempt_1424873694018_3023_000002 exited with exitCode: 143 due to: Container

[pid=25108,containerID=container_1424873694018_3023_02_000001] is running beyond physical memory limits. Current usage: 1.0 GB of 1 GB physical memory used; 1.5 GB

of 2.1 GB virtual memory used. Killing container.

When configuring MapReduce 2 resource utilization on YARN, there are three aspects to be considered:

  1. Physical RAM limit for each Map and Reduce Task.
  2. JVM heap size limit for each task
  3. the amount of virtual memory each task will get

Physical RAM limit for each Map and Reduce Task

*********************************************

You can define how much maximum memory each Map and Reduce task will take. Since each Map and each Reduce task will run in a separate container, these maximum memory settings should be at least equal to or more than the YARN minimum Container allocation(yarn.scheduler.minimum-allocation-mb).

In mapred-site.xml:
<property>
  <name>mapreduce.map.memory.mb</name>
  <value>4096</value>
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>8192</value>
</property>
The JVM heap size limit for each task

*********************************

The JVM heap size should be set to lower than the Map and Reduce memory defined above, so that they are within the bounds of the Container memory allocated by YARN.

In mapred-site.xml:
<property>
  <name>mapreduce.map.java.opts</name>
  <value>-Xmx3072m</value>
</property>
<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>-Xmx6144m</value>
</property>
the amount of virtual memory each task will get

********************************************

Virtual memory is determined as a multiple (yarn.nodemanager.vmem-pmem-ratio) of the physical RAM that each Map and Reduce task is allowed to use. The default value is 2.1. For example, if the total physical RAM allocated is 4 GB, then the virtual memory upper limit is 4 * 2.1 = 8.4 GB.

Under the Fair Scheduler, when a single application is running, that application may request the entire cluster (if needed). If additional applications are submitted, resources that are free are assigned "fairly" to the new applications so that each application gets roughly the same amount of resources. The Fair Scheduler organizes applications further into queues and shares resources fairly between these queues. The Fair Scheduler in YARN supports hierarchical queues, which means sub-queues can be created within a dedicated queue. All queues descend from a queue named "root". A queue's name starts with the names of its parents, with periods as separators. So a queue named "parent1" under the root queue would be referred to as "root.parent1", and a queue named "queue2" under a queue named "parent1" would be referred to as "root.parent1.queue2".

A join operation is used to combine two or more datasets. In MapReduce, joins are of two types – map side joins and reduce side joins.

A map side join is one in which the join between two tables is performed in the map phase, without the involvement of the reduce phase. It can be used when one of the datasets is much smaller than the other and can easily be stored in the DistributedCache. One of the ways to load a dataset from the DistributedCache is to do it in the setup() method of the Mapper. Since in a map side join the join is performed in the mapper phase itself, it reduces the cost incurred for sorting and merging data in the shuffle and reduce phases, thereby improving the performance of the task.

Reduce side join, on the other hand, works well for large datasets. Here the reducer is responsible for performing the join operation. This type of join is comparatively simpler and easier to implement than the map side join, as the data undergoes sorting and shuffling before reaching the reducer and values having identical keys are sent to the same reducer; therefore, by default, the data is organized for us. However, the I/O cost is much higher due to the data movement involved in the sorting and shuffling phase.
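Below is a minimal sketch of a map-side join mapper, assuming the small dataset (for example a department list) has been added to the distributed cache under the symlink name "departments"; the field positions and the comma delimiter are assumptions for illustration:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Map<String, String> smallTable = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException {
        // "departments" is the symlink name of a file added with job.addCacheFile(...)
        try (BufferedReader reader = new BufferedReader(new FileReader("departments"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",");
                smallTable.put(parts[0], parts[1]);   // deptId -> deptName
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        String deptName = smallTable.get(fields[1]);  // join on deptId in the big dataset
        if (deptName != null) {
            context.write(new Text(fields[0]), new Text(deptName));
        }
    }
}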

The map-reduce framework doesn’t guarantee that the combiner will be executed for every job run. The combiner is executed at each buffer spill. During a spill, the thread writing data to the disk first divides data into partitions corresponding to the number of reducers. Within each partition, the thread performs an in-memory sort on the data and applies the combiner function (if any) on the output of sort.

 Various ways to reduce data shuffling in a map-reduce job are:

  • Use a combiner to perform associative and commutative operations on the mapper output and hence reduce the shuffling of data.
  • If the data size is huge, then having a single reducer is not a good idea. An optimal number of reducers should be chosen in this case.
  • Compressing mapper output reduces the amount of data that gets written to disk and transferred to the reducers, hence reducing the shuffling of data.
  • Increase the buffer size used by mappers during sorting. This reduces the number of spills to disk. It can be controlled using the property mapreduce.task.io.sort.mb (see the configuration sketch after this list).
  • Leverage the cleanup() method of the Mapper – this method is called once per mapper task and can be used to perform any associative or commutative operation on the output of the map() function.
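A hedged driver sketch applying some of these points; the property values shown are illustrative, not tuning recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;

public class ShuffleTuningDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.map.output.compress", true);   // compress mapper output
        conf.setClass("mapreduce.map.output.compress.codec",
                SnappyCodec.class, CompressionCodec.class);
        conf.setInt("mapreduce.task.io.sort.mb", 256);             // bigger in-memory sort buffer
        conf.setInt("mapreduce.job.reduces", 8);                   // a sensible number of reducers
        Job job = Job.getInstance(conf, "shuffle-tuning-example");
        // ... mapper, combiner, reducer and input/output paths set as usual ...
    }
}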

In a map-reduce job, the application master decides how many tasks need to be created for the job to be executed. The number of mapper tasks created is equal to the number of splits. These tasks are usually launched in different JVMs than the application master (governed by data locality).

However, if the job is small, the application master may decide to run the tasks in the same JVM as itself. In such a case, the overhead of allocating containers for new tasks and monitoring them outweighs the gain that would be had by running the tasks in parallel, compared to running them sequentially on the same node. Such a job is called an uber task.

So how does the application master determine if the job is small enough to be run as an uber task? By default, if the job requires fewer than 10 mappers and only 1 reducer, and the input size is less than the size of one HDFS block, then the application master may consider launching the job as an uber task.

If the job doesn’t qualify to be run as an uber task then the app master requests for containers to be launched for all map and reduce tasks from the resource manager.
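These thresholds are controlled by configuration; below is a hedged sketch (the values shown simply restate the defaults and are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class UberTaskExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.job.ubertask.enable", true); // allow small jobs to run in the AM's JVM
        conf.setInt("mapreduce.job.ubertask.maxmaps", 9);       // at most this many mappers
        conf.setInt("mapreduce.job.ubertask.maxreduces", 1);    // at most one reducer
        // mapreduce.job.ubertask.maxbytes defaults to one HDFS block size
        Job job = Job.getInstance(conf, "uber-task-example");
        // ... rest of the job configuration ...
    }
}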

A file is read by a Map-Reduce job using an InputFormat. It defines how the file being read needs to be split up and read. InputFormat, in turn, defines a RecordReader which is responsible for reading actual records from the input files. The split computed by InputFormat is operated upon by map task. Map task uses Record Reader corresponding to InputFormat to read the data within each split and create key-value pairs.

The various types of InputFormat in Hadoop are:

  • FileInputFormat – The base class for all file-based input formats.
  • TextInputFormat – The default input format used in a map-reduce job. It treats each line of input as a separate record. LineRecordReader is the default record reader for TextInputFormat, and it treats each line of the input file as a separate value.
  • KeyValueTextInputFormat – Similar to TextInputFormat, but it breaks each line being read into a key and a value. The key is the text up to the tab (\t) character, while all the text after the tab is considered the value.
  • SequenceFileInputFormat – An input format for reading sequence files.
  • NLineInputFormat – A form of TextInputFormat in which each split (and hence each mapper) receives a fixed number (N) of lines of input.
  • DBInputFormat – An input format to read data from a relational database using JDBC. It is, however, suited for reading small datasets only.
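A short driver sketch showing how an input format is chosen for a job; the input path is a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class InputFormatExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "input-format-example");
        // each input line is split into a Text key and Text value at the first tab character
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/user/pbibhu/input"));
        // ... mapper, reducer and output configuration as usual ...
    }
}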

Description

Hadoop is an open source framework highly adopted by several organizations to store and process large amounts of structured and unstructured data by applying the MapReduce programming model. Many top-rated companies use the Apache Hadoop framework to deal with their large volumes of data, which keep increasing every minute. Coming to Hadoop clusters, Yahoo is the first name on the list with around 4500 nodes, followed by LinkedIn and Facebook.

Here are some of the world's most popular and top-rated organizations that use Hadoop for their production and research: Adobe, AOL, Alibaba, eBay, Fox Audience Network, and others.

If you are looking to build your career in the field of big data Hadoop, then give a start with learning big data Hadoop. You can also take up Hadoop Training program and start a career as a big data Hadoop professional to solve large data problems.

Here are the top Hadoop interview questions that are asked frequently and are scenario-based. You will also see how to explain a Hadoop project in an interview, which carries a lot of weight.

These Hadoop developer interview questions have been designed specially to familiarize you with the nature of questions you might face during your interview, and they will help you crack the Hadoop interview easily and acquire your dream career as a Hadoop developer. These top big data Hadoop interview questions will boost your confidence to face an interview and prepare you to answer your interviewer's questions in the best manner. These interview questions on Hadoop are suggested by experts.

Turn yourself into a Hadoop Developer. Live your dream career!
