Accessing Phoenix Data Using HBase API
Introduction
This article is the last in a series of three covering HBase, Phoenix, and Java. In Part 1 of the series, I provided an overview of HBase and how to access data stored in it using Java. In Part 2, I covered Phoenix, an open source SQL skin over HBase. While HBase is a column store, Phoenix provides a SQL interface over it, making it easy for people with a traditional database background to use the features of HBase while keeping the familiarity of SQL.
In this article, I will cover how data stored in Phoenix can be accessed using the HBase API, from the Java programming language.
Accessing Phoenix Data Using HBase API
Phoenix is a SQL skin over HBase. Although it uses HBase underneath, it stores data differently from the way native HBase does. The standard way of accessing Phoenix is through SQL queries, but because Phoenix is built on top of HBase, we can also use the HBase API.
A few observations made while developing a Java example that accesses Phoenix tables through the native HBase API:
- Phoenix stores the name of each table using capital letters.
- The HBase API is able to scan a table created via Phoenix, but in my tests Phoenix was not able to read a table created via the HBase shell.
- The volume of data (bytes) transferred when using the HBase API is much larger than with Phoenix (almost 10 times larger in many cases), so network performance becomes a significant factor in overall performance. This is because the native HBase API returns the complete row in the result set, while Phoenix returns only the selected columns.
- With the HBase API, every cell in a result row carries the name of its column, which increases the payload size. Hence, using short column names is a good practice in HBase (though counter-intuitive from a legibility perspective).
- Data is fetched to the client side in case of HBase, while Phoenix has a query engine that resides on the server, reducing the volume of data transferred over the network.
- Much of the work that a client using the HBase API has to do itself is done by Phoenix on the server side.
- A client that uses the HBase API will be more complex; in effect, it becomes a thick client. Phoenix provides a simpler interface for interaction.
- To interact with non-string values stored by Phoenix, we need to use the Phoenix-provided "encode" and "decode" methods, because Phoenix stores such data differently from native HBase.
- By default, Phoenix uses a special column family named "0" and stores all columns in it. With the HBase API, multiple column families can be used.
- Even if a column family named "0" is created and all columns are stored in this column family, Phoenix is still not able to read tables created from HBase shell.
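To make the payload observations above more concrete, here is a minimal sketch that estimates the serialized size of a single cell. It assumes the classic HBase KeyValue layout (20 bytes of fixed overhead for lengths, timestamp, and type, plus the row key, family, qualifier, and value, with tags ignored). The class `CellSizeSketch`, the row key `r1`, and the long column name `ENGLISH_MARKS` are hypothetical and used only for illustration. The sketch uses only the JDK, not the HBase client, so treat its numbers as an approximation rather than a measurement.

```java
import java.nio.charset.StandardCharsets;

class CellSizeSketch {
    // Assumed fixed per-cell overhead (tags ignored):
    // 4 (key length) + 4 (value length) + 2 (row length)
    // + 1 (family length) + 8 (timestamp) + 1 (cell type)
    static final int FIXED_OVERHEAD = 4 + 4 + 2 + 1 + 8 + 1;

    static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    // Estimated bytes for one cell: fixed overhead plus the variable-length parts.
    static int cellSize(String rowKey, String family, String qualifier, int valueLength) {
        return FIXED_OVERHEAD
                + utf8Length(rowKey)
                + utf8Length(family)
                + utf8Length(qualifier)
                + valueLength;
    }

    public static void main(String[] args) {
        // Hypothetical row key and column names; the value is a 4-byte integer.
        int shortName = cellSize("r1", "0", "ENG", 4);
        int longName = cellSize("r1", "0", "ENGLISH_MARKS", 4);
        System.out.println("Cell with short qualifier: " + shortName + " bytes");
        System.out.println("Cell with long qualifier:  " + longName + " bytes");
    }
}
```

Under these assumptions, the qualifier is repeated in every cell of every row returned, so a saving of a few bytes per column name is multiplied by the total number of cells in the result.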
Example
In this example, we will use the native HBase Java API to access Phoenix tables. Because Phoenix stores data types like integers and doubles (to name a few) in a special manner, a few helper functions have been written for ease of use.
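To illustrate why such helper functions are needed, the following sketch contrasts a plain big-endian integer (the layout that HBase's `Bytes.toBytes(int)` produces) with a big-endian integer whose sign bit is flipped. To my understanding, the latter is how Phoenix serializes an ascending INTEGER column, so that negative values sort before positive ones when compared as raw bytes. The class and method names are hypothetical, and the sketch uses only the JDK; real code should rely on Phoenix's own type classes rather than this hand-rolled version.

```java
class PhoenixIntSketch {
    // Plain big-endian encoding of an int.
    static byte[] plainBigEndian(int v) {
        return new byte[] {
            (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v
        };
    }

    // Big-endian with the sign bit flipped (assumed Phoenix INTEGER layout).
    static byte[] signFlipped(int v) {
        byte[] b = plainBigEndian(v);
        b[0] ^= (byte) 0x80;
        return b;
    }

    // Reverse of signFlipped: flip the sign bit back and reassemble the int.
    static int decodeSignFlipped(byte[] b) {
        return ((b[0] ^ 0x80) & 0xFF) << 24
                | (b[1] & 0xFF) << 16
                | (b[2] & 0xFF) << 8
                | (b[3] & 0xFF);
    }

    // Unsigned lexicographic comparison, the way HBase orders byte arrays.
    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < a.length && i < b.length; i++) {
            int d = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        // With sign-flipped bytes, -1 sorts before 1; with plain bytes it does not.
        System.out.println("sign-flipped: -1 before 1? "
                + (compareUnsigned(signFlipped(-1), signFlipped(1)) < 0));
        System.out.println("plain:        -1 before 1? "
                + (compareUnsigned(plainBigEndian(-1), plainBigEndian(1)) < 0));
        System.out.println("round trip of 45: " + decodeSignFlipped(signFlipped(45)));
    }
}
```

This is also why a filter that compares against a plain `Bytes.toBytes(value)` is unlikely to match integer columns written by Phoenix: the two encodings differ in the first byte.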
BMarksPhoenix.java
package bp.hbase_bmarks;
import helpers.HBaseHelper;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableFactory;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.DependentColumnFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import bp.misc.Misc;
import bp.misc.PairLong;
import bp.misc.Triplet;
import bp.misc.UTPair;
public class BMarksPhoenix {
private final static byte[] tableName = Bytes.toBytes("BMARKSP");
private final static byte[] cellData = Bytes.toBytes("cell_data");
private final static byte[] r1 = Bytes.toBytes("r1");
private final static byte[] r2 = Bytes.toBytes("r2");
private final static byte[] r3 = Bytes.toBytes("r3");
private final static byte[] r4 = Bytes.toBytes("r4");
private final static byte[] r5 = Bytes.toBytes("r5");
private final static byte[] colENG = Bytes.toBytes("ENG");
private final static byte[] colMATH = Bytes.toBytes("MATH");
private final static byte[] colSCI = Bytes.toBytes("SCI");
private final static byte[] colHIST = Bytes.toBytes("HIST");
private final static byte[] colGEO = Bytes.toBytes("GEO");
private final static String HBaseAddress = "127.0.0.1";
private final static String HBasePort = "2181";
private final static String HBaseURL = "/hbase-unsecure";
private final static String tableStr = "BMARKSP";
/** Drop tables if this value is set true. */
static boolean INITIALIZE_AT_FIRST = false;
private static void p(String msg)
{
System.out.println(msg);
}
private void scan(HBaseAdmin admin, HTableInterface table) throws IOException
{
System.out.println("*** bmarksp -- scan -- all records ***");
long start = System.currentTimeMillis();
Scan scan = new Scan();
PairLong retVal = null;
ResultScanner scanner = table.getScanner(scan);
try {
retVal = HBaseHelper.stepThroughResultScannerResultsPhoenix(scanner);
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Number of bytes: " + retVal.getFirst());
System.out.println("Number of records: " + retVal.getSecond());
} finally {
scanner.close();
}
System.out.println("- - - - - Done. - - - - -");
}
private void scanColumn(HBaseAdmin admin, HTableInterface table, String columnName) throws IOException
{
System.out.println("*** bmarksp -- scan column -- " + columnName + " ***");
long start = System.currentTimeMillis();
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes(columnName));
PairLong retVal = null;
ResultScanner scanner = table.getScanner(scan);
try {
retVal = HBaseHelper.stepThroughResultScannerResultsPhoenix(scanner);
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Number of bytes: " + retVal.getFirst());
System.out.println("Number of records: " + retVal.getSecond());
} finally {
scanner.close();
}
System.out.println("- - - - - Done. - - - - -");
}
private void scan2(HBaseAdmin admin, HTableInterface table) throws IOException
{
System.out.println("*** bmarksp -- scan2 -- all records ***");
long start = System.currentTimeMillis();
Scan scan = new Scan();
long count = 0;
ResultScanner scanner = table.getScanner(scan);
try {
Iterator<Result> iterator = scanner.iterator();
while (iterator.hasNext()) {
Result next = iterator.next();
System.out.println("Found row: " + next);
for(Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> columnFamilyMap : next.getMap().entrySet()) {
for (Entry<byte[], NavigableMap<Long, byte[]>> entryVersion : columnFamilyMap.getValue().entrySet()) {
for (Entry<Long, byte[]> entry : entryVersion.getValue().entrySet()) {
String row = Bytes.toString(next.getRow());
String column = Bytes.toString(entryVersion.getKey());
byte[] value = entry.getValue();
long timestamp = entry.getKey();
System.out.println("\t row: " + row);
System.out.println("\t column: " + column);
System.out.println("\t value (byte array to string): " + Misc.byteArrayToString(value));
if ( !"NAME".equals(column) ) {
System.out.println("\t Value (str) : " + value.toString());
System.out.println("\t Value (byte array to int) : " + Misc.byteArrayToInt(value));
System.out.println("\t Value (byteArrayToLeInt) : " + Misc.byteArrayToLeInt(value));
System.out.println("\t Value (int biginteger): " + Misc.byteToIntBigInteger(value));
System.out.println("\t Value (int big endian): " + Misc.byteToIntBigEndian(value));
System.out.println("\t Value (int little endian): " + Misc.byteToIntLittleEndian(value));
}
System.out.println("\t timestamp: " + timestamp);
}
}
}
}
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
} finally {
scanner.close();
}
System.out.println("- - - - - Done. - - - - -");
}
public void phoenixSelectAll()
{
System.out.println("*** bmarksp -- phoenix -- select * ***");
long size = 0;
long count = 0;
long start = System.currentTimeMillis();
Connection conn;
Properties prop = new Properties();
try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
conn = DriverManager.getConnection("jdbc:phoenix:" + HBaseAddress + ":" + HBasePort + ":" + HBaseURL);
//System.out.println("got connection");
ResultSet rst = conn.createStatement().executeQuery("select * from " + tableStr);
while (rst.next()) {
//System.out.println(rst.getString(1) + " " + rst.getString(2) + " " + rst.getString(3) + " " + rst.getString(4) + " " + rst.getString(5) + " " + rst.getString(6));
size = size + rst.toString().length();
count++;
}
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Number of bytes: " + size);
System.out.println("Number of records: " + count);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
System.out.println("- - - - - Done. - - - - -");
}
public void phoenixSelectAllForQuery(String query)
{
System.out.println("*** bmarksp -- phoenix -- select * for '" + query + "' ***");
long count = 0;
long size = 0;
long start = System.currentTimeMillis();
Connection conn;
Properties prop = new Properties();
try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
conn = DriverManager.getConnection("jdbc:phoenix:" + HBaseAddress + ":" + HBasePort + ":" + HBaseURL);
//System.out.println("got connection");
ResultSet rst = conn.createStatement().executeQuery(query);
while (rst.next()) {
//System.out.println(rst.getString(1) + " " + rst.getString(2) + " " + rst.getString(3) + " " + rst.getString(4) + " " + rst.getString(5) + " " + rst.getString(6));
size = size + rst.toString().length();
count++;
}
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Number of bytes: " + size);
System.out.println("Number of records: " + count);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
System.out.println("- - - - - Done. - - - - -");
}
public void phoenixSelectColumn(String column)
{
System.out.println("*** bmarksp -- phoenix -- Select " + column + " ***");
long count = 0;
long size = 0;
long start = System.currentTimeMillis();
Connection conn;
Properties prop = new Properties();
try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
conn = DriverManager.getConnection("jdbc:phoenix:" + HBaseAddress + ":" + HBasePort + ":" + HBaseURL);
ResultSet rst = conn.createStatement().executeQuery("select " + column + " from " + tableStr);
while (rst.next()) {
size = size + rst.toString().length();
count++;
}
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Number of bytes: " + size);
System.out.println("Number of records: " + count);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
System.out.println("- - - - - Done. - - - - -");
}
public void phoenixSelectEngGreaterThan(int value)
{
System.out.println("*** bmarksp -- phoenix -- select ENG greater than " + value + " ***");
long count = 0;
long size = 0;
long start = System.currentTimeMillis();
Connection conn;
Properties prop = new Properties();
try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
conn = DriverManager.getConnection("jdbc:phoenix:" + HBaseAddress + ":" + HBasePort + ":" + HBaseURL);
ResultSet rst = conn.createStatement().executeQuery("select * from " + tableStr + " where ENG > " + value);
while (rst.next()) {
//System.out.println(rst.getString(1) + " " + rst.getString(2) + " " + rst.getString(3) + " " + rst.getString(4) + " " + rst.getString(5) + " " + rst.getString(6));
size = size + rst.toString().length();
count++;
}
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Number of bytes: " + size);
System.out.println("Number of records: " + count);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
System.out.println("- - - - - Done. - - - - -");
}
public void phoenixSelectEqualTo(String column, String value)
{
System.out.println("*** bmarksp -- phoenix -- select " + column + " = '" + value + "' ***");
long count = 0;
long size = 0;
long start = System.currentTimeMillis();
Connection conn;
String str = "select * from " + tableStr + " where " + column + " = '" + value + "'";
Properties prop = new Properties();
try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
conn = DriverManager.getConnection("jdbc:phoenix:" + HBaseAddress + ":" + HBasePort + ":" + HBaseURL);
ResultSet rst = conn.createStatement().executeQuery(str);
while (rst.next()) {
size = size + rst.toString().length();
count++;
}
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Number of bytes: " + size);
System.out.println("Number of records: " + count);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
System.out.println("- - - - - Done. - - - - -");
}
public void phoenixSelectEqualTo(String column, int value)
{
System.out.println("*** bmarksp -- phoenixSelectEqualTo " + column + " and " + value + " ***");
long count = 0;
long size = 0;
long start = System.currentTimeMillis();
Connection conn;
String str = "select * from " + tableStr + " where " + column + " = " + value;
Properties prop = new Properties();
try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
conn = DriverManager.getConnection("jdbc:phoenix:" + HBaseAddress + ":" + HBasePort + ":" + HBaseURL);
ResultSet rst = conn.createStatement().executeQuery(str);
while (rst.next()) {
size = size + rst.toString().length();
count++;
}
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Number of bytes: " + size);
System.out.println("Number of records: " + count);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
System.out.println("- - - - - Done. - - - - -");
}
private void filterColumnInitial(HBaseAdmin admin, HTableInterface table, String columnName, int value) throws IOException
{
System.out.println("*** bmarksp - filter column " + columnName + " > " + value + " ***");
long start = System.currentTimeMillis();
Filter filterFinal = new SingleColumnValueFilter(Bytes.toBytes("0"), Bytes.toBytes(columnName), CompareOp.GREATER, Bytes.toBytes(value));
Scan scan = new Scan();
scan.setFilter(filterFinal);
long size = 0;
long count = 0;
ResultScanner scanner = table.getScanner(scan);
try {
for (Result result : scanner) {
size = size + Result.getTotalSizeOfCells(result);
count++;
}
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Found results: " + count);
System.out.println("Number of bytes: " + size);
} finally {
scanner.close();
}
System.out.println("- - - - - Done. - - - - -");
}
private void filterColumnMultiple(HBaseAdmin admin, HTableInterface table) throws IOException
{
System.out.println("*** bmarksp - filter column multiple ***");
long start = System.currentTimeMillis();
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("0"), Bytes.toBytes("NAME"), CompareOp.EQUAL, Bytes.toBytes("jack"));
// To prevent the entire row from being emitted if the column is not found on a row
filter1.setFilterIfMissing(true);
SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("0"), Bytes.toBytes("ENG"), CompareOp.EQUAL, HBaseHelper.encodePhoenixInteger(45));
// To prevent the entire row from being emitted if the column is not found on a row
filter2.setFilterIfMissing(true);
SingleColumnValueFilter filter3 = new SingleColumnValueFilter(Bytes.toBytes("0"), Bytes.toBytes("SCI"), CompareOp.GREATER, HBaseHelper.encodePhoenixInteger(70));
// To prevent the entire row from being emitted if the column is not found on a row
filter3.setFilterIfMissing(true);
FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
filterList.addFilter(filter1);
filterList.addFilter(filter2);
filterList.addFilter(filter3);
Scan scan = new Scan();
scan.setFilter(filterList);
long size = 0;
long count = 0;
ResultScanner scanner = table.getScanner(scan);
try {
for (Result result : scanner) {
size = size + Result.getTotalSizeOfCells(result);
count++;
}
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Found results: " + count);
System.out.println("Number of bytes: " + size);
} finally {
scanner.close();
}
System.out.println("- - - - - Done. - - - - -");
}
private void filterColumnMultiple(HBaseAdmin admin, HTableInterface table, String columnName, List<Triplet<byte[], CompareOp, byte[]>> values) throws IOException
{
System.out.println("*** bmarksp - filter column multiple ***");
int len = values.size();
Filter[] filters = new Filter[len];
long start = System.currentTimeMillis();
SingleColumnValueFilter f;
for ( int i = 0; i < len; i++ ) {
f = new SingleColumnValueFilter(Bytes.toBytes(columnName), values.get(i).getFirst(), values.get(i).getSecond(), values.get(i).getThird());
// To prevent the entire row from being emitted if the column is not found on a row
f.setFilterIfMissing(true);
filters[i] = f;
}
List<Filter> filterList = Arrays.asList(filters);
Filter filterFinal = new FilterList(Operator.MUST_PASS_ALL, filterList);
Scan scan = new Scan();
scan.setFilter(filterFinal);
long size = 0;
long count = 0;
ResultScanner scanner = table.getScanner(scan);
try {
for (Result result : scanner) {
size = size + Result.getTotalSizeOfCells(result);
count++;
}
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Found results: " + count);
System.out.println("Number of bytes: " + size);
} finally {
scanner.close();
}
System.out.println("- - - - - Done. - - - - -");
}
private void filterColumn(HBaseAdmin admin, HTableInterface table, String columnName) throws IOException
{
System.out.println("*** bmarksp - filter column " + columnName + " ***");
long start = System.currentTimeMillis();
Filter filterFinal = new DependentColumnFilter(Bytes.toBytes("0"), Bytes.toBytes(columnName));
Scan scan = new Scan();
scan.setFilter(filterFinal);
PairLong retVal = null;
ResultScanner scanner = table.getScanner(scan);
try {
retVal = HBaseHelper.stepThroughResultScannerResultsPhoenix(scanner);
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Number of bytes: " + retVal.getFirst());
System.out.println("Number of records: " + retVal.getSecond());
} finally {
scanner.close();
}
System.out.println("- - - - - Done. - - - - -");
}
private void filterColumn(HBaseAdmin admin, HTableInterface table, byte[] columnName, CompareOp compareOp, byte[] value) throws IOException
{
long start = System.currentTimeMillis();
SingleColumnValueFilter valueFilter = new SingleColumnValueFilter(Bytes.toBytes("0"), columnName, compareOp, value);
// To prevent the entire row from being emitted if the column is not found on a row
valueFilter.setFilterIfMissing(true);
Scan scan = new Scan();
scan.setFilter(valueFilter);
UTPair retVal = null;
ResultScanner scanner = table.getScanner(scan);
try {
retVal = HBaseHelper.stepThroughResultScannerResults(scanner);
long end = System.currentTimeMillis();
System.out.println("Time in sec: " + Misc.getTimeInSeconds(start, end));
System.out.println("Number of bytes: " + retVal.getFirst());
System.out.println("Number of records: " + retVal.getSecond());
} finally {
scanner.close();
}
}
private void filterColumnGreaterThan(HBaseAdmin admin, HTableInterface table, String columnName, int value) throws IOException
{
System.out.println("*** bmarksp - filter column -- " + columnName + " > " + value + " ***");
filterColumn(admin, table, Bytes.toBytes(columnName), CompareOp.GREATER, HBaseHelper.encodePhoenixInteger(value));
System.out.println("- - - - - Done. - - - - -");
}
private void filterColumnEqualTo(HBaseAdmin admin, HTableInterface table, String columnName, int value) throws IOException
{
System.out.println("*** bmarksp - filter column -- " + columnName + " = " + value + " ***");
filterColumn(admin, table, Bytes.toBytes(columnName), CompareOp.EQUAL, HBaseHelper.encodePhoenixInteger(value));
System.out.println("- - - - - Done. - - - - -");
}
private void filterColumnEqualTo(HBaseAdmin admin, HTableInterface table, String columnName, String value) throws IOException
{
System.out.println("*** bmarksp - filter column -- " + columnName + " = " + value + " ***");
filterColumn(admin, table, Bytes.toBytes(columnName), CompareOp.EQUAL, Bytes.toBytes(value));
System.out.println("- - - - - Done. - - - - -");
}
public void run(Configuration config) throws IOException
{
HBaseAdmin admin = new HBaseAdmin(config);
HTableFactory factory = new HTableFactory();
HTableInterface table = factory.createHTableInterface(config, tableName);
//HBaseMetadata metadata = new HBaseMetadata(admin); // unused here; the class is not part of this listing
System.out.println("= = = = = = = = = = All records = = = = = = = = = =");
scan(admin, table);
//scan2(admin, table);
phoenixSelectAll();
System.out.println("= = = = = = = = = = ENG column = = = = = = = = = =");
scanColumn(admin, table, "ENG");
//filterColumn(admin, table, "ENG");
phoenixSelectColumn("ENG");
System.out.println("= = = = = = = = = = ENG > 80 = = = = = = = = = =");
filterColumnGreaterThan(admin, table, "ENG", 80);
phoenixSelectEngGreaterThan(80);
System.out.println("= = = = = = = = = = ENG = 45 = = = = = = = = = =");
filterColumnEqualTo(admin, table, "ENG", 45);
phoenixSelectEqualTo("ENG", 45);
System.out.println("= = = = = = = = = = NAME = jane = = = = = = = = = =");
filterColumnEqualTo(admin, table, "NAME", "jane");
phoenixSelectEqualTo("NAME", "jane");
System.out.println("= = = = = = = = = = multiple columns = = = = = = = = = =");
filterColumnMultiple(admin, table);
Triplet<byte[], CompareOp, byte[]> t1 = new Triplet<byte[], CompareOp, byte[]>(Bytes.toBytes("NAME"), CompareOp.EQUAL, Bytes.toBytes("jack"));
Triplet<byte[], CompareOp, byte[]> t2 = new Triplet<byte[], CompareOp, byte[]>(Bytes.toBytes("ENG"), CompareOp.EQUAL, HBaseHelper.encodePhoenixInteger(45));
Triplet<byte[], CompareOp, byte[]> t3 = new Triplet<byte[], CompareOp, byte[]>(Bytes.toBytes("SCI"), CompareOp.GREATER, HBaseHelper.encodePhoenixInteger(70));
List<Triplet<byte[], CompareOp, byte[]>> list = new ArrayList<Triplet<byte[], CompareOp, byte[]>>();
list.add(t1);
list.add(t2);
list.add(t3);
filterColumnMultiple(admin, table, "0", list);
phoenixSelectAllForQuery("select * from BMARKSP where NAME = 'jack' and ENG = 45 and SCI > 70");
factory.releaseHTableInterface(table); // Disconnect
}
}
Triplet.java
package bp.misc;
// http://stackoverflow.com/questions/6010843/java-how-to-store-data-triple-in-a-list
public class Triplet<T, U, V> {
private final T first;
private final U second;
private final V third;
public Triplet(T first, U second, V third) {
this.first = first;
this.second = second;
this.third = third;
}
public T getFirst() { return first; }
public U getSecond() { return second; }
public V getThird() { return third; }
}
PairLong.java
package bp.misc;
public class PairLong {
private Long first;
private Long second;
public PairLong(Long first, Long second) {
this.first = first;
this.second = second;
}
public Long getFirst() { return first; }
public Long getSecond() { return second; }
}
Misc.java
package bp.misc;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hbase.util.Bytes;
public class Misc {
public static String toHex(String arg) {
return String.format("%040x", new BigInteger(1, arg.getBytes(StandardCharsets.UTF_8)));
}
public static String byteArrayToString(byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}
public static byte[] StringToByte(String data) {
return Bytes.toBytes(data);
}
public static int byteToIntBigInteger(byte[] bytes) {
return new BigInteger(bytes).intValue();
}
public static int byteToIntBigEndian(byte[] bytes) {
ByteBuffer wrapped = ByteBuffer.wrap(bytes);
wrapped.order(ByteOrder.BIG_ENDIAN);
int num = wrapped.getInt();
return num;
}
public static int byteToIntLittleEndian(byte[] bytes)
{
ByteBuffer wrapped = ByteBuffer.wrap(bytes);
wrapped.order(ByteOrder.LITTLE_ENDIAN);
int num = wrapped.getInt();
return num;
}
public static int byteArrayToInt(byte[] b)
{
return b[3] & 0xFF | (b[2] & 0xFF) << 8 |
(b[1] & 0xFF) << 16 | (b[0] & 0xFF) << 24;
}
public static byte[] intToByteArray(int a)
{
return new byte[] {
(byte) ((a >> 24) & 0xFF), (byte) ((a >> 16) & 0xFF),
(byte) ((a >> 8) & 0xFF), (byte) (a & 0xFF)
};
}
// http://stackoverflow.com/questions/5399798/byte-array-and-int-conversion-in-java
public static int byteArrayToLeInt(byte[] b) {
final ByteBuffer bb = ByteBuffer.wrap(b);
bb.order(ByteOrder.LITTLE_ENDIAN);
return bb.getInt();
}
public static byte[] leIntToByteArray(int i) {
final ByteBuffer bb = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE);
bb.order(ByteOrder.LITTLE_ENDIAN);
bb.putInt(i);
return bb.array();
}
public static long getTimeInMillis()
{
long time = System.currentTimeMillis();
return time;
}
public static double getTimeInSeconds(long start, long end)
{
long time = end - start;
return time / 1000.0;
}
public static void showElapsedTimeInSeconds(long start, long end)
{
System.out.println("Elapsed time: " + getTimeInSeconds(start, end) + " sec");
}
public static void showElapsedTimeInSeconds(long start)
{
long end = System.currentTimeMillis();
System.out.println("Elapsed time: " + getTimeInSeconds(start, end) + " sec");
}
//http://stackoverflow.com/questions/6374915/java-convert-int-to-byte-array-of-4-bytes
public static byte[] my_int_to_bb_le(int myInteger){
return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(myInteger).array();
}
public static int my_bb_to_int_le(byte [] byteBarray){
return ByteBuffer.wrap(byteBarray).order(ByteOrder.LITTLE_ENDIAN).getInt();
}
public static byte[] my_int_to_bb_be(int myInteger){
return ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(myInteger).array();
}
public static int my_bb_to_int_be(byte [] byteBarray){
return ByteBuffer.wrap(byteBarray).order(ByteOrder.BIG_ENDIAN).getInt();
}
}
UTPair.java
package bp.misc;
// http://stackoverflow.com/questions/6010843/java-how-to-store-data-triple-in-a-list
public class UTPair<T, U> {
private final T first;
private final U second;
public UTPair(T first, U second) {
this.first = first;
this.second = second;
}
public T getFirst() { return first; }
public U getSecond() { return second; }
}
HBaseHelper.java
/* * * * * * * * * * * * * * * * * *
Due to the fact that Phoenix uses special encoding for some data types, we defined a class with helper functions to help us do the task of conversion. Phoenix uses the PInteger data type to store integer data and PVarchar to store string data.
* * * * * * * * * * * * * * * * * */
package helpers;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.ByteUtil;
import bp.misc.Misc;
import bp.misc.PairLong;
import bp.misc.UTPair;
@SuppressWarnings("deprecation")
public class HBaseHelper
{
/* Decode an integer from its Phoenix specific encoding */
public static Integer decodePhoenixInteger(Result result, String columnFamily, String columnName)
{
Object colValue = PInteger.INSTANCE.toObject(result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName)), 0, 4);
return (Integer)colValue;
}
/* Decode an integer from its Phoenix specific encoding */
public static Integer decodePhoenixInteger(byte[] value)
{
Object colValue = PInteger.INSTANCE.toObject(value, 0, 4);
return (Integer)colValue;
}
/* Encode an integer to its Phoenix specific encoding */
public static byte[] encodePhoenixInteger(int value)
{
return PInteger.INSTANCE.toBytes(value);
}
public static String integerToString(int v)
{
return new Integer(v).toString();
}
@SuppressWarnings("unused")
public static PairLong stepThroughResultScannerResultsPhoenix(ResultScanner scanner)
{
long count = 0;
long size = 0;
for ( Result result : scanner ) {
count++;
size = size + Result.getTotalSizeOfCells(result);
System.out.println("Row key: " + decodePhoenixInteger(result.getRow()));
System.out.println ("\tRow.toString: " + result.toString());
System.out.println ("\tRow.value: " + Misc.byteArrayToString(result.value()));
List<Cell> cells = result.listCells();
byte[] family;
byte[] val;
String columnName;
String value;
for ( Cell c : cells ) {
val = c.getValueArray();
family = c.getFamilyArray();
columnName = Bytes.toString(CellUtil.cloneQualifier(c));
value = "";
if ( "NAME".equals(columnName) ) {
value = Bytes.toString(CellUtil.cloneValue(c));
} else if ( "_0".equals(columnName) ) {
//value = decodePhoenixInteger(result, "0", columnName).toString();
} else {
value = decodePhoenixInteger(result, "0", columnName).toString();
}
System.out.println("\t\tColumn name: " + columnName + ", Value: " + value);
}
}
PairLong retVal = new PairLong(size, count);
return retVal;
}
@SuppressWarnings("unused")
public static UTPair<Long, Long> stepThroughResultScannerResults(ResultScanner scanner)
{
long size = 0;
long count = 0;
for ( Result result : scanner ) {
size = size + Result.getTotalSizeOfCells(result);
count++;
List<Cell> cells = result.listCells();
byte[] family;
byte[] val;
String columnName;
String value;
for ( Cell c : cells ) {
val = c.getValueArray();
family = c.getFamilyArray();
columnName = Bytes.toString(CellUtil.cloneQualifier(c));
value = Bytes.toString(CellUtil.cloneValue(c));
}
}
UTPair<Long, Long> rt = new UTPair<Long, Long>(size, count);
return rt;
}
@SuppressWarnings("unused")
public static long stepThroughResultScannerResults(HTableInterface table, ResultScanner scanner)
{
long count = 0;
try {
for ( Result result : scanner ) {
count = count + Result.getTotalSizeOfCells(result);
Get g = new Get(result.getRow());
Result r;
r = table.get(g);
byte[] v = r.getValue(Bytes.toBytes("0"), null);
System.out.println ("Fetched value: " + Bytes.toString(v));
List<Cell> cells = result.listCells();
byte[] family;
byte[] val;
for ( Cell c : cells ) {
val = c.getValueArray();
family = c.getFamilyArray();
String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
String value = Bytes.toString(CellUtil.cloneValue(c));
}
}
} catch (IOException e) {
e.printStackTrace();
}
return count;
}
@SuppressWarnings("unused")
public static long stepThroughResultScannerResults2(HTableInterface table, ResultScanner scanner)
{
long count = 0;
try {
for ( Result result : scanner ) {
count = count + Result.getTotalSizeOfCells(result);
byte[] startRow = ByteUtil.concat(PVarchar.INSTANCE.toBytes("primaryKeyCol1Value"), QueryConstants.SEPARATOR_BYTE_ARRAY, PVarchar.INSTANCE.toBytes("primaryKeyCol2Value"));
Get get = new Get(result.getRow());
Result r = table.get(get);
Object colValue = PInteger.INSTANCE.toObject(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("ENG")), 0, 4);
}
} catch (IOException e) {
e.printStackTrace();
}
return count;
}
}
Conclusion
In conclusion, because Phoenix is a layer on top of HBase, data stored using Phoenix can be accessed through the native HBase API. While using Phoenix, you need to keep the following things in mind:
- Phoenix is additional software that needs to be installed along with HBase, typically on the same server.
- Tables that are read and updated from Phoenix need to be created using Phoenix.
- Tables created using the HBase shell do not appear to be readable by Phoenix.
- Performance of HBase API and Phoenix is quite comparable in most cases.
- Data payload transferred over the network when using HBase API is significantly larger than that with Phoenix (almost 10 times more in most cases).
- With Phoenix, query execution and data fetch (typically) happen on the same server where HBase is installed, thus reducing the volume of data transferred to the client.
- Non-string data is encoded by Phoenix using custom logic and is thus not directly usable through the HBase API.
- With Phoenix, the client code is simpler and uses the familiar SQL query syntax and JDBC connectivity.
- If you want to use the HBase API, ideally a thin client library should be developed that wraps much of the complexity of the HBase API, e.g. converting all values to and from byte arrays.
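As one possible shape for the thin client library mentioned in the last point, here is a minimal sketch of a per-column codec registry that hides byte-array conversion from calling code. Everything in it is hypothetical: the class `ThinClientSketch`, its `define`, `toBytes`, and `fromBytes` methods, and the assumption that integers use a big-endian layout with the sign bit flipped. It uses only the JDK and does not talk to HBase, so it shows the idea rather than a working client.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class ThinClientSketch {
    // Each column type knows how to convert its values to and from bytes.
    enum ColumnType {
        VARCHAR {
            byte[] encode(Object v) { return ((String) v).getBytes(StandardCharsets.UTF_8); }
            Object decode(byte[] b) { return new String(b, StandardCharsets.UTF_8); }
        },
        INTEGER {
            // Assumed layout: big-endian int with the sign bit flipped.
            byte[] encode(Object v) {
                return ByteBuffer.allocate(4).putInt((Integer) v ^ Integer.MIN_VALUE).array();
            }
            Object decode(byte[] b) { return ByteBuffer.wrap(b).getInt() ^ Integer.MIN_VALUE; }
        };

        abstract byte[] encode(Object v);
        abstract Object decode(byte[] b);
    }

    private final Map<String, ColumnType> columns = new HashMap<>();

    // Register a column and its type once, so callers never handle bytes directly.
    void define(String column, ColumnType type) {
        columns.put(column, type);
    }

    byte[] toBytes(String column, Object value) {
        return typeOf(column).encode(value);
    }

    Object fromBytes(String column, byte[] raw) {
        return typeOf(column).decode(raw);
    }

    private ColumnType typeOf(String column) {
        ColumnType type = columns.get(column);
        if (type == null) {
            throw new IllegalArgumentException("Unknown column: " + column);
        }
        return type;
    }

    public static void main(String[] args) {
        ThinClientSketch client = new ThinClientSketch();
        client.define("NAME", ColumnType.VARCHAR);
        client.define("ENG", ColumnType.INTEGER);
        System.out.println(client.fromBytes("NAME", client.toBytes("NAME", "jane")));
        System.out.println(client.fromBytes("ENG", client.toBytes("ENG", 45)));
    }
}
```

A real wrapper would delegate the encoding to the Phoenix type classes and add scanning and filtering on top; the point of the sketch is only that the column-to-type mapping lives in one place.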