Stored Procedure to Import Data


Learn how to write a stored procedure to import the MaxMind city dataset for all your analytical and visualization needs.


A while back, I showed you how to write an extension to import the MaxMind city dataset. Today we'll repeat that exercise, but instead of using an extension, we will use a stored procedure.

The documentation spells out how to write your own procedures in Chapter 6, so I'm not going to go over that again, but I do want to point out a few things.

One is that the stored procedure starts its own transaction, so if you are just reading or writing something from the database, you don't even have to worry about it. This is very convenient, except when you want to load large amounts of data. I like to have the "top-level" procedure do nothing but call another thread where the actual work will be done and return when it finishes. You'll notice I grab a GraphDatabaseService from the context that gets passed in (use GraphDatabaseAPI if you are on Neo4j version 3.1.x). Check out the code below:

    @Context
    public GraphDatabaseService db;

    // The Log handed to the import thread is injected the same way.
    @Context
    public Log log;

    @Procedure(name = "com.maxdemarzi.import.locations", mode = Mode.WRITE)
    @Description("CALL com.maxdemarzi.import.locations(file)")
    public Stream<StringResult> importLocations(@Name("file") String file) throws InterruptedException {
        long start = System.nanoTime();

        Thread t1 = new Thread(new ImportLocationsRunnable(file, db, log));
        t1.start();
        t1.join();

        return Stream.of(new StringResult("Locations imported in " + TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) + " seconds"));
    }
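
Once the compiled jar is in the Neo4j plugins directory and the server has been restarted, the procedure can be called from any Bolt client. Here is a minimal sketch using the 1.x Java driver; the Bolt URL, the credentials, and the CSV file name are placeholder assumptions you would swap for your own setup:

import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;

public class CallImportLocations {
    public static void main(String[] args) {
        // Placeholder connection details; adjust these to your server.
        try (Driver driver = GraphDatabase.driver("bolt://localhost:7687",
                AuthTokens.basic("neo4j", "password"));
             Session session = driver.session()) {
            // The file name is a guess at the MaxMind locations CSV; use whatever you placed on disk.
            StatementResult result = session.run(
                    "CALL com.maxdemarzi.import.locations('GeoLite2-City-Locations-en.csv')");
            // StringResult is assumed to expose its message in a single column named "value".
            System.out.println(result.single().get("value").asString());
        }
    }
}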

The Runnable is where all the magic happens. I put a hard limit here of 1,000 changes per transaction, but your server may do better with 5,000 or 10,000. What we want to do is batch our writes: if we do one change at a time, we will destroy our performance, since Neo4j is an ACID database and even a tiny change of a property from true to false will force a write to disk.

public class ImportLocationsRunnable implements Runnable {

    private static final int TRANSACTION_LIMIT = 1000;
    private String file;
    private GraphDatabaseService db;
    private Log log;

    public ImportLocationsRunnable(String file, GraphDatabaseService db, Log log) {
        this.file = file;
        this.db = db;
        this.log = log;
    }

In our actual run method, we check to make sure the file exists...

    @Override
    public void run() {
        Reader in;
        Iterable<CSVRecord> records = null;
        try {
            in = new FileReader("/" + file);
            records = CSVFormat.EXCEL.withHeader().parse(in);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            log.error("ImportLocationsRunnable Import - File not found: " + file);
        } catch (IOException e) {
            e.printStackTrace();
            log.error("ImportLocationsRunnable Import - IO Exception: " + file);
        }

...and begin our transaction outside of the try block. Then, loop over each record in the CSV file.

        Transaction tx = db.beginTx();
        try {
            int count = 0;

            assert records != null;
            for (CSVRecord record : records) {
                count++;

As we import along, we check to see if we have reached our transaction limit and then commit, and refresh our transaction with a new one. This will take the hit to disk once for the 1,000 changes, as well as relieve memory pressure from having to keep all that work in memory.

                if (count % TRANSACTION_LIMIT == 0) {
                    tx.success();
                    tx.close();
                    tx = db.beginTx();
                }
            }

            tx.success();
        } finally {
            tx.close();
        }

    }
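
The per-record work inside the loop is left out above. For the MaxMind locations file it might look something like the hypothetical helper below, called once per record just before the transaction-limit check; the Location label and the column names are assumptions based on the GeoLite2-City-Locations CSV headers, so adjust them to match your data model:

    // Hypothetical per-record work (not part of the original code): one node per CSV row.
    // Requires org.neo4j.graphdb.Label, org.neo4j.graphdb.Node, and org.apache.commons.csv.CSVRecord.
    private void createLocation(CSVRecord record) {
        Node location = db.createNode(Label.label("Location"));
        location.setProperty("geoname_id", record.get("geoname_id"));
        location.setProperty("country_code", record.get("country_iso_code"));
        location.setProperty("city_name", record.get("city_name"));
        location.setProperty("time_zone", record.get("time_zone"));
    }

Because the call happens inside the batched transaction, each node creation rides along with the same 1,000-change commit window.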

And yes, it's single-threaded, but if your data is only a few million records, it will be fine. Once you start talking about loading 1 billion records or more, it might be a good time to multi-thread this code or use a different import mechanism.

As always, the code is on GitHub.
