Cloudera Kudu
Important Notice
© 2010-2021 Cloudera, Inc. All rights reserved.
Hadoop and the Hadoop elephant logo are trademarks of the Apache Software
Foundation. All other trademarks, registered trademarks, product names and company
names or logos mentioned in this document are the property of their respective owners.
Reference to any products, services, processes or other information, by trade name,
trademark, manufacturer, supplier or otherwise does not constitute or imply
endorsement, sponsorship or recommendation thereof by us.
Complying with all applicable copyright laws is the responsibility of the user. Without
limiting the rights under copyright, no part of this document may be reproduced, stored
in or introduced into a retrieval system, or transmitted in any form or by any means
(electronic, mechanical, photocopying, recording, or otherwise), or for any purpose,
without the express written permission of Cloudera.
The information in this document is subject to change without notice. Cloudera shall
not be liable for any damages resulting from technical errors or omissions which may
be present in this document, or from use of this document.
Cloudera, Inc.
395 Page Mill Road
Palo Alto, CA 94306
info@cloudera.com
US: 1-888-789-1488
Intl: 1-650-362-0488
www.cloudera.com
Release Information
Kudu Security.........................................................................................................64
Kudu Authentication with Kerberos...................................................................................................................64
Internal Private Key Infrastructure (PKI)..............................................................................................................................64
Authentication Tokens..........................................................................................................................................................64
Client Authentication to Secure Kudu Clusters.....................................................................................................................65
Scalability...........................................................................................................................................................65
Encryption..........................................................................................................................................................65
Coarse-grained Authorization............................................................................................................................65
Web UI Encryption.............................................................................................................................................66
Web UI Redaction..............................................................................................................................................66
Log Redaction.....................................................................................................................................................66
Configuring a Secure Kudu Cluster using Cloudera Manager.............................................................................66
Configuring a Secure Kudu Cluster using the Command Line............................................................................68
Kudu-Impala Integration
Apache Kudu has tight integration with Apache Impala, allowing you to use Impala to insert, query, update, and delete
data from Kudu tablets using Impala's SQL syntax, as an alternative to using the Kudu APIs to build a custom Kudu
application. In addition, you can use JDBC or ODBC to connect existing or new applications written in any language,
framework, or business intelligence tool to your Kudu data, using Impala as the broker.
• CREATE/ALTER/DROP TABLE - Impala supports creating, altering, and dropping tables using Kudu as the persistence
layer. The tables follow the same internal/external approach as other tables in Impala, allowing for flexible data
ingestion and querying.
• INSERT - Data can be inserted into Kudu tables from Impala using the same mechanisms as any other table with
HDFS or HBase persistence.
• UPDATE/DELETE - Impala supports the UPDATE and DELETE SQL commands to modify existing data in a Kudu
table row-by-row or as a batch. The syntax of the SQL commands is designed to be as compatible as possible with
existing solutions. In addition to simple DELETE or UPDATE commands, you can specify complex joins in the FROM
clause of the query, using the same syntax as a regular SELECT statement.
• Flexible Partitioning - Similar to partitioning of tables in Hive, Kudu allows you to dynamically pre-split tables by
hash or range into a predefined number of tablets, in order to distribute writes and queries evenly across your
cluster. You can partition by any number of primary key columns, with any number of hashes, a list of split rows,
or a combination of these. A partition scheme is required.
• Parallel Scan - To achieve the highest possible performance on modern hardware, the Kudu client used by Impala
parallelizes scans across multiple tablets.
• High-efficiency queries - Where possible, Impala pushes down predicate evaluation to Kudu, so that predicates
are evaluated as close as possible to the data. Query performance is comparable to Parquet in many workloads.
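As an illustrative sketch only (the table name, columns, and Impala daemon host below are examples, not taken from any existing schema), these operations can be issued through impala-shell:
$ impala-shell -i impalad-host -q "
    CREATE TABLE my_first_table (id BIGINT, name STRING, PRIMARY KEY(id))
      PARTITION BY HASH(id) PARTITIONS 16
      STORED AS KUDU;
    INSERT INTO my_first_table VALUES (1, 'alice'), (2, 'bob');
    UPDATE my_first_table SET name = 'carol' WHERE id = 2;
    DELETE FROM my_first_table WHERE id = 1;"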
Predictive Modeling
Data scientists often develop predictive learning models from large sets of data. The model and the data might need
to be updated or modified often as the learning takes place or as the situation being modeled changes. In addition,
the scientist might want to change one or more factors in the model to see what happens over time. Updating a large
set of data stored in files in HDFS is resource-intensive, as each file needs to be completely rewritten. In Kudu, updates
happen in near real time. The scientist can tweak the value, re-run the query, and refresh the graph in seconds or
minutes, rather than hours or days. In addition, batch or incremental algorithms can be run across the data at any
time, with near-real-time results.
Related Information
• Apache Kudu Concepts and Architecture on page 10
• Apache Kudu Installation and Upgrade on page 18
• Kudu Security on page 64
• More Resources for Apache Kudu on page 98
Columnar Datastore
Kudu is a columnar datastore. A columnar datastore stores data in strongly-typed columns. With a proper design, a
columnar store can be superior for analytical or data warehousing workloads for the following reasons:
Read Efficiency
For analytical queries, you can read a single column, or a portion of that column, while ignoring other columns. This
means you can fulfill your request while reading a minimal number of blocks on disk. With a row-based store, you
need to read the entire row, even if you only return values from a few columns.
Data Compression
Because a given column contains only one type of data, pattern-based compression can be orders of magnitude
more efficient than compressing mixed data types, which are used in row-based solutions. Combined with the
efficiencies of reading data from columns, compression allows you to fulfill your query while reading even fewer
blocks from disk.
Table
A table is where your data is stored in Kudu. A table has a schema and a totally ordered primary key. A table is split
into segments called tablets, by primary key.
Tablet
A tablet is a contiguous segment of a table, similar to a partition in other data storage engines or relational databases.
A given tablet is replicated on multiple tablet servers, and at any given point in time, one of these replicas is considered
the leader tablet. Any replica can service reads. Writes require consensus among the set of tablet servers serving the
tablet.
Tablet Server
A tablet server stores and serves tablets to clients. For a given tablet, one tablet server acts as a leader and the others
serve follower replicas of that tablet. Only leaders service write requests, while leaders or followers each service read
requests. Leaders are elected using Raft consensus. One tablet server can serve multiple tablets, and one tablet can
be served by multiple tablet servers.
Master
The master keeps track of all the tablets, tablet servers, the catalog table, and other metadata related to the cluster.
At a given point in time, there can only be one acting master (the leader). If the current leader disappears, a new master
is elected using Raft consensus.
The master also coordinates metadata operations for clients. For example, when creating a new table, the client
internally sends the request to the master. The master writes the metadata for the new table into the catalog table,
and coordinates the process of creating tablets on the tablet servers.
All the master's data is stored in a tablet, which can be replicated to all the other candidate masters.
Tablet servers heartbeat to the master at a set interval (the default is once per second).
Catalog Table
The catalog table is the central location for metadata of Kudu. It stores information about tables and tablets. The
catalog table is accessible to clients through the master, using the client API. The catalog table cannot be read or written
directly. Instead, it is accessible only through metadata operations exposed in the client API. The catalog table stores
two categories of metadata: metadata about tables (their schemas, locations, and states) and metadata about tablets
(the list of existing tablets, which tablet servers have replicas of each tablet, each tablet's current state, and its start
and end keys).
Logical Replication
Kudu replicates operations, not on-disk data. This is referred to as logical replication, as opposed to physical replication.
This has several advantages:
• Although inserts and updates transmit data over the network, deletes do not need to move any data. The delete
operation is sent to each tablet server, which performs the delete locally.
• Physical operations, such as compaction, do not need to transmit the data over the network in Kudu. This is
different from storage systems that use HDFS, where the blocks need to be transmitted over the network to fulfill
the required number of replicas.
• Tablets do not need to perform compactions at the same time or on the same schedule. They do not even need
to remain in sync on the physical storage layer. This decreases the chances of all tablet servers experiencing high
latency at the same time, due to compactions or heavy write loads.
Architectural Overview
The following diagram shows a Kudu cluster with three masters and multiple tablet servers, each serving multiple
tablets. It illustrates how Raft consensus is used to allow for both leaders and followers for both the masters and tablet
servers. In addition, a tablet server can be a leader for some tablets and a follower for others. Leaders are shown in
gold, while followers are shown in grey.
Cells
No individual cell may be larger than 64KB before encoding or compression. The cells making up a composite key
are limited to a total of 16KB after the internal composite-key encoding done by Kudu. Inserting rows not conforming
to these limitations will result in errors being returned to the client.
Columns
• By default, Kudu will not permit the creation of tables with more than 300 columns. We recommend schema
designs that use fewer columns for best performance.
• CHAR, VARCHAR, DATE, and complex types such as ARRAY are not supported.
• Type and nullability of existing columns cannot be changed by altering the table.
• Dropping a column does not immediately reclaim space. Compaction must run first.
• The precision and scale of DECIMAL columns cannot be changed by altering the table.
Tables
• Tables must have an odd number of replicas, with a maximum of 7.
• Replication factor (set at table creation time) cannot be changed.
• There is no way to run compaction manually, but dropping a table will reclaim the space immediately.
If you are using Apache Impala to query Kudu tables, refer to the section on Impala Integration Limitations on page
16 as well.
Partitioning Limitations
• Tables must be manually pre-split into tablets using simple or compound primary keys. Automatic splitting is not
yet possible. Kudu does not allow you to change how a table is partitioned after creation, with the exception of
adding or dropping range partitions.
• Data in existing tables cannot currently be automatically repartitioned. As a workaround, create a new table with
the new partitioning and insert the contents of the old table.
• Tablets that lose a majority of replicas (such as 1 left out of 3) require manual intervention to be repaired.
Spark Integration Limitations
• Kudu does not support all the types supported by Spark SQL. For example, Date and complex types are not
supported.
• Kudu tables can only be registered as temporary tables in SparkSQL.
• Kudu tables cannot be queried using HiveContext.
Security Limitations
• Data encryption at rest is not directly built into Kudu. Encryption of Kudu data at rest can be achieved through
the use of local block device encryption software such as dm-crypt.
• Authorization is only available at a system-wide, coarse-grained level. Table-level, column-level, and row-level
authorization features are not available.
• Kudu does not support configuring a custom service principal for Kudu processes. The principal must follow the
pattern kudu/<HOST>@<DEFAULT.REALM>.
• Server certificates generated by Kudu IPKI are incompatible with bouncycastle version 1.52 and earlier.
Platform Requirements
Before you proceed with installation or upgrade, review Platform Requirements.
Installing Kudu
On a cluster managed by Cloudera Manager, Kudu is installed as part of CDH and does not need to be installed separately.
With Cloudera Manager, you can enable or disable the Kudu service, but the Kudu component remains present on the
cluster. For instructions, see Cloudera Installation Guide.
Upgrading Kudu
Before you proceed with an upgrade, review the Upgrade Notes.
On a managed cluster,
• If you have just upgraded Cloudera Manager from a version that did not include Kudu, then Kudu will not be
installed automatically. You will need to add the Kudu service manually. Upgrading Cloudera Manager does not
automatically upgrade CDH or other managed services.
• Parcels: If you are upgrading CDH and were previously using the standalone Kudu parcel (version 1.4.0 and lower),
then you must deactivate this parcel and activate the latest CDH parcel that includes Kudu. For instructions, see
Upgrade Guide.
• Packages: If you are upgrading CDH and were previously using the Kudu package (version 1.4.0 and lower), then
you must uninstall the kudu package and upgrade to the latest CDH package that includes Kudu. For instructions,
see Upgrade Guide.
Experimental Flags
Some configuration flags are marked 'unsafe' and 'experimental'. Such flags are disabled by default. You can access
these flags by enabling the additional flags, --unlock_unsafe_flags and --unlock_experimental_flags. Note
that these flags might be removed or modified without a deprecation period or any prior notice in future Kudu releases.
Cloudera does not support using unsafe and experimental flags. As a rule of thumb, Cloudera will not support any
configuration flags not explicitly documented in the Kudu Configuration Reference Guide.
Directory Configurations
Every Kudu node requires the specification of directory flags.
The --fs_wal_dir configuration indicates where Kudu will place its write-ahead logs.
The --fs_metadata_dir configuration indicates where Kudu will place metadata for each tablet. It is recommended,
although not necessary, that these directories be placed on high-performance drives with high bandwidth and low
latency, e.g. solid-state drives. If --fs_metadata_dir is not specified, metadata will be placed in the directory
specified by --fs_wal_dir.
Since a Kudu node cannot tolerate the loss of its WAL or metadata directories, you might want to mirror the drives
containing these directories in order to make recovering from a drive failure easier. However, mirroring may increase
the latency of Kudu writes.
The --fs_data_dirs configuration indicates where Kudu will write its data blocks. This is a comma-separated list of
directories; if multiple values are specified, data will be striped across the directories. If not specified, data blocks will
be placed in the directory specified by --fs_wal_dir.
Note: While a single data directory backed by a RAID-0 array will outperform a single data directory
backed by a single storage device, it is better to let Kudu manage its own striping over multiple devices
rather than delegating the striping to a RAID-0 array.
Additionally, --fs_wal_dir and --fs_metadata_dir may be the same as one of the directories listed in
--fs_data_dirs, but must not be sub-directories of any of them.
Warning: Each directory specified by a configuration flag on a given machine should be used by at
most one Kudu process. If multiple Kudu processes on the same machine are configured to use the
same directory, Kudu may refuse to start up.
Warning: Once --fs_data_dirs is set, extra tooling is required to change it. For more details, see
the Changing Directory Configuration on page 36.
Note: The --fs_wal_dir and --fs_metadata_dir configurations can be changed, provided the
contents of the directories are also moved to match the flags.
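As a sketch only, a tablet server might be started with a directory configuration such as the following; the paths are illustrative, and on a cluster managed by Cloudera Manager these flags are set through the service configuration rather than on the command line:
$ kudu-tserver --fs_wal_dir=/wals \
    --fs_metadata_dir=/meta \
    --fs_data_dirs=/data/1,/data/2,/data/3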
To see all available configuration options for the kudu-master executable, run it with the --help option:
kudu-master --help
For the complete list of flags for masters, see the Kudu Master Configuration Reference.
To see all available configuration options for the kudu-tserver executable, run it with the --help option:
kudu-tserver --help
For the complete list of flags for tablet servers, see the Kudu Tablet Server Configuration Reference.
Configure the Kudu services to start automatically when the server starts, by adding them to the default runlevel.
These interfaces are linked from the landing page of each daemon’s web UI.
Kudu Metrics
Kudu daemons expose a large number of metrics. Some metrics are associated with an entire server process, whereas
others are associated with a particular tablet replica.
$ kudu-tserver --dump_metrics_json
$ kudu-master --dump_metrics_json
This will output a large JSON document. Each metric indicates its name, label, description, units, and type. Because
the output is JSON-formatted, this information can easily be parsed and fed into other tooling which collects metrics
from Kudu servers.
If you are using Cloudera Manager, see Cloudera Manager Metrics for Kudu on page 99 for the complete list of metrics
collected by Cloudera Manager for a Kudu service.
For example:
$ curl -s 'http://example-ts:8050/metrics?include_schema=1&metrics=connections_accepted'
[
    {
        "type": "server",
        "id": "kudu.tabletserver",
        "attributes": {},
        "metrics": [
            {
                "name": "rpc_connections_accepted",
                "label": "RPC Connections Accepted",
                "type": "counter",
                "unit": "connections",
                ...
$ curl -s 'http://example-ts:8050/metrics?metrics=log_append_latency'
[
    {
        "type": "tablet",
        "id": "c0ebf9fef1b847e2a83c7bd35c2056b1",
        "attributes": {
            "table_name": "lineitem",
            "partition": "hash buckets: (55), range: [(<start>), (<end>))",
            "table_id": ""
        },
        "metrics": [
            {
                "name": "log_append_latency",
                "total_count": 7498,
                "min": 4,
                "mean": 69.3649,
                "percentile_75": 29,
                "percentile_95": 38,
                "percentile_99": 45,
                "percentile_99_9": 95,
                "percentile_99_99": 167,
                "max": 367244,
                "total_sum": 520098
            }
        ]
    }
]
Diagnostics Logging
Kudu may be configured to periodically dump all of its metrics to a local log file using the
--metrics_log_interval_ms flag. Set this flag to the interval at which metrics should be written to a diagnostics
log file.
The diagnostics log will be written to the same directory as the other Kudu log files, with a similar naming format,
substituting diagnostics instead of a log level like INFO. After any diagnostics log file reaches 64MB uncompressed,
the log will be rolled and the previous file will be gzip-compressed.
The log file generated has three space-separated fields. The first field is the word metrics. The second field is the
current timestamp in microseconds since the Unix epoch. The third is the current value of all metrics on the server,
using a compact JSON encoding. The encoding is the same as the metrics fetched via HTTP described above.
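For example, to dump metrics once per minute, the flag could be set as follows (the interval value is illustrative):
--metrics_log_interval_ms=60000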
Rack Awareness (Location Awareness)
The first element of Kudu's rack awareness feature is location assignment: each tablet server can be assigned a location,
a /-separated string similar to an absolute UNIX file path, in which the characters in directory and file names are
restricted to the set [a-zA-Z0-9_-.]. Presently, Kudu does not use the
hierarchical structure of locations, but it may in the future. Location assignment is done by a user-provided command,
whose path should be specified using the --location_mapping_cmd master flag. The command should take a single
argument, the IP address or hostname of a tablet server, and return the location for the tablet server. Make sure that
all Kudu masters are using the same location mapping command.
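A minimal sketch of such a mapping command, assuming a hypothetical script /etc/kudu/get_location.sh and two racks; the hostnames and locations are placeholders:
#!/bin/bash
# Print the location for the tablet server whose hostname or IP address is passed as $1.
case "$1" in
  ts-1.example.com|ts-2.example.com) echo "/rack-1" ;;
  *) echo "/rack-2" ;;
esac
Each master would then be started with --location_mapping_cmd=/etc/kudu/get_location.sh, using the same value on every master.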
The second element of Kudu’s rack awareness feature is the placement policy: Do not place a majority of replicas of
a tablet on tablet servers in the same location.
The leader master, when placing newly created replicas on tablet servers and when re-replicating existing tablets, will
attempt to place the replicas in a way that complies with the placement policy. For example, in a cluster with five tablet
servers A, B, C, D, and E, with respective locations /L0, /L0, /L1, /L1, /L2, to comply with the placement policy a
new 3x replicated tablet could have its replicas placed on A, C, and E, but not on A, B, and C, because then the tablet
would have 2/3 replicas in location /L0. As another example, if a tablet has replicas on tablet servers A, C, and E, and
then C fails, the replacement replica must be placed on D in order to comply with the placement policy.
In the case where it is impossible to place replicas in a way that complies with the placement policy, Kudu will violate
the policy and place a replica anyway. For example, using the setup described in the previous paragraph, if a tablet
has replicas on tablet servers A, C, and E, and then E fails, Kudu will re-replicate the tablet onto one of B or D, violating
the placement policy, rather than leaving the tablet under-replicated indefinitely. The kudu cluster rebalance
tool can reestablish the placement policy if it is possible to do so. The kudu cluster rebalance tool can also be
used to reimpose the placement policy on a cluster if the cluster has just been configured to use the rack awareness
feature and existing replicas need to be moved to comply with the placement policy. See Running the tablet rebalancing
tool on a rack-aware cluster on page 43 for more information.
Important:
• This workflow is unsafe for adding new masters to an existing multi-master configuration that
already has three or more masters. Do not use it for that purpose.
• An even number of masters doesn't provide any benefit over having one fewer master. This
guide should always be used for migrating to three masters.
• This workflow presumes you are familiar with Kudu configuration management, with or without
Cloudera Manager.
• All of the command line steps below should be executed as the Kudu UNIX user. The example
commands assume the Kudu Unix user is kudu, which is typical.
• Identify and record the directories where the master’s write-ahead log (WAL) and data live. If using Kudu
system packages, their default locations are /var/lib/kudu/master, but they may be customized using
the fs_wal_dir and fs_data_dirs configuration parameters. The commands below assume that
fs_wal_dir is /data/kudu/master/wal and fs_data_dirs is /data/kudu/master/data. Your
configuration may differ. For more information on configuring these directories, see the Kudu Configuration
docs.
• Identify and record the port the master is using for RPCs. The default port value is 7051, but it may have been
customized using the rpc_bind_addresses configuration parameter.
• Identify the master’s UUID. It can be fetched using the following command:
master_data_dir
The location of the existing master’s previously recorded data directory.
For example:
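A sketch of this step using the kudu fs dump uuid tool, assuming the directory values recorded above; adjust the paths to your configuration:
$ sudo -u kudu kudu fs dump uuid \
    --fs_wal_dir=/data/kudu/master/wal \
    --fs_data_dirs=/data/kudu/master/data 2>/dev/null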
• (Optional) Configure a DNS alias for the master. The alias could be a DNS cname (if the machine already has
an A record in DNS), an A record (if the machine is only known by its IP address), or an alias in /etc/hosts.
The alias should be an abstract representation of the master (e.g. master-1).
Important: Without DNS aliases, it is not possible to recover from permanent master failures
without bringing the cluster down for maintenance. It is highly recommended that you use
DNS aliases.
4. If you have Kudu tables that are accessed from Impala, you must update the master addresses in the Apache Hive
Metastore (HMS) database.
• If you set up the DNS aliases, run a statement in impala-shell like the example following this list, replacing
master-1, master-2, and master-3 with your actual aliases.
• If you do not have DNS aliases set up, see Step #11 in the Performing the migration section for updating HMS.
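An illustrative form of that statement, run through impala-shell once for each Kudu-backed table; the table name and the Impala daemon host are placeholders:
$ impala-shell -i impalad-host -q "
    ALTER TABLE my_kudu_table
    SET TBLPROPERTIES ('kudu.master_addresses' = 'master-1,master-2,master-3')"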
5. Perform the following preparatory steps for each new master:
• Choose an unused machine in the cluster. The master generates very little load so it can be collocated with
other data services or load-generating processes, though not with another Kudu master from the same
configuration.
• Ensure Kudu is installed on the machine, either using system packages (in which case the kudu and
kudu-master packages should be installed), or some other means.
• Choose and record the directory where the master’s data will live.
• Choose and record the port the master should use for RPCs.
• (Optional) Configure a DNS alias for the master (e.g. master-2, master-3, etc).
master_data_dir
The new master’s previously recorded data directory.
For example:
3. If you are using Cloudera Manager, add the new Kudu master roles now, but do not start them.
• If using DNS aliases, override the empty value of the Master Address parameter for each role (including
the existing master role) with that master’s alias.
• Add the port number (separated by a colon) if using a non-default RPC port value.
4. Rewrite the master’s Raft configuration with the following command, executed on the existing master:
master_data_dir
The existing master’s previously recorded data directory
tablet_id
This must be set to the string, 00000000000000000000000000000000.
all_masters
A space-separated list of masters, both new and existing. Each entry in the list must be a string of the form
<uuid>:<hostname>:<port>.
uuid
The master’s previously recorded UUID.
hostname
The master’s previously recorded hostname or alias.
port
The master’s previously recorded RPC port number.
For example:
$ sudo -u kudu kudu local_replica cmeta rewrite_raft_config
--fs_wal_dir=/data/kudu/master/wal --fs_data_dirs=/data/kudu/master/data
00000000000000000000000000000000 4aab798a69e94fab8d77069edff28ce0:master-1:7051
f5624e05f40649b79a757629a69d061e:master-2:7051
988d8ac6530f426cbe180be5ba52033d:master-3:7051
Important: If you are using Cloudera Manager, skip the next step.
5. Modify the value of the master_addresses configuration parameter for both existing master and new masters.
The new value must be a comma-separated list of all of the masters. Each entry is a string of the form,
<hostname>:<port>.
hostname
The master's previously recorded hostname or alias.
port
The master's previously recorded RPC port number.
6. Start the existing master.
7. Copy the master data to each new master with the following command, executed on each new master machine.
Important: If your Kudu cluster is secure, in addition to running as the Kudu UNIX user, you must
authenticate as the Kudu service user prior to running this command.
master_data_dir
The new master's previously recorded data directory.
tablet_id
Must be set to the string, 00000000000000000000000000000000.
existing_master
RPC address of the existing master. It must be a string of the form <hostname>:<port>.
hostname
The existing master's previously recorded hostname or alias.
port
The existing master's previously recorded RPC port number.
Example
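A sketch of this copy step using the kudu local_replica copy_from_remote tool, assuming the directory values, tablet ID, and master alias used elsewhere in this guide:
$ sudo -u kudu kudu local_replica copy_from_remote \
    --fs_wal_dir=/data/kudu/master/wal \
    --fs_data_dirs=/data/kudu/master/data \
    00000000000000000000000000000000 master-1:7051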
Important: If you are using Cloudera Manager, skip the next step.
9. Modify the value of the tserver_master_addrs configuration parameter for each tablet server. The new value
must be a comma-separated list of masters where each entry is a string of the form <hostname>:<port>
hostname
The master's previously recorded hostname or alias
port
The master's previously recorded RPC port number
10. Start all the tablet servers.
11. If you have Kudu tables that are accessed from Impala and you didn’t set up DNS aliases, update the HMS database
manually in the underlying database that provides the storage for HMS.
• The following is an example SQL statement you would run in the HMS database:
UPDATE TABLE_PARAMS
SET PARAM_VALUE =
'master-1.example.com,master-2.example.com,master-3.example.com'
WHERE PARAM_KEY = 'kudu.master_addresses' AND PARAM_VALUE = 'old-master';
Then, in impala-shell, run:
INVALIDATE METADATA;
To verify that all masters are working properly, consider performing the following sanity checks:
• Using a browser, visit each master’s web UI and navigate to the /masters page. All the masters should now be
listed there with one master in the LEADER role and the others in the FOLLOWER role. The contents of /masters
on each master should be the same.
• Run a Kudu system check (ksck) on the cluster using the kudu command line tool. For more details, see Monitoring
Cluster Health with ksck on page 35.
Important:
• Kudu does not yet support live Raft configuration changes for masters. As such, it is only possible
to replace a master if the deployment was created with DNS aliases or if every node in the cluster
is first shut down. See the previous multi-master migration workflow for more details on deploying
with DNS aliases.
• The workflow presupposes at least basic familiarity with Kudu configuration management. If
using Cloudera Manager, the workflow also presupposes familiarity with it.
• All of the command line steps below should be executed as the Kudu UNIX user, typically kudu.
3. Choose one of the remaining live masters to serve as a basis for recovery. The rest of this workflow will refer to
this master as the "reference" master.
4. Choose an unused machine in the cluster where the new master will live. The master generates very little load so
it can be collocated with other data services or load-generating processes, though not with another Kudu master
from the same configuration. The rest of this workflow will refer to this master as the "replacement" master.
5. Perform the following preparatory steps for the replacement master:
• Ensure Kudu is installed on the machine, either via system packages (in which case the kudu and kudu-master
packages should be installed), or via some other means.
• Choose and record the directory where the master’s data will live.
master_data_dir
live master’s previously recorded data directory
Example
master_data_dir
The reference master’s previously recorded data directory.
tablet_id
Must be set to the string, 00000000000000000000000000000000.
For example
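A sketch of this step using the kudu local_replica cmeta print_replica_uuids tool; the directory paths are illustrative:
$ sudo -u kudu kudu local_replica cmeta print_replica_uuids \
    --fs_wal_dir=/data/kudu/master/wal \
    --fs_data_dirs=/data/kudu/master/data \
    00000000000000000000000000000000 2>/dev/null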
8. Using the two previously-recorded lists of UUIDs (one for all live masters and one for all masters), determine and
record (by process of elimination) the UUID of the dead master.
master_data_dir
The replacement master’s previously recorded data directory.
uuid
The dead master’s previously recorded UUID.
For example:
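A sketch of this step using the kudu fs format tool with its --uuid flag; the directory paths are illustrative, and <uuid> is the dead master's UUID determined above:
$ sudo -u kudu kudu fs format \
    --fs_wal_dir=/data/kudu/master/wal \
    --fs_data_dirs=/data/kudu/master/data \
    --uuid=<uuid>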
2. Copy the master data to the replacement master with the following command.
Important: If your Kudu cluster is secure, in addition to running as the Kudu UNIX user, you must
authenticate as the Kudu service user prior to running this command.
master_data_dir
The replacement master’s previously recorded data directory.
tablet_id
Must be set to the string, 00000000000000000000000000000000.
reference_master
The RPC address of the reference master. It must be a string of the form <hostname>:<port>.
hostname
The reference master’s previously recorded hostname or alias.
port
The reference master’s previously recorded RPC port number.
For example:
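A sketch of this copy step using the kudu local_replica copy_from_remote tool; the directory paths and reference master address are placeholders:
$ sudo -u kudu kudu local_replica copy_from_remote \
    --fs_wal_dir=/data/kudu/master/wal \
    --fs_data_dirs=/data/kudu/master/data \
    00000000000000000000000000000000 <reference_master_hostname>:7051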
3. If you are using Cloudera Manager, add the replacement Kudu master role now, but do not start it.
• Override the empty value of the Master Address parameter for the new role with the replacement master’s
alias.
• If you are using a non-default RPC port, add the port number (separated by a colon) as well.
4. If the cluster was set up with DNS aliases, reconfigure the DNS alias for the dead master to point at the replacement
master.
5. If the cluster was set up without DNS aliases, perform the following steps:
a. Stop the remaining live masters.
b. Rewrite the Raft configurations on these masters to include the replacement master. See Step 4 of Perform
the Migration for more details.
6. Start the replacement master.
7. Restart the remaining masters in the new multi-master deployment. While the masters are shut down, there will
be an availability outage, but it should last only as long as it takes for the masters to come back up.
To verify that all masters are working properly, consider performing the following sanity checks:
• Using a browser, visit each master’s web UI and navigate to the /masters page. All the masters should now be
listed there with one master in the LEADER role and the others in the FOLLOWER role. The contents of /masters
on each master should be the same.
• Run a Kudu system check (ksck) on the cluster using the kudu command line tool. For more details, see Monitoring
Cluster Health with ksck on page 35.
Important:
• In planning the new multi-master configuration, keep in mind that the number of masters should
be odd and that three or five node master configurations are recommended.
• Dropping the number of masters below the number of masters currently needed for a Raft
majority can incur data loss. To mitigate this, ensure that the leader master is not removed during
this process.
Important: If you are using Cloudera Manager, skip the next step.
5. Modify the value of the tserver_master_addrs configuration parameter for the tablet servers to remove any
unwanted masters. See Kudu Configuration docs for the steps to modify a configuration parameter.
6. Start all of the tablet servers.
To verify that all masters are working properly, consider performing the following sanity checks:
• Using a browser, visit each master’s web UI and navigate to the /masters page. All the masters should now be
listed there with one master in the LEADER role and the others in the FOLLOWER role. The contents of /masters
on each master should be the same.
• Run a Kudu system check (ksck) on the cluster using the kudu command line tool. For more details, see Monitoring
Cluster Health with ksck on page 35.
For example:
a. In each master’s Web UI, click Masters on the Status Pages. All of the masters should be listed there with
one master in the LEADER role field and the others in the FOLLOWER role field. The contents of Masters on
all masters should be the same.
b. Run the below command to verify all masters are up and listening. The UUIDs are the same and belong to
the same master as before the hostname change:
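A sketch of such a check using the kudu master list tool; the master addresses are placeholders:
$ sudo -u kudu kudu master list \
    master-1.example.com:7051,master-2.example.com:7051,master-3.example.com:7051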
b. In impala-shell, run:
INVALIDATE METADATA;
c. Verify updating the metadata worked by running a simple SELECT query on a Kudu-backed Impala table.
To see a full list of the options available with ksck, use the --help flag. If the cluster is healthy, ksck will print
information about the cluster, a success message, and return a zero (success) exit status.
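Output like the following would be produced by an invocation along these lines; the master addresses are placeholders:
$ sudo -u kudu kudu cluster ksck \
    master-01.example.com,master-02.example.com,master-03.example.com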
Master Summary
UUID | Address | Status
----------------------------------+-----------------------+---------
a811c07b99394df799e6650e7310f282 | master-01.example.com | HEALTHY
b579355eeeea446e998606bcb7e87844 | master-02.example.com | HEALTHY
cfdcc8592711485fad32ec4eea4fbfcd | master-03.example.com | HEALTHY
Version Summary
Version | Servers
---------+-------------------------
1.7.1 | all 6 server(s) checked
Summary by table
   Name   | RF | Status  | Total Tablets | Healthy | Recovering | Under-replicated | Unavailable
----------+----+---------+---------------+---------+------------+------------------+-------------
 my_table | 3  | HEALTHY | 8             | 8       | 0          | 0                | 0
                | Total Count
----------------+-------------
 Masters        | 3
 Tablet Servers | 3
 Tables         | 1
 Tablets        | 8
 Replicas       | 24
OK
If the cluster is unhealthy, for instance if a tablet server process has stopped, ksck will report the issue(s) and return
a non-zero exit status, as shown in the abbreviated snippet of ksck output below:
------------------
Errors:
------------------
Network error: error fetching info from tablet servers: failed to gather info for all
FAILED
Runtime error: ksck discovered errors
To verify data integrity, the optional --checksum_scan flag can be set, which will ensure the cluster has consistent
data by scanning each tablet replica and comparing results. The --tables or --tablets flags can be used to limit
the scope of the checksum scan to specific tables or tablets, respectively.
For example, checking data integrity on the my_table table can be done with the following command:
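A sketch of that command, combining the --checksum_scan and --tables flags described above; the master addresses are placeholders:
$ sudo -u kudu kudu cluster ksck --checksum_scan --tables=my_table \
    master-01.example.com,master-02.example.com,master-03.example.com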
By default, ksck will attempt to use a snapshot scan of the table, so the checksum scan can be done while writes
continue.
Finally, ksck also supports output in JSON format using the --ksck_format flag. JSON output contains the same
information as the plain text output, but in a format that can be used by other tools. See kudu cluster ksck
--help for more information.
Note: Unless the --force flag is specified, Kudu will not allow for the removal of a directory across
which tablets are configured to spread data. If --force is specified, all tablets configured to use that
directory will fail upon starting up and be replicated elsewhere.
Note: If the metadata directory overlaps with a data directory, as was the default prior to Kudu 1.7,
or if a non-default metadata directory is configured, the --fs_metadata_dir configuration must
be specified when running the kudu fs update_dirs tool.
Note: Only new tablet replicas, i.e. brand new tablets' replicas and replicas that are copied to the
server for high availability, will use the new directory. Existing tablet replicas on the server will not
be rebalanced across the new directory.
1. The tool can only run while the server is offline, so establish a maintenance window to update the server. The
tool itself runs quickly, so this offline window should be brief. Only the server being updated needs to be
offline.
However, if the server is offline for too long (see the follower_unavailable_considered_failed_sec flag),
the tablet replicas on it may be evicted from their Raft groups. To avoid this, it may be desirable to bring the entire
cluster offline while performing the update.
2. Run the tool with the desired directory configuration flags. For example, if a cluster was set up with
--fs_wal_dir=/wals, --fs_metadata_dir=/meta, and --fs_data_dirs=/data/1,/data/2,/data/3,
and /data/3 is to be removed (e.g. due to a disk error), run the command shown in the example following this list.
3. Modify the values of the fs_data_dirs flags for the updated server. If using Cloudera Manager, make sure to
only update the configurations of the updated server, rather than of the entire Kudu service.
4. Once complete, the server process can be started. When Kudu is installed using system packages, service is
typically used, as shown in the example following this list.
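Sketches of the commands referenced in steps 2 and 4 above, assuming the flag values from the example and a package-based installation (service names may differ):
# Step 2: rewrite the directory configuration without /data/3. Per the note above,
# --force is needed to remove a directory across which tablets spread data.
$ sudo -u kudu kudu fs update_dirs --force \
    --fs_wal_dir=/wals \
    --fs_metadata_dir=/meta \
    --fs_data_dirs=/data/1,/data/2

# Step 4: restart the server process once the update is complete.
$ sudo service kudu-tserver start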
Node Type     | Kudu Directory Type                   | Kudu Releases that Crash on Disk Failure
--------------+---------------------------------------+------------------------------------------
Master        | All                                   | All
Tablet Server | Directory containing WALs             | All
Tablet Server | Directory containing tablet metadata  | All
Tablet Server | Directory containing data blocks only | Pre-1.6.0
When a disk failure occurs that does not lead to a crash, Kudu will stop using the affected directory, shut down tablets
with blocks on the affected directories, and automatically re-replicate the affected tablets to other tablet servers. The
affected server will remain alive and print messages to the log indicating the disk failure, for example:
E1205 19:06:24.163748 27115 data_dirs.cc:1011] Directory /data/8/kudu/data marked as
failed
E1205 19:06:30.324795 27064 log_block_manager.cc:1822] Not using report from
/data/8/kudu/data: IO error: Could not open container 0a6283cab82d4e75848f49772d2638fe:
/data/8/kudu/data/0a6283cab82d4e75848f49772d2638fe.metadata: Read-only file system (error
30)
E1205 19:06:33.564638 27220 ts_tablet_manager.cc:946] T 4957808439314e0d97795c1394348d80
P 70f7ee61ead54b1885d819f354eb3405: aborting tablet bootstrap: tablet has data in a
failed directory
While in this state, the affected node will avoid using the failed disk, leading to lower storage volume and reduced
read parallelism. The administrator should schedule a brief window to update the node's directory configuration (see
Changing Directory Configuration on page 36) to exclude the failed disk.
When the disk is repaired, remounted, and ready to be reused by Kudu, take the following steps:
1. Make sure that the Kudu portion of the disk is completely empty.
2. Stop the tablet server.
3. Run the update_dirs tool. For example, to add /data/3, run the following:
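A sketch of that command, assuming the directory configuration from the earlier example:
$ sudo -u kudu kudu fs update_dirs \
    --fs_wal_dir=/wals \
    --fs_metadata_dir=/meta \
    --fs_data_dirs=/data/1,/data/2,/data/3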
Note: Existing tablets will not stripe to the restored disk, but any new tablets will stripe to
the restored disk.
Prior to Kudu 1.7.0, Kudu stripes tablet data across all directories, and will avoid writing data to full directories. Kudu
will crash if all data directories are full.
In 1.7.0 and later, new tablets are assigned a disk group consisting of data directories. The number of data directories
in a group is specified by the --fs_target_data_dirs_per_tablet flag, with the default being 3. If Kudu is not configured
with enough data directories for a full disk group, all data directories are used. When a data directory is full, Kudu will
stop writing new data to it and each tablet that uses that data directory will write new data to other data directories
within its group. If all data directories for a tablet are full, Kudu will crash. Periodically, Kudu will check if full data
directories are still full, and will resume writing to those data directories if space has become available.
If Kudu does crash because its data directories are full, freeing space on the full directories will allow the affected
daemon to restart and resume writing. Note that it may be possible for Kudu to free some space by running:
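One candidate, sketched here as an assumption rather than a documented guarantee, is the kudu fs check tool with its --repair flag, pointed at the daemon's directories:
$ sudo -u kudu kudu fs check --repair \
    --fs_wal_dir=<wal_dir> \
    --fs_data_dirs=<data_dirs>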
However, the above command may also fail if there is too little space left.
It’s also possible to allocate additional data directories to Kudu in order to increase the overall amount of storage
available. See the documentation on updating a node’s directory configuration for more information. Note that existing
tablets will not use new data directories, so adding a new data directory does not resolve issues with full disks.
Suppose a tablet has lost a majority of its replicas. The first step in diagnosing and fixing the problem is to examine the
tablet's state using ksck:
State: FAILED
Data state: TABLET_DATA_READY
Last status: <failure message>
This output shows that, for tablet e822cab6c0584bc0858219d1539a17e6, the two tablet replicas on tserver-01
and tserver-02 failed. The remaining replica is not the leader, so the leader replica failed as well. This means the
chance of data loss is higher since the remaining replica on tserver-00 may have been lagging. In general, to accept
the potential data loss and restore the tablet from the remaining replicas, divide the tablet replicas into two groups:
1. Healthy replicas: Those in RUNNING state as reported by ksck
2. Unhealthy replicas
For example, in the above ksck output, the replica on tablet server tserver-00 is healthy while the replicas on
tserver-01 and tserver-02 are unhealthy. On each tablet server with a healthy replica, alter the consensus
configuration to remove unhealthy replicas. In the typical case of 1 out of 3 surviving replicas, there will be only one
healthy replica, so the consensus configuration will be rewritten to include only the healthy replica.
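A sketch of forcing the consensus configuration on the healthy replica, assuming the kudu remote_replica unsafe_change_config tool, the default tablet server RPC port, and the servers from the example above; the replica UUID is a placeholder:
$ sudo -u kudu kudu remote_replica unsafe_change_config \
    tserver-00.example.com:7050 \
    e822cab6c0584bc0858219d1539a17e6 \
    <uuid_of_replica_on_tserver-00>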
Once the healthy replicas' consensus configurations have been forced to exclude the unhealthy replicas, the healthy
replicas will be able to elect a leader. The tablet will become available for writes though it will still be under-replicated.
Shortly after the tablet becomes available, the leader master will notice that it is under-replicated, and will cause the
tablet to re-replicate until the proper replication factor is restored. The unhealthy replicas will be tombstoned by the
master, causing their remaining data to be deleted.
Note: These steps use a tablet server as an example, but the steps are the same for Kudu master
servers.
Warning: If multiple nodes need their FS layouts rebuilt, wait until all replicas previously hosted on
each node have finished automatically re-replicating elsewhere before continuing. Failure to do so
can result in permanent data loss.
1. The first step to rebuilding a server with a new directory configuration is emptying all of the server’s existing
directories. For example, if a tablet server is configured with --fs_wal_dir=/data/0/kudu-tserver-wal,
--fs_metadata_dir=/data/0/kudu-tserver-meta, and
--fs_data_dirs=/data/1/kudu-tserver,/data/2/kudu-tserver, the following commands will remove
the WAL directory’s and data directories' contents:
# Note: this will delete all of the data from the local tablet server.
$ rm -rf /data/0/kudu-tserver-wal/* /data/0/kudu-tserver-meta/* /data/1/kudu-tserver/*
/data/2/kudu-tserver/*
2. If using Cloudera Manager, update the configurations for the rebuilt server to include only the desired directories.
Make sure to only update the configurations of servers to which changes were applied, rather than of the entire
Kudu service.
3. After directories are deleted, the server process can be started with the new directory configuration. The appropriate
sub-directories will be created by Kudu upon starting up.
mkdir /current/data/dir
Migrating Kudu Data from One Directory to Another on the Same Host
Take the following steps to move the entire Kudu data from one directory to another.
Note:
The steps were verified on an environment where the master and the server instances were configured
to write the WAL/Data to the same directory.
where <num_seconds> is the number of seconds that will encompass the downtime. Once the downtime is finished,
reset the flag to its original value.
In Kudu versions 1.7 and lower, the --force flag must be provided in the above commands.
When run, the rebalancer will report on the initial tablet replica distribution in the cluster, log the replicas it moves,
and print a final summary of the distribution when it terminates:
If more details are needed in addition to the replica distribution summary, use the
--output_replica_distribution_details flag. If added, the flag makes the tool print per-table and per-tablet
server replica distribution statistics as well.
Use the --report_only flag to get a report on table-wide and cluster-wide replica distribution statistics without
starting any rebalancing activity.
The rebalancer can also be restricted to run on a subset of the tables by supplying the --tables flag. Note that, when
running on a subset of tables, the tool will not attempt to balance the cluster as a whole.
The length of time rebalancing is run for can be controlled with the flag --max_run_time_sec. By default, the
rebalancer will run until the cluster is balanced. To control the amount of resources devoted to rebalancing, modify
the flag --max_moves_per_server. See kudu cluster rebalance --help for more.
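An illustrative invocation combining some of the flags described above; the master addresses and values are placeholders:
$ sudo -u kudu kudu cluster rebalance \
    master-1:7051,master-2:7051,master-3:7051 \
    --max_run_time_sec=3600 \
    --output_replica_distribution_details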
It's safe to stop the rebalancer tool at any time. When restarted, the rebalancer will continue rebalancing the cluster.
The rebalancer tool requires all registered tablet servers to be up and running to proceed with the rebalancing process
in order to avoid possible conflicts and races with the automatic re-replication and to keep replica placement optimal
for current configuration of the cluster. If a tablet server becomes unavailable during the rebalancing session, the
rebalancer will exit. As noted above, it's safe to restart the rebalancer after resolving the issue with unavailable tablet
servers.
The rebalancing tool can rebalance Kudu clusters running older versions as well, with some restrictions. Consult the
following table for more information. In the table, "RF" stands for "replication factor".
If the rebalancer is running against a cluster where rebalancing replication factor one tables is not supported, it will
rebalance all the other tables and the cluster as if those singly-replicated tables did not exist.
Important: Do not shut down multiple tablet servers at once. To remove multiple tablet servers from
the cluster, follow the above instructions for each tablet server, ensuring that the previous tablet
server is removed from the cluster and ksck is healthy before shutting down the next.
• To configure a different dump directory for the Kudu master, modify the value of the Kudu Master Core
Dump Directory property.
• To configure a different dump directory for the Kudu tablet servers, modify the value of the Kudu Tablet
Server Core Dump Directory property.
6. Click Save Changes.
Cloudera Manager provides time-series charts for the Kudu entity that you are viewing. You can use these charts to keep track of disk space usage, the rate at which data is being
inserted/modified in Kudu across all tables, or any critical cluster events. You can also use them to keep track of
individual tables. For example, to find out how much space a Kudu table is using on disk:
1. Go to the Kudu service and navigate to the Charts Library tab.
2. On the left-hand side menu, click Tables to display the list of tables currently stored in Kudu.
3. Click on a table name to view the default dashboard for that table. The Total Tablet Size On Disk Across Kudu
Replicas chart displays the total size of the table on disk using a time-series chart.
Hovering with your mouse over the line on the chart opens a small pop-up window that displays information
about that data point. Click the data stream within the chart to display a larger pop-up window that includes
additional information for the table at the point in time where the mouse was clicked.
Warning: Use of server-side or private interfaces is not supported, and interfaces which are not part
of public APIs have no stability guarantees.
These examples should serve as helpful starting points for your own Kudu applications and integrations.
Maven Artifacts
The following Maven <dependency> element is valid for the Apache Kudu GA release:
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.9.0-cdh6.2.0</version>
</dependency>
Convenience binary artifacts for the Java client and various Java integrations (e.g. Spark, Flume) are also now available
via the ASF Maven repository and the Central Maven repository.
thirdparty/download-thirdparty.sh
thirdparty/build-thirdparty.sh protobuf
To build the Java client, clone the Kudu Git repository, change to the java directory, and issue the following command:
For more information about building the Java API, as well as Eclipse integration, see java/README.md.
import kudu
from kudu.client import Partitioning
from datetime import datetime
# Connect to the Kudu master and define the table schema (host and schema are illustrative)
client = kudu.connect(host='kudu.master', port=7051)
builder = kudu.schema_builder()
builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False)
schema = builder.build()
# Hash-partition the table on the key column and create it
partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)
client.create_table('python-example', schema, partitioning)
# Open a table
table = client.table('python-example')
# Create a session to apply write operations
session = client.new_session()
# Insert a row
op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()})
session.apply(op)
# Upsert a row
op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"})
session.apply(op)
# Updating a row
op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")})
session.apply(op)
# Delete a row
op = table.new_delete({'key': 2})
session.apply(op)
# Flush buffered operations to the tablet servers
session.flush()
Note: kudu-spark versions 1.8.0 and below have slightly different syntax. See the documentation of
your version for a valid example. Versioned documentation can be found on the releases page.
Then import kudu-spark and create a dataframe as demonstrated in the following sample code. In the example, replace
<kudu.master> with the actual hostname of the host running a Kudu master service, and <kudu_table> with the name
of a pre-existing table in Kudu.
import org.apache.kudu.spark.kudu._
import org.apache.kudu.client._
import collection.JavaConverters._

// Create a DataFrame that points to the Kudu table (run in spark-shell, where `spark`
// and its implicits are available; replace <kudu.master> and <kudu_table> as described above)
val df = spark.read
  .options(Map("kudu.master" -> "<kudu.master>:7051", "kudu.table" -> "<kudu_table>"))
  .format("kudu").load

// Create a KuduContext for insert/upsert/update/delete operations
val kuduContext = new KuduContext("<kudu.master>:7051", spark.sparkContext)

// Insert data
kuduContext.insertRows(df, "test_table")

// Delete data matching an illustrative filter on the key column
val filteredDF = df.select($"id").filter($"id" >= 5)
kuduContext.deleteRows(filteredDF, "test_table")

// Upsert data
kuduContext.upsertRows(df, "test_table")

// Update data
val alteredDF = df.select($"id", ($"count" + 1).as("count"))
kuduContext.updateRows(alteredDF, "test_table")

// Data can also be inserted into the Kudu table using the data source, though the
// methods on KuduContext are preferred
// NB: The default is to upsert rows; to perform standard inserts instead, set
// operation = insert in the options map
// NB: Only mode Append is supported
df.write
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "test_table"))
  .mode("append")
  .format("kudu").save
dataDF.registerTempTable(simpleTableName)
dataDF.show()
// Upsert a row with existing key 0 and val Null with ignoreNull set to true
val nullDF = spark.createDataFrame(Seq((0, null.asInstanceOf[String]))).toDF("key",
"val")
val wo = new KuduWriteOptions
wo.ignoreNull = true
kuduContext.upsertRows(nullDF, simpleTableName, wo)
dataDF.show()
// The val field stays unchanged
+---+---+
|key|val|
+---+---+
| 0|foo|
+---+---+
// Upsert a row with existing key 0 and val Null with ignoreNull default/set to false
kuduContext.upsertRows(nullDF, simpleTableName)
// Equivalent to:
// val wo = new KuduWriteOptions
// wo.ignoreNull = false
// kuduContext.upsertRows(nullDF, simpleTableName, wo)
dataDF.show()
// The val field is set to Null this time
+---+----+
|key| val|
+---+----+
| 0|null|
+---+----+
• <> and OR predicates are not pushed to Kudu, and instead will be evaluated by the Spark task. Only LIKE predicates
with a suffix wildcard are pushed to Kudu. This means LIKE "FOO%" will be pushed, but LIKE "FOO%BAR" won't.
• Kudu does not support all the types supported by Spark SQL. For example, Date and complex types are not
supported.
• Kudu tables can only be registered as temporary tables in SparkSQL.
• Kudu tables cannot be queried using HiveContext.
To diagnose multiple KuduClient instances in a Spark job, look for signs in the logs of the master being overloaded
by many GetTableLocations or GetTabletLocations requests coming from different clients, usually around the
same time. This symptom is especially likely in Spark Streaming code, where creating a KuduClient per task will result
in periodic waves of master requests from new clients.
Prerequisites
• To use Impala to query Kudu data as described in this topic, you will require Cloudera Manager and CDH 5.10.x
or higher.
• The syntax described in this topic is specific to Impala included in CDH 5.10 and higher, and will not work on
previous versions. If you are using a lower version of Impala (including the IMPALA_KUDU releases previously
available), upgrade to CDH 5.10 or higher.
Note that this topic does not describe Impala installation or upgrade procedures. Refer to the Impala documentation
to make sure you are able to run queries against Impala tables on HDFS before proceeding.
• Lower versions of CDH and Cloudera Manager used an experimental fork of Impala which is referred to as
IMPALA_KUDU. If you have previously installed the IMPALA_KUDU service, make sure you remove it from your
cluster before you proceed. Install Kudu 1.2.x (or higher) using Cloudera Manager.
For example, to specify the my_first_table table in database impala_kudu, as opposed to any other table with
the same name in another database, refer to the table as impala::impala_kudu.my_first_table. This also
applies to INSERT, UPDATE, DELETE, and DROP statements.
External
An external table (created by CREATE EXTERNAL TABLE) is not managed by Impala, and dropping such a table
does not drop the table from its source location (here, Kudu). Instead, it only removes the mapping between Impala
and Kudu. This is the mode used in the syntax provided by Kudu for mapping an existing table to Impala.
See the Impala documentation for more information about internal and external tables.
• Although not necessary, it is recommended that you configure Impala with the locations of the Kudu Masters
using the --kudu_master_hosts=<master1>[:port] flag. If this flag is not set, you will need to manually
provide this configuration each time you create a table by specifying the kudu.master_addresses property
inside a TBLPROPERTIES clause. If you are using Cloudera Manager, no such configuration is needed. The Impala
service will automatically recognize the Kudu Master hosts.
The rest of this guide assumes that this configuration has been set.
• Start Impala Shell using the impala-shell command. By default, impala-shell attempts to connect to the
Impala daemon on localhost on port 21000. To connect to a different host, use the -i <host:port> option.
To automatically connect to a specific Impala database, use the -d <database> option. For instance, if all your
Kudu tables are in Impala in the database impala_kudu, use -d impala_kudu to use this database.
• To quit the Impala Shell, use the following command: quit;
When creating a new table in Kudu, you must define a partition schema to pre-split your table. The best partition
schema to use depends upon the structure of your data and your data access patterns. The goal is to maximize parallelism
and use all your tablet servers evenly. For more information on partition schemas, see Partitioning Tables on page 54.
Note: In Impala included in CDH 5.13 and higher, the PARTITION BY clause is optional for Kudu
tables. If the clause is omitted, Impala automatically constructs a single partition that is not connected
to any column. Because such a table cannot take advantage of Kudu features for parallelized queries
and query optimizations, omitting the PARTITION BY clause is only appropriate for small lookup
tables.
The following CREATE TABLE example distributes the table into 16 partitions by hashing the id column, for simplicity.
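A minimal sketch of such a statement, assuming a simple schema with an id primary key column and a sku column:
CREATE TABLE my_first_table (
  id BIGINT,
  sku STRING,
  PRIMARY KEY (id)
)
PARTITION BY HASH (id) PARTITIONS 16
STORED AS KUDU;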
By default, Kudu tables created through Impala use a tablet replication factor of 3. To specify the replication factor for
a Kudu table, add a TBLPROPERTIES clause to the CREATE TABLE statement as shown below where n is the replication
factor you want to use:
TBLPROPERTIES ('kudu.num_tablet_replicas' = 'n')
A replication factor must be an odd number.
Changing the kudu.num_tablet_replicas table property using the ALTER TABLE statement currently has no effect.
The Impala SQL Reference CREATE TABLE topic has more details and examples.
You can refine the SELECT statement to only match the rows and columns you want to be inserted into the new table.
You can also rename the columns by using syntax like SELECT name as new_col_name.
Partitioning Tables
Tables are partitioned into tablets according to a partition schema on the primary key columns. Each tablet is served
by at least one tablet server. Ideally, a table should be split into tablets that are distributed across a number of tablet
servers to maximize parallel operations. The details of the partitioning schema you use will depend entirely on the
type of data you store and how you access it.
Kudu currently has no mechanism for splitting or merging tablets after the table has been created. Until this feature
has been implemented, you must provide a partition schema for your table when you create it. When designing your
tables, consider using primary keys that will allow you to partition your table into tablets which grow at similar rates.
You can partition your table using Impala's PARTITION BY clause, which supports distribution by RANGE or HASH. The
partition scheme can contain zero or more HASH definitions, followed by an optional RANGE definition. The RANGE
definition can refer to one or more primary key columns. Examples of basic and advanced partitioning are shown
below.
Note: In Impala included in CDH 5.13 and higher, the PARTITION BY clause is optional for Kudu
tables. If the clause is omitted, Impala automatically constructs a single partition that is not connected
to any column. Because such a table cannot take advantage of Kudu features for parallelized queries
and query optimizations, omitting the PARTITION BY clause is only appropriate for small lookup
tables.
Monotonically Increasing Values - If you partition by range on a column whose values are monotonically increasing,
the last tablet will grow much larger than the others. Additionally, all data being inserted will be written to a single
tablet at a time, limiting the scalability of data ingest. In that case, consider distributing by HASH instead of, or in
addition to, RANGE.
Note: Impala keywords, such as group, are enclosed by back-tick characters when they are used as
identifiers, rather than as keywords.
Basic Partitioning
PARTITION BY RANGE
You can specify range partitions for one or more primary key columns. Range partitioning in Kudu allows splitting a
table based on specific values or ranges of values of the chosen partition keys. This allows you to balance parallelism
in writes with scan efficiency.
For instance, if you have a table that has the columns state, name, and purchase_count, and you range partition the
table on state with one partition per US state, the table will have 50 tablets, one for each state.
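As a sketch of that layout, assuming a hypothetical customers table and listing only a few of the per-state partitions for brevity:
CREATE TABLE customers (
  state STRING,
  name STRING,
  purchase_count INT,
  PRIMARY KEY (state, name)
)
PARTITION BY RANGE (state) (
  PARTITION VALUE = 'ak',
  PARTITION VALUE = 'al',
  PARTITION VALUE = 'ar',
  -- ...one single-value partition per state...
  PARTITION VALUE = 'wv',
  PARTITION VALUE = 'wy'
)
STORED AS KUDU;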
PARTITION BY HASH
Instead of distributing by an explicit range, or in combination with range distribution, you can distribute into a specific
number of partitions by hash. You specify the primary key columns you want to partition by, and the number of
partitions you want to use. Rows are distributed by hashing the specified key columns. Assuming that the values being
hashed do not themselves exhibit significant skew, this will serve to distribute the data evenly across all partitions.
You can specify multiple definitions, and you can specify definitions which use compound primary keys. However, one
column cannot be mentioned in multiple hash definitions. Consider two columns, a and b:
• HASH(a), HASH(b) -- will succeed
• HASH(a,b) -- will succeed
• HASH(a), HASH(a,b) -- will fail
Note: PARTITION BY HASH with no column specified is a shortcut to create the desired number of
partitions by hashing all primary key columns.
Hash partitioning is a reasonable approach if primary key values are evenly distributed in their domain and exhibit
no significant skew, as is often the case for values such as timestamps or serial IDs.
The following example creates 16 tablets by hashing the id column. A maximum of 16 tablets can be written to in
parallel. In this example, a query for a range of sku values is likely to need to read from all 16 tablets, so this may not
be the optimum schema for this table. See Advanced Partitioning on page 56 for an extended example.
Advanced Partitioning
You can combine HASH and RANGE partitioning to create more complex partition schemas. You can also specify zero
or more HASH definitions, followed by zero or one RANGE definition. Each schema definition can encompass one or
more columns. While enumerating every possible distribution schema is out of the scope of this topic, the following
examples illustrate some of the possibilities.
PARTITION BY HASH and RANGE
Consider the basic PARTITION BY HASH example above. If you often query for a range of sku values, you can optimize
the example by combining hash partitioning with range partitioning.
The following example still creates 16 tablets, by first hashing the id column into 4 partitions, and then applying range
partitioning to split each partition into four tablets, based upon the value of the sku string. At least four tablets (and
possibly up to 16) can be written to in parallel, and when you query for a contiguous range of sku values, there's a
good chance you only need to read a quarter of the tablets to fulfill the query.
By default, the entire primary key (id, sku) will be hashed when you use PARTITION BY HASH. To hash on only
part of the primary key, and use a range partition on the rest, use the syntax demonstrated below.
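A sketch of that schema, using hypothetical range bounds on the sku string:
CREATE TABLE my_first_table (
  id BIGINT,
  sku STRING,
  PRIMARY KEY (id, sku)
)
PARTITION BY HASH (id) PARTITIONS 4,
RANGE (sku) (
  -- hypothetical split points; 4 range partitions * 4 hash buckets = 16 tablets
  PARTITION VALUES < 'g',
  PARTITION 'g' <= VALUES < 'o',
  PARTITION 'o' <= VALUES < 'u',
  PARTITION 'u' <= VALUES
)
STORED AS KUDU;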
The example creates 16 partitions. You could also use HASH (id, sku) PARTITIONS 16. However, a scan for sku
values would almost always impact all 16 partitions, rather than possibly being limited to 4.
Note: See Range Partitioning on page 73 for the caveats of non-covering range partitions.
The following example creates a tablet per year (5 tablets total), for storing log data. The table only accepts data from
2012 to 2016. Keys outside of these ranges will be rejected.
)
STORED AS KUDU;
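A sketch of such a statement, using a hypothetical log_entries table keyed on a log_year column, might look like the following:
CREATE TABLE log_entries (
  log_year INT,
  log_id BIGINT,
  message STRING,
  PRIMARY KEY (log_year, log_id)
)
PARTITION BY RANGE (log_year) (
  -- one tablet per year; writes outside 2012-2016 are rejected
  PARTITION 2012 <= VALUES < 2013,
  PARTITION 2013 <= VALUES < 2014,
  PARTITION 2014 <= VALUES < 2015,
  PARTITION 2015 <= VALUES < 2016,
  PARTITION 2016 <= VALUES < 2017
)
STORED AS KUDU;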
When records start coming in for 2017, they will be rejected. At that point, the 2017 range should be added as follows:
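For the hypothetical log_entries table sketched above, that might be:
ALTER TABLE log_entries ADD RANGE PARTITION 2017 <= VALUES < 2018;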
In use cases where a rolling window of data retention is required, range partitions may also be dropped. For example,
if data from 2012 should no longer be retained, it may be deleted in bulk:
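Continuing the hypothetical log_entries sketch:
ALTER TABLE log_entries DROP RANGE PARTITION 2012 <= VALUES < 2013;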
Note that just like dropping a table, this irrecoverably deletes all data stored in the dropped partition.
Partitioning Guidelines
• For large tables, such as fact tables, aim for as many tablets as you have cores in the cluster.
• For small tables, such as dimension tables, ensure that each tablet is at least 1 GB in size.
In general, be mindful that the number of tablets limits the parallelism of reads in the current implementation. Increasing
the number of tablets significantly beyond the number of cores is likely to have diminishing returns.
Inserting a Row
The syntax for inserting one or more rows using Impala is shown below.
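As a sketch, reusing the hypothetical my_first_table from the earlier examples, a single-row and a multi-row insert look like this:
INSERT INTO my_first_table VALUES (1, 'sku-001');
INSERT INTO my_first_table VALUES (2, 'sku-002'), (3, 'sku-003');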
Inserting In Bulk
When inserting in bulk, there are at least three common choices. Each may have advantages and disadvantages,
depending on your data and circumstances.
Multiple Single INSERT statements
This approach has the advantage of being easy to understand and implement. This approach is likely to be inefficient
because Impala has a high query start-up cost compared to Kudu's insertion performance. This will lead to relatively
high latency and poor throughput.
Single INSERT statement with multiple VALUES subclauses
If you include more than 1024 VALUES statements, Impala batches them into groups of 1024 (or the value of
batch_size) before sending the requests to Kudu. This approach may perform slightly better than multiple
sequential INSERT statements by amortizing the query start-up penalties on the Impala side. To set the batch size
for the current Impala Shell session, use the following syntax:
set batch_size=10000;
Note: Increasing the Impala batch size causes Impala to use more memory. You should verify the
impact on your cluster and tune accordingly.
Batch Insert
The approach that usually performs best, from the standpoint of both Impala and Kudu, is to import the
data using a SELECT FROM subclause in Impala.
1. If your data is not already in Impala, one strategy is to import it from a text file, such as a TSV or CSV file.
2. Create the Kudu table, being mindful that the columns designated as primary keys cannot have null values.
3. Insert values into the Kudu table by querying the table containing the original data, as in the following example:
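For example, assuming the original data was loaded into a hypothetical text-backed Impala table named old_table with matching columns:
INSERT INTO my_first_table
SELECT id, sku FROM old_table;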
Updating a Row
The syntax for updating one or more rows using Impala is shown below.
Important: The UPDATE statement only works in Impala when the underlying data source is Kudu.
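A sketch, again using the hypothetical my_first_table:
UPDATE my_first_table SET sku = 'sku-002-renamed' WHERE id = 2;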
Updating In Bulk
You can update in bulk using the same approaches outlined in Inserting In Bulk on page 58.
Upserting a Row
The UPSERT command acts as a combination of the INSERT and UPDATE statements. For each row processed by the
UPSERT statement:
• If another row already exists with the same set of primary key values, the other columns are updated to match
the values from the row being 'UPSERTed'.
• If there is no row with the same set of primary key values, the row is created, the same as if the INSERT statement
was used.
UPSERT Example
The following example demonstrates how the UPSERT statement works. We start by creating two tables, foo1 and
foo2.
Populate foo1 and foo2 using the following INSERT statements. For foo2, we leave column col2 with NULL values
to be upserted later:
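As a minimal sketch, assuming both tables carry an id primary key and a col2 column, the setup might look like the following:
CREATE TABLE foo1 (id INT, col2 STRING, PRIMARY KEY (id))
PARTITION BY HASH (id) PARTITIONS 3 STORED AS KUDU;
CREATE TABLE foo2 (id INT, col2 STRING, PRIMARY KEY (id))
PARTITION BY HASH (id) PARTITIONS 3 STORED AS KUDU;
INSERT INTO foo1 VALUES (1, 'one'), (2, 'two'), (3, 'three');
-- col2 is left NULL here and filled in by the UPSERT below
INSERT INTO foo2 (id) VALUES (1), (2), (3);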
Now use the UPSERT command to replace the NULL values in foo2 with the actual values from foo1.
UPSERT INTO foo2 (id, col2) select id, col2 from foo1;
Altering a Table
You can use the ALTER TABLE statement to change the default value, encoding, compression, or block size of existing
columns in a Kudu table.
The Impala SQL Reference ALTER TABLE topic includes a Kudu Considerations section with examples and a list of constraints
relevant to altering a Kudu table in Impala.
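As an illustration only, and assuming an Impala release that supports Kudu column attributes (see the ALTER TABLE topic referenced above for the exact syntax supported by your version), such statements take roughly this shape:
-- hypothetical column attribute changes on the my_first_table sketch
ALTER TABLE my_first_table ALTER COLUMN sku SET DEFAULT 'unknown';
ALTER TABLE my_first_table ALTER COLUMN sku SET COMPRESSION LZ4;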
Deleting a Row
You can delete Kudu rows in near real time using Impala.
You can even use more complex joins when deleting rows. For example, Impala uses a comma in the FROM sub-clause
to specify a join query.
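For example, a simple delete and a join-based delete against a hypothetical staging table named ids_to_remove might look like this:
DELETE FROM my_first_table WHERE id = 3;
-- comma in the FROM sub-clause specifies the join
DELETE t FROM my_first_table t, ids_to_remove d WHERE t.id = d.id;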
Important: The DELETE statement only works in Impala when the underlying data source is Kudu.
Deleting In Bulk
You can delete in bulk using the same approaches outlined in Inserting In Bulk on page 58.
Important: Altering table properties only changes Impala's metadata about the table, not the
underlying table itself. These statements do not modify any Kudu data.
Renaming a table using the ALTER TABLE ... RENAME statement only renames the Impala mapping table, regardless
of whether the table is an internal or external table. This avoids disruption to other applications that may be accessing
the underlying Kudu table.
Security Considerations
Kudu 1.3 (and higher) includes security features that allow Kudu clusters to be hardened against access from unauthorized
users. Kudu uses strong authentication with Kerberos, while communication between Kudu clients and servers can
now be encrypted with TLS. Kudu also allows you to use HTTPS encryption to connect to the web UI. These features
should work seamlessly in Impala as long as Impala’s user is given permission to access Kudu.
For instructions on how to configure a secure Kudu cluster, see Kudu Security on page 64.
Next Steps
The examples above have only explored a fraction of what you can do with Impala Shell.
• Learn about the Impala project.
• Read the Impala documentation.
• View the Impala SQL Reference.
• For in-depth information on how to configure and use Impala to query Kudu data, see Integrating Impala with
Kudu.
• Read about Impala internals or learn how to contribute to Impala on the Impala Wiki.
Kudu Security
Kudu includes security features that allow Kudu clusters to be hardened against access from unauthorized users. Kudu
uses strong authentication with Kerberos, while communication between Kudu clients and servers can now be encrypted
with TLS. Kudu also allows you to use HTTPS encryption to connect to the web UI.
The rest of this topic describes the security capabilities of Apache Kudu and how to configure a secure Kudu cluster.
Currently, there are a few known limitations in Kudu security that might impact your cluster. For the list, see Security
Limitations on page 17.
Authentication Tokens
After authenticating to a secure cluster, the Kudu client will automatically request an authentication token from the
Kudu master. An authentication token encapsulates the identity of the authenticated user and carries the Kudu master's
RSA signature so that its authenticity can be verified. This token will be used to authenticate subsequent connections.
By default, authentication tokens are only valid for seven days, so that even if a token were compromised, it cannot
be used indefinitely. For the most part, authentication tokens should be completely transparent to users. By using
authentication tokens, Kudu is able to take advantage of strong authentication, without paying the scalability cost of
communicating with a central authority for every connection.
When used with distributed compute frameworks such as Apache Spark, authentication tokens can simplify configuration
and improve security. For example, the Kudu Spark connector will automatically retrieve an authentication token during
the planning stage, and distribute the token to tasks. This allows Spark to work against a secure Kudu cluster where
only the planner node has Kerberos credentials.
kinit admin@EXAMPLE-REALM.COM
Once authenticated, you use the same client code to read from and write to Kudu servers with and without the Kerberos
configuration.
Scalability
Kudu authentication is designed to scale to thousands of nodes, which means it must avoid unnecessary coordination
with a central authentication authority (such as the Kerberos KDC) for each connection. Instead, Kudu servers and
clients use Kerberos to establish initial trust with the Kudu master, and then use alternate credentials for subsequent
connections. As described previously, the Kudu master issues internal X.509 certificates to tablet servers on startup,
and temporary authentication tokens to clients on first contact.
Encryption
Kudu allows you to use TLS to encrypt all communications among servers, and between clients and servers. Configure
TLS encryption on Kudu servers using the --rpc_encryption flag, which can be set to one of the following options:
• required - Kudu will reject unencrypted connections.
• optional - Kudu will attempt to use encryption, but will allow unencrypted connections.
• disabled - Kudu will not use encryption.
By default, the flag is set to optional. To secure your cluster, set --rpc_encryption to required.
Note: Kudu will automatically turn off encryption on local loopback connections, since traffic from
these connections is never exposed externally. This allows locality-aware compute frameworks, such
as Spark and Impala, to avoid encryption overhead, while still ensuring data confidentiality.
Using Vormetric encryption is considered experimental. We recommend that you first experiment with Vormetric
encryption and Kudu in a development environment.
Coarse-grained Authorization
Kudu supports coarse-grained authorization checks for client requests based on the client's authenticated Kerberos
principal (user or service). Access levels are granted based on whitelist-style Access Control Lists (ACLs), one for each
level. Each ACL specifies a comma-separated list of users, or may be set to '*' to indicate that all authenticated users
have access rights at the specified level.
The two levels of access which can be configured are:
• Superuser - Principals authorized as a superuser can perform certain administrative functions such as using the
kudu command line tool to diagnose and repair cluster issues.
• User - Principals authorized as a user are able to access and modify all data in the Kudu cluster. This includes the
ability to create, drop, and alter tables, as well as read, insert, update, and delete data. The default value for the
User ACL is '*', which allows all users access to the cluster. However, if authentication is enabled, this will restrict
access to only those users who are able to successfully authenticate using Kerberos. Unauthenticated users on
the same network as the Kudu servers will be unable to access the cluster.
Note: Internally, Kudu has a third access level for the daemons themselves called Service. This is used
to ensure that users cannot connect to the cluster and pose as tablet servers.
Web UI Encryption
The Kudu web UI can be configured to use secure HTTPS encryption by providing each server with TLS certificates. Use
the --webserver_certificate_file and --webserver_private_key_file properties to specify the certificate
and private key to be used for communication.
Alternatively, you can choose to completely disable the web UI by setting the --webserver_enabled flag to false on
the Kudu servers.
Web UI Redaction
To prevent sensitive data from being included in the web UI, all row data is redacted. Table metadata, such as table
names, column names, and partitioning information is not redacted. Alternatively, you can choose to completely disable
the web UI by setting the --webserver_enabled flag to false on the Kudu servers.
Note: Disabling the web UI will also disable REST endpoints such as /metrics. Monitoring systems
rely on these endpoints to gather metrics data.
Log Redaction
To prevent sensitive data from being included in Kudu server logs, all row data will be redacted. You can turn off log
redaction using the --redact flag.
Use the following set of instructions to secure a Kudu cluster using Cloudera Manager:
Important: The following instructions assume you already have a secure Cloudera Manager cluster
with Kerberos authentication enabled. If this is not the case, first secure your cluster using the steps
described at Enabling Kerberos Authentication Using the Cloudera Manager Wizard.
User Access Control List: Add a comma-separated list of users who can access the cluster. By default, this property
is set to '*'. The default value of '*' allows all users access to the cluster. However, if authentication is enabled,
this will restrict access to only those users who are able to successfully authenticate using Kerberos. Unauthenticated
users on the same network as the Kudu servers will be unable to access the cluster. Add the impala user to this list
to allow Impala to query data in Kudu. You might choose to add any other relevant usernames if you want to give
access to Spark Streaming jobs.
Configuring HTTPS Encryption for the Kudu Master and Tablet Server Web UIs
Use the following steps to enable HTTPS for encrypted connections to the Kudu master and tablet server web UIs.
1. Go to the Kudu service.
2. Click the Configuration tab.
3. Select Category > Security.
4. In the Search field, type TLS/SSL to show the relevant properties.
5. Edit the following properties according to your cluster configuration:
Tablet Server TLS/SSL Server Private Key File (PEM Format): Set to the path containing the Kudu tablet server host's
private key (PEM format). This is used to enable TLS/SSL encryption (over HTTPS) for browser-based connections to
Kudu tablet server web UIs.
Master TLS/SSL Server Certificate File (PEM Format): Set to the path containing the signed certificate (PEM format)
for the Kudu master host's private key (set in Master TLS/SSL Server Private Key File). The certificate file can be
created by concatenating all the appropriate root and intermediate certificates required to verify trust.
Tablet Server TLS/SSL Server Certificate File (PEM Format): Set to the path containing the signed certificate (PEM
format) for the Kudu tablet server host's private key (set in Tablet Server TLS/SSL Server Private Key File). The
certificate file can be created by concatenating all the appropriate root and intermediate certificates required to
verify trust.
Enable TLS/SSL for Master Server: Enables HTTPS encryption on the Kudu master web UI.
Enable TLS/SSL for Tablet Server: Enables HTTPS encryption on the Kudu tablet server web UIs.
The following configuration parameters should be set on all servers (master and tablet servers) to ensure that a Kudu
cluster is secure:
# Connection Security
#--------------------
--rpc_authentication=required
--rpc_encryption=required
--keytab_file=<path-to-kerberos-keytab>
# Web UI Security
#--------------------
--webserver_certificate_file=<path-to-cert-pem>
--webserver_private_key_file=<path-to-key-pem>
# optional
--webserver_private_key_password_cmd=<password-cmd>
# Coarse-grained authorization
#--------------------------------
# This example ACL setup allows the 'impala' user as well as the
# 'etl_service_account' principal access to all data in the
# Kudu cluster. The 'hadoopadmin' user is allowed to use administrative
# tooling. Note that by granting access to 'impala', other users
# may access data in Kudu via the Impala service subject to its own
# authorization rules.
--user_acl=impala,etl_service_account
--admin_acl=hadoopadmin
More information about these flags can be found in the configuration reference documentation.
The perfect schema depends on the characteristics of your data, what you need to do with it, and the topology of your
cluster. Schema design is the single most important thing within your control to maximize the performance of your
Kudu cluster.
Column Design
A Kudu table consists of one or more columns, each with a defined type. Columns that are not part of the primary key
may be nullable. Supported column types include:
• boolean
• 8-bit signed integer
• 16-bit signed integer
• 32-bit signed integer
• 64-bit signed integer
• unixtime_micros (64-bit microseconds since the Unix epoch)
• single-precision (32-bit) IEEE-754 floating-point number
• double-precision (64-bit) IEEE-754 floating-point number
• decimal (see Decimal Type on page 71 for details)
• UTF-8 encoded string (up to 64KB uncompressed)
• binary (up to 64KB uncompressed)
Kudu takes advantage of strongly-typed columns and a columnar on-disk storage format to provide efficient encoding
and serialization. To make the most of these features, columns should be specified as the appropriate type, rather
than simulating a 'schemaless' table using string or binary columns for data which could otherwise be structured. In
addition to encoding, Kudu allows compression to be specified on a per-column basis.
Decimal Type
The decimal type is a numeric data type with fixed scale and precision suitable for financial and other arithmetic
calculations where the imprecise representation and rounding behavior of float and double make those types
impractical. The decimal type is also useful for integers larger than int64 and cases with fractional values in a primary
key.
The decimal type is a parameterized type that takes precision and scale type attributes.
Precision represents the total number of digits that can be represented by the column, regardless of the location of
the decimal point. This value must be between 1 and 38 and has no default. For example, a precision of 4 is required
to represent integer values up to 9999, or to represent values up to 99.99 with two fractional digits. You can also
represent corresponding negative values, without any change in the precision. For example, the range -9999 to 9999
still only requires a precision of 4.
Scale represents the number of fractional digits. This value must be between 0 and the precision. A scale of 0 produces
integral values, with no fractional part. If precision and scale are equal, all of the digits come after the decimal point.
For example, a decimal with precision and scale equal to 3 can represent values between -0.999 and 0.999.
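For illustration, and assuming a Kudu and Impala release with decimal support, a column declared DECIMAL(8, 2) stores values up to 999999.99 with two fractional digits, while DECIMAL(3, 3) stores only fractional values between -0.999 and 0.999:
CREATE TABLE item_prices (
  item_id BIGINT,
  price DECIMAL(8, 2),
  fraction_sold DECIMAL(3, 3),
  PRIMARY KEY (item_id)
)
PARTITION BY HASH (item_id) PARTITIONS 4
STORED AS KUDU;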
Performance considerations:
Kudu stores each value in as few bytes as possible depending on the precision specified for the decimal column. For
that reason it is not advised to just use the highest precision possible for convenience. Doing so could negatively impact
performance, memory and storage.
Before encoding and compression:
• Decimal values with precision of 9 or less are stored in 4 bytes.
• Decimal values with precision of 10 through 18 are stored in 8 bytes.
• Decimal values with precision greater than 18 are stored in 16 bytes.
Note: The precision and scale of decimal columns cannot be changed by altering the table.
Column Encoding
Depending on the type of the column, Kudu columns can be created with the following encoding types.
Plain Encoding
Data is stored in its natural format. For example, int32 values are stored as fixed-size 32-bit little-endian integers.
Bitshuffle Encoding
A block of values is rearranged to store the most significant bit of every value, followed by the second most significant
bit of every value, and so on. Finally, the result is LZ4 compressed. Bitshuffle encoding is a good choice for columns
that have many repeated values, or values that change by small amounts when sorted by primary key. The bitshuffle
project has a good overview of performance and use cases.
Run Length Encoding
Runs (consecutive repeated values) are compressed in a column by storing only the value and the count. Run length
encoding is effective for columns with many consecutive repeated values when sorted by primary key.
Dictionary Encoding
A dictionary of unique values is built, and each column value is encoded as its corresponding index in the dictionary.
Dictionary encoding is effective for columns with low cardinality. If the column values of a given row set are unable
to be compressed because the number of unique values is too high, Kudu will transparently fall back to plain encoding
for that row set. This is evaluated during flush.
Prefix Encoding
Common prefixes are compressed in consecutive column values. Prefix encoding can be effective for values that
share common prefixes, or the first column of the primary key, since rows are sorted by primary key within tablets.
Each column in a Kudu table can be created with an encoding, based on the type of the column. Starting with Kudu
1.3, default encodings are specific to each column type.
Column Compression
Kudu allows per-column compression using the LZ4, Snappy, or zlib compression codecs.
By default, columns that are Bitshuffle-encoded are inherently compressed with the LZ4 compression. Otherwise,
columns are stored uncompressed. Consider using compression if reducing storage space is more important than raw
scan performance.
Every data set will compress differently, but in general LZ4 is the most efficient codec, while zlib will compress to
the smallest data sizes. Bitshuffle-encoded columns are automatically compressed using LZ4, so it is not recommended
to apply additional compression on top of this encoding.
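In Impala, encoding and compression can be chosen per column at table creation time. The following sketch assumes an Impala release that supports Kudu column attributes and uses hypothetical column names:
CREATE TABLE metrics_demo (
  host STRING,
  ts BIGINT,
  metric STRING ENCODING DICT_ENCODING,       -- low cardinality: dictionary encoding
  value DOUBLE COMPRESSION LZ4,
  notes STRING ENCODING PLAIN_ENCODING COMPRESSION ZLIB,
  PRIMARY KEY (host, ts)
)
PARTITION BY HASH (host) PARTITIONS 4
STORED AS KUDU;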
Each time a row is inserted into a Kudu table, Kudu looks up the primary key in the primary key index storage to check
whether that primary key is already present in the table. If the primary key exists in the table, a "duplicate key" error
is returned. In the typical case where data is being inserted at the current time as it arrives from the data source, only
a small range of primary keys are "hot". So, each of these "check for presence" operations is very fast. It hits the cached
primary key storage in memory and doesn’t require going to disk.
In the case when you load historical data, which is called "backfilling", from an offline data source, each row that is
inserted is likely to hit a cold area of the primary key index which is not resident in memory and will cause one or more
HDD disk seeks. For example, in a normal ingestion case where Kudu sustains a few million inserts per second, the
"backfill" use case might sustain only a few thousand inserts per second.
To alleviate the performance issue during backfilling, consider the following options:
• Make the primary keys more compressible.
For example, with the first column of a primary key being a random ID of 32 bytes, caching one billion primary
keys would require at least 32 GB of RAM to stay in cache. If caching backfill primary keys from several days ago,
you need to have several times 32 GB of memory. By changing the primary key to be more compressible, you
increase the likelihood that the primary keys can fit in cache, thus reducing the amount of random disk I/O.
• Use SSDs for storage as random seeks are orders of magnitude faster than spinning disks.
• Change the primary key structure such that the backfill writes hit a continuous range of primary keys.
Partitioning
In order to provide scalability, Kudu tables are partitioned into units called tablets, and distributed across many tablet
servers. A row always belongs to a single tablet. The method of assigning rows to tablets is determined by the partitioning
of the table, which is set during table creation.
Choosing a partitioning strategy requires understanding the data model and the expected workload of a table. For
write-heavy workloads, it is important to design the partitioning such that writes are spread across tablets in order to
avoid overloading a single tablet. For workloads involving many short scans, where the overhead of contacting remote
servers dominates, performance can be improved if all of the data for the scan is located on the same tablet.
Understanding these fundamental trade-offs is central to designing an effective partition schema.
Important: Kudu does not provide a default partitioning strategy when creating tables. It is
recommended that new tables which are expected to have heavy read and write workloads have at
least as many tablets as tablet servers.
Kudu provides two types of partitioning: range partitioning and hash partitioning. Tables may also have multilevel
partitioning, which combines range and hash partitioning, or multiple instances of hash partitioning.
Range Partitioning
Range partitioning distributes rows using a totally-ordered range partition key. Each partition is assigned a contiguous
segment of the range partition keyspace. The key must be comprised of a subset of the primary key columns. If the
range partition columns match the primary key columns, then the range partition key of a row will equal its primary
key. In range partitioned tables without hash partitioning, each range partition will correspond to exactly one tablet.
The initial set of range partitions is specified during table creation as a set of partition bounds and split rows. For each
bound, a range partition will be created in the table. Each split will divide a range partition in two. If no partition bounds
are specified, then the table will default to a single partition covering the entire key space (unbounded below and
above). Range partitions must always be non-overlapping, and split rows must fall within a range partition.
Range partitions may also be added or dropped after table creation; a partition that is added must not overlap with
any existing range partitions. Kudu allows dropping and adding any number of range partitions in a single transactional
alter table operation.
Dynamically adding and dropping range partitions is particularly useful for time series use cases. As time goes on, range
partitions can be added to cover upcoming time ranges. For example, a table storing an event log could add a month-wide
partition just before the start of each month in order to hold the upcoming events. Old range partitions can be dropped
in order to efficiently remove historical data, as necessary.
Hash Partitioning
Hash partitioning distributes rows by hash value into one of many buckets. In single-level hash partitioned tables, each
bucket will correspond to exactly one tablet. The number of buckets is set during table creation. Typically the primary
key columns are used as the columns to hash, but as with range partitioning, any subset of the primary key columns
can be used.
Hash partitioning is an effective strategy when ordered access to the table is not needed. Hash partitioning is effective
for spreading writes randomly among tablets, which helps mitigate hot-spotting and uneven tablet sizes.
Multilevel Partitioning
Kudu allows a table to combine multiple levels of partitioning on a single table. Zero or more hash partition levels can
be combined with an optional range partition level. The only additional constraint on multilevel partitioning beyond
the constraints of the individual partition types is that multiple levels of hash partitions must not hash the same
columns.
When used correctly, multilevel partitioning can retain the benefits of the individual partitioning types, while reducing
the downsides of each. The total number of tablets in a multilevel partitioned table is the product of the number of
partitions in each level.
Partition Pruning
Kudu scans will automatically skip scanning entire partitions when it can be determined that the partition can be
entirely filtered by the scan predicates. To prune hash partitions, the scan must include equality predicates on every
hashed column. To prune range partitions, the scan must include equality or range predicates on the range partitioned
columns. Scans on multilevel partitioned tables can take advantage of partition pruning on any of the levels
independently.
Partitioning Examples
To illustrate the factors and tradeoffs associated with designing a partitioning strategy for a table, we will walk through
some different partitioning scenarios. Consider the following table schema for storing machine metrics data (using
SQL syntax and date-formatted timestamps for clarity):
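A sketch of such a schema, shown as engine-neutral DDL (an actual Impala statement would also specify partitioning and STORED AS KUDU):
CREATE TABLE metrics (
  host STRING NOT NULL,
  metric STRING NOT NULL,
  time TIMESTAMP NOT NULL,
  value DOUBLE NOT NULL,
  PRIMARY KEY (host, metric, time)
);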
Range Partitioning
A natural way to partition the metrics table is to range partition on the time column. Let’s assume that we want to
have a partition per year, and the table will hold data for 2014, 2015, and 2016. There are at least two ways that the
table could be partitioned: with unbounded range partitions, or with bounded range partitions.
The image above shows the two ways the metrics table can be range partitioned on the time column. In the first
example (in blue), the default range partition bounds are used, with splits at 2015-01-01 and 2016-01-01. This
results in three tablets: the first containing values before 2015, the second containing values in the year 2015, and the
third containing values after 2016. The second example (in green) uses a range partition bound of [(2014-01-01),
(2017-01-01)], and splits at 2015-01-01 and 2016-01-01. The second example could have equivalently been
expressed through range partition bounds of [(2014-01-01), (2015-01-01)], [(2015-01-01), (2016-01-01)],
and [(2016-01-01), (2017-01-01)], with no splits. The first example has unbounded lower and upper range
partitions, while the second example includes bounds.
Each of the range partition examples above allows time-bounded scans to prune partitions falling outside of the scan’s
time bound. This can greatly improve performance when there are many partitions. When writing, both examples
suffer from potential hot-spotting issues. Because metrics tend to always be written at the current time, most writes
will go into a single range partition.
The second example is more flexible, because it allows range partitions for future years to be added to the table. In
the first example, all writes for times after 2016-01-01 will fall into the last partition, so the partition may eventually
become too large for a single tablet server to handle.
Hash Partitioning
Another way of partitioning the metrics table is to hash partition on the host and metric columns.
In the example above, the metrics table is hash partitioned on the host and metric columns into four buckets.
Unlike the range partitioning example earlier, this partitioning strategy will spread writes over all tablets in the table
evenly, which helps overall write throughput. Scans over a specific host and metric can take advantage of partition
pruning by specifying equality predicates, reducing the number of scanned tablets to one. One issue to be careful of
with a pure hash partitioning strategy is that tablets could grow indefinitely as more and more data is inserted into
the table. Eventually tablets will become too big for an individual tablet server to hold.
hash(host, metric): writes are spread evenly among tablets; scans on specific hosts and metrics can be pruned;
tablets could grow too large.
Hash partitioning is good at maximizing write throughput, while range partitioning avoids issues of unbounded tablet
growth. Both strategies can take advantage of partition pruning to optimize scans in different scenarios. Using multilevel
partitioning, it is possible to combine the two strategies in order to gain the benefits of both, while minimizing the
drawbacks of each.
In the example above, range partitioning on the time column is combined with hash partitioning on the host and
metric columns. This strategy can be thought of as having two dimensions of partitioning: one for the hash level and
one for the range level. Writes into this table at the current time will be parallelized up to the number of hash buckets,
in this case 4. Reads can take advantage of time bound and specific host and metric predicates to prune partitions.
New range partitions can be added, which results in creating 4 additional tablets (as if a new column were added to
the diagram).
In the example above, the table is hash partitioned on host into 4 buckets, and hash partitioned on metric into 3
buckets, resulting in 12 tablets. Although writes will tend to be spread among all tablets when using this strategy, it is
slightly more prone to hot-spotting than when hash partitioning over multiple independent columns, since all values
for an individual host or metric will always belong to a single tablet. Scans can take advantage of equality predicates
on the host and metric columns separately to prune partitions.
Multiple levels of hash partitioning can also be combined with range partitioning, which logically adds another dimension
of partitioning.
Schema Alterations
You can alter a table’s schema in the following ways:
• Rename the table
• Rename primary key columns
• Rename, add, or drop non-primary key columns
• Add and drop range partitions
While Kudu is designed to eventually be fully ACID (Atomic, Consistent, Isolated, Durable), multi-tablet transactions
have not yet been implemented. As such, the following discussion focuses on single-tablet write operations, and only
briefly touches multi-tablet reads.
Kudu was designed to be externally consistent, that is, preserving consistency when operations span multiple tablets
and even multiple data centers. In practice this means that if a write operation changes item x at tablet A, and a
following write operation changes item y at tablet B, you might want to enforce that if the change to y is observed,
the change to x must also be observed. There are many examples where this can be important. For example, if Kudu
is storing clickstreams for further analysis, and two clicks follow each other but are stored in different tablets, subsequent
clicks should be assigned subsequent timestamps so that the causal relationship between them is captured.
• CLIENT_PROPAGATED Consistency
Kudu’s default external consistency mode is called CLIENT_PROPAGATED. This mode causes writes from a single
client to be automatically externally consistent. In the clickstream scenario above, if the two clicks are submitted
by different client instances, the application must manually propagate timestamps from one client to the other
for the causal relationship to be captured. Timestamps between clients a and b can be propagated as follows:
Java Client
Call AsyncKuduClient#getLastPropagatedTimestamp() on client a, propagate the timestamp to client b,
and call AsyncKuduClient#setLastPropagatedTimestamp() on client b.
C++ Client
Call KuduClient::GetLatestObservedTimestamp() on client a, propagate the timestamp to client b, and
call KuduClient::SetLatestObservedTimestamp() on client b.
• COMMIT_WAIT Consistency
Kudu also has an experimental implementation of an external consistency model (used in Google’s Spanner),
called COMMIT_WAIT. COMMIT_WAIT works by tightly synchronizing the clocks on all machines in the cluster. Then,
when a write occurs, timestamps are assigned and the results of the write are not made visible until enough time
has passed so that no other machine in the cluster could possibly assign a lower timestamp to a following write.
When using this mode, the latency of writes is tightly tied to the accuracy of clocks on all the cluster hosts, and
using this mode with loose clock synchronization causes writes to either take a long time to complete, or even
time out.
The COMMIT_WAIT consistency mode may be selected as follows:
Java Client
Call KuduSession#setExternalConsistencyMode(ExternalConsistencyMode.COMMIT_WAIT)
C++ Client
Call KuduSession::SetExternalConsistencyMode(COMMIT_WAIT)
Warning:
COMMIT_WAIT consistency is an experimental feature. It may return incorrect results, exhibit
performance issues, or negatively impact cluster stability. Its use in production environments is
discouraged.
C++ Client
Call KuduScanner::SetReadMode()
The following modes are available in both clients:
READ_LATEST
This is the default read mode. The server takes a snapshot of the MVCC state and proceeds with the read immediately.
Reads in this mode only yield 'Read Committed' isolation.
READ_AT_SNAPSHOT
In this read mode, scans are consistent and repeatable. A timestamp for the snapshot is selected either by the
server, or set explicitly by the user through KuduScanner::SetSnapshotMicros(). Explicitly setting the timestamp
is recommended.
The server waits until this timestamp is 'safe' (that is, until all write operations that have a lower timestamp have
completed and are visible). This delay, coupled with an external consistency method, will eventually allow Kudu to
have full strict-serializable semantics for reads and writes. However, this is still a work in progress and some
anomalies are still possible. Only scans in this mode can be fault-tolerant.
Selecting between read modes requires balancing the trade-offs and making a choice that fits your workload. For
instance, a reporting application that needs to scan the entire database might need to perform careful accounting
operations, so that scan may need to be fault-tolerant, but probably doesn’t require a to-the-microsecond up-to-date
view of the database. In that case, you might choose READ_AT_SNAPSHOT and select a timestamp that is a few seconds
in the past when the scan starts. On the other hand, a machine learning workload that is not ingesting the whole data
set and is already statistical in nature might not require the scan to be repeatable, so you might choose READ_LATEST
instead for better scan performance.
Note:
Kudu also provides replica selection API for you to choose at which replica the scan should be
performed:
Java Client
Call KuduScannerBuilder#replicaSelection(...)
C++ Client
Call KuduScanner::SetSelection(...)
This API is a means to control locality and, in some cases, latency. The replica selection API has no
effect on the consistency guarantees, which will hold no matter which replica is selected.
Writes
Support for COMMIT_WAIT is experimental and requires careful tuning of the time-synchronization protocol, such as
NTP (Network Time Protocol). Its use in production environments is discouraged.
Recommendation
If external consistency is a requirement and you decide to use COMMIT_WAIT, the time-synchronization protocol
needs to be tuned carefully. Each transaction will wait 2x the maximum clock error at the time of execution, which
is usually in the 100 msec. to 1 sec. range with the default settings, maybe more. Thus, transactions would take at
least 200 msec. to 2 sec. to complete when using the default settings and may even time out.
• A local server should be used as a time server. We’ve performed experiments using the default NTP time source
available in a Google Compute Engine data center and were able to obtain a reasonably tight maximum error bound,
usually varying between 12-17 milliseconds.
• The following parameters should be adjusted in /etc/ntp.conf to tighten the maximum error:
– server my_server.org iburst minpoll 1 maxpoll 8
– tinker dispersion 500
– tinker allan 0
Reads (Scans)
• On a leader change, READ_AT_SNAPSHOT scans at a snapshot whose timestamp is beyond the last write may
yield non-repeatable reads (see KUDU-1188).
Recommendation
If repeatable snapshot reads are a requirement, use READ_AT_SNAPSHOT with a timestamp that is slightly in the
past (between 2-5 seconds, ideally). This will circumvent the anomaly described above. Even when the anomaly
has been addressed, back-dating the timestamp will always make scans faster, since they are unlikely to block.
• Impala scans are currently performed as READ_LATEST and have no consistency guarantees.
• In AUTO_BACKGROUND_FLUSH mode, or when using "async" flushing mechanisms, writes applied to a single client
session may get reordered due to the concurrency of flushing the data to the server. This is particularly noticeable
if a single row is quickly updated with different values in succession. This phenomenon affects all client API
implementations. Workarounds are described in the respective API documentation for FlushMode or
AsyncKuduSession. See KUDU-1767.
Maintenance Manager
The maintenance manager schedules and runs background tasks. At any given point in time, the maintenance manager
is prioritizing the next task based on improvements needed at that moment, such as relieving memory pressure,
improving read performance, or freeing up disk space. The number of worker threads dedicated to running background
tasks can be controlled by setting --maintenance_manager_num_threads.
With Kudu 1.4, the maintenance manager features improved utilization of the configured maintenance threads.
Previously, maintenance work would only be scheduled a maximum of 4 times per second, but now maintenance work
will be scheduled immediately whenever any configured thread is available. Make sure that the
--maintenance_manager_num_threads property is set so that there is at most a 1:3 ratio of maintenance manager
threads to data directories (for spinning disks). This will improve the throughput of write-heavy workloads.
For more information on what these compaction operations do, see the Kudu Tablet design document.
The metrics associated with these tasks have the prefix compact_rs, delta_minor_compact_rs, and
delta_major_compact_rs, respectively.
There are two background tasks that remove historical MVCC data older than the AHM:
• The one that runs the merging compaction, called CompactRowSetsOp (see above).
• A separate background task deletes old undo delta blocks, called UndoDeltaBlockGCOp. Running
UndoDeltaBlockGCOp reduces disk space usage in all workloads, but particularly in those with a higher volume
of updates or upserts. The metrics associated with this background task have the prefix undo_delta_block.
Terms
We will use the following terms in this topic:
• hot replica: A tablet replica that is continuously receiving writes. For example, in a time series use case, tablet
replicas for the most recent range partition on a time column would be continuously receiving the latest data,
and would be hot replicas.
• cold replica: A tablet replica that is not hot, i.e. a replica that is not frequently receiving writes, for example, once
every few minutes. A cold replica may be read from. For example, in a time series use case, tablet replicas for
previous range partitions on a time column would not receive writes at all, or only occasionally receive late updates
or additions, but may be constantly read.
• data on disk: The total amount of data stored on a tablet server across all disks, post-replication, post-compression,
and post-encoding.
Example Workload
The sections below perform sample calculations using the following parameters:
• 200 hot replicas per tablet server
• 1600 cold replicas per tablet server
• 8TB of data on disk per tablet server (about 4.5GB/replica)
• 512MB block cache
• 40 cores per server
• limit of 32000 file descriptors per server
• a read workload with 1 frequently-scanned table with 40 columns
This workload resembles a time series use case, where the hot replicas correspond to the most recent range partition
on time.
Memory
The flag --memory_limit_hard_bytes determines the maximum amount of memory that a Kudu tablet server may
use. The amount of memory used by a tablet server scales with data size, write workload, and read concurrency. The
following table provides numbers that can be used to compute a rough estimate of memory usage.
Using this information for the example load gives the following breakdown of memory usage:
8TB data on disk: 8TB * 1.5GB / 1TB = 12GB
200 hot replicas: 200 * 128MB = 25.6GB
1 40-column, frequently-scanned table: 40 * 40 * 256KB = 409.6MB
Block cache: --block_cache_capacity_mb=512 = 512MB
Using this as a rough estimate of Kudu’s memory usage, select a memory limit so that the expected memory usage of
Kudu is around 50-75% of the hard limit.
Occasional rejections due to memory pressure are fine and act as backpressure to clients. Clients will transparently
retry operations. However, no operations should time out.
File Descriptors
Processes are allotted a maximum number of open file descriptors (also referred to as fds). If a tablet server attempts
to open too many fds, it will typically crash with a message saying something like "too many open files". The following
table summarizes the sources of file descriptor usage in a Kudu tablet server process:
Every replica has at least one WAL segment and at least one WAL index, and should have the same number of segments
and indices; however, the number of segments and indices can be greater for a replica if one of its peer replicas is
falling behind. WAL segment and index fds are closed as WALs are garbage collected.
Using this information for the example load gives the following breakdown of file descriptor usage, under the assumption
that some replicas are lagging and using 10 WAL segments:
file cache: 40% * 32000 fds = 12800 fds
1600 cold replicas: 1600 cold replicas * 3 fds / cold replica = 4800 fds
200 hot replicas: (2 fds / segment * 10 segments / hot replica * 200 hot replicas)
+ (1 fd / index * 10 indices / hot replica * 200 hot replicas) = 6000 fds
Total: 23600 fds
So for this example, the tablet server process has about 32000 - 23600 = 8400 fds to spare.
There is typically no downside to configuring a higher file descriptor limit if approaching the currently configured limit.
Threads
Processes are allotted a maximum number of threads by the operating system, and this limit is typically difficult or
impossible to change. Therefore, this section is more informational than advisory.
If a Kudu tablet server’s thread count exceeds the OS limit, it will crash, usually with a message in the logs like
"pthread_create failed: Resource temporarily unavailable". If the system thread count limit is exceeded, other processes
on the same node may also crash.
Threads and thread pools are used all over Kudu for various purposes, but the number of threads found in nearly all
of these does not scale with load or data/tablet size; instead, the number of threads is either a hard coded constant,
a constant defined by a configuration parameter, or based on a static dimension (such as the number of CPU cores).
The only exception to this is the WAL append thread, one of which exists for every "hot" replica.
Note that all replicas may be considered hot at startup, so tablet servers' thread usage will generally peak when started
and settle down thereafter.
Error during hole punch test. The log block manager requires a
filesystem with hole punching support such as ext4 or xfs. On el6,
kernel version 2.6.32-358 or newer is required. To run without hole
punching (at the cost of some efficiency and scalability), reconfigure
Kudu with --block_manager=file. Refer to the Kudu documentation for more
details. Raw error message follows.
Note:
ext4 mountpoints may actually be backed by ext2 or ext3 formatted devices, which do not support
hole punching. The hole punching test will fail when run on such filesystems. There are several different
ways to determine whether an ext4 mountpoint is backed by an ext2, ext3, or ext4 formatted device;
see this Stack Exchange post for details.
Without hole punching support, the log block manager is unsafe to use. It won’t ever delete blocks, and will consume
ever more space on disk.
If you can’t use hole punching in your environment, you can still try Kudu. Enable the file block manager instead of the
log block manager by adding the --block_manager=file flag to the commands you use to start the master and
tablet servers. The file block manager does not scale as well as the log block manager.
Check failed: _s.ok() Bad status: Already present: Could not create new FS layout:
FSManager root is not empty: /data0/kudu/data
This could be because Kudu was configured with non-empty data directories on first startup, or because a
previously-running, healthy Kudu process was restarted and at least one data directory was deleted or is somehow
corrupted, perhaps because of a disk error. If it is the latter, refer to Changing Directory Configuration on page 36.
F0924 20:24:36.336809 14550 hybrid_clock.cc:191 Couldn't get the current time: Clock
unsynchronized. Status: Service unavailable: Error reading clock. Clock considered
unsynchronized.
If NTP is installed and synchronized, but the maximum clock error is too high, the user will see a message such as:
Sep 17, 8:13:09.873 PM FATAL hybrid_clock.cc:196 Couldn't get the current time: Clock
synchronized, but error: 11130000, is past the maximum allowable error: 10000000
or
Sep 17, 8:32:31.135 PM FATAL tablet_server_main.cc:38 Check failed: _s.ok() Bad status:
Service unavailable: Cannot initialize clock: Cannot initialize HybridClock. Clock
synchronized but error was too high (11711000 us).
Installing NTP
To install NTP, use the command appropriate for your operating system:
Debian/Ubuntu: sudo apt-get install ntp
If NTP is installed but not running, start it using one of these commands:
Debian/Ubuntu: sudo service ntp restart
In contrast, a system without NTP properly configured and running will output something like the following:
Note: Both opeers and lpeers may be helpful as lpeers lists refid and jitter, while opeers
lists dispersion.
Note: After configuring NTP, use the ntpq tool described above to verify that ntpd was able to
connect to a variety of peers. If no public peers appear, it is possible that the NTP protocol is being
blocked by a firewall or other network connectivity issue.
Note: Kudu 1.5.0 and earlier versions were less resilient to brief NTP outages. In addition, they
contained a bug which could cause Kudu to incorrectly measure the maximum error, resulting in
crashes. If you experience crashes related to clock synchronization on these earlier versions of Kudu
and it appears that the system’s NTP configuration is correct, consider upgrading to Kudu 1.6.0 or
later.
Note: NTP requires a network connection and may take a few minutes to synchronize the clock at
startup. In some cases a spotty network connection may make NTP report the clock as unsynchronized.
A common, though temporary, workaround for this is to restart NTP with one of the commands above.
Because Kudu data files are sparse, standard filesystem tools may not accurately report the disk space used by Kudu.
For example, the size listed by ls -l does not accurately reflect the disk space used by Kudu data files:
$ ls -lh /data/kudu/tserver/data
total 117M
-rw------- 1 kudu kudu 160M Mar 26 19:37 0b9807b8b17d48a6a7d5b16bf4ac4e6d.data
-rw------- 1 kudu kudu 4.4K Mar 26 19:37 0b9807b8b17d48a6a7d5b16bf4ac4e6d.metadata
-rw------- 1 kudu kudu 32M Mar 26 19:37 2f26eeacc7e04b65a009e2c9a2a8bd20.data
-rw------- 1 kudu kudu 4.3K Mar 26 19:37 2f26eeacc7e04b65a009e2c9a2a8bd20.metadata
-rw------- 1 kudu kudu 672M Mar 26 19:37 30a2dd2cd3554d8a9613f588a8d136ff.data
-rw------- 1 kudu kudu 4.4K Mar 26 19:37 30a2dd2cd3554d8a9613f588a8d136ff.metadata
-rw------- 1 kudu kudu 32M Mar 26 19:37 7434c83c5ec74ae6af5974e4909cbf82.data
-rw------- 1 kudu kudu 4.3K Mar 26 19:37 7434c83c5ec74ae6af5974e4909cbf82.metadata
-rw------- 1 kudu kudu 672M Mar 26 19:37 772d070347a04f9f8ad2ad3241440090.data
-rw------- 1 kudu kudu 4.4K Mar 26 19:37 772d070347a04f9f8ad2ad3241440090.metadata
-rw------- 1 kudu kudu 160M Mar 26 19:37 86e50a95531f46b6a79e671e6f5f4151.data
-rw------- 1 kudu kudu 4.4K Mar 26 19:37 86e50a95531f46b6a79e671e6f5f4151.metadata
-rw------- 1 kudu kudu 687 Mar 26 19:26 block_manager_instance
Notice that the total size reported is 117MiB, while the first file’s size is listed as 160MiB. Adding the -s option to ls
will cause ls to output the file’s disk space usage.
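For example, the following variant of the listing above adds the allocated size as the first column of each entry (command only; output omitted):
$ ls -lsh /data/kudu/tserver/data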
The du and df utilities report the actual disk space usage by default.
$ du -h /data/kudu/tserver/data
118M    /data/kudu/tserver/data
The apparent size can be shown with the --apparent-size flag to du.
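For example, the following invocation reports the apparent rather than the allocated size of the same directory; the output shown is only an estimate derived from the apparent file sizes in the listing above:
$ du -h --apparent-size /data/kudu/tserver/data
1.7G    /data/kudu/tserver/data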
# Convert the minidump file into a core file that gdb can read:
minidump-2-core -o 02cb4a97-ee37-6454-73a9d9cb-590c7dde.core \
02cb4a97-ee37-6454-73a9d9cb-590c7dde.dmp
# Open the core file in gdb, pointing it at the matching binary and its debug symbols:
gdb /opt/cloudera/parcels/KUDU/lib/kudu/sbin-release/kudu-master \
-s /opt/cloudera/parcels/KUDU/lib/debug/usr/lib/kudu/sbin-release/kudu-master.debug \
02cb4a97-ee37-6454-73a9d9cb-590c7dde.core
For more information, see Getting started with Breakpad and Linux Minidump to Core.
Daemon           URL
Tablet Server    <tablet-server-1.example.com>:8050/tracing.html
Master           <master-1.example.com>:8051/tracing.html
Saving Traces
After you have collected traces, you can save these traces as JSON files by clicking Save. To load and analyze a saved
JSON file, click Load and choose the file.
These traces can indicate which part of the request was slow. Make sure you include them when filing bug reports
related to RPC latency outliers.
User stack:
@ 0x3a1ace10c4 (unknown)
@ 0x1262103 (unknown)
@ 0x12622d4 (unknown)
@ 0x12603df (unknown)
@ 0x8e7bfb (unknown)
@ 0x8f478b (unknown)
@ 0x8f55db (unknown)
@ 0x12a7b6f (unknown)
@ 0x3a1b007851 (unknown)
@ 0x3a1ace894d (unknown)
@ (nil) (unknown)
These traces can be useful for diagnosing root-cause latency issues in Kudu, especially when they are caused by underlying
systems such as disk controllers or filesystems.
Memory Limits
Kudu has a hard and soft memory limit. The hard memory limit is the maximum amount a Kudu process is allowed to
use, and is controlled by the --memory_limit_hard_bytes flag. The soft memory limit is a percentage of the hard
memory limit, controlled by the --memory_limit_soft_percentage flag (default 80%), that determines the amount of
memory a process may use before it starts rejecting some write operations.
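For example, a tablet server could be given a 4 GiB hard limit and the default 80% soft limit with flags like the following (the values are hypothetical and should be tuned to the host's available memory):
--memory_limit_hard_bytes=4294967296
--memory_limit_soft_percentage=80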
If the logs or RPC traces contain messages indicating that the soft memory limit has been exceeded, Kudu is rejecting
writes due to memory back pressure. This may result in write timeouts. There are several ways to relieve the memory
pressure on Kudu:
• Increase the rate at which Kudu can flush writes from memory to disk by increasing the number of disks or the
number of maintenance manager threads (--maintenance_manager_num_threads; see the sketch after this list).
Generally, the recommended ratio of maintenance manager threads to data directories is 1:3.
• Reduce the volume of writes flowing to Kudu on the application side.
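As a sketch of the first remediation above: at the recommended 1:3 ratio, a tablet server configured with 12 data directories (a hypothetical layout) would be given four maintenance manager threads:
--maintenance_manager_num_threads=4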
Finally, in Kudu versions 1.7 and lower, check the value of the --block_cache_capacity_mb setting. This setting
determines the maximum size of Kudu's block cache. While a higher value can help with read and write performance,
setting it too high as a percentage of the --memory_limit_hard_bytes setting is harmful. Do not raise
--block_cache_capacity_mb above --memory_pressure_percentage (default 60%) of
--memory_limit_hard_bytes, as this will cause Kudu to flush aggressively even if write throughput is low. The
recommended value for --block_cache_capacity_mb is below the following:
(50% * --memory_pressure_percentage) * --memory_limit_hard_bytes
With the defaults, this means the --block_cache_capacity_mb should not exceed 30% of
--memory_limit_hard_bytes.
In Kudu 1.8 and higher, servers will refuse to start if the block cache capacity exceeds the memory pressure threshold.
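As a worked illustration using the defaults: 50% of the 60% --memory_pressure_percentage gives 30%, so a server started with the hypothetical --memory_limit_hard_bytes=4294967296 (4 GiB) used earlier should keep --block_cache_capacity_mb below roughly 1228 (about 1.2 GiB).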
{
"name": "block_cache_inserts",
"value": 64
},
{
"name": "block_cache_lookups",
"value": 512
},
{
"name": "block_cache_evictions",
"value": 0
},
{
"name": "block_cache_misses",
"value": 96
},
{
"name": "block_cache_misses_caching",
"value": 64
},
{
"name": "block_cache_hits",
"value": 0
},
{
"name": "block_cache_hits_caching",
"value": 352
},
{
"name": "block_cache_usage",
"value": 6976
}
To judge the efficiency of the block cache on a tablet server, first wait until the server has been running and serving
normal requests for some time, so the cache is not cold. Unless the server stores very little data or is idle,
block_cache_usage should be equal or nearly equal to block_cache_capacity_mb. Once the cache has reached
steady state, compare block_cache_lookups to block_cache_misses_caching. The latter metric counts the
number of blocks that Kudu expected to read from the cache but which were not found there. If a significant number
of lookups result in misses on expected cache hits, and the block_cache_evictions metric is significant compared
to block_cache_inserts, then raising the size of the block cache may provide a performance boost. However, the
utility of the block cache is highly dependent on workload, so it’s necessary to test the benefits of a larger block cache.
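The block cache counters shown above can be retrieved from the tablet server's embedded web server. The following is a minimal sketch that assumes the metrics endpoint is served on the same port as the web UI shown earlier and that the JSON is filtered with standard command-line tools; adjust the host name to your environment:
$ curl -s http://tablet-server.example.com:8050/metrics | python -m json.tool | grep -A 1 '"block_cache'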
Heap Sampling
For advanced debugging of memory usage, administrators may enable heap sampling on Kudu daemons. This allows
Kudu developers to associate memory usage with the specific lines of code and data structures responsible. When
reporting a bug related to memory usage or an apparent memory leak, heap profiling can give quantitative data to
pinpoint the issue.
To enable heap sampling on a Kudu daemon, pass the flag --heap-sample-every-n-bytes=524588. If heap
sampling is enabled, the current sampled heap occupancy can be retrieved over HTTP by visiting
http://tablet-server.example.com:8050/pprof/heap or http://master.example.com:8051/pprof/heap.
The output is a machine-readable dump of the stack traces with their associated heap usage.
Rather than visiting the heap profile page directly in a web browser, it is typically more useful to use the pprof tool
that is distributed as part of the gperftools open source project. For example, a developer with a local build tree
can use the following command to collect the sampled heap usage and output an SVG diagram:
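A minimal sketch of such a command, assuming the gperftools pprof tool is on the PATH, heap sampling is enabled as described above, and the binary path matches the daemon being profiled (the path below is an example only):
$ pprof --svg /opt/cloudera/parcels/KUDU/lib/kudu/sbin-release/kudu-tserver \
    http://tablet-server.example.com:8050/pprof/heap > heap.svg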
The resulting SVG may be visualized in a web browser or sent to the Kudu community to help troubleshoot memory
occupancy issues.
Tip: Heap samples contain only summary information about allocations and do not contain any data from the heap.
It is safe to share heap samples in public without fear of exposing confidential or sensitive data.
nscd can alleviate slow name resolution by providing a cache for the most common name service requests, such as
for passwords, groups, and hosts.
Refer to your operating system documentation for how to install and enable nscd.
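As an illustration only (package and service names can differ between distributions; the commands below assume a Debian/Ubuntu system with systemd):
$ sudo apt-get install nscd
$ sudo systemctl enable --now nscd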
Usability Issues
ClassNotFoundException: com.cloudera.kudu.hive.KuduStorageHandler
You will encounter this exception when you try to access a Kudu table using Hive. This is not a case of a missing jar,
but simply that Impala stores Kudu metadata in Hive in a format that is unreadable to other tools, including Hive itself
and Spark. Currently, there is no workaround for Hive users. Spark users can work around this by creating temporary
tables.
Runtime Error: Could not create thread: Resource temporarily unavailable (error 11)
You will encounter this error when Kudu is unable to create more threads, usually on versions older than CDH 5.15 /
Kudu 1.7. It happens on tablet servers, and is a sign that the tablet server hosts too many tablet replicas.
To fix the issue, you can raise the nproc ulimit as detailed in the documentation for your operating system or distribution.
However, the better solution is to reduce the number of replicas on the tablet server. This may involve rethinking the
table's partitioning schema. For the recommended limits on number of replicas per tablet server, see the known issues
and scaling limitations documentation.
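For example, one common way to raise the nproc limit mentioned above for the kudu service account is through a limits.d drop-in file; the values and file name below are illustrative, and service managers such as systemd or Cloudera Manager may require the limit to be raised elsewhere:
$ echo 'kudu soft nproc 65536' | sudo tee /etc/security/limits.d/kudu.conf
$ echo 'kudu hard nproc 65536' | sudo tee -a /etc/security/limits.d/kudu.conf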
If there is at least one replica for each tablet that does not return a corruption error, you can repair the bad copies by
deleting them and forcing them to be re-replicated from the leader using the remote_replica delete tool.
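A sketch of that repair, assuming the kudu command-line tool is available; the tablet server address, tablet ID, and reason string are placeholders:
$ kudu remote_replica delete tserver-1.example.com:7050 <tablet-id> "deleting corrupt replica for re-replication"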
If all of the replicas are corrupt, then some data loss has occurred. Until KUDU-2526 is completed, this can happen if
the corrupt replica became the leader and the existing follower replicas are replaced.
If data has been lost, you can repair the table by replacing the corrupt tablet with an empty one using the
unsafe_replace_tablet tool.
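A sketch of that repair as well, again assuming the kudu command-line tool; the master addresses and tablet ID are placeholders, and the tool should be used only after accepting that the tablet's data is lost:
$ kudu tablet unsafe_replace_tablet master-1.example.com:7051,master-2.example.com:7051,master-3.example.com:7051 <tablet-id>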
From version 1.8.0 onwards, Kudu will mark the affected replicas as failed, leading to their automatic re-replication
elsewhere.
Support
Bug reports and feedback can be submitted through the public JIRA, our Cloudera Community Kudu forum, and a public
mailing list monitored by the Kudu development team and community members. In addition, a public Slack instance
is available to communicate with the team.
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their
Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against
any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated
within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under
this License for that Work shall terminate as of the date such litigation is filed.
4. Redistribution.
You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You meet the following conditions:
1. You must give any other recipients of the Work or Derivative Works a copy of this License; and
2. You must cause any modified files to carry prominent notices stating that You changed the files; and
3. You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark,
and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part
of the Derivative Works; and
4. If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute
must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices
that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE
text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along
with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party
notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify
the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or
as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be
construed as modifying the License.
You may add Your own copyright statement to Your modifications and may provide additional or different license
terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as
a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated
in this License.
5. Submission of Contributions.
Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the
Licensor shall be under the terms and conditions of this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement
you may have executed with Licensor regarding such Contributions.
6. Trademarks.
This License does not grant permission to use the trade names, trademarks, service marks, or product names of the
Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing
the content of the NOTICE file.
7. Disclaimer of Warranty.
Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides
its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied,
including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or
FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or
redistributing the Work and assume any risks associated with Your exercise of permissions under this License.
8. Limitation of Liability.
In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required
by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable
to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising
as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss
of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even
if such Contributor has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability.
While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance
of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in
accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any
other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional
liability.
END OF TERMS AND CONDITIONS
http://www.apache.org/licenses/LICENSE-2.0