Cloudera Impala - Fast, Interactive Queries with Hadoop
Introduction
As discussed in the previous post about Twitter’s Storm, Hadoop is a batch-oriented solution that lacks support for ad-hoc, real-time queries. Many of the players in Big Data have realised the need for fast, interactive queries besides the traditional Hadoop approach. Cloudera, one of the key solution vendors in the Big Data/Hadoop domain, has just recently launched Cloudera Impala to address this gap.
As the Cloudera engineering team described in their blog, their work was inspired by the Google Dremel paper, which is also the basis for Google BigQuery. Cloudera Impala provides a HiveQL-like query language supporting a wide variety of SELECT statements with WHERE, GROUP BY and HAVING clauses, ORDER BY (though currently LIMIT is mandatory with ORDER BY), joins (LEFT, RIGHT, FULL, OUTER, INNER), UNION ALL, external tables, and more. It also supports arithmetic and logical operators and Hive built-in functions such as COUNT, SUM, LIKE, IN or BETWEEN. It can access data stored on HDFS, but it does not use MapReduce; instead it is based on its own distributed query engine.
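To illustrate the supported syntax, here is a minimal sketch of an aggregate query that should run in both Hive and Impala; the table and column names are made up for the example, and note the LIMIT that Impala currently requires alongside ORDER BY:

-- hypothetical table and columns, for illustration only
SELECT stock_symbol, COUNT(*) AS trading_days, SUM(stock_volume) AS total_volume
FROM daily_prices
WHERE close_price BETWEEN 100 AND 1000
GROUP BY stock_symbol
HAVING COUNT(*) > 200
ORDER BY total_volume DESC
LIMIT 10;   -- LIMIT is currently mandatory with ORDER BY in Impala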
The current Impala release (Impala 1.0beta) does not support DDL statements (CREATE, ALTER, DROP TABLE); all table creation/modification/deletion has to be executed via Hive and then refreshed in the Impala shell.
Cloudera Impala is open source under the Apache License; the code can be retrieved from GitHub. Its components are written in C++, Java and Python.
Cloudera Impala Architecture
Cloudera Impala has 3 key components: impalad, impala-state-store and impala-shell.
Impala shell is essentially a short shell script which starts the impala_shell.py Python program to run the queries.
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
SHELL_HOME=${IMPALA_SHELL_HOME:-${SCRIPT_DIR}}

PYTHONPATH="${SHELL_HOME}/gen-py:${SHELL_HOME}/lib:${PYTHONPATH}" \
  python ${SHELL_HOME}/impala_shell.py "$@"
Impalad runs on each Hadoop datanode; it plans and executes the queries sent from impala-shell.
Impala-state-store stores information (location and status) about all the running impalad instances.
Installing Cloudera Impala
As of writing this article, Cloudera Impala requires 64-bit RHEL/CentOS 6.2 or higher. I was running the tests on RHEL 6.3 (64-bit) on AWS. You need to have Cloudera CDH4.1, Hive and MySQL installed (the latter is used to store the Hive metastore).
Note: AWS t1.micro instances are not suitable for CDH4.1, which requires more memory than t1.micro provides.
Cloudera recommends using Cloudera Manager to install Impala, but I used manual steps, just to ensure that I had a complete understanding of what goes on during the installation.
Step 1: Install CDH4.1
To install CDH4.1 you need to run the following commands. These steps describe how to install Hadoop MRv1; if you want YARN instead, that requires different MapReduce RPMs to be installed. However, Cloudera stated in the install instructions that they do not consider MapReduce 2.0 (YARN) production-ready yet, so I decided to stick with MRv1 for these tests. CDH4.1 MRv1 can be installed as a pseudo-distributed or a full cluster solution; for the tests we will use the pseudo-distributed mode:
$ sudo yum --nogpgcheck localinstall cloudera-cdh-4-0.noarch.rpm
$ sudo rpm --import http://archive.cloudera.com/cdh4/redhat/6/x86_64/cdh/RPM-GPG-KEY-cloudera
$ sudo yum install hadoop-0.20-conf-pseudo
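The localinstall above assumes the CDH4 one-click-install repository RPM has already been downloaded to the current directory; if not, something along these lines should fetch it (the exact URL is recalled from the CDH4 archive layout, so treat it as an assumption):

# Download the CDH4 one-click-install repo RPM (URL is an assumption, check archive.cloudera.com)
$ wget http://archive.cloudera.com/cdh4/one-click-install/redhat/6/x86_64/cloudera-cdh-4-0.noarch.rpm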
This will install Hadoop-0.20-MapReduce (jobtracker, tasktracker) and Hadoop HDFS (namenode, secondarynamenode, datanode) as dependencies, too.
After the packages are installed, you need to execute the following commands in order to set up Hadoop pseudo-distributed mode properly:
# Format namenode
$ sudo -u hdfs hdfs namenode -format

# Start HDFS daemons
$ for service in /etc/init.d/hadoop-hdfs-*
$ do
$  sudo $service start
$ done

# Create /tmp dir
$ sudo -u hdfs hadoop fs -mkdir /tmp
$ sudo -u hdfs hadoop fs -chmod -R 1777 /tmp

# Create system dirs
$ sudo -u hdfs hadoop fs -mkdir /var
$ sudo -u hdfs hadoop fs -mkdir /var/lib
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred/mapred
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
$ sudo -u hdfs hadoop fs -chmod 1777 /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
$ sudo -u hdfs hadoop fs -chown -R mapred /var/lib/hadoop-hdfs/cache/mapred

# Create user dirs
$ sudo -u hdfs hadoop fs -mkdir /user/istvan
$ sudo -u hdfs hadoop fs -chown istvan /user/istvan

# Start MapReduce
$ for service in /etc/init.d/hadoop-0.20-mapreduce-*
$ do
$  sudo $service start
$ done
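At this point it is worth doing a quick sanity check that the daemons came up; a minimal sketch (nothing CDH-specific is assumed beyond the packages installed above):

# List the running Java daemons (expect NameNode, DataNode, SecondaryNameNode, JobTracker, TaskTracker)
$ sudo jps

# Verify that HDFS is answering and the directories exist
$ sudo -u hdfs hadoop fs -ls /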
Step 2: Install MySQL (used by Hive as metastore)
$ sudo yum install mysql-server
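After installing the package, the MySQL server still has to be started (and, optionally, set to start on boot); a minimal sketch, assuming the standard mysqld init script on RHEL/CentOS 6:

# Start MySQL and make it start on boot (init script name assumed for RHEL/CentOS 6)
$ sudo service mysqld start
$ sudo chkconfig mysqld on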
Step 3: Install Hive
$ sudo yum install hive
Step 4: Configure Hive to use MySQL as metastore
To configure Hive and MySQL to work together you need to install the MySQL JDBC connector:
$ curl -L 'http://www.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.22.tar.gz/from/http://mysql.he.net/' | tar xz
$ sudo cp mysql-connector-java-5.1.22/mysql-connector-java-5.1.22-bin.jar /usr/lib/hive/lib/
Then create the Hive metastore database
$ mysql -u root -p
mysql> CREATE DATABASE metastore;
mysql> USE metastore;
mysql> SOURCE /usr/lib/hive/scripts/metastore/upgrade/mysql/hive-schema-0.9.0.mysql.sql;
and hive user:
mysql> CREATE USER 'hive'@'localhost' IDENTIFIED BY 'hive';
mysql> GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'localhost' WITH GRANT OPTION;
mysql> FLUSH PRIVILEGES;
mysql> quit;
To configure Hive to use MySQL as a local metastore you need to modify hive-site.xml as follows:
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://myhost/metastore</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hive</value>
</property>
<property>
  <name>hive.metastore.local</name>
  <value>true</value>
</property>
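A quick way to confirm that Hive can actually reach the MySQL metastore is to run a trivial statement non-interactively; a minimal sketch:

# Should return without errors (and list nothing yet) if the metastore connection works
$ hive -e "SHOW TABLES;"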
Step 5: Install Impala and Impala Shell
$ sudo yum install impala
$ sudo yum install impala-shell
If you follow the manual installation procedure, no Impala config files are created automatically. You need to create the /usr/lib/impala/conf directory and copy the following files into it: core-site.xml, hdfs-site.xml, hive-site.xml and log4j.properties.
$ ls -ltr
total 16
-rw-r--r--. 1 root root 2094 Nov  4 16:37 hive-site.xml
-rw-r--r--. 1 root root 2988 Nov  4 16:43 impala-log4j.properties
-rwxr-xr-x. 1 root root 2052 Nov  4 16:58 hdfs-site.xml
-rwxr-xr-x. 1 root root 1701 Nov  9 16:53 core-site.xml
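The copies themselves are straightforward; a minimal sketch, assuming the Hadoop and Hive client configs live under /etc/hadoop/conf and /etc/hive/conf (adjust the source paths to your layout):

$ sudo mkdir -p /usr/lib/impala/conf
# Source locations below are assumptions based on the default CDH package layout
$ sudo cp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml /usr/lib/impala/conf/
$ sudo cp /etc/hive/conf/hive-site.xml /usr/lib/impala/conf/
$ sudo cp /etc/hadoop/conf/log4j.properties /usr/lib/impala/conf/impala-log4j.properties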
Configure Impala
In order to support direct reads for Impala you need to configure the following Hadoop properties:
core-site.xml
<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<property>
  <name>dfs.client.read.shortcircuit.skip.checksum</name>
  <value>false</value>
</property>
hdfs-site.xml:
<property>
  <name>dfs.datanode.data.dir.perm</name>
  <value>755</value>
</property>
<property>
  <name>dfs.block.local-path-access.user</name>
  <value>impala,mapred,istvan</value>
</property>
You can also enable data locality tracking:
<property>
  <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
  <value>true</value>
</property>
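These are datanode-side settings, so the HDFS daemons have to be restarted for them to take effect; a minimal sketch, reusing the init scripts from the pseudo-distributed setup above:

# Restart the HDFS daemons so the short-circuit read settings are picked up
$ for service in /etc/init.d/hadoop-hdfs-*
$ do
$  sudo $service restart
$ done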
Step 6: Start up the Impala daemons
Log in as the impala user and execute the following commands:
$ GLOG_v=1 nohup /usr/bin/impala-state-store -state_store_port=24000 &
$ GLOG_v=1 nohup /usr/bin/impalad -state_store_host=localhost -nn=localhost -nn_port=8020 -hostname=localhost -ipaddress=127.0.0.1 &
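To verify that the daemons are up, checking the process list is enough; the debug web interfaces can also be opened in a browser (the default ports, impalad on 25000, are recalled from memory, so treat them as assumptions):

# Confirm both daemons are running
$ ps aux | grep -e impalad -e impala-state-store | grep -v grep

# impalad debug web UI (default port assumed to be 25000)
$ curl -s http://localhost:25000/ > /dev/null && echo "impalad web UI is up"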
Run Impala queries
In order to run Impala interactive queries from the impala shell, we need to create the tables via Hive (remember, the current Impala beta version does not support DDL statements). I used Google stock prices in this example (retrieved from http://finance.yahoo.com in CSV format):
hive> CREATE EXTERNAL TABLE stockprice
    > (yyyymmdd STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT,
    >  close_price FLOAT, stock_volume INT, adjclose_price FLOAT)
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > LINES TERMINATED BY '\n'
    > LOCATION '/user/istvan/input/';
OK
Time taken: 14.499 seconds

hive> show tables;
OK
stockprice
Time taken: 16.04 seconds

hive> DESCRIBE stockprice;
OK
yyyymmdd        string
open_price      float
high_price      float
low_price       float
close_price     float
stock_volume    int
adjclose_price  float
Time taken: 0.583 seconds
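Since stockprice is an external table, the CSV file has to be present in the HDFS directory that the LOCATION clause points at before queries return anything; a minimal sketch, assuming the file was downloaded locally as /home/istvan/goog_stock.csv:

# Create the HDFS input directory and upload the CSV (local file name as used later in the article)
$ hadoop fs -mkdir /user/istvan/input
$ hadoop fs -put /home/istvan/goog_stock.csv /user/istvan/input/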
A similar table (not an external one this time) can be created and loaded from Hive, too:
hive> CREATE TABLE imp_test
    > (yyyymmdd STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT,
    >  close_price FLOAT, stock_volume INT, adjclose_price FLOAT);
OK
Time taken: 0.542 seconds

hive> LOAD DATA LOCAL INPATH '/home/istvan/goog_stock.csv' OVERWRITE INTO TABLE imp_test;
Copying data from file:/home/istvan/goog_stock.csv
Copying file: file:/home/istvan/goog_stock.csv
Loading data to table default.imp_test
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /user/hive/warehouse/imp_test
OK
Time taken: 1.903 seconds
Then you can run a Hive query that returns the records ordered by the highest prices. As you can see, Hive initiates a MapReduce job under the hood:
hive> select * from stockprice order by high_price desc;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=
In order to set a constant number of reducers:
  set mapred.reduce.tasks=
Starting Job = job_201211110837_0006, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201211110837_0006
Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201211110837_0006
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2012-11-11 13:16:30,895 Stage-1 map = 0%, reduce = 0%
2012-11-11 13:16:46,554 Stage-1 map = 100%, reduce = 0%
2012-11-11 13:17:05,918 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:07,061 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:08,243 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 6.2 sec
....
2012-11-11 13:17:17,274 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:18,334 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 6.2 sec
MapReduce Total cumulative CPU time: 6 seconds 200 msec
Ended Job = job_201211110837_0006
MapReduce Jobs Launched:
Job 0: Map: 1  Reduce: 1  Cumulative CPU: 6.2 sec  HDFS Read: 0  HDFS Write: 0  SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 200 msec
OK
05/10/2012  770.71  774.38  765.01  767.65  2735900  767.65
04/10/2012  762.75  769.89  759.4   768.05  2454200  768.05
02/10/2012  765.2   765.99  750.27  756.99  2790200  756.99
01/10/2012  759.05  765.0   756.21  761.78  3168000  761.78
25/09/2012  753.05  764.89  747.66  749.16  6058500  749.16
03/10/2012  755.72  763.92  752.2   762.5   2208300  762.5
08/10/2012  761.0   763.58  754.15  757.84  1958600  757.84
27/09/2012  759.95  762.84  751.65  756.5   3931100  756.5
09/10/2012  759.67  761.32  742.53  744.09  3003200  744.09
26/09/2012  749.85  761.24  741.0   753.46  5672900  753.46
...
Time taken: 82.09 seconds
Running the very same query from impala-shell executes significantly faster. Cloudera claims that it can be an order of magnitude or more faster, depending on the query. From my experience, impala-shell returned the result in about one second on average, compared to roughly 82 seconds for the Hive version.
[istvan@ip-10-227-137-76 ~]$ impala-shell
Welcome to the Impala shell. Press TAB twice to see a list of available commands.
Copyright (c) 2012 Cloudera, Inc. All rights reserved.
(Build version: Impala v0.1 (1fafe67) built on Mon Oct 22 13:06:45 PDT 2012)
[Not connected] > connect localhost:21000
[localhost:21000] > select * from stockprice order by high_price desc limit 20000;
05/10/2012  770.7100219726562  774.3800048828125  765.010009765625   767.6500244140625  2735900   767.6500244140625
04/10/2012  762.75             769.8900146484375  759.4000244140625  768.0499877929688  2454200   768.0499877929688
02/10/2012  765.2000122070312  765.989990234375   750.27001953125    756.989990234375   2790200   756.989990234375
01/10/2012  759.0499877929688  765                756.2100219726562  761.780029296875   3168000   761.780029296875
25/09/2012  753.0499877929688  764.8900146484375  747.6599731445312  749.1599731445312  6058500   749.1599731445312
03/10/2012  755.719970703125   763.9199829101562  752.2000122070312  762.5              2208300   762.5
08/10/2012  761                763.5800170898438  754.1500244140625  757.8400268554688  1958600   757.8400268554688
27/09/2012  759.9500122070312  762.8400268554688  751.6500244140625  756.5              3931100   756.5
09/10/2012  759.6699829101562  761.3200073242188  742.530029296875   744.0900268554688  3003200   744.0900268554688
26/09/2012  749.8499755859375  761.239990234375   741                753.4600219726562  5672900   753.4600219726562
18/10/2012  755.5399780273438  759.4199829101562  676                695                12430200  695
...
If you use DDL commands from Hive (e.g. create table), then you need to run the refresh command in impala-shell to reflect the changes:
[localhost:21000] > refresh
Successfully refreshed catalog
[localhost:21000] > describe stockprice
yyyymmdd        string
open_price      float
high_price      float
low_price       float
close_price     float
stock_volume    int
adjclose_price  float
[localhost:21000] > show tables;
imp_test
stockprice
[localhost:21000] >
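To round off the examples, aggregate queries also run interactively from the same shell. This is a sketch I would expect to work against the stockprice table defined above, illustrating the GROUP BY-style aggregation and built-in functions mentioned earlier (COUNT and SUM are listed in the article; MIN and MAX are my assumption; the output is omitted):

[localhost:21000] > select count(*), max(high_price), min(low_price), sum(stock_volume) from stockprice where close_price > 700;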
Conclusion
There are more and more efforts in the Big Data world to support ad-hoc, fast queries and real-time data processing for large datasets. Cloudera Impala is certainly an exciting solution that utilises the same concept as Google BigQuery, but it promises to support a wider range of input formats, and by being available as open source technology it can attract external developers to improve the software and take it to the next stage.
If you want to learn more about Cloudera Impala, here is a free course to start with.