Using Cassandra's 'sstable' for Bulk-Loading Data on the Cloud
- sstable generation
- Bulk loading Cassandra using sstableloader
- Using JMX
'sstable' generation
-Dcassandra-foreground -Dcassandra.config=file:///<path to/apache-cassandra-1.1.2/conf/cassandra.yaml> -ea -Xmx1G
SSTableSimpleUnsortedWriter eventWriter = new SSTableSimpleUnsortedWriter(directory, partitioner, keySpace, "Events", AsciiType.instance,null, 64);
The following code creates the rows and adds the columns of each row according to the entry read from the .csv file. According to the Cassandra wiki, one row can hold up to 2 billion columns in this way.
eventWriter.newRow(uuid);
eventWriter.addColumn(bytes("sourceAdd"), bytes(entry.sourceAdd), timestamp);
eventWriter.addColumn(bytes("sourceChannelType"), bytes(entry.sourceChannelType), timestamp);

The static nested class CsvEntry is used to read just the relevant fields from the CSV row.
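The CsvEntry class itself is not shown in this article, so the following is only a minimal sketch of what such a helper might look like. The enclosing class name CsvEntrySketch, the plain comma delimiter, the column positions (0 and 1) and the sample lines are all assumptions made for illustration. The real CSV layout may differ.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: not the original CsvEntry from the article.
class CsvEntrySketch {

    // Holds only the fields that are written as columns in the snippet above.
    static class CsvEntry {
        String sourceAdd;
        String sourceChannelType;

        // Parses one CSV line and returns false if it has too few columns.
        // Assumes a plain comma delimiter with no quoted fields.
        boolean parse(String line) {
            String[] columns = line.split(",", -1);
            if (columns.length < 2) {
                return false;
            }
            sourceAdd = columns[0].trim();          // assumed column position
            sourceChannelType = columns[1].trim();  // assumed column position
            return true;
        }
    }

    public static void main(String[] args) {
        // Made-up sample lines, for illustration only.
        List<String> lines = Arrays.asList("addr-1,SMS,ignored", "malformed");
        for (String line : lines) {
            CsvEntry entry = new CsvEntry();
            if (entry.parse(line)) {
                System.out.println(entry.sourceAdd + " / " + entry.sourceChannelType);
            } else {
                System.out.println("skipped: " + line);
            }
        }
    }
}
```

Reusing one small parse method per line keeps the writer loop free of CSV details, and lines that do not parse can simply be skipped before calling newRow.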
Once you run the code against the CSV file, a directory is created at the location you specified as 'directory'. Inside it you will find something similar to the following, which contains the generated sstables.
Bulk loading Cassandra using sstableloader
Inside the bin directory of Cassandra you can find the sstableloader tool. You can run it from the command line, pointing it to the sstables generated above. Good guidance on that can be found in Datastax and this blog. You can also use the class 'org.apache.cassandra.tools.BulkLoader' directly in Java code to load the sstables into a Cassandra cluster.
If you are testing all this on localhost, the following steps need to be taken to try out sstableloader.
- Get a copy of the running Cassandra instance
- Set another loop-back address. In Linux you can do it using,
- Set the rpc address and listen address in the copied /conf/cassandra.yaml to 127.0.0.2. Of course, you can set the rpc address to 0.0.0.0 if you want to listen on all interfaces.
- Then, from the copied Cassandra, run sstableloader from the command line as follows,
- Note that the path should end with /keyspace_name/columnfamily_name (e.g. ...../CDRs/Events for the above screenshot)
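The path requirement in the last step can be checked before invoking the loader. The sketch below is a hypothetical helper, not part of Cassandra; the class name SSTablePathCheck and the sample "data" prefix are assumptions, while CDRs and Events are the keyspace and column family names used in this article.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Cassandra: verifies the directory layout
// described above before the path is handed to sstableloader.
class SSTablePathCheck {

    // True if the path ends with <keyspace>/<columnFamily>.
    static boolean endsWithKeyspaceAndColumnFamily(Path dir, String keyspace, String columnFamily) {
        // Path.endsWith compares whole name elements, not raw string suffixes.
        return dir.endsWith(Paths.get(keyspace, columnFamily));
    }

    public static void main(String[] args) {
        Path good = Paths.get("data", "CDRs", "Events"); // "data" is an assumed prefix
        Path bad = Paths.get("data", "Events");          // keyspace directory missing
        System.out.println(endsWithKeyspaceAndColumnFamily(good, "CDRs", "Events")); // true
        System.out.println(endsWithKeyspaceAndColumnFamily(bad, "CDRs", "Events"));  // false
    }
}
```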
Using JMX bulk load
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.cassandra.service.StorageServiceMBean;

public class JmxBulkLoader {

    private JMXConnector connector;
    private StorageServiceMBean storageBean;

    public JmxBulkLoader(String host, int port) throws Exception {
        connect(host, port);
    }

    private void connect(String host, int port) throws IOException, MalformedObjectNameException {
        JMXServiceURL jmxUrl = new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port));
        Map<String,Object> env = new HashMap<String,Object>();
        connector = JMXConnectorFactory.connect(jmxUrl, env);
        MBeanServerConnection mbeanServerConn = connector.getMBeanServerConnection();
        ObjectName name = new ObjectName("org.apache.cassandra.db:type=StorageService");
        storageBean = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
    }

    public void close() throws IOException {
        connector.close();
    }

    public void bulkLoad(String path) {
        storageBean.bulkLoad(path);
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            throw new IllegalArgumentException("usage: paths to bulk files");
        }
        JmxBulkLoader np = new JmxBulkLoader("127.0.0.1", 7199);
        for (String arg : args) {
            np.bulkLoad(arg);
        }
        np.close();
    }
}
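To see what the connect method is addressing, it can help to build the same service URL on its own, without opening a connection. The sketch below uses only the JDK; the class name JmxUrlSketch and the helper buildUrl are hypothetical, and the host and port are the ones used in main above.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

// Hypothetical sketch: builds the same JMX service URL as JmxBulkLoader.connect,
// but does not connect to anything.
class JmxUrlSketch {

    static JMXServiceURL buildUrl(String host, int port) {
        String url = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port);
        try {
            return new JMXServiceURL(url);
        } catch (MalformedURLException e) {
            // Wrapped so that callers do not need to handle a checked exception.
            throw new IllegalArgumentException("invalid JMX URL: " + url, e);
        }
    }

    public static void main(String[] args) {
        JMXServiceURL url = buildUrl("127.0.0.1", 7199);
        System.out.println(url.getProtocol()); // transport protocol of the connector
        System.out.println(url.getURLPath());  // JNDI lookup path with the node's host and port
    }
}
```

The host and port of the Cassandra node appear inside the URL path because the connector stub is looked up through the RMI registry that the node exposes on its JMX port.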
Published at DZone with permission of Pushpalanka Jayawardhana, DZone MVB. See the original article here.