Hadoop Interview Questions

Ready to face your next Hadoop interview? Be interview-ready with this list of Hadoop interview questions and answers, carefully curated by industry experts. Get ready to answer questions on Hadoop applications, how Hadoop is different from other parallel processing engines, and the difference between NameNode, Checkpoint NameNode, and Backup Node. We have put together a detailed list of big data Hadoop interview questions that will help you become a Hadoop developer, Java developer, or Big Data engineer the industry talks about.

Beginner

$ hadoop fs -copyToLocal
$ hadoop fs -copyFromLocal
$ hadoop fs -put

Below are the main tasks of JobTracker:

  • Accept jobs from the client.
  • Communicate with the NameNode to determine the location of the data.
  • Locate TaskTracker Nodes with available slots.
  • Submit the work to the chosen TaskTracker node and monitor the progress of each task.

Following are the three configuration files in Hadoop:

  • core-site.xml
  • mapred-site.xml
  • hdfs-site.xml

NameNode - Also known as the master node. It maintains the file system tree and the metadata for all the files and directories in the system. The NameNode manages the file system namespace and controls client access to files. It records the metadata of every file stored in the cluster, i.e. the location of its blocks, the file size, the directory hierarchy, permissions, etc.

NameNode is the master daemon that manages and maintains all the DataNodes (slave nodes).

There are two files associated with the metadata:

FsImage: It is the snapshot of the file system when Name Node is started.

EditLogs: It is the sequence of changes made to the file system after the Name Node is started.

Checkpoint node - The Checkpoint node is the newer implementation of the Secondary NameNode. It creates periodic checkpoints of the file system metadata by merging the edits file with the fsimage file, and then uploads the new image back to the active NameNode.

It stores the latest checkpoint in a directory that is structured the same way as the NameNode's directory.

Backup Node - Backup Node is an extended checkpoint node that performs checkpointing and also supports online streaming of file system edits.

Its main role is to act as a dynamic backup of the file system namespace (metadata) held by the primary NameNode of the Hadoop ecosystem.

The Backup node keeps an in-memory, up-to-date copy of the file system namespace which is always synchronized with the active NameNode state.

The Backup node does not need to download the fsimage and edits files from the active NameNode to create a checkpoint, because it already has an up-to-date state of the namespace in its own main memory. Creating a checkpoint on the Backup node therefore just means saving a copy of the file system metadata (namespace) from main memory to its local file system.

 ubuntu@ubuntu-VirtualBox:~$ hadoop distcp hdfs://namenodeA/apache_hadoop hdfs://namenodeB/Hadoop

Linux file system

*****************

  1. A Linux file is stored on a single disk.
  2. You can store only files that fit on the disk; a file larger than the disk cannot be stored.
  3. The block size is typically 4 KB.
  4. If the machine is down, the data cannot be read, so the risk of failure is higher.

Hadoop Distributed file system

*****************************

  1. A distributed file system stores files in a logical layer created on top of one or more disks.
  2. You can store files as large as you need; to hold more data, add more disks to the logical layer.
  3. The block size is 64 MB, 128 MB, or 256 MB depending on the Hadoop version, and you can customize it.
  4. Data is replicated across different nodes, so clients can still read the data if a node fails. The risk of failure is lower.

High availability (HA) of the cluster was introduced in Hadoop 2 to remove the NameNode single point of failure that existed in Hadoop 1.

The HA architecture allows two NameNodes: an Active NameNode and a Passive/Standby NameNode. Both NameNodes run at the same time in an HA cluster.

Whenever the Active NameNode goes down, whether because the server crashes or because of a graceful failover during a maintenance window, control passes to the Passive/Standby NameNode automatically, which reduces cluster downtime. There are two issues in maintaining consistency in an HDFS HA cluster:

  1. The Active and Passive/Standby NameNodes must always be in sync because they refer to the same metadata. A group of daemons called JournalNodes makes this possible. It allows the cluster to be restored to the same namespace state after a crash or failure, and it provides fast failover.
  2. Only one NameNode may be active at a time, because two active NameNodes would cause data loss or corruption. This situation is known as a split-brain scenario, where a cluster gets divided into smaller clusters and each one believes it is the only active cluster. Fencing avoids such scenarios: it ensures that only one NameNode remains active at any given time. If two NameNodes end up in the Active state, fencing forces one of them out of the Active state.

There are two types of failover: A. Graceful failover: we initiate the failover manually for routine maintenance. B. Automatic failover: the failover is initiated automatically when the NameNode fails or crashes.

In either case, the Passive/Standby NameNode can acquire the exclusive lock in ZooKeeper, indicating that it wants to become the next Active NameNode.

In an HDFS HA cluster, Apache ZooKeeper is the service that provides automatic failover. While the NameNode is active, ZooKeeper maintains a session with it. If the Active NameNode fails, the session expires and ZooKeeper informs the Passive/Standby NameNode so that it can initiate the failover process.

The ZKFailoverController (ZKFC) is a ZooKeeper client that also monitors and manages the NameNode status. Each NameNode also runs a ZKFC, which is responsible for periodically monitoring the health of that NameNode.

When ZooKeeper is installed in your cluster, make sure the following processes (daemons) are running on the Active NameNode, the Standby NameNode, and the DataNodes.

When you run jps (the Java Virtual Machine Process Status Tool) on the Active NameNode, you should see the following daemons:

  • Zookeeper
  • Zookeeper Fail Over controller
  • JournalNode
  • NameNode

When you run jps on the Standby NameNode, you should see the following daemons:

  • Zookeeper
  • Zookeeper Fail Over controller
  • JournalNode
  • NameNode

When you run jps on a DataNode, you should see the following daemons:

  • Zookeeper
  • JournalNode
  • DataNode

Distributed cache is a facility provided by the Hadoop MapReduce framework to give an application access to small files it needs during execution. These files are small, typically kilobytes or megabytes in size, and are mainly text, archive, or jar files. Because they are small, they are kept in a cache, which is fast to read. An application that uses the distributed cache to distribute a file must make sure the file is available and accessible via a URL. The URL can be either hdfs:// or http://.

Once the file is present at the given URL, the MapReduce framework copies the necessary files to all the nodes before any tasks start on those nodes. If the files are archives, they are automatically unarchived on the nodes after transfer.

Example: a Hadoop cluster has three data nodes and we run 30 tasks in it, so each node gets 10 tasks. Each task needs some information or a particular jar before it can execute. To meet this need, we cache the files that contain the information or the jars. Before the job executes, the cache files are copied to each slave node for the application master. The application master then reads the files and starts the tasks. The tasks can be mappers or reducers, and the cached files are read-only. By default, the Hadoop distributed cache is 10 GB; to change it, modify the size in mapred-site.xml.

A natural question is why a cache is needed to perform the tasks. Why can't we keep the file in HDFS, where it is already present on the data nodes, and have the application read it? There are 30 tasks here, and in practice there may be 100 or 1,000 or more. If we put the files in HDFS, then to perform 30 tasks the application has to access the HDFS location 30 times and read it each time, but HDFS is not efficient at serving small files that many times. This is why we use the cache: it reduces the number of reads from HDFS.

In a Hadoop cluster, the DataNode is where the actual data is kept. DataNodes send a heartbeat message to the NameNode every 3 seconds to confirm that they are active. If the NameNode does not receive a heartbeat from a particular DataNode for about 10 minutes (10 minutes 30 seconds with the default settings), it considers that DataNode dead. The NameNode then initiates replication of the dead DataNode's blocks to other DataNodes that are active. DataNodes can talk to each other to rebalance the data, move and copy data around, and keep the replication level up in the cluster. You can get a block report using the HDFS commands below.

Example:

hadoop fsck / ==> Filesystem check on HDFS

# hadoop fsck /hadoop/container/pbibhu
Total size:    16666944775310 B    <=== see here
Total dirs:    3922
Total files:   418464
Total blocks (validated):      202705 (avg. block size 32953610 B)
Minimally replicated blocks:   202705 (100.0 %)
Over-replicated blocks:        0 (0.0 %)
Under-replicated blocks:       0 (0.0 %)
Mis-replicated blocks:         0 (0.0 %)
Default replication factor:    3
Average block replication:     3.0
Corrupt blocks:                0
Missing replicas:              0 (0.0 %)
Number of data-nodes:          18
Number of racks:               1
FSCK ended at Thu Oct 20 20:49:59 CET 2011 in 7516 milliseconds

The filesystem under path '/hadoop/container/pbibhu' is HEALTHY
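
The heartbeat timing described before the fsck example can be sketched in a few lines. This is only an illustration, assuming the default values of dfs.heartbeat.interval (3 seconds) and dfs.namenode.heartbeat.recheck-interval (5 minutes); the function names are hypothetical and not part of any Hadoop API.

```python
# Sketch: how long the NameNode waits before treating a DataNode as dead.
# Assumed defaults: dfs.heartbeat.interval = 3 s,
# dfs.namenode.heartbeat.recheck-interval = 5 min.
HEARTBEAT_INTERVAL_S = 3
RECHECK_INTERVAL_S = 5 * 60


def dead_node_timeout_s(heartbeat_s=HEARTBEAT_INTERVAL_S,
                        recheck_s=RECHECK_INTERVAL_S):
    """Timeout = 2 * recheck interval + 10 * heartbeat interval."""
    return 2 * recheck_s + 10 * heartbeat_s


def is_dead(last_heartbeat_s, now_s, timeout_s=None):
    """Hypothetical helper: True once the silence exceeds the timeout."""
    if timeout_s is None:
        timeout_s = dead_node_timeout_s()
    return now_s - last_heartbeat_s > timeout_s


timeout = dead_node_timeout_s()
print(timeout, "seconds")
```

With these defaults the timeout works out to 630 seconds, which is why the figure is usually quoted as "about 10 minutes".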

The NameNode is the node that stores the file system metadata. The metadata includes the list of file names, owner, permissions, timestamps, size, replication factor, the list of blocks for each file, and so on. It also records which file maps to which blocks and which blocks are stored on which DataNode.

When a DataNode stores a block, it maintains a checksum for that block as well. Whenever data is written to HDFS, the checksum value is written at the same time, and when the data is read, the same checksum is verified by default. DataNodes periodically update the NameNode with their block information, and they verify the checksums before sending the update. If the checksum is not correct for a particular block, the block is treated as corrupted at the disk level and the DataNode skips it when reporting to the NameNode. In this way the NameNode learns of the disk-level corruption on that DataNode and takes the necessary steps, such as replicating the block from its other locations to other active DataNodes to bring the replication factor back to its normal level.

DataNodes can be listed in the file named by the dfs.hosts property, which contains the list of hosts that are permitted to connect to the NameNode.

Example:

Add this property to hdfs-site.xml:
<property>
  <name>dfs.hosts</name>
  <value>/home/hadoop/includes</value>
</property>
includes:
hostname1
hostname2
hostname3

If the include file is empty, all hosts are permitted. The include file is not a definitive list of active DataNodes: the NameNode only considers DataNodes from which it receives heartbeats.
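
The checksum behaviour described earlier (store a checksum on write, verify it on read) can be illustrated with a small sketch. It is not HDFS code: it uses Python's zlib.crc32 as a stand-in for the checksum HDFS actually uses, and the 512-byte chunk size and all function names are assumptions made for illustration.

```python
import zlib

BYTES_PER_CHECKSUM = 512  # assumed chunk size for this sketch


def compute_checksums(data: bytes, chunk: int = BYTES_PER_CHECKSUM):
    """Return one CRC32 value per chunk of the block (stand-in checksum)."""
    return [zlib.crc32(data[i:i + chunk]) for i in range(0, len(data), chunk)]


def verify(data: bytes, stored, chunk: int = BYTES_PER_CHECKSUM) -> bool:
    """Recompute the checksums on read and compare with the stored ones."""
    return compute_checksums(data, chunk) == stored


block = b"hadoop" * 300              # 1800 bytes of sample data
stored = compute_checksums(block)    # written alongside the block

# Simulate disk-level corruption of a single byte.
corrupted = block[:700] + b"X" + block[701:]

print(verify(block, stored), verify(corrupted, stored))
```

A block that fails verification is the one the DataNode would leave out of its report, which is what prompts re-replication from a healthy copy.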

LVM stands for Logical Volume Management. It is a system for managing logical volumes, or filesystems, that is much more advanced and flexible than the traditional method of partitioning a disk into one or more segments and formatting each partition with a filesystem. Today's disks are huge (> 1 TB), and LVM is the right tool for dynamically allocating and resizing partitions on such disks.

If you use Linux to deploy Hadoop nodes, master or slave, it is strongly recommended that you do not use LVM, for the following reasons:

  1. Attempts to reduce the size of a logical volume, or to reallocate its storage space to another volume, commonly result in corruption and data loss.
  2. The loss or removal of a disk from LVM leaves the entire storage, together with all its files, inaccessible.
  3. Other operating systems do not recognize LVM, so LVM partitions cannot be accessed from Windows or macOS.

  1. When a client submits a job, it goes to the resource manager, which has a scheduler. It is the resource manager's duty to calculate the resources required to run the job.
  2. Once it has calculated the required resources, the resource manager launches an application-specific application master.
  3. The application master daemon remains available until the job is completed.
  4. The application master is responsible for negotiating resources with the resource manager. This means it asks the resource manager for more or fewer resources as the job requires.
  5. The application master launches containers on the node managers where the data is available.
  6. The containers work under the supervision of the node manager, so each node manager is responsible for all the containers on its node. The containers give periodic updates about the job to the application master.
  7. Once the job is completed and the containers/resources are freed, the application master informs the resource manager that the job is complete, and the client gets the update from the resource manager.

HDFS data might not always be distributed uniformly across DataNodes, for several reasons: some DataNodes may have less disk space available for HDFS, disk utilization on the DataNode machines may become uneven during normal or heavy usage, or new DataNodes may have been added to an existing cluster. A balancer is required to mitigate this problem.

A balancer is a tool that balances disk space usage on an HDFS cluster: it analyzes block placement and balances data across the DataNodes. The balancer moves blocks until the cluster is deemed to be balanced, which means that the utilization of every DataNode is more or less equal. The balancer does not balance between individual volumes on a single DataNode.

hdfs balancer [-policy <policy>]

The two supported policies are blockpool and datanode. Setting the policy to blockpool means that the cluster is balanced if each pool in each node is balanced, while datanode means that the cluster is balanced if each DataNode is balanced. The default policy is datanode.

hdfs balancer [-threshold <threshold>] specifies a number in [1.0, 100.0] representing the acceptable threshold, as a percentage of storage capacity, so that a DataNode whose storage utilization falls outside the average +/- the threshold is considered over- or underutilized. The default threshold is 10.0.
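
The threshold rule can be sketched as follows. This is a simplified illustration rather than the balancer's implementation: it assumes all DataNodes have equal capacity, so the cluster average is just the mean of the per-node percentages, and the node names and figures are hypothetical.

```python
def classify(utilization_pct, threshold=10.0):
    """Label each node relative to the average utilization +/- threshold.

    utilization_pct maps a node name to its used storage in percent.
    Assumes equal-capacity nodes (a simplification for this sketch).
    """
    average = sum(utilization_pct.values()) / len(utilization_pct)
    labels = {}
    for node, used in utilization_pct.items():
        if used > average + threshold:
            labels[node] = "over-utilized"
        elif used < average - threshold:
            labels[node] = "under-utilized"
        else:
            labels[node] = "balanced"
    return average, labels


# Hypothetical cluster of three DataNodes.
average, labels = classify({"dn1": 80.0, "dn2": 50.0, "dn3": 20.0})
print(average, labels)
```

In this example the balancer would move blocks from dn1 towards dn3 until both fall within 10 percentage points of the average.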

A rack is a collection of servers, sized according to your requirements. All the servers in a rack are connected to the same network switch, so if that switch goes down, every machine in the rack is out of service and we say the rack is down.

To mitigate this, Rack Awareness was introduced in Apache Hadoop. With Rack Awareness, the NameNode chooses DataNodes that are on the same rack as the client, or on a nearby rack. The NameNode maintains the rack ID of each DataNode and uses it to obtain rack information. When maintaining racks in Hadoop, the following rules apply.

  • Not all replicas should be stored on the same rack; this is how the Rack Awareness algorithm reduces latency and improves fault tolerance.
  • The default replication factor is 3, so the Rack Awareness algorithm places replicas as follows:
  1. The first replica of the block is stored on the local rack.
  2. The second replica is stored on a DataNode in a different rack.
  3. The third replica is stored on another DataNode in the same rack as the second replica.
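
The placement of three replicas can be sketched as below. This is an illustration only, assuming the common default policy in which the first replica stays with the writer and the second and third replicas go to two different DataNodes on one remote rack; the topology, node names, and function name are hypothetical.

```python
import random


def place_replicas(topology, writer_node, rng=random):
    """Pick three DataNodes for one block.

    topology maps a rack ID to the list of DataNodes on that rack.
    Assumes at least one remote rack with two or more nodes.
    """
    local_rack = next(rack for rack, nodes in topology.items()
                      if writer_node in nodes)
    first = writer_node
    remote_racks = [rack for rack, nodes in topology.items()
                    if rack != local_rack and len(nodes) >= 2]
    remote_rack = rng.choice(remote_racks)
    # Two distinct nodes on the same remote rack.
    second, third = rng.sample(topology[remote_rack], 2)
    return [first, second, third]


topology = {"rack1": ["dn1", "dn2"], "rack2": ["dn3", "dn4"]}
replicas = place_replicas(topology, "dn1", random.Random(0))
print(replicas)
```

Losing either rack in this layout still leaves at least one replica readable, which is the point of spreading replicas across racks.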

Rack Awareness is followed in Hadoop for the reasons below:

  • It improves data availability and consistency, because the same block is available on different racks.
  • Cluster performance improves because reads and writes are quick: two of the DataNodes holding a block are in the same rack and the third is in a nearby rack.
  • Network bandwidth is used more efficiently. In particular, with rack awareness YARN can optimize MapReduce job performance, because it assigns tasks to DataNodes that are close to each other under the rack policy and that hold a replica to process.
  • Under the rack policy, the NameNode assigns the 2nd and 3rd replicas of a block to DataNodes in a rack different from the one holding the first replica. This protects data even against the failure of a whole rack, and is possible only if Hadoop is configured with rack awareness.

Hive supports two types of tables.

  • Managed Table
  • External Table

Hive Managed Tables:
A Hive managed table is also known as an internal table. When we create a table in Hive, a managed table is created by default, and Hive manages its data as well. This means Hive stores the data in its warehouse directory. A managed table is stored under the path given by the hive.metastore.warehouse.dir property, and the default location of a table is /apps/hive/warehouse/<db_name>.db/<table_name>. This path can be changed. If a managed table or partition is dropped, the data and the corresponding metadata of the table or partition are deleted. If you do not specify the PURGE option, the data is moved to a trash folder for a certain period and deleted permanently after that.

Example:

1. Create Table

hive> create table univercity_db.school_table(name string, roll_no int) row format delimited fields terminated by ',';

OK

Time taken: 0.202 seconds

2. Describe table

hive> describe formatted univercity_db.school_table;

OK

You will get extra information, such as whether the table is managed or external, when the table was created, the file format, the location of the data path in HDFS, and whether the object is a table or a view.

3. Load the data to table from the local path

hive> load data local inpath '/home/pbibhu/Desktop/blog/school' into table univercity_db.school_table;

After loading from the local path you can use further Hive commands to select, count, describe, etc.

Hive External Tables:
When creating an external table, the location of the data path is not the usual warehouse path; you provide an HDFS path outside the warehouse directory, normally through the LOCATION clause of the create statement. If the structure or partitioning of an external table changes, an MSCK REPAIR TABLE table_name statement can be used to refresh the metadata information. Data for an external table is usually placed at its HDFS path rather than loaded from a local path.

Use external tables when the files are present in remote locations and should remain even if the external table is dropped.

Example:

1. Create Table

CREATE EXTERNAL TABLE IF NOT EXISTS univercity_db.school_table( student_ID INT, FirstName STRING, LastName STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION 'hdfs/pbibhu/school';

2. Create partition Table

CREATE EXTERNAL TABLE IF NOT EXISTS univercity_db.school_table( FirstName STRING, LastName STRING) PARTITIONED BY (student_ID INT) STORED AS ORC LOCATION 'hdfs/pbibhu/school';

3. Insert data into an internal table from an external table. The data structure should be the same for both tables.

hive> CREATE TABLE IF NOT EXISTS office(EmployeeID INT,FirstName STRING, Title STRING,
    State STRING, Laptop STRING)  STORED AS ORC;

OK

hive> CREATE EXTERNAL TABLE IF NOT EXISTS Office_text(
   EmployeeID INT,FirstName STRING, Title STRING,
   State STRING, Laptop STRING)
   ROW FORMAT DELIMITED
   FIELDS TERMINATED BY ','
   STORED AS TEXTFILE
   LOCATION '/user/pbibhu/office';

OK

hive> INSERT OVERWRITE TABLE office SELECT * FROM office_text;

Resource allocation within the queues is controlled separately. Within a queue:

FairScheduler can apply any of FifoPolicy, FairPolicy, or DominantResourceFairnessPolicy.

CapacityScheduler can apply either FifoPolicy or FairPolicy.

The Fair Scheduler can use different scheduling policies. The default policy is fair sharing, using memory as the resource. There is also a FIFO (first in, first out) policy, which is of little use. It is quite common to use the third policy, DRF (Dominant Resource Fairness), which allocates both memory and CPU resources to applications. DRF is similar to fair scheduling, but keep in mind that it applies primarily to the allocation of resources among queues, an activity that is already dominated by queue weights. Thus, the most important thing about DRF is that it considers multiple resources, rather than that it attempts to provide equal resource allocation.

Initially, TCS and Wipro each have some resources allocated to jobs in their respective queues, and only 10 GB remains in the cluster. Each queue is requesting to run a map task requiring 20 GB, so 30 GB of memory is available and the rest of the required resources will be taken from CPU. Wipro currently holds 15 GB of resources. Another 10 GB is required for its mapper task, so the fair scheduler awards Wipro a container with the requested 10 GB of memory. Now 5 GB of memory is available for TCS, which requires another 20 GB to run its mapper task. In this case there is not enough memory available for TCS, so DRF will try to use the 5 GB of memory, and the remaining 20 GB can be used from the CPU.
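
The idea behind DRF, considering more than one resource at a time, can be sketched as follows. This is a minimal illustration, not the Fair Scheduler's code: the cluster size and per-task demands are hypothetical numbers unrelated to the TCS/Wipro scenario, and the function name is invented. At each step the application with the smallest dominant share (its largest share of any single resource) gets the next task.

```python
def drf_allocate(capacity, demands):
    """Hand out tasks one at a time to the app with the lowest dominant share.

    capacity: resource name -> total amount in the cluster.
    demands:  app name -> {resource name -> amount needed per task}.
    Returns the number of tasks each app was able to launch.
    """
    used = {r: 0 for r in capacity}
    held = {app: {r: 0 for r in capacity} for app in demands}
    tasks = {app: 0 for app in demands}

    def dominant_share(app):
        return max(held[app][r] / capacity[r] for r in capacity)

    while True:
        for app in sorted(demands, key=dominant_share):
            need = demands[app]
            if all(used[r] + need[r] <= capacity[r] for r in capacity):
                for r in capacity:
                    used[r] += need[r]
                    held[app][r] += need[r]
                tasks[app] += 1
                break
        else:
            # No app's next task fits in the remaining resources.
            return tasks


capacity = {"cpu": 9, "mem_gb": 18}
demands = {"A": {"cpu": 1, "mem_gb": 4},   # memory-heavy tasks
           "B": {"cpu": 3, "mem_gb": 1}}   # CPU-heavy tasks
result = drf_allocate(capacity, demands)
print(result)
```

Here A ends up dominated by memory and B by CPU, and both finish with the same dominant share even though they hold different amounts of each resource.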

We store files under folders in HDFS, and most of the time the folder is named after the application. A small file is one that is smaller than the block size: for example, if the block size is 64 MB or 128 MB, files smaller than that are considered small. Files smaller than the block size cause problems at the HDFS level as well as at the MapReduce level.

When we store files and directories in HDFS, the corresponding metadata is stored in the NameNode. Each file, directory, and block occupies approximately 150 bytes of metadata. Suppose you have 1 million files, each occupying one block or less: the file and block metadata then takes approximately 300 MB of memory. A lot of NameNode memory is occupied in this way, and over time a threshold is reached beyond which the current hardware becomes a problem. Performance will certainly degrade.
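
The 300 MB figure follows from simple arithmetic, sketched here. The 150 bytes per object is the approximation quoted above; the function name is hypothetical.

```python
BYTES_PER_OBJECT = 150  # approximate NameNode memory per file or block


def namenode_metadata_bytes(num_files, blocks_per_file=1):
    """Estimate NameNode memory: one file object plus its block objects."""
    objects = num_files * (1 + blocks_per_file)
    return objects * BYTES_PER_OBJECT


estimate = namenode_metadata_bytes(1_000_000)
print(estimate / 1_000_000, "MB")
```

One million small files means two million objects (one file entry and one block entry each), hence roughly 300 MB.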

During MapReduce execution, when a file is smaller than or equal to the block size, one mapper is launched per block or equivalent split. A large number of small files therefore launches a large number of mappers, each processing only a small chunk of data, which increases the overall processing time. Reading and writing a large number of small files also increases seek time, which hurts performance because seeks are generally expensive operations. Since Hadoop is designed to run over your entire dataset, it is best to minimize seeks by using large files.

Remediation plan:
We can merge all the small files into one big file using the HDFS getmerge command. getmerge copies all the files in an HDFS folder into a single concatenated file on the local system. After concatenating on the local system, you can place the file from local back into HDFS using the HDFS put command. See the example below.

hadoop fs -getmerge /hdfs_path/pbibhu/school_info_* /local_path/pbibhu/school_inf.txt
hadoop fs -put /local_path/pbibhu/school_inf.txt /hdfs_path/pbibhu/school_inf.txt

Below are the file formats which Hadoop supports.

a. Text (e.g. CSV (comma-separated values) and TSV (tab-separated values))
b. JSON (JavaScript Object Notation)
c. AVRO
d. RC (Record Columnar)
e. ORC (Optimized Record Columnar)
f. Parquet

a. Text file format (e.g. CSV (comma-separated values) and TSV (tab-separated values))

Text formats were very common before Hadoop and remain very common in Hadoop environments. Data is presented as lines, each terminated by a newline character (\n), with fields separated by a delimiter such as a comma or a tab (\t).

CSV stands for comma-separated values, so data fields are separated (delimited) by commas. For example, suppose we have the following values in an Excel sheet:

Name    class   section   subject
Bibhu   7       A         English

The above data will be present in a CSV formatted file as follows.
Bibhu,7,A,English
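
As a small illustration, the same row can be written in CSV and TSV form with Python's standard csv module. The helper name is hypothetical; only the delimiter differs between the two formats.

```python
import csv
import io


def to_delimited(row, delimiter):
    """Serialize one row of values with the given field delimiter."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter=delimiter, lineterminator="\n")
    writer.writerow(row)
    return buffer.getvalue()


row = ["Bibhu", 7, "A", "English"]
csv_line = to_delimited(row, ",")    # comma-separated
tsv_line = to_delimited(row, "\t")   # tab-separated
print(csv_line, tsv_line, sep="")
```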

b. JSON 

JSON stands for JavaScript Object Notation. It is a readable format for structuring data, mainly used to transfer data from a server to a web application. It can be used as an alternative to XML. In JSON, data is presented as key and value pairs. The key is always a string enclosed in quotation marks. The value can be a string, number, boolean, array, object, or null.

The basic syntax is a key followed by a colon followed by a value.
Example: "Name" : "Bibhu"
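
A short sketch with Python's standard json module shows the key and value types in practice. The record below is made up for illustration, reusing the sample values from the text format section.

```python
import json

# Keys are strings; values can be strings, numbers, booleans, arrays,
# nested objects, or null (None in Python).
record = {
    "Name": "Bibhu",
    "class": 7,
    "section": "A",
    "subjects": ["English"],
    "active": True,
    "nickname": None,
}

text = json.dumps(record)    # serialize to a JSON string
parsed = json.loads(text)    # parse it back into Python objects
print(text)
```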

c. AVRO

Avro stores its schema in JSON format, which is easy to read and understand. The data itself is stored in binary format, which makes it compact and efficient. Each value is stored without any metadata other than a small schema identifier of 1 to 4 bytes. Avro files can be split, so a large data set can be divided into subsets, which makes the format well suited to MapReduce processing.

In Hive, the following command is used to create an Avro table.

Create table avro_school
(column_specs)
stored as avro;

d. RC

RC stands for Record Columnar, which is a binary file format. It provides high compression on top of a row, or on multiple rows at a time, on which we want to perform some operation. RC files consist of binary key/value pairs. The RC file format first partitions the rows horizontally into row splits, and then each row split is stored vertically, in a columnar way. See the example below:

Step 1
First, partition the rows horizontally into row splits

501 502 503 504
505 506 507 508
509 510 511 512
513 514 515 516

Step 2
Each row split is then stored vertically, in a columnar way

501 502 503 504
505 506 507 508
509 510 511 512
513 514 515 516
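
The two steps can be pictured with a small sketch. It is an illustration of the idea only, not the RC file layout on disk: the rows are first cut into row splits, and inside each split the values are regrouped column by column. The split size of 2 and the function name are assumptions.

```python
def to_row_groups(rows, rows_per_group):
    """Split rows horizontally, then store each split column by column."""
    groups = []
    for start in range(0, len(rows), rows_per_group):
        split = rows[start:start + rows_per_group]     # step 1: row split
        columns = [list(col) for col in zip(*split)]   # step 2: columnar
        groups.append(columns)
    return groups


rows = [
    [501, 502, 503, 504],
    [505, 506, 507, 508],
    [509, 510, 511, 512],
    [513, 514, 515, 516],
]
groups = to_row_groups(rows, rows_per_group=2)
for columns in groups:
    print(columns)
```

Keeping the values of one column next to each other is what lets a query read only the columns it needs and lets similar values compress well.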

An RC file combines multiple functions, such as data storage formatting, data compression, and data access optimization. It meets the following four requirements of data storage:

  1. Fast data storing
  2. Improved query processing
  3. Optimized storage space utilization
  4. Dynamic data access patterns

e. ORC(Optimized Record Columnar)

The ORC file format provides a more efficient way to store relational data than the RC file format. It can reduce the size of the stored data by up to 75% of the original. Compared to RC files, ORC files take less time to access the data and less space to store it. Internally, the data is divided into stripes with a default size of 250 MB.

In Hive, the following commands are used to work with ORC files.

CREATE TABLE ... STORED AS ORC
ALTER TABLE ... SET FILEFORMAT ORC
SET hive.default.fileformat=ORC

f. Parquet

Parquet is another column-oriented storage format, like RC and ORC, but it is very good at handling nested data as well as at scanning a particular column of a table. In Parquet, new columns can be added at the end of the structure. It handles compression using Snappy or gzip; currently Snappy is the default. Parquet is supported by Cloudera and optimized for Cloudera Impala.

Hive Parquet File Format Example:

Create table parquet_school_table
(column_specs)
stored as parquet;

The two schedulers cannot be used in the same cluster. Each scheduling algorithm was developed for specific use cases, and for a given cluster you set up the configuration file for either the Fair Scheduler or the Capacity Scheduler. You cannot set up both schedulers for one cluster.

You can choose the Fair Scheduler by setting the scheduler class in yarn-site.xml as shown below:

<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

To use the Capacity Scheduler, configure the Resource Manager in conf/yarn-site.xml as shown below:

yarn.resourcemanager.scheduler.class - org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler

To set up the queues in the Capacity Scheduler, you need to make changes in the etc/hadoop/capacity-scheduler.xml configuration file.

The Capacity Scheduler has a predefined queue called root. All the queues we create in the system are children of the root queue.

Setting up further queues - Configure the property yarn.scheduler.capacity.root.queues with a comma-separated list of child queues.

Setting up sub-queues within a queue - Configure the property yarn.scheduler.capacity.<queue-path>.queues

queue-path is the full path of the queue's hierarchy, starting at root, with . (dot) as the delimiter.

Queue capacity is given as a percentage (%). The sum of the capacities of all queues at each level must equal 100. If there are free resources in a queue, applications in that queue may consume them.

Capacity scheduler queue configuration example:

Suppose root has two child queues, XYZ and ABC, and XYZ is further divided into technology and marketing. XYZ is given 60% of the cluster capacity and ABC is given 40%. The properties below set this up in capacity-scheduler.xml.

<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>XYZ,ABC</value>
</property>

<property>
  <name>yarn.scheduler.capacity.root.XYZ.queues</name>
  <value>technology,marketing</value>
</property>

<property>
  <name>yarn.scheduler.capacity.root.XYZ.capacity</name>
  <value>60</value>
</property>

<property>
  <name>yarn.scheduler.capacity.root.ABC.capacity</name>
  <value>40</value>
</property>
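
The rule that capacities at each queue level must add up to 100 can be checked with a short sketch before a configuration is deployed. This is a hypothetical helper, not part of YARN: it takes queue paths and their capacities as a plain dictionary rather than parsing the XML, and the capacities given for technology and marketing are invented for the example.

```python
def check_capacities(capacities):
    """Return the parent queues whose children do not sum to 100.

    capacities maps a full queue path (e.g. "root.XYZ") to its percentage.
    """
    totals = {}
    for path, percent in capacities.items():
        parent = path.rsplit(".", 1)[0]
        totals[parent] = totals.get(parent, 0) + percent
    return {parent: total for parent, total in totals.items() if total != 100}


good = check_capacities({
    "root.XYZ": 60,
    "root.ABC": 40,
    "root.XYZ.technology": 70,   # hypothetical split inside XYZ
    "root.XYZ.marketing": 30,
})
bad = check_capacities({"root.XYZ": 60, "root.ABC": 30})
print(good, bad)
```

An empty result means every level is consistent; otherwise the dictionary names the parent queue and the total its children actually add up to.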

Kafka is a messaging system that exchanges large volumes of streaming/log data between processes, applications, and servers. Distributed messaging is based on queues that can handle a high volume of data and allow you to pass messages from one end to another. Kafka is appropriate for both offline and online message consumption.

Before going further into Kafka, we need to know its components, which are listed below.

  1. Kafka Broker
  2. Kafka Topics
  3. Kafka Topic Partition
  4. Kafka producers 
  5. Kafka consumer
  6. Consumer Group

Kafka Broker: A Kafka cluster consists of one or more servers, called Kafka brokers, on which Kafka runs. Producers are processes that publish data to Kafka topics on the brokers, and consumers pull the messages off those topics.
A few basic points about Kafka brokers:

  • Each broker has an ID, which is an integer.
  • Each broker holds some topic partitions, and it may hold multiple partitions of the same topic too.
  • A producer or consumer can connect to any broker.

Kafka Topics: A topic is a category or feed name under which messages are stored and published. All Kafka messages are organized into topics, so you send a message to a specific topic and read messages from a specific topic.

Kafka Topic Partition: A Kafka topic is divided into a number of partitions, each of which holds messages in sequence. Ordering is guaranteed only within a partition. Each message in a partition is identified by its offset, an incremental ID that Kafka assigns when the message is appended to the partition. An offset is meaningful only within its partition and has no meaning across partitions. A topic may have any number of partitions. When a message has no key, there is no fixed rule for which partition it is written to. However, a producer can attach a key to a message, and all messages with the same key go to the same partition.
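The effect of a message key can be sketched in a few lines. This is only an illustration of the idea: the choose_partition function is hypothetical, and zlib.crc32 is a stand-in hash chosen for the example, not the hash function Kafka's own partitioner uses.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    # Stand-in hash for illustration only; Kafka's default partitioner
    # uses a different hash function.
    return zlib.crc32(key) % num_partitions


# Hypothetical keys: every message keyed "user-1" lands in the same partition.
keys = [b"user-1", b"user-2", b"user-1", b"user-3", b"user-1"]
for key in keys:
    print(key.decode(), "->", choose_partition(key, 3))
```

Because the partition is a pure function of the key and the partition count, messages with the same key keep their relative order, which is the ordering guarantee a single partition provides.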

Kafka producers: Producers write data to a topic. To do so, a producer specifies the topic name and one broker to connect to. Kafka has its own mechanism to route the data to the right partition on the right broker automatically.

Producers can choose to receive an acknowledgement for the data they write. The acknowledgement settings are as follows.

acks = 0 ==> The producer treats the send as successful without waiting for any acknowledgement, so data loss is possible.
acks = 1 ==> The producer waits for the leader's acknowledgement, which ensures that at least one broker has received the message. There is no guarantee that the data has reached the replicas.
acks = all ==> The leader as well as the replicas have to acknowledge the write. This is the safest setting, but performance is affected.

Kafka Consumer: Consumers read data from topics. Since topics are divided into partitions, a consumer reads data from the partitions of a topic. A consumer needs to specify the topic name and a broker to connect to. It reads the data of a partition in sequence. When a consumer connects to one broker, Kafka makes sure it is connected to the entire cluster.

Kafka Consumer Group: A consumer group consists of multiple consumer processes and has a unique group ID. Each partition is read by exactly one consumer instance in the group. If the number of consumers exceeds the number of partitions, the extra consumers stay inactive. For example, if there are 6 partitions in total and 8 consumers in a single consumer group, 2 consumers will be inactive.
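The 6-partition, 8-consumer example can be worked through with a small sketch. The round-robin assign_partitions function below is an assumption made for illustration; Kafka's real assignment strategies are configurable and more involved, but the outcome for surplus consumers is the same.

```python
def assign_partitions(partitions, consumers):
    """Round-robin sketch: every partition goes to exactly one consumer."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = list(range(6))                      # 6 partitions
consumers = [f"consumer-{i}" for i in range(8)]  # 8 consumers, names made up
assignment = assign_partitions(partitions, consumers)

# Consumers that received no partition stay idle.
idle = [name for name, owned in assignment.items() if not owned]
print(len(idle))  # prints 2
```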

Two messaging patterns are available:

  1. Point to point messaging system
  2. Publish subscribe messaging system.

1. Point to point messaging system:

[Figure: Point-to-point messaging system]

In a point-to-point messaging system, messages are kept in a queue. One or more consumers can read from the queue, but a particular message can be read by only one consumer.

Point-to-point messaging is used when a message should be received by exactly one consumer. Multiple consumers may be listening on the queue, but only one of them receives a given message. There can be multiple producers as well, all sending messages to the queue, but each message is still received by only one receiver.

2. Publish subscribe messaging system:

In a publish-subscribe messaging system, message producers are called publishers and message consumers are called subscribers. A topic can have multiple receivers, and every receiver gets a copy of each message.

[Figure: Publish-subscribe messaging system]

The following points describe the publish-subscribe messaging system shown in the figure.

Messages are shared through a channel called a topic. Topics sit in a central place where producers can publish messages and consumers can read them.

Each message is delivered to one or more consumers, called subscribers. The publisher does not know which subscribers receive a message or topic. A single message created by one publisher may be copied and distributed to hundreds or thousands of subscribers.

Role of Zookeeper in Kafka:

Zookeeper is a mandatory component of the Kafka setup described here. It helps manage the Kafka brokers, takes part in leader election for partitions and maintains cluster membership. For example, when a broker is added or removed, a topic is added or deleted, or a broker goes down or comes up, Zookeeper tracks the change and informs Kafka. It also holds topic configuration, such as the number of partitions a topic has and the leader of each partition.

The sequence of starting the Kafka services:

a. Zookeeper configuration file provided by Kafka:

This is the default Zookeeper configuration file that ships with Kafka. Its properties are as follows.

dataDir=/tmp/zookeeper

clientPort=2183

[root@xxxx]# /bin/zookeeper-server-start.sh /config/zookeeper.properties

b. After Zookeeper, start the Kafka broker:

You can start the Kafka broker with the default configuration file. Its main properties are as follows.

broker.id=0

log.dirs=/tmp/kafka-logs

zookeeper.connect=localhost:2183

Here there is one broker, whose ID is 0, and it connects to Zookeeper on port 2183.

[root@xxxx]# /bin/kafka-server-start.sh /config/server.properties

c. Now that both Zookeeper and the broker are running, create the topic:

Below is an example that creates a topic with a single partition and a single replica.

[root@xxxx]# /bin/kafka-create-topic.sh --zookeeper localhost:2183 --replica 1 --partition 1 --topic examtopic

In the above example, we created a topic named examtopic.

d. Now that the topic exists, start a producer to send messages to the Kafka cluster. The command to start the producer is as follows.

[root@xxxx]# /bin/kafka-console-producer.sh --broker-list localhost:9090 --topic examtopic

broker-list ==> The server and port of the brokers. In the above example the server is localhost and the port is 9090.
topic ==> The name of the topic. In the above example it is examtopic.

The command starts a producer client that accepts your messages and publishes them to the cluster, where a consumer can then read them.
Once the producer client is running, you can type a message in the terminal where it runs:

Hi Bibhu, How are you?

e. Start the consumer to read the data sent by the producer:

[root@xxxx]# /bin/kafka-console-consumer.sh --zookeeper localhost:2183 --topic examtopic --from-beginning

The consumer runs with the default configuration properties shown below, which are found in the consumer.properties file.

group.id=test-consumer-group
zookeeper.connect=localhost:2183

Sqoop is used to transfer data from relational databases such as DB2, MySQL and Oracle into Hadoop (HDFS, Hive, HBase, etc.) and vice versa. This is part of the ETL (Extract, Transform and Load) process. In other words, Sqoop imports and exports data between a relational database and Hadoop.

Below are some of the important features of Sqoop:

a. Full Load: Sqoop can load a single table or all the tables in a database with a single sqoop command.

b. Incremental Load: Sqoop can do an incremental load, which retrieves only the rows that are newer than a previously imported set of rows.

c. Parallel import/export: Sqoop uses the YARN framework to import and export data. YARN provides parallelism, since data is read and written on multiple nodes in parallel, and fault tolerance, since HDFS replicates the data by default.

d. Import results of SQL query: Sqoop can import the result of a SQL query into HDFS.

e. Compression: Sqoop can compress the data it imports from a database, and it offers several options for doing so. If you specify --compress while importing, Sqoop compresses the output files in gzip format by default and gives them the .gz extension. If you also provide --compression-codec, Sqoop uses the codec you name instead, for example bzip2.

f. Connectors for all major RDBMS databases: Sqoop has connectors for almost all relational databases.

g. Kerberos Security Integration: Sqoop supports Kerberos authentication. Kerberos is a protocol based on tickets or keytabs that authenticates users as well as services before they connect to services such as HDFS or Hive.

How Sqoop works:

For each mapper, Sqoop internally generates a SQL query that ingests data from the source table into HDFS. Four mappers are used by default, but you can change the number of mappers to suit your requirements. The number of mappers determines how the split-by column is divided: Sqoop adds a WHERE condition on the split-by column so that each mapper gets a logical partition of the source table. For example, suppose we use three mappers and a split-by column, and the table has 1,000,000 records. Sqoop finds the range by running min and max queries against the split-by column. The first mapper would then pull records 0 to 333333, the second mapper records 333334 to 666666, and the last mapper records 666667 to 1000000.

Sqoop runs a map-only job. A reduce phase is needed for aggregations, but Sqoop only imports and exports data and does not perform any aggregation. The map job launches as many mappers as the user defines, which is 3 in the above example. For a Sqoop import, each mapper task is assigned a part of the data to be imported. Sqoop distributes the input data equally among the mappers for high performance. Each mapper then opens a JDBC connection to the database, fetches the part of the data assigned to it and writes it into HDFS, Hive or HBase, depending on the arguments given on the command line. In the target directory, the mappers write files named part-m-00000, part-m-00001 and part-m-00002.
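The range splitting described above can be sketched as follows. This is a simplified illustration, not Sqoop's actual splitter: the split_ranges function and the column name id are assumptions, and the exact boundaries it produces may differ by a row or two from the rounded figures in the example.

```python
def split_ranges(min_value, max_value, num_mappers):
    """Return inclusive (low, high) ranges that cover min_value..max_value."""
    total = max_value - min_value + 1
    base, extra = divmod(total, num_mappers)
    ranges = []
    low = min_value
    for i in range(num_mappers):
        # The first `extra` mappers take one additional row each.
        size = base + (1 if i < extra else 0)
        if size == 0:
            continue
        high = low + size - 1
        ranges.append((low, high))
        low = high + 1
    return ranges


# min and max would come from a query on the split-by column;
# the values and the column name "id" are assumed here.
for i, (low, high) in enumerate(split_ranges(1, 1_000_000, 3)):
    print(f"mapper {i}: WHERE id >= {low} AND id <= {high} -> part-m-{i:05d}")
```

Each mapper's range becomes the WHERE condition of its own query, which is why a split-by column with evenly spread values gives the mappers similar amounts of work.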

How Sqoop works to move data into Hive/HDFS:

In this scenario, we discuss how Sqoop imports data from a relational database into Hive. Sqoop can import data into a Hive database only as a text file or a sequence file. If you want to use the ORC file format, you must follow a two-stage approach: in the first stage Sqoop brings the data into HDFS in text or sequence file format, and in the second stage Hive converts the data into ORC format.
The steps are as follows.

a. Gather all the connection details needed to connect to the relational database.

Database URL: db.bib.com
Database name: sales
Connection protocol: jdbc:mysql
Source database username and password
The file where the password is stored

Example:
sqoop import --connect jdbc:mysql://db.bib.com/sales --table EMPLOYEES --username <username> --password-file ${user.home}/.password

b. Below are a few options to consider when using parallelism for the import.

  • You can import the entire source table.
  • You can import selected columns of the table.
  • You can import only the latest records by selecting them with a WHERE clause.
  • You can set the number of map tasks to control the import parallelism. In this case, Sqoop splits the primary key range of the source table evenly.

Example:
sqoop import --connect jdbc:mysql://db.bib.com/sales --table EMPLOYEES --columns "employee_id,first_name,last_name,job_title" --where "start_date > '2010-01-01'" --num-mappers 8

c. Below are a few scenarios in which you can specify a split-by column.

  • You can set the split key with --split-by.
  • You can use --split-by together with a free-form query, which must contain the $CONDITIONS placeholder.

Example:

sqoop import --query 'SELECT a.*, b.* FROM a JOIN b ON (a.id = b.id) WHERE $CONDITIONS' --split-by a.id --target-dir /user/bib/sales

d. The destination can be an HDFS directory or a Hive table.
Example:

a. HDFS as the target directory
sqoop import --query 'SELECT a.*, b.* FROM a JOIN b ON (a.id = b.id) WHERE $CONDITIONS' --split-by a.id --target-dir /user/bib/sales

b. Hive as the target table
sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --hive-import

After a job is submitted, the JobTracker generates a Job ID in Hadoop 1, while in Hadoop 2/3 an Application ID is generated. The Application ID or Job ID is a globally unique identifier for the application or job.

Example: job_1410450250506_0002 / application_1410450250506_0002
_1410450250506 ==> The start time of the Resource Manager, known as the "cluster timestamp".
_0002 ==> A counter that keeps track of the jobs submitted.

Task IDs are formed by replacing the job or application prefix with the task prefix and appending the task type and the task number within the job.
Example: task_1410450250506_0002_m_000002

In the above example, _000002 identifies the third map task of the job job_1410450250506_0002, because task numbers start at 0.
Tasks may be executed more than once because of task failures, so each task attempt is given a unique ID to tell the different executions apart.

Example: attempt_1410450250506_0002_m_000002_0
_0 is the first attempt of the task task_1410450250506_0002_m_000002.
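The way these identifiers nest can be seen by taking an attempt ID apart. The parse_attempt_id helper below is a hypothetical sketch based only on the ID layout described above; it is not a Hadoop API.

```python
def parse_attempt_id(attempt_id):
    """Split an attempt ID into the job, task and attempt parts it encodes."""
    prefix, cluster_ts, job_seq, task_type, task_seq, attempt_seq = attempt_id.split("_")
    if prefix != "attempt":
        raise ValueError(f"not a task attempt ID: {attempt_id}")
    return {
        "job_id": f"job_{cluster_ts}_{job_seq}",
        "task_id": f"task_{cluster_ts}_{job_seq}_{task_type}_{task_seq}",
        "task_type": {"m": "map", "r": "reduce"}.get(task_type, task_type),
        "task_number": int(task_seq),        # numbering starts at 0
        "attempt_number": int(attempt_seq),  # 0 is the first attempt
    }


info = parse_attempt_id("attempt_1410450250506_0002_m_000002_0")
print(info["job_id"])   # prints job_1410450250506_0002
print(info["task_id"])  # prints task_1410450250506_0002_m_000002
```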

When you open the Job History web UI, you see a page like the one shown below. It shows the job state, that is, whether the job succeeded or failed, how many mappers and reducers were launched, and whether all of them completed.

JOB HISTORY Server:

[Figure: Job History Server]

When you click a job ID in the Job History Server, you get the page shown below, with more or less the same information as above.

Overview:

[Figure: MapReduce job overview]

Hadoop Counters:

Counters are the most useful option for examining job performance. Hadoop provides several built-in counters, and you can also define custom counters for your requirements. Counters help you find out the following.

  • Whether the correct number of mappers and reducers were launched and completed.
  • Whether the correct number of input bytes were read and the expected number of output bytes were written.
  • Whether the correct number of records were read and written, both on the local file system and on HDFS.
  • Whether the CPU usage and memory consumption of the job are appropriate.

[Figure: Hadoop counters]

Hadoop provides three groups of built-in counters:

  1. File system counters
  2. Job Counters
  3. Map-reduce Framework counters. 

In addition, Hadoop provides counters from three other groups by default:

  1. Shuffle error counters
  2. File input format counters 
  3. File output format counters.

File system counters:

The file system counters give information about read and write operations on both the local file system and HDFS. The total number of bytes read and written depends on the compression algorithm in use. Here are a few key counters.

FILE_BYTES_READ: The total number of bytes read from the local file system by the MapReduce tasks.
FILE_BYTES_WRITTEN: The total number of bytes written to the local file system. During the map phase, the mapper tasks write their intermediate results to the local file system. During the shuffle phase, the reducer tasks also write to the local file system when they spill intermediate results to disk while sorting.

HDFS_BYTES_READ: The total number of bytes read from HDFS.
HDFS_BYTES_WRITTEN: The total number of bytes written to HDFS.

JOB Counters:

The job counters give job-level information about the mappers and reducers. The following are the key job counters.

DATA_LOCAL_MAPS: The number of map tasks that were data-local, that is, that ran on the same node where their input data is stored.

TOTAL_LAUNCHED_MAPS: The total number of map tasks launched, including failed tasks. When no task fails or is retried, it equals the number of input splits for the job.

TOTAL_LAUNCHED_REDUCES: The total number of reduce tasks launched for the job.

NUM_KILLED_MAPS: The number of killed map tasks.
NUM_KILLED_REDUCES: The number of killed reduce tasks.
MILLIS_MAPS: The total time (in milliseconds) spent by all map tasks of the job.
MILLIS_REDUCES: The total time (in milliseconds) spent by all reduce tasks of the job.

MapReduce Framework counters:

The MapReduce framework counters hold the statistics of a MapReduce job. They help you tune the performance of the job.

MAP_INPUT_RECORDS: The total number of input records read for the job during the map phase.
MAP_OUTPUT_RECORDS: The total number of records written for the job during the map phase.
CPU_MILLISECONDS: The CPU time spent on all the tasks.
GC_TIME_MILLIS: The total time spent on garbage collection in the JVMs. Garbage collection is the process of automatically reclaiming memory that is no longer used at run time.
PHYSICAL_MEMORY_BYTES: The total physical memory used by all tasks.
REDUCE_SHUFFLE_BYTES: The total number of output bytes copied from map tasks to reduce tasks during the shuffle phase.
SPILLED_RECORDS: The total number of records spilled to disk by all map and reduce tasks.

Other counters are as follows:

  1. Shuffle error counters: Error details from the shuffle phase, such as BAD_ID, IO_ERROR, WRONG_MAP and WRONG_REDUCE.
  2. File input format counters: The bytes read by each task.
  3. File output format counters: The bytes written by each map and reduce task using an output format.

Intermediate

MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks. This can be achieved with a Hadoop combiner. A combiner runs on the map output, and its output forms the input to the reducer. It decreases the amount of data that has to be transferred between the mapper and the reducer, and so improves the performance of the MapReduce job. However, a combiner can be used only for functions that are commutative and associative.
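A word count makes the saving concrete. The sketch below simulates the map, combine and reduce steps in plain Python under stated assumptions: the function names and the two input splits are made up, and no Hadoop API is involved. Addition is commutative and associative, so summing per mapper first does not change the final result.

```python
from collections import Counter


def map_phase(line):
    # Emit (word, 1) for every word in the split.
    return [(word, 1) for word in line.split()]


def sum_by_key(pairs):
    # Used both as the combiner (per mapper) and as the reducer (globally).
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return list(totals.items())


splits = ["a b a a", "b b a"]  # hypothetical input splits, one per mapper
map_outputs = [map_phase(split) for split in splits]

# Without a combiner, every (word, 1) pair crosses the network.
shuffled_plain = [pair for output in map_outputs for pair in output]
# With a combiner, each mapper sends one pair per distinct word.
shuffled_combined = [pair for output in map_outputs for pair in sum_by_key(output)]

print(len(shuffled_plain), len(shuffled_combined))  # prints 7 4
print(dict(sum_by_key(shuffled_plain)) == dict(sum_by_key(shuffled_combined)))  # prints True
```

A function such as a mean would not work this way, because averaging per-mapper averages does not give the overall average.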

The partitioner controls which partition a given key-value pair goes to. Partitioning ensures that all the values for a key are grouped together and that values with the same key go to the same reducer. The total number of partitions in a Hadoop job is equal to the number of reducers.

The partition phase takes place after the map phase and before the reduce phase. A MapReduce job that has both a partitioner and a combiner works as follows: the output of each mapper is written to a memory buffer and spilled to a local directory when the buffer overflows. The spilled data is partitioned according to the partitioner. The data in each partition is sorted and then combined according to the logic in the combiner. The combined data is sent to the reducers based on the partition key.

A job consists of the following components: the client, which submits the MapReduce job; the Resource Manager, which coordinates the allocation of compute resources; the Node Managers, which launch and monitor the compute containers; the Hadoop Distributed File System (HDFS), which is used for sharing resources between the above components; and the Application Master, which coordinates the tasks running in the MapReduce job.

The MapReduce job begins when the client (job submitter) sends a request to the Resource Manager, asking for a new application ID to be allocated. The client also checks whether the specified output directory already exists and computes the input splits for the job. The resources needed to run the job, including the application JAR, are copied to HDFS. Finally, the job submitter submits the job to the Resource Manager.

The Resource Manager now allocates a container and launches the Application Master. The Application Master determines the number of mapper and reducer tasks that need to be launched and asks the Resource Manager for containers to run them. The Resource Manager, in turn, directs the Node Managers to launch the containers in which the tasks run. Once the tasks are started, the Application Master keeps track of them. If a task fails or gets stuck, it relaunches the task in another container. Requests for map tasks are made first and with a higher priority than those for reduce tasks, since all the map tasks must complete before the sort phase of the reduce can start. The output of each mapper task is partitioned (in case of multiple reducers), sorted, passed through the combiner (if any) and finally shuffled to the reducer(s). The output of the reducers is written to HDFS.

The usual block size on HDFS is 128 MB. HDFS blocks are kept large to minimize the cost of seeks. When a block is large enough, the time to transfer its data is significantly longer than the time to seek to the start of the block, so a file can be read at close to the disk transfer rate. A common target is to keep the seek time at about 1% of the transfer time. For example, if the seek time is around 10 ms and the transfer rate is 100 MB/s, the block size comes to about 100 MB, which is rounded up to 128 MB.

However, this does not mean that the block size can be made arbitrarily large. Map tasks operate on one block at a time (assuming the split size is equal to the block size). A huge block size results in fewer splits and hence fewer mappers, which reduces the advantage gained by working on multiple blocks in parallel.
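The seek-time rule of thumb above is simple arithmetic, sketched here with the figures from the example. The function names are made up for illustration, and rounding up to a power of two is an assumption used to connect the 100 MB result to the common 128 MB setting.

```python
def block_size_mb(seek_time_ms, transfer_rate_mb_per_s, seek_fraction=0.01):
    """Block size at which the seek takes seek_fraction of the transfer time."""
    transfer_time_s = (seek_time_ms / 1000) / seek_fraction
    return transfer_time_s * transfer_rate_mb_per_s


def next_power_of_two(value):
    power = 1
    while power < value:
        power *= 2
    return power


size = block_size_mb(seek_time_ms=10, transfer_rate_mb_per_s=100)
print(round(size))                     # about 100 MB
print(next_power_of_two(round(size)))  # 128
```

Doubling the transfer rate in this calculation doubles the suggested block size, which is one reason larger blocks become attractive as disks get faster.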

Having a block abstraction for a distributed file system has many benefits.

  • First, a file can be larger than any single disk in the network.
  • It simplifies the storage subsystem because blocks are a fixed size, and it is easy to calculate how many blocks can be stored on a given disk.
  • In addition, blocks fit well with the replication that a distributed file system needs.

High availability in HDFS implies that the system does not have any single point of failure, is available 24/7 so that there is no or limited impact on client applications and is able to self-recover from failure without any manual intervention.

For implementing High Availability in HDFS, a pair of NameNodes is set up in an active-standby configuration. The passive node is kept in sync with the active node. Both active and passive nodes have access to shared storage space. When any namespace modification is performed by the Active node, it logs a record of the modification to an edit log file stored in the shared directory. The Standby node is constantly watching this directory for edits, and as it sees the edits, it applies them to its own namespace thereby keeping in sync with Active node.

If the active NameNode fails, the standby node takes over and starts servicing client requests. The transition from the active to the standby node is managed by the Failover Controller, which uses Zookeeper to ensure that only one NameNode is active at a given time. Each NameNode runs a failover controller process that monitors its NameNode for failures using a heartbeat mechanism and triggers a failover when a failure occurs.

It must be ensured that only one NameNode is active at a given time, because two active NameNodes would corrupt the data. To avoid such a scenario, fencing is used. The JournalNodes perform fencing by allowing only one NameNode to be the writer at a time. During a failover, the standby NameNode takes over the responsibility of writing to the JournalNodes and prevents any other NameNode from remaining active.

In the case of large data, it’s advised to use more than one reducer. In the case of multiple reducers, the thread spilling map output to disk first divides the data into partitions corresponding to the number of reducers. Within each partition, an in-memory sort on the data is performed. A combiner, if any, is applied to the output of the sort. Finally, the data is sent to reducer based on the partitioning key.

Partitioning ensures that all the values for a key are grouped together and that values with the same key go to the same reducer. The default partitioner in a MapReduce job is the hash partitioner, which computes a hash value for the key and assigns the partition based on the result. With well-distributed keys, this spreads the map output evenly over the reducers.

However, care must be taken to ensure that the partitioning logic is optimal and sends data evenly to the reducers. With a sub-optimal design, some reducers have more work to do than others, and the entire job waits for the slowest reducer to finish its extra share of the load.
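Both points, same key to same reducer and the risk of skew, show up in a small simulation. This is a sketch under assumptions: it treats keys as plain strings hashed the way Java's String.hashCode() does and then applies the "clear the sign bit, take the remainder" step of a hash partitioner. The key list with one hot key is made up.

```python
from collections import Counter


def java_string_hash(key: str) -> int:
    """Mimic Java's String.hashCode() (signed 32-bit) for ASCII keys."""
    h = 0
    for ch in key:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h


def hash_partition(key: str, num_reducers: int) -> int:
    # Clear the sign bit, then take the remainder, so the result is never negative.
    return (java_string_hash(key) & 0x7FFFFFFF) % num_reducers


# Hypothetical map output keys: one "hot" key dominates the data.
keys = ["hot"] * 8 + ["a", "b", "c", "d"]
load = Counter(hash_partition(key, 3) for key in keys)
print(dict(load))  # records per reducer
```

Whichever reducer receives the hot key ends up with at least 8 of the 12 records, which is the kind of imbalance a custom partitioner or a different key design is meant to avoid.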

Advanced

 The replication factor in HDFS can be modified or overridden in 2 ways:

  • Using the Hadoop FS shell, the replication factor can be changed on a per-file basis with the command below.
$ hadoop fs -setrep -w 2 /my/sample.xml

 sample.xml is the file whose replication factor will be set to 2.

  • Using the Hadoop FS shell, the replication factor of all files under a given directory can be changed with the command below.
$ hadoop fs -setrep -w 6 /my/sample_dir

sample_dir is the name of the directory, and all the files in this directory will have their replication factor set to 6.

ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -touchz /hadoop/sample
ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -ls /hadoop

Found 2 items
-rw-r--r--  2 ubuntu supergroup          0 2018-11-08 00:57 /hadoop/sample
-rw-r--r--  2 ubuntu supergroup         16 2018-11-08 00:45 /hadoop/test

fsck is a utility that checks the health of the file system and finds missing files and over-replicated, under-replicated and corrupted blocks.

 Command for finding the blocks of a file:

$ hadoop fsck <path> -files -blocks -racks

Hadoop Distributed File System (HDFS) is the primary storage system of Hadoop. HDFS stores very large files on a cluster of commodity hardware. It works on the principle of storing a small number of large files rather than a huge number of small files.

 HDFS stores data reliably even in the case of hardware failure. It provides high-throughput access to applications by reading data in parallel. Components of HDFS:

  • NameNode – It is also known as the Master node. The NameNode stores metadata, i.e. the number of blocks, their replicas and other details.
  • DataNode – It is also known as the Slave node. In HDFS, the DataNode is responsible for storing the actual data. The DataNode performs read and write operations as requested by clients.

Update the network addresses in the dfs.include and mapred.include files.

Run $ hadoop dfsadmin -refreshNodes and $ hadoop mradmin -refreshNodes, then update the slaves file.

 Start the DataNode and NodeManager on the added node.

  • The client connects to the name node to register a new file in HDFS.
  • The name node creates some metadata about the file (either using the default block size, or a configured value for the file)
  • For each block of data to be written, the client queries the name node for a block ID and list of destination datanodes to write the data to. Data is then written to each of the datanodes.

By default, the HDFS block size is 64 MB in Hadoop 1.x and 128 MB in Hadoop 2.x and later.

 Default replication factor is 3

It displays the Access Control Lists (ACLs) of files and directories. If a directory has a default ACL, then getfacl also displays the default ACL.

ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -getfacl /hadoop
# file: /hadoop
# owner: ubuntu
# group: supergroup

This exception means there is no communication between the NameNode and the DataNode, for one of the reasons below:

  • The block size is negative in the hdfs-site.xml file.
  • Disk usage on the DataNode is 100% and there is no space available.
  • Network connectivity between the NameNode and the DataNode is poor, or the primary DataNode goes down while the write operation is in progress.

You can provide dfs.block.size on command line :

  • copying from HDFS to HDFS

hadoop fs -D dfs.block.size=<blocksizeinbytes> -cp /source /destination

  • copying from local to HDFS

hadoop fs -D dfs.block.size=<blocksizeinbytes> -put /source /destination 

The command below is used to enter Safe Mode manually:

$ hdfs dfsadmin -safemode enter

Once Safe Mode is entered manually, it has to be left manually as well.

The command below is used to leave Safe Mode manually:

$ hdfs dfsadmin -safemode leave

The two popular utilities or commands to find the HDFS space consumed are

  • hdfs dfs -du
  • hdfs dfsadmin -report

 HDFS provides reliable storage by copying data to multiple nodes. The number of copies it creates is referred to as the replication factor, which is usually greater than one.

  • hdfs dfs -du: This command shows the space consumed by the data without replication.
  • hdfs dfsadmin -report: This command shows the real disk usage, as it counts the replicas as well. Therefore, with a replication factor greater than one, the value reported by hdfs dfsadmin -report is greater than the output of hdfs dfs -du.

Hadoop task log files are stored on the local disk of the slave node on which the task runs. The main log-related configuration properties are yarn.nodemanager.log-dirs and yarn.log-aggregation-enable. The yarn.nodemanager.log-dirs property determines where the container logs are stored on the node while the containers are running. Its default value is ${yarn.log.dir}/userlogs. An application's localized log directory is found at ${yarn.nodemanager.log-dirs}/application_${application_id}. The log directories of individual containers appear below it, in subdirectories named container_${container_id}.

For a MapReduce application, each container directory contains the files stderr, stdout and syslog generated by the container.

The yarn.log-aggregation-enable property specifies whether to enable or disable log aggregation. If this function is disabled, then the node manager will keep the logs locally and not aggregate them.

Following properties are in force when log aggregation is enabled.

yarn.nodemanager.remote-app-log-dir: This location is on the default file system (usually HDFS) and indicates where the node manager should aggregate logs. It should not be on the local file system, otherwise serving daemons such as the history server will not be able to serve the aggregated logs. The default value is /tmp/logs.

yarn.nodemanager.remote-app-log-dir-suffix: The remote log directory is created at {yarn.nodemanager.remote-app-log-dir}/${user}/{suffix}. The default suffix is "logs".

yarn.log-aggregation.retain-seconds: This property defines how long to wait before deleting aggregated logs. A value of -1 or any other negative value disables the deletion of aggregated logs.

yarn.log-aggregation.retain-check-interval-seconds: This property determines how long to wait between aggregated log retention checks. If it is set to 0 or a negative value, the interval is computed as one-tenth of the aggregated log retention time. The default value is -1.

yarn.log.server.url: Once an application is done, NodeManagers redirect web UI users to this URL, where the aggregated logs are served. It points to the MapReduce-specific job history server.

The following properties are used when log aggregation is disabled:

yarn.nodemanager.log.retain-seconds: The time in seconds to retain user logs on the individual nodes if log aggregation is disabled. The default is 10800.

yarn.nodemanager.log.deletion-threads-count: The number of threads used by the node managers to clean up logs once the log retention time is hit for local log files when aggregation is disabled.    
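The directory layout and the retention-check rule described above can be sketched in a few lines of Python. This is only an illustration, not YARN's own code: the function names are made up for this example, and the paths simply follow the layout given in the text (a particular Hadoop release may add further path components).

```python
def container_log_dir(log_dirs, application_id, container_id):
    """Local directory where a running container writes its log files.

    log_dirs stands for the value of yarn.nodemanager.log-dirs.
    """
    return f"{log_dirs}/{application_id}/{container_id}"


def aggregated_log_dir(remote_app_log_dir, user, suffix="logs"):
    """Remote (usually HDFS) directory that receives a user's aggregated logs."""
    return f"{remote_app_log_dir}/{user}/{suffix}"


def retention_check_interval(retain_seconds, check_interval_seconds=-1):
    """Zero or a negative interval means one-tenth of the retention time."""
    if check_interval_seconds <= 0:
        return retain_seconds / 10
    return check_interval_seconds


if __name__ == "__main__":
    # Hypothetical values, chosen only to show the resulting paths.
    print(container_log_dir("/var/log/hadoop-yarn/userlogs",
                            "application_1424873694018_3023",
                            "container_1424873694018_3023_02_000001"))
    print(aggregated_log_dir("/tmp/logs", "alice"))
    print(retention_check_interval(7 * 24 * 3600))
```

With the default check interval of -1, a retention time of seven days gives a check roughly every 17 hours.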

YARN requires a staging directory for the temporary files created by running jobs, and local directories for storing the various scripts that are generated to start up the job's containers (which will run the map and reduce tasks).

Staging directory:

  1. When a user executes a MapReduce job, they usually invoke a job client to configure and launch the job.
  2. As part of the job execution, the job client first checks whether there is a staging directory under the user's name in HDFS. If not, the staging directory is created under /user/<username>/.staging.
  3. In addition to the job-related files, the Hadoop JAR file named hadoop-mapreduce-client-jobclient.jar is also placed in the .staging directory after being renamed to job.jar.
  4. Once the staging directory is set up, the job client submits the job to the resource manager.
  5. The job client also sends the status of the job's progress back to the console (e.g. map 5%, reduce 0%).

Local directory:

  1. The resource manager service selects a node manager on one of the cluster's nodes to launch the application master process, which is always the very first container to be created in a YARN job.
  2. The resource manager chooses the node manager depending on the resources available at the time the job is launched. You cannot specify the node on which to start the job.
  3. The node manager service starts up and generates various scripts in the local cache directory to execute the application master container.
  4. The application master's directories are stored in the location that you have specified for the node manager's local directories with the yarn.nodemanager.local-dirs configuration property in yarn-site.xml.
  5. The node manager stores its localized files in the following directory structure.

[yarn.nodemanager.local-dirs]/usercache/$user/appcache/application_${app_id}

When a client launches an application, the corresponding application master container is launched with ID 000001. The default size of each application master container is 1 GB, but sometimes the job needs more. In that case the application master reaches the limit of its memory, the application fails, and you get a message similar to the one below.

Application application_1424873694018_3023 failed 2 times due to AM Container for appattempt_1424873694018_3023_000002 exited with exitCode: 143 due to: Container [pid=25108,containerID=container_1424873694018_3023_02_000001] is running beyond physical memory limits. Current usage: 1.0 GB of 1 GB physical memory used; 1.5 GB of 2.1 GB virtual memory used. Killing container.

When configuring MapReduce 2 resource utilization on YARN, there are three aspects to be considered:

  1. The physical RAM limit for each map and reduce task.
  2. The JVM heap size limit for each task.
  3. The amount of virtual memory each task will get.

Physical RAM limit for each Map and Reduce Task

*********************************************

You can define the maximum amount of memory each map and reduce task may take. Since each map and each reduce task runs in a separate container, these maximum memory settings should be equal to or greater than the YARN minimum container allocation (yarn.scheduler.minimum-allocation-mb).

In mapred-site.xml:
<property>
<name>mapreduce.map.memory.mb</name>
<value>4096</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>8192</value>
</property>

The JVM heap size limit for each task

*********************************

The JVM heap size should be set lower than the map and reduce memory defined above, so that it stays within the bounds of the container memory allocated by YARN.

In mapred-site.xml:
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx3072m</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx6144m</value>
</property>

The amount of virtual memory each task will get

********************************************

The virtual memory limit is determined as a multiple of the physical RAM that each map and reduce task is allowed to use. The multiplier is set by yarn.nodemanager.vmem-pmem-ratio, whose default value is 2.1. For example, if the total physical RAM allocated is 4 GB, then the virtual memory upper limit is 4 * 2.1 = 8.4 GB.
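The three limits can be checked against each other with a short Python sketch. The helper names are hypothetical and the checks only restate the rules of thumb from this section (container at least the minimum allocation, heap below the container size, virtual memory as a multiple of physical memory); they are not taken from YARN's code.

```python
VMEM_PMEM_RATIO = 2.1  # default ratio quoted in the text


def virtual_memory_limit_mb(container_mb, ratio=VMEM_PMEM_RATIO):
    """Upper limit of virtual memory for a container of the given size."""
    return container_mb * ratio


def check_task_memory(container_mb, heap_mb, min_allocation_mb):
    """Return a list of problems found in a task's memory settings."""
    problems = []
    if container_mb < min_allocation_mb:
        problems.append("container is smaller than the YARN minimum allocation")
    if heap_mb >= container_mb:
        problems.append("JVM heap does not fit inside the container")
    return problems


if __name__ == "__main__":
    # Map task settings from the configuration above: 4096 MB container, 3072 MB heap.
    print(virtual_memory_limit_mb(4096))
    print(check_task_memory(container_mb=4096, heap_mb=3072, min_allocation_mb=1024))
```

An empty list from check_task_memory means the settings are consistent with the rules above.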

Under the Fair scheduler, when a single application is running, that application may request the entire cluster (if needed). If additional applications are submitted, resources that become free are assigned "fairly" to the new applications so that each application gets roughly the same amount of resources. The Fair scheduler further organizes applications into queues and shares resources fairly between these queues. The Fair scheduler in YARN supports hierarchical queues, which means sub-queues can be created within a dedicated queue. All queues descend from a queue named “root”. A queue’s name starts with the names of its parents, with periods as separators. So a queue named “parent1” under the root queue would be referred to as “root.parent1”, and a queue named “queue2” under a queue named “parent1” would be referred to as “root.parent1.queue2”.
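The queue naming rule, and the idea of an equal split, can be illustrated with a small Python sketch. Both functions are hypothetical helpers; the equal split is an idealized picture of "roughly the same amount of resources" and ignores weights, minimum shares and preemption.

```python
def full_queue_name(*path):
    """Join the parent queue names with periods, starting at the root queue."""
    return ".".join(("root",) + path)


def equal_shares(total_mb, applications):
    """Idealized fair share: every running application gets the same amount."""
    if not applications:
        return {}
    share = total_mb / len(applications)
    return {app: share for app in applications}


if __name__ == "__main__":
    print(full_queue_name("parent1", "queue2"))
    # One application may use the whole cluster; a second one halves the share.
    print(equal_shares(102400, ["app1"]))
    print(equal_shares(102400, ["app1", "app2"]))
```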

A join operation is used to combine two or more datasets. In MapReduce, joins are of two types: map-side joins and reduce-side joins.

A map-side join is one in which the join between two tables is performed in the map phase without the involvement of the reduce phase. It can be used when one of the data sets is much smaller than the other and can easily be stored in the DistributedCache. One way to load the data set from the DistributedCache is to do it in the setup() method of the Mapper. Since in a map-side join the join is performed in the mapper phase itself, it avoids the cost incurred for sorting and merging data in the shuffle and reduce phases, thereby improving the performance of the task.

A reduce-side join, on the other hand, works well for large datasets. Here the reducer is responsible for performing the join operation. This type of join is comparatively simple to implement, because the data undergoes sorting and shuffling before reaching the reducer and values with identical keys are sent to the same reducer, so by default the data is organized for us. However, the I/O cost is much higher due to the data movement involved in the sorting and shuffling phase.
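To make the difference concrete, here is a plain-Python sketch of both join styles. It does not use the Hadoop API: a dictionary stands in for the small data set loaded from the DistributedCache, and a defaultdict plays the role of the shuffle. All names and the sample data are made up for illustration.

```python
from collections import defaultdict


def map_side_join(large_records, small_table):
    """Join inside the 'mapper': look each key up in the in-memory small table."""
    for key, value in large_records:
        if key in small_table:
            yield key, (value, small_table[key])


def reduce_side_join(left_records, right_records):
    """Tag records by source, group them by key (shuffle), join in the 'reducer'."""
    # Map phase: tag every record with the data set it came from.
    tagged = [(k, ("L", v)) for k, v in left_records]
    tagged += [(k, ("R", v)) for k, v in right_records]
    # Shuffle phase: identical keys end up in the same group.
    groups = defaultdict(list)
    for key, tagged_value in tagged:
        groups[key].append(tagged_value)
    # Reduce phase: pair every left value with every right value of the key.
    for key in sorted(groups):
        lefts = [v for tag, v in groups[key] if tag == "L"]
        rights = [v for tag, v in groups[key] if tag == "R"]
        for left in lefts:
            for right in rights:
                yield key, (left, right)


if __name__ == "__main__":
    orders = [(1, "book"), (2, "pen"), (1, "lamp"), (3, "desk")]
    customers = {1: "Ann", 2: "Bob"}
    print(list(map_side_join(orders, customers)))
    print(list(reduce_side_join(orders, list(customers.items()))))
```

Both functions produce the same joined pairs; only the reduce-side version has to move and group every record first, which is where the extra I/O cost comes from.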

The map-reduce framework doesn’t guarantee that the combiner will be executed for every job run. The combiner is executed at each buffer spill. During a spill, the thread writing data to the disk first divides data into partitions corresponding to the number of reducers. Within each partition, the thread performs an in-memory sort on the data and applies the combiner function (if any) on the output of sort.

Various ways to reduce data shuffling in a map-reduce job are:

  • Use a combiner to perform associative and commutative operations on the mapper output and hence reduce the shuffling of data.
  • If the data size is huge, then having a single reducer is not a good idea. An optimal number of reducers should be chosen in this case.
  • Compress the mapper output. This reduces the amount of data that gets written to disk and transferred to the reducers, hence reducing the shuffling of data.
  • Increase the buffer size used by mappers during sorting. This reduces the number of spills to disk. It can be controlled using the property mapreduce.task.io.sort.mb.
  • Leverage the cleanup() method of the Mapper. This method is called once per mapper task and can be used to perform any associative and commutative operation on the output of the map() function.
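The last point, often called in-mapper combining, can be sketched as follows. This is a plain-Python stand-in for a Hadoop Mapper, with a hypothetical class name: map() only updates an in-memory counter, and the aggregated pairs are emitted once from cleanup().

```python
from collections import Counter


def plain_map(line):
    """Ordinary word-count mapper: one (word, 1) pair per word."""
    return [(word, 1) for word in line.split()]


class CombiningWordCountMapper:
    """Word-count mapper that aggregates in memory and emits in cleanup()."""

    def __init__(self):
        # Plays the role of state initialised in setup().
        self.counts = Counter()

    def map(self, line):
        # Nothing is emitted here; counts are only accumulated.
        self.counts.update(line.split())

    def cleanup(self):
        # Called once per mapper task: emit one pair per distinct word.
        return sorted(self.counts.items())


if __name__ == "__main__":
    lines = ["a b a", "b a"]
    emitted_plain = [pair for line in lines for pair in plain_map(line)]
    mapper = CombiningWordCountMapper()
    for line in lines:
        mapper.map(line)
    emitted_combined = mapper.cleanup()
    print(len(emitted_plain), "records without combining")
    print(len(emitted_combined), "records with in-mapper combining")
```

Summation is associative and commutative, which is why pre-aggregating per mapper does not change the final counts.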

In a map-reduce job, the application master decides how many tasks need to be created for the job to be executed. The number of mapper tasks created is equal to the number of splits. These tasks are usually launched in different JVMs than the application master (governed by data locality).

However, if the job is small, the application master may decide to run the tasks in the same JVM as itself. In such a case, the overhead of allocating containers for new tasks and monitoring them outweighs the gain that would be had from running the tasks in parallel, compared to running them sequentially on the same node. Such a job is called an uber task.

So how does the application master determine if the job is small enough to be run as an uber task? By default, if the job requires fewer than 10 mappers and at most one reducer, and the input size is less than the size of one HDFS block, then the application master may consider launching the job as an uber task.

If the job doesn’t qualify to be run as an uber task, then the application master requests containers for all map and reduce tasks from the resource manager.
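The size test can be written down as a small predicate. This is a sketch of the rule as stated above, not the application master's actual code; the function name is hypothetical and the 128 MB block size is an assumption (a common HDFS default). Note also that uber mode generally has to be enabled for the job before this test matters.

```python
DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024  # assumed HDFS block size in bytes


def is_uber_candidate(num_maps, num_reduces, input_bytes,
                      block_size=DEFAULT_BLOCK_SIZE,
                      max_maps=9, max_reduces=1):
    """True if the job is small enough to be considered for an uber task."""
    return (num_maps <= max_maps
            and num_reduces <= max_reduces
            and input_bytes < block_size)


if __name__ == "__main__":
    mb = 1024 * 1024
    print(is_uber_candidate(num_maps=3, num_reduces=1, input_bytes=64 * mb))
    print(is_uber_candidate(num_maps=20, num_reduces=1, input_bytes=64 * mb))
```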

A file is read by a Map-Reduce job using an InputFormat. It defines how the file being read needs to be split up and read. InputFormat, in turn, defines a RecordReader which is responsible for reading actual records from the input files. The split computed by InputFormat is operated upon by map task. Map task uses Record Reader corresponding to InputFormat to read the data within each split and create key-value pairs.

The various types of InputFormat in Hadoop are:

  • FileInputFormat – Base class for all file-based input formats.
  • TextInputFormat – Default input format used in a map-reduce job. It treats each line of input as a separate record. LineRecordReader is the default record reader for TextInputFormat; it treats each line of the input file as a separate value.
  • KeyValueTextInputFormat – Similar to TextInputFormat, but it breaks the line being read into a key and a value. The key is the text up to the first tab (\t) character, while all the text after the tab is considered the value.
  • SequenceFileInputFormat – An input format for reading sequence files.
  • NLineInputFormat – Similar to TextInputFormat, but each split (and hence each mapper) receives a configurable number N of input lines.
  • DBInputFormat – An input format to read data from a relational database using JDBC. It is however suited for reading small datasets only.
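How a record reader turns the bytes of a split into key-value pairs can be sketched in Python. The two generators below only imitate the behaviour of TextInputFormat (key is the byte offset of the line, value is the line) and KeyValueTextInputFormat (split at the first tab); they are illustrations with made-up names, not the Hadoop classes.

```python
def text_input_records(data):
    """Yield (byte offset, line) pairs, one per line of the input bytes."""
    offset = 0
    for raw_line in data.splitlines(keepends=True):
        yield offset, raw_line.rstrip(b"\r\n").decode("utf-8")
        offset += len(raw_line)


def key_value_records(data, separator="\t"):
    """Yield (key, value) pairs by splitting each line at the first separator."""
    for _, line in text_input_records(data):
        key, _, value = line.partition(separator)
        # A line without a separator becomes a key with an empty value.
        yield key, value


if __name__ == "__main__":
    split = b"k1\tv1\nk2\tv2 more\nnokey\n"
    print(list(text_input_records(split)))
    print(list(key_value_records(split)))
```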

YARN stands for Yet Another Resource Negotiator. YARN takes over the JobTracker's resource management work and, in addition, acts as a scheduler. It supports a variety of processing engines and applications: graph processing, interactive processing, stream processing and batch processing can all run on and process the data stored in HDFS. Basically, the resource manager receives the job request from the client and accordingly launches the application master JVM, which by default is given 1 core and 2 GB of memory.

The application master contacts the NameNode to get the location of the blocks. Based on where the blocks are available, it checks whether the node managers have sufficient resources and informs the resource manager accordingly; the resource manager then provides the resources for the node manager to launch the JVM for the job.

YARN also works as a scheduler: the Scheduler is responsible for allocating resources to the running applications. It does not monitor or track the applications, and it does not restart failed tasks, whether they failed due to an application failure or a hardware failure.

YARN Scheduler supports three types of scheduler

1. FIFO scheduler
2. FAIR scheduler
3. Capacity Scheduler.

Based on the Application requirement Hadoop Admin will select either FIFO, FAIR or Capacity Scheduler. 

FIFO scheduling is first in, first out; in current environments it is rarely used. Fair scheduling is a method where resources are distributed in such a way that they are divided more or less equally among the jobs. With the Capacity scheduler, you can make sure that a certain percentage of the cluster's resources is assigned to each queue based on your demand or computing need.

Before running YARN applications, start the resource manager and node manager services. Make sure the resource manager is started before the node manager services. Start your YARN services in the sequence mentioned below.

a. On the resource manager system:

# service hadoop-yarn-resourcemanager start

b. On each node manager system (where the data node services run):

# service hadoop-yarn-nodemanager start

c. To start the MapReduce job history server:

# service hadoop-mapreduce-historyserver start

Fundamentally, a snapshot is a read-only, point-in-time copy of the entire file system or of a subtree of it. Snapshots protect against data corruption by users or applications and against accidental deletes. It is always quicker to recover from a snapshot than to restore the whole FSImage, and it is easy to create a snapshot of an important directory before changing anything in it.

A snapshot can be taken on any directory once it has been marked as "snapshottable"; to do so, run the command "hdfs dfsadmin -allowSnapshot <path>". Once a directory is snapshottable, a subdirectory named .snapshot appears under it; this is where the snapshots are stored. There is no limit on the number of snapshottable directories, and each snapshottable directory can contain 65536 snapshots simultaneously. We can give a snapshot a name of our own or use the default one (based on the timestamp: "'s'yyyyMMdd-HHmmss.SSS"). If a snapshottable directory contains any snapshots, you can neither delete nor rename the directory; to delete it, you first have to delete all the snapshots under it. When upgrading from an older version of HDFS, any existing path named ".snapshot" first needs to be renamed or deleted to avoid conflicting with the reserved path.

Snapshots are easily managed with the hdfs dfsadmin and hdfs dfs commands. Below are a few commands related to snapshots.
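As a small illustration of the naming convention, the following Python sketch builds a default-style snapshot name from a timestamp and the path under which the snapshot would appear. The helper names are hypothetical, and the format string is simply a translation of the 's'yyyyMMdd-HHmmss.SSS pattern quoted above.

```python
from datetime import datetime


def default_snapshot_name(now):
    """Timestamp-based name following the 's'yyyyMMdd-HHmmss.SSS pattern."""
    millis = now.microsecond // 1000
    return now.strftime("s%Y%m%d-%H%M%S") + f".{millis:03d}"


def snapshot_path(directory, name):
    """Path of a snapshot inside the reserved .snapshot subdirectory."""
    return f"{directory.rstrip('/')}/.snapshot/{name}"


if __name__ == "__main__":
    name = default_snapshot_name(datetime(2016, 12, 3, 9, 52, 0, 123000))
    print(snapshot_path("/my_dir_bibhu", name))
```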

a.
# Create directory structure
hdfs dfs -mkdir /my_dir_bibhu
b.
# Allow snapshots creation for /my_dir_bibhu
hdfs dfsadmin -allowSnapshot /my_dir_bibhu
Allowing snaphot on /my_dir_bibhu succeeded
c.
# Create the first snapshot
hdfs dfs -createSnapshot /my_dir_bibhu snaptest1
Created snapshot /my_dir_bibhu/.snapshot/snaptest1
d.
# .snapshot can be read directly using below command
hdfs dfs -ls /my_dir_bibhu/.snapshot 
Found 1 items
drwxr-xr-x   - bibhu supergroup          0 2016-12-03 09:52 /my_dir_bibhu/.snapshot/snaptest1
e.
# Create new snapshot - this time for directory containing a file
hdfs dfs -createSnapshot /my_dir_bibhu snaptest2
Created snapshot /my_dir_bibhu/.snapshot/snaptest2
f.
# This command serves to compare snapshots
hdfs snapshotDiff /my_dir_bibhu .snapshot/snaptest1 .snapshot/snaptest2
g.
# Restore snapshot directory to a temporary place and check if file is there or not
hdfs dfs -cp /my_dir_bibhu/.snapshot/snaptest2 /tmp/dir_from_snapshot
hdfs dfs -ls /tmp/dir_from_snapshot

<property>

<name>yarn.nodemanager.resource.memory-mb</name>

<value>102400</value>

</property>

<property>

<name>yarn.nodemanager.resource.cpu-vcores</name>

<value>48</value>

</property>

<property>

<name>yarn.scheduler.minimum-allocation-mb</name>

<value>what is the correct value here</value>

</property>

ANS:

Usually, YARN takes all of the available resources on each machine in the cluster into consideration. Based on the available resources, YARN negotiates the resources requested by the applications (such as MapReduce) running in the cluster. YARN allocates containers based on how many resources the application requires. A container is the basic unit of processing capacity in YARN; it encapsulates resource elements such as memory and CPU. In a Hadoop cluster, it is important to balance the usage of memory (RAM), processors (CPU cores) and disks so that processing is not constrained by any one of these cluster resources. As a best practice, allowing for two containers per disk and per core gives the best balance for cluster utilization.

When you are considering the appropriate YARN and MapReduce memory configurations for a cluster node, it is best to take the following values of each node into account.

a. RAM (Amount of memory)

b. CORES (Number of CPU cores)

c. DISKS(Number of disks)

Before calculating how much RAM, how many cores and how many disks are required, you have to be aware of the parameters below.

  1. Approximately how much data needs to be stored in your cluster, for example 200 TB.
  2. What the retention policy of the data is, for example 1 year.
  3. What kind of workload you have: CPU intensive (for example complex queries or queries computing over a billion records), I/O intensive (for example ingestion of data), or memory intensive (for example Spark processing).
  4. What kind of storage format is used for the data, for example plain text, Avro, Parquet or ORC, and whether it is compressed with GZIP or Snappy.

Total memory available ==> 102400

No of containers ==> 100

Minimum memory required for container ==> 102400 MB total RAM/100 = 1024MB minimum per container.

The next calculation is to determine the maximum number of containers allowed per node.

Number of containers = Minimum of (2 * CORES, 1.8 * DISKS, (total available RAM) / MIN_CONTAINER_SIZE)

RAM-per-container = Maximum of (MIN_CONTAINER_SIZE, (total available RAM) / (number of containers))
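The two formulas can be worked through in Python with the node from the question (48 cores, 102400 MB of RAM, 1024 MB minimum container size). The number of disks is not given in the question, so the value of 24 below is purely an assumption for the sake of the example, as are the function names.

```python
def containers_per_node(cores, disks, total_ram_mb, min_container_mb):
    """min(2 * CORES, 1.8 * DISKS, total RAM / MIN_CONTAINER_SIZE), rounded down."""
    by_cores = 2 * cores
    by_disks = (18 * disks) // 10  # 1.8 * disks in integer arithmetic
    by_ram = total_ram_mb // min_container_mb
    return min(by_cores, by_disks, by_ram)


def ram_per_container_mb(total_ram_mb, containers, min_container_mb):
    """max(MIN_CONTAINER_SIZE, total RAM / number of containers), rounded down."""
    return max(min_container_mb, total_ram_mb // containers)


if __name__ == "__main__":
    # 48 cores and 102400 MB come from the question; 24 disks is an assumed value.
    n = containers_per_node(cores=48, disks=24, total_ram_mb=102400,
                            min_container_mb=1024)
    print("containers per node:", n)
    print("RAM per container (MB):", ram_per_container_mb(102400, n, 1024))
```

With these inputs the disks are the limiting factor, so each container can be given more than the 1024 MB minimum.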

Basically, mapreduce.task.io.sort.mb is the total amount of buffer memory to use while sorting files, expressed in megabytes.

Tune the io.sort.mb value in such a way that the number of spilled records equals, or is as close as possible to, the number of map output records.

A map-reduce job guarantees that the input to every reducer is sorted by key. The process by which the system performs the sort and then transfers the mapper output to the reducers as input is known as the shuffle. In a map-reduce job, the shuffle is an area of the code where fine-tuning and improvements are continually being made. In many ways, the shuffle is the heart of the map-reduce job. When the map function starts producing output, the process takes advantage of buffering: it writes to memory and does some presorting for efficiency.

Each map task has a circular memory buffer that it writes its output to. The buffer is 100 MB by default, a size which can be tuned by changing the io.sort.mb property. When the contents of the buffer reach a certain threshold size (io.sort.spill.percent, which defaults to 0.80, or 80%), a background thread starts to spill the contents to disk. Mapper output continues to be written to the buffer while the spill takes place, but if the buffer fills up during this time, the map blocks until the spill is complete. Spills are written in round-robin fashion to the directories specified by the mapred.local.dir property, in a job-specific subdirectory.

Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record there can be several spill files. Before the task is finished, the spill files are merged into a single partitioned and sorted output file. The configuration property io.sort.factor controls the maximum number of streams to merge at once; its default value is 10.

To briefly recap how io.sort.factor works: while the mapper task is running, it continuously writes data into the buffer. To control when the buffer is spilled, we set a parameter called io.sort.spill.percent.

The value of io.sort.spill.percent indicates the point after which the data is written to disk instead of the buffer, which is filling up. All of this spilling to disk is done in a separate thread so that the map can continue running. There may be multiple spill files on the task tracker after the map task has finished. Those files have to be merged into one single sorted file per partition, which is fetched by a reducer. The property io.sort.factor says how many of those spill files will be merged into one file at a time.
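The interplay of buffer size, spill threshold and merge factor can be simulated in a few lines of Python. This is a deliberately simplified model with made-up names: the buffer counts records instead of bytes, there is a single partition, and spilling happens synchronously rather than in a background thread.

```python
import heapq


def run_map_task(records, buffer_capacity=10, spill_percent=0.8):
    """Buffer records and write a sorted spill whenever the threshold is hit."""
    threshold = max(1, int(buffer_capacity * spill_percent))
    buffer, spills = [], []
    for record in records:
        buffer.append(record)
        if len(buffer) >= threshold:
            spills.append(sorted(buffer))  # in-memory sort before spilling
            buffer = []
    if buffer:
        spills.append(sorted(buffer))  # final spill when the task ends
    return spills


def merge_spills(spills, sort_factor=10):
    """Merge at most sort_factor sorted runs at a time until one run remains."""
    if sort_factor < 2:
        raise ValueError("sort_factor must be at least 2")
    runs = [list(spill) for spill in spills]
    while len(runs) > 1:
        runs = [list(heapq.merge(*runs[i:i + sort_factor]))
                for i in range(0, len(runs), sort_factor)]
    return runs[0] if runs else []


if __name__ == "__main__":
    output = [(key, 1) for key in [5, 3, 9, 1, 7, 2, 8, 6, 4, 0, 11, 10]]
    spills = run_map_task(output, buffer_capacity=5, spill_percent=0.8)
    print(len(spills), "spill files")
    print(merge_spills(spills, sort_factor=2))
```

A larger buffer produces fewer spills, and a larger merge factor needs fewer merge rounds, which is the intuition behind tuning these two properties together.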

Basically, the dfs.hosts file contains the details of all the data nodes, and the name node allows access to all the nodes mentioned in it. This is the default configuration used by the name node. dfs.hosts and dfs.hosts.exclude help to recommission and decommission data nodes.

Hadoop provides the decommission feature to retire a set of existing data nodes. The nodes to be taken out should be listed in the exclude file, and the exclude file name should be specified as the configuration parameter dfs.hosts.exclude. You can find an example below.

Examples:

Modify conf/hdfs-site.xml and add:

<property>

<name>dfs.hosts</name>

<value>/opt/hadoop/Bibhu/conf/datanode-allow.list</value>

</property>

<property>

<name>dfs.hosts.exclude</name>

<value>/opt/hadoop/Bibhu/conf/datanode-deny.list</value>

</property>

Decommissioning cannot happen immediately, because it requires the replication of a potentially large number of blocks and we do not want the cluster to be overwhelmed by this one job. The decommission progress can be monitored on the name node web UI or the Cloudera UI. Until all blocks are replicated, the node remains in the "Decommission in progress" state. When the decommission is done, the state changes to "Decommissioned". The node can be removed once the decommission has finished.

We can use the command below. After making the entries in the dfs.hosts and dfs.hosts.exclude files, run hadoop dfsadmin -refreshNodes on the name node.

# $HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes

-refreshNodes updates the name node with the set of data nodes that are allowed to connect to the name node.
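How the two lists combine can be pictured with a small Python function. It is a sketch of the commonly documented behaviour (an empty include list admits every node; a node in both lists is admitted and then decommissioned), written with hypothetical names and host names rather than taken from the name node's code.

```python
def classify_datanode(host, include, exclude):
    """Classify a data node from the dfs.hosts and dfs.hosts.exclude entries."""
    allowed = not include or host in include  # empty include list admits all
    if not allowed:
        return "rejected"
    if host in exclude:
        return "decommission"
    return "in service"


if __name__ == "__main__":
    include = {"dn1", "dn2", "dn3"}  # hypothetical contents of dfs.hosts
    exclude = {"dn3"}                # hypothetical contents of dfs.hosts.exclude
    for host in ["dn1", "dn3", "dn4"]:
        print(host, "->", classify_datanode(host, include, exclude))
```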

Java garbage collection is the process by which Java programs perform automatic memory management, a technique that automatically handles the allocation and deallocation of memory. Java programs compile to bytecode, the compiled format of a Java program, which can be transferred across a network and executed by a Java Virtual Machine (JVM). While a Java program runs on the JVM, it consumes memory called heap memory, which is the part of memory dedicated to the program.

A Hadoop mapper is a Java process, and every Java process has its own heap memory. The maximum heap allocation is configured with mapred.map.child.java.opts, or mapreduce.map.java.opts in Hadoop 2. If the mapper process runs out of heap memory, the mapper throws a Java out-of-memory exception as shown below.

Error: java.lang.RuntimeException: java.lang.OutOfMemoryError

The Java heap size should be smaller than the Hadoop container memory limit because we need to reserve some memory for the Java code itself. Usually, it is recommended to reserve 20% of the memory for code. If the settings are correct, Java-based Hadoop tasks should not get killed by Hadoop for exceeding their memory, so you will not see the "Killing container" error shown above.

To execute the actual map or reduce task, YARN runs a JVM within the container. The Hadoop property mapreduce.{map|reduce}.java.opts is passed to this JVM. This can include -Xmx to set the maximum heap size of the JVM.

Example: hadoop jar <jarName> -Dmapreduce.reduce.memory.mb=4096 -Dmapreduce.reduce.java.opts=-Xmx3276m
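The 80/20 rule of thumb can be turned into a tiny helper that derives the heap option from the container size. The function names are hypothetical; only the property names mapreduce.{map|reduce}.memory.mb and mapreduce.{map|reduce}.java.opts come from the text.

```python
def java_opts_for(container_mb, heap_fraction=0.8):
    """-Xmx value that leaves (1 - heap_fraction) of the container for non-heap use."""
    return f"-Xmx{int(container_mb * heap_fraction)}m"


def memory_options(task, container_mb):
    """Command-line -D options for a map or reduce task of the given size."""
    if task not in ("map", "reduce"):
        raise ValueError("task must be 'map' or 'reduce'")
    return [
        f"-Dmapreduce.{task}.memory.mb={container_mb}",
        f"-Dmapreduce.{task}.java.opts={java_opts_for(container_mb)}",
    ]


if __name__ == "__main__":
    print(" ".join(memory_options("reduce", 4096)))
```

For a 4096 MB container this yields a heap of 3276 MB, leaving roughly 20% of the container for everything outside the Java heap.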

Hive works on structured data and provides a SQL-like layer on top of HDFS; a MapReduce job is executed for each Hive query that computes over HDFS data. Impala is a massively parallel processing (MPP) SQL query engine capable of handling huge volumes of data. Impala is faster than Hive because it does not store intermediate query results on disk; it processes the SQL query in memory without running any MapReduce jobs.

Below are the few Hive components

a. Hive Clients
b. Hive Services

a. Hive Clients:

Hive clients help Hive to run queries. There are three types of clients we can use to run the queries:

1. Thrift Clients

2. JDBC clients

3. ODBC clients


  1. Thrift clients: Apache Thrift is a framework that helps a client and a server communicate. Apache Hive uses Thrift to allow remote users to connect to HiveServer2 (the Thrift server) and submit queries. Thrift bindings are available in different languages like C++, Java and Python, so a user can query the same source from different languages.
  2. JDBC clients: Apache Hive allows Java applications to connect to Hive using the JDBC driver. The driver is defined in the class org.apache.hadoop.hive.jdbc.HiveDriver.
  3. ODBC clients: The ODBC driver allows applications that support the Open Database Connectivity protocol to connect to Hive.

b. Hive Services

Apache Hive provides below services.

1. CLI (Command Line Interface): This is the default Hive shell, which lets you run Hive queries and commands directly.

2. Web interface: Hive also provides a web-based GUI for executing Hive queries and commands, for example HUE in Cloudera.

3. Hive server/Thrift server: Different clients submit their requests to Hive through it and get the results accordingly.

4. Hive Driver: Once queries are submitted from Thrift/JDBC/ODBC/CLI/Web UI, the driver is responsible for receiving those queries and passing them through a compiler, optimizer, and executor.

The compiler performs the syntax check with the help of the schema present in the metastore, then the optimizer generates the optimized logical plan in the form of a Directed Acyclic Graph of MapReduce and HDFS tasks. The executor runs the tasks after the compilation and optimization steps. The executor interacts directly with the Hadoop JobTracker to schedule the tasks to be run.

5. Metastore: Metastore is the central repository of Hive Metadata like schema and locations etc. 

Impala components are: 1. Impala daemon (impalad) 2. Impala StateStore 3. Impala Catalog Service.

Impala daemon (impalad): The Impala daemon runs on each node where Impala is installed. impalad accepts queries from various interfaces like impala-shell, Hue, etc., and processes the queries to get the result.

Whenever a query is submitted to an Impala daemon, that node is considered the "central coordinator node" for the query. After accepting the query, impalad logically divides it into smaller parallel queries and distributes them to different nodes in the Impala cluster. All the impalad instances send their intermediate results to the central coordinator node, which then constructs the final query output.

Impala StateStore: Impala daemons continuously communicate with the statestore to identify the nodes that are healthy and capable of accepting new work. This information is conveyed to the daemons by the statestore component. If a node fails for any reason, the statestore informs all other nodes about the failure, and once this notification is available to the other impalad instances, no Impala daemon assigns any further queries to the affected node.

Impala Catalog Service: The catalog service relays metadata changes made by Impala SQL statements to all the Impala daemons in the cluster. Impala uses the data stored in Hive, so it refers to the Hive metastore to access the databases and tables created in Hive. Once a table has been created through the Hive shell, we need to invalidate the metadata before the table is available for Impala queries, so that Impala reloads the corresponding metadata before the query is processed.

Example: INVALIDATE METADATA [[db_name.]table_name];

REFRESH [db_name.]table_name;

As we know, most Hive tables contain millions or billions of records, and for any computation the Hive query is processed with the help of mappers and reducers, which consumes considerable time and memory. A few optimization techniques will always help Hive queries perform better. Some of them are listed below.

a. Use Tez to speed up execution: Apache Tez is an execution engine used for faster query execution. Tez allows you to launch a single application master per session for multiple jobs, provided the jobs are comparatively small so that the Tez memory can be used for them. You need to set the processing engine to Tez instead of the default MapReduce execution engine with the parameter below.

Set hive.execution.engine=tez;

If you are using Cloudera/Hortonworks, then you will find TEZ option in the Hive query editor as well.

b. Enable compression in Hive

Compression reduces the amount of data being transferred between mappers and reducers. Compression is not advisable if your data is already compressed, because the output file size might be larger than the original.

For better results, you need to configure compression on the mapper side and on the reducer side separately. Many compression formats are available; gzip takes more CPU resources than Snappy or LZO but provides a higher compression ratio. However, gzip files are not splittable, so gzip is not suitable when the data must be split across mappers.

Other formats are snappy, lzo, bzip, etc. You can set compression on the mapper and reducer side using the settings below:

set mapred.compress.map.output=true;
set mapred.output.compress=true;

Users can also set the following properties in hive-site.xml and mapred-site.xml to make the effect permanent.

<property>

  <name>mapred.compress.map.output</name>  

  <value>true</value>

</property>

<property>

 <!-- MRv1 name; on YARN (MRv2) the property is mapreduce.map.output.compress.codec -->
 <name>mapred.map.output.compression.codec</name>

  <value>org.apache.hadoop.io.compress.SnappyCodec</value>

</property>

c. Use ORC file format

ORC (Optimized Row Columnar) is an appropriate format for Hive performance tuning; query performance can easily be improved by using the ORC file format. We can use the ORC file format for all kinds of tables, whether partitioned or not, and in return you get faster computation and a compressed file size.

Create table orctbl (id int, name string, address string) stored as ORC tblproperties ("orc.compress"="SNAPPY");

Now simply you can also insert the data like

Insert overwrite table orctbl select * from tbldetails;

d. Optimize your joins

If your tables hold large amounts of data, it is not advisable to just use the normal joins we use in SQL. There are other join types, like map join, bucket join, etc., which help to improve Hive query performance.

  • Use Map Join

A map join is beneficial when one of the tables taking part in the join is small compared to the other, so that it can fit into memory. Hive has a property which performs an automatic map join when enabled. Set hive.auto.convert.join to true to enable the auto map join. We can set this from the command line as well as in the hive-site.xml file.

<property>
<name>hive.auto.convert.join</name>
<value>true</value>
<description>Whether Hive enables the optimization about converting common join into mapjoin based on the input file size</description>
</property>

e. Bucketed Map Join

If tables are bucketed by a particular column, you can use a bucketed map join to improve Hive query performance. You can set the two properties below to enable the bucketed map join in Hive.

<property>
<name>hive.optimize.bucketmapjoin</name>
<value>true</value>
<description>Whether to try bucket mapjoin</description>
</property>
<property>
<name>hive.optimize.bucketmapjoin.sortedmerge</name>
<value>true</value>
<description>Whether to try sorted bucket merge map join</description>
</property>
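To see why bucketing helps, consider the simplified Python sketch below. It is an illustration under stated assumptions rather than Hive's code: both tables are assumed to use the same number of buckets, the bucket function is a plain modulo on an integer key, and all function names are hypothetical.

```python
def bucket_of(value, num_buckets):
    # Stand-in for Hive's hash of the bucketing column (assumption: integer keys).
    return value % num_buckets

def bucketize(rows, key, num_buckets):
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_of(row[key], num_buckets)].append(row)
    return buckets

def bucket_map_join(left_buckets, right_buckets, key):
    joined = []
    # Bucket i of one table can only match bucket i of the other,
    # so each pair is joined independently with a small hash table.
    for left, right in zip(left_buckets, right_buckets):
        lookup = {}
        for row in left:
            lookup.setdefault(row[key], []).append(row)
        for row in right:
            for match in lookup.get(row[key], []):
                joined.append({**match, **row})
    return joined

# Hypothetical sample data.
customers = [{"id": i, "name": f"c{i}"} for i in (1, 3, 5, 7)]
orders = [{"id": i, "amount": i * 10} for i in range(6)]
joined = bucket_map_join(bucketize(customers, "id", 4), bucketize(orders, "id", 4), "id")
```

Only one bucket of the smaller side has to be held in memory at a time, which is the practical advantage over a plain map join.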

f. Use Partition

Partitioning is always helpful for huge data sets. It segregates a large table based on certain columns so that the whole data set is divided into small chunks. Partitioning a table basically means storing its data under sub-directories inside the table directory.

Choosing the partition columns is always a critical decision, and you need to take future data and data volume into account as well. For example, if you have location data, you can partition the table by state. You can also partition the data by month. Define the partition column based on your requirements.

Here is the syntax to create a partitioned table:

CREATE TABLE countrydata_partition
(id INT, country_name STRING, population INT, description STRING)
PARTITIONED BY (country VARCHAR(64), state VARCHAR(64))
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

There are two types of partition in Hive.

  • Static partition
  • Dynamic partition

By default, partitioning in Hive is static. With a static partition, the table is declared with a clause such as "PARTITIONED BY (department STRING)" and the partition value is supplied explicitly when the data is loaded. Static partitioning is preferred when loading big files into Hive.

With a dynamic partition, a single insert into the partitioned table loads the data from a non-partitioned table, and Hive derives the partitions from the column values. Dynamic partitioning is also suitable when you do not know in advance how many partitions the data will produce. To use dynamic partitioning in Hive, set the following properties:

  • set hive.exec.dynamic.partition=true;
  • set hive.exec.dynamic.partition.mode=nonstrict;
  • set hive.exec.max.dynamic.partitions=1000;
  • set hive.exec.max.dynamic.partitions.pernode=100;
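As a rough illustration of what dynamic partitioning does with each row, the Python sketch below routes rows to key=value sub-directories derived from their own column values. The warehouse path and the function names are hypothetical, and real Hive writes files rather than in-memory lists.

```python
from collections import defaultdict

def partition_path(table_dir, partition_cols, row):
    # Partitions are laid out as nested key=value sub-directories.
    parts = [f"{col}={row[col]}" for col in partition_cols]
    return "/".join([table_dir] + parts)

def dynamic_partition(table_dir, partition_cols, rows):
    # Route each row to the directory derived from its own column values;
    # partition columns live in the path, not in the stored record.
    layout = defaultdict(list)
    for row in rows:
        data = {k: v for k, v in row.items() if k not in partition_cols}
        layout[partition_path(table_dir, partition_cols, row)].append(data)
    return dict(layout)

# Hypothetical rows from a non-partitioned source table.
rows = [
    {"id": 1, "population": 100, "country": "US", "state": "CA"},
    {"id": 2, "population": 200, "country": "US", "state": "NY"},
    {"id": 3, "population": 300, "country": "US", "state": "CA"},
]
layout = dynamic_partition("/warehouse/countrydata_partition", ["country", "state"], rows)
```

Each distinct combination of partition values creates one directory, which is why limits such as hive.exec.max.dynamic.partitions exist.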

g. Use Vectorization

A standard query is executed one row at a time. Vectorized query execution improves the performance of operations such as scans, aggregations, filters, and joins by processing a batch of 1024 rows at a time. To use vectorization, set the parameters below.

set hive.vectorized.execution.enabled=true;

set hive.vectorized.execution.reduce.enabled=true;
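The difference between row-at-a-time and batched execution can be sketched in a few lines of Python. This is only a conceptual illustration with hypothetical function names: it shows the control flow (one filter and one aggregate per 1024-row batch), not the CPU-level gains Hive gets from operating on column vectors.

```python
def row_at_a_time_sum(values, threshold):
    # One filter check and one addition per row.
    total = 0
    for value in values:
        if value > threshold:
            total += value
    return total

def batches(values, size=1024):
    # Split the input into fixed-size batches, as a vectorized engine would.
    for start in range(0, len(values), size):
        yield values[start:start + size]

def vectorized_sum(values, threshold, size=1024):
    # Apply the filter and the aggregate to a whole batch at once.
    total = 0
    for batch in batches(values, size):
        selected = [v for v in batch if v > threshold]
        total += sum(selected)
    return total

values = list(range(5000))
batch_count = sum(1 for _ in batches(values))
```

Both functions return the same result; the batched form simply amortizes per-row overhead across each batch.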

LDAP and Active Directory (AD) provide a centralized security system for managing both servers and users; they hold all user accounts and the associated privileges for your employees. Kerberos handles authentication: when a user tries to connect to any Hadoop service, Kerberos authenticates the user first and then authenticates the service as well. When AD, LDAP, and Kerberos are used together, Kerberos provides only authentication; all identity management is handled outside Kerberos, in AD and LDAP.

At a high level, when a new employee joins, his or her ID has to be added in Active Directory first, then in LDAP and Kerberos, because AD is a directory service, owned by Microsoft, that supports several standard protocols such as LDAP and Kerberos.

LDAP and AD communicate with each other about which user ID belongs to which group, for example, which groups user Bibhu is a member of and what access permissions he has on different directories or files. This information is managed differently in AD and on Linux systems. Windows has the concept of an SID (Windows security identifier), while Linux has a user ID (UID) and a group ID (GID). SSSD can use the SID of an AD user to algorithmically generate POSIX IDs in a process called ID mapping. ID mapping creates a map between SIDs in AD and UIDs/GIDs on Linux.

AD can also create and store POSIX attributes such as uidNumber, gidNumber, unixHomeDirectory, or loginShell.

There are two ways to map SIDs to UIDs/GIDs using SSSD.

1. Enable ID mapping, so that SSSD generates the UID/GID from the SID, by adding the line below to the sssd.conf file:

ldap_id_mapping = true 

2. Use the POSIX attributes stored in AD. POSIX stands for Portable Operating System Interface, the family of standards that defines how Unix-like systems interact with applications. With this approach, the UID/GID has to be configured for each user in AD, which matters especially when there are several domains. In this case, write the following in sssd.conf to disable ID mapping in SSSD:

ldap_id_mapping = False
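For the case where ID mapping is enabled, the sketch below shows the general shape of an algorithmic SID-to-UID mapping in Python. It is a simplified illustration, not SSSD's actual algorithm: the CRC32 hash is a stand-in, the range values are illustrative parameters, and the function name and sample SIDs are hypothetical.

```python
import zlib

def sid_to_uid(sid, range_min=200000, range_size=200000, slices=10000):
    """Derive a stable POSIX UID from a Windows SID (simplified sketch)."""
    # Split "S-1-5-21-<domain part>-<RID>" into the domain SID and the relative ID.
    domain_sid, _, rid_text = sid.rpartition("-")
    rid = int(rid_text)
    if rid >= range_size:
        raise ValueError("RID does not fit into one slice")
    # Hash the domain SID to pick a slice of the ID range.
    # CRC32 is a stand-in here, not the hash SSSD really uses.
    slice_index = zlib.crc32(domain_sid.encode("ascii")) % slices
    # The RID becomes the offset inside the slice.
    return range_min + slice_index * range_size + rid

# Hypothetical SIDs of two users in the same AD domain.
uid_a = sid_to_uid("S-1-5-21-111-222-333-1104")
uid_b = sid_to_uid("S-1-5-21-111-222-333-1105")
```

Because the mapping is a pure function of the SID, every Linux host computes the same UID for the same AD user without storing POSIX attributes in AD.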

Below are a few concepts you need to know to understand the integration of AD, LDAP, and Kerberos.

a. Linux has no built-in centralized authentication mechanism. Local account details are kept in the /etc/passwd file (password hashes are usually stored in /etc/shadow).

b. Two modules play an important role in providing security features at the Linux level: 1. PAM 2. NSS

PAM: PAM stands for Pluggable Authentication Modules. It allows authentication technologies such as Unix, Linux, or LDAP to be integrated into system services such as password, login, and ssh. When you are prompted for a password, that is usually PAM's doing. PAM provides an API through which authentication requests are mapped into technology-specific actions. This mapping is done by PAM configuration files, which define the authentication mechanism for each service.

NSS: NSS (Name Service Switch) uses a common API and a configuration file (/etc/nsswitch.conf) in which the name service providers for every supported database are specified. Here, names include hostnames, usernames, and group names, traditionally stored in files such as /etc/hosts, /etc/passwd, and /etc/group.

c. Below are a few components related to Kerberos.

  • Key Distribution Center (KDC): The Kerberos server. It contains an encrypted database that stores all the principal entries for users, hosts, and services, including domain or realm information.
  • Authentication Server: Once a user successfully authenticates with the Authentication Server, it grants a Ticket Granting Ticket (TGT) to the client. The principal then uses the TGT to request access to the Hadoop service.
  • Ticket Granting Server: The Ticket Granting Server validates the TGT and in return grants a service ticket to the client, which the client can use to access the Hadoop service.
  • Keytab file: A secure file that contains the keys (derived from the passwords) of the service principals in a domain.
  • Realm: A realm is the domain name, which has to be written in uppercase letters, for example HADOOP.COM.
  • Principal: A principal may be a user, service, or host that is part of the realm, for example pbibhu@HADOOP.COM.
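How these components cooperate can be sketched with a toy model in Python. This is a heavily simplified illustration, not the Kerberos protocol: real Kerberos never sends the user's key to the KDC and relies on encryption, timestamps, and session keys, whereas the sketch only uses HMAC seals to show who can validate what. The class and function names and the service principal are hypothetical.

```python
import hashlib
import hmac

def seal(key, message):
    # Keyed digest standing in for "encrypted with this key".
    return hmac.new(key, message.encode(), hashlib.sha256).hexdigest()

class ToyKDC:
    def __init__(self):
        self.principals = {}          # principal name -> long-term key
        self.tgs_key = b"tgs-secret"  # known only to the KDC

    def add_principal(self, name, key):
        self.principals[name] = key

    def authenticate(self, user, key):
        # Authentication Server: verify the user, then issue a TGT.
        expected = self.principals.get(user)
        if expected is None or not hmac.compare_digest(expected, key):
            raise PermissionError("authentication failed")
        return {"user": user, "seal": seal(self.tgs_key, user)}

    def grant_service_ticket(self, tgt, service):
        # Ticket Granting Server: validate the TGT, then issue a service ticket.
        if not hmac.compare_digest(tgt["seal"], seal(self.tgs_key, tgt["user"])):
            raise PermissionError("invalid TGT")
        body = f"{tgt['user']}->{service}"
        return {"body": body, "seal": seal(self.principals[service], body)}

def service_accepts(ticket, service_key):
    # The service validates the ticket with its own key (as held in its keytab).
    return hmac.compare_digest(ticket["seal"], seal(service_key, ticket["body"]))

kdc = ToyKDC()
kdc.add_principal("pbibhu@HADOOP.COM", b"user-key")
kdc.add_principal("hdfs/namenode@HADOOP.COM", b"service-key")  # hypothetical service
tgt = kdc.authenticate("pbibhu@HADOOP.COM", b"user-key")
ticket = kdc.grant_service_ticket(tgt, "hdfs/namenode@HADOOP.COM")
```

The point of the two-step flow is that the service never sees the user's credentials; it only needs its own key to check the ticket.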

d. NTP (Network Time Protocol): An internet protocol used to synchronize computer clocks across a network. An NTP client initiates a time-request exchange with the NTP server. Kerberos tickets are time-stamped, so the clocks of all participating hosts must stay in sync.

e. PAM_SSS: Kerberos (pam_sss): One of the design principles of SSSD’s PAM module, pam_sss, is that it should not make any decisions on its own but let SSSD make them. pam_sss cannot decide which type of password prompt should be shown to the user; it must ask SSSD first. Currently, the first communication between pam_sss and SSSD’s PAM responder happens after the user has entered the password. Hence a new request, a pre-authentication request, to the PAM responder must be added before the user is prompted for the password.

f. NSS_SSS: LDAP (nss_sss): SSSD provides an NSS module, nss_sss, so that you can configure your system to use SSSD to retrieve user information.

Below are three ways of integrating Linux with AD for authentication:

  1. Using LDAP/Kerberos PAM and NSS modules
  2. Using Winbind
  3. Using SSSD (System Security Services Daemon) to integrate with Active Directory

Let’s look at each in detail:

1. Using LDAP/Kerberos PAM and NSS modules:

PAM is configured to use Kerberos for authentication, and NSS is configured to use the LDAP protocol for querying UID or GID information. The nss_ldap, pam_ldap, and pam_krb5 modules are available to support this.

The problem here is that credentials are not cached, so there is no offline support.

2. Using Winbind:

Samba Winbind was the traditional way of connecting Linux systems to AD. Basically, Winbind emulates a Windows client on a Linux system and is able to communicate with AD servers. The winbind daemon receives calls from PAM and NSS and translates them into the corresponding Active Directory calls using LDAP, Kerberos, or remote procedure calls (RPC), depending on the requirement. Current versions of the System Security Services Daemon (SSSD) have closed the feature gap between Samba Winbind and SSSD, so Samba Winbind is no longer the first choice in general.

3. Using SSSD (System Security Services Daemon) to integrate with Active Directory:

The System Security Services Daemon (SSSD) is an intermediary between local clients and remote directories and authentication mechanisms. The local clients connect to SSSD, and SSSD then contacts the external providers, such as an AD or LDAP server. SSSD therefore works as a bridge that helps you access AD and LDAP.

Basically, system authentication is configured locally, which means that services initially check a local user store to determine users and credentials. SSSD allows a local service to check a local cache in SSSD, and the information in that cache may have come from an LDAP directory, AD, or a Kerberos realm.

Below are a few advantages of SSSD:

  • Reduced load on identification/authentication servers. Instead of connecting to AD or LDAP directly, all of the local clients can contact SSSD, which can connect to the identification server or check its cache.
  • Offline authentication. SSSD also caches users and credentials, so that user credentials are still available if the local system or the identity provider goes offline.
  • A single user account. Remote users usually have two (or even more) user accounts, such as one for their local system and one for the organizational system, because they need to connect through a virtual private network (VPN). Because SSSD supports caching and offline authentication, remote users can connect to network resources simply by authenticating to their local machine, and SSSD then maintains their network credentials.

The sssd daemon provides different services for different purposes. A configuration file called sssd.conf determines what tasks sssd can do. The file has two main parts, as shown here:

[sssd]

domains = WINDOWS
services = nss, pam
config_file_version = 2

[domain/WINDOWS]

id_provider = ad
auth_provider = ad
access_provider = ad

The first part states which services on the system must use SSSD; in the example above, nss and pam are listed. The second part, domain/WINDOWS, defines the directory service, also called the identity provider, for example an AD or LDAP server. SSSD connects to AD/LDAP for querying information, authentication, password changes, and so on.
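Because each name listed under domains in the [sssd] section needs a matching [domain/NAME] section, a small consistency check can catch mismatches early. The Python sketch below uses the standard configparser module on a hypothetical sample file; the helper name `missing_domain_sections` is an assumption made for illustration, and this is not a replacement for SSSD's own configuration validation.

```python
import configparser

# Hypothetical sssd.conf content used only for this example.
SAMPLE = """
[sssd]
domains = WINDOWS
services = nss, pam
config_file_version = 2

[domain/WINDOWS]
id_provider = ad
auth_provider = ad
access_provider = ad
"""

def missing_domain_sections(text):
    """Return the domains listed in [sssd] that have no [domain/NAME] section."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    listed = [d.strip() for d in parser["sssd"]["domains"].split(",") if d.strip()]
    return [d for d in listed if not parser.has_section(f"domain/{d}")]

missing = missing_domain_sections(SAMPLE)
```

An empty result means every listed domain has its own section; any name returned points at a domain that SSSD would not be able to configure.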

In brief, below are the steps showing how SSSD works:

  • Once the user ID and password have been provided, libc opens the nss_sss module as specified in nsswitch.conf and passes the request to it.
  • The nss_sss memory-mapped cache is consulted first to check the user ID.
  • If the entry is not found in the nss_sss cache, the request is passed to the sssd_nss responder.
  • sssd_nss then checks the SSSD on-disk LDB cache. All cache files are named for the domain; for example, for a domain named exampleldap, the cache file is named cache_exampleldap.ldb. If the data is present in the cache and still valid, the NSS responder returns it.
  • If the data is not present in the LDB cache, or has expired, SSSD connects to the remote server (AD or LDAP) and runs the search.
  • If sssd.conf is configured with multiple domains, for example “domains = AD, LDAP”, Active Directory is searched first and, if the entry is not found there, LDAP is searched next.
  • When the search is finished, the LDB cache is updated.
  • The sssd_be back end signals the NSS responder to check the cache again.
  • The sssd_nss responder returns the cached data. If there is no data in the cache, no data is returned.
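The lookup order in the steps above can be sketched as a layered cache in Python. This is a toy model under stated assumptions: plain dictionaries stand in for the memory-mapped cache, the LDB cache, and the remote directories, cache expiry is ignored, and the class name `ToyResolver` is hypothetical.

```python
class ToyResolver:
    def __init__(self, domains):
        self.memory_cache = {}   # stands in for the nss_sss memory-mapped cache
        self.disk_cache = {}     # stands in for the on-disk LDB cache
        self.domains = domains   # ordered list of (name, directory dict)
        self.trace = []          # records where each lookup went

    def lookup(self, user):
        # 1. Fast path: memory cache.
        if user in self.memory_cache:
            self.trace.append("memory cache")
            return self.memory_cache[user]
        # 2. On-disk cache, otherwise 3. remote domains in configured order.
        if user in self.disk_cache:
            self.trace.append("disk cache")
        else:
            for name, directory in self.domains:
                self.trace.append(f"remote {name}")
                if user in directory:
                    self.disk_cache[user] = directory[user]  # update the cache
                    break
        # 4. The responder answers from the cache; None if nothing was found.
        entry = self.disk_cache.get(user)
        if entry is not None:
            self.memory_cache[user] = entry
        return entry

# Hypothetical directories: the user exists only in LDAP.
resolver = ToyResolver([("AD", {}), ("LDAP", {"bibhu": {"uid": 1001}})])
first = resolver.lookup("bibhu")
second = resolver.lookup("bibhu")
```

The second lookup never reaches a remote server, which is the behaviour that reduces load on the directory and allows offline use.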

Sentry provides role-based authorization for both the data and the metadata stored on a Hadoop cluster. Before going further, below are the components on which Sentry is built.

  1. Sentry server
  2. Data engine
  3. Sentry plugin

Sentry server: The Sentry server is an RPC (remote procedure call) server that stores all the authorization metadata in an underlying relational database. It provides an RPC interface to retrieve and manipulate privileges.

Data engine: Data engines provide access to the data. Here, Hive, Impala, and Hadoop HDFS can be considered data engines.

Sentry plug-in: The Sentry plug-in runs in the data engine. The plug-in offers interfaces to manipulate the authorization metadata stored in the Sentry server. Every access request from a data engine (Hive, Impala, HDFS) is validated by the plug-in's authorization policy engine using that authorization metadata.

The Sentry server only serves the metadata. The actual authorization decision is made by the policy engine, which runs in data processing applications such as Hive or Impala. Each component loads the Sentry plug-in, which means that for each service (Hive, HDFS, Impala, Solr) a Sentry plug-in has to be installed to deal with the Sentry service and to provide the policy engine that validates authorization requests.

Below are a few of Sentry's capabilities.

1. Fine-Grained Authorization:

Permissions can be set on a hierarchy of objects, for example server, database, table, view (row- or column-level authorization), and URI, with a hierarchy of permissions such as SELECT, INSERT, and ALL. This is called fine-grained authorization.

2. Role-Based Authorization(RBAC):

Sentry provides role-based authorization: a role is a set of privileges, and roles act as templates that combine multiple access rules for a large set of users and data objects (databases, tables, etc.).

For example, If Bibhu joins the Finance Department, all you need to do is add him to the finance-department group in Active Directory. This will give Bibhu access to data from the Sales and Customer tables.

You can create a role called Analyst and grant SELECT on tables Customer and Sales to this role.

CREATE ROLE Analyst;

GRANT SELECT ON TABLE Customer TO ROLE Analyst;

GRANT SELECT ON TABLE Sales TO ROLE Analyst;

Granting the Analyst role to the finance-department group then gives Bibhu, who is a member of that group, the SELECT privilege on the Customer and Sales tables:

GRANT ROLE Analyst TO GROUP finance-department ;

3. Multi-Tenant Administration or Delegated Admin Responsibilities:

Sentry can delegate admin responsibilities for a subset of resources. Delegated admin responsibility means that the delegated-admin privilege on a specific set of resources is assigned to a specific set of users or groups by someone who already holds the delegated-admin privilege on those resources.

4. User Identity and Group Mapping: Sentry depends on Kerberos or LDAP to identify the user. It also uses the group mapping mechanism configured in Hadoop to ensure that Sentry sees the same group mapping as other components of the Hadoop ecosystem.

For example, consider that users Bibhu and Sibb belong to an Active Directory (AD) group called finance-department. Sibb also belongs to a group called finance-managers. In Sentry, you create the roles first and then grant the required privileges to those roles. For example, you can create a role called Analyst and grant SELECT on tables Customer and Sales to this role.

CREATE ROLE Analyst;

GRANT SELECT ON TABLE Customer TO ROLE Analyst;

GRANT SELECT ON TABLE Sales TO ROLE Analyst;

The next step is to join these authentication entities (users and groups) to authorization entities (roles). This can be done by granting the Analyst role to the finance-department group, as in the statement below. Bibhu and Sibb, who are members of the finance-department group, then get the SELECT privilege on the Customer and Sales tables.

GRANT ROLE Analyst TO GROUP finance-department ;
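The chain from user to group to role to privilege can be modelled in a few lines of Python. The sketch below is an illustration of the resolution logic only, not Sentry's policy engine; the class name `ToySentry` and its methods are hypothetical, and group membership is passed in directly instead of being resolved through Hadoop's group mapping.

```python
class ToySentry:
    def __init__(self):
        self.role_privileges = {}  # role -> set of (action, table)
        self.group_roles = {}      # group -> set of roles

    def grant_privilege(self, role, action, table):
        # Mirrors: GRANT <action> ON TABLE <table> TO ROLE <role>
        self.role_privileges.setdefault(role, set()).add((action, table.lower()))

    def grant_role(self, role, group):
        # Mirrors: GRANT ROLE <role> TO GROUP <group>
        self.group_roles.setdefault(group, set()).add(role)

    def is_allowed(self, user_groups, action, table):
        # A user is allowed if any role of any of their groups holds the
        # requested privilege (or ALL) on the table.
        wanted = {(action, table.lower()), ("ALL", table.lower())}
        for group in user_groups:
            for role in self.group_roles.get(group, ()):
                if wanted & self.role_privileges.get(role, set()):
                    return True
        return False

sentry = ToySentry()
sentry.grant_privilege("Analyst", "SELECT", "Customer")
sentry.grant_privilege("Analyst", "SELECT", "Sales")
sentry.grant_role("Analyst", "finance-department")

bibhu_groups = ["finance-department"]
can_read_sales = sentry.is_allowed(bibhu_groups, "SELECT", "Sales")
can_write_sales = sentry.is_allowed(bibhu_groups, "INSERT", "Sales")
```

Privileges are never attached to users directly, so moving a user between groups in AD is enough to change what they can access.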

Below are some scenarios showing how Hive, Impala, HDFS, and Search work with Sentry. A few examples will help in understanding how it works.

1. Hive and Sentry:

If the user "Bibhu" submits the following Hive query:

select * from production.status

Hive identifies that user Bibhu is requesting SELECT access to the status table. At this point, Hive asks the Sentry plugin to validate Bibhu's access request. The plugin retrieves Bibhu's privileges related to the status table, and the policy engine determines whether the request is valid.

2. Impala and Sentry:

Authorization processing in Impala is more or less the same as in Hive. The main difference is the caching of privileges. Impala's Catalog server caches roles, privileges, and metadata and propagates them to all Impala server nodes. As a result, the Impala daemon can authorize queries much faster by referring to the cached metadata. The only drawback is that privilege changes take some time to take effect, possibly a few seconds.

3. Sentry-HDFS Synchronization:

Sentry-HDFS authorization basically concerns Hive warehouse data, that is, the table data of Hive or Impala. The main objective is that when other components such as Pig, MapReduce, or Spark try to access a Hive table, the same authorization check occurs. This feature does not replace HDFS ACLs; tables that are not associated with Sentry retain their old ACLs.

The mapping of Sentry privileges to HDFS ACL permissions is as follows:

  • SELECT privilege -> Read access on the file
  • INSERT privilege -> Write access on the file
  • ALL privilege -> Read and Write access on the file.
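The mapping above is small enough to express as a lookup table. The Python sketch below is illustrative only; the names `SENTRY_TO_HDFS_ACCESS` and `hdfs_access` are hypothetical and do not correspond to any Sentry API.

```python
# Sentry privilege -> access granted on the underlying HDFS files.
SENTRY_TO_HDFS_ACCESS = {
    "SELECT": {"read"},
    "INSERT": {"write"},
    "ALL": {"read", "write"},
}

def hdfs_access(privileges):
    """Combine the file access implied by a list of Sentry privileges."""
    access = set()
    for privilege in privileges:
        access |= SENTRY_TO_HDFS_ACCESS[privilege.upper()]
    return access

analyst_access = hdfs_access(["SELECT"])
loader_access = hdfs_access(["select", "insert"])
```

A role holding both SELECT and INSERT ends up with the same file access as a role holding ALL.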

The NameNode loads a Sentry plugin that caches Sentry privileges as well as Hive metadata. This helps HDFS keep file permissions and Hive table privileges in sync. The Sentry plugin periodically polls Sentry and the Metastore to keep the metadata changes in sync.

For example, if Bibhu runs a Pig job that reads from the Sales table's data files, which are stored in HDFS, the Sentry plugin on the NameNode figures out that the data files are part of Hive data and overlays Sentry privileges on top of the file ACLs. This means HDFS enforces the same privileges for this Pig client that Hive would have applied for a SQL query.

For HDFS-Sentry synchronization to work, you must use the Sentry service, not policy file authorization.

4. Search and Sentry:

Sentry can apply restrictions to Search tasks that come from a browser, the command line, or the admin console.

With Search, Sentry stores its privilege policies in a policy file (for example, sentry-provider.ini) which is stored in an HDFS location such as hdfs://ha-nn-uri/user/solr/sentry/sentry-provider.ini.
Sentry with Search does not support multiple policy files for multiple databases. However, you must use a separate policy file for each Sentry-enabled service.

5. Disabling Hive CLI:

Hive CLI is not supported with Sentry and should be disabled, along with its access to the Hive Metastore; use Beeline to execute Hive queries instead. This is especially necessary if the Hive Metastore has sensitive metadata.

To do this, modify the hadoop.proxyuser.hive.groups property in core-site.xml on the Hive Metastore host.

For example, to let the hive user act only on behalf of members of the hive and hue groups, set the property to:

<property>
<name>hadoop.proxyuser.hive.groups</name>
<value>hive,hue</value>
</property>

More user groups that require access to the Hive Metastore can be added to the comma-separated list as needed.

Description

Hadoop is an open source framework widely adopted by organizations to store and process large amounts of structured and unstructured data by applying the MapReduce programming model. Many top-rated companies use the Apache Hadoop framework to deal with data volumes that keep increasing every minute. In terms of Hadoop cluster size, Yahoo is first on the list with around 4500 nodes, followed by LinkedIn and Facebook.

Here are some of the world’s most popular and top-rated organizations that use Hadoop for production and research: Adobe, AOL, Alibaba, eBay, and Fox Audience Network.

If you are looking to build your career in the field of big data Hadoop, start by learning big data Hadoop. You can also take up a Hadoop training program and start a career as a big data Hadoop professional solving large data problems.

Here are the top, frequently asked, scenario-based Hadoop interview questions. You will also see how to explain a Hadoop project in an interview, which carries a lot of weight.

These Hadoop developer interview questions have been specially designed to familiarize you with the nature of the questions you might face during your interview, and will help you crack the Hadoop interview easily and acquire your dream career as a Hadoop developer. These top big data Hadoop interview questions will boost your confidence to face an interview and prepare you to answer your interviewer’s questions in the best manner. These interview questions on Hadoop are suggested by experts.

Turn yourself into a Hadoop Developer. Live your dream career!
