Data Management For Distributed Sensor Networks: A Literature Review
Abstract
Sensor networks are spatially distributed autonomous sensors that mon-
itor the physical world around them and often communicate those readings
over a network to a server or servers. Sensor networks can benefit from
the generally “unlimited resources” of the cloud, namely processing, stor-
age, and network resources. This literature review surveys the major com-
ponents of distributed data management, namely, cloud computing, dis-
tributed persistence models, and distributed analytics.
Contents
1 Introduction 4
1.1 Applications of Distributed Sensor Networks . . . . . . . . . . . . 4
1.2 Organization of the Review . . . . . . . . . . . . . . . . . . . . . 6
2 Big Data 6
2.1 The Three (or Four or Five) “V’s” . . . . . . . . . . . . . . . . . . 7
2.2 Features of Big Data . . . . . . . . . . . . . . . . . . . . . . . . . 9
2.3 Examples of Big Data . . . . . . . . . . . . . . . . . . . . . . . . 10
3 Cloud Computing 11
3.1 Cloud Computing Service Models . . . . . . . . . . . . . . . . . . 13
3.2 Sensing as a Service . . . . . . . . . . . . . . . . . . . . . . . . . 13
3.3 Sensor as a Service . . . . . . . . . . . . . . . . . . . . . . . . . 15
3.4 Cloud Deployment Models . . . . . . . . . . . . . . . . . . . . . . 16
3.5 Mobile Cloud Computing . . . . . . . . . . . . . . . . . . . . . . . 16
3.6 Issues with Cloud Computing . . . . . . . . . . . . . . . . . . . . 17
4.1.4 Comparison of Distributed File Systems . . . . . . . . . . 25
4.2 Key-Value . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 25
4.2.1 Memcached . . . . . . . . . . . . . . . . . . . . . . . . . . 25
4.2.2 Amazon’s Dynamo . . . . . . . . . . . . . . . . . . . . . . 27
4.2.3 LinkedIn’s Voldemort . . . . . . . . . . . . . . . . . . . . . 28
4.2.4 Comparison of Key-Value Stores . . . . . . . . . . . . . . 30
4.3 Column . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 31
4.3.1 Google’s Bigtable . . . . . . . . . . . . . . . . . . . . . . . 31
4.3.2 Cassandra . . . . . . . . . . . . . . . . . . . . . . . . . . 32
4.3.3 HBase . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 33
4.3.4 Hypertable . . . . . . . . . . . . . . . . . . . . . . . . . . 33
4.3.5 Comparison of Column Stores . . . . . . . . . . . . . . . 34
4.4 Document . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 35
4.4.1 MongoDB . . . . . . . . . . . . . . . . . . . . . . . . . . . 35
4.4.2 CouchDB . . . . . . . . . . . . . . . . . . . . . . . . . . . 37
4.4.3 SimpleDB . . . . . . . . . . . . . . . . . . . . . . . . . . . 38
4.4.4 Comparison of Document Stores . . . . . . . . . . . . . . 38
4.5 Graph . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 38
4.5.1 Neo4J . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 40
4.5.2 Misc. Graph . . . . . . . . . . . . . . . . . . . . . . . . . . 41
4.5.3 Comparison of Graph Stores . . . . . . . . . . . . . . . . 41
4.6 Other Distributed Databases . . . . . . . . . . . . . . . . . . . . . 42
4.6.1 Google’s Spanner . . . . . . . . . . . . . . . . . . . . . . 42
4.7 Comparison of Big Data Consistency Models . . . . . . . . . . . 43
5.7.3 Comparison of High Level SQL-Like Languages . . . . . 59
5.7.4 Comparison of Streaming Computation Engines . . . . . 60
6 Conclusion 60
List of Figures
1 The Phenomenon of Big Data. . . . . . . . . . . . . . . . . . . . 10
2 Sensing as a service layers. . . . . . . . . . . . . . . . . . . . . . 14
3 Sensor as a service layers. . . . . . . . . . . . . . . . . . . . . . 15
4 Haystack individual file layout. . . . . . . . . . . . . . . . . . . . . 24
5 Memcached architecture. . . . . . . . . . . . . . . . . . . . . . 26
6 Voldemort data deployment pipeline. . . . . . . . . . . . . . . . . 29
7 MapReduce execution overview. . . . . . . . . . . . . . . . . . . 46
8 Phases of query planning in Spark SQL. . . . . . . . . . . . . . . 53
List of Tables
1 Comparison of key-value stores . . . . . . . . . . . . . . . . . . . 30
2 Comparison of column stores . . . . . . . . . . . . . . . . . . . . 34
3 Comparison of document stores . . . . . . . . . . . . . . . . . . 39
4 Comparison of graph stores . . . . . . . . . . . . . . . . . . . . . 42
5 General comparison of big data analytics engines . . . . . . . . 58
6 General comparison of graph parallel engines . . . . . . . . . . . 59
7 Comparison of SQL engines . . . . . . . . . . . . . . . . . . . . 59
8 Comparison of streaming analytics engines . . . . . . . . . . . . 60
1 Introduction
The exponential increase in volume, variety, velocity, veracity, and value of data
has caused us to rethink traditional client-server architectures with respect to
data acquisition, storage, analysis, quality of data, and governance of data.
With the emergence of the Internet of Things (IoT) and increasing numbers of ubiq-
uitous mobile sensors such as mobile phones, distributed sensor networks are
growing at an unprecedented pace and producing an unprecedented amount
of streaming data. The European Commission predicts that IoT devices
will number between 50 and 100 billion by 2020[75].
Sensor networks are spatially distributed autonomous sensors that monitor
the physical world around them and often communicate those readings over a
network to a server or servers. The size of sensor networks is quickly growing.
BBC Research provides figures showing that the market for sensor networks in
2010 was $56 billion and was predicted to be closer to $91 billion by the end of
2016[94]. Data generated from the IoT is surpassing the compute and memory
resources of existing IT infrastructures[24]. Not only is the size of data
rapidly exploding, but data is also becoming more complex. Data from sensor
networks is often semi-structured or unstructured with data quality issues.
IoT and sensor networks are heavily intertwined. Advancements in elec-
tronics and IoT have given researchers the ability to deploy cheap, ubiquitous,
internet-connected devices nearly anywhere they can obtain permission to
deploy them. Many of the IoT devices we see emerging today include a multi-
tude of sensors which report their digitized understanding of the world back to
a server or servers.
Sensor networks can benefit from the generally “unlimited resources” of the
cloud, namely processing, storage, and network resources. We believe that by
leveraging cloud computing, distributed persistence models, and distributed an-
alytics, it’s now possible to provide a platform that is able to meet the demands
of the growing distributed sensor market and the increasing volume, velocity,
variety, and value of data that comes along with it.
This review summarizes the current state of the art surrounding distributed
sensor networks and the use of cloud computing as a means for big sensor
data acquisition and analysis. In particular, we will define Big Data and review
it in the context of sensor networks, review cloud computing and service models
related to distributed sensing, discuss modern distributed persistence for Big
Data, and review modern distributed analytics for Big Data, all with an emphasis on
acquiring and managing Big Sensor Data.
and real-time parking space inventory in San Francisco.
Perera et al., in their paper on sensing as a service[72], provide three examples
of areas in which distributed sensor networks would excel.
First, distributed sensors could be used by cities to optimize waste manage-
ment, which consumes a significant amount of time, money, and labor. Waste
management involves many processes, including collection, transport, process-
ing, disposal, and monitoring. By collecting and storing sensor data in the cloud
from these processes, various interested parties could access sensor data in
order to optimize for the current state of the system. As an example, Perera
mentions that city council members could optimize garbage routes and collec-
tion rates based on the amount of trash available, and recycling centers could
forecast what to expect based on the same sensor data. Interested parties
at all points of the management process could benefit by analyzing data points
from IoT devices in a smart city.
Second, Perera mentions that smart agriculture can take advantage of dis-
tributed sensor networks and cites the Phenonet project as an example of dis-
tributed agricultural sensing which has the ability to monitor plant growth, soil
composition, air composition, and pests. A major advantage of this system
is that it can supplement traditional research by allowing multiple researchers
access to the same data in near real-time.
Third, Perera postulates that environmental management could utilize exist-
ing distributed environmental sensors upgraded to communicate with the cloud
allowing for data sharing and data fusion among interested parties.
Gerla et al.[35] propose an internet of vehicles as a path toward autonomous
vehicles. By treating vehicles as platforms of thousands of sensors each and
by creating dynamic distributed clouds, they hope to allow fleets of vehicles
to make autonomous decisions. This model uses distributed clouds based
on proximity and peer-to-peer technologies rather than sending data to a
centralized cloud. The real-time nature and the sheer number of sensors
make this an interesting case study.
One area that shows a lot of promise for distributed sensor networks with
centralized management is smart grids. The smart grid is a collection of
technologies aiming to advance the electrical grid into the future with respect to
intelligent energy distribution and the integration of renewables. Electrical grids can
benefit by using a large distributed sensor network to collect power consump-
tion, production, and quality information and use that information to control
power production and consumption in real-time.
In some cases, the sensor nodes in smart grids lack powerful local computa-
tion abilities, but generally have network connections and sensing capabilities.
This makes the cloud a perfect sink of information for analyzing complex power
trends from a large scale distributed sensor network for smart grids[16].
1.2 Organization of the Review
The remainder of this review is structured as follows: Section 2 provides an
overview of Big Sensor Data. Section 3 will focus on cloud computing and
how its concepts can be utilized to manage distributed sensor data. Section
4 will examine the current state of the art in distributed persistence models
with an emphasis on how NoSQL and distributed persistence models can aid
in managing distributed sensor data. Section 5 will examine the current state
of big data analytics options in the cloud and how these can be utilized for
performing analytics on distributed sensor data.
2 Big Data
Big Data has many definitions. Cox, in 1997[26], provides us with one of the
earliest definitions where Big Data is “too large to be processed by standard
algorithms and software on the hardware one has available to them”. He also
mentions that sources for big data collections include data from remote sensors
and satellite imaging in the fields of atmospheric sciences, geophysics, and
healthcare.
Cox separates Big Data into two categories: big data collections
and big data objects.
Big data objects are single, very large data sets such as computational
models computed from physical phenomena. Big data objects often do not
fit in memory or local disks. Big data objects also have adverse effects on
bandwidth and latency. Cox looks to moving computation to the data and more
advanced segmentation and paging techniques at the OS level to deal with big
data objects.
Big data collections contain many smaller objects or even many big ob-
jects. Big data collections present their own set of issues including: distributed
data, heterogeneous data formats, no platform independent definition, non-
local meta-data, large storage requirements, poor locality, and insufficient net-
work resources.
Cox provides us with a useful definition to build on. He also advocates for the
development and advancement of operating system constructs for moving data
that is too large for memory in and out of memory using stenciling, segmentation,
paging, and application-controlled segmentation. It's interesting to note
that this was before cloud computing and today's large-scale distributed systems,
but we are now facing similar problems at the distributed level rather than the local level.
The Apache Hadoop project, in 2010, defined big data as “datasets which
could not be captured, managed, and processed by general computers within
an acceptable scope”[24].
Manyika et al.[61] in 2011 define big data as “the amount of data just beyond
technology’s capability to store, manage, and process efficiently”, essentially
making the definition of big data a moving target that constantly evolves as
technology advances.
The White House report on big data[70], in 2014, defines big data as “data
that is so large in volume, so diverse in variety, or moving with such velocity, that
traditional modes of data capture and analysis are insufficient”.
Hashem et al.[39] build on these previous definitions in their 2015 review on
Big Data, attempting to create a definition that encompasses the spirit of many
of the previous definitions. They define big data
as “a set of techniques and technologies that require new forms of integration
to uncover large hidden values from large datasets that are diverse, complex,
and of a massive scale”.
NIST, in 2015[69], provides multiple definitions relating to big data. NIST
defines big data as “extensive datasets–primarily in the characteristics of volume,
variety, velocity, and/or variability–that require a scalable architecture for
efficient storage, manipulation, and analysis”. To our knowledge, NIST is the only
organization to specify the need for a scalable architecture alongside its definition
of big data. NIST next defines the big data paradigm as “the distribution of
data systems across horizontally coupled, independent resources to achieve
the scalability needed for the efficient processing of extensive datasets”.
Perhaps one of the most popular definitions of Big Data is characterizing
data by “the four V’s”[39]: volume, variety, velocity, and, more recently, value.
The rest of this section will look at the V’s of big data in section 2.1, features
of big data in section 2.2, and examples of big data in section 2.3.
to 5 zettabytes across the globe. In 2014, more than 500 million photos were
uploaded every day and more than 200 hours of video per minute[70]. Tweets
generate 12 terabytes of data per day[79]. Perera et al.[72] expect that, with IoT, we
could see as many as 1 billion sensors online and generating data by the year
2020. Power meters generate upwards of 350 billion readings annually[79].
According to IBM[4], “90% of the world’s data has been created in the past two
years”.
Velocity is defined by Laney as “increased point-of-interaction (POI) speed
and, consequently, the pace data used to support interactions and generated
by interactions”, or more generally, the pace of data arriving and how long
it takes to analyze, store, and act on that data. Laney offers several solutions
to data velocity, including operational data stores that prioritize production
data, front-end caching, point-to-point data routing protocols, and architecting
software in a way that balances data analysis latency with stated real-time
requirements. Some examples of high-velocity data include GPS tracking
data, web site click streams, social media interactions, and data from mobile
sensors[70]. Sharma et al.[79] also mention that over 5 million trade transac-
tions must be processed daily for fraud detection.
Laney describes data variety as data that is in “incompatible formats, non-
aligned data structures, and inconsistent data semantics”. Laney’s proposed
solutions to variety include profiling data to find inconsistencies, a standardized
XML data format, interprocess application communication, middlewares on top
of “dumb data” to provide meaning and intelligence, metadata management, and
more advanced indexing techniques. The main reason for the increase in variety
of data is the prevalence of internet-connected devices and the Internet of
Things (IoT) explosion. We now see a wide variety of data that was
born analog such as sensors measuring our physical world like temperature,
solar radiance, power quality, seismic activity, acoustics, etc. We also see
much more variety in born digital data from the web, social media, government
databases, geospatial data, surveys, healthcare, etc[70].
Value is often added as a fourth “V” and represents the value that can only
be gained by finding insights into big data. Manyika et al. in their McKinsey
report[60] describe the value of big data after studying the results of long-running
big data healthcare projects. Value in terms of efficiency and quality of data
can be gained from big data using cross-correlations and data fusion to gain
insights that were not possible before big data. Examples include recommendations
from Amazon or Netflix, predicting market demand, improving healthcare, and
improving security[62]. Chen et al.[24] believe that value is actually
the most important V for big data in that big data often has hidden values that
can be extracted using big data analytics. Sharma et al.[79] mention that
businesses are investing more and more heavily in big data because of the
hidden value that could exist.
Finally, veracity has been mentioned alongside the other V’s[79]. As data
volume, variety, and velocity increase, there is a fear that the quality of the
data may be hard to ascertain or quantify. Thus, veracity is a measure of the
trustworthiness of the data.
2.2 Features of Big Data
NIST[69] provides us with a list of common features of big data. One common
feature of big data is associated metadata. Metadata is data about data that
includes information about how/when/where the data was collected and pro-
cessed. Metadata describing the history of data provides for data provenance.
This becomes more important as data is transferred between many processes
with multiple transformations. Provenance provides a means to track how data
was transferred and how it was transformed. Semantic metadata is an attempt
at providing metadata with the ability of describing itself. Examples of semantic
metadata include the Semantic Web[15] and NIST’s Big Data Paradigm.
Another common feature of big data is that it can often be unstructured,
semi-structured, or non-relational data. Examples of these types of data in-
clude unstructured text, audio, video, and other natural phenomenon that cre-
ate digitized signals from physical samples. We will review in great detail the big
data persistence models in section 4 and big data analytical models in section
5 which will examine storage and analysis of unstructured and non-relational
data.
Big data sets tend to exhibit their own features. NIST categorizes big data
sets into two categories: data at rest and data in motion.
At-rest data is data that has already been collected and is stored in cold
storage for non-real-time analysis. The defining feature of data at rest in relation
to big data is its volume. It’s estimated that by the year 2020, there will be
500 times more data than there was in the year 2011. Big data sets often do
not fit on a single server and can be spread out over multiple data centers.
Another feature of big data at rest is variety. Data sets can contain data in
multiple formats from different domains that in some way need to be integrated
to provide value and meaning. These features of data at rest give rise to the
need for distributed big data persistence including shared-disk file systems,
distributed filesystems, distributed computing, and resource negotiation.
In-motion data is processed in real-time or near real-time in order to provide
immediate feedback. Examples of data in motion include event processing
systems for distributed sensors. In-motion data can come in the form of data
streams. The main feature of data in motion is its velocity. That is, the quantity
of data that must be acquired, persisted, analyzed, and acted upon is
large compared to the time window in which these operations need to take place.
The amount of data that needs to be acted on within a time window is too
much for a single system and has given rise to parallel distributed computing
architectures.
Sensor data is a type of big data that has its own defining features which
complicate acquisition, persistence, and analysis [24]. Sensor data is often cor-
related in both time and location, producing geospatial timeseries data. One of
the most common characteristics of sensor data is the amount of noise in the
data. Environmental sensing will always include noise because sensor data is
born analog[70]. Often sensor networks provide data in a large variety of un-
structured formats with missing, partial, or conflicting meta-data. Sensor data
Figure 1: The Phenomenon of Big Data.[24]
can also contain a large amount of data redundancy from multiple sensors in
similar locations. The problem very quickly becomes a needle-in-the-haystack
problem, or more aptly stated, finding the signal in the noise.
Chen shows other examples of big data in figure 1.
In a review on smart cities and big data authored by Hashem et al.[38], the
authors review many technologies surrounding big data and how big data can
play a role in smart city infrastructures of the future. They found that the following
areas could benefit from using Big Data in a smart city.
Smart grids can improve energy generation efficiency by monitoring environ-
mental data, analyzing the power habits of users, and measuring consumption
from smart meters[54]. Smart grids can also make use of big data to perform
forecasting of future load generation[8].
Healthcare is another sector that can gain insights using big data. One
healthcare project monitored and cross-correlated sensors in a neonatal intensive
care unit in order to identify factors that could lead to an infection or early
warning signs of infections. Data that was collected included temperatures and
heart rates. Analysis allowed doctors to make diagnoses that they would have
otherwise missed without big data analytics[20]. Analytics of big data in health-
care using big data mining techniques can be used for diagnosing patients,
predicting illnesses, and predicting epidemics[76].
Smart cities can make use of big data to decrease traffic congestion and
better plan freight management by analyzing real time traffic sensors and using
predictive analysis to determine traffic routes ahead of time[47]. Of course,
Google and other companies already do this by analyzing mobile devices to
determine and predict traffic congestion.
Hashem et al.[38] cite several examples of successful smart city projects in
Stockholm, Helsinki, and Copenhagen.
3 Cloud Computing
NIST[64] defines cloud computing as “a model for enabling ubiquitous, conve-
nient, on-demand network access to a shared pool of configurable computing
resources (e.g., networks, servers, storage, applications, and services) that
can be rapidly provisioned and released with minimal management effort or
service provider interaction”.
The five major tenets of cloud computing as defined by NIST are as fol-
lows:
On-demand self-service where the user can provision network, storage,
and compute capacity automatically without the need for human intervention.
In essence, this becomes a virtual shopping mart where to the consumer it
appears that virtually unlimited cloud resources are available to choose from
and the user (or algorithm) can increase or decrease the utilization of cloud
resources at any time.
Broad network access where computation capabilities are performed over
a network and results are delivered to clients such as mobile devices.
Resource pooling where resources within a cloud such as storage, network,
or compute capacity are shared among multiple tenants. This allows for efficient
utilization of hardware when virtual services are provided to
clients. Clients don’t necessarily know where their physical hardware is located
or provisioned.
Rapid elasticity is the ability to provision or remove cloud resources (i.e.
storage, network, or compute resources) at any time as demand on a system
either increases or decreases. Oftentimes a human may not even be involved
in making these decisions, and this scaling will take place automatically using
a set of predefined usage thresholds.
Measured service where cloud providers provide a means of metering the
compute resources that are used by clients. This provides a transparent means
of selling cloud computing resources to clients and clients can always know how
much capacity they have consumed.
Even though the NIST definition is starting to show its age, its major tenets
are still the underlying foundation of cloud software even today. Many
additional service and deployment models have been developed since NIST
defined cloud computing, but an understanding of the basic underpinnings is
required before exploring the rest of this vast field.
Cloud computing frameworks can provide on-demand availability and scal-
ing of virtual computing resources for storage, processing, and analyzing of
very large data sets in real-time or near real-time. This model makes it pos-
sible to build applications in the cloud for dealing with Big Data sets such as
those produced from large distributed sensor networks.
By using the cloud as a central sink of data for our devices within a sensor
network, it’s possible to take advantage of central repositories of information,
localized dynamic computing resources, and parallel computations. With the
advent of cheap and ubiquitous network connections, it’s becoming easier to do
less processing within sensor networks and to offload the work to a distributed
set of servers and processes in the cloud[48].
Cloud computing provides both technical and economic advantages, as
discussed in [16].
On the economic side, computing resources are pay-per-use. Businesses
can dynamically increase or decrease the computing resources they are cur-
rently leasing. This makes it possible to utilize massive amounts of computing
power for short amounts of time and then scale back resources when demand
isn’t at its peak. Before cloud computing these same businesses would be re-
quired to manage and maintain their own hardware for peak load without the
ability to dynamically scale their hardware if the peak load were to increase.
On the technical side, the localization of computing resources provides for
a wide variety of benefits including energy efficiency, hardware optimizations,
software optimizations, and performance isolation.
The rest of this section will dive deeper into cloud computing. In section
3.2 we look at sensing as a service. In section 3.3 we look at sensor as a
service. Then we examine cloud deployment models in section 3.4. Mobile
cloud computing is briefly discussed in section 3.5. Finally, section 3.6 will look
at several issues relating to cloud computing.
3.1 Cloud Computing Service Models
When discussing cloud computing, it’s useful to understand the service models
that traditional cloud computing provides. The three major service models as de-
fined by NIST[64] are Infrastructure as a Service (IaaS), Platform as a Service
(PaaS) and Software as a Service (SaaS).
At the lowest level is the Infrastructure as a Service (IaaS) model which
provides virtual machines that users have the ability to deploy and manage.
Users can install operating systems on these virtual machines and interact
with deployed virtual machines as if they were local servers. Consumers using
IaaS have the ability to manage and provision virtual hardware and network
resources, but do not need to worry about the underlying hardware or network
infrastructures. Other than providing virtual resources, consumers utilizing IaaS
still require a significant amount of systems administration knowledge to develop,
deploy, and secure applications in the cloud.
Sitting in the middle of the traditional cloud service models is the Platform as
a Service (PaaS) model. In this service model consumers don’t have the ability
to interact with or provision individual cloud resources such as virtual machines,
storage, networking, or compute capacity. Instead, users have the ability to
deploy their application to the cloud via custom cloud provider tools or via
cloud-provided application programming interfaces (APIs).
At the highest level is the Software as a Service (SaaS) layer. Generally
speaking, applications in a SaaS environment are provided by the
cloud provider. In a SaaS model, users do not have the ability to control their
own cloud resources and users do not have the ability to upload their own
applications to the cloud. Users do sometimes have the ability to alter the
configuration of the software they are interacting with in this model.
Since the original service models were penned, there have been many
other types of service models introduced. Several of these focus on IoT service
layers, as noted by Botta et al.[16]. These include Sensing as a Service
(S2aaS), Sensing and Actuation as a Service (SAaaS), Sensor Event as
a Service (SEaaS), Sensor as a Service (SenaaS), Data Base as a Service
(DBaaS), Data as a Service (DaaS), Ethernet as a Service (EaaS), Identity
and Policy Management as a Service (IPMaaS), and Video Surveillance as a
Service (VSaaS).
Some of the above mentioned service models are of particular interest for
a survey examining cloud computing and sensor networks. We will examine
these in more detail in section 3.2.
Figure 2: Sensing as a service layers.[72]
The sensor and sensor owners layer includes physical sensors which can
sense an increasingly broad variety of natural phenomena and sensor owners
which can be personal, household, private, public, or commercial. Sensor own-
ers have the final say in what data gets to the cloud and who can access the
data once it is in the cloud using conditions and restrictions.
The sensor publishers (SP) layer manages the detection of online sensors
and acts as a middle-man between sensor consumers and sensors and sensor
owners. Sensors register with the publisher layer. Sensor data consumers
make requests to sensor publishers for specified types of data over specified
amounts of time.
The extended service providers (ESP) layer builds abstraction on top of
sensor publishers. A single ESP can interact with multiple SPs. ESPs can be
used to automatically request data from multiple sensors depending on criteria
provided to the ESP. This can be useful if the sensor consumer does not care
about the underlying individual sensors but instead queries data at a higher
level (e.g., all temperature data within a given polygon).
Finally, the sensor data consumers layer consists of data consumers who
must register with the ESPs and provide valid digital certificates. Consumers
can either deal with SPs directly or deal with ESPs. The benefit of dealing with
SPs directly is the reduced cost of communication with the ESP. The benefit of
dealing with ESPs is higher-level querying of data and the ability to query data
across multiple SPs.
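To make the layering concrete, the short Python sketch below is purely illustrative: the class and method names (SensorQuery, ToyExtendedServiceProvider, fetch) are our own assumptions and are not part of Perera's proposal. It shows how a consumer might query an ESP by phenomenon, area, and time window, while the ESP fans the request out to the SPs it aggregates.

from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class SensorQuery:
    phenomenon: str                      # e.g. "temperature"
    polygon: List[Tuple[float, float]]   # (lat, lon) vertices bounding the area of interest
    start: str                           # ISO 8601 timestamps for the requested window
    end: str

class ToyExtendedServiceProvider:
    def __init__(self, sensor_publishers):
        # The SPs this ESP aggregates; each is assumed to expose a fetch() method.
        self.sensor_publishers = sensor_publishers

    def query(self, q: SensorQuery):
        # The consumer never names individual sensors; the ESP asks every SP it
        # knows about and merges whatever matching readings come back.
        results = []
        for sp in self.sensor_publishers:
            results.extend(sp.fetch(q.phenomenon, q.polygon, q.start, q.end))
        return results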
The literature on sensing as a service appears to be lacking implementation
details. To Perera’s credit, he does mention quite a few open technological
challenges that need to be addressed, including architectural designs, sensor configuration,
sensor management, data fusion, filtering, processing, storage, and energy
consumption.
Rao et al.[74] mention several other research challenges for sensing as a service, including
Figure 3: Sensor as a service layers.[9]
the need for a standard distributed computing framework for distributed big sen-
sor data as well as a framework for the real-time monitoring of sensor events.
consumed by the clients.
Crowdsourced data capture was an early application of MCC[19] and continues
to be a main driver of MCC. One example of this is an application that,
during Amber Alerts, allows users to upload pictures to the cloud, where thousands
of photographs can be analyzed in parallel, helping to track down missing
children[77].
Collective sensing and location based services are other major drivers of
MCC. Collective sensing takes advantage of the sensors on mobile devices
to get a global view of some physical phenomenon. Common sensors that are
used in mobile sensing include microphones, barometers, WiFi chipsets, GPS,
and others. Examples of mobile sensing include distributed weather gathering
and acoustic classification, such as Lu et al.’s[58] SoundSense framework, which
uses distributed Apple iPhones to classify audio events. There has also been a
surge of research relating to predicting and analyzing real-time traffic congestion
[83], [42], [44].
issues surface. We will look at these issues in greater detail when discussing
big data persistence models in later sections.
Data segregation issues occur with multiple virtual resources sharing the
same physical resources. Data can be unintentionally leaked or stolen either by
attacking the cloud provider's multi-tenancy framework or by attacking the virtual
servers directly through SQL injection, poor data validation, and insecure storage.
Large organizations with multiple employees having access to the cloud
can create data access issues. Clear policies must be defined, enforced, and
updated as to which virtual resources employees have access to.
At some level, consumers are required to trust that the cloud provider they
choose will implement security and privacy best practices within their cloud
architecture. This does not, however, resolve the larger issues of security
vulnerabilities within cloud software and its communication components.
As the amount of hardware (servers, switches, etc.) increases in a distributed
system, so does the number of hardware errors. Availability refers to
the ability to remain operational even as parts of a distributed system drop in
and drop out[24]. Gilbert[37] defines availability as “every request received by
a non-failing node in the system must result in a response.” He goes on further
to point out that this definition does allow for unbounded computation since it’s
possible to wait for a result that never returns.
As the amount of hardware increases in a distributed system, the number
of dropped communication packets also increases. The ability to maintain
service in the face of some amount of dropped packets is referred to as partition tolerance[24].
The above ideas are all tied together into the CAP theorem proposed by
Brewer[17] which states that in any shared data system, you can only achieve
two of the following three properties: Consistency, Availability, or Partition
tolerance. As we review different Big Data architectures, we will examine how
they fit into the CAP theorem and what guarantees they provide for these three
major areas of distributed data management.
We also believe that ease of use, maturity of the product, and community (or
commercial) support should factor into the comparisons between data
models.
With the above factors in mind, we can begin categorizing and analyzing
several major Big Data model solutions.
The rest of this section is organized as follows. We examine distributed file
systems in 4.1, distributed key-value systems in 4.2, distributed column stores
in 4.3, distributed document data stores in 4.4, graph based storage solutions
in section 4.5, and finally look at some miscellaneous storage solutions in 4.6.
4.1.1 Google File System (GFS)
Ghemawat et al. at Google introduce The Google File System[36] in 2003.
Google wanted to design a distributed file system that not only provided
scalability, availability, reliability, and performance, which previous distributed
file systems could do, but that also met the needs of a big data world.
The authors note that in large distributed systems comprised of commodity
hardware, failure is the norm rather than the exception. GFS is designed in
a way to provide monitoring, error detection, fault tolerance, and automatic
recovery of distributed data.
Google noticed that files were becoming much larger, that multi-gigabyte
files were becoming common, and that these should be prioritized. The volume
of small files also continued to increase, with datasets terabytes in size consisting
of billions of small files. GFS re-examines traditional I/O patterns and block sizes.
The authors stated that there were common read/write patterns among ser-
vices in their cluster. Reads generally consisted of large streaming reads or
small random reads. Writes generally consisted of large sequential writes. Once
written, files are often not written to again. Small writes are supported, but not
prioritized.
GFS prioritizes the ability for multiple clients to append to the same file
concurrently and efficiently with minimal synchronization overhead.
Finally, GFS should prioritize high bandwidth over low latency with the as-
sumption that most use cases require processing bulk data at a high rate.
GFS provides a non-POSIX but familiar interface utilizing a directory/file hi-
erarchy and supports creating, deleting, opening, closing, reading, and writing
of files.
A GFS system consists of a single master node and multiple chunk servers.
Files stored on GFS are divided into fixed-size 64-megabyte chunks and replicated
a configurable number of times on multiple chunk servers. The single
master node coordinates the locations of chunks in the system, but only provides
routing information. The master node hands off clients to chunk servers to
read from and write to directly. A chunk block size of 64 MB provides the benefits
of fewer routing requests to the master node and reduces the amount of metadata
stored on the master.
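As a rough illustration of this division of labor, the Python sketch below (our own toy code, not Google's API) shows how a client read might map a byte offset to a chunk index, ask the master only for routing information, and then contact a chunkserver directly; the chunk handles and server addresses are made up.

CHUNK_SIZE = 64 * 1024 * 1024  # fixed 64 MB chunks, as described above

def chunk_index(offset: int) -> int:
    # Which chunk of the file a given byte offset falls into.
    return offset // CHUNK_SIZE

class ToyMaster:
    # Holds only routing metadata: (path, chunk index) -> (chunk handle, replica locations).
    def __init__(self):
        self.chunks = {}

    def lookup(self, path: str, index: int):
        return self.chunks[(path, index)]

def read(master: ToyMaster, path: str, offset: int, length: int):
    # One routing request to the master; the actual bytes would then be read
    # directly from one of the returned chunkservers, never through the master.
    handle, replicas = master.lookup(path, chunk_index(offset))
    return handle, replicas[0], offset % CHUNK_SIZE, length

master = ToyMaster()
master.chunks[("/sensors/readings.dat", 0)] = ("handle-0", ["cs-1:7000", "cs-2:7000", "cs-3:7000"])
print(read(master, "/sensors/readings.dat", offset=1024, length=4096))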
The master node keeps track of three types of metadata: file and chunk
name spaces, mapping from files to chunks, and locations of chunk replicas.
All metadata is stored in memory so that access and manipulation is efficient.
The master's metadata mutations are written to an operations log that is
stored locally as well as remotely, which allows for replacing the master in the
event of a failure.
The master has several other responsibilities as well. The master is in
charge of optimizing replica placement by considering reliability, availability,
and bandwidth utilization. The master also takes into account the health of the
servers that chunks are on, including disk utilization. The master can re-replicate
chunks when replicas fail and also rebalance chunks in order to optimize
reliability, availability, and bandwidth utilization. Files are not immediately deleted.
They are marked deleted, but their contents are cleaned up at a later time
during a garbage collection phase coordinated by the master.
Similar to other distributed systems, GFS cannot make full ACID guaran-
tees and uses a relaxed consistency model. The master node handles names-
pace mutations (i.e. file creation) and locking so that these operations are
atomic. Mutations to files from multiple clients can cause consistency issues
when overwriting or inserting into a file. Appends to a file can be made to be
atomic if the client does not specify an offset and instead allows GFS to per-
form the append. Replicas are updated in the same order on all machines so
that in general they should all have the same state at the same time. There are
instances during failure or when clients cache chunk locations and read from a
stale replica where they may retrieve old or inconsistent data.
GFS provides almost instantaneous snapshot capabilities which can be
used to create copies, divergent branches of data, or data checkpoints.
Snapshots are created using copy-on-write techniques where if data is not
changed, copies can simply point to the original data source, but anytime the
original copy changes, a new copy is created. Hence, copies are deferred to
first write.
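A minimal sketch of the copy-on-write idea follows, under the simplifying assumption that a file is just a list of references to chunks; taking a snapshot copies only the references, and data is duplicated only when a chunk is first overwritten.

class ToyCowFile:
    def __init__(self, chunks):
        self.chunks = list(chunks)       # references to (possibly shared) chunk data

    def snapshot(self) -> "ToyCowFile":
        return ToyCowFile(self.chunks)   # cheap: copy the references, not the data

    def write(self, index: int, data: bytes) -> None:
        # The shared chunk is left untouched; this file simply points at new data,
        # so the real copy is deferred until the first write after the snapshot.
        self.chunks[index] = data

original = ToyCowFile([b"chunk-0", b"chunk-1"])
snap = original.snapshot()
original.write(0, b"chunk-0-v2")
print(snap.chunks[0])      # b"chunk-0": the snapshot still sees the old data
print(original.chunks[0])  # b"chunk-0-v2"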
In a 2010 interview[63], Sean Quinlan, one of the lead architects of GFS,
discusses how GFS changed in the seven years since its conception.
The biggest change they made to GFS was the creation of a distributed
master system rather than a single master. This change came about when
data volumes grew initially from tens of terabytes to tens of petabytes in the
span of a couple of years. The overhead of meta-data became too great for
a single massive master server to handle. Even though clients rarely interact
with the master server, something as simple as creating a file could end up
in a queue with thousands of other client requests as well. Google decided to
build a distributed master server network initially by placing a master server per
data center and later multiple master servers per data center. Each master can
index 100 million files in memory and there are hundreds of masters.
A major issue that Google ran into internally with GFS was bottlenecks due
to the volume of small files some projects used. Each file in GFS has meta-
data associated with its namespace and the locations of its chunks. The over-
head of meta-data caused by many small files was so substantial that Google
mandated that applications must find a way to store their information in larger
chunks of data and put a quota on the number of files an individual client can
create.
As applications changed at Google from batch processing jobs that could
take hours to run to a need for more real-time data for user applications, it
was clear that Google’s previous focus on optimizing for throughput rather than
latency needed to change. One solution to this at the GFS level is to leverage
parallel writes and perform merges later. In this scenario, if a single write hangs
or fails, one of the N parallel writes may succeed. By leveraging parallelism,
applications utilizing GFS can give the impression of low latency. Distributed
masters also help with latency when performing operations on the file system.
Quinlan concedes that the initial design of relaxed consistency did cause
many issues with some of their users down the road and that if he could re-
design GFS he would push to serialize writes from multiple clients to ensure
that replicas remain consistent.
impact on the academic realm due to the sheer number of other applications
and frameworks that build on top of it.
4.1.3 Haystack
Beaver et al. at Facebook describe a distributed object storage system for stor-
ing petabytes of photographs in their 2010 paper Finding a Needle in Haystack:
Facebook’s photo storage[14]. As of 2010, Facebook had stored over 260 bil-
lion images.
Haystack was designed to provide high throughput rather than low latency
and allow Content Delivery Networks (CDNs) to deal with caching and latency
issues. Similar to the other distributed file systems, Haystack also wants to
provide fault tolerance given the statistical likelihood of software and hardware
failures. Another requirement for Haystack is that it is cost-effective along the
dimensions of cost per terabyte and normalized read rate. Haystack claims
costs per terabyte that are 28% less than a similar NFS solution and provides
4x more reads per second than a similar NFS-based approach.
CDNs are able to cache the most recently accessed photos, but Facebook
noticed a pattern where older photos are also accessed frequently. This re-
sulted in a large amount of cache misses on Facebook’s CDNs. Cache misses
originally resulted in making requests to NFS shares where each directory
contained thousands of photos. The number of seeks required to find a single
photo started to become a bottleneck. Taking inspiration from GFS, Facebook
created Haystack as a distributed object store focused on storing and managing
its billions of photographs.
Haystack is split into three components: the Haystack Store, the Haystack
Cache, and the Haystack Directory.
The Haystack Store is the main improvement Haystack brings to the table
compared to the NFS approach in that Haystack stores individual photos and
metadata, called “needles”, into large (on the order of 100 GB each) contiguous
files (see figure 4). By using large files instead of many individual files,
they can reduce the number of disk accesses required to find and load indi-
vidual photographs. 10 TB servers are split into 100 physical volumes of 100
gigabytes each. Each physical volume resides on a single machine, but logical
volumes can span multiple machines and multiple physical volumes. When a
photo is written to the store, it is stored in a single logical volume, but duplicated
multiple times over physical volumes to provide availability. The backing filesys-
tem is XFS which provides small enough blockmaps that they can be stored
in physical memory as well as efficient file preallocation. The store also keeps
an index file that allows the in-memory mappings to be recreated on failures or
server restarts without having to read through the entire file system.
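The essential trick is easy to sketch: an in-memory index maps a needle's key to an offset and size inside one large volume file, so a read costs a single seek instead of a directory walk. The toy Python below is our own illustration of that idea, not Facebook's implementation, and it omits needle headers, checksums, and replication.

import os

class ToyHaystackVolume:
    def __init__(self, path: str):
        self.path = path
        self.index = {}                  # needle key -> (offset, size), kept in memory
        open(path, "ab").close()         # make sure the volume file exists

    def put(self, key: int, data: bytes) -> None:
        with open(self.path, "ab") as f:
            f.seek(0, os.SEEK_END)
            offset = f.tell()
            f.write(data)
        self.index[key] = (offset, len(data))

    def get(self, key: int) -> bytes:
        offset, size = self.index[key]   # in-memory lookup, no filesystem metadata reads
        with open(self.path, "rb") as f:
            f.seek(offset)               # one seek into the large contiguous file
            return f.read(size)

vol = ToyHaystackVolume("toy_volume.dat")
vol.put(42, b"...jpeg bytes...")
assert vol.get(42) == b"...jpeg bytes..."
os.remove("toy_volume.dat")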
A service called Pitchfork is used to continuously monitor, detect, and repair
services in Haystack. In the event of failures, new servers can be brought online
and data can be synchronized from the replicas.
The Cache component provides an intermediate layer of photo caching be-
tween CDNs and the Haystack Store. Requests for photos contain three URLs.
Figure 4: Layout of Haystack Needle within a physical volume.[14]
The first URL is a lookup key for the data in the CDN. If the CDN does not
have the image cached, the CDN URL is stripped and the photo request is for-
warded to the Haystack cache. Again, if the photo is not found in the cache,
the Haystack URL is stripped and the request is forwarded to the Haystack
Store. These requests are made over HTTP and the format of the requests
is as follows: http://&lt;CDN&gt;/&lt;Cache&gt;/&lt;MachineId&gt;/&lt;LogicalVolume,Photo&gt;.
The Haystack Directory servers provide mappings from logical volumes to
physical volumes. They are also in charge of determining whether a read photo
should be stored in the Haystack Cache or in a CDN cache. Finally, Haystack
Directory servers provide load balancing for writing across logical volumes and
reading across physical volumes.
On top of these three services, Haystack provides a small set of other im-
provements including compaction which reclaims space of deleted photos and
custom binary encoding of indexes to improve space efficiency. In the end,
Facebook was able to improve on their photo storage and management by
showing performance and efficiency gains over their previous NFS based ap-
proach.
4.1.4 Comparison of Distributed File Systems
At a high level GFS and HDFS are very similar[50]. GFS is proprietary and used
inside of Google and supports many of their distributed applications. HDFS is
open source and similarly has a very large ecosystem of software designed
around it including both persistence and analytics.
HDFS was designed to be an open source clone of GFS and their high level
architectures are essentially the same. HDFS provides a permissions model
that is similar to Unix-style permissions, whereas Google uses a proprietary
model.
Haystack was optimized for high throughput and provides efficiency by uti-
lizing CDNs. Haystack was designed specifically for photo storage at Facebook
whereas GFS and HDFS are both much more general purpose and have many
other use cases (as evidenced by the rest of this report).
4.2 Key-Value
The simplest data model for distributed storage is likely the Key-Value (KV)
data model [89]. In this model, every piece of data stored is indexed by a
unique primary key. All queries for an item happen via its key to access the
value. Values in a KV system can be treated as blobs and the content of the
value is irrelevant to the semantics of KV stores. KV systems are popular due
to their simplicity and ease of scaling.
Keys in KV systems are the unit of parallelism and provide the main means of
concurrency. If you want to guarantee transactions, then keys can be naively
sharded across servers. This does not, however, provide safety from data loss,
in which case a system will strive to provide replication at the cost of ACID
compliance. Stores and requests can usually be achieved in O(1) time even in
distributed systems[73].
If the major advantages are simplicity and query response time[24], the
major disadvantage to KV stores is the fact that they lack advanced query ca-
pabilities. The only way to query the database is by an item's unique key. Range-based
queries and secondary and tertiary indexes are only supported by third-party
systems or application code. Joins can only be performed in application code[7].
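A minimal sketch of the model, assuming nothing about any particular product: values are opaque blobs, every operation is addressed by a unique key, and keys are naively sharded across servers by hashing.

import hashlib

class NaiveShardedKV:
    def __init__(self, num_shards: int):
        self.shards = [dict() for _ in range(num_shards)]   # one dict stands in for each server

    def _shard(self, key: str) -> dict:
        digest = hashlib.md5(key.encode()).hexdigest()
        return self.shards[int(digest, 16) % len(self.shards)]

    def put(self, key: str, value: bytes) -> None:
        self._shard(key)[key] = value                       # O(1) store by primary key

    def get(self, key: str) -> bytes:
        return self._shard(key)[key]                        # O(1) lookup by primary key

store = NaiveShardedKV(num_shards=4)
store.put("sensor:42:2016-01-01T00:00", b'{"temp": 21.5}')
print(store.get("sensor:42:2016-01-01T00:00"))
# Note there is no way to ask for, say, "all readings for sensor 42 in January"
# without scanning every shard or maintaining a secondary index in application code.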
This section will review Memcached 4.2.1, Dynamo 4.2.2, and Voldemort
4.2.3.
4.2.1 Memcached
One of the earliest examples of a distributed KV store is memcached[33] which
was created in part to power the dynamic content of 70 distributed LiveJournal
servers. The developer, Fitzpatrick, believed that scaling out on many tiny ma-
chines rather than up was the appropriate response for increased data loads.
Even though they had SQL database clusters, they could not provide caching
on those machines in front of the database due to limitations in address space
(32-bit). Fitzpatrick realized that there is a lot of spare memory on the network,
Figure 5: Memcached architecture.[33]
4.2.2 Amazon’s Dynamo
In 2007, DeCandia and others working at Amazon released their paper Dy-
namo: Amazon’s Highly Available Key-Value Store[30]. DeCandia et al. de-
scribe Dynamo as “a highly available key-value storage system that some of
Amazon’s core services use to provide an ’always-on’ experience”. Dynamo pri-
oritizes availability over consistency. In order to achieve those goals, Dynamo
makes extensive use of object versioning and application-assisted conflict res-
olution which we will look at in greater detail in the next couple of paragraphs.
As a KV store, Dynamo only provides get and put operations against a
unique key and is intended to be used with small values (< 1 MB). Amazon, cit-
ing experience, mentions that “data stores that provide ACID guarantees tend
to have poor availability”. Since Amazon’s goal for this data store was primarily
availability, Amazon decided to use a weaker consistency model which implies
that updates will eventually propagate to all peers, at some time in the future.
Other guiding requirements for the design of Dynamo include: symmetry where
each Dynamo process has the same set of responsibilities as all others, in-
cremental scalability where scaling out has minimal impact on the system or
its users, decentralization to avoid single points of failure, and heterogeneity
where each process is tuned to the individual hardware of the server it runs on.
With these design goals in mind, we next discuss how Dynamo implements
these technologies.
Large distributed systems are prone to network, software, and hardware
failure. To ensure high availability, Dynamo utilizes optimistic replication, where
updates are propagated to N peer nodes in an eventually consistent manner.
All nodes in Dynamo's distributed network form a logical ring. Keys are hashed
to determine a coordinator node within the ring that initially stores the key-value
pair. The coordinator node stores the key-value pair locally as well as on N − 1
successor nodes going in a clockwise direction. The list of nodes that end up
storing a key are called a preference list. Every node in the ring is capable of
determining the preference list for any given key. Dynamo allows for gets and
puts on any node in a preference list, skipping over nodes that are unhealthy.
Because of this, situations can arise where there are conflicting copies of data
when an update hasn’t persisted to all nodes in its preference list yet. Dynamo
versions all data with timestamps to allow multiple versions of data to exist
in the data store. Using semantic reconciliation, the network is usually able to
determine the authoritative version. In certain failure situations where data can
not be semantically reconciled, it is left up to the application to determine which
value is correct.
One of Dynamo’s stated goals is to provide incremental scalability. When a
new server comes online (or others go down), data must be partitioned across
the new nodes. Dynamic partitioning is achieved using consistent hashing tech-
niques as described in [49].
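A minimal sketch of the ring and preference list described above, under simplifying assumptions: nodes and keys are hashed onto the same space, the first node clockwise from the key's hash is the coordinator, and the preference list is that node plus its N − 1 successors. Real Dynamo adds virtual nodes and skips unhealthy members, which this toy version omits.

import bisect
import hashlib

def ring_hash(value: str) -> int:
    return int(hashlib.md5(value.encode()).hexdigest(), 16)

class ToyRing:
    def __init__(self, nodes, n_replicas: int = 3):
        self.n = n_replicas
        self.ring = sorted((ring_hash(node), node) for node in nodes)

    def preference_list(self, key: str):
        positions = [pos for pos, _ in self.ring]
        start = bisect.bisect(positions, ring_hash(key)) % len(self.ring)  # first node clockwise
        return [self.ring[(start + i) % len(self.ring)][1] for i in range(self.n)]

ring = ToyRing(["node-a", "node-b", "node-c", "node-d", "node-e"])
print(ring.preference_list("sensor:42:reading:2016-01-01"))
# The coordinator plus its two clockwise successors, e.g. ['node-c', 'node-d', 'node-e'].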
Gets and puts are performed in parallel on all N healthy nodes in the prefer-
ence list. When nodes become unhealthy, a sloppy quorum takes place and the
unhealthy nodes are temporarily removed from the ring, and subsequent puts
and gets happen on the successive neighbors. To protect against entire data
centers going down, replicas span multiple data centers. Dynamo uses Merkle
trees to detect inconsistencies between replicas and recover from permanent
failures. A gossip-based protocol is used to maintain node membership within
the ring.
Dynamo is used to power many of Amazon’s backend services and has also
been promoted as an offering of Amazon’s cloud services.
Riak[52] is an open source implementation of Amazon’s Dynamo and was
built using DeCandia’s paper.
Figure 6: Steps involved in the complete data deployment pipeline. The com-
ponents involved include Hadoop, HDFS, Voldemort, and a driver program co-
ordinating the full process. The “build” steps work on the output of the
algorithm’s job pipeline. [82]
map reduce runs, and swapping of live index data back into Voldemort. This
process is visualized in figure 6.
Voldemort uses a storage format that memory maps indexes directly into
the operating system’s address space instead of creating a custom heap based
data structure. Voldemort does this to allow the OS to handle caching and page
caches which tend to be more efficient than custom data structures. When
data is fetched from Voldemort, it is chunked and stored across HDFS in such
a way that the file sizes do not become too small, which Hadoop does not handle
efficiently. Index files contain the upper 8 bytes of the MD5 of the key and then
a 4 byte offset to the associated value in the data file. MD5 is used to provide
a uniform hash space. Voldemort only uses the top 8 bytes of the MD5 to
reduce the overall index size while still providing for a low amount of collisions
(∼.0004%). The main advantage of this approach is that it is much more cache
friendly. The downside to this approach is that there is increased complexity for
dealing with collisions when they do happen.
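Following the layout described above, an index entry is just 12 bytes: the top 8 bytes of the key's MD5 followed by a 4-byte offset into the data file. The snippet below is an illustration of that packing, not Voldemort's code.

import hashlib
import struct

def index_entry(key: bytes, data_offset: int) -> bytes:
    top8 = hashlib.md5(key).digest()[:8]            # upper 8 bytes of MD5(key)
    return top8 + struct.pack(">I", data_offset)    # plus a 4-byte offset into the data file

entry = index_entry(b"member:12345", 1024)
assert len(entry) == 12   # small, fixed-size entries keep the memory-mapped index cache friendly
# A lookup binary-searches the sorted entries (served via the OS page cache) and
# then seeks to the stored offset in the data file; the rare MD5-prefix collision
# is resolved there by comparing full keys.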
Voldemort generates its data chunks with a single Hadoop job. The Hadoop
job takes as input the number of chunks, cluster topology, store definition, and input location
on HDFS. The mapper phase of the Hadoop job partitions data based on the
provided routing strategy. A three-tuple of generated chunk set id, partition id,
and replication factor determines how data is routed to the correct reducer. The
reducer phase of the Hadoop job writes data to a single partitioned chunk set.
By tuning the number of chunk sets, build-phase parallelism can be configured
and exploited.

Table 1: Comparison of key-value stores

Feature        Memcached       Dynamo                                Voldemort
Consistency    Fully atomic    Weak / eventual                       Strict or eventual
Indexing       Index by key    Index by key                          Index by key; relationship indices
                                                                     computed offline via MapReduce
Availability   None            High                                  High
Replication    None            Optimistic with consistent hashing    Consistent hashing
Operations     get / put       get / put                             get / put
Versioning     No              Yes                                   Yes
Voldemort makes extensive use of data versioning. Symbolic links point to
the most recent version of data in a directory. This feature makes it possible
to quickly roll back versions of data in failure conditions and also makes it
possible to quickly update indexes from offline data set computations.
LinkedIn continues to use Voldemort to power their “People You May Know”
data set as well as their collaborative filtering datasets. They found a 10x
improvement in throughput against traditional MySQL solutions at scale. Voldemort
also has smaller read latencies than MySQL. The biggest improvement
comes from building data sets using MapReduce, which scales linearly, whereas
the MySQL approach does not scale at all.
4.3 Column
Wide-column stores at first glance appear to be a regular SQL-like table turned
on its side, where each table contains a “small” number of rows but each row
can contain a huge number of dynamic columns. Column stores are schema-
less and support dynamically named columns. Generally, rows are the unit of
parallelism in a column store and also the main means of distribution.
This section will review Google's Bigtable 4.3.1 first, as most column-oriented databases are inspired by Bigtable. Then we will review Cassandra 4.3.2, HBase 4.3.3, and Hypertable 4.3.4.
measurement. Access control and memory management are performed at the
column family level.
Individual stored values can have multiple versions, which are sorted in timestamp order so that updates can be tracked. Bigtable also provides built-in mechanisms for storing only the last N versions of a value or keeping only data received within a given time window.
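As a rough illustration of this logical model (a sparse map from row key, column key, and timestamp to a value, trimmed to the last N versions), the following Python sketch mimics the behavior; the class and method names are invented and this is not Bigtable's API:

    # Minimal sketch of Bigtable's logical data model: a sparse map from
    # (row key, column key, timestamp) to value, trimmed to the last N versions.
    # Illustrative only -- not Bigtable's actual API.
    from collections import defaultdict

    class TinyTable:
        def __init__(self, max_versions=3):
            self.max_versions = max_versions
            self.cells = defaultdict(list)   # (row, column) -> [(timestamp, value), ...]

        def put(self, row, column, timestamp, value):
            versions = self.cells[(row, column)]
            versions.append((timestamp, value))
            versions.sort(reverse=True)          # newest first
            del versions[self.max_versions:]     # keep only the last N versions

        def get(self, row, column):
            versions = self.cells[(row, column)]
            return versions[0][1] if versions else None   # most recent value

    t = TinyTable(max_versions=2)
    t.put("com.example/index", "anchor:home", 1, "v1")
    t.put("com.example/index", "anchor:home", 2, "v2")
    print(t.get("com.example/index", "anchor:home"))  # "v2"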
Bigtable’s API allows for storing, querying, and deleting of data. The API
allows for meta-data manipulation for access control. It’s also possible to run
client side scripts on the server to manipulate and filter the data similar to Redis.
Bigtable utilizes Google's distributed file system, the Google File System (GFS), for log and data storage. Data is initially stored in memory, but as it grows, it is frozen, compacted, optionally compressed, and then written to disk. Once on disk, the data is immutable, and reads can take advantage of this immutability for parallelism. Bigtable provides high availability using a distributed lock service called Chubby[18]. Chubby manages 5 active replicas, one of which is elected to serve requests. Chubby maintains consistency between replicas using the Paxos algorithm[23], an algorithm for forming group consensus in the presence of failure. A master server is used to determine routing to and from tablet servers. However, data does not flow through the master server; it goes directly to the tablet servers once routing has been established. The master server is also responsible for monitoring and managing the health of tablet servers and performing maintenance such as creating new tablets as the system grows or servers fail. A B+ tree is used to index into tablets.
Although Bigtable has been very successful at Google, Chang does mention that running highly distributed systems exposes many failure modes, including memory and network corruption, unresponsive machines, clock skew, distributed file system quotas, and hardware issues. Next we look at several systems that aim to improve on some of these flaws.
4.3.2 Cassandra
Cassandra was designed at Facebook and is described in Lakshman et al’s
2010 paper Cassandra: A Decentralized Structured Storage System[55].
Cassandra has many of the same goals as Bigtable, including managing large amounts of structured data spread out across many servers with an emphasis on availability in the face of errors. Cassandra also mimics Amazon's Dynamo by integrating its distributed systems technologies[24]. Cassandra is weakly consistent during concurrent operations.
Partitioning data across a cluster is done using consistent hashing where
multiple nodes in a ring are responsible for overlapping data.
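The placement scheme can be sketched as follows; this is a generic illustration of consistent hashing in Python, not Cassandra's actual partitioner, and the node names and replication factor are hypothetical:

    # Conceptual sketch of consistent hashing: nodes and keys are hashed onto a
    # ring, and a key is stored on the first N nodes found clockwise from its hash.
    # Illustrative only -- not Cassandra's actual partitioner.
    import bisect
    import hashlib

    def ring_position(name: str) -> int:
        return int(hashlib.md5(name.encode()).hexdigest(), 16)

    nodes = ["node-a", "node-b", "node-c", "node-d"]
    ring = sorted((ring_position(n), n) for n in nodes)
    positions = [pos for pos, _ in ring]

    def replicas(key: str, replication_factor: int = 2):
        start = bisect.bisect(positions, ring_position(key)) % len(ring)
        return [ring[(start + i) % len(ring)][1] for i in range(replication_factor)]

    print(replicas("sensor-42/temperature"))  # e.g. ['node-c', 'node-a']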
Cassandra stores data using the same column store data schema as Bigtable; however, Cassandra provides an extra level of abstraction called the super column family, which can contain multiple related column families, providing more flexible data structures.
In the same vein, replication is performed on up to N configurable servers.
Replication strategies are managed using Apache Zookeeper which manages
distributed logs.
A gossip based protocol is used to determine membership within a Cas-
sandra cluster. A gossip based protocol is one where all nodes in a cluster
communicate (gossip) their state with their nearest neighbors and eventually,
the entire cluster knows the state of every other node. For determining failed
nodes, the Accrual Failure Detector[40] is used, which provides a dynamic value for each node in the system representing a combination of software/hardware failures as well as network congestion and load. In this way, failed node information is more than just an up-or-down status and instead provides a more nuanced
view of the cluster. As new nodes are added to the cluster, they are added into
the ring in such a way that they can immediately relieve other highly congested
nodes.
Cassandra nodes use a gossip based protocol so that each node is able to
route any request to the proper node within the cluster.
4.3.3 HBase
HBase is an open source Apache project that was designed to provide Bigtable
like storage[51]. HBase makes use of HDFS for data storage as opposed to
GFS. Unlike Bigtable, HBase provides row-level locking and transaction pro-
cessing that can be turned off for performance reasons for large scale data
sets[24].
The HBase equivalents to Bigtable's services are the HBaseMaster, the HRegionServer, and the HBaseClient.
The HBaseMaster assigns reads/writes to region servers, monitors the health of region servers, and performs administrative tasks such as schema changes or adding and removing column families.
The HRegionServers act as data nodes and perform the actual writing and reading of data.
The HBaseClient communicates with the master server in order to determine which HRegionServers contain the data that needs to be read or written. Once determined, the HBaseClient can communicate with the HRegionServers directly.
HBase provides strong consistency using locks and log records. Data is
written to the end of a log and compaction is performed to save space[22].
Operations performed on rows are atomic.
Operations in HBase can be distributed using MapReduce.
4.3.4 Hypertable
Hypertable is another open source Bigtable clone[51]. What sets Hypertable apart from the other Bigtable clones is its ability to store data using multiple third party filesystems. For example, Hypertable can make use of HDFS, CloudStore, or be run on top of a normal file system.
Feature | Bigtable | Cassandra | HBase | Hypertable
Consistency | Eventual | Weak / eventual | Eventual (atomic at rows) | Eventual
Indexing | Row key, column key, timestamp | Row key, column key, wide row, custom key | Row key, column family, column key | Row key, column key
Query Language | C++ API | CQL | Pig Latin / HQL | HQL
Availability | High (Chubby) | High | High | High
Replication | GFS | HDFS | HDFS | HDFS
Operations | gets, puts, range queries, meta-data updates | gets, puts, range queries, meta-data updates | gets, puts, range queries, meta-data updates | gets, puts, range queries, meta-data updates
Versioning | Yes | No | Yes | Yes
4.4 Document
Document based data models allow data to be stored in structured documents made up of KV pairs. The underlying document structure can be anything as long as it is something the store can understand. Common document formats include XML, JSON, YAML, and BSON[24]. Documents can also store other documents recursively.
Even though documents are restricted to their underlying structure (i.e. JSON, YAML, etc.), document databases do not impose a schema like traditional RDBMS databases. This allows for painless transitions of data storage when the underlying data changes[79]. This is of special concern with heterogeneous sensor networks, which can produce large varieties of data in different and changing formats.
Document based data stores often present less of an impedance mismatch between data structures in a programming language and their underlying representation in the data store. Compared to the way data is structured in an RDBMS, it's often easier to understand the underlying data structure in a document based store. There are many common libraries for converting between documents in a document store and a data structure in a particular programming language. Compared to an RDBMS, document stores generally do not normalize their data, which also enhances the readability of the underlying data structure[73].
An advantage that document based stores have over KV stores is that it's possible to create more complex queries. Not only can you query on the primary key, but it's possible to create queries over any keys in the document, including in sub-documents. Indexes can be created on keys and sub-keys. Many document based stores provide range and geospatial queries. This advantage alone makes document based stores a decent choice for distributed sensor data.
Document databases that we explore in this review include MongoDB 4.4.1,
CouchDB 4.4.2, and SimpleDB 4.4.3.
4.4.1 MongoDB
MongoDB is a highly popular, distributed document-based database[66]. The
name mongo comes from humongous as it was designed to work with very
large data sets.
As a document database, MongoDB stores data in data structures that re-
semble JSON documents. Internally, these documents are serialized into a
format called BSON which essentially provides a binary encoding over top of
JSON data structures. Documents in MongoDB can contain one or more fields
where each field can store strings, ints, longs, dates, floating point numbers,
booleans, and 128-bit decimals. Fields can also be arrays of data, binary data,
and embedded documents. This structure is very flexible and allows great
customization when designing data models. Unlike relational databases, Mon-
goDB documents tend to store all required information in a single document, a
form of de-normalization.
Applications must use a specific driver developed for their programming language to interact with MongoDB[24].
MongoDB has a very rich query model and provides support for the following types of queries: key-value queries similar to those in key-value stores; range queries based on inequalities (less than, greater than, equal to); geospatial queries working with points, lines, circles, and polygons, such as nearest neighbor or point-in-polygon; textual search queries; aggregation queries (i.e. min, max, average, etc.); left-outer joins; basic graph traversal queries; and MapReduce based queries where JavaScript is moved to the data and computed at the data.
A special type of query that MongoDB supports is the covered query, a query that returns results containing only indexed fields. These types of queries are very efficient as MongoDB aggressively stores indexes in memory.
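A hedged sketch of this query model using the pymongo driver is shown below; it assumes a MongoDB server running locally, and the database, collection, and field names are invented for illustration:

    # Sketch of MongoDB's query model using the pymongo driver.
    # Assumes a MongoDB server on localhost and the pymongo package; the
    # "sensors"/"readings" names and fields are hypothetical.
    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")
    readings = client["sensors"]["readings"]

    # Key-value style lookup and a range (inequality) query.
    one = readings.find_one({"sensor_id": "s-17"})
    hot = readings.find({"temperature": {"$gt": 30}})

    # Geospatial query: readings near a point (requires a 2dsphere index).
    readings.create_index([("location", "2dsphere")])
    nearby = readings.find({"location": {"$near": {
        "$geometry": {"type": "Point", "coordinates": [-122.4, 37.8]},
        "$maxDistance": 1000}}})

    # Aggregation: average temperature per sensor.
    avg = readings.aggregate([
        {"$group": {"_id": "$sensor_id", "avg_temp": {"$avg": "$temperature"}}}
    ])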
MongoDB also supports a wide range of index types. Partial indexes ap-
ply a filter at document creation time and only index documents that meet the
filter criteria. Sparse indexes only index documents that contain a specified
field (which they may not since MongoDB does not enforce schemas). Text
search indexes provide advanced linguistic rules for stemming, tokenization,
case sensitivity, and stop words. MongoDB provides several secondary index
types that can be used on any field in a document. These include unique in-
dexes that are used to reject newly created documents that have the same value, compound indexes which group several predicates together to index on,
array indexes which will index each element in an array, TTL indexes which al-
low users to define how long a document lives in the database, and geospatial
indexes which provide the special geospatial querying capabilities mentioned
above.
For scalability, MongoDB provides automatic sharding. As the size of a
cluster increases or decreases, MongoDB automatically balances data across
the available nodes. MongoDB provides three types of sharding. Range based
sharding is used when applications need to optimize for range based queries.
Hash based sharding is used to provide a uniform distribution of data across
nodes in a cluster. Zone sharding gives the application developers complete
control of where and how their data is sharded.
MongoDB is ACID compliant at the level of a document. Fields and sub-documents can be written to in parallel, and errors or consistency issues can be rolled back. Data is replicated in replica sets that are self-healing. Replicas will automatically fail over in the face of failure. MongoDB is strongly consistent by default, and all reads and writes happen at a single master server. If that master fails, then a new master server is elected using the Raft consensus algorithm. Data is replicated from the master server to secondary servers using something called the oplog. The master replica set writes to the oplog, and all secondary servers replay the oplog to replicate the data. Whereas other document stores provide versioning of values, MongoDB provides atomic updates of fields[22].
MongoDB can store data using multiple backends including an in-memory
storage solution, a disk based storage solution, or a hybrid based solution.
Read consistency is configurable, and clients can choose to read from replica sets, which may be eventually consistent.
MongoDB supports various security techniques including authentication and
authorization, auditing, and encryption.
4.4.2 CouchDB
CouchDB is a moderately popular document based database that stores data using JSON; all communication with the database is performed over a REST interface[79]. CouchDB is an open source project originally developed at IBM, now part of the Apache Software Foundation, and is described in great detail in Anderson et al's book CouchDB: The Definitive Guide[10].
Interacting with the database is performed over a RESTful interface using
HTTP methods passing JSON back and forth. Javascript is used as the query
language of CouchDB.
CouchDB is built on top of a B-tree, so accesses, puts, and gets of documents in the database take O(log N) time. Because of this, CouchDB's query model can only perform lookups by key or by range, as both query types are well supported by B-tree data structures. This was done for speed and efficiency reasons. Queries in CouchDB are called views and are defined using JavaScript[22].
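The following sketch illustrates this REST/JavaScript model using Python's requests library; it assumes a CouchDB server on localhost:5984 with authentication omitted, and the database, document, and view names are hypothetical:

    # Sketch of CouchDB's REST interface: create a document, then define and
    # query a JavaScript view. Assumes a CouchDB server on localhost:5984 and
    # the requests package; database and view names are hypothetical.
    import requests

    base = "http://localhost:5984/readings"

    requests.put(base)  # create the database (no-op if it already exists)

    # Documents are plain JSON sent over HTTP.
    requests.post(base, json={"sensor_id": "s-17", "temperature": 31.2})

    # A view is a design document whose map function is written in JavaScript.
    design = {"views": {"by_sensor": {
        "map": "function(doc) { emit(doc.sensor_id, doc.temperature); }"}}}
    requests.put(base + "/_design/stats", json=design)

    # Query the view by key (keys are JSON-encoded).
    resp = requests.get(base + "/_design/stats/_view/by_sensor",
                        params={"key": '"s-17"'})
    print(resp.json())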
CouchDB handles the velocity of big data by providing lock free access to documents. Instead of locking, CouchDB uses Multi-Version Concurrency Control (MVCC). In other words, documents are versioned, so instead of locking the database to perform updates or insert new documents, new values are created with an increased version number. Versions also allow for the easy creation of snapshots.
CouchDB provides strong consistency on a single machine and eventual consistency over a cluster. CouchDB uses a process called incremental replication where changes are periodically copied between servers. CouchDB spreads data out in what it calls a shared nothing configuration. In this type of configuration, each node is independent and self-sufficient, so there cannot be a single point of failure anywhere in the cluster. This is opposed to sharding, where data is spread out over a cluster. In situations where data for the same item is updated on two different servers, CouchDB provides automatic conflict resolution and will pick the most recent version. It's up to developers to change the version if this type of conflict resolution does not meet their application's requirements.
Chen et al.[24] add that updates to documents in CouchDB require downloading and re-uploading entire documents. That is, updates on individual fields are not possible. It's also possible to tune the replication mechanism to create any custom replication topology, one area where CouchDB is quite different from the other members of the document database space.
4.4.3 SimpleDB
SimpleDB[22] is a distributed document database developed by Amazon and
provided as a database as a service. It is built on top of Amazon’s cloud archi-
tecture.
SimpleDB is one of the older (2007) and simpler document databases. SimpleDB does not support nested documents like MongoDB. The only operations supported by SimpleDB are select, delete, getAttributes, and putAttributes.
SimpleDB supports eventual consistency, but does not provide a way to alert clients when conflicts arise. This is due to the fact that SimpleDB does not use sharding, but instead uses asynchronous replication, similar to CouchDB. The difference between SimpleDB and CouchDB is that CouchDB maintains multiple versions of values where SimpleDB does not have a similar mechanism.
SimpleDB does not provide any manual index control. Instead, you can
define multiple domains per document. Domains can contain different con-
straints. This method allows data to be queried in multiple ways, using different constraints and different domains, and provides some facility for mimicking indexes.
Even though SimpleDB provides a rather simple data model, it's provided as a service built on top of Amazon's cloud architecture. This allows you to scale as needed and only pay for what you use.
4.5 Graph
Graph databases are unique in the way that they store and relate data. Graph
databases specialize in efficiently managing heavily linked data[22].
Data in graph databases is related along the nodes, edges, and properties of a graph data structure[79]. Items in graph databases directly point to their nearest neighbors, and graph algorithms are used to traverse the data model.
Feature | MongoDB | CouchDB | SimpleDB
Consistency | Eventual (atomic at document level) | Eventual | Eventual (atomic at rows)
Indexing | Primary, single field, compound, multikey, geospatial, text, hashed, partial, sparse, TTL, covered | Local, global, secondary, MapReduce views, geospatial, text | Defined through domains
Query Language | Multiple APIs | HTTP / JavaScript | HTTP / SimpleDB Query Language
Nested Documents | Yes | Yes | No
Availability | High / Replica sets | High | High
Replication | Sharding | Full duplication | Async replication
Operations | gets, puts, updates, rich queries | gets, puts, range queries, updates | selects, deletes, getAttributes, putAttributes
Versioning | No | Yes | No
Graph databases are an integral part of big data, especially in social relations. Social networks such as Twitter and Facebook store massive amounts of social interactions using graph databases. This data can be used in cluster analysis or other forms of analysis to learn about the interactions and communities within a platform. Graph databases are also used in location based services for path finding or recommendation systems[22].
This section will review Neo4J 4.5.1 and several miscellaneous graph databases
4.5.2.
4.5.1 Neo4J
Neo4J is perhaps the most popular graph database. Neubauer describes how Neo4J works in his article Graph Databases, NOSQL and Neo4j[68]: it is an open source project written in Java that has been optimized for graph algorithms and traversal. Neo4J is fully ACID compliant on individual machines. Neo4J provides a native Java API and also has bindings for many third party programming languages.
According to a white paper released by Neo4J, Understanding Neo4j Scalability[67], data is written to a master node and then the master replicates the data in parallel to all nodes in the cluster at one time. Reads can then be performed on any of the slave nodes and receive a fully consistent view of the data. This provides high availability as each node contains the entire view of the graph. In the event of a master failure, a quorum is formed and a new master is elected. Neo4J does not offer sharding because partitioning a graph across a set of servers has been shown to be NP-complete.
A single server in a Neo4J database can contain 34 billion nodes, 34 billion edges, and 68 billion properties. It's noted that as of 2013, only Google has been able to push the limits of Neo4J and that for most big data use cases, these limitations are currently fine.
An important feature of Neo4J is the ability to manage big data sets. Neo4J achieves this by utilizing the index-free adjacency property. That is, each node has pointers only to its direct neighbors. When navigating or working with the graph structure, Neo4J only needs to consider the current node and its neighbors. As the size of data increases, the amount of time it takes to navigate and process the data remains constant.
This provides high performance up to a limit. With enough data, you start to hit the physical constraints of the system, including RAM limits and cache misses. Neo4J implements cache based sharding where a cache is sharded across servers in such a way that caches on an individual server will contain mappings to data that has high locality in the graph. This allows reads to be performed across the cluster in a cached way, providing effective load balancing. It should be noted that Neo4J also provides some basic indexing on nodes for things such as text search.
Neo4J was not built with high write loads in mind and therefore does not handle velocity well in practice. This limitation tends to be acceptable as most workloads on graph databases tend to be read intensive, which is what Neo4J is optimized for.
Neo4J is schemaless, and the structure of the data can be updated dynamically[79]. Neo4J provides querying capabilities using a built-in query language called Cypher, which makes heavy use of pattern matching to query and mine the graph data structure.
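A small sketch of a Cypher pattern-matching query issued through the official Neo4J Python driver is shown below; the connection URI, credentials, and the sensor/gateway graph are assumptions for illustration:

    # Sketch of querying Neo4J with Cypher through the official Python driver.
    # The URI, credentials, and the sensor/gateway graph are hypothetical.
    from neo4j import GraphDatabase

    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

    with driver.session() as session:
        # Pattern matching: find sensors attached to a given gateway.
        result = session.run(
            "MATCH (s:Sensor)-[:REPORTS_TO]->(g:Gateway {id: $gid}) "
            "RETURN s.id AS sensor_id",
            gid="gw-1")
        for record in result:
            print(record["sensor_id"])

    driver.close()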
Feature | Neo4J | FlockDB | OrientDB
Consistency | Atomic updates to individual objects, ACID in a single database, eventual in a cluster | Eventual | Eventual (atomic at rows)
Indexing | Single field | Primary and secondary | Full system indexing
Query Language | Cypher, other APIs | NA | SQL-like
Schema | Schemaless | NA | Schemaless, semi-schemaless, full schema
Replication | Async replication | Sharding | Sharding
Operations | gets, puts, updates, graph traversals | gets, puts, set operations | gets, puts, updates by index and graph traversals
Spanner provides globally consistent reads across the entire database at a specific timestamp, which allows for consistent MapReduce pipelines, consistent backups, and atomic schema updates. These features are normally not present in a distributed storage system, but Spanner is able to provide them by assigning globally meaningful timestamps to transactions. Spanner guarantees the absolute ordering of timestamps at a global scale.
Spanner is able to provide absolute timestamps on a global scale by making use of its TrueTime API, which exposes not only clock timestamps but also clock uncertainty. Google uses a distributed network of atomic clocks and GPS receivers to maintain global synchronization. In the presence of uncertainty, Spanner can dynamically change its execution speed to match the timestamp uncertainty. In the end, Spanner keeps this uncertainty below 10 ms.
5 Big Data Analytics
As we saw in the previous section, big data is generally stored on distributed
systems. Data can be spread out across hundreds or thousands of servers.
These systems may be heterogeneous and due to their distributed nature, are
not great candidates for technologies such as Message Passing Interface (MPI)
or Open Multi-Processing (OpenMP)[24].
With a large volume of data, it's not possible to download entire data sets to a single server for analysis; therefore, we need technologies that can distribute computation over many servers. Because of the wide variety of data to process, we need technologies that can learn about data sets and extract signals from a large volume of noise. Because of the velocity of data, we require technologies that perform analytics on real-time streaming data.
This section reviews distributed computing platforms that are able, in one way or another, to meet the demands of processing distributed big data. First we
explore MapReduce and Hadoop related technologies in 5.1. Then we look
at several other big data analytics engines that are not part of the Hadoop
ecosystem including Dryad 5.2, Pregel 5.3, GraphLab 5.4, Storm 5.5, and Flink
5.6.
5.1.1 MapReduce
We start by reviewing MapReduce, which has been instrumental in influencing the designs of many of the distributed computation frameworks that we will discuss in future sections.
MapReduce was designed at Google by Jeffrey Dean and Sanjay Ghe-
mawat and is described in their 2008 paper MapReduce: Simplified Data Pro-
cessing on Large Clusters[28].
MapReduce’s stated goal is to provide “a simple and powerful interface that
enables automatic parallelization and distribution of large-scale computations,
combined with an implementation of this interface that achieves high perfor-
mance on large clusters of commodity PCs”.
The authors and others at Google noticed that individual teams at Google
were writing custom algorithms for dealing with large amounts of distributed
data on GFS. These computations included web crawling, log analysis, inverted
indices, different views on graph structures, aggregate summaries, query statis-
tics, and others. Engineers noticed that a common pattern among many of the
custom algorithms was transforming every item of a collection, and then group-
ing results by a key. The abstraction of this idea produced MapReduce.
The name MapReduce was chosen as a homage to the map and reduce
functions of LISP programming languages. The authors point out that by taking
a functional approach to distributed computation, you get parallelism for free.
MapReduce is split into two phases: a map phase and a reduce phase. Key-value pairs are used as both the input and the output format for MapReduce computations.
In the map phase, key-value pairs are transformed using a user provided function into intermediate key-value pairs. The intermediate values are grouped by key and passed to the reduce function.
The reduce phase takes as input a list of intermediate key-value pairs and, for each key, uses a user defined reduction function to reduce the list of values associated with that key to a smaller set of values, usually zero or one. The types of the map and reduce functions are given as map(k1, v1) → list(k2, v2) and reduce(k2, list(v2)) → list(v2).
The paper provides a simple example to explain MapReduce in terms of URL access frequency (a variant of word count). Imagine you have many logs of URL accesses stored on a large array of servers and you want a mapping from each URL to the number of times that URL has been visited. This can be accomplished in MapReduce in the following way. The map phase maps each URL access to an intermediate pair <URL, 1>. The reduce phase groups by key (URL) and then reduces the values by addition. The end result is a set of pairs, each containing an individual URL and the number of times it has been accessed.
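The example can be sketched on a single machine as follows; the map and reduce functions mirror MapReduce's contract, while the in-memory grouping stands in for the framework's distributed shuffle (this is not the distributed runtime itself):

    # Single-machine sketch of the URL access-count example: map_fn and
    # reduce_fn mirror MapReduce's contract, and the grouping step stands in
    # for the framework's shuffle phase. Not a distributed implementation.
    from collections import defaultdict

    def map_fn(log_line):
        url = log_line.split()[0]           # assume the URL is the first field
        yield (url, 1)                      # intermediate pair <URL, 1>

    def reduce_fn(url, counts):
        yield (url, sum(counts))            # reduce by addition

    log = ["/home GET", "/about GET", "/home GET"]

    # "Shuffle": group intermediate values by key.
    grouped = defaultdict(list)
    for line in log:
        for key, value in map_fn(line):
            grouped[key].append(value)

    results = [pair for key, values in grouped.items()
               for pair in reduce_fn(key, values)]
    print(results)  # [('/home', 2), ('/about', 1)]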
The actual architecture for splitting up the work is described in great detail in Dean's paper; here is a short summary. A MapReduce job is initiated by a user library which defines the input data, map task, and reduce task. This job is replicated on many nodes in a cluster. One of the nodes is elected the master node, which assigns M map tasks and R reduce tasks to idle workers. Worker nodes with assigned map tasks transform their input data with the user defined function and buffer intermediate pairs in memory and sometimes to disk. Worker nodes assigned reduce tasks are provided the locations of intermediate data, group together pairs with the same key, and reduce the values based on the user provided function. The results of these reductions are stored in R files that then have to be managed by the user application. This process is shown in figure 7.
The elected master node has several other duties including storing the state
of each map and reduce task and also storing the locations of the intermediate
pairs produced by the map task.
MapReduce was designed to work in situations of large scale failure. Map tasks that fail can simply be re-run. If a worker node goes down, that worker's tasks are assigned to another worker and the computations are re-run. The only failure condition is in the event of a master failure. In such a case, the
Figure 7: MapReduce execution overview. [28]
also has a very large ecosystem of related technologies associated with it as
we will see in future sections.
5.1.3 Twister
Twister, described by Ekanayake et al.[32], provides extensions and improve-
ments on top of the MapReduce 5.1.2 programming model that open up dis-
tributed computing to a wider class of problems.
The authors of Twister note that general MapReduce works for a single problem and produces a single result. There are many iterative algorithms that can be described using MapReduce, including k-means, PageRank, deterministic annealing clustering, and others. Twister aims to provide a framework for performing iterative MapReduce.
Twister adds a configure phase to both the map and reduce tasks which
allow it to read static data before it performs one of its tasks. Twister defines
static data as data that doesn’t change between iterations and variable data as
data that gets passed between iterations. These two types of data are common
in iterative processes.
Twister provides the idea of long running tasks that last for the duration of a job and hold static data, since it is inefficient to load static data every time a task is queued and performed.
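A schematic driver loop in this spirit is sketched below for k-means, one of the iterative algorithms mentioned above; it is plain Python meant to show the split between static and variable data, not Twister's actual API:

    # Schematic driver for iterative MapReduce in the spirit of Twister: static
    # data is loaded once and reused, while variable data (here, k-means
    # centroids) is passed between iterations. Not Twister's actual API.
    def kmeans_iteration(points, centroids):
        # "map": assign each point to its nearest centroid.
        assignments = {}
        for p in points:
            nearest = min(range(len(centroids)), key=lambda i: abs(p - centroids[i]))
            assignments.setdefault(nearest, []).append(p)
        # "reduce": recompute each centroid as the mean of its assigned points.
        return [sum(ps) / len(ps) for _, ps in sorted(assignments.items())]

    static_data = [1.0, 1.5, 9.0, 9.5, 10.0]   # loaded once, reused every iteration
    centroids = [0.0, 5.0]                     # variable data passed between iterations

    for _ in range(10):
        new_centroids = kmeans_iteration(static_data, centroids)
        if new_centroids == centroids:         # converged
            break
        centroids = new_centroids

    print(centroids)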
Twister aggressively decreases the granularity of intermediate data to reduce volume on the network. Map tasks in Twister support partial reduction during mapping to achieve this. Hadoop and MapReduce use a combiner after the map task to achieve something similar.
Twister’s runtime aims to load all data into main memory to increase effi-
ciency.
A publish-subscribe architecture is used for all communication and data
transfers.
Twister provides fault tolerance by allowing the ability to save application
state between iterations. Similar to MapReduce, Twister does not provide fault
tolerance in the case of a master server failure.
5.1.4 Pig
Pig[71] is an Apache project that uses a custom language, Pig Latin, which
mixes the declarative style of SQL with the procedural style of map-reduce.
The resulting language makes it easy to develop MapReduce jobs and run ad-
hoc queries as Pig Latin compiles directly to MapReduce jobs.
Pig Latin claims to make parallel programming easier as it’s a language de-
signed to exploit data parallelism. Since Pig is in charge of interpreting Pig
Latin and generating MapReduce jobs, the Pig interpreter can utilize optimiza-
tions that the developer may not have thought of using MapReduce directly.
Pig interprets commands issued in Pig Latin. These commands are then used to dynamically form MapReduce jobs that can be triggered from the Pig interpreter.
5.1.5 Impala
Impala[1] is currently an incubating Apache project with similar goals to Pig 5.1.4.
The system is described in Kornacker et al’s paper Impala: A Modern, Open-
Source SQL Engine for Hadoop[53].
Impala was designed to run on top of the Hadoop ecosystem and can work with HDFS, HBase, YARN, and other components from the ecosystem. Impala is able to natively read many of the Hadoop serialization formats, including Parquet, Avro, and RCFiles.
Impala does not use the MapReduce model directly, and instead uses distributed daemons spread across the cluster to access data on the distributed file systems of the distributed nodes. Impala aims to provide low latency and highly concurrent reads to distributed data and does not support update or delete operations.
The front end of Impala compiles SQL into executable plans to be run on Impala's backend. Query planning involves breaking down an SQL query by features using semantic analysis and then generating a distributed execution plan using cost estimation and other partitioning algorithms.
On the backend, Impala makes use of runtime code generation using the
LLVM compiler. This approach allows Impala to make use of the large number
of optimizations already built into LLVM.
Impala works alongside YARN using a service called Llama. Using YARN and Llama together, Impala is able to optimize the resource management needed to access data and efficiently query the cluster.
5.1.6 YARN
YARN, or Yet Another Resource Negotiator[85], was designed to be the next
evolution of Hadoop 5.1.2, sometimes referred to as Hadoop 2. YARN adds sev-
eral benefits to the initial Hadoop MapReduce framework that we will discuss
in this section.
Several of the limitations that YARN improves upon over the initial version of Hadoop MapReduce include scaling resource and task management, providing higher availability without the single point of failure of a JobTracker node, dynamic resource utilization, the capability to perform real-time analysis, message passing semantics, and ad-hoc queries.
YARN provides pluggable support for more computation engines other than
just MapReduce. Now MapReduce is just one of many computation engines
available to run within the Hadoop ecosystem.
YARN provides for multi-tenancy within Hadoop. This means multiple dis-
tributed computation engines can be running at the same time on the same
Hadoop cluster, where before it was only possible to run one job at a time.
YARN at its core is a resource manager which tracks, enforces allocation,
and arbitrates contention among tenants in the system. The resource man-
ager provides other services including monitoring of workloads, security, man-
aging availability, and providing pluggable computation frameworks. YARN is
designed to provide cluster resource management and scale to thousands of
servers. YARN is further designed to handle dynamic scaling of clusters.
The other half of YARN is the application master whose job it is to coordi-
nate the logical plan of a single job by requesting resources from the resource
manager.
YARN is completely backwards compatible and existing MapReduce jobs
will run on top of YARN.
5.1.7 Spark
Apache Spark, described by Zaharia et al. in their paper Spark: Cluster Com-
puting with Working Sets[92], is described as “a fast and general engine for
large-scale data processing”.
Spark builds on Hadoop 5.1.6 by optimizing in-memory operations and storing intermediate results for use in iterative applications. Spark also introduces a new data type, the resilient distributed data set (RDD).
The authors mention that MapReduce fails in two major areas. The first area
is iterative jobs which frameworks like MapReduce do not support. It’s possible
to emulate iterative jobs by running multiple jobs, but each job must reload all
data from disk. Spark provides a means to store intermediate results between
runs. The second issue with traditional MapReduce systems is the latency
involved with running interactive analytics on top of Hadoop. The authors note
that Pig and Hive can make these ad-hoc queries easier, but there is still a
large latency between queries as each query is its own MapReduce job.
The main abstraction that Spark provides is the RDD. RDDs contain collections of read-only objects that are automatically partitioned across machines in a cluster. RDDs can be automatically rebuilt in the event that a partition or node is lost.
RDDs can be created from the filesystem (HDFS), loaded from a data store such as Cassandra, HBase, or S3, by parallelizing a collection of Java, Scala, Python, or R objects, by transforming an existing RDD, or by changing the persistence of an RDD.
Once data is represented as an RDD, there are many parallel operations that can be performed on the data, including reduction, collection, filtering, mapping, and grouping. Each transformation performed on an RDD creates a new RDD with that transformation applied; thus the state between any two transformations is maintained because RDDs are immutable.
The Spark API mostly contains functional transformations and reductions that are familiar to many programmers. This aids in the development of distributed computation over massive data sets by delegating the scheduling, data management, and computational pipeline to Spark. Programmers can focus on the algorithm and not worry about the details of distributed computation as they might when working directly with MapReduce.
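A brief PySpark sketch of the RDD abstraction follows; it assumes the pyspark package and uses a small in-memory collection of hypothetical sensor readings:

    # Sketch of Spark's RDD abstraction using PySpark. Assumes the pyspark
    # package is installed; data here is an in-memory collection for brevity.
    from pyspark import SparkContext

    sc = SparkContext("local[*]", "rdd-sketch")

    readings = sc.parallelize([
        ("s-17", 31.2), ("s-17", 29.8), ("s-42", 18.4), ("s-42", 19.1)])

    # Transformations (filter, mapValues, reduceByKey) build new immutable RDDs;
    # nothing executes until an action such as collect() is called.
    hot = readings.filter(lambda kv: kv[1] > 20.0)
    counts = hot.mapValues(lambda t: 1).reduceByKey(lambda a, b: a + b)

    print(counts.collect())   # e.g. [('s-17', 2)]
    sc.stop()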
5.1.8 Spark Streaming
Spark Streaming is an extension to Spark5.1.7 which allows for the computa-
tion and windowing of real-time streaming data. This extension is described
in detail in Zaharia et al.’s 2012 paper Discretized Streams: An Efficient and
Fault-Tolerant Model for Stream Processing on Large Clusters[93].
The stated goals of Spark Streaming are to provide scalability to hundreds of nodes, low replication overhead, second-scale latency, and second-scale recovery from faults and stragglers.
Spark Streaming introduces a new stream processing model built on a data type called discretized streams (D-Streams). D-Streams are broken up into time windows. D-Streams avoid the problems of traditional stream processing by structuring computations “as a set of short, stateless, deterministic tasks instead of continuous, stateful operations”.
D-Streams store state in memory using RDDs which can be recomputed in
parallel in the case of node failure.
Each D-Stream contains a series of values inside of a small time window. These values are submitted to Spark and processed using parallel operations in a batch processing manner. The small window sizes give the impression that the data is being processed in real-time.
The hybrid batch/real-time processing notion of Spark Streaming works well for algorithms that require sliding window computations over a set of data, as each batch can be configured to be one view over that sliding window.
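A hedged PySpark sketch of the D-Stream model is shown below; it assumes the pyspark package, and the socket source host/port and window sizes are illustrative choices:

    # Sketch of Spark Streaming's D-Stream model: a socket text stream is split
    # into 5-second micro-batches and a sliding window count is maintained.
    # Assumes pyspark; the host/port source is hypothetical.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[2]", "dstream-sketch")
    ssc = StreamingContext(sc, batchDuration=5)       # 5-second micro-batches
    ssc.checkpoint("/tmp/dstream-checkpoint")         # needed for windowed state

    lines = ssc.socketTextStream("localhost", 9999)
    counts = (lines.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKeyAndWindow(lambda a, b: a + b,   # add new batches
                                         lambda a, b: a - b,   # remove old batches
                                         30, 10))              # window, slide (seconds)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()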
5.1.9 GraphX
GraphX[91] is a graph processing framework built on top of Apache Spark5.1.7.
GraphX aims to provide the advantages of data-parallel and graph-parallel sys-
tems into a single framework.
GraphX extends Spark's concept of an RDD by adding the resilient distributed graph (RDG). An RDG is immutable, and transformations on one RDG result in the creation of a new RDG. Similar to Spark, this allows state to be maintained between different versions of RDGs.
GraphX uses edge cutting and vertex cutting heuristics to achieve efficient
partitioning of data within a graph to maintain high locality within nodes of a
cluster.
GraphX utilizes the Gather-Apply-Scatter (GAS) execution model. In this model, the gather stage gathers messages assigned to the same vertex. In the apply stage, a map operation is applied to all messages. In the scatter stage, each vertex property is scattered to all adjacent vertices.
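A toy, single-machine illustration of one GAS step is sketched below in plain Python; GraphX itself exposes this model through its Scala API over RDGs, so this is conceptual only:

    # Conceptual gather-apply-scatter (GAS) step on a toy graph, computing each
    # vertex's in-degree. Plain Python for illustration only.
    edges = [("a", "b"), ("a", "c"), ("b", "c"), ("c", "a")]
    vertices = {"a": 0, "b": 0, "c": 0}

    # Gather: collect messages addressed to the same vertex.
    messages = {}
    for src, dst in edges:
        messages.setdefault(dst, []).append(1)

    # Apply: map an update over each vertex's gathered messages.
    for v, msgs in messages.items():
        vertices[v] = sum(msgs)

    # Scatter: propagate each vertex's new property to its neighbors (printed here).
    for src, dst in edges:
        print(f"{src} -> {dst}: neighbor in-degree is now {vertices[dst]}")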
By taking a data-parallel approach to graph processing, GraphX provides the following parallel operations on RDGs: retrieving lists of vertices and edges, filtering vertices and edges by predicate, creating subgraphs from a predicate, mapping transformations over vertices and edges, and aggregating data about neighbors.
GraphX also provides a graph parallel approach to processing by showing
that data-parallel transformations can be mapped to graph-parallel operations.
GraphX proves this by implementing the entirety of both Pregel and Power-
Graph using RDGs in less than 20 lines of code each.
The authors mention that future work will involve creating a declarative language on top of GraphX for constructing and working with RDGs.
5.1.10 MLlib
MLlib is a machine learning library designed to work on top of Spark. It is de-
scribed by Meng et al. in their 2016 paper MLlib: Machine Learning in Apache
Spark[65]. MLlib was designed to provide machine learning capabilities to very
large data sets taking advantage of machine learning algorithms that exploit
data or model parallelism.
Spark provides great support for distributed iterative algorithms by efficiently
storing intermediate results in memory to be used in future iterations. By build-
ing MLlib on top of Spark, any improvements to efficiency in Spark will get
passed on to the ML algorithms.
MLlib supports the following types of ML algorithms in parallel: logistic re-
gression, naive Bayes, linear regression, survival regression, decision trees,
random forests, alternating least squares, K-Means clustering, latent Dirichlet
allocation, sequential pattern mining, and many variations on these.
MLlib not only provides a wide range of algorithms on top of Spark, but also
integrates closely with Spark. Data can be imported and exported from MLlib
using any of the methods that Spark already supports. MLlib also works closely
with other technologies in the Spark ecosystem. MLlib can interface with Spark
SQL for rich data querying. Interfacing with Spark Streaming allows for real-
time learning. It’s also possible to analyze GraphX data structures using ML
algorithms.
MLlib also provides utilities for defining ML workflows and pipelines. ML-
lib specifically provides support for feature transformations, model evaluation,
parameter tuning, and persisting models to and from the ML pipeline.
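As a hedged example of using MLlib from Spark, the sketch below trains a K-Means model on an RDD of made-up sensor feature vectors; it assumes the pyspark package:

    # Sketch of training a clustering model with MLlib on an RDD of feature
    # vectors. Assumes pyspark; the sensor feature data is made up.
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans

    sc = SparkContext("local[*]", "mllib-sketch")

    features = sc.parallelize([
        [20.1, 40.0], [19.8, 41.2],     # one cluster of sensor readings
        [30.5, 70.3], [31.0, 69.8]])    # another cluster

    model = KMeans.train(features, k=2, maxIterations=10)
    print(model.clusterCenters)
    print(model.predict([20.0, 40.5]))  # cluster index for a new reading

    sc.stop()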
closely with Spark’s big data tools and libraries. This effectively allows pro-
grammers to query data and perform Extract, Transform, Load (ETL) opera-
tions using a declarative language while at the same time providing the ability
to run large scale graph processing jobs and machine learning algorithms over
those queried data sets.
Spark SQL provides automatic support for semi-structured data sets such
as JSON. Spark can read JSON records and automatically infer the schema
based on the records that have been fed into it.
The main stated goals for this project include supporting relational processing within Spark programs and on external data sources, providing high performance using established DBMS techniques, supporting new data structures including semi-structured data and external databases, and enabling extension with advanced analytics algorithms such as graph processing and machine learning. Spark SQL accomplishes these goals by adding two major components to the Spark ecosystem: a DataFrame API and an extensible query optimizer called Catalyst.
The DataFrame API is similar to RDDs in Spark with the key difference
that they keep track of their own schema and support relational operations
not supported by RDDs. DataFrames are a distributed collection of rows with
a homogeneous schema. DataFrames can be constructed from many data
sources including RDDs, the file system, or other distributed data stores such
as HBase.
DataFrames are created using the programming language's API into Spark. This differs from other languages on top of Spark such as Pig: instead of creating an entirely new programming language, the DataFrame API is simply added to the Spark framework and provides a DSL on top of the programming language the developer is already using. In this way, developers do not need to learn a new programming language to enjoy the benefits of using Spark SQL.
DataFrames support most common relational operations including select,
where, join, and aggregations like groupBy.
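A short sketch of these DataFrame operations in PySpark follows; it assumes the pyspark package, and the JSON input path and field names are hypothetical:

    # Sketch of the DataFrame API in Spark SQL. Assumes pyspark; the JSON file
    # of sensor readings is hypothetical, and its schema would be inferred.
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("dataframe-sketch").getOrCreate()

    readings = spark.read.json("hdfs:///data/readings.json")  # schema inferred

    # Relational operations are lazy: a logical plan is built and handed to the
    # Catalyst optimizer, and nothing runs until an action such as show().
    hot = (readings.filter(F.col("temperature") > 30)
                   .groupBy("sensor_id")
                   .agg(F.avg("temperature").alias("avg_temp")))
    hot.show()

    spark.stop()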
DataFrames are lazy by default. That is, when applications define DataFrames,
only a logical plan is formulated to query the data and transform the data, but
no operations take place until a terminal action is called. The logical planning
is part of the Catalyst optimizer, Spark SQL’s second major contribution.
The main job of the Catalyst optimizer is to take queries constructed using DataFrames and to create logical and physical plans of execution that are passed to and run on Spark using RDDs behind the scenes.
Catalyst builds an abstract syntax tree (AST) from the provided DataFrame operations and applies a combination of rule and cost based optimization transformations to that tree. The tree is then used to formulate a set of physical plans. Cost analysis optimizations are applied to the physical plans, and a single physical plan is selected. Spark SQL creates and optimizes RDDs and their transformations across Spark to execute the job and produce a result. This process is visualized in figure 8.
Catalyst and Spark SQL were designed with extension in mind. Developers can add external data sources with varying degrees of optimization and allow those data sources to be queried using the DataFrame API. As of the time of the paper, this technique had already been used to create data source adapters for CSV files, Avro files, Parquet files, and JDBC data sources such as RDBMSs.

Figure 8: Phases of query planning in Spark SQL. Rounded rectangles represent Catalyst trees. [11]
5.2 Dryad
Dryad is a distributed execution engine developed at Microsoft Research by
Isard et al. and described in their 2007 paper Dryad: Distributed Data-Parallel
Programs from Sequential Building Blocks[45].
Dryad is described as a “general-purpose distributed execution engine for
coarse-grain data-parallel applications”.
Distributed computations in Dryad are modeled using a directed acyclic
graph (DAG) data structure where each node in the graph represents a com-
putation and edges represent data transport (files, TCP, etc). This allows an
approach that is much more akin to a computational pipeline rather than a sin-
gle MapReduce operation or iterative MapReduce operations. Applications can
define what computations happen at each vertex and control the flow of data
to other vertices using conditional logic. Dryad further distances itself from
MapReduce by allowing any number of inputs and outputs at each node, rather
than a single value. These differences can make it easier to model problems over distributed systems as compared to MapReduce.
A Dryad cluster uses a name server to enumerate all servers in the cluster.
The name server also has meta-data detailing the position of each server which
allows Dryad to create graphs using efficient topologies based on the hardware
configuration. Each worker node has a daemon process that creates other
processes on behalf of the job manager. Vertices get sent to worker nodes
based on the topology reported by the name node.
Graphs can be constructed and described using C++. Once constructed, it's possible to modify the graph by adding vertices or edges, or by merging two graphs together.
Because the structure of computation is a DAG, if any vertex in the DAG
fails, it’s possible to re-run the failed vertex (on a new machine if needed). If
there is a failure of an edge (file corruption, network error, etc), the previous
vertex can re-run its execution to recreate that edge. The job manager keeps track of the state of each edge and vertex and is in charge of recognizing and reacting to failures.
5.3 Pregel
Pregel was designed at Google and is described in the 2010 paper Pregel: A
System for Large-Scale Graph Processing[59] by Malewicz et al.
Pregel was designed as a computational model to compute results on large
graph data structures (trillions of edges) using a vertex centered approach
where vertices can send and receive messages to other vertices and modify
state based on contents of incoming and outgoing messages.
Unlike other graph processing engines, Pregel was not designed to create MapReduce jobs in the background; Pregel is its own distributed computation model. This was done for performance reasons, and Pregel keeps vertices and edges within machines in such a way as to maintain high locality. Further, the MapReduce model forces the entire state of the graph to be loaded for each iteration, whereas Pregel can perform updates locally on the graph.
Pregel models distributed computations using a DAG data structure. Each
node contains user defined values and each edge also contains user defined
values. At any point, each vertex can send and receive messages to other
vertices. These steps are broken up into discrete pieces of time. Pregel calls
these supersteps, but you can think of the graph as existing within a large state
machine.
For each superstep S, Pregel maps a user defined function over each vertex
in the graph in parallel. The function at each vertex can read messages sent
to that vertex during the S − 1 superstep and also send messages to other
vertices that will be received during the S + 1 superstep. This data model
provides developers with a flexible way of representing large scale distributed
problems that can be solved using graph transformations.
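A toy, single-machine rendering of this superstep model is sketched below, with vertices propagating the maximum value seen so far; the names and structure are illustrative and this is not Pregel's API:

    # Toy single-machine rendering of Pregel's superstep model: every vertex
    # runs the same function, reads messages from superstep S-1, and sends
    # messages for superstep S+1. Here vertices propagate the maximum value.
    graph = {"a": ["b"], "b": ["a", "c"], "c": ["b"]}
    values = {"a": 3, "b": 6, "c": 2}
    inbox = {v: [] for v in graph}

    for superstep in range(5):
        outbox = {v: [] for v in graph}
        halted = True
        for v in graph:
            incoming = inbox[v]                       # messages from superstep S-1
            new_value = max([values[v]] + incoming)
            if new_value != values[v] or superstep == 0:
                values[v] = new_value
                for neighbor in graph[v]:             # messages for superstep S+1
                    outbox[neighbor].append(new_value)
                halted = False
        inbox = outbox
        if halted:                                    # all vertices voted to halt
            break

    print(values)  # every vertex converges to the maximum value, 6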
One optimization that Pregel provides that we saw similarly in MapReduce
is the use of an intermediate combiner function. Vertices have the ability to
perform message passing to other vertices, but it's possible that those vertices could be on other machines or even in other data centers. In cases where messages can be combined, users can implement combiners to reduce network overhead.
A typical Pregel run involves initializing the DAG to an initial state, apply-
ing a number of iterative supersteps (with synchronization points being saved
between supersteps), and then halting once all vertices are in the done state.
The output of a Pregel job is a list of values, one for each vertex in the graph. In
between each superstep, vertices have the capability to change the state of the
graph by message passing and by physically adding, removing, or updating
edges and other vertices.
A physical Pregel cluster contains many worker nodes that hold copies of an individual job. One of the worker nodes is elected as a master. The master coordinates worker activity, assigns partitions of the graph to machines in the cluster, divides input data among machines in the cluster, and informs vertices when to perform their superstep.
Fault-tolerance in Pregel is achieved through checkpointing between super-
steps. In the event of a failure, all nodes, vertices, and edges reload their data
from the most recent checkpoint which can be reconstructed due to the fact
that data is checkpointed on GFS.
Pregel has an aggregation system where each vertex can store and look up aggregate statistics during its superstep. Common statistics in this system include counts, mins, maxes, and so on.
Google showed that Pregel can be used for large scale distributed computa-
tions by applying the framework to several distributed graph problems including
PageRank, shortest path calculations, bipartite matching, and semi-clustering.
5.4 GraphLab
GraphLab, presented by Low et al.[57] was developed to provide a distributed
computation engine for machine learning algorithms by exploiting the sparse
structure and common patterns of machine learning algorithms.
GraphLab’s stated contributions are a graph-based data model which repre-
sents both data and dependencies, a set of concurrent access models, modular
scheduling mechanism, and aggregation framework to manage global state.
GraphLab's data model consists of a directed data graph and a shared data table. Each node and edge in the graph can contain user defined data. State is shared via the shared data table, which allows all edges and nodes in the graph to read the global state at any time. This is different from the way Pregel shares state using message passing between supersteps.
Computation in GraphLab is performed by a user defined update function, which updates data locally, or through a synchronization mechanism, which performs global aggregation. Update is similar to map in MapReduce, but can access overlapping contexts within the graph. Updates can also trigger other vertices to run their updates as well.
GraphLab provides several consistency models since overlapping updates
could potentially cause race conditions if different updates were applied to part
of the graph at the same time. The consistency models provide protection
against race conditions, but decrease parallelism. Full consistency will ensure
atomic access to nodes, but parallelism can only be achieved on vertices that
do not share common neighbors. Edge consistency is weaker, protects against
some race conditions, but provides for parallelism on non-adjacent vertices.
Vertex consistency is the weakest, but allows for full parallelism and the appli-
cation should ensure that race conditions do not occur.
GraphLab’s sync mechanism will aggregate data across all vertices using
fold and reduce techniques.
GraphLab programs can contain multiple update functions, and the scheduler is required to determine which functions to run, when to run them, and whether they are able to be run in parallel. GraphLab does include a synchronous scheduler which ensures that all vertices are updated at the same time, similar to Pregel's 5.3 approach, but GraphLab also provides schedulers for updating vertices at different times using different techniques such as task schedulers, round-robin schedulers, FIFO schedulers, and priority schedulers.
GraphLab uses distributed locking techniques to provide consistency when deployed on a cluster.
Fault tolerance is provided using a distributed checkpointing mechanism
similar to Pregel.
5.5 Storm
Apache Storm[2][12][78] is another stream processing engine aiming at ana-
lyzing real-time big data streams. Apache Storm fills a similar role as Spark
Streaming, but takes several different approaches in the way that it handles
and processes data.
Unlike Spark 5.1.8, Storm processes individual items in a stream, while Spark processes small batches of items in a user configurable window.
Unlike Storm, Spark Streaming provides a much richer set of integration libraries such as graph processing, machine learning, and SQL-like access on top of streams.
Storm uses the concept of a Spout, which conceptually is simply a data source. Out of the box, Storm can accept data from a wide array of Spouts. These include queues such as Kestrel, RabbitMQ, Kafka, JMS, and Amazon Kinesis. Storm also provides Spouts for log files, distributed logs, API calls, and distributed filesystems such as HDFS, among others.
Computations in Storm are performed using an abstraction called a Bolt. Bolts take as input one or more streams of data tuples and output one or more new streams of data tuples. Bolts can be thought of as individual units of computation and are often used to perform functions such as mapping, filtering, streaming joins, streaming aggregations, and communicating with databases or files. Bolts run as tasks inside of a JVM on each worker node.
Bolts are combined to form a DAG, which Storm refers to as a topology. Distributed computations are performed by creating and laying out topologies so that input streams are continually transformed into intermediate streams and eventually into final output streams. Once a topology is deployed, it continuously runs and acts on any input streams supplied to its spouts.
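A conceptual, plain-Python rendering of the spout-to-bolt flow is sketched below; real Storm topologies are defined for the JVM (or via Thrift) and run continuously on a cluster, so this is illustrative only:

    # Conceptual spout -> bolt pipeline in the shape of a Storm topology: a spout
    # emits tuples, a split bolt transforms them, and a count bolt aggregates.
    # Plain Python illustration; not Storm's API.
    from collections import Counter

    def sentence_spout():
        for line in ["the quick brown fox", "the lazy dog"]:
            yield (line,)                       # spouts emit tuples

    def split_bolt(tuples):
        for (sentence,) in tuples:
            for word in sentence.split():
                yield (word, 1)                 # bolts emit new streams of tuples

    def count_bolt(tuples):
        counts = Counter()
        for word, n in tuples:
            counts[word] += n
        return counts

    # "Topology": a spout feeding bolts arranged as a small DAG.
    print(count_bolt(split_bolt(sentence_spout())))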
Storm provides multiple flexible partitioning techniques to distribute computation over a cluster, and it is up to the application developer to decide which technique best fits their computation model. Shuffle partitioning uses a ran-
domizing approach to evenly split data tuples across bolts. Fields partitioning
will hash on a subset of attributes in the tuple grouping similar data. All parti-
tioning replicates the same streams to all consumers. Global partitioning sends
all data to a single bolt. Local partitioning sends data to bolts on the machine
in which the data originated[84].
Storm is highly fault tolerant. If worker bolts die they are restarted. If an
entire node goes down, bolts are restarted on other nodes.
Storm provides two daemons, Nimbus and the Supervisors, which monitor nodes in a cluster and manage the resources within the cluster. These daemons are stateless and are meant to fail fast and restart fast. Failure of the daemons will cause a restart on a new machine without any downtime.
Storm topologies can be defined using Thrift, an open source protocol spec-
ification language. Storm can be interacted with directly using languages that
reside on the JVM and can be interacted with from any programming language
over a JSON based protocol.
5.6 Flink
Apache Flink[21] is an open source stream and batch processing engine that serves similar purposes to Spark Streaming and Storm. Flink claims to provide better throughput than Storm while also offering several different streaming data models.
One thing that sets Flink apart from Storm 5.5 is that it uses event time semantics. Instead of processing data using arrival timestamps, Flink processes data using the event time of the data. In the case of data being delayed or arriving out of order, Flink is able to sort the data and perform analysis using event time.
Flink also supports multiple windowing algorithms. Where Spark 5.1.8 focuses on windowing by time, Flink allows windows to be created using counts, time, or user defined functions. The windowing model is very flexible, making it easier to implement and model real world problems.
Flink operates on two types of datasets. Unbounded datasets are your typi-
cal stream that are continuously appended to. Bounded datasets are finite and
unchanging. Flink hopes to bridge the gap between analysis of both streaming
and static data sets.
Flink provides a functional programming model similar to Spark's but also provides constructs for passing state around.
Computations in Flink are represented as a DAG that Flink calls Dataflow
Graphs. Each dataflow contains stateful operators as vertices and data streams
represented as edges. The graph is then executed in a data parallel fashion.
Flink supports efficient distributed checkpointing to provide fault tolerance. Recovery from errors means applying the most recent checkpoint state to all operators.
57
Framework | Engine | Graph Parallel | Batch | Streaming | Iterative | Data Store | Fault Tolerance | In-Memory
MapReduce | MapReduce | No | Yes | No | No | GFS | Re-run failed tasks | No
Hadoop | MapReduce | No | Yes | No | No | HDFS | Re-run failed tasks | No
Twister | MapReduce | No | Yes | No | Yes | HDFS | Save state between iterations | Yes
YARN | Pluggable | No | Yes | Yes | Yes | HDFS | RM intervention | Yes
Spark | Spark / RDD DAG | No | Yes | No | Yes | HDFS, Cassandra, HBase, S3, sequence files | RDDs (can be rebuilt on partition failure) | Yes
Spark Streaming | Spark | No | Yes | Yes | No | HDFS, Cassandra, HBase, S3, sequence files | RDDs (can be rebuilt on partition failure) | Yes
GraphX | Spark | Yes | Yes | Yes | Yes | HDFS, Cassandra, HBase, S3, sequence files | RDDs (can be rebuilt on partition failure) | Yes
Dryad | Custom DAG | No | Yes | No | No | HDFS | Re-run failed vertex | No
Pregel | Custom DAG | Yes | Yes | Yes | Yes | File system | Checkpoints | Yes
GraphLab | Custom DAG | Yes | Yes | No | Yes | HDFS, Cassandra, HBase, S3, sequence files | Checkpoints | Yes
Storm | Custom DAG | No | No | Yes | Yes | HDFS, Cassandra, HBase, S3, sequence files | Sequences are recomputed | Yes
MLlib | Spark | No | Yes | Yes | Yes | HDFS, Cassandra, HBase, S3, sequence files | RDDs (can be rebuilt on partition failure) | Yes
Flink | Custom DAG | No | Yes | Yes | Yes | DFS, logs, DBs | Checkpoints | Yes
Feature | GraphX | GraphLab | Pregel
Programming Model | RDD / RDG | DAG and global state table | DAG
Execution Model | Gather-Apply-Scatter (GAS) | User supplied overlapping async update functions | Vertices update each superstep, message passing between vertices
Partitioning | Vertex/edge-cutting | Two-phase | Hash based
Data Parallel | Yes | No | No
Feature | Storm | Flink | Spark Streaming
Smallest Streaming Unit | Individual | Individual | Batch
Programming Model | Spouts and bolts in a DAG | DAG / dataflow graphs | D-Streams / RDDs
Execution Model | Topology (DAG of streaming bolts as vertices) | Data parallel DAG | DAG built from RDDs
such as JSON. Spark provides its own SQL-like language in the form of the
DataFrame API and uses a custom optimization engine called Catalyst.
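As a brief illustration of that point (a sketch only; the file name sensor_readings.json and its schema are hypothetical), the same aggregation can be written against either the DataFrame API or plain SQL, and both forms are planned and optimized by Catalyst:

```java
import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataFrameSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("dataframe-sketch")
                .master("local[*]")
                .getOrCreate();

        // Hypothetical line-delimited JSON of sensor readings: {"sensorId": "...", "value": ...}
        Dataset<Row> readings = spark.read().json("sensor_readings.json");

        // DataFrame API: average reading per sensor.
        readings.groupBy(col("sensorId")).agg(avg(col("value"))).show();

        // The same query expressed in SQL; both forms go through the Catalyst optimizer.
        readings.createOrReplaceTempView("readings");
        spark.sql("SELECT sensorId, AVG(value) AS avg_value FROM readings GROUP BY sensorId").show();

        spark.stop();
    }
}
```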
6 Conclusion
This review looked at management of large-scale distributed data with an em-
phasis on big data collected from distributed sensor networks.
In Section 2, we examined the properties of big data and what makes big data
unique in comparison to other types of data. We examined several definitions of
big data, including the four V's, looked at specific features of big data, and
described several examples of big data.
Section 3 introduced cloud computing and how cloud computing and big data tie
together. We examined cloud computing service models, including several that
were designed specifically for sensor data management, and ended the section by
reviewing mobile cloud computing and the issues that currently face cloud
computing.
Section 4 focused on the distributed storage of big data, examining distributed
file systems and persistence models such as key-value, column, document, and
graph storage systems, along with the key characteristics of each.
Finally, in Section 5, we looked at frameworks that are capable of handling
distributed data and performing analytics on it at large scale.
References
[1] Apache Impala. https://impala.apache.org.
[2] Apache Storm. http://storm.apache.org/.
[3] Hadoop. https://hadoopecosystemtable.github.io/.
[13] Marco V. Barbera, Sokol Kosta, Alessandro Mei, and Julinda Stefa. To
offload or not to offload? the bandwidth and energy costs of mobile cloud
computing. In INFOCOM, 2013 Proceedings IEEE, pages 1285–1293.
IEEE, 2013.
[14] Doug Beaver, Sanjeev Kumar, Harry C. Li, Jason Sobel, Peter Vajgel, and
others. Finding a Needle in Haystack: Facebook’s Photo Storage. In
OSDI, volume 10, pages 1–8, 2010.
[15] Tim Berners-Lee, James Hendler, Ora Lassila, and others. The semantic
web. Scientific American, 284(5):28–37, 2001.
[16] Alessio Botta, Walter de Donato, Valerio Persico, and Antonio Pescapé.
Integration of cloud computing and internet of things: A survey. Future
Generation Computer Systems, 56:684–700, 2016.
[20] IBM Canada. Smarter healthcare in Canada: Redefining value and suc-
cess. 2012.
[21] Paris Carbone, Stephan Ewen, Seif Haridi, Asterios Katsifodimos, Volker
Markl, and Kostas Tzoumas. Apache flink: Stream and batch processing
in a single engine. Data Engineering, page 28, 2015.
[22] Rick Cattell. Scalable SQL and NoSQL data stores. Acm Sigmod Record,
39(4):12–27, 2011.
[23] Tushar D Chandra, Robert Griesemer, and Joshua Redstone. Paxos
made live: an engineering perspective. In Proceedings of the twenty-sixth
annual ACM symposium on Principles of distributed computing, pages
398–407. ACM, 2007.
[24] Min Chen, Shiwen Mao, and Yunhao Liu. Big data: A survey. Mobile
Networks and Applications, 19(2):171–209, 2014.
[25] James C. Corbett, Jeffrey Dean, Michael Epstein, Andrew Fikes, Christo-
pher Frost, Jeffrey John Furman, Sanjay Ghemawat, Andrey Gubarev,
Christopher Heiser, Peter Hochschild, and others. Spanner: Google's
globally distributed database. ACM Transactions on Computer Systems
(TOCS), 31(3):8, 2013.
[26] Michael Cox and David Ellsworth. Managing big data for scientific visual-
ization. In ACM Siggraph, volume 97, pages 146–162, 1997.
[27] Sanjit Kumar Dash, Subasish Mohapatra, and Prasant Kumar Pattnaik. A
survey on applications of wireless sensor network using cloud computing.
International Journal of Computer science & Engineering Technologies
(E-ISSN: 2044-6004), 1(4):50–55, 2010.
[28] Jeffrey Dean and Sanjay Ghemawat. Mapreduce: simplified data process-
ing on large clusters. Communications of the ACM, 51(1):107–113, 2008.
[29] Mike Dean, Guus Schreiber, Sean Bechhofer, Frank van Harmelen, Jim
Hendler, Ian Horrocks, Deborah L. McGuinness, Peter F. Patel-Schneider,
and Lynn Andrea Stein. OWL web ontology language reference. W3C
Recommendation February, 10, 2004.
[30] Giuseppe DeCandia, Deniz Hastorun, Madan Jampani, Gunavardhan
Kakulapati, Avinash Lakshman, Alex Pilchin, Swaminathan Sivasubrama-
nian, Peter Vosshall, and Werner Vogels. Dynamo: amazon’s highly avail-
able key-value store. ACM SIGOPS operating systems review, 41(6):205–
220, 2007.
[31] Aurielle Destiche. Fleet tracking devices will be installed in 22,000 UPS
trucks to cut costs and improve driver efficiency in 2010, 2010.
[32] Jaliya Ekanayake, Hui Li, Bingjing Zhang, Thilina Gunarathne, Seung-Hee
Bae, Judy Qiu, and Geoffrey Fox. Twister: a runtime for iterative mapre-
duce. In Proceedings of the 19th ACM international symposium on high
performance distributed computing, pages 810–818. ACM, 2010.
[33] Brad Fitzpatrick. Distributed caching with memcached. Linux Journal,
2004(124):5, August 2004.
[34] John Gantz and David Reinsel. Extracting value from chaos. IDC iview,
1142:1–12, 2011.
[35] Mario Gerla, Eun-Kyu Lee, Giovanni Pau, and Uichin Lee. Internet of
vehicles: From intelligent grid to autonomous cars and vehicular clouds.
In Internet of Things (WF-IoT), 2014 IEEE World Forum on, pages 241–
246. IEEE, 2014.
[36] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google
file system. In ACM SIGOPS operating systems review, volume 37, pages
29–43. ACM, 2003.
[37] Seth Gilbert and Nancy Lynch. Brewer’s conjecture and the feasibility of
consistent, available, partition-tolerant web services. ACM SIGACT News,
33(2):51–59, 2002.
[38] Ibrahim Abaker Targio Hashem, Victor Chang, Nor Badrul Anuar, Kayode
Adewole, Ibrar Yaqoob, Abdullah Gani, Ejaz Ahmed, and Haruna Chiroma.
The role of big data in smart city. International Journal of Information
Management, 36(5):748–758, October 2016.
[39] Ibrahim Abaker Targio Hashem, Ibrar Yaqoob, Nor Badrul Anuar, Salimah
Mokhtar, Abdullah Gani, and Samee Ullah Khan. The rise of big data on
cloud computing: Review and open research issues. Information Systems,
47:98–115, January 2015.
[40] Naohiro Hayashibara, Xavier Defago, Rami Yared, and Takuya Katayama.
The φ accrual failure detector. In Reliable Distributed Systems, 2004.
Proceedings of the 23rd IEEE International Symposium on, pages 66–78.
IEEE, 2004.
[41] Robin Hecht and Stefan Jablonski. NoSQL evaluation: A use case ori-
ented survey. In Cloud and Service Computing (CSC), 2011 International
Conference on, pages 336–341. IEEE, 2011.
[42] Ryan Herring, Aude Hofleitner, Saurabh Amin, T. Nasr, A. Khalek, Pieter
Abbeel, and Alexandre Bayen. Using mobile phones to forecast arterial
traffic through statistical learning. In 89th Transportation Research Board
Annual Meeting, pages 10–2493, 2010.
[43] John H. Howard, Michael L. Kazar, Sherri G. Menees, David A. Nichols,
Mahadev Satyanarayanan, Robert N. Sidebotham, and Michael J. West.
Scale and performance in a distributed file system. ACM Transactions on
Computer Systems (TOCS), 6(1):51–81, 1988.
[44] Timothy Hunter, Teodor Moldovan, Matei Zaharia, Samy Merzgui, Justin
Ma, Michael J. Franklin, Pieter Abbeel, and Alexandre M. Bayen. Scaling
the mobile millennium system in the cloud. In Proceedings of the 2nd ACM
Symposium on Cloud Computing, page 28. ACM, 2011.
[45] Michael Isard, Mihai Budiu, Yuan Yu, Andrew Birrell, and Dennis Fetterly.
Dryad: distributed data-parallel programs from sequential building blocks.
In ACM SIGOPS operating systems review, volume 41, pages 59–72.
ACM, 2007.
[46] A. Jin, C. Cheng, F. Ren, and S. Song. An index model of global subdivi-
sion in cloud computing environment. In 2011 19th International Confer-
ence on Geoinformatics, pages 1–5, June 2011.
[47] Guannan Ju, Mengjiao Cheng, Meng Xiao, Jianmei Xu, Kai Pan, Xing
Wang, Yajun Zhang, and Feng Shi. Smart transportation between three
phases through a stimulus-responsive functionally cooperating device. Ad-
vanced Materials, 25(21):2915–2919, 2013.
[51] Ankur Khetrapal and Vinay Ganesh. HBase and Hypertable for large scale
distributed storage systems. Dept. of Computer Science, Purdue Univer-
sity, pages 22–28, 2006.
[52] Rusty Klophaus. Riak core: Building distributed applications without
shared state. In ACM SIGPLAN Commercial Users of Functional Pro-
gramming, CUFP ’10, pages 14:1–14:1, New York, NY, USA, 2010. ACM.
[53] Marcel Kornacker, Alexander Behm, Victor Bittorf, Taras Bobrovytsky,
Casey Ching, Alan Choi, Justin Erickson, Martin Grund, Daniel Hecht,
Matthew Jacobs, et al. Impala: A modern, open-source sql engine for
hadoop. In CIDR, volume 1, page 9, 2015.
[54] Chun Sing Lai and Malcolm D McCulloch. Big data analytics for smart
grid. Newsletter, 2015.
[55] Avinash Lakshman and Prashant Malik. Cassandra: a decentralized
structured storage system. ACM SIGOPS Operating Systems Review,
44(2):35–40, 2010.
[56] Doug Laney. 3d data management: Controlling data volume, velocity and
variety. META Group Research Note, 6:70, 2001.
[57] Yucheng Low, Danny Bickson, Joseph Gonzalez, Carlos Guestrin, Aapo
Kyrola, and Joseph M Hellerstein. Distributed graphlab: a framework for
machine learning and data mining in the cloud. Proceedings of the VLDB
Endowment, 5(8):716–727, 2012.
[58] Hong Lu, Wei Pan, Nicholas D. Lane, Tanzeem Choudhury, and Andrew T.
Campbell. SoundSense: scalable sound sensing for people-centric appli-
cations on mobile phones. In Proceedings of the 7th international con-
ference on Mobile systems, applications, and services, pages 165–178.
ACM, 2009.
[59] Grzegorz Malewicz, Matthew H. Austern, Aart JC Bik, James C. Dehnert,
Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. Pregel: a system for
large-scale graph processing. In Proceedings of the 2010 ACM SIGMOD
International Conference on Management of data, pages 135–146. ACM,
2010.
[60] James Manyika, Michael Chui, Brad Brown, Jacques Bughin, Richard
Dobbs, Charles Roxburgh, and Angela H Byers. Big data: The next fron-
tier for innovation, competition, and productivity. 2011.
[61] James Manyika, Michael Chui, Brad Brown, Jacques Bughin, Richard
Dobbs, Charles Roxburgh, and Angela Hung Byers. Big data: The next
frontier for innovation, competition, and productivity. Big Data: The Next
Frontier for Innovation, Competition and Productivity, pages 1 – 143, 2011.
[62] Bernard Marr. Why only one of the 5 Vs of big data really matters, March
2015.
[63] Kirk McKusick and Sean Quinlan. GFS: evolution on fast-forward. Com-
munications of the ACM, 53(3):42, March 2010.
[64] Peter Mell and Tim Grance. The nist definition of cloud computing. 2011.
[65] Xiangrui Meng, Joseph Bradley, Burak Yavuz, Evan Sparks, Shivaram
Venkataraman, Davies Liu, Jeremy Freeman, DB Tsai, Manish Amde,
Sean Owen, et al. Mllib: Machine learning in apache spark. Journal
of Machine Learning Research, 17(34):1–7, 2016.
[69] NIST Big Data Public Working Group Definitions and Taxonomies Sub-
group. NIST Big Data Interoperability Framework: Volume 1, Definitions.
Technical Report NIST SP 1500-1, National Institute of Standards and
Technology, October 2015. DOI: 10.6028/NIST.SP.1500-1.
[70] President's Council of Advisors on Science and Technology (U.S.).
Report to the President: Big data and privacy: a technology perspective.
Washington, District of Columbia: Executive Office of the President, Pres-
ident's Council of Advisors on Science and Technology, 2014.
[71] Christopher Olston, Benjamin Reed, Utkarsh Srivastava, Ravi Kumar, and
Andrew Tomkins. Pig latin: a not-so-foreign language for data processing.
In Proceedings of the 2008 ACM SIGMOD international conference on
Management of data, pages 1099–1110. ACM, 2008.
[72] Charith Perera, Arkady Zaslavsky, Peter Christen, and Dimitrios Geor-
gakopoulos. Sensing as a service model for smart cities supported by
Internet of Things. Transactions on Emerging Telecommunications Tech-
nologies, 25(1):81–93, January 2014.
[73] A. Rahien and O. Eini. RavenDB Mythology Documentation.
[74] BB Prahlada Rao, Paval Saluia, Neetu Sharma, Ankit Mittal, and
Shivay Veer Sharma. Cloud computing for Internet of Things & sensing
based applications. In Sensing Technology (ICST), 2012 Sixth Interna-
tional Conference on, pages 374–380. IEEE, 2012.
[75] D. Reed, J. R. Larus, and D. Gannon. Imagining the future: Thoughts on
computing. Computer, 45(1):25–30, Jan 2012.
[76] Nirmalya Roy, Gautham Pallapa, and Sajal K Das. A middleware frame-
work for ambiguous context mediation in smart healthcare application. In
Wireless and Mobile Computing, Networking and Communications, 2007.
WiMOB 2007. Third IEEE International Conference on, pages 72–72.
IEEE, 2007.
[77] Mahadev Satyanarayanan. Mobile computing: the next decade. In Pro-
ceedings of the 1st ACM workshop on mobile cloud computing & services:
social networks and beyond, page 5. ACM, 2010.
[78] Jim Scott. Stream processing everywhere: What to use?
[79] Sugam Sharma. An Extended Classification and Comparison of NoSQL
Big Data Models. arXiv preprint arXiv:1509.08035, 2015.
[80] Konstantin Shvachko, Hairong Kuang, Sanjay Radia, and Robert Chansler.
The hadoop distributed file system. In Mass storage systems and tech-
nologies (MSST), 2010 IEEE 26th symposium on, pages 1–10. IEEE,
2010.
[81] S. Subashini and V. Kavitha. A survey on security issues in service deliv-
ery models of cloud computing. Journal of Network and Computer Appli-
cations, 34(1):1–11, January 2011.
[82] Roshan Sumbaly, Jay Kreps, Lei Gao, Alex Feinberg, Chinmay Soman,
and Sam Shah. Serving large-scale batch computed data with project
voldemort. In Proceedings of the 10th USENIX Conference on File and
Storage Technologies, FAST’12, pages 18–18, Berkeley, CA, USA, 2012.
USENIX Association.
[83] Arvind Thiagarajan, Lenin Ravindranath, Katrina LaCurts, Samuel Mad-
den, Hari Balakrishnan, Sivan Toledo, and Jakob Eriksson. VTrack: ac-
curate, energy-aware road traffic delay estimation using mobile phones.
page 85. ACM Press, 2009.
[84] Ankit Toshniwal, Siddarth Taneja, Amit Shukla, Karthik Ramasamy, Jig-
nesh M Patel, Sanjeev Kulkarni, Jason Jackson, Krishna Gade, Maosong
Fu, Jake Donham, et al. Storm at twitter. In Proceedings of the 2014
ACM SIGMOD international conference on Management of data, pages
147–156. ACM, 2014.
[85] Vinod Kumar Vavilapalli, Arun C Murthy, Chris Douglas, Sharad Agar-
wal, Mahadev Konar, Robert Evans, Thomas Graves, Jason Lowe, Hitesh
Shah, Siddharth Seth, et al. Apache hadoop yarn: Yet another resource
negotiator. In Proceedings of the 4th annual Symposium on Cloud Com-
puting, page 5. ACM, 2013.
[86] R. Vijayakumari, R. Kirankumar, and K. Gangadhara Rao. Comparative
analysis of google file system and hadoop distributed file system. ICETS-
International Journal of Advanced Trends in Computer Science and Engi-
neering, 3(1):553–558, 2014.
[87] Werner Vogels. Eventually consistent. Queue, 6(6):14–19, 2008.
[88] Yating Wang, Ray Chen, and Ding-Chau Wang. A survey of mobile cloud
computing applications: perspectives and challenges. Wireless Personal
Communications, 80(4):1607–1623, 2015.