UNIT - IV
Hadoop Architecture: Hadoop: RDBMS Vs Hadoop, Hadoop
Overview, Hadoop Distributors, HDFS, HDFS Daemons,
Anatomy of File Write and Read, Name Node, Secondary Name
Node, and Data Node, HDFS Architecture, Hadoop
Configuration, Map Reduce Framework, Role of HBase in Big
Data Processing, HIVE, PIG.
RDBMS vs Hadoop
HADOOP OVERVIEW
Open-source software framework to store and process massive
amounts of data in a distributed fashion on large clusters of
commodity hardware.
Basically, Hadoop accomplishes two tasks:
1. Massive data storage.
2. Faster data processing.
Key Aspects of Hadoop
Hadoop Components
Hadoop Conceptual Layer
It is conceptually divided into
1. Data Storage Layer: Stores huge volumes of data
2. Data Processing Layer: Processes data in parallel to
extract richer and meaningful insights from the data
High Level Architecture of Hadoop
Hadoop follows a distributed master-slave architecture.
The master node is known as the NameNode and the slave nodes
are known as DataNodes.
Key components of the Master Node
1. Master HDFS: Its main responsibility is partitioning the data
storage across the slave nodes. It also keeps track of the
locations of data on the DataNodes.
2. Master MapReduce: It decides and schedules computation tasks
on the slave nodes.
Hadoop Distributors
HDFS (Hadoop Distributed File System) Key Points
1. Storage component of Hadoop
2. Distributed File System: A Distributed File System (DFS) is
a file system that allows data to be stored and accessed
across multiple servers or nodes, providing a unified view of
data as if it were all stored on a single system. DFS's primary
goal is to enable users of physically distributed systems to
share resources and information through the Common File
System (CFS). It is a file system that runs as a part of the
operating system. A typical configuration is a set of
workstations and mainframes connected by a LAN. The process of
creating a namespace in DFS is transparent to the clients.
DFS provides two key service properties: location transparency
and redundancy.
The features of DFS include the following:
● Structure transparency: The client need not know about the
number or locations of file servers and the storage devices.
Multiple file servers should be provided for performance,
adaptability, and dependability.
● Access transparency: Both local and remote files should be
accessible in the same manner. The file system should
automatically locate the accessed file and send it to the
client’s side.
● Naming transparency: There should not be any hint in the
name of the file to the location of the file. Once a name is
given to the file, it should not be changed during transferring
from one node to another.
● Replication transparency: If a file is copied on multiple
nodes, the existence of multiple copies and their locations
should be hidden from the clients.
● User mobility: It will automatically bring the user’s home
directory to the node where the user logs in.
● Performance: Performance is based on the average amount
of time needed to satisfy client requests.
● Simplicity and ease of use: The user interface of the file
system should be simple and the number of commands should be
small.
● Data integrity: Multiple users frequently share a file system.
The integrity of data saved in a shared file must be
guaranteed by the file system. That is, concurrent access
requests from many users who are competing for access to
the same file must be correctly synchronized using a
concurrency control method. Atomic transactions are a high-
level concurrency management mechanism for data integrity
that is frequently offered to users by a file system.
● Security: A distributed file system should be secure so that
its users may trust that their data will be kept private. To
safeguard the information contained in the file system from
unwanted & unauthorized access, security mechanisms must
be implemented.
It enables large-scale storage and data management by
distributing the load across many machines, improving
performance, scalability, and reliability. Distributed file systems
are commonly used in cloud computing, big data platforms, and
environments where high availability and fault tolerance are
critical.
3. Modeled after Google File System
4. Optimized for high throughput (HDFS leverages a large block
size and moves computation to where the data is stored)
5. A file can be replicated a configured number of times, which
makes HDFS tolerant of both software and hardware failures
6. Automatically re-replicates data blocks that were held on
nodes that have failed
7. You realize the power of HDFS when you perform reads or
writes on large files (gigabytes and larger)
8. Sits on top of a native file system such as ext3 or ext4.
The storage stack, from top to bottom, is:
HDFS
Native OS file system
Disk storage
HDFS Architecture
Apache Hadoop HDFS architecture follows a Master/Slave
architecture, where a cluster comprises a single NameNode
(master node) and all the other nodes are DataNodes (slave
nodes).
HDFS is a block-structured file system where each file is
divided into blocks of a pre-determined size. These blocks are
stored across a cluster of one or several machines.
HDFS can be deployed on a broad spectrum of machines that
support Java. Though one can run several DataNodes on a single
machine, in the practical world these DataNodes are spread
across various machines.
Storing data into HDFS
HDFS stores data in a reliable fashion using replication and
distribution. Here is the series of steps that happen when a
client writes a file in HDFS (a client-side code sketch follows
the list):
1. The client requests the NameNode to create the file. It
passes the size of the file as a parameter.
2. The NameNode responds with the locations of the nodes where
the client can store the data. By default there will be 3
locations per block. If the file size is 200 MB, there will
be 2 blocks: the first of 128 MB and the second of 72 MB.
Similarly, depending on the size, you will have n blocks.
3. The client directly starts writing data to the first DataNode
out of the three given by the NameNode. Note that if there
are 2 blocks to be written, the client can start writing them
in parallel.
4. When the first DataNode has stored the block, it replies to
the client with success, and it then passes the same block on
to the 2nd DataNode. The 2nd DataNode writes the block and
passes it on to the 3rd DataNode.
5. So writing of blocks from the client to the DataNodes happens
in parallel, but replication happens in series.
6. Blocks of the same file can go to different nodes; at the
very least, the replicated copies of a block will always be
on different nodes. The first block is always on the DataNode
nearest to the client; the 2nd and 3rd replicas are placed
based on the free capacity of the DataNodes and/or rack
awareness.
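The block arithmetic in step 2 can be illustrated with a small
standalone helper. This is not Hadoop code; the class and method
names are made up purely for illustration:

import java.util.ArrayList;
import java.util.List;

public class BlockSplit {
    // Splits a file of the given size into block sizes, mirroring step 2 above:
    // every block is the full block size except possibly the last one.
    static List<Long> blockSizes(long fileSizeMb, long blockSizeMb) {
        List<Long> blocks = new ArrayList<>();
        for (long remaining = fileSizeMb; remaining > 0; remaining -= blockSizeMb) {
            blocks.add(Math.min(blockSizeMb, remaining));
        }
        return blocks;
    }

    public static void main(String[] args) {
        // A 200 MB file with the default 128 MB block size -> [128, 72]
        System.out.println(blockSizes(200, 128));
        // The 514 MB example used later in this unit -> [128, 128, 128, 128, 2]
        System.out.println(blockSizes(514, 128));
    }
}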
File Blocks:
Blocks are nothing but the smallest contiguous location on your
hard drive where data is stored. In general, in any file system,
you store data as a collection of blocks. Similarly, HDFS stores
each file as blocks which are scattered throughout the Apache
Hadoop cluster. The default size of each block is 128 MB in
Apache Hadoop 2.x (64 MB in Apache Hadoop 1.x), which you can
configure as per your requirement. All blocks of the file are
the same size except the last block, which can be either the
same size or smaller. The files are split into 128 MB blocks and
then stored into the Hadoop file system. The Hadoop application
is responsible for distributing the data blocks across multiple
nodes.
Let’s take an example where we have a file “example.txt” of size
514 MB as shown in the above figure. Suppose that we are using
the default block size configuration, which is 128 MB. Then 5
blocks will be created. The first four blocks will be of 128 MB,
but the last block will be of 2 MB size only.
Data Replication
HDFS is designed to reliably store very large files across
machines in a large cluster. It stores each file as a sequence of
blocks; all blocks in a file except the last block are the same size.
The blocks of a file are replicated for fault tolerance. The block
size and replication factor are configurable per file. An
application can specify the number of replicas of a file. The
replication factor can be specified at file creation time and can be
changed later. Files in HDFS are write-once and have strictly one
writer at any time.
The NameNode makes all decisions regarding replication of
blocks. It periodically receives a Heartbeat and a Blockreport
from each of the DataNodes in the cluster. Receipt of a Heartbeat
implies that the DataNode is functioning properly. A Blockreport
contains a list of all blocks on a DataNode.
In the figure above there are two files, foo and bar, each with
two blocks, stored across three data nodes with a replication
factor of 2. The blocks are scattered such that if any one of
the data nodes goes down, all the file blocks are still
available on the remaining two data nodes.
An image file known as FSImage contains data about the file
blocks, including their number and location. This file is kept in
the name node and the name node will always maintain an
updated FSImage file. If any of the data nodes is down, the
FSImage has to be updated and in this process, the name node
will come to know about the existence of data nodes through
heartbeat which is sent by all data nodes to the name node every
3 seconds. If the name node does not receive any data node’s
heartbeat beat then it will assume that the data node is down and
accordingly the FSImage file is also updated
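Because the replication factor is a per-file setting, a client
can change it after the file has been created. A minimal sketch
using the public FileSystem API, assuming a reachable HDFS
deployment and an existing file /chp/abc1.txt (the path reuses
an example from the HDFS commands later in this unit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SetReplication {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();      // reads core-site.xml etc. from the classpath
        FileSystem fs = FileSystem.get(conf);          // the default filesystem (HDFS, if so configured)
        Path file = new Path("/chp/abc1.txt");         // assumed to exist already
        fs.setReplication(file, (short) 2);            // change the per-file replication factor to 2
        System.out.println("Replication factor: " + fs.getFileStatus(file).getReplication());
        fs.close();
    }
}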
What is Replication Management?
● HDFS performs replication to provide fault tolerance and to
improve data reliability.
● There could be situations where data is lost in many ways:
o a node is down,
o a node has lost network connectivity,
o a node is physically damaged, and
o a node is intentionally made unavailable for horizontal
scaling.
● For any of the above-mentioned reasons, data will not be
available if replication is not made. HDFS usually maintains
3 copies of each data block on different nodes and different
racks. By doing this, data is made available even if one of
the systems is down.
● Downtime is reduced by making data replications. This
improves reliability and makes HDFS fault tolerant.
● Block replication provides fault tolerance. If one copy is
not accessible or is corrupted, we can read the data from
another copy.
● The number of copies or replicas of each block of a file in
the HDFS architecture is the replication factor. The default
replication factor is 3, which is again configurable. So,
each block is replicated three times and stored on different
DataNodes.
● As you can see in the figure below, each block is replicated
three times and stored on different DataNodes (considering
the default replication factor): if we are storing a file of
128 MB in HDFS using the default configuration, we will end
up occupying a space of 384 MB (3 × 128 MB).
Rack Awareness in HDFS Architecture:
Rack: a collection of around 30-40 machines. All these machines
are connected using the same network switch, and if that network
switch goes down then all the machines in that rack will be out
of service. Thus we say the rack is down.
Rack Awareness was introduced by Apache Hadoop to overcome this
issue. Rack awareness is the knowledge of how the DataNodes are
distributed across the racks of a Hadoop cluster.
In a large Hadoop cluster, in order to reduce network traffic
while reading/writing HDFS files, the NameNode chooses a
DataNode that is on the same rack or a nearby rack to serve the
read/write request. The NameNode obtains rack information by
maintaining the rack IDs of each DataNode. This concept of
choosing DataNodes based on rack information is called Rack
Awareness in Hadoop.
In HDFS, the NameNode makes sure that all the replicas are not
stored on the same rack or a single rack; it follows the Rack
Awareness algorithm to reduce latency as well as improve fault
tolerance. Given that the default replication factor is 3, when
a client wants to place a file in HDFS, Hadoop places the
replicas as follows:
1) The first replica is written to the DataNode creating the
file, to improve write performance because of write affinity.
2) The second replica is written to another DataNode within the
same rack, to minimize cross-rack network traffic.
3) The third replica is written to a DataNode in a different
rack, ensuring that even if a switch or rack fails, the data
is not lost (rack awareness).
This configuration is maintained to make sure that the file is
never lost in case of a node failure or even an entire rack
failure.
Advantages of Rack Awareness:
Minimize writing cost and maximize read speed – Rack awareness
directs read/write requests to replicas on the same or a nearby
rack, thus minimizing writing cost and maximizing reading speed.
Provide maximum network bandwidth and low latency – Rack
awareness maximizes network bandwidth by keeping block transfers
within a rack. This is particularly beneficial in cases where
tasks cannot be assigned to nodes where their data is stored
locally.
Data protection against rack failure – By default, the NameNode
assigns the 2nd and 3rd replicas of a block to nodes in a rack
different from the first replica. This provides data protection
even against rack failure.
Replica Selection
To minimize global bandwidth consumption and read
latency, HDFS tries to satisfy a read request from a replica that is
closest to the reader. If there exists a replica on the same rack as
the reader node, then that replica is preferred to satisfy the read
request.
Features of HDFS
The following are the main advantages of HDFS:
● HDFS can be configured to create multiple replicas of a
particular file. If any one replica fails, the user can still
access the data from the other replicas. HDFS also provides
the option to configure automatic failover, so in case of a
hardware failure or an error the user can get the data from
another node where it has been replicated. Software failover
can be configured as well; it is similar to automatic
failover but is handled at the software (data provider)
level.
● HDFS is scalable. Horizontal scalability means that more
nodes can be added to the cluster and their storage appears
as a single file system; vertical scalability means that the
capacity of existing nodes can be increased. Data is
replicated to ensure availability and integrity, and
replication is governed by a replication factor rather than
by copying the data manually. HDFS can store petabytes of
data in a single cluster and handles the load by
automatically choosing the best DataNode to store data on.
Data can be read quickly because it is stored on multiple
nodes, and storing data on multiple nodes through replication
increases its reliability.
● Data is stored on HDFS, not on the local filesystem of your
computer. In the event of a failure, the data held on other
servers can still be accessed by the application running on
your local machine. Data is replicated on multiple servers to
ensure that even if a server fails, your data remains
accessible. Data can be accessed via a variety of client
tools, such as the Java client, the Python client, or the
CLI, which makes it possible to work with HDFS from a wide
variety of programming languages.
Advantages of HDFS Architecture
1. It is a highly scalable data storage system. This makes it ideal
for data-intensive applications like Hadoop and streaming
analytics. Another major benefit of Hadoop is that it is easy
to set up. This makes it ideal for non-technical users.
2. It is very easy to implement, yet very robust. There is a lot
of flexibility you get with Hadoop. It is a fast and reliable
file system.
3. This makes Hadoop a great fit for a wide range of data
applications. The most common one is analytics. You can
use Hadoop to process large amounts of data quickly, and
then analyze it to find trends or make recommendations. The
most common type of application that uses Hadoop analytics
is data crunching.
4. You can scale the cluster horizontally by adding more nodes,
or vertically by increasing the capacity of the existing
nodes. If you have many clients whose data needs to be stored
on HDFS, you can easily scale your cluster horizontally by
adding more nodes to the cluster. To scale your cluster
vertically, you increase the resources of the individual
nodes. Once the capacity of the cluster is increased, it can
serve more clients.
5. Storage can be provided by a centralized system, by
distributing data across a cluster of commodity personal
computers, or by a combination of both; HDFS takes the
distributed, commodity-hardware approach. A common setup is
to run the storage daemons (sometimes inside a virtual
machine) on each of your servers.
6. Data locality, moving computation to where the data is
stored, reduces the overhead of data movement across the
cluster and provides high availability of data.
7. Automatic data replication can be accomplished with a
variety of technologies, including RAID, Hadoop, and
database replication. Logging data and monitoring it for
anomalies can also help to detect and respond to hardware
and software failures.
Disadvantages of HDFS Architecture
1. It is important to have a backup strategy in place. The cost
of downtime can be extremely high, so it is important to keep
things running smoothly. It is also recommended to have a
security plan in place. If your company does not have a data
backup plan, you are putting your company’s data at risk.
2. The chances are that the data in one location is vulnerable to
hacking. Imagine the fear of losing valuable data when a
disaster strikes. To protect data, backup data to a remote
location. In the event of a disaster, the data can be quickly
restored to its original location.
3. Data often needs to be copied out of HDFS to a local
environment before other tools can use it. This can be done
manually or through a data migration process. Once the data
is copied to the local environment, it can be accessed,
analyzed, and used for any purpose.
HDFS Daemons:
NameNode:
NameNode is the master node in the Apache Hadoop HDFS
architecture that maintains and manages the blocks present on
the DataNodes (slave nodes). It is a highly available server
that manages the file system namespace and regulates access to
files by clients. Internally, a file is split into one or more
blocks and these blocks are stored in a set of DataNodes. The
NameNode executes file system namespace operations like opening,
closing, and renaming files and directories. It also determines
the mapping of blocks to DataNodes. The DataNodes are
responsible for serving read and write requests from the file
system’s clients. The DataNodes also perform block creation,
deletion, and replication upon instruction from the NameNode.
The HDFS architecture is built in such a way that user data
never resides on the NameNode: the NameNode contains metadata,
and the data resides on the DataNodes only.
The NameNode and DataNode are pieces of software
designed to run on commodity machines. These machines
typically run a GNU/Linux operating system (OS). HDFS is built
using the Java language; any machine that supports Java can run
the NameNode or the DataNode software. Usage of the highly
portable Java language means that HDFS can be deployed on a
wide range of machines. A typical deployment has a dedicated
machine that runs only the NameNode software. Each of the other
machines in the cluster runs one instance of the DataNode
software. The architecture does not preclude running multiple
DataNodes on the same machine but in a real deployment that is
rarely the case.
The existence of a single NameNode in a cluster greatly
simplifies the architecture of the system. The NameNode is the
arbitrator and repository for all HDFS metadata. The system is
designed in such a way that user data never flows through the
NameNode.
Functions of NameNode:
● It is the master daemon that maintains and manages the
DataNodes (slave nodes).
● The NameNode manages file-related operations such as read,
write, create and delete, and manages the file system
namespace. The file system namespace is the collection of
files in the cluster. HDFS supports a traditional
hierarchical file organization: a user or an application can
create directories and store files inside these directories.
The file system namespace hierarchy is similar to most other
existing file systems; one can create and remove files, move
a file from one directory to another, or rename a file. HDFS
does not yet implement user quotas, and it does not support
hard links or soft links; however, the HDFS architecture does
not preclude implementing these features. Any change to the
file system namespace or its properties is recorded by the
NameNode. An application can specify the number of replicas
of a file that should be maintained by HDFS. The number of
copies of a file is called the replication factor of that
file, and this information is stored by the NameNode.
● It records the metadata of all the files stored in the
cluster, e.g. the location of stored blocks, the size of the
files, permissions, hierarchy, etc. There are two files
associated with the metadata:
● FsImage: It contains the complete state of the file system
namespace since the start of the NameNode. The file system
namespace, including the mapping of blocks to files and file
properties, is stored in a file called FsImage.
● EditLogs: It contains all the recent modifications made to
the file system with respect to the most recent FsImage. It
records every transaction (change) that happens to the file
system metadata. For example, if a file is deleted in HDFS,
the NameNode will immediately record this in the EditLog.
● It regularly receives a Heartbeat and a block report from all
the DataNodes in the cluster to ensure that the DataNodes are
alive.
● It keeps a record of all the blocks in HDFS and of which
nodes these blocks are located on.
● The NameNode is also responsible for taking care of the
replication factor of all the blocks.
● In case of a DataNode failure, the NameNode chooses new
DataNodes for new replicas, balances disk usage and manages
the communication traffic to the DataNodes.
DataNode:
DataNodes are the slave nodes in HDFS. Unlike the NameNode, a
DataNode is commodity hardware, that is, a non-expensive system
which is not of high quality or high availability. The DataNode
is a block server that stores the data in the local file system
(ext3 or ext4).
Functions of DataNode:
● The actual data is stored on the DataNodes.
● DataNodes perform read-write operations on the file system,
as per client request.
● They also perform operations such as block creation,
deletion, and replication according to the instructions of
the NameNode.
● They send heartbeats to the NameNode periodically to report
the overall health of HDFS; by default, this frequency is set
to 3 seconds.
Secondary NameNode:
It is a separate physical machine which acts as a helper to the
NameNode. It performs periodic checkpoints: it communicates with
the NameNode and takes snapshots of the metadata, which helps
minimize downtime and loss of data. The Secondary NameNode works
concurrently with the primary NameNode as a helper daemon.
Functions of Secondary NameNode:
● The Secondary NameNode constantly reads all the file system
metadata from the RAM of the NameNode and writes it into the
hard disk or the file system.
● It is responsible for combining the EditLogs with the FsImage
from the NameNode.
● It downloads the EditLogs from the NameNode at regular
intervals and applies them to the FsImage. The new FsImage is
copied back to the NameNode and is used when the NameNode is
started the next time.
● Hence, the Secondary NameNode performs regular checkpoints in
HDFS. Therefore, it is also called the Checkpoint Node.
Hadoop Filesystems
Hadoop has an abstract notion of filesystems, of which
HDFS is just one implementation. The Java abstract class
org.apache.hadoop.fs.FileSystem represents the client interface to
a filesystem in Hadoop, and there are several concrete
implementations.
The main ones that ship with Hadoop are described in Table
To list the files in the root directory of the local filesystem,
type:
hadoop fs -ls file:///
A file in a Hadoop filesystem is represented by a Hadoop
Path object (and not a java.io.File object, since its semantics are
too closely tied to the local filesystem).
You can think of a Path as a Hadoop filesystem URI, such as
hdfs://localhost/user/tom/quangle.txt.
FileSystem is a general filesystem API, so the first step is to
retrieve an instance for the filesystem we want to use—HDFS, in
this case.
A Configuration object encapsulates a client or server’s
configuration, which is set using configuration files read from the
classpath, such as etc/hadoop/core-site.xml.
There are several static factory methods for getting a
FileSystem instance:
● public static FileSystem get(Configuration conf) throws
IOException
returns the default filesystem (as specified in core-site.xml,
or the default local filesystem if not specified there)
● public static FileSystem get(URI uri, Configuration conf)
throws IOException
uses the given URI’s scheme and authority to determine the
filesystem to use, falling back to the default filesystem if
no scheme is specified in the given URI.
● public static FileSystem get(URI uri, Configuration conf,
String user) throws IOException
retrieves the filesystem as the given user, which is important
in the context of security. (A short usage sketch follows.)
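A short usage sketch of these factory methods, assuming a
namenode is reachable at hdfs://localhost (the hostname is an
assumption). It lists the root directory, roughly what
hadoop fs -ls / prints:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListRoot {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Second form of get(): the URI scheme (hdfs) selects the filesystem implementation.
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost/"), conf);
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());      // one line per file or directory in the root
        }
        fs.close();
    }
}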
Data Flow Anatomy of a File Read
To get an idea of how data flows between the client
interacting with HDFS, the namenode, and the datanodes,
consider Figure below, which shows the main sequence of events
when reading a file.
● Step 1: The client opens the file it wishes to read by
calling open() on the FileSystem object, which for HDFS is an
instance of DistributedFileSystem.
● Step 2: DistributedFileSystem calls the namenode, using
remote procedure calls (RPCs), to determine the locations of
the first few blocks in the file. For each block, the
namenode returns the addresses of the datanodes that have a
copy of that block. Furthermore, the datanodes are sorted
according to their proximity to the client. If the client is
itself a datanode (in the case of a MapReduce task, for
instance), the client will read from the local datanode if
that datanode hosts a copy of the block. The
DistributedFileSystem returns an FSDataInputStream (an input
stream that supports file seeks) to the client for it to read
data from. FSDataInputStream in turn wraps a DFSInputStream,
which manages the datanode and namenode I/O.
● Step 3: The client then calls read() on the stream.
DFSInputStream, which has stored the datanode addresses for
the first few blocks in the file, then connects to the first
(closest) datanode for the first block in the file.
● Step 4: Data is streamed from the datanode back to the
client, which calls read() repeatedly on the stream.
● Step 5: When the end of the block is reached, DFSInputStream
will close the connection to the datanode, then find the best
datanode for the next block. This happens transparently to
the client, which from its point of view is just reading a
continuous stream. Blocks are read in order, with the
DFSInputStream opening new connections to datanodes as the
client reads through the stream. It will also call the
namenode to retrieve the datanode locations for the next
batch of blocks as needed.
● Step 6: When the client has finished reading, it calls
close() on the FSDataInputStream.
During reading, if the DFSInputStream encounters an
error while communicating with a datanode, it will try the
next closest one for that block. It will also remember
datanodes that have failed so that it doesn’t needlessly retry
them for later blocks. The DFSInputStream also verifies
checksums for the data transferred to it from the datanode.
If a corrupted block is found, the DFSInputStream attempts
to read a replica of the block from another datanode; it also
reports the corrupted block to the namenode.
One important aspect of this design is that the client
contacts datanodes directly to retrieve data and is guided by
the namenode to the best datanode for each block. This
design allows HDFS to scale to a large number of concurrent
clients because the data traffic is spread across all the
datanodes in the cluster. Meanwhile, the namenode merely
has to service block location requests (which it stores in
memory, making them very efficient) and does not, for
example, serve data, which would quickly become a bottleneck as
the number of clients grew.
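A minimal client-side sketch of this read path, assuming the
quangle.txt path mentioned earlier actually exists on a
reachable HDFS instance. The open() call corresponds to step 1
and the copy loop to steps 3 to 6; the datanode selection
described above happens inside the returned stream:

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ReadFile {
    public static void main(String[] args) throws Exception {
        String uri = "hdfs://localhost/user/tom/quangle.txt";   // example path from this section
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);  // a DistributedFileSystem for an hdfs:// URI
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));                        // step 1: returns an FSDataInputStream
            IOUtils.copyBytes(in, System.out, 4096, false);     // steps 3-5: read() is called repeatedly
        } finally {
            IOUtils.closeStream(in);                            // step 6: close the stream
        }
    }
}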
Network Topology and Hadoop
What does it mean for two nodes in a local network to be
“close” to each other?
In the context of high-volume data processing, the limiting
factor is the rate at which we can transfer data between nodes—
bandwidth is a scarce commodity. The idea is to use the
bandwidth between two nodes as a measure of distance. Rather
than measuring bandwidth between nodes, which can be difficult
to do in practice (it requires a quiet cluster, and the number of
pairs of nodes in a cluster grows as the square of the number of
nodes), Hadoop takes a simple approach in which the network is
represented as a tree and the distance between two nodes is the
sum of their distances to their closest common ancestor. Levels in
the tree are not predefined, but it is common to have levels that
correspond to the data center, the rack, and the node that a process
is running on.
The idea is that the bandwidth available for each of the
following scenarios becomes progressively less:
• Processes on the same node
• Different nodes on the same rack
• Nodes on different racks in the same data center
• Nodes in different data centers
For example, imagine a node n1 on rack r1 in data center d1.
This can be represented as /d1/r1/n1. Using this notation, here are
the distances for the four scenarios:
• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the
same node)
• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on
the same rack)
• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different
racks in the same data center)
• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different
data centers)
This is illustrated schematically in Figure 3-3.
(Mathematically inclined readers will notice that this is an
example of a distance metric.)
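The distance calculation can be expressed as a small helper.
This is not Hadoop's own implementation; it is only an
illustrative sketch that treats node locations as
/datacenter/rack/node strings:

public class NetworkDistance {
    // Distance = number of hops from each node up to their closest common ancestor.
    static int distance(String a, String b) {
        String[] pa = a.split("/");
        String[] pb = b.split("/");
        int common = 0;
        int min = Math.min(pa.length, pb.length);
        while (common < min && pa[common].equals(pb[common])) {
            common++;                                   // length of the shared path prefix
        }
        return (pa.length - common) + (pb.length - common);
    }

    public static void main(String[] args) {
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n1")); // 0: same node
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n2")); // 2: same rack
        System.out.println(distance("/d1/r1/n1", "/d1/r2/n3")); // 4: same data center
        System.out.println(distance("/d1/r1/n1", "/d2/r3/n4")); // 6: different data centers
    }
}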
Finally, it is important to realize that Hadoop cannot magically
discover your network topology for you; it needs some help. By
default, though, it assumes that the network is flat—a single level
hierarchy—or in other words, that all nodes are on a single rack
in a single data center. For small clusters, this may actually be the
case, and no further configuration is required
Anatomy of a File Write
We’re going to consider the case of creating a new file,
writing data to it, then closing the file. This is illustrated in
Figure below
● Step 1: The client creates the file by calling create() on
DistributedFileSystem.
● Step 2: DistributedFileSystem makes an RPC call to the
namenode to create a new file in the filesystem’s namespace,
with no blocks associated with it. The namenode performs
various checks to make sure the file doesn’t already exist
and that the client has the right permissions to create the
file. If these checks pass, the namenode makes a record of
the new file; otherwise, file creation fails and the client
is thrown an IOException. The DistributedFileSystem returns
an FSDataOutputStream for the client to start writing data
to. Just as in the read case, FSDataOutputStream wraps a
DFSOutputStream, which handles communication with the
datanodes and namenode.
● Step 3: As the client writes data, the DFSOutputStream splits
it into packets, which it writes to an internal queue called
the data queue. The data queue is consumed by the
DataStreamer, which is responsible for asking the namenode to
allocate new blocks by picking a list of suitable datanodes
to store the replicas. The list of datanodes forms a
pipeline, and here we’ll assume the replication level is
three, so there are three nodes in the pipeline.
● Step 4: The DataStreamer streams the packets to the first
datanode in the pipeline, which stores each packet and
forwards it to the second datanode in the pipeline. Similarly,
the second datanode stores the packet and forwards it to the
third (and last) datanode in the pipeline.
● Step 5: The DFSOutputStream also maintains an internal
queue of packets that are waiting to be acknowledged by
datanodes, called the ack queue. A packet is removed from
the ack queue only when it has been acknowledged by all
the datanodes in the pipeline. If any datanode fails while
data is being written to it, then the following actions are
taken, which are transparent to the client writing the data.
First, the pipeline is
closed, and any packets in the ack queue are added
to the front of the data queue so that datanodes that
are downstream from the failed node will not miss
any packets. The current block on the good
datanodes is given a new identity, which is
communicated to the namenode, so that the partial
block on the failed datanode will be deleted if the
failed datanode recovers later on. The failed
datanode is removed from the pipeline, and a new
pipeline is constructed from the two good datanodes.
The remainder of the block’s data is written to the
good datanodes in the pipeline. The namenode
notices that the block is under-replicated, and it
arranges for a further replica to be created on another
node. Subsequent blocks are then treated as normal.
It’s possible, but unlikely, for multiple datanodes to
fail while a block is being written. As long as
dfs.namenode.replication.min replicas (which
defaults to 1) are written, the write will succeed, and
the block will be asynchronously replicated across
the cluster until its target replication factor is reached
(dfs.replication, which defaults to 3).
● Step 6: When the client has finished writing data, it calls
close() on the stream.
● Step 7: This action flushes all the remaining packets to the
datanode pipeline and waits for acknowledgments before
contacting the namenode to signal that the file is complete.
The namenode already knows which blocks the file is made up
of (because the DataStreamer asks for block allocations), so
it only has to wait for blocks to be minimally replicated
before returning successfully.
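A minimal client-side sketch of the write path, assuming a
reachable HDFS instance; the destination path is hypothetical.
The create() call corresponds to step 1 and close() to steps 6
and 7; the packet, pipeline, and acknowledgment handling of
steps 3 to 5 happen inside the stream returned by create():

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteFile {
    public static void main(String[] args) throws Exception {
        String uri = "hdfs://localhost/user/tom/newfile.txt";   // hypothetical destination path
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataOutputStream out = fs.create(new Path(uri));      // step 1: create() on the filesystem
        out.write("hello hdfs\n".getBytes("UTF-8"));            // step 3: data is buffered into packets
        out.close();                                            // steps 6-7: flush packets, signal completion
        fs.close();
    }
}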
Replica Placement
How does the namenode choose which datanodes to store
replicas on?
There’s a trade-off between reliability, write bandwidth, and
read bandwidth here. For example, placing all replicas on a
single node incurs the lowest write bandwidth penalty (since the
replication pipeline runs on a single node), but this offers no real
redundancy (if the node fails, the data for that block is lost).
Also, the read bandwidth is high for off-rack reads. At the other
extreme, placing replicas in different data centers may maximize
redundancy, but at the cost of bandwidth. Even in the same data
center (which is what all Hadoop clusters to date have run in),
there are a variety of possible placement strategies. Hadoop’s
default strategy is to
place the first replica on the same node as the client (for
clients running outside the cluster, a node is chosen at random,
although the system tries not to pick nodes that are too full or too
busy).
The second replica is placed on a different rack from the first
(off-rack), chosen at random.
The third replica is placed on the same rack as the second,
but on a different node chosen at random.
Further replicas are placed on random nodes in the cluster,
although the system tries to avoid placing too many replicas on
the same rack. Once the replica locations have been chosen, a
pipeline is built, taking network topology into account. For a
replication factor of 3, the pipeline might look like in Figure
below
A typical replica pipeline
Overall, this strategy gives a good balance among reliability
(blocks are stored on two racks), write bandwidth (writes only
have to traverse a single network switch), read performance
(there’s a choice of two racks to read from), and block
distribution across the cluster (clients only write a single
block on the local rack).
Explain basic HDFS file operations with an example. (Equivalent
Java API calls are sketched after the command list.)
1. Creating a directory:
Syntax: hdfs dfs -mkdir <path>
Eg. hdfs dfs -mkdir /chp
2. Remove a file in a specified path:
Syntax: hdfs dfs -rm <src>
Eg. hdfs dfs -rm /chp/abc.txt
3. Copy a file from the local file system to HDFS:
Syntax: hdfs dfs -copyFromLocal <src> <dst>
Eg. hdfs dfs -copyFromLocal /home/hadoop/sample.txt /chp/abc1.txt
4. To display the list of contents in a directory:
Syntax: hdfs dfs -ls <path>
Eg. hdfs dfs -ls /chp
5. To display the contents of a file:
Syntax: hdfs dfs -cat <path>
Eg. hdfs dfs -cat /chp/abc1.txt
6. Copy a file from HDFS to the local file system:
Syntax: hdfs dfs -copyToLocal <src> <dst>
Eg. hdfs dfs -copyToLocal /chp/abc1.txt /home/hadoop/Desktop/sample.txt
7. To display the last few lines of a file:
Syntax: hdfs dfs -tail <path>
Eg. hdfs dfs -tail /chp/abc1.txt
8. Display the aggregate length of a file in bytes:
Syntax: hdfs dfs -du <path>
Eg. hdfs dfs -du /chp
9. To count the number of directories, files and bytes under a given path:
Syntax: hdfs dfs -count <path>
Eg. hdfs dfs -count /chp o/p: 1 1 60
10. Remove a directory from HDFS:
Syntax: hdfs dfs -rmr <path>
Eg. hdfs dfs -rmr /chp
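The same operations can also be performed programmatically. A
minimal sketch using the Java FileSystem API, assuming the
default filesystem is configured to be HDFS and reusing the
example paths above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BasicOps {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        fs.mkdirs(new Path("/chp"));                                          // hdfs dfs -mkdir /chp
        fs.copyFromLocalFile(new Path("/home/hadoop/sample.txt"),
                             new Path("/chp/abc1.txt"));                      // -copyFromLocal
        fs.copyToLocalFile(new Path("/chp/abc1.txt"),
                           new Path("/home/hadoop/Desktop/sample.txt"));      // -copyToLocal
        fs.delete(new Path("/chp/abc1.txt"), false);                          // -rm (single file)
        fs.delete(new Path("/chp"), true);                                    // -rmr (recursive)
        fs.close();
    }
}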
Hadoop Configuration
Hadoop must have its configuration set appropriately to run
in distributed mode on a cluster.
There are a handful of files for controlling the configuration
of a Hadoop installation; the most important ones are listed in
Table
These files are all found in the etc/hadoop directory of the
Hadoop distribution.
The configuration directory can be relocated to another part
of the filesystem (outside the Hadoop installation, which makes
upgrades marginally easier) as long as daemons are started with
the --config option (or, equivalently, with the
HADOOP_CONF_DIR environment variable set) specifying the
location of this directory on the local filesystem.
Configuration Management
Hadoop does not have a single, global location for
configuration information. Instead, each Hadoop node in the
cluster has its own set of configuration files, and it is up to
administrators to ensure that they are kept in sync across the
system, for example by using parallel shell tools such as dsh or
pdsh. This is an area where Hadoop cluster management tools like Cloudera
Manager and Apache Ambari really shine, since they take care of
propagating changes across the cluster.
Hadoop is designed so that it is possible to have a single set
of configuration files that are used for all master and worker
machines. The great advantage of this is simplicity, both
conceptually (since there is only one configuration to deal with)
and operationally (as the Hadoop scripts are sufficient to manage
a single configuration setup). For some clusters, the
one-size-fits-all configuration model breaks down. For example, if you expand
the cluster with new machines that have a different hardware
specification from the existing ones, you need a different
configuration for the new machines to take advantage of their
extra resources. In these cases, you need to have the concept of a
class of machine and maintain a separate configuration for each
class. Hadoop doesn’t provide tools to do this, but there are
several excellent tools for doing precisely this type of
configuration management, such as Chef, Puppet, CFEngine,
and Bcfg2. For a cluster of any size, it can be a challenge to keep
all of the machines in sync.
Consider what happens if the machine is unavailable when
you push out an update. Who ensures it gets the update when it
becomes available? This is a big problem and can lead to
divergent installations, so even if you use the Hadoop control
scripts for managing Hadoop, it may be a good idea to use
configuration management tools for maintaining the cluster.
These tools are also excellent for doing regular maintenance, such
as patching security holes and updating system packages
Environment Settings
We consider how to set the variables in hadoop-env.sh.
There are also analogous configuration files for MapReduce
and YARN (but not for HDFS), called
mapred-env.sh and yarn-env.sh, where variables pertaining to
those components can be set.
Note that the MapReduce and YARN files override the
values set in hadoop-env.sh.
Java
The location of the Java implementation to use is determined by
the JAVA_HOME setting in hadoop-env.sh or the JAVA_HOME shell
environment variable, if not set in hadoop-env.sh.
It’s a good idea to set the value in hadoop-env.sh, so that it
is clearly defined in one place and to ensure that the whole cluster
is using the same version of Java.
Memory heap size
By default, Hadoop allocates 1,000 MB (1 GB) of memory
to each daemon it runs. This is controlled by the
HADOOP_HEAPSIZE setting in hadoop-env.sh.
There are also environment variables to allow you to change
the heap size for a single daemon. For example, you can
set YARN_RESOURCEMANAGER_HEAPSIZE in
yarn-env.sh to override the heap size for the resource
manager.
Surprisingly, there are no corresponding environment
variables for HDFS daemons, despite it being very common to
give the namenode more heap space.
In addition to the memory requirements of the daemons, the
node manager allocates containers to applications, so we need to
factor these into the total memory footprint of a worker machine;
How Much Memory Does a Namenode Need?
A namenode can eat up memory, since a reference to every
block of every file is maintained in memory. It’s difficult to give
a precise formula because memory usage depends on the number
of blocks per file, the filename length, and the number of
directories in the filesystem; plus, it can change from one Hadoop
release to another.
The default of 1,000 MB of namenode memory is normally
enough for a few million files, but as a rule of thumb for sizing
purposes, you can conservatively allow 1,000 MB per million
blocks of storage. For example, a 200-node cluster with 24 TB of
disk space per node, a block size of 128 MB, and a replication
factor of 3 has room for about 12 million blocks (or more):
200 × 24,000,000 MB ⁄ (128 MB × 3) ≈ 12.5 million. So in this
case, setting the namenode memory to 12,000 MB would be a good
starting point.
You can increase the namenode’s memory without changing the
memory allocated to other Hadoop daemons by setting
HADOOP_NAMENODE_OPTS in hadoop-env.sh to include
a JVM option for setting the memory size.
HADOOP_NAMENODE_OPTS allows you to pass extra options to the
namenode’s JVM. So, for example, if you were using a Sun JVM,
-Xmx2000m would specify that 2,000 MB of memory should be
allocated to the namenode.
If you change the namenode’s memory allocation, don’t
forget to do the same for the secondary namenode (using the
HADOOP_SECONDARYNAMENODE_OPTS variable),
since its memory requirements are comparable to the primary
namenode’s.
System logfiles
System logfiles produced by Hadoop are stored in
$HADOOP_HOME/logs by default.
This can be changed using the HADOOP_LOG_DIR
setting in hadoop-env.sh. It’s a good idea to change this so that
logfiles are kept out of the directory that Hadoop is installed in.
Changing this keeps logfiles in one place, even after the
installation directory changes due to an upgrade. A common
choice is /var/log/hadoop, set by including the following line in
hadoop-env.sh:
export HADOOP_LOG_DIR=/var/log/hadoop
The log directory will be created if it doesn’t already exist.
(If it does not exist, confirm that the relevant Unix Hadoop user
has permission to create it.)
Each Hadoop daemon running on a machine produces two
logfiles.
● The first is the log output written via log4j. This file, whose
name ends in .log, should be the first port of call when
diagnosing problems because most application log messages
are written here. The standard Hadoop log4j configuration
uses a daily rolling file appender to rotate logfiles. Old
logfiles are never deleted, so you should arrange for them to
be periodically deleted or archived, so as to not run out of
disk space on the local node.
● The second logfile is the combined standard output and
standard error log. This logfile, whose name ends in .out,
usually contains little or no output, since Hadoop uses log4j
for logging. It is rotated only when the daemon is restarted,
and only the last five logs are retained. Old logfiles are
suffixed with a number between 1 and 5, with 5 being the
oldest file.
Logfile names (of both types) are a combination of
the name of the user running the daemon, the
daemon name, and the machine hostname.
For example,
hadoop-hdfs-datanode-ip-10-45-174-112.log.2014-09-20 is the
name of a logfile after it has been rotated. This naming structure
makes it possible to archive logs from all machines in the cluster
in a single directory, if needed, since the filenames are unique.
The username in the logfile name is actually the default for
the HADOOP_IDENT_STRING setting in hadoop-env.sh. If
you wish to give the Hadoop instance a different identity for the
purposes of naming the logfiles, change
HADOOP_IDENT_STRING to be the identifier you want.
SSH settings
The control scripts allow you to run commands on (remote)
worker nodes from the master node using SSH. It can be useful to
customize the SSH settings, for various reasons. For example, you
may want to reduce the connection timeout (using the
ConnectTimeout option) so the control scripts don’t hang around
waiting to see whether a dead node is going to respond.
Obviously, this can be taken too far. If the timeout is too low, then
busy nodes will be skipped, which is bad.
Another useful SSH setting is StrictHostKeyChecking,
which can be set to no to automatically add new host keys to the
known hosts files. The default, ask, prompts the user to confirm
that the key fingerprint has been verified, which is not a suitable
setting in a large cluster environment.To pass extra options to
SSH, define the HADOOP_SSH_OPTS environment variable
in hadoop-env.sh.
Important Hadoop Daemon Properties
Hadoop has a bewildering number of configuration
properties. These properties are set in the Hadoop site files:
core-site.xml, hdfs-site.xml, and yarn-site.xml.
To find the actual configuration of a running daemon, visit
the /conf page on its web server. For example,
http://resource-manager-host:8088/conf shows the configuration that the
resource manager is running with. This page shows the combined
site and default configuration files that the daemon is running
with, and also shows which file each property was picked up
from.
Example 10-1. A typical core-site.xml configuration file
Example 10-2. A typical hdfs-site.xml configuration file
Example 10-3. A typical yarn-site.xml configuration file
HDFS
To run HDFS, you need to designate one machine as a
namenode. In this case, the property fs.defaultFS is an HDFS
filesystem URI whose host is the namenode’s host name or IP
address and whose port is the port that the namenode will listen
on for RPCs. If no port is specified, the default of 8020 is used.
The fs.defaultFS property also doubles as specifying the
default filesystem. The default filesystem is used to resolve
relative paths, which are handy to use because they save typing
(and avoid hardcoding knowledge of a particular namenode’s
address). For example, with the default filesystem defined in
Example 10-1, the relative URI /a/b is resolved to
hdfs://namenode/a/b. There are a few other configuration
properties you should set for HDFS: those that set the storage
directories for the namenode and for datanodes.
The property dfs.namenode.name.dir specifies a list of
directories where the namenode stores persistent filesystem
metadata (the edit log and the filesystem image).
A copy of each metadata file is stored in each directory for
redundancy. It’s common to configure dfs.namenode.name.dir so
that the namenode metadata is written to one or two local disks,
as well as a remote disk, such as an NFS-mounted directory. Such
a setup guards against failure of a local disk and failure of the
entire namenode, since in both cases the files can be recovered
and used to start a new namenode. (The secondary namenode
takes only periodic checkpoints of the namenode, so it does not
provide an up-to-date backup of the namenode.)
You should also set the dfs.datanode.data.dir property,
which specifies a list of directories for a datanode to store its
blocks in. Unlike the namenode, which uses multiple directories
for redundancy, a datanode round-robins writes between its
storage directories, so for performance you should specify a
storage directory for each local disk. Read performance also
benefits from having multiple disks for storage, because blocks
will be spread across them and concurrent reads for distinct
blocks will be correspondingly spread across disks.
Finally, you should configure where the secondary
namenode stores its checkpoints of the filesystem. The
dfs.namenode.checkpoint.dir property specifies a list of
directories where the checkpoints are kept. Like the storage
directories for the namenode, which keep redundant copies of the
namenode metadata, the checkpointed filesystem image is stored
in each checkpoint directory for redundancy. Table 10-2
summarizes the important configuration properties for HDFS.
Table 10-2. Important HDFS daemon properties
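In a real cluster these properties live in core-site.xml and
hdfs-site.xml (the Examples 10-1 and 10-2 referred to above).
Purely to illustrate typical property names and values, the
sketch below sets them on a Configuration object; the hostname
and directory paths are assumptions:

import org.apache.hadoop.conf.Configuration;

public class HdfsSiteSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020");                        // core-site.xml: default filesystem
        conf.set("dfs.namenode.name.dir", "/disk1/hdfs/name,/remote/hdfs/name"); // namenode metadata, mirrored copies
        conf.set("dfs.datanode.data.dir", "/disk1/hdfs/data,/disk2/hdfs/data");  // datanode block storage, round-robin
        conf.set("dfs.namenode.checkpoint.dir", "/disk1/hdfs/namesecondary");    // secondary namenode checkpoints
        System.out.println(conf.get("fs.defaultFS"));
    }
}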
YARN
To run YARN, you need to designate one machine as a
resource manager. The simplest way to do this is to set the
property yarn.resourcemanager.hostname to the hostname or IP
address of the machine running the resource manager. Many of
the resource manager’s server addresses are derived from this
property. For example, yarn.resourcemanager.address takes the
form of a host-port pair, and the host defaults to
yarn.resourcemanager.hostname.
In a MapReduce client configuration, this property is used to
connect to the resource manager over RPC. During a MapReduce
job, intermediate data and working files are written to temporary
local files. Because this data includes the potentially very large
output of map tasks, you need to ensure that the
yarn.nodemanager.local-dirs property, which controls the
location of local temporary storage for YARN containers, is
configured to use disk partitions that are large enough. The
property takes a comma-separated list of directory names, and
you should use all available local disks to spread disk I/O (the
directories are used in round-robin fashion).
Typically, you will use the same disks and partitions (but
different directories) for YARN local storage as you use for
datanode block storage, as governed by the
dfs.datanode.data.dir property.
Unlike MapReduce 1, YARN doesn’t have tasktrackers to
serve map outputs to reduce tasks, so for this function it relies on
shuffle handlers, which are long-running auxiliary services
running in node managers. Because YARN is a general-purpose
service, the MapReduce shuffle handlers need to be enabled
explicitly in yarn-site.xml by setting the
yarn.nodemanager.aux-services property to
mapreduce_shuffle. Table 10-3 summarizes the important
configuration properties for YARN. Table 10-3. Important
YARN daemon properties
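As with the HDFS properties, these YARN settings normally live
in yarn-site.xml (Example 10-3). The sketch below only
illustrates typical names and values on a Configuration object;
the hostname, directories, and memory figure are assumptions:

import org.apache.hadoop.conf.Configuration;

public class YarnSiteSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("yarn.resourcemanager.hostname", "resourcemanager");             // host the resource manager runs on
        conf.set("yarn.nodemanager.local-dirs",
                 "/disk1/nm-local-dir,/disk2/nm-local-dir");                      // container scratch space, round-robin
        conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");           // enable the MapReduce shuffle handler
        conf.set("yarn.nodemanager.resource.memory-mb", "16384");                 // memory pool for containers on a node
        System.out.println(conf.get("yarn.resourcemanager.hostname"));
    }
}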
Memory settings in YARN and MapReduce
YARN treats memory in a more fine-grained manner than
the slot-based model used in MapReduce 1. Rather than
specifying a fixed maximum number of map and reduce slots that
may run on a node at once, YARN allows applications to request
an arbitrary amount of memory (within limits) for a task. In the
YARN model, node managers allocate memory from a pool, so
the number of tasks that are running on a particular node depends
on the sum of their memory requirements, and not simply on a
fixed number of slots. The calculation for how much memory to
dedicate to a node manager for running containers depends on the
amount of physical memory on the machine. Each Hadoop
daemon uses 1,000 MB, so for a datanode and a node manager,
the total is 2,000 MB. Set aside enough for other processes that
are running on the machine, and the remainder can be dedicated
to the node manager’s containers by setting the configuration
property
yarn.nodemanager.resource.memory-mb to the total allocation
in MB. (The default is 8,192 MB, which is normally too low for
most setups.).
The next step is to determine how to set memory options for
individual jobs. There are two main controls: one for the size of
the container allocated by YARN, and another for the heap size of
the Java process run in the container. Container sizes are
determined by mapreduce.map.memory.mb and
mapreduce.reduce.memory.mb; both default to 1,024 MB.
These settings are used by the application master when
negotiating for resources in the cluster, and also by the node
manager, which runs and monitors the task containers. The heap
size of the Java process is set by mapred.child.java.opts, and
defaults to 200 MB. You can also set the Java options separately
for map and reduce tasks (see Table 10-4).
Table 10-4. MapReduce job memory properties (set by the
client)
For example, suppose mapred.child.java.opts is set to -Xmx800m
and mapreduce.map.memory.mb is left at its default value of
1,024 MB.
When a map task is run, the node manager will allocate a
1,024 MB container (decreasing the size of its pool by that
amount for the duration of the task) and will launch the task JVM
configured with an 800 MB maximum heap size.
Note that the JVM process will have a larger memory
footprint than the heap size, and the overhead will depend on such
things as the native libraries that are in use, the size of the
permanent generation space, and so on. The important thing is
that the physical memory used by the JVM process, including any
processes that it spawns, such as Streaming processes, does not
exceed its allocation (1,024 MB). If a container uses more
memory than it has been allocated, then it may be terminated by
the node manager and marked as failed. YARN schedulers impose
a minimum or maximum on memory allocations. The default
minimum is 1,024 MB (set by
yarn.scheduler.minimum-allocation-mb), and the default maximum
is 8,192 MB (set by yarn.scheduler.maximum-allocation-mb).
There are also virtual memory constraints that a
container must meet. If a container’s virtual memory usage
exceeds a given multiple of the allocated physical memory, the
node manager may terminate the process. The multiple is
expressed by the
yarn.nodemanager.vmem-pmem-ratio property, which
defaults to 2.1. In the example used earlier, the virtual memory
threshold above which the task may be terminated is 2,150 MB,
which is 2.1 × 1,024 MB. When configuring memory parameters
it’s very useful to be able to monitor a task’s actual memory usage
during a job run, and this is possible via
MapReduce task counters. The counters
PHYSICAL_MEMORY_BYTES,
VIRTUAL_MEMORY_BYTES, and
COMMITTED_HEAP_BYTES provide snapshot values of
memory usage and are therefore suitable for observation during
the course of a task attempt. Hadoop also provides settings to
control how much memory is used for MapReduce operations.
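The per-job memory controls described above are client-side
settings, so they can be set on a job's configuration. A minimal
sketch reusing the -Xmx800m example; the job is only configured
here, not submitted:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class JobMemorySketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.map.memory.mb", "1024");      // YARN container size for map tasks
        conf.set("mapreduce.reduce.memory.mb", "1024");   // YARN container size for reduce tasks
        conf.set("mapred.child.java.opts", "-Xmx800m");   // JVM heap inside the container (as in the example above)
        Job job = Job.getInstance(conf, "memory-sketch");
        System.out.println(job.getConfiguration().get("mapred.child.java.opts"));
    }
}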
CPU settings in YARN and MapReduce
In addition to memory, YARN treats CPU usage as a
managed resource, and applications can request the number of
cores they need.
The number of cores that a node manager can allocate to
containers is controlled by the
yarn.nodemanager.resource.cpu-vcores property. It should be
set to the total number of cores on the machine, minus a core for
each daemon process running on the machine (datanode, node
manager, and any other long-running processes). MapReduce
jobs can control the number of cores allocated to map and reduce
containers by setting mapreduce.map.cpu.vcores and
mapreduce.reduce.cpu.vcores. Both default to 1, an appropriate
setting for normal single-threaded MapReduce tasks, which can
only saturate a single core.
Hadoop Daemon Addresses and Ports
Hadoop daemons generally run both an RPC server for
communication between daemons (Table 10-5)
and an HTTP server to provide web pages for human consumption
(Table 10-6).
Each server is configured by setting the network address and
port number to listen on. A port number of 0 instructs the server
to start on a free port, but this is generally discouraged because it
is incompatible with setting cluster-wide firewall policies. In
general, the properties for setting a server’s RPC and HTTP
addresses serve double duty: they determine the network interface
that the server will bind to, and they are used by clients or other
machines in the cluster to connect to the server. For example, node
managers use the
yarn.resourcemanager.resource-tracker.address property to
find the address of their resource manager. It is often desirable for
servers to bind to multiple network interfaces, but setting the
network address to 0.0.0.0, which works for the server, breaks the
second case, since the address is not resolvable by clients or other
machines in the cluster. One solution is to have separate
configurations for clients and servers, but a better way is to set
the bind host for the server. By setting
yarn.resourcemanager.hostname to the (externally resolvable)
hostname or IP address and yarn.resourcemanager.bind-host to
0.0.0.0, you ensure that the resource manager will bind to all
addresses on the machine, while at the same time providing a
resolvable address for node managers and clients. In addition to
an RPC server, datanodes run a TCP/IP server for block transfers.
The server address and port are set by the dfs.datanode.address
property, which has a default value of 0.0.0.0:50010.
There is also a setting for controlling which network interfaces
the datanodes use as their IP addresses (for HTTP and RPC
servers). The relevant property is dfs.datanode.dns.interface,
which is set to default, meaning that the default network interface
is used. You can set it explicitly to report the address of a
particular interface (eth0, for example).
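As a rough sketch, the address-related properties discussed above
would normally be placed in yarn-site.xml and hdfs-site.xml, but
the same keys can be set on a Configuration object. The hostname
value below is hypothetical.
import org.apache.hadoop.conf.Configuration;

public class DaemonAddressSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Externally resolvable address that node managers and clients use.
        conf.set("yarn.resourcemanager.hostname", "rm.example.internal"); // hypothetical host
        // Bind the resource manager's servers to all local interfaces.
        conf.set("yarn.resourcemanager.bind-host", "0.0.0.0");
        // Datanode block-transfer server address (default shown).
        conf.set("dfs.datanode.address", "0.0.0.0:50010");
        // Report the address of a particular interface, e.g. eth0.
        conf.set("dfs.datanode.dns.interface", "eth0");
        System.out.println(conf.get("yarn.resourcemanager.hostname"));
    }
}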
MapReduce Framework
Hadoop MapReduce is a software framework that makes it
simple to write programs that process enormous volumes of data
(multi-terabyte data sets) in parallel on large clusters (thousands
of nodes) of commodity hardware in a reliable, fault-tolerant way.
A MapReduce job splits the input data set into independent
chunks, which are processed in parallel by the map tasks. The
framework sorts the outputs of the maps, which are then fed to
the reduce tasks.
In most cases, the job’s input and output are stored in a file
system.
The framework takes care of scheduling tasks, monitoring
them, and re-executing failed tasks.
Phases of the MapReduce model
MapReduce model has three major and one optional phase:
1. Mapper
● It is the first phase of MapReduce programming and contains
the coding logic of the mapper function.
● The conditional logic is applied to the ‘n’ number of data
blocks spread across various data nodes.
● Mapper function accepts key-value pairs as input as (k, v),
where the key represents the offset address of each record
and the value represents the entire record content.
● The output of the Mapper phase will also be in the key-value
format as (k’, v’).
2. Shuffle and Sort
● The output of various mappers (k’, v’), then goes into
Shuffle and Sort phase.
● The values are grouped together based on their keys, so that
all values with the same key are brought together.
● The output of the Shuffle and Sort phase will be key-value
pairs again as key and array of values (k, v[]).
3. Reducer
● The output of the Shuffle and Sort phase (k, v[]) will be the
input of the Reducer phase.
● In this phase reducer function’s logic is executed and all the
values are aggregated against their corresponding keys.
● Reducer consolidates outputs of various mappers and
computes the final job output.
● The final output is then written into a single file in an output
directory of HDFS.
4. Combiner
● It is an optional phase in the MapReduce model.
● The combiner phase is used to optimize the performance of
MapReduce jobs.
● In this phase, various outputs of the mappers are locally
reduced at the node level.
● For example, if different mapper outputs (k, v) coming from
a single node contain the same key, then they get combined,
i.e. locally reduced into a single (k, v[]) output.
● This phase makes the Shuffle and Sort phase work even
quicker thereby enabling additional performance in
MapReduce jobs.
All these phases in a MapReduce job can be depicted as below
For example, MapReduce logic to find the word count on an array
of words can be shown as below:
fruits_array = [apple, orange, apple, guava, grapes,
orange, apple]
● The mapper phase tokenizes the input array of words into
the ‘n’ number of words to give the output as (k, v). For
example, consider ‘apple’. Mapper output will be (apple, 1),
(apple, 1), (apple, 1).
● Shuffle and Sort accept the mapper (k, v) output and group
all values according to their keys as (k, v[]). i.e. (apple, [1,
1, 1]).
● The Reducer phase accepts Shuffle and sort output and gives
the aggregate of the values (apple, [1+1+1]), corresponding
to their keys. i.e. (apple, 3).
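The word-count logic above can be expressed as Hadoop Mapper and
Reducer classes. The following is a minimal Java sketch; the class
names (WordCount, TokenizerMapper, IntSumReducer) are our own
illustration, not part of the original text.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {
    // Mapper: for each word emit (word, 1), e.g. (apple, 1).
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Reducer: receives (word, [1, 1, 1]) and emits (word, 3).
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}
Registering the same reducer class as a combiner would perform
the node-level local reduction described in the Combiner phase.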
Speculative Execution of MapReduce Work
The overall speed of a MapReduce job is dominated by its
slowest task. To speed things up, Hadoop may launch a duplicate
(speculative) attempt of a slow-running task on another node, so
that two attempts work on the same data at the same time.
Whichever attempt completes first is taken as the final output,
and the other attempt is killed.
It is an optimization technique.
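Speculative execution can be turned on or off per job. A minimal
sketch, assuming the standard Hadoop 2 property names
mapreduce.map.speculative and mapreduce.reduce.speculative,
which are not named in the text above:
import org.apache.hadoop.conf.Configuration;

public class SpeculationSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Enable speculative launching of duplicate map and reduce attempts.
        conf.setBoolean("mapreduce.map.speculative", true);
        conf.setBoolean("mapreduce.reduce.speculative", true);
        System.out.println("map speculation: " + conf.get("mapreduce.map.speculative"));
    }
}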
MRUnit
MRUnit is a JUnit-based java library to facilitate unit testing
of Hadoop MapReduce jobs by providing drivers and mock
objects to simulate the Hadoop runtime environment of a map
reduce job.
This makes it easy to develop as well as to maintain Hadoop
MapReduce code bases. This code base already exists as a
subproject of the Apache Hadoop MapReduce project and lives in
the "contrib" directory of the source tree.
MRUnit supports testing Mappers and Reducers separately
as well as testing MapReduce computations as a whole.
With MRUnit, you can
● craft test input
● push it through your mapper and/or reducer
● verify its output all in a JUnit test
As do other JUnit tests, this allows you to debug your code
using JUnit test as a driver.
A map/reduce pair can be tested using MRUnit’s
MapReduceDriver. A combiner can be tested using the
MapReduceDriver as well.
A PipelineMapReduceDriver allows you to test a workflow
of map/reduce jobs.
Currently, partitioners do not have a test driver under
MRUnit.
MRUnit allows you to do TDD (Test-Driven
Development) and write lightweight unit tests which
accommodate Hadoop’s specific architecture and constructs.
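A minimal MRUnit test sketch for the word-count mapper shown
earlier; the class name WordCount.TokenizerMapper is our own
illustration, and the driver shown is MRUnit's new-API MapDriver.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

public class WordCountMapperTest {
    private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;

    @Before
    public void setUp() {
        // Wire the mapper under test into an MRUnit driver.
        mapDriver = MapDriver.newMapDriver(new WordCount.TokenizerMapper());
    }

    @Test
    public void testMapperEmitsOnePerWord() throws Exception {
        // Craft test input, declare the expected output, and run the test.
        mapDriver.withInput(new LongWritable(0), new Text("apple orange"))
                 .withOutput(new Text("apple"), new IntWritable(1))
                 .withOutput(new Text("orange"), new IntWritable(1))
                 .runTest();
    }
}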
How a MapReduce Job Run Works
There is only one method you need to call to start a
MapReduce job: submit() on a Job object (you can also use
waitForCompletion(), which submits the job if it hasn’t been
submitted already and then waits for it to finish). This method
call hides a great deal of processing that occurs behind the
scenes.
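A minimal driver sketch of the submission call just described. The
input/output paths come from the command line, and the WordCount
classes refer to the illustrative sketch shown earlier; all names
are assumptions, not part of the original text.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class); // optional local reduction
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // submit() returns immediately; waitForCompletion() submits (if needed) and blocks.
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}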
In the provided figure, the entire procedure is depicted.
At the highest level, there are five distinct entities:
● The client, which submits the MapReduce job.
● The YARN resource manager, which coordinates the
allocation of compute resources on the cluster.
● The YARN node managers, which launch and monitor the
compute containers on machines in the cluster.
● The MapReduce application master, which coordinates the
tasks running the MapReduce job. The application master and
the MapReduce tasks run in containers that are scheduled by
the resource manager and managed by the node managers.
● The distributed filesystem, which is used for sharing job files
between the other entities.
How MapReduce Works
● Step 1: A client submits a MapReduce job by calling submit()
on the Job, which creates an internal job submitter and calls
submitJobInternal(). After the job is submitted,
waitForCompletion() polls the job’s progress and reports
whether the job succeeded or failed.
● Steps 2, 3, 4: Before the job is handed over, the job submitter
asks the resource manager for a new application ID, checks
the input and output specification of the job, and copies the
resources needed to run the job (such as the configuration
and the job JAR) to the shared filesystem. If all of this
succeeds, it submits the application to the resource manager.
● Step 5a: The resource manager’s YARN scheduler allocates
a container, and the resource manager then launches the
application master process there, under the node manager’s
management.
● Steps 5b, 6: The node manager starts the application master;
because the application in this example is a MapReduce
program, the application master is MRAppMaster, which
initializes the job.
● Step 7: MRAppMaster then retrieves the input splits from
the shared filesystem (HDFS in this example) and creates a
map task object for each split, as well as the reduce task
objects.
● Steps 8, 9, 10, 11: Once a task has been assigned a container
on a node by the resource manager’s scheduler, the
application master starts the container by contacting the node
manager. Finally, the node manager runs the map or reduce
task.
● When the last task of the job is complete, the application
master is notified and changes the job status to ‘successful’.
When the job polls for status, it learns that the job has
completed, so it tells the client and returns from the
waitForCompletion() method.
● Lastly, the application master and the task containers clean
up their working state and remove any intermediate outputs.
Failure handling in MapReduce
Many times, when you run your MapReduce application, it
becomes imperative to handle errors that can occur while your
complex processing of data is in progress. If errors are not
handled proactively, they may cause the job to fail and leave
your output in an inconsistent state. Such situations may require
a lot of human intervention to cleanse the data and re-run the
job. So, handling expected failures well in advance, in the code
and configuration, helps a lot.
There could be different types of error; let's look at common
errors:
The first two errors can be handled by your program (in fact
run-time errors can be handled only partially). Errors pertaining
to the system, network, and cluster will get handled automatically,
thanks to Apache Hadoop's distributed multi-node High
Availability cluster.
● Run-time errors:
o Errors due to failure of tasks (child tasks)
o Issues pertaining to resources
The child task fails at times for unforeseen reasons,
such as user-written code throwing a RuntimeException or a
processing resource timeout. These errors get logged in the
user log files of Hadoop. For both map and reduce tasks, the
Hadoop configuration provides mapreduce.map.maxattempts
and mapreduce.reduce.maxattempts, each with a default
value of 4. This means that if the same task fails four times,
the job is marked as failed (a configuration sketch follows
this list).
● Data errors:
o Errors due to bad input records
o Malformed data errors
When it comes down to handling bad records, you need
to have conditions to detect such records, log them, and
ignore them. One such example is the use of a counter to
keep track of such records. Apache provides a way to keep
track of different entities, through its counter mechanism.
There are system-provided counters, such as bytes read and
number of map tasks; we have seen some of them in Job
History APIs. In addition to that, users can also define their
own counters for tracking. So, your mapper can be enriched
to keep track of these counts; look at the following example:
if (!"red".equals(color)) {
    context.getCounter(COLOR.NOT_RED).increment(1);
}
Or, you can handle your exception, as follows:
catch (NullPointerException npe) {
    context.getCounter(EXCEPTION.NPE).increment(1);
}
You can then get the final count through job history
APIs or from the Job instance directly, as follows:
….
job.waitForCompletion(true);
Counters counters = job.getCounters();
Counter cl = counters.findCounter(COLOR.NOT_RED);
System.out.println("Errors " + cl.getDisplayName() + ":" + cl.getValue());
If a Mapper or Reducer terminates for any reason, the
counters will be reset to zero, so you need to be careful.
Similarly, you may connect to a database and pass on the
status or alternatively log it in the logger. It all depends upon
how you are planning to act on the output of failures. For
example, if you are planning to process the failed records
later, then you cannot keep the failure records in the log file,
as it would require script or human intervention to extract it.
Well-formed data cannot be guaranteed when you work
with very large datasets, so in such cases your mapper and
reducer need to validate even the key and value fields. For
example, text data needs a maximum line-length check, to
ensure that no junk gets in. Typically, such bad data is
ignored by Hadoop programs, as most applications of
Hadoop look at analytics over large-scale data and, unlike a
transaction system, do not require each data element and its
dependencies.
● Other errors:
o System issues
o Cluster issues
o Network issues
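As referenced above, the following is a minimal sketch that
defines illustrative versions of the COLOR and EXCEPTION counter
enums used in the earlier snippets and sets the retry-limit
properties. All names and values are assumptions for
illustration, not part of the original text.
import org.apache.hadoop.conf.Configuration;

public class FailureHandlingSketch {
    // Illustrative counter groups referenced in the snippets above.
    public enum COLOR { NOT_RED }
    public enum EXCEPTION { NPE }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A task that fails this many times causes the whole job to fail (default 4).
        conf.setInt("mapreduce.map.maxattempts", 4);
        conf.setInt("mapreduce.reduce.maxattempts", 4);
        System.out.println("map max attempts = " + conf.get("mapreduce.map.maxattempts"));
    }
}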
Role of HBase in Big Data Processing
HBase
● HBase is a distributed column-oriented database built on top
of the Hadoop file system.
● It is a part of the Hadoop ecosystem that provides random
real-time read/write access to data in the Hadoop File
System.
● HBase provides low latency random read and write access to
petabytes of data by distributing requests from applications
across a cluster of hosts. Each host has access to data in
HDFS and S3, and serves read and write requests in
milliseconds.
● Apache HBase is an open-source, distributed, versioned,
non-relational database modeled after Google's Bigtable
designed to provide quick random access to huge amounts of
structured data.
● It is horizontally scalable.
● It leverages the fault tolerance provided by the HDFS.
● One can store the data in HDFS either directly or through
HBase.
● Hadoop (HDFS) does not support record-level CRUD
operations.
● HBase supports CRUD (Create, Read, Update, Delete)
operations.
History
● The HBase project was started toward the end of 2006 by
Chad Walters and Jim Kellerman at Powerset. It was
modelled after Google’s Bigtable, which had just been
published.
● In February 2007, Mike Cafarella made a code drop
of a mostly working system that Jim Kellerman then carried
forward.
● The first usable HBase release was bundled as part of
Hadoop 0.15.0 in October 2007.
● In May 2010, HBase graduated from a Hadoop subproject to
become an Apache Top Level Project.
● Today, HBase is a mature technology used in production
across a wide range of industries.
HBase and HDFS
● HDFS is a distributed file system suitable for storing large
files, whereas HBase is a database built on top of HDFS.
● HDFS does not support fast individual record lookups,
whereas HBase provides fast lookups for larger tables.
● HDFS provides high-latency batch processing, whereas
HBase provides low-latency access to single rows from
billions of records (random access).
● HDFS provides only sequential access of data, whereas
HBase internally uses hash tables and provides random
access, and it stores the data in indexed HDFS files for faster
lookups.
Features
● Linear and modular scalability.
● Strictly consistent reads and writes.
● Automatic and configurable sharding of tables
● Automatic fail over support between RegionServers.
● It integrates with Hadoop, both as a source and a destination.
● Convenient base classes for backing Hadoop MapReduce
jobs with Apache HBase tables.
● Easy to use Java API for client access.
● It provides data replication across clusters.
● Block cache (HBase supports block cache to improve read
performance. When performing a scan, if block cache is
enabled and there is room remaining, data blocks read from
StoreFiles on HDFS are cached in region server's Java heap
space, so that next time, accessing data in the same block can
be served by the cached block) is used for real-time queries
● Bloom filters (a bloom filter is a probabilistic data structure
based on hashing) are used for real-time queries. Query
predicate push-down is supported via server-side filters.
● A Thrift gateway (an API and a service that accepts Thrift
requests to connect to HBase tables) and a RESTful web
service (a RESTful API is an interface that two computer
systems use to exchange information securely over the
internet) that supports XML, Protobuf, and binary data
encoding options.
● Extensible jruby-based (JIRB) shell.
● Companies such as Facebook, Twitter, Yahoo, and Adobe
use HBase internally.
Limitations of Hadoop
Hadoop can perform only batch processing, and data will be
accessed only in a sequential manner. That means one has to
search the entire dataset even for the simplest of jobs.
A huge dataset when processed results in another huge data
set, which should also be processed sequentially. At this point, a
new solution is needed to access any point of data in a single unit
of time (random access).
Hadoop Random Access Databases
Applications such as HBase, Cassandra, CouchDB, Dynamo,
and MongoDB are some of the databases that store huge amounts
of data and access the data in a random manner.
Storage Mechanism in HBase
HBase is a column-oriented database and the tables in it
are sorted by row.
The table schema defines only column families, which are
the key-value pairs.
A table can have multiple column families, and each column
family can have any number of columns.
Subsequent column values are stored contiguously on the
disk.
Each cell value of the table has a timestamp.
In short, in an HBase:
● Table is a collection of rows.
● Row is a collection of column families.
● Column family is a collection of columns.
● Column is a collection of key value pairs.
Given below is an example schema of table in HBase.
Rowid | Column Family (col1, col2, col3) | Column Family
(col1, col2, col3) | Column Family (col1, col2, col3) | Column
Family (col1, col2, col3)
Column Oriented and Row Oriented
Column-oriented databases are those that store data tables as
sections of columns of data, rather than as rows of data.
Shortly, they will have column families.
● Row-oriented database: suitable for Online Transaction
Processing (OLTP); such databases are designed for a small
number of rows and columns.
● Column-oriented database: suitable for Online Analytical
Processing (OLAP); such databases are designed for huge
tables.
The following image shows column families in a
column-oriented database:
HBase and RDBMS
● HBase is schema-less; it doesn’t have the concept of a fixed-
column schema and defines only column families. An
RDBMS is governed by its schema, which describes the
whole structure of its tables.
● HBase is built for wide tables and is horizontally scalable.
An RDBMS is thin and built for small tables, and is hard to
scale.
● There are no transactions in HBase. An RDBMS is
transactional.
● HBase has de-normalized data. An RDBMS has normalized
data.
● HBase is good for semi-structured as well as structured data.
An RDBMS is good for structured data.
● HBase is a column-oriented data store. An RDBMS is a
row-oriented data store.
● HBase supports automatic partitioning. An RDBMS has no
built-in support for partitioning.
● HBase uses an LSM tree; an RDBMS uses a B+ tree.
HBase - Architecture
In HBase, tables are split into regions and are served by the
region servers. Regions are vertically divided by column families
into “Stores”. Stores are saved as files in HDFS. Shown below is
the architecture of HBase.
Note: The term ‘store’ is used for regions to explain the storage
structure.
HBase has three major components: the client library, a
master server, and region servers. Region servers can be added or
removed as per requirement.
Regions
● Regions are nothing but tables that are split up and spread
across the region servers.
● Tables are automatically partitioned horizontally by HBase
into regions. Each region comprises a subset of a table’s
rows. A region is denoted by the table it belongs to.
Region server
The region servers have regions that:
● Communicate with the client and handle data-related operations.
● Handle read and write requests for all the regions under them.
● Decide the size of the region by following the region size
thresholds.
● The store contains the MemStore and HFiles.
● The MemStore is just like a cache memory. Anything that is
entered into HBase is stored here initially.
● Later, the data is transferred and saved in HFiles as blocks and
the MemStore is flushed.
● When we take a deeper look into the region server, it contains
regions and stores as shown below:
Master Server
● Assigns regions to the region servers and takes the help of Apache
ZooKeeper for this task.
● Handles load balancing of the regions across region servers.
Maintains the state of the cluster by negotiating the load balancing.
● It unloads the busy servers and shifts the regions to less occupied
servers.
● Is responsible for schema changes and other metadata operations
such as creation of tables and column families.
Zookeeper
● Zookeeper is an open-source project that provides services like
maintaining configuration information, naming, providing
distributed synchronization, etc.
● Zookeeper has ephemeral nodes representing different region
servers. Master servers use these nodes to discover available
servers.
● In addition to availability, the nodes are also used to track server
failures or network partitions.
● Clients communicate with region servers via zookeeper.
● In pseudo and standalone modes, HBase itself will take care of
zookeeper.
HBase is made up of an HBase master node orchestrating a
cluster of one or more regionserver workers.
HBase Responsibilities Summary
HBase Proper
HMaster:
● Performs administrative operations on the cluster
● Applies DDL actions for creating or altering tables
● Assigns and distributes regions among RegionServers
(stored in the META system table)
● Conducts region load balancing across RegionServers
● Handles RegionServer failures by assigning the region to
another RegionServer
● The master node is lightly loaded.
RegionServers:
● Function as clients that buffer I/O operations and
store/access data on HDFS
● Host a MemStore per column family
● Manage a WAL per column family
● Manage one or more regions
● Are typically collocated on the same hardware as HDFS
DataNodes
● Field client read/write requests
● Manage region splits, informing the HBase master about
the new daughter regions so it can manage the offlining of
parent regions and the assignment of the replacement
daughters
● Are used to balance the data processing load
Regions:
● Rows of a table are first stored in a single region
● Rows are spread across regions, in key ranges, as data in
the table grows
Leveraging Hadoop
HDFS:
● Stores HLogs, the write-ahead log (WAL) files
● Stores HFiles persisting all columns in a column family
ZooKeeper:
● Tracks the location of the META system table
● Receives heartbeat signals from the HMaster and
RegionServers
● Provides the HMaster with RegionServer failure
notifications
● Initiates the HMaster fail-over protocol
Creating a Table using HBase Shell
The syntax to create a table in HBase shell is shown below.
create '<table name>','<column family>'
Example:
create 'CustomerContactInformation','CustomerName','ContactInfo'
In the HBase data model, column qualifiers are specific
names assigned to your data values in order to make sure you
are able to accurately identify them.
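Alongside the shell, HBase offers a Java client API. The
following is a minimal sketch, assuming the table and column
family created above and illustrative row and qualifier names;
connection settings are expected to come from hbase-site.xml on
the classpath.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClientSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("CustomerContactInformation"))) {
            // Write: row key "row1", column family "CustomerName", qualifier "FIRST".
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("CustomerName"), Bytes.toBytes("FIRST"), Bytes.toBytes("John"));
            table.put(put);

            // Read the same cell back.
            Get get = new Get(Bytes.toBytes("row1"));
            Result result = table.get(get);
            byte[] value = result.getValue(Bytes.toBytes("CustomerName"), Bytes.toBytes("FIRST"));
            System.out.println("FIRST = " + Bytes.toString(value));
        }
    }
}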
PIG
● Apache Pig is a platform for data analysis.
● Apache Pig is a high-level programming language and
scripting platform that runs on HADOOP clusters designed
for analyzing large data sets.
● Pig is extensible, self-optimizing, and easily programmed.
● Programmers can use Pig to write data transformations
without knowing Java.
● Pig uses both structured and unstructured data as input to
perform analytics and uses HDFS to store the results.
● It is an alternative to MapReduce programming.
Why Pig?
● In the MapReduce framework, programs are required to be
translated into a sequence of Map and Reduce stages. To
eliminate the complexities associated with MapReduce, an
abstraction called Pig was built on top of Hadoop.
● Developers who are not good at Java struggle a lot while
working with Hadoop, especially when executing tasks
related to the MapReduce framework. Apache Pig is the best
solution for all such programmers.
● Pig Latin simplifies the work of programmers by eliminating
the need to write complex codes in java for performing
MapReduce tasks.
● The multi-query approach of Apache Pig reduces the length
of code drastically and minimizes development time.
● Pig Latin is quite similar to SQL, and if you are familiar
with SQL, it becomes very easy for you to learn Pig.
History of Apache Pig
✔In 2006, Apache Pig was developed as a research project at
Yahoo, especially to create and execute MapReduce jobs on
every dataset.
✔In 2007, Apache Pig was open sourced via Apache
incubator.
✔In 2008, the first release of Apache Pig came out.
✔In 2010, Apache Pig graduated as an Apache top-level
project.
Pig Philosophy
Apache Pig Architecture
As shown in the above diagram, Apache pig consists of
various components.
Parser: All the Pig scripts initially go through this parser
component. It conducts various checks, which include syntax
checking of the script, type checking, and other miscellaneous
checks.
The parser produces as output a DAG (directed acyclic
graph) which represents the Pig Latin statements and logical
operators. In the DAG, the logical operators of the script are
represented as nodes and the data flows as edges.
Optimizer: The directed acyclic graph is passed to the
logical optimizer, which performs logical optimizations such as
projection pushdown.
Compiler: The compiler component transforms the
optimized logical plan into a sequence of MapReduce jobs.
Execution Engine: This component submits all the
MapReduce jobs in sorted order to Hadoop. Finally, all the
MapReduce jobs are executed on Apache Hadoop to produce the
desired results.
Pig Components / Anatomy of Pig
1. PIG LATIN SCRIPT : contains syntax, data model and
operators. A Language used to express data flows
2. EXECUTION ENGINE: An engine on top of the Hadoop 2
execution environment; it takes the scripts written in Pig Latin
as input and converts them into MapReduce jobs.
Pig Latin
● Pig Latin is a Hadoop extension that simplifies hadoop
programming by giving a high-level data processing
language. In order to analyze the large volumes of data,
programmers write scripts using Pig Latin language. These
scripts are then transformed internally into Map and Reduce
tasks. This is a highly flexible language and supports users
in developing custom functions for writing, reading and
processing data.
● It enables the resources to focus more on data analysis by
minimizing the time taken for writing Map-Reduce
programs.
Pig Latin – Statements
● While processing data using Pig Latin, statements are the
basic constructs.
● These statements work with relations. They include
expressions and schemas.
● Every statement ends with a semicolon (;).
● We will perform various operations using operators provided
by Pig Latin, through statements.
● Except LOAD and STORE, while performing all other
operations, Pig Latin statements take a relation as input and
produce another relation as output.
● Pig Latin statements are generally ordered as follows:
o A LOAD statement that reads data from the file system.
o A series of statements to perform transformations.
o A DUMP or STORE statement to display/store the result.
Example 1:
A = LOAD 'students' AS (rollno, name, gpa);
A = FILTER A BY gpa > 4.0;
A = FOREACH A GENERATE UPPER(name);
STORE A INTO 'myreport';
A is a relation
● As soon as you enter a Load statement in the Grunt shell, its
semantic checking will be carried out. To see the contents of
the schema, you need to use
the Dump operator. Only after performing
the dump operation, the MapReduce job for loading the
data into the file system will be carried out.
Comments in Pig Script
While writing a script in a file, we can include comments in
it as shown below.
Multi-line comments
We will begin the multi-line comments with '/*', end them
with '*/'.
/* These are the multi-line comments
In the pig script */
Single –line comments
We will begin the single-line comments with '--'.
--we can write single line comments like this.
Pig Latin – Arithmetic Operators
The following table describes the arithmetic operators of Pig
Latin.
Suppose a = 10 and b = 20.
+ (Addition): adds values on either side of the operator.
Example: a + b gives 30
− (Subtraction): subtracts the right-hand operand from the
left-hand operand. Example: a − b gives −10
* (Multiplication): multiplies values on either side of the
operator. Example: a * b gives 200
/ (Division): divides the left-hand operand by the right-hand
operand. Example: b / a gives 2
% (Modulus): divides the left-hand operand by the right-hand
operand and returns the remainder. Example: b % a gives 0
?: (Bincond): evaluates the Boolean operators. It has three
operands, as shown below:
variable x = (expression) ? value1 if true : value2 if false.
Example: b = (a == 1) ? 20 : 30; if a = 1 the value of b is 20,
and if a != 1 the value of b is 30
CASE WHEN THEN ELSE END (Case): the case operator is
equivalent to the nested bincond operator. Example:
CASE f2 % 2
WHEN 0 THEN 'even'
WHEN 1 THEN 'odd'
END
Pig Latin – Comparison Operators
The following table describes the comparison operators of Pig
Latin.
== (Equal): checks if the values of two operands are equal or
not; if yes, then the condition becomes true. Example: (a == b)
is not true
!= (Not Equal): checks if the values of two operands are equal
or not; if the values are not equal, then the condition becomes
true. Example: (a != b) is true
> (Greater than): checks if the value of the left operand is
greater than the value of the right operand; if yes, then the
condition becomes true. Example: (a > b) is not true
< (Less than): checks if the value of the left operand is less
than the value of the right operand; if yes, then the condition
becomes true. Example: (a < b) is true
>= (Greater than or equal to): checks if the value of the left
operand is greater than or equal to the value of the right
operand; if yes, then the condition becomes true.
Example: (a >= b) is not true
<= (Less than or equal to): checks if the value of the left
operand is less than or equal to the value of the right operand;
if yes, then the condition becomes true. Example: (a <= b) is
true
matches (Pattern matching): checks whether the string on the
left-hand side matches the constant on the right-hand side.
Example: f1 matches '.*tutorial.*'
Pig Latin – Type Construction Operators
The following table describes the Type construction operators of
Pig Latin.
() (Tuple constructor operator): used to construct a tuple.
Example: (Raju, 30)
{} (Bag constructor operator): used to construct a bag.
Example: {(Raju, 30), (Mohammad, 45)}
[] (Map constructor operator): used to construct a map.
Example: [name#Raja, age#30]
Pig Latin – Relational Operations
The following table describes the relational operators of Pig
Latin.
Operator: Description
Loading and Storing
LOAD: To load the data from the file system (local/HDFS) into
a relation.
STORE: To save a relation to the file system (local/HDFS).
Filtering
FILTER: To remove unwanted rows from a relation.
DISTINCT: To remove duplicate rows from a relation.
FOREACH, GENERATE: To generate data transformations
based on columns of data.
STREAM: To transform a relation using an external program.
Grouping and Joining
JOIN: To join two or more relations.
COGROUP: To group the data in two or more relations.
GROUP: To group the data in a single relation.
CROSS: To create the cross product of two or more relations.
Sorting
ORDER: To arrange a relation in a sorted order based on one or
more fields (ascending or descending).
LIMIT: To get a limited number of tuples from a relation.
Combining and Splitting
UNION: To combine two or more relations into a single relation.
SPLIT: To split a single relation into two or more relations.
Diagnostic Operators
DUMP: To print the contents of a relation on the console.
DESCRIBE: To describe the schema of a relation.
EXPLAIN: To view the logical, physical, or MapReduce
execution plans to compute a relation.
ILLUSTRATE: To view the step-by-step execution of a series of
statements.
Pig Latin – Data types
1. int: Represents a signed 32-bit integer. Example: 8
2. long: Represents a signed 64-bit integer. Example: 5L
3. float: Represents a signed 32-bit floating point. Example: 5.5F
4. double: Represents a 64-bit floating point. Example: 10.5
5. chararray: Represents a character array (string) in Unicode
UTF-8 format. Example: ‘tutorials point’
6. bytearray: Represents a byte array (blob).
7. boolean: Represents a Boolean value. Example: true/false
8. datetime: Represents a date-time.
Example: 1970-01-01T00:00:00.000+00:00
9. biginteger: Represents a Java BigInteger.
Example: 60708090709
10. bigdecimal: Represents a Java BigDecimal.
Example: 185.98376256272893883
Complex Types
11. tuple: A tuple is an ordered set of fields.
Example: (raja, 30)
12. bag: A bag is a collection of tuples.
Example: {(raju,30),(Mohhammad,45)}
13. map: A map is a set of key-value pairs.
Example: [‘name’#’Raju’, ‘age’#30]
Apache Pig Run Modes
Local Mode
o Here, Pig makes use of the local file system and runs in a
single JVM. The local mode is ideal for analyzing small
data sets.
o Here, files are installed and run using localhost.
o The local mode works on the local file system. The input
and output data are stored in the local file system.
The command for the local-mode Grunt shell:
$ pig -x local
MapReduce Mode
o The MapReduce mode is also known as Hadoop mode.
o It is the default mode.
o All the queries written using Pig Latin are converted into
MapReduce jobs and these jobs are run on a Hadoop
cluster.
o It can be executed against a semi-distributed or fully
distributed Hadoop installation.
o Here, the input and output data are present on HDFS.
The command for MapReduce mode:
$ pig
Ways to execute Pig Program
These are the following ways of executing a Pig program on
local and MapReduce mode:
o Interactive Mode / Grunt Mode: In this mode, Pig is
executed in the Grunt shell. To invoke the Grunt shell, run
the pig command. Once the Grunt mode starts, we can enter
Pig Latin statements and commands interactively at the
command line.
o Batch Mode / Script Mode: In this mode, we can run a
script file having a .pig extension. These files contain Pig
Latin commands.
o Embedded Mode: In this mode, we can define our own
functions. These functions are called UDFs (User Defined
Functions). Here, we use programming languages like Java
and Python (a Java sketch follows below).
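In embedded mode, Pig can be driven from Java through the
PigServer class. A minimal sketch, assuming a local-mode run and
an input file students.txt with rollno, name, and gpa columns;
the file name and schema are illustrative assumptions.
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class EmbeddedPigSketch {
    public static void main(String[] args) throws Exception {
        // Run Pig in local mode inside this JVM; use ExecType.MAPREDUCE for a cluster.
        PigServer pigServer = new PigServer(ExecType.LOCAL);
        // Register Pig Latin statements one by one; they build up a logical plan.
        pigServer.registerQuery("A = LOAD 'students.txt' USING PigStorage(',') "
                + "AS (rollno:int, name:chararray, gpa:float);");
        pigServer.registerQuery("B = FILTER A BY gpa > 4.0;");
        // Trigger execution and write the result, like a STORE statement.
        pigServer.store("B", "myreport");
        pigServer.shutdown();
    }
}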
Invoking the Grunt Shell
You can invoke the Grunt shell in a desired mode
(local/MapReduce) using the −x option as shown below.
Local mode command: $ ./pig -x local
MapReduce mode command: $ ./pig -x mapreduce
Either of these commands gives you the Grunt shell prompt
as shown below.
grunt>
You can exit the Grunt shell using ‘ctrl + d’.
After invoking the Grunt shell, you can execute a Pig script
by directly entering the Pig Latin statements in it.
grunt> customers = LOAD 'customers.txt' USING
PigStorage(',');
Executing Apache Pig in Batch Mode
You can write an entire Pig Latin script in a file and execute
it using the -x option. Let us suppose we have a Pig script in
a file named sample_script.pig as shown below.
Sample_script.pig
student= LOAD 'hdfs://localhost:9000/pig_data/student.txt'
USING PigStorage(',')
as (id:int, name:chararray, city:chararray);
Dump student;
Now, you can execute the script in the above file as shown below.
Local mode: $ pig -x local Sample_script.pig
MapReduce mode: $ pig -x mapreduce Sample_script.pig
Pig Latin Data Model
● The data model of Pig Latin is fully nested.
● A Relation is the outermost structure of the Pig Latin data
model, and it is a bag, where:
o A bag is a collection of tuples.
o A tuple is an ordered set of fields.
o A field is a piece of data.
● It allows complex non-atomic data types such as map and
tuple. Given below is the diagrammatical representation of
Pig Latin’s data model.
As part of its data model, Pig supports following basic types.
Atom
It is a simple atomic value like int, long, double, or string.
Any single value in Pig Latin, irrespective of its data type,
is known as an Atom.
It is stored as string and can be used as string and number.
int, long, float, double, chararray, and bytearray are the atomic
values of Pig.
A piece of data or a simple atomic value is known as
a field.
Example − ‘raja’ or ‘30’
Tuple
It is a sequence of fields that can be of any data type. A
record that is formed by an ordered set of fields is known
as a tuple, the fields can be of any type.
A tuple is similar to a row in a table of RDBMS.
Example − (Raja, 30)
Bag
It is a collection of tuples of potentially varying structures
and can contain duplicates.
A bag is an unordered set of tuples. In other words, a
collection of tuples (non-unique) is known as a bag.
Each tuple can have any number of fields (flexible schema).
A bag is represented by ‘{ }’. It is similar to a table in RDBMS,
but unlike a table in RDBMS, it is not necessary that every tuple
contain the same number of fields or that the fields in the same
position (column) have the same type.
Example − {(Raja, 30), (Mohammad, 45)}
A bag can be a field in a relation; in that context, it is known
as inner bag.
Example − {Raja, 30, {9848022338,
raja@gmail.com,}}
Map
It is an associative array. A map (or data map) is a set of key-
value pairs.
The key needs to be of type chararray and should be unique.
The value might be of any type. It is represented by ‘[ ]’
Example − [name#Raja, age#30]
Relation
A relation is a bag of tuples.
The relations in Pig Latin are unordered (there is no
guarantee that tuples are processed in any particular order).
Pig on Hadoop
● Pig runs on Hadoop
● Pig uses both HDFS and Map Reduce programming
● By default, Pig reads input files from HDFS, Pig stores the
intermediate data (data produced by Map Reduce jobs) and
the output in HDFS.
● However, Pig can also read input from and place output to
other sources.
● In general, Apache Pig works on top of Hadoop. It is an
analytical tool that analyzes large datasets that exist in the
Hadoop File System.
● To analyze data using Apache Pig, we have to initially load
the data into Apache Pig.
Student ID | First Name | Last Name | Phone | City
001 | Rajiv | Reddy | 9848022337 | Hyderabad
002 | siddarth | Battacharya | 9848022338 | Kolkata
003 | Rajesh | Khanna | 9848022339 | Delhi
004 | Preethi | Agarwal | 9848022330 | Pune
005 | Trupthi | Mohanthy | 9848022336 | Bhuwaneshwar
006 | Archana | Mishra | 9848022335 | Chennai
Apache Pig - Load & Store Functions
The Load and Store functions in Apache Pig are used to
determine how the data goes into and comes out of Pig. These
functions are used with the load and store operators.
Given below is the list of load and store functions available
in Pig.
S.No. Function & Description
1 PigStorage( ): To load and store structured files.
2 TextLoader( ): To load unstructured data into Pig.
3 BinStorage( ): To load and store data into Pig using
machine readable format.
4 Handling Compression: In Pig Latin, we can load and
store compressed data.
The Load Operator
You can load data into Apache Pig from the file system
(HDFS/ Local) using LOAD operator of Pig Latin
The load statement consists of two parts divided by the “=”
operator. On the left-hand side, we need to mention the name of
the relation where we want to store the data, and on the right-
hand side, we have to define how we store the data.
Syntax:
Relation_name = LOAD 'Input file path' USING
function as schema;
Where,
● relation_name : We have to mention the relation in which
we want to store the data.
● Input file path: We have to mention the HDFS directory
where the file is stored. (In MapReduce mode)
● Function: We have to choose a function from the set of load
functions provided by Apache Pig (BinStorage,
JsonLoader, PigStorage, TextLoader).
● Schema: We have to define the schema of the data. We can
define the required schema as follows: (column1 : data type,
column2 : data type, column3 : data type). Note − If we load
the data without specifying the schema, the columns will be
addressed as $0, $1, $2, and so on.
Example
As an example, let us load the data in student_data.txt in
Pig under the schema named Student using the LOAD
command.
Start the Pig Grunt Shell
First of all, open the Linux terminal. Start the Pig Grunt shell
in MapReduce mode as shown below.
$ pig -x mapreduce
It will start the Pig Grunt shell as shown below.
15/10/01 12:33:37 INFO pig.ExecTypeProvider: Trying
ExecType : LOCAL
15/10/01 12:33:37 INFO pig.ExecTypeProvider: Trying
ExecType : MAPREDUCE
15/10/01 12:33:37 INFO pig.ExecTypeProvider: Picked
MAPREDUCE as the ExecType
2015-10-01 12:33:38,080 [main] INFO org.apache.pig.Main
Apache Pig version 0.15.0 (r1682971) compiled Jun 01 2015,
11:44:35
2015-10-01 12:33:38,080 [main] INFO org.apache.pig.Main -
Logging error messages to:
/home/Hadoop/pig_1443683018078.log
2015-10-01 12:33:38,242 [main] INFO
org.apache.pig.impl.util.Utils - Default bootup file
/home/Hadoop/.pigbootup not found
2015-10-01 12:33:39,630 [main]
INFO
org.apache.pig.backend.hadoop.executionengine.HExecutionEn
gine - Connecting to hadoop file system at: hdfs://localhost:9000
grunt>
Execute the Load Statement
Now load the data from the file student_data.txt into Pig by
executing the following Pig Latin statement in the Grunt shell.
grunt> student = LOAD
'hdfs://localhost:9000/pig_data/student_data.txt'
USING PigStorage(',')
as
(id:int,firstname:chararray,lastname:chararray,phone:chararray,
city:chararray);
or
grunt> student = LOAD
'hdfs://localhost:9000/pig_data/student_data.txt'
as(id,firstname,lastname,phone,city);
With the above statement, we can take an input file whose
columns are separated by tab characters (the default delimiter),
and there is no need to specify the complete schema (data types)
of the relation.
Following is the description of the above statement.
Relation name: We have stored the data in the relation (schema)
student.
Input file path: We are reading data from the file
student_data.txt, which is in the /pig_data/ directory of HDFS.
Storage function: We have used the PigStorage() function. It
loads and stores data as structured text files. It takes as a
parameter a delimiter using which each entity of a tuple is
separated. By default, it takes ‘\t’ as the parameter.
Schema: We have stored the data using the following schema:
Column: id, firstname, lastname, phone, city
Datatype: int, chararray, chararray, chararray, chararray
Note − The load statement will simply load the data into the
specified relation in Pig. To verify the execution of
the Load statement, you have to use the Diagnostic Operators.
The PigStorage() function loads and stores data as structured
text files. It takes a delimiter using which each entity of a tuple is
separated as a parameter. By default, it takes ‘\t’ as a parameter.
Syntax:
grunt> PigStorage(field_delimiter)
Example
Let us suppose we have a file named student_data.txt in the
HDFS directory named /data/ with the following content.
001,Rajiv,Reddy,9848022337,Hyderabad
002,siddarth,Battacharya,9848022338,Kolkata
003,Rajesh,Khanna,9848022339,Delhi
004,Preethi,Agarwal,9848022330,Pune
005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
006,Archana,Mishra,9848022335,Chennai.
We can load the data using the PigStorage function as shown
below.
grunt> student = LOAD
'hdfs://localhost:9000/pig_data/student_data.txt'
USING PigStorage(',') as (id:int, firstname:chararray,
lastname:chararray,
phone:chararray, city:chararray );
In the above example, we have seen that we have used comma
(‘,’) delimiter. Therefore, we have separated the values of a record
using (,).
You can store the loaded data in the file system using the store
operator.
Syntax
STORE Relation_name INTO ' required_directory_path '
[USING function];
Example
Assume we have a file student_data.txt in HDFS with the
following content.
And we have read it into a relation student using the LOAD
operator as shown below.
grunt> student = LOAD
'hdfs://localhost:9000/pig_data/student_data.txt'
USING PigStorage(',')
as(id:int,firstname:chararray,lastname:chararray,phone:chararray
,
city:chararray);
Now, let us store the relation in the HDFS
directory “/pig_Output/” as shown below.
grunt> STORE student INTO '
hdfs://localhost:9000/pig_Output/ ' USING
PigStorage(',');
Output
After executing the store statement, you will get the
following output.
A directory is created with the specified name and the data
will be stored in it.
The load statement will simply load the data into the specified
relation in Apache Pig. To verify the execution of
the Load statement, you have to use the Diagnostic Operators.
Pig Latin provides four different types of diagnostic operators −
● Dump operator
● Describe operator
● Explanation operator
● Illustration operator
Dump Operator
The Dump operator is used to run the Pig Latin statements
and display the results on the screen.
It is generally used for debugging purposes.
Syntax:
grunt> Dump Relation_Name
Describe Operator
The describe operator is used to view the schema of a
relation.
Syntax:
grunt> Describe Relation_name
Explain Operator
The explain operator is used to display the logical, physical,
and MapReduce execution plans of a relation.
Syntax:
grunt> explain Relation_name;
Illustrate Operator
The illustrate operator gives you the step-by-step
execution of a sequence of statements.
Syntax:
grunt> illustrate Relation_name;
Group Operator
The GROUP operator is used to group the data in one or
more relations. It collects the data having the same key.
Syntax:
grunt> Group_data = GROUP Relation_name BY age;
Example
Assume that we have a file named student_details.txt in the
HDFS directory /pig_data/ as shown below. student_details.txt
001,Rajiv,Reddy,21,9848022337,Hyderabad
002,siddarth,Battacharya,22,9848022338,Kolkata
003,Rajesh,Khanna,22,9848022339,Delhi
004,Preethi,Agarwal,21,9848022330,Pune
005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar
006,Archana,Mishra,23,9848022335,Chennai
007,Komal,Nayak,24,9848022334,trivendram
008,Bharathi,Nambiayar,24,9848022333,Chennai
And we have loaded this file into Apache Pig with the
relation name student_details as shown below.
grunt> student_details= LOAD
'hdfs://localhost:9000/pig_data/student_details.txt'
USING PigStorage(',')
as(id:int,firstname:chararray,lastname:chararray,age:int,phone:c
hararray,
city:chararray);
Now, let us group the records/tuples in the relation by age as
shown below.
grunt> group_data = GROUP student_details by age;
We can verify the relation group_data using the DUMP
operator as shown below. grunt> Dump group_data;
Output
Then you will get output displaying the contents of the
relation named group_data as shown below.
Here you can observe that the resulting schema has two columns
−
● One is age, by which we have grouped the relation.
● The other is a bag, which contains the group of tuples,
student records with the respective age.
(21,{(4,Preethi,Agarwal,21,9848022330,Pune),(1,Rajiv,Reddy,2
1,9848022337,Hydera bad)})
(22,{(3,Rajesh,Khanna,22,9848022339,Delhi),(2,siddarth,Battac
harya,22,984802233 8,Kolkata)})
(23,{(6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mo
hanthy,23,9848022336 ,Bhuwaneshwar)})
(24,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,
Nayak,24,9848022334, trivendram)})
You can see the schema of the table after grouping the data using
the describe command as shown below.
grunt> Describe group_data;
group_data: {group: int,student_details: {(id: int,firstname:
chararray,
lastname: chararray,age: int,phone: chararray,city: chararray)}}
Try:grunt>illustrate group_data;
Try:grunt>explain group_data;
Grouping by Multiple Columns
Let us group the relation by age and city as shown below.
grunt> group_multiple = GROUP student_details by (age,
city);
You can verify the content of the relation named
group_multiple using the Dump operator .
Joins in Pig
The JOIN operator is used to combine records from two or
more relations. While performing a join operation, we declare one
(or a group of) tuple(s) from each relation, as keys. When these
keys match, the two particular tuples are matched, else the records
are dropped.
Joins can be of the following types −
● Self-join
● Inner-join
● Outer-join − left join, right join, and full join
Assume that we have two files namely customers.txt and
orders.txt in the /pig_data/ directory of HDFS as shown below.
customers.txt
1,Ramesh,32,Ahmedabad,2000.00
2,Khilan,25,Delhi,1500.00
3,kaushik,23,Kota,2000.00
4,Chaitali,25,Mumbai,6500.00
5,Hardik,27,Bhopal,8500.00
6,Komal,22,MP,4500.00
7,Muffy,24,Indore,10000.00
orders.txt
102,2009-10-08 00:00:00,3,3000
100,2009-10-08 00:00:00,3,1500
101,2009-11-20 00:00:00,2,1560
103,2008-05-20 00:00:00,4,2060
And we have loaded these two files into Pig with the
relations customers and orders as shown below.
grunt> customers = LOAD
'hdfs://localhost:9000/pig_data/customers.txt' USING
PigStorage(',')
as(id:int,name:chararray,age:int,address:chararray,salary:int);
grunt> orders = LOAD
'hdfs://localhost:9000/pig_data/orders.txt' USING
PigStorage(',')
as(oid:int,date:chararray,customer_id:int,amount:int);
Let us now perform various Join operations on these two
relations.
Self - join
Self-join is used to join a table with itself as if the table were
two relations, temporarily renaming at least one relation.
Generally, in Apache Pig, to perform self-join, we will load
the same data multiple times, under different aliases (names).
Therefore let us load the contents of the file customers.txt as two
tables as shown below.
grunt> customers1 = LOAD
'hdfs://localhost:9000/pig_data/customers.txt'
USING PigStorage(',')
as(id:int,name:chararray,age:int,address:chararray,salary:int);
grunt> customers2 = LOAD
'hdfs://localhost:9000/pig_data/customers.txt'
USING PigStorage(',')
as(id:int,name:chararray,age:int,address:chararray,salary:int);
Syntax
Given below is the syntax of performing self-join operation
using the JOIN operator.
grunt> Relation3_name = JOIN Relation1_name BY
key, Relation2_name BY key ;
Example
Let us perform self-join operation on the relation
customers, by joining the two relations customers1 and
customers2 as shown below.
grunt> customers3 = JOIN customers1 BY id, customers2 BY
id;
Verify the relation customers3 using the DUMP operator as
shown below.
grunt>Dump customers3;
Output
It will produce the following output, displaying
the contents of the relation customers.
(1,Ramesh,32,Ahmedabad,2000,1,Ramesh,32,Ahmedabad,2000
)
(2,Khilan,25,Delhi,1500,2,Khilan,25,Delhi,1500)
(3,kaushik,23,Kota,2000,3,kaushik,23,Kota,2000)
(4,Chaitali,25,Mumbai,6500,4,Chaitali,25,Mumbai,6500)
(5,Hardik,27,Bhopal,8500,5,Hardik,27,Bhopal,8500)
(6,Komal,22,MP,4500,6,Komal,22,MP,4500)
(7,Muffy,24,Indore,10000,7,Muffy,24,Indore,10000)
Inner Join
Inner Join is used quite frequently; it is also referred to as
equijoin. An inner join returns rows when there is a match in both
tables.
It creates a new relation by combining column values of two
relations (say A and B) based upon the join-predicate. The query
compares each row of A with each row of B to find all pairs of
rows which satisfy the join-predicate. When the join-predicate is
satisfied, the column values for each matched pair of rows of A
and B are combined into a result row.
Syntax
Here is the syntax of performing inner join operation using
the JOIN operator.
grunt> result = JOIN relation1 BY columnname, relation2
BY
columnname;
Example
Let us perform inner join operation on the two relations
customers and orders as shown below.
grunt> coustomer_orders= JOIN customers BY id, orders BY
customer_id;
Verify the relation coustomer_orders using the DUMP operator
as shown below.
grunt> Dump coustomer_orders;
Output
You will get the following output, showing the contents of the
relation named coustomer_orders.
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)
(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)
(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)
Note:
Outer Join: Unlike inner join, outer join returns all the rows
from at least one of the relations.
An outer join operation is carried out in three ways −
● Left outer join
● Right outer join
● Full outer join
Left Outer Join
The left outer Join operation returns all rows from the left
table, even if there are no matches in the right relation.
Syntax
Given below is the syntax of performing
left outer join operation using the JOIN operator.
grunt> Relation3_name = JOIN Relation1_name BY id LEFT
OUTER, Relation2_name BY customer_id;
Example
Let us perform left outer join operation on the two relations
customers and orders as shown below.
grunt>outer_left= JOIN customers BY id LEFT OUTER, orders
BY customer_id;
Verification
Verify the relation outer_left using the DUMP operator as
shown below.
grunt>Dump outer_left;
Output
It will produce the following output, displaying
the contents of the relation outer_left.
(1,Ramesh,32,Ahmedabad,2000,,,,)
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)
(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)
(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)
(5,Hardik,27,Bhopal,8500,,,,)
(6,Komal,22,MP,4500,,,,)
(7,Muffy,24,Indore,10000,,,,)
Right Outer Join
The right outer join operation returns all rows from the
right table, even if there are no matches in the left table.
Syntax
Given below is the syntax of performing right outer join
operation using the JOIN operator.
grunt> outer_right = JOIN customers BY id RIGHT OUTER,
orders BY customer_id;
Full Outer Join
The full outer join operation returns rows when there is a
match in one of the relations.
Syntax
Given below is the syntax of performing full outer join
using the JOIN operator.
grunt>outer_full = JOIN customers BY id FULL OUTER, orders
BY customer_id;
Using Multiple Keys
We can perform JOIN operation using multiple keys.
Syntax
Here is how you can perform a JOIN operation on two tables
using multiple keys.
grunt> Relation3_name = JOIN Relation1_name BY (key1,
key2),
Relation2_name BY (key1, key2);
Cross Operator
The CROSS operator computes the cross-product of two or
more relations. This chapter explains with example how to use the
cross operator in Pig Latin.
Syntax
Given below is the syntax of the CROSS operator.
grunt> Relation3_name = CROSS Relation1_name,
Relation2_name;
Foreach Operator
The FOREACH operator is used to generate specified data
transformations based on the column data.
Syntax
Given below is the syntax of FOREACH operator.
grunt> Relation_name2 = FOREACH Relation_name1
GENERATE (required data);
grunt> foreach_data= FOREACH student_details GENERATE
id,age,city;
grunt> foreach_data= FOREACH student_details GENERATE
age>25;
Order By Operator
The ORDER BY operator is used to display the contents of
a relation in a sorted order based on one or more fields.
Syntax
Given below is the syntax of the ORDER BY operator.
grunt> Relation_name2 = ORDER Relation_name1 BY
column_name (ASC|DESC);
grunt> order_by_data= ORDER student_details BY age DESC;
Limit Operator
The LIMIT operator is used to get a limited number of
tuples from a relation.
Syntax
Given below is the syntax of the LIMIT operator.
grunt> Result = LIMIT Relation_name required number of tuples;
Example
grunt> student_details= LOAD
'hdfs://localhost:9000/pig_data/student_details.txt'
USING PigStorage(',')
as(id:int,firstname:chararray,lastname:chararray,age:int,phone:c
hararray,
city:chararray);
grunt> limit_data= LIMIT student_details 4;
Verify the relation limit_data using the DUMP operator
as shown below.
grunt> Dump limit_data;
Apache Pig - User Defined Functions
In addition to the built-in functions, Apache Pig provides
extensive support for User Defined Functions (UDF’s). Using
these UDF’s, we can define our own functions and use them. The
UDF support is provided in six programming languages, namely,
Java, Jython, Python, JavaScript, Ruby and Groovy.
Executing Pig Script in Batch mode
While executing Apache Pig statements in batch mode,
follow the steps given below.
Step 1
Write all the required Pig Latin statements in a single file.
We can write all the Pig Latin statements and commands in a
single file and save it as .pig file.
Step 2
Execute the Apache Pig script. You can execute the Pig script
from the shell (Linux) as shown below.
Local mode: $ pig -x local Sample_script.pig
MapReduce mode: $ pig -x mapreduce Sample_script.pig
You can execute it from the Grunt shell as well, using the exec
or run command as shown below.
grunt> exec sample_script.pig
Executing a Pig Script from HDFS
We can also execute a Pig script that resides in the
HDFS. Suppose there is a Pig script with the name
Sample_script.pig in the HDFS directory named /pig_data/.
We can execute it as shown below.
$ pig -x mapreduce
hdfs://localhost:9000/pig_data/Sample_script.pig
Features of Pig
● It provides an engine for executing data flows (how your
data should flow).
● Pig processes data in parallel on the Hadoop cluster
● It provides a language called “Pig Latin” to express data
flows.
● Rich set of operators − It provides many operators to
perform operations like join, sort, filter, etc.
● Ease of programming − Pig Latin is similar to SQL, and it
is easy to write a Pig script if you are good at SQL. Pig
scripts are also easy to understand and maintain (roughly
1/20th the lines of code and 1/16th the development time of
equivalent MapReduce code).
● Optimization opportunities − The tasks in Apache Pig
optimize their execution automatically, so the programmers
need to focus only on semantics of the language.
● Extensibility − Using the existing operators, users can
develop their own functions to read, process, and write data.
● UDF’s − Pig provides the facility to create User-defined
Functions in other programming languages such as Java and
invoke or embed them in Pig Scripts.
● Handles all kinds of data − Apache Pig analyzes all kinds
of data, both structured as well as unstructured. It stores the
results in HDFS.
Advantages of Apache Pig
o Less code − Pig requires fewer lines of code to perform any
operation.
o Reusability − Pig code is flexible enough to be reused.
o Nested data types − Pig provides useful nested data types
like tuple, bag, and map.
Applications of Apache Pig
Apache Pig is generally used by data scientists for performing
tasks involving ad-hoc processing and quick prototyping.
Apache Pig is used −
● To process huge data sources such as web logs.
● To perform data processing for search platforms.
● To process time sensitive data loads.
Apache Pig Vs Hive
● Both Apache Pig and Hive are used to create MapReduce
jobs. And in some cases, Hive operates on HDFS in a similar
way Apache Pig does. In the following table, we have listed
a few significant points that set Apache Pig apart from Hive.
● Apache Pig uses a language called Pig Latin; it was originally created at Yahoo. Hive uses a language called HiveQL; it was originally created at Facebook.
● Pig Latin is a data flow language; HiveQL is a query processing language.
● Pig Latin is a procedural language and fits in the pipeline paradigm; HiveQL is a declarative language.
● Apache Pig can handle structured, unstructured, and semi-structured data; Hive is mostly used for structured data.
Differences between Apache MapReduce and PIG
● Apache MapReduce is a low-level data processing tool; Apache Pig is a high-level data flow tool.
● In MapReduce, it is required to develop complex programs using Java or Python; in Pig, it is not required to develop complex programs.
● It is difficult to perform data operations in MapReduce; Pig provides built-in operators to perform data operations like union, sorting and ordering.
● MapReduce doesn't allow nested data types; Pig provides nested data types like tuple, bag, and map.
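As a hedged illustration of this difference, the classic word count, which needs a full Java program in MapReduce, can be written in a few lines of Pig Latin (the input path is assumed; TOKENIZE, FLATTEN and COUNT are built-in functions):
grunt> lines = LOAD 'hdfs://localhost:9000/pig_data/input.txt' AS (line:chararray);
grunt> words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
grunt> grouped = GROUP words BY word;
grunt> word_count = FOREACH grouped GENERATE group AS word, COUNT(words) AS total;
grunt> DUMP word_count;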
Apache Pig Vs SQL
Listed below are the major differences between Apache Pig and
SQL.
● Pig Latin is a procedural language; SQL is a declarative language.
● In Apache Pig, schema is optional: we can store data without designing a schema, and fields can be referenced by position as $0, $1, and so on. In SQL, schema is mandatory.
● The data model in Apache Pig is nested relational; the data model used in SQL is flat relational.
● Apache Pig provides limited opportunity for query optimization; there is more opportunity for query optimization in SQL.
In addition to the above differences, Apache Pig Latin:
● Allows splits in the pipeline (illustrated in the sketch after this list).
● Allows developers to store data anywhere in the pipeline.
● Declares execution plans.
● Provides operators to perform ETL (Extract, Transform, and Load) functions.
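For instance, a split in the pipeline can be expressed with the SPLIT operator, reusing the student_details relation loaded earlier (the age threshold of 21 and the output paths are arbitrary choices for this sketch):
grunt> SPLIT student_details INTO minors IF age < 21, adults IF age >= 21;
grunt> STORE minors INTO 'hdfs://localhost:9000/pig_output/minors' USING PigStorage(',');
grunt> STORE adults INTO 'hdfs://localhost:9000/pig_output/adults' USING PigStorage(',');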
Hive
(The Hive logo is a funky hybrid of an elephant and a bee.)
Hive is a data warehousing tool that sits on top of Hadoop; Apache Hive is data warehouse software.
Facebook initially created the Hive component to manage their ever-growing volumes of log data. Later, the Apache Software Foundation developed it as open source, and it came to be known as Apache Hive.
Hive is suitable for:
o Data warehousing applications
o Processing batch jobs on huge data that is immutable (data whose structure cannot be changed after it is created is called immutable data)
o Examples: web logs, application logs
Hive is used to process structured data in Hadoop.
The three main tasks performed by Apache Hive are
1. Summarization
2. Querying
3. Analysis
Hive provides extensive data type functions and formats for
data summarization and analysis.
Hive makes use of the following:
o HDFS for storage
o MapReduce for execution
o An RDBMS to store metadata / schemas
Hive programs are written in the Hive Query Language (HQL),
which is a declarative language similar to SQL. Hive translates
HQL queries into MapReduce jobs and then runs the jobs in the
Hadoop cluster.
Hive supports OLAP (Online Analytical Processing).
Hive queries can be used to replace complicated Java
MapReduce programs for structured and semi-structured data
processing and analysis. A person who is knowledgeable about
SQL statements can write Hive queries relatively easily.
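For instance, the kind of summarization an SQL-literate analyst would express in Hive might look like the following sketch; the employee table and its columns are hypothetical, while the grouped aggregation itself is standard HiveQL.
hive> select city, count(*) as employee_count
from employee
group by city
order by employee_count desc;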
The Hive platform makes it simple to perform tasks like:
● Large-scale data analysis
● Perform Ad-hoc queries
● Perform Data encapsulation
Note:
1. Hive is not RDBMS
2. It is not designed to support OLTP
3. It is not designed for real time queries
4. It is not designed to support row-level updates
History of Hive
Features of Hive:
1.It provides SQL-like queries (i.e., HQL) that are implicitly
transformed to MapReduce or Spark jobs. We don’t need to
know any programming languages to work with Hive. We can
work with Hive using only basic SQL.
2.The table structure in Hive is the same as the table structure
in a relational database.
3.HQL is easy to code
4.Hive supports rich data types such as structs, lists and maps
5.Hive supports SQL filters, group-by and order-by clauses
6.Hive is fast and scalable.
7.It is capable of analyzing large datasets stored in HDFS and
other similar data storage systems such as HBase to access
data.
8.It allows different storage types such as plain text, RCFile,
and HBase.
9.It uses indexing to accelerate queries.
10. It can operate on compressed data stored in the Hadoop
ecosystem.
11. We can use Apache Hive for free. It is open-source.
12. Multiple users can perform queries on the data at the same
time.
13. The Apache Hive software perfectly matches the
low-level interface requirements of Apache Hadoop.
14. In order to improve performance, Apache Hive partitions
and buckets data at the table level.
15. Hive provides support for a variety of file formats,
including TextFile, SequenceFile, ORC, Avro and Parquet,
as well as compression codecs such as LZO.
16. Hive has a variety of built-in functions.
17. Hive also provides support for User-Defined Functions
for data cleansing and filtering. We can define UDFs
according to our requirements.
18. External tables are supported by Apache Hive. Because of
this feature, we can process data without first moving it
into Hive's warehouse directory.
19. Hive support includes ETLs. Hive is an effective ETL tool.
20. Hive can accommodate client applications written in PHP,
Python, Java, C++, and Ruby.
21. Hive has an optimizer that applies rules to logical plans to
improve performance.
22. We can run Ad-hoc queries in Hive, which are loosely
typed commands or queries whose values depend on some
variable for the data analysis.
23. Hive can be integrated with Apache Tez as an execution
engine to provide faster, near-interactive query processing.
Hive Integration and Work Flow
Figure: Flow of log analysis file
Hourly Log Data can be stored directly into HDFS and then
data cleaning is performed on the log file. Finally, Hive tables can
be created to query the log file
Hive Data Units
1. Databases: The namespace for Tables
2. Tables: Set of records that have similar schema
3. Partitions: Logical separations of data based on
classification of given information as per specific attributes.
Once Hive has partitioned the data based on a specified key,
it starts to assemble the records into specific folders as and
when the records are inserted.
4. Buckets(or Clusters): Similar to partitions but uses hash
function to segregate data and determines the cluster or
bucket into which the record should be placed.
Partitioning in Hive
The partitioning in Hive means dividing the table into some
parts based on the values of a particular column like date, course,
city or country.
The advantage of partitioning is that since the data is stored
in slices, the query response time becomes faster.
As we know that Hadoop is used to handle the huge amount
of data, it is always required to use the best approach to deal with
it.
Example: Let's assume we have a data of 10 million students
studying in an institute. Now, we have to fetch the students of a
particular course. If we use a traditional approach, we have to go
through the entire data. This leads to performance degradation. In
such a case, we can adopt the better approach i.e., partitioning in
Hive and divide the data among the different datasets based on
particular columns.
The partitioning in Hive can be executed in two ways -
Static Partitioning
In static or manual partitioning, it is required to pass the
values of partitioned columns manually while loading the data
into the table. Hence, the data file doesn't contain the partitioned
columns.
Example of Static Partitioning
First, select the database in which we want to create a table.
hive> use test;
Create the table and provide the partitioned columns by
using the following command:
hive> create table student (id int, name string, age int,
institute string)
>partitioned by (course string)
>row format delimited
>fields terminated by ',';
Let's retrieve the information associated with the table:
hive> describe student;
Load the data into the table and pass the values of partition
columns with it by using the following command: -
hive> load data local inpath '/home/codegyani/hive/student_detail1' into table student partition(course="java");
Here, we are partitioning the students of an institute based on
courses.
Load the data of another file into the same table and pass the
values of partition columns with it by using the following
command: -
hive> load data local inpath '/home/codegyani/hive/student_details2' into table student partition(course="hadoop");
At this point, the table student is divided into two partitions, one per course.
Let's retrieve the entire data of the table by using the following
command: -
hive> select * from student;
Now, try to retrieve the data based on partitioned columns by
using the following command: -
hive> select * from student where course="java";
In this case, we are not examining the entire data. Hence, this
approach improves query response time.
Let's also retrieve the data of the other partitioned dataset by
using the following command: -
hive> select * from student where course="hadoop";
Dynamic Partitioning
In dynamic partitioning, the values of partitioned columns
exist within the table. So, it is not required to pass the values of
partitioned columns manually
First, select the database in which we want to create a table.
hive> use show;
Enable the dynamic partition by using the following
commands:
hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
Create a dummy table to store the data.
hive> create table stud_demo(id int, name string, age int, institute string, course string)
row format delimited
fields terminated by ',';
Now, load the data into the table.
hive> load data local inpath '/home/codegyani/hive/student_details' into table stud_demo;
Create a partition table by using the following command:
hive> create table student_part (id int, name string, age int, institute string)
partitioned by (course string)
row format delimited
fields terminated by ',';
Now, insert the data of the dummy table into the partition table.
hive> insert into student_part partition(course)
select id, name, age, institute, course from stud_demo;
At this point, the table student_part is divided into two partitions, one per course.
Let's retrieve the entire data of the table by using the
following command:
hive> select * from student_part;
Now, try to retrieve the data based on partitioned columns by
using the following command: -
hive> select * from student_part where course= "java ";
In this case, we are not examining the entire data. Hence, this
approach improves query response time.
Let's also retrieve the data of another partitioned dataset by
using the following command: -
hive> select * from student_part where course= "hadoop";
Bucketing in Hive
The bucketing in Hive is a data organizing technique.
It is similar to partitioning in Hive with an added
functionality that it divides large datasets into more manageable
parts known as buckets. So, we can use bucketing in Hive when
the implementation of partitioning becomes difficult. However,
we can also divide partitions further in buckets.
Working of Bucketing in Hive
o The concept of bucketing is based on the
hashing technique.
o Here, the modulus of a hash of the bucketing column value and the
number of required buckets is calculated (say, hash(x) % 3).
o Based on the resulting value, the data is stored into the
corresponding bucket. A short sketch of creating and loading a bucketed table follows.
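The following is a minimal sketch of bucketing, reusing the stud_demo table from the dynamic partitioning example. The table name student_bucket and the choice of 3 buckets are illustrative; the hive.enforce.bucketing setting is required on older Hive versions, while newer versions enforce bucketing automatically.
hive> set hive.enforce.bucketing = true;
hive> create table student_bucket (id int, name string, age int, institute string, course string)
clustered by (id) into 3 buckets
row format delimited fields terminated by ',';
hive> insert into table student_bucket
select id, name, age, institute, course from stud_demo;
Rows with the same hash(id) % 3 value land in the same bucket file, which helps with sampling and bucketed map joins.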
Hive Architecture:
o Hive CLI: The Hive CLI (Command Line Interface) is a
shell where we can execute Hive queries and commands.
o Hive Web User Interface: The Hive Web UI is just an
alternative of Hive CLI. It provides a web-based GUI for
executing Hive queries and commands.
o Hive Server: This is an optional server. This can be used to
submit Hive Jobs from a remote client.
o JDBC: Job can be submitted from a JDBC Client. One can
write a Java code to connect to Hive and submit jobs on it.
o ODBC: It allows the applications that support the ODBC
protocol to connect to Hive.
o Driver: Hive queries are sent to the driver for compilation,
optimization and execution. The Hive driver receives the
HiveQL statements submitted by the user through the
command shell and creates session handles for the query.
o Metastore: Metastore stores metadata information about
tables and partitions, including column and column type
information. It also stores information about the serializer and
deserializer needed to read and write data, and the HDFS files
where the data is stored. It is usually a relational database. Hive
metadata can be queried and modified through the Metastore.
The figure above provides a glimpse of the architecture of
Apache Hive and its main sections. Apache Hive is a large and
complex software system. It has the following components:
Hive Client:
Hive drivers support applications written in any language
like Python, Java, C++, and Ruby, among others, using JDBC,
ODBC, and Thrift drivers, to perform queries on the Hive.
Therefore, one may design a hive client in any language of their
choice.
Hive supports three types of clients:
1. Thrift Clients: The Hive server can handle requests from a
thrift client by using Apache Thrift.
2. JDBC client: A JDBC driver connects to Hive using the
Thrift framework. Hive Server communicates with the Java
applications using the JDBC driver.
3. ODBC client: The Hive ODBC driver is similar to the JDBC
driver in that it also uses Thrift to connect to Hive, allowing
applications that follow the ODBC protocol to submit queries.
Hive Services:
Hive provides numerous services, including the Hive server2,
Beeline, etc. The services offered by Hive are:
1. Beeline: HiveServer2 supports Beeline, a command shell
through which the user can submit commands and queries. It is
a JDBC client built on the SQLLine CLI (a pure-Java console
utility for connecting to relational databases and executing
SQL queries).
2. Hive Server 2: HiveServer2 is the successor to
HiveServer1. It provides clients with the ability to execute
queries against the Hive. Multiple clients may submit
queries to Hive and obtain the desired results. Open API
clients such as JDBC and ODBC are supported by
HiveServer2.
Note: HiveServer1, also known as a Thrift server, is used
to communicate with Hive across platforms. Different client
applications can submit requests to Hive and receive the results
using this server.
HiveServer1 could not handle concurrent requests from more than one
client, so it was replaced by HiveServer2.
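As a quick illustration, a Beeline session against HiveServer2 might be opened as shown below; the host is assumed to be localhost and 10000 is HiveServer2's default Thrift port, so adjust both to your cluster.
$ beeline -u jdbc:hive2://localhost:10000
0: jdbc:hive2://localhost:10000> show databases;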
Hive Driver:
The Hive driver receives the HiveQL statements submitted
by the user through the command shell and creates session
handles for the query.
Hive Compiler:
The Hive compiler parses the query and, using the metadata held
in the Metastore, performs semantic analysis and type checking
on the different query blocks and query expressions. The
execution plan generated by the compiler is based on these parse
results.
The execution plan is a DAG (Directed Acyclic Graph) created
by the compiler, in which each stage is a map/reduce job, an
operation on HDFS, a file metadata operation, or a data
manipulation step.
Optimizer:
The optimizer applies transformation operations to the execution
plan, such as splitting tasks, so that efficiency and scalability are
improved.
Execution Engine:
After the compilation and optimization steps, the execution
engine uses Hadoop to run the tasks of the execution plan
prepared by the compiler.
Metastore:
Metastore stores metadata information about tables and
partitions, including column and column type information.
The metastore also stores information about the serializer and
deserializer needed to read and write data, as well as the HDFS
files where the data is stored. It is usually a relational database.
Hive metadata can be queried and modified through the
Metastore. We can configure the metastore in either of two modes:
1. Remote: In remote mode, the metastore runs as a separate
Thrift service, so clients (including non-Java applications)
connect to it over Thrift.
2. Embedded: In embedded mode, the client can directly access
the metastore via JDBC.
HCatalog:
HCatalog is a Hadoop table and storage management layer
that provides users with different data processing tools such as
Pig, MapReduce, etc. with simple access to read and write data
on the grid.
HCatalog is built on top of the Hive metastore and exposes the
metastore's tabular data to other data processing tools.
WebHCat:
WebHCat is the REST API for HCatalog; it provides an HTTP
interface to perform Hive metadata operations and to run Hadoop
MapReduce (or YARN), Pig, and Hive jobs.
Processing and Resource Management:
Hive uses the MapReduce framework as its default engine for
executing queries. MapReduce is a framework for writing
large-scale applications that process huge quantities of data in
parallel on large clusters of commodity hardware. A MapReduce
job splits the data into chunks, which are processed by map and
reduce tasks.
Distributed Storage:
Hive is based on Hadoop, which means that it uses the
Hadoop Distributed File System for distributed storage.
Hive is a data warehouse system which is used to analyze
structured data.
It is built on the top of Hadoop.
It was developed by Facebook.
Hive provides the functionality of reading, writing, and
managing large datasets residing in distributed storage.
It runs SQL like queries called HQL (Hive query language)
which gets internally converted to MapReduce jobs.
Using Hive, we can skip the requirement of the traditional
approach of writing complex MapReduce programs.
Hive supports Data Definition Language (DDL), Data
Manipulation Language (DML), and User Defined Functions
(UDF).
Hive Architecture
The following architecture explains the flow of submission
of query into Hive.
Hive Client
Hive allows writing applications in various languages,
including Java, Python, and C++.
It supports different types of clients such as:
o Thrift Server - It is a cross-language service provider platform
that serves requests from all programming languages that
support Thrift.
o JDBC Driver - It is used to establish a connection between
Hive and Java applications. The JDBC driver is provided by
the class org.apache.hadoop.hive.jdbc.HiveDriver.
o ODBC Driver - It allows applications that support the ODBC
protocol to connect to Hive.
Hive Services
The following are the services provided by Hive:
o Hive CLI - The Hive CLI (Command Line Interface) is a shell
where we can execute Hive queries and commands.
o Hive Web User Interface - The Hive Web UI is just an
alternative of Hive CLI. It provides a web-based GUI for
executing Hive queries and commands.
o Hive MetaStore - It is a central repository that stores all the
structure information of the various tables and partitions in the
warehouse. It also includes metadata of columns and their type
information, the serializers and deserializers used to read and
write data, and the corresponding HDFS files where the data is
stored.
o Hive Server - It is referred to as Apache Thrift Server. It
accepts the request from different clients and provides it to
Hive Driver.
o Hive Driver - It receives queries from different sources like
web UI, CLI, Thrift, and JDBC/ODBC driver. It transfers the
queries to the compiler.
o Hive Compiler - The purpose of the compiler is to parse the
query and perform semantic analysis on the different query
blocks and expressions. It converts HiveQL statements into
MapReduce jobs.
o Hive Execution Engine - The optimizer generates the logical
plan in the form of a DAG of map-reduce tasks and HDFS
tasks. In the end, the execution engine executes the incoming
tasks in the order of their dependencies.
Working of Hive
The following diagram depicts the workflow between Hive and Hadoop.
The following table defines how Hive interacts with the Hadoop framework:
Step 1 - Execute Query: The Hive interface, such as the Command Line or Web UI, sends the query to the Driver (any database driver such as JDBC, ODBC, etc.) to execute.
Step 2 - Get Plan: The driver takes the help of the query compiler, which parses the query to check the syntax and the query plan, or the requirement of the query.
Step 3 - Get Metadata: The compiler sends a metadata request to the Metastore (any database).
Step 4 - Send Metadata: The Metastore sends the metadata as a response to the compiler.
Step 5 - Send Plan: The compiler checks the requirement and resends the plan to the driver. Up to here, the parsing and compiling of the query is complete.
Step 6 - Execute Plan: The driver sends the execute plan to the execution engine.
Step 7 - Execute Job: Internally, the process of executing the job is a MapReduce job. The execution engine sends the job to the JobTracker, which is in the Name node, and it assigns this job to the TaskTracker, which is in the Data node. Here, the query executes the MapReduce job.
Step 7.1 - Metadata Ops: Meanwhile, during execution, the execution engine can execute metadata operations with the Metastore.
Step 8 - Fetch Result: The execution engine receives the results from the Data nodes.
Step 9 - Send Results: The execution engine sends those resultant values to the driver.
Step 10 - Send Results: The driver sends the results to the Hive interfaces.
HIVE Data Types
Numeric Types
Integer types:
o TINYINT: 1-byte signed integer; range -128 to 127
o SMALLINT: 2-byte signed integer; range -32,768 to 32,767
o INT: 4-byte signed integer; range -2,147,483,648 to 2,147,483,647
o BIGINT: 8-byte signed integer; range -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807
Floating-point types:
o FLOAT: 4-byte single-precision floating point number
o DOUBLE: 8-byte double-precision floating point number
String Types
STRING: A string is a sequence of characters. Its values can be
enclosed within single quotes (') or double quotes (").
VARCHAR: The varchar is a variable-length type whose length lies
between 1 and 65535, which specifies the maximum number of
characters allowed in the character string.
CHAR: The char is a fixed-length type whose maximum length
is fixed at 255.
Date/Time Types
TIMESTAMP
o It supports traditional UNIX timestamp with optional
nanosecond precision.
o As Integer numeric type, it is interpreted as UNIX timestamp
in seconds.
o As Floating point numeric type, it is interpreted as UNIX
timestamp in seconds with decimal precision.
o As string, it follows java.sql.Timestamp format "YYYY-
MM-DD HH:MM:SS.fffffffff" (9 decimal place precision)
DATES
The DATE value is used to specify a particular year, month and
day, in the form YYYY-MM-DD. However, it does not provide the
time of day. The range of the Date type lies between
0000-01-01 and 9999-12-31.
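For illustration, DATE and TIMESTAMP values can be produced from string literals with standard CAST expressions (the literal values are arbitrary, and the query assumes a Hive version that allows SELECT without a FROM clause):
hive> select cast('2019-06-03' as date), cast('2019-06-03 10:15:30.123' as timestamp);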
Collection / Complex Types
o STRUCT: Similar to a C struct or an object, where fields are accessed using the "dot" notation. Example: struct('James','Roy')
o MAP: A collection of key-value tuples, where the fields are accessed using array notation. Example: map('first','James','last','Roy')
o ARRAY: A collection of values of a similar type that are indexable using zero-based integers. Example: array('James','Roy')
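A hedged sketch of a table that declares these collection types is shown below; the table name, columns, and delimiter characters are illustrative, while the ROW FORMAT DELIMITED clauses are standard HiveQL.
hive> create table employee_complex (
name string,
skills array<string>,
address struct<city:string, pin:int>,
phones map<string,string>)
row format delimited
fields terminated by ','
collection items terminated by '#'
map keys terminated by ':';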
Miscellaneous types:
Boolean and binary
File Formats
Apache Hive supports several familiar file formats used in
Apache Hadoop. Hive can load and query different data files
created by other Hadoop components such as Pig or MapReduce.
Hive Different File Formats
Different file formats and compression codecs work better
for different data sets in Apache Hive.
Following are the Apache Hive different file formats:
● Text File
● Sequence File
● RC File
● AVRO File
● ORC File
● Parquet File
Hive Text File Format
Hive Text File format is the default storage format. You can
use the text format to interchange data with other client
applications. The text file format is very common in most
applications. Data is stored in lines, with each line being a record.
Each line is terminated by a newline character (\n).
The text format is a simple plain file format. You can use
compression (for example, BZIP2) on a text file to reduce the storage space.
Create a TEXT file table by adding the storage option 'STORED AS
TEXTFILE' at the end of a Hive CREATE TABLE command.
Examples
Below is the Hive CREATE TABLE command with storage
format specification:
Create table textfile_table
(column_specs) stored as
textfile;
Hive Sequence File Format
Sequence files are Hadoop flat files which stores values in
binary key-value pairs. The sequence files are in binary format
and these files are able to split. The main advantages of using
sequence file is to merge two or more files into one file.
Create a sequence file by add storage option as ‘STORED AS
SEQUENCEFILE’ at the end of a Hive CREATE TABLE
command.
Example
Below is the Hive CREATE TABLE command with storage
format specification:
Create table sequencefile_table
(column_specs) stored
as sequencefile;
Hive RC File Format
RCFile is row columnar file format. This is another form of
Hive file format which offers high row level compression rates. If
you have requirement to perform multiple rows at a time then you
can use RCFile format.
The RCFile are very much similar to the sequence file
format. This file format also stores the data as key-value pairs.
Create RCFile by specifying ‘STORED AS
RCFILE’ option at the end of a CREATE TABLE Command:
Example
Below is the Hive CREATE TABLE command with storage
format specification:
Create table RCfile_table
(column_specs) stored as
rcfile;
Hive AVRO File Format
Avro is an open-source project that provides data serialization
and data exchange services for Hadoop. You can exchange data
between the Hadoop ecosystem and programs written in any
programming language. Avro is one of the popular file formats in
Big Data Hadoop based applications.
Create an Avro table by specifying the 'STORED AS AVRO'
option at the end of a CREATE TABLE command.
Example
Below is the Hive CREATE TABLE command with storage
format specification:
Create table avro_table
(column_specs) stored
as avro;
Hive ORC File Format
ORC stands for Optimized Row Columnar file
format. The ORC file format provides a highly efficient way to
store data in Hive tables. This file format was designed to
overcome the limitations of the other Hive file formats. The use of
ORC files improves performance when Hive is reading, writing,
and processing data from large tables.
Create an ORC table by specifying the 'STORED AS ORC' option at the
end of a CREATE TABLE command.
Examples
Below is the Hive CREATE TABLE command with storage
format specification:
Create table orc_table
(column_specs)
stored as orc;
Hive Parquet File Format
Parquet is a column-oriented binary file format. Parquet
is highly efficient for large-scale queries, especially queries that
scan particular columns within a table. Parquet tables support
compression codecs such as Snappy and gzip; currently Snappy is
the default.
Create a Parquet table by specifying the 'STORED AS
PARQUET' option at the end of a CREATE TABLE command.
Example
Below is the Hive CREATE TABLE command with storage
format specification:
Create table parquet_table
(column_specs) stored as
parquet;
Hive - Create Database
In Hive, the database is considered as a catalog or namespace
of tables. So, we can maintain multiple tables within a database
where a unique name is assigned to each table. Hive also provides
a default database with a name default.
Initially, we check the default database provided by Hive.
So, to check the list of existing databases, follow the below
command: -
hive> show databases;
Here, we can see the existence of a default database provided by
Hive.
Let's create a new database by using the following command:
hive> create database demo;
If a database with the same name already exists, Hive raises an
error; to avoid it, use the IF NOT EXISTS clause: -
hive> create database if not exists demo;
Hive also allows assigning properties with the database in the
form of key-value pair.
hive>create database demo
>WITH DBPROPERTIES ('creator' = 'Gaurav Chawla', 'date' = '
2019-06-03');
Now, drop the database by using the following command.
hive> drop database demo;
Hive - Create Table
In Hive, we can create a table by using the conventions
similar to the SQL. It supports a wide range of flexibility where
the data files for tables are stored.
It provides two types of tables:
o Internal table
o External table
Internal Table
The internal tables are also called managed tables as the
lifecycle of their data is controlled by the Hive.
By default, these tables are stored in a subdirectory under
the directory defined by hive.metastore.warehouse.dir(i.e.
/user/hive/warehouse).
The internal tables are not flexible enough to share with other
tools like Pig.
If we try to drop the internal table, Hive deletes both table
schema and data.
Let's create an internal table by using the following
command:-
hive> create table demo.employee (Id int, Name string, Salary float)
row format delimited fields terminated by ',';
Let's see the metadata of the created table by using the following
command: -
hive> describe demo.employee;
External Table
The external table allows us to create and access a table and
its data externally. The external keyword is used to specify an
external table, whereas the location keyword is used
to determine the location of the loaded data.
As the table is external, the data is not present in the Hive
directory. Therefore, if we try to drop the table, the metadata of
the table will be deleted, but the data still exists.
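A minimal sketch of an external table follows; the table name ext_employee and the HDFS location are illustrative.
hive> create external table ext_employee (id int, name string, salary float)
row format delimited fields terminated by ','
location '/user/hive/external/employee_data';
Dropping ext_employee would remove only its metadata; the files under the given location would remain.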
Hive - Load Data
Once the internal table has been created, the next step is to
load the data into it.
So, in Hive, we can easily load data from any file to the
database.
Let's load the data of the file into the database by using the
following command: -
hive> load data local inpath '/home/codegyani/hive/emp_details' into table demo.employee;
Here, emp_details is the file name that contains the data.
Now, we can use the following command to retrieve the data
from the database.
hive>select * from demo.employee;
Hive - Drop Table
Hive facilitates us to drop a table by using the SQL drop
table command. Let's follow the below steps to drop the table
from the database.
Now, drop the table by using the following command:
hive> drop table demo.employee;
Limitations of Hive
o Hive is not capable of handling real-time data.
o It is not designed for online transaction processing.
o Hive queries have high latency.
Differences between Hive and Pig
● Hive is commonly used by data analysts; Pig is commonly used by programmers.
● Hive follows SQL-like queries; Pig follows a data-flow language.
● Hive can handle structured data; Pig can handle semi-structured data.
● Hive works on the server side of an HDFS cluster; Pig works on the client side of an HDFS cluster.
● Hive is slower than Pig; Pig is comparatively faster than Hive.