SQL over anything with an Optiq Adapter
This post shows how to create an Optiq Adapter. There are examples out there, such as optiq-csv and a couple of other projects, but I found them a little hard to comprehend. Moreover, the query push-down feature isn't well documented; I will do a follow-up post on that soon.
What is Optiq?
Optiq (https://github.com/julianhyde/optiq) is a query planning engine that helps you plan and execute SQL over your data sources. For example, the optiq-web project lets you point at wiki pages that contain tables and query those HTML tables through SQL. Similar projects exist for running SQL on CSV files, JSON files, MongoDB, etc. Let's say you have built a custom data store and want to provide SQL access to it. Optiq is a good choice for this: you just need to write an Optiq adapter for your data source.
A Sample Use Case
Anyway, this post is meant as a step-by-step guide to writing a custom adapter. Let's see how I wrote an adapter to run SQL on top of JavaBean objects. Say you have a List of Users (JavaBean objects) and would like to run queries like “select max(age) from users”. Optiq has a built-in ReflectiveSchema that could be used here, but let's do our own implementation to see how it's done. Also, ReflectiveSchema does not support query push-down, which I am planning to add.
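For intuition about what the adapter has to deliver, “select max(age) from users” over a list of beans should return the same value as the plain-Java computation below. This is a JDK-only sketch. `Person` is a hypothetical stand-in for a user bean with an `age` property. The sample ages are borrowed from the test program at the end of the post.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;

// Plain-Java equivalent of "select max(age) from users".
// Person is a hypothetical stand-in for the post's User bean.
class MaxAgeExample {
    static class Person {
        private final String name;
        private final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        public String getName() { return name; }
        public int getAge() { return age; }
    }

    // Largest age, or empty for an empty list
    // (SQL's MAX over zero rows yields NULL).
    static OptionalInt maxAge(List<Person> users) {
        return users.stream().mapToInt(Person::getAge).max();
    }

    public static void main(String[] args) {
        List<Person> users = Arrays.asList(
                new Person("Abishek", 29), new Person("Kousik", 25),
                new Person("CP", 15), new Person("Karthik", 29));
        System.out.println(maxAge(users).getAsInt()); // prints 29
    }
}
```

The adapter's job is to let Optiq compute this for you from a SQL string, so you never write such loops by hand.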
Creating the Adapter
Tutorial source code link: https://github.com/cyberabis/optiq-javabean
Step 1: Create a Schema Class
The Schema class is the equivalent of a Database and can contain multiple tables.
Extend AbstractSchema and override the getTableMap method. This method should return a map from table names to Table objects. How to create the Table class is covered next.
```java
package io.thedal.optiq.javabean;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;

import net.hydromatic.optiq.Table;
import net.hydromatic.optiq.impl.AbstractSchema;

/**
 * JavaBeanSchema is a type of Optiq Schema that contains a list of tables. A
 * table is a List of JavaBean Objects of the same type.
 *
 * @author Abishek Baskaran
 */
public class JavaBeanSchema extends AbstractSchema {
  static final Logger logger = LoggerFactory.getLogger(JavaBeanSchema.class);
  private String schemaName;
  private Map<String, List> javaBeanListMap = new HashMap<String, List>();

  /**
   * Constructor
   *
   * @param schemaName
   *          The schema name which is like database name.
   */
  public JavaBeanSchema(String schemaName) {
    super();
    this.schemaName = schemaName;
  }

  /**
   * Adds a table to the schema.
   *
   * @param tableName
   *          The name of the table, has to be unique else will overwrite.
   * @param javaBeanList
   *          A List of JavaBeans of same type that's to be seen as table.
   */
  public <E> void addAsTable(String tableName, List<E> javaBeanList) {
    javaBeanListMap.put(tableName, javaBeanList);
    logger.info("Added table: " + tableName + " to Schema: " + schemaName);
  }

  /**
   * @return The name of the schema
   */
  public String getName() {
    return schemaName;
  }

  @Override
  protected Map<String, Table> getTableMap() {
    final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
    for (String tableName : javaBeanListMap.keySet()) {
      Table javaBeanTable = new JavaBeanTable(javaBeanListMap.get(tableName));
      builder.put(tableName, javaBeanTable);
      logger.debug("Initialized JavaBeanTable for: " + tableName);
    }
    return builder.build();
  }
}
```
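Without the Optiq types, the schema is essentially a name-to-table registry. The JDK-only sketch below shows what getTableMap hands back: a read-only snapshot in which a second addAsTable under the same name replaces the first, as the Javadoc above warns. `BeanTable` is a hypothetical stand-in for Optiq's `Table`, and `Collections.unmodifiableMap` plays the role of Guava's `ImmutableMap`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// JDK-only sketch of the schema-as-registry idea.
// BeanTable is a hypothetical stand-in for an Optiq Table.
class SchemaRegistrySketch {
    static final class BeanTable {
        final List<?> rows;

        BeanTable(List<?> rows) {
            this.rows = rows;
        }
    }

    private final Map<String, List<?>> beanLists = new LinkedHashMap<>();

    // Same contract as addAsTable: a repeated name overwrites the old list.
    <E> void addAsTable(String tableName, List<E> beans) {
        beanLists.put(tableName, beans);
    }

    // Analogue of getTableMap(): wrap each list in a table object and
    // return an unmodifiable snapshot.
    Map<String, BeanTable> getTableMap() {
        Map<String, BeanTable> tables = new LinkedHashMap<>();
        for (Map.Entry<String, List<?>> e : beanLists.entrySet()) {
            tables.put(e.getKey(), new BeanTable(e.getValue()));
        }
        return Collections.unmodifiableMap(tables);
    }
}
```

Iterating over entrySet() also avoids the extra lookup that keySet() followed by get() performs.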
Step 2: Create a Table Class
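Create a table class that extends AbstractQueryableTable and implements TranslatableTable. Because it is a TranslatableTable, it also implements toRel. That method turns the table into a relational expression (here an EnumerableTableAccessRel) for the planner.
The table has two important methods:
getRowType – This method should return the row type. The row type is built from the column names and their types, given as two parallel lists like [Name, Age, Country] and [String, Integer, String].
asQueryable – This method should return a Queryable whose enumerator() method returns an Enumerator. The Enumerator has methods to iterate over the actual rows in the table. See below for how to create an Enumerator.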
```java
package io.thedal.optiq.javabean;

import io.thedal.optiq.javabean.utils.JavaBeanInspector;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptTable;
import org.eigenbase.relopt.RelOptTable.ToRelContext;
import org.eigenbase.reltype.RelDataType;
import org.eigenbase.reltype.RelDataTypeFactory;
import org.eigenbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.hydromatic.linq4j.Enumerator;
import net.hydromatic.linq4j.QueryProvider;
import net.hydromatic.linq4j.Queryable;
import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.TranslatableTable;
import net.hydromatic.optiq.impl.AbstractTableQueryable;
import net.hydromatic.optiq.impl.java.AbstractQueryableTable;
import net.hydromatic.optiq.rules.java.EnumerableConvention;
import net.hydromatic.optiq.rules.java.JavaRules;

/**
 * JavaBeanTable is an Optiq table that accepts a List of JavaBeans. JavaBeans
 * can have only fields of eligible type / class defined in
 * JavaBeanInspector.checkMethodEligibility.
 *
 * @author Abishek Baskaran
 *
 * @param <E>
 *          Table contains items for a specific Class E
 */
public class JavaBeanTable<E> extends AbstractQueryableTable implements
    TranslatableTable {
  static final Logger logger = LoggerFactory.getLogger(JavaBeanTable.class);
  private List<E> javaBeanList;

  /**
   * Constructor
   *
   * @param javaBeanList
   *          A JavaBean List
   */
  public JavaBeanTable(List<E> javaBeanList) {
    super(Object[].class);
    this.javaBeanList = javaBeanList;
  }

  @Override
  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    List<String> names = new ArrayList<String>();
    List<RelDataType> types = new ArrayList<RelDataType>();
    if ((javaBeanList != null) && (javaBeanList.size() > 0)) {
      // Columns are derived from the getters of the first bean.
      Class sample = javaBeanList.get(0).getClass();
      Method[] methods = sample.getMethods();
      for (Method method : methods) {
        if (JavaBeanInspector.checkMethodEligiblity(method)) {
          // Strip the "get" prefix: getAge -> column "Age"
          String name = method.getName().substring(3);
          Class type = method.getReturnType();
          names.add(name);
          types.add(typeFactory.createJavaType(type));
          logger.info("Added field name: " + name + " of type: "
              + type.getSimpleName());
        }
      }
    }
    return typeFactory.createStructType(Pair.zip(names, types));
  }

  @Override
  public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
      SchemaPlus schema, String tableName) {
    logger.info("Got query request for: " + tableName);
    return new AbstractTableQueryable<T>(queryProvider, schema, this,
        tableName) {
      public Enumerator<T> enumerator() {
        // noinspection unchecked
        try {
          JavaBeanEnumerator enumerator = new JavaBeanEnumerator(javaBeanList);
          return (Enumerator<T>) enumerator;
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  @Override
  public RelNode toRel(ToRelContext context, RelOptTable relOptTable) {
    // Scan the table through the enumerable (in-memory) convention.
    return new JavaRules.EnumerableTableAccessRel(context.getCluster(),
        context.getCluster().traitSetOf(EnumerableConvention.INSTANCE),
        relOptTable, (Class) getElementType());
  }
}
```
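The column discovery in getRowType can be tried out without Optiq. The JDK-only sketch below maps getter names to column names and Java types. Its eligibility rule is a hypothetical guess at what JavaBeanInspector.checkMethodEligiblity does, since that class is not shown. It accepts public, non-static, no-argument `get…` methods returning a small set of simple types. The sketch also sorts methods by name, because `Class.getMethods()` does not guarantee any particular order.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// JDK-only sketch of getter-based column discovery.
// The eligibility rule is hypothetical, not JavaBeanInspector's actual logic.
class RowTypeSketch {
    private static final Set<Class<?>> SUPPORTED = new HashSet<Class<?>>(
            Arrays.<Class<?>>asList(String.class, int.class, Integer.class,
                    long.class, Long.class, double.class, Double.class,
                    boolean.class, Boolean.class));

    // Public, non-static, no-arg "getX" method returning a supported type.
    // getClass() is excluded because Class is not in SUPPORTED.
    static boolean isEligible(Method m) {
        return Modifier.isPublic(m.getModifiers())
                && !Modifier.isStatic(m.getModifiers())
                && m.getParameterCount() == 0
                && m.getName().startsWith("get")
                && m.getName().length() > 3
                && SUPPORTED.contains(m.getReturnType());
    }

    // Column name ("getAge" -> "Age") mapped to its Java type, in a
    // deterministic order (sorted by method name).
    static Map<String, Class<?>> columns(Class<?> beanClass) {
        Method[] methods = beanClass.getMethods();
        Arrays.sort(methods, Comparator.comparing(Method::getName));
        Map<String, Class<?>> cols = new LinkedHashMap<>();
        for (Method m : methods) {
            if (isEligible(m)) {
                cols.put(m.getName().substring(3), m.getReturnType());
            }
        }
        return cols;
    }
}
```

The table and the enumerator in this post each call getMethods() on their own. Whatever ordering one uses, the other must use the same ordering so that row values line up with the declared columns. Sorting in both places is one simple way to guarantee that.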
Step 3: Create an Enumerator Class
The Enumerator needs an Iterator over the rows of the table. Each row is modelled as an Object[], so we have to convert our custom storage component into an Iterator of rows. In my case the storage component is a JavaBean List, which I convert into an Iterator in the Enumerator constructor. The implemented methods are self-explanatory.
```java
package io.thedal.optiq.javabean;

import io.thedal.optiq.javabean.utils.JavaBeanInspector;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.hydromatic.linq4j.Enumerator;

/**
 * JavaBeanEnumerator converts a JavaBean List into rows. A Row is an Object
 * array of columns. Rows are served from an internal row iterator.
 *
 * @author Abishek Baskaran
 *
 */
public class JavaBeanEnumerator implements Enumerator<Object> {
  static final Logger logger = LoggerFactory
      .getLogger(JavaBeanEnumerator.class);
  private Object current;
  private Iterator<Object[]> rowIterator;

  /**
   * Constructor - forms the row iterator.
   *
   * @param javaBeanList
   */
  public <E> JavaBeanEnumerator(List<E> javaBeanList) {
    List<Object[]> rows = new ArrayList<Object[]>();
    for (Object javaBean : javaBeanList) {
      rows.add(getRow(javaBean));
    }
    rowIterator = rows.iterator();
    logger.debug("Created an iterator for the enumerator");
  }

  private Object[] getRow(Object javaBean) {
    List<Object> row = new ArrayList<Object>();
    Class clazz = javaBean.getClass();
    Method[] methods = clazz.getMethods();
    for (Method method : methods) {
      // Same eligibility check as JavaBeanTable.getRowType, so values line
      // up with the declared columns.
      if (JavaBeanInspector.checkMethodEligiblity(method)) {
        Object value = null;
        try {
          value = method.invoke(javaBean);
        } catch (IllegalAccessException e) {
          logger.error("Unable to invoke method via reflection", e);
        } catch (IllegalArgumentException e) {
          logger.error("Unable to invoke method via reflection", e);
        } catch (InvocationTargetException e) {
          logger.error("Unable to invoke method via reflection", e);
        }
        // Add null on failure so later columns do not shift left.
        row.add(value);
      }
    }
    logger.debug("Formed row is: " + row);
    return row.toArray();
  }

  @Override
  public void close() {
    // Nothing to do
  }

  @Override
  public Object current() {
    if (current == null) {
      this.moveNext();
    }
    return current;
  }

  @Override
  public boolean moveNext() {
    if (this.rowIterator.hasNext()) {
      final Object[] row = this.rowIterator.next();
      current = row;
      return true;
    } else {
      current = null;
      return false;
    }
  }

  @Override
  public void reset() {
    throw new UnsupportedOperationException();
  }
}
```
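The enumerator protocol itself is small. The JDK-only sketch below walks through it over pre-built Object[] rows. `RowEnumerator` is a hypothetical interface that mirrors the four methods JavaBeanEnumerator implements; it is not linq4j's actual Enumerator. The sketch differs from the class above in two ways, both of which are design choices rather than requirements. First, it supports reset() by keeping the row list and re-creating the iterator. Second, its current() throws an exception when no row is current instead of advancing implicitly.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in mirroring the methods JavaBeanEnumerator implements.
interface RowEnumerator<T> {
    T current();
    boolean moveNext();
    void reset();
    void close();
}

// In-memory enumerator over Object[] rows that supports reset().
class ListRowEnumerator implements RowEnumerator<Object[]> {
    private final List<Object[]> rows;
    private Iterator<Object[]> it;
    private Object[] current;

    ListRowEnumerator(List<Object[]> rows) {
        this.rows = new ArrayList<>(rows);
        this.it = this.rows.iterator();
    }

    @Override
    public Object[] current() {
        // Valid only after moveNext() has returned true.
        if (current == null) {
            throw new NoSuchElementException("call moveNext() first");
        }
        return current;
    }

    @Override
    public boolean moveNext() {
        if (it.hasNext()) {
            current = it.next();
            return true;
        }
        current = null;
        return false;
    }

    @Override
    public void reset() {
        // Start over from the first row.
        it = rows.iterator();
        current = null;
    }

    @Override
    public void close() {
        // Nothing to release for an in-memory list.
    }
}
```

A consumer drives it with the usual loop: `while (e.moveNext()) { Object[] row = e.current(); ... }`.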
Step 4: A Query Executor Class
The Query Executor takes a Schema object and prepares a connection and statement for executing SQL. Its execute method accepts a SQL string and returns a java.sql.ResultSet. Note that the query execution code on the Optiq GitHub home page is outdated; you can see the correct way in my example.
```java
package io.thedal.optiq.javabean;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.jdbc.OptiqConnection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcQueryExecutor {
  final Logger logger = LoggerFactory.getLogger(JdbcQueryExecutor.class);
  private Connection connection;
  private Statement statement;

  public JdbcQueryExecutor(JavaBeanSchema schema) {
    try {
      // Load the Optiq JDBC driver and open an in-process connection.
      Class.forName("net.hydromatic.optiq.jdbc.Driver");
      connection = DriverManager.getConnection("jdbc:optiq:");
      OptiqConnection optiqConnection = connection
          .unwrap(OptiqConnection.class);
      // Register our schema under its name in the root schema.
      SchemaPlus rootSchema = optiqConnection.getRootSchema();
      rootSchema.add(schema.getName(), schema);
      logger.info("Created connection to schema: " + schema.getName());
    } catch (Exception e) {
      logger.error("Could not create Optiq Connection", e);
    }
  }

  public ResultSet execute(String sql) {
    ResultSet results = null;
    try {
      statement = connection.createStatement();
      results = statement.executeQuery(sql);
    } catch (SQLException e) {
      logger.error("Could not execute query: " + sql, e);
    }
    return results;
  }

  public void close() {
    // Close the statement before the connection that owns it.
    if (statement != null) {
      try {
        statement.close();
      } catch (SQLException e) {
        logger.error("Could not close Optiq statement", e);
      }
    }
    if (connection != null) {
      try {
        connection.close();
      } catch (SQLException e) {
        logger.error("Could not close Optiq connection", e);
      }
    }
  }
}
```
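On Java 7 or later, try-with-resources is a tidier way to guarantee cleanup. Resources are closed in the reverse of the order they were opened, so a statement is always closed before its connection. The JDK-only demo below shows that ordering. `TrackedResource` is a hypothetical stand-in for JDBC's Connection, Statement and ResultSet, which are all AutoCloseable. With Optiq on the classpath, you would declare the results of `DriverManager.getConnection("jdbc:optiq:")`, `createStatement()` and `executeQuery(sql)` in the same way.

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates try-with-resources close ordering without a database.
// TrackedResource is a hypothetical stand-in for JDBC resources.
class CloseOrderDemo {
    static final class TrackedResource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        TrackedResource(String name, List<String> log) {
            this.name = name;
            this.log = log;
            log.add("open " + name);
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (TrackedResource connection = new TrackedResource("connection", log);
             TrackedResource statement = new TrackedResource("statement", log);
             TrackedResource resultSet = new TrackedResource("resultSet", log)) {
            log.add("read rows");
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [open connection, open statement, open resultSet, read rows,
        //         close resultSet, close statement, close connection]
        System.out.println(run());
    }
}
```

The executor above returns its ResultSet to the caller, so it cannot close everything inside execute(). That is why it keeps the connection and statement as fields and exposes a separate close() method.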
Putting it all together:
Check out the test program I have created!
```java
package io.thedal.optiq.javabean;

import static org.junit.Assert.*;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

public class SampleTestProgram {
  @Test
  public void testQuery() {
    // Create test data
    User user1 = new User("Abishek", 29, "India");
    User user2 = new User("Kousik", 25, "Thailand");
    User user3 = new User("CP", 15, "Russia");
    User user4 = new User("Karthik", 29, "US");
    List<User> userList = new ArrayList<User>();
    userList.add(user1);
    userList.add(user2);
    userList.add(user3);
    userList.add(user4);

    // Create a test Schema
    JavaBeanSchema schema = new JavaBeanSchema("TESTDB");
    schema.addAsTable("USERS", userList);

    // Execute a Query on schema (quotes preserve the mixed-case names)
    JdbcQueryExecutor queryExec = new JdbcQueryExecutor(schema);
    try {
      String sql = "select \"Age\" from \"TESTDB\".\"USERS\" where \"Name\"='Abishek'";
      ResultSet result = queryExec.execute(sql);

      // Verify results
      if (result != null) {
        int output = -1;
        try {
          while (result.next()) {
            output = result.getInt("Age");
          }
        } catch (SQLException e) {
          fail("Failed while iterating resultset");
        }
        // JUnit expects (expected, actual)
        assertEquals(29, output);
      } else {
        fail("Null resultset");
      }
    } finally {
      // Release resources even when an assertion fails
      queryExec.close();
    }
  }
}
```
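The test relies on a `User` class that the post does not show. A minimal version consistent with the test might look like the sketch below. It is an assumption reconstructed from the `(String, int, String)` constructor calls and from the quoted column names `Name` and `Age`, which getRowType derives by stripping the `get` prefix. The `Country` property is inferred from the third constructor argument.

```java
// Hypothetical User JavaBean reconstructed from how the test uses it.
// Its getters become the columns "Name", "Age" and "Country".
class User {
    private final String name;
    private final int age;
    private final String country;

    User(String name, int age, String country) {
        this.name = name;
        this.age = age;
        this.country = country;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public String getCountry() {
        return country;
    }
}
```

If `User` lives in its own `User.java`, you would normally declare it `public`. The enumerator calls `method.invoke(javaBean)` with no arguments, so the code shown only ever invokes no-argument methods such as these getters. Setters are not needed for querying.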
The next difficult step is pushing down queries to the storage by creating rules and matching SQL expressions. I hope to write a post on that soon.
Happy querying!
Opinions expressed by DZone contributors are their own.