
Loading Data Into a Grakn Knowledge Graph Using the Java Client


This tutorial illustrates how a dataset in CSV, JSON, or XML format can be migrated into a Grakn knowledge graph using Grakn's Java Client.


The knowledge graph we'll work on in this post is called phone_calls. The schema for this knowledge graph was defined in a previous post, here.

If you're already familiar with Grakn, and all you need is a migration example to follow, you'll find this Github repository useful. If, on the other hand, you're not familiar with the technology, make sure to first complete defining the schema for the phone_calls knowledge graph, and read on for a detailed guide on migrating data into Grakn using Java.

A Quick Look Into the phone_calls Schema

Before we get started with the migration, let's have a quick reminder of what the schema for the phone_calls knowledge graph looks like.

Migrate Data Into Grakn

Let's go through an overview of how the migration takes place.

  1. First, we need to talk to our Grakn keyspace. To do this, we'll use Grakn's Java Client.
  2. We'll go through each data file, extracting each data item and parsing it to a JSON object.
  3. We'll pass each data item (in the form of a JSON object) to its corresponding template. What the template returns is the Graql query for inserting that item into Grakn.
  4. We'll execute each of those queries to load the data into our target keyspace — phone_calls.

Before moving on, make sure you have Java 1.8 installed and the Grakn server running on your machine.

Getting Started

Create a New Maven Project

This project uses SDK 1.8 and is named phone_calls. I'll be using IntelliJ as the IDE.

Set Grakn as a Dependency

Modify pom.xml to include the latest version of Grakn (1.4.2) as a dependency.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ai.grakn.examples</groupId>
    <artifactId>migrate-csv-to-grakn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>releases</id>
            <url>https://oss.sonatype.org/content/repositories/releases</url>
        </repository>
    </repositories>

    <properties>
        <grakn.version>1.4.2</grakn.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>ai.grakn</groupId>
            <artifactId>client-java</artifactId>
            <version>${grakn.version}</version>
        </dependency>
    </dependencies>
</project>

Configure Logging

We'd like to be able to configure what Grakn logs out. To do this, modify pom.xml to exclude the slf4j binding shipped with Grakn and add Logback as a dependency instead.


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- ... -->

    <dependencies>
        <dependency>
            <groupId>ai.grakn</groupId>
            <artifactId>client-java</artifactId>
            <version>${grakn.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-simple</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>
</project>

Next, add a new file called logback.xml with the content below and place it under src/main/resources.

<configuration debug="false">
    <root level="INFO"/>
</configuration>

Create the Migration Class

Under src/main/java, create a new file called Migration.java. This is where we're going to write all of our code.

Including the Data Files

Pick one of the data formats below and download the files. After you download each of the four files, place them under the src/main/resources/data directory. We'll be using these to load their data into our phone_calls knowledge graph.

All code that follows is to be written in Migration.java.

Specifying Details for Each Data File

Before anything, we need a structure to contain the details required for reading data files and constructing Graql queries. These details include:

  • The path to the data file, and
  • The template function that receives a JSON object and produces a Graql insert query.

For this purpose, we create an abstract nested class called Input.

import mjson.Json;

public class Migration {
    abstract static class Input {
        String path;
        public Input(String path) {
            this.path = path;
        }
        String getDataPath(){ return path;}
        abstract String template(Json data);
    }
}

Later in this article, we'll see how an instance of the Input class can be created, but before we get to that, let's add the mjson dependency to the dependencies tag in our pom.xml file.

<dependency>
    <groupId>org.sharegov</groupId>
    <artifactId>mjson</artifactId>
    <version>1.4.0</version>
</dependency>
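
If you haven't used mjson before, here is a quick, standalone sanity check (not part of the tutorial code) of the calls our templates will rely on. In particular, concatenating a Json value into a string uses its toString(), which keeps the surrounding quotes for strings — exactly what the Graql insert queries need.

import mjson.Json;

public class MjsonCheck {
    public static void main(String[] args) {
        // build a Json object the same way parseDataToJson will later
        Json company = Json.object().set("name", "Telecom");

        // concatenating a Json value keeps the quotes around strings
        System.out.println("insert $company isa company has name " + company.at("name") + ";");
        // prints: insert $company isa company has name "Telecom";

        // asString() strips the quotes; asInteger() returns a plain int
        System.out.println(company.at("name").asString()); // prints: Telecom
    }
}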

Time to initialize the inputs.

The code below calls the initialiseInputs() method which returns a collection of inputs. We'll then use each input element in this collection to load each data file into Grakn.


// other imports
import java.util.ArrayList;
import java.util.Collection;

public class Migration {
    abstract static class Input {...}

    public static void main(String[] args) {
        Collection<Input> inputs = initialiseInputs();
    }

    static Collection<Input> initialiseInputs() {
        Collection<Input> inputs = new ArrayList<>();
        // coming up next
        return inputs;
    }
}

Input Instance for a Company


// imports

public class Migration {
    abstract static class Input {...}
    public static void main(String[] args) {...}

    static Collection<Input> initialiseInputs() {
        Collection<Input> inputs = new ArrayList<>();

        inputs.add(new Input("data/companies") {
            @Override
            public String template(Json company) {
                return "insert $company isa company has name " + company.at("name") + ";";
            }
        });

        return inputs;
    }
}

input.getDataPath() will return data/companies.

Given company is

{ name: "Telecom" }

input.template(company) will return

insert $company isa company has name "Telecom";

Input Instance for a Person

// imports

public class Migration {
    abstract static class Input {...}
    public static void main(String[] args) {...}

    static Collection<Input> initialiseInputs() {
        Collection<Input> inputs = new ArrayList<>();

        inputs.add(new Input("data/companies") {...});

        inputs.add(new Input("data/people") {
            @Override
            public String template(Json person) {
                // insert person
                String graqlInsertQuery = "insert $person isa person has phone-number " + person.at("phone_number");

                if (! person.has("first_name")) {
                    // person is not a customer
                    graqlInsertQuery += " has is-customer false";
                } else {
                    // person is a customer
                    graqlInsertQuery += " has is-customer true";
                    graqlInsertQuery += " has first-name " + person.at("first_name");
                    graqlInsertQuery += " has last-name " + person.at("last_name");
                    graqlInsertQuery += " has city " + person.at("city");
                    graqlInsertQuery += " has age " + person.at("age").asInteger();
                }

                graqlInsertQuery += ";";
                return graqlInsertQuery;
            }
        });

        return inputs;
    }
}

input.getDataPath() will return data/people.

Given person is

{ phone_number: "+44 091 xxx" }

input.template(person) will return

insert $person isa person has phone-number "+44 091 xxx" has is-customer false;

And given person is

{ first_name: "Jackie", last_name: "Joe", city: "Jimo", age: 77, phone_number: "+00 091 xxx" }

input.template(person) will return

insert $person isa person has phone-number "+00 091 xxx" has is-customer true has first-name "Jackie" has last-name "Joe" has city "Jimo" has age 77;

Input Instance for a Contract

// imports

public class Migration {
    abstract static class Input {...}
    public static void main(String[] args) {...}

    static Collection<Input> initialiseInputs() {
        Collection<Input> inputs = new ArrayList<>();

        inputs.add(new Input("data/companies") {...});

        inputs.add(new Input("data/people") {...});

        inputs.add(new Input("data/contracts") {
            @Override
            public String template(Json contract) {
                // match company
                String graqlInsertQuery = "match $company isa company has name " + contract.at("company_name") + ";";
                // match person
                graqlInsertQuery += " $customer isa person has phone-number " + contract.at("person_id") + ";";
                // insert contract
                graqlInsertQuery += " insert (provider: $company, customer: $customer) isa contract;";
                return graqlInsertQuery;
            }
        });

        return inputs;
    }
}

input.getDataPath() will return data/contracts.

Given contract is

{ company_name: "Telecom", person_id: "+00 091 xxx" }

input.template(contract) will return

match $company isa company has name "Telecom"; $customer isa person has phone-number "+00 091 xxx"; insert (provider: $company, customer: $customer) isa contract;

Input Instance for a Call

// imports

public class Migration {
    abstract static class Input {...}
    public static void main(String[] args) {...}

    static Collection<Input> initialiseInputs() {
        Collection<Input> inputs = new ArrayList<>();

        inputs.add(new Input("data/companies") {...});

        inputs.add(new Input("data/people") {...});

        inputs.add(new Input("data/contracts") {...});

        inputs.add(new Input("data/calls") {
            @Override
            public String template(Json call) {
                // match caller
                String graqlInsertQuery = "match $caller isa person has phone-number " + call.at("caller_id") + ";";
                // match callee
                graqlInsertQuery += " $callee isa person has phone-number " + call.at("callee_id") + ";";
                // insert call
                graqlInsertQuery += " insert $call(caller: $caller, callee: $callee) isa call;" +
                        " $call has started-at " + call.at("started_at").asString() + ";" +
                        " $call has duration " + call.at("duration").asInteger() + ";";
                return graqlInsertQuery;
            }
        });

        return inputs;
    }
}

input.getDataPath() will return data/calls.

Given call is

{ caller_id: "+44 091 xxx", callee_id: "+00 091 xxx", started_at: "2018-08-10T07:57:51", duration: 148 }

input.template(call) will return

match $caller isa person has phone-number "+44 091 xxx"; $callee isa person has phone-number "+00 091 xxx"; insert $call(caller: $caller, callee: $callee) isa call; $call has started-at 2018-08-10T07:57:51; $call has duration 148;

Connect and Migrate

Now that we have the data path and template defined for each of our data files, we can move on to connecting to our phone_calls knowledge graph and loading the data into it.

// other imports
import ai.grakn.GraknTxType;
import ai.grakn.Keyspace;
import ai.grakn.client.Grakn;
import ai.grakn.util.SimpleURI;
import java.io.UnsupportedEncodingException;

public class Migration {
    abstract static class Input {...}

    public static void main(String[] args) {
        Collection<Input> inputs = initialiseInputs();
        connectAndMigrate(inputs);
    }

    static void connectAndMigrate(Collection<Input> inputs) {
        SimpleURI localGrakn = new SimpleURI("localhost", 48555);
        Keyspace keyspace = Keyspace.of("phone_calls");
        Grakn grakn = new Grakn(localGrakn);
        Grakn.Session session = grakn.session(keyspace);

        inputs.forEach(input -> {
            System.out.println("Loading from [" + input.getDataPath() + "] into Grakn ...");
            try {
                loadDataIntoGrakn(input, session);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        });

        session.close();
    }

    static Collection<Input> initialiseInputs() {...}

    static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException {...}
}

connectAndMigrate(Collection<Input> inputs) is the only method that will be fired to initiate migration of the data into the phone_calls knowledge graph.

The following happens in this method:

  1. A Grakn instance grakn is created, connected to the server we have running locally at localhost:48555.
  2. A session is created, connected to the keyspace phone_calls.
  3. For each input object in the inputs collection, we call loadDataIntoGrakn(input, session). This takes care of loading the data, as specified in the input object, into our keyspace.
  4. Finally, the session is closed.

Loading the Data Into phone_calls

Now that we have a session connected to the phone_calls keyspace, we can move on to actually loading the data into our knowledge graph.

// imports

public class Migration {
    abstract static class Input {...}

    public static void main(String[] args) {
        Collection<Input> inputs = initialiseInputs();
        connectAndMigrate(inputs);
    }

    static Collection<Input> initialiseInputs() {...}

    static void connectAndMigrate(Collection<Input> inputs) {...}

    static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException {
        ArrayList<Json> items = parseDataToJson(input);
        items.forEach(item -> {
            Grakn.Transaction tx = session.transaction(GraknTxType.WRITE);
            String graqlInsertQuery = input.template(item);
            System.out.println("Executing Graql Query: " + graqlInsertQuery);
            tx.graql().parse(graqlInsertQuery).execute();
            tx.commit();
            tx.close();
        });
        System.out.println("\nInserted " + items.size() + " items from [ " + input.getDataPath() + "] into Grakn.\n");
    }

    static ArrayList<Json> parseDataToJson(Input input) throws UnsupportedEncodingException {...}
}

In order to load data from each file into Grakn, we need to:

  1. retrieve an ArrayList of JSON objects, each of which represents a data item. We do this by calling parseDataToJson(input), and
  2. for each JSON object in items: a) create a transaction tx, b) construct the graqlInsertQuery using the corresponding template, c) run the query, d) commit the transaction, and e) close the transaction.

Note on creating and committing transactions: to avoid running out of memory, it's recommended that every single query gets created and committed in a single transaction. However, for faster migration of large datasets, this can happen once for every n queries, where n is the maximum number of queries guaranteed to run in a single transaction (a sketch of this batched approach follows below).
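
As a rough sketch of that batched approach (not part of the tutorial code), here is a variant of loadDataIntoGrakn that commits once per batch. The batchSize parameter is hypothetical; every call used here is the same client API used elsewhere in this post.

// a sketch only: batchSize is a hypothetical parameter
static void loadDataIntoGraknBatched(Input input, Grakn.Session session, int batchSize)
        throws UnsupportedEncodingException {
    ArrayList<Json> items = parseDataToJson(input);

    Grakn.Transaction tx = session.transaction(GraknTxType.WRITE);
    int queriesInTx = 0;

    for (Json item : items) {
        tx.graql().parse(input.template(item)).execute();
        queriesInTx++;

        if (queriesInTx == batchSize) {
            // commit this batch and start a fresh transaction for the next one
            tx.commit();
            tx = session.transaction(GraknTxType.WRITE);
            queriesInTx = 0;
        }
    }

    // commit whatever is left in the final, possibly smaller, batch
    tx.commit();
}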

Now that we've done all the above, we're ready to read each file and parse each data item to a JSON object. It's these JSON objects that will be passed to the template method on each Input object.

We're going to write the implementation of parseDataToJson(input).

Data-Format-Specific Implementation

The implementation for parseDataToJson(input) differs based on what format our data files have.

But regardless of what the data format is, we need the right setup to read the files line by line. For this, we'll use an InputStreamReader.

// other imports
import java.io.InputStreamReader;
import java.io.Reader;


public class Migration {
    abstract static class Input {...}

    public static void main(String[] args) {...}

    static void connectAndMigrate(Collection<Input> inputs) {...}

    static Collection<Input> initialiseInputs() {...}

    static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException {...}

    public static Reader getReader(String relativePath) throws UnsupportedEncodingException {
        return new InputStreamReader(Migration.class.getClassLoader().getResourceAsStream(relativePath), "UTF-8");
    }
}

Parsing CSV

We'll use the Univocity CSV Parser for parsing our .csv files. Let's add the dependency for it. We need to add the following to the dependencies tag in pom.xml.

<dependency>
    <groupId>com.univocity</groupId>
    <artifactId>univocity-parsers</artifactId>
    <version>2.7.6</version>
</dependency>

Having done that, we'll write the implementation of parseDataToJson(input) for parsing .csv files.

// other imports

import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;

public class Migration {
    abstract static class Input {...}

    public static void main(String[] args) {...}

    static void connectAndMigrate(Collection<Input> inputs) {...}

    static Collection<Input> initialiseInputs() {...}

    static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException {...}

    static ArrayList<Json> parseDataToJson(Input input) throws UnsupportedEncodingException {
        ArrayList<Json> items = new ArrayList<>();

        CsvParserSettings settings = new CsvParserSettings();
        settings.setLineSeparatorDetectionEnabled(true);
        CsvParser parser = new CsvParser(settings);
        parser.beginParsing(getReader(input.getDataPath() + ".csv"));

        String[] columns = parser.parseNext();
        String[] row;
        while ((row = parser.parseNext()) != null) {
            Json item = Json.object();
            for (int i = 0; i < row.length; i++) {
                item.set(columns[i], row[i]);
            }
            items.add(item);
        }
        return items;
    }

    public static Reader getReader(String relativePath) throws UnsupportedEncodingException {
        return new InputStreamReader(Migration.class.getClassLoader().getResourceAsStream(relativePath), "UTF-8");
    }
}

Besides this implementation, we need to make one more change.

Given the nature of CSV files, the JSON object produced will have all of the .csv file's columns as its keys; even when a value is missing, the key is still present and its value is null.

For this reason, we need to change one line in the template method for the input instance for person.

if (! person.has("first_name")) {...}

becomes

if (person.at("first_name").isNull()) {...}
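
To make the difference concrete, here is a small standalone illustration (with hypothetical values) of the Json object the CSV path produces for a non-customer row. Univocity returns null for empty fields, so the parse loop still sets every column key, just with a null value.

import mjson.Json;

public class CsvNullCheck {
    public static void main(String[] args) {
        // mirror what the CSV parse loop does for an empty field: set the key with a null String
        String missing = null;
        Json person = Json.object()
                .set("phone_number", "+44 091 xxx")
                .set("first_name", missing)
                .set("last_name", missing)
                .set("city", missing)
                .set("age", missing);

        System.out.println(person.has("first_name"));          // true  -- the key exists, so the old check misfires
        System.out.println(person.at("first_name").isNull());  // true  -- the value is null, so the new check works
    }
}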

Reading JSON

We'll use Gson's JsonReader for reading our .json files. Let's add the dependency for it. We need to add the following to the dependencies tag in pom.xml.

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.7</version>
</dependency>

Having done that, we'll write the implementation of parseDataToJson(input) for reading .json files.

// other imports
import com.google.gson.stream.JsonReader;

public class Migration {
    abstract static class Input {...}

    public static void main(String[] args) {...}

    static void connectAndMigrate(Collection<Input> inputs) {...}

    static Collection<Input> initialiseInputs() {...}

    static void loadDataIntoGrakn(Input input, Grakn.Session session) throws IOException {...}

    static ArrayList<Json> parseDataToJson(Input input) throws IOException {
        ArrayList<Json> items = new ArrayList<>();

        JsonReader jsonReader = new JsonReader(getReader(input.getDataPath() + ".json"));

        jsonReader.beginArray();
        while (jsonReader.hasNext()) {
            jsonReader.beginObject();
            Json item = Json.object();
            while (jsonReader.hasNext()) {
                String key = jsonReader.nextName();
                switch (jsonReader.peek()) {
                    case STRING:
                        item.set(key, jsonReader.nextString());
                        break;
                    case NUMBER:
                        item.set(key, jsonReader.nextInt());
                        break;
                }
            }
            jsonReader.endObject();
            items.add(item);
        }
        jsonReader.endArray();
        return items;
    }

    public static Reader getReader(String relativePath) throws UnsupportedEncodingException {
        return new InputStreamReader(Migration.class.getClassLoader().getResourceAsStream(relativePath), "UTF-8");
    }
}

Parsing XML

We'll use Java's built-in StAX for parsing our .xml files.

For parsing XML data, we need to know the name of the target tag. This needs to be declared in the Input class and specified when constructing each input object.

// imports

public class XmlMigration {
    abstract static class Input {
        String path;
        String selector;
        public Input(String path, String selector) {
            this.path = path;
            this.selector = selector;
        }
        String getDataPath(){ return path;}
        String getSelector(){ return selector;}
        abstract String template(Json data);
    }

    public static void main(String[] args)  {...}

    static void connectAndMigrate(Collection<Input> inputs) {...}

    static Collection<Input> initialiseInputs() {
        Collection<Input> inputs = new ArrayList<>();

        inputs.add(new Input("data/companies", "company") {...});
        inputs.add(new Input("data/people", "person") {...});
        inputs.add(new Input("data/contracts", "contract") {...});
        inputs.add(new Input("data/calls", "call") {...});

        return inputs;
    }

    static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException, XMLStreamException {...}

    public static Reader getReader(String relativePath) throws UnsupportedEncodingException {...}
}

And now for the implementation of parseDataToJson(input) for parsing .xml files.

// other imports
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

public class XmlMigration {
    abstract static class Input {
        String path;
        String selector;
        public Input(String path, String selector) {
            this.path = path;
            this.selector = selector;
        }
        String getDataPath(){ return path;}
        String getSelector(){ return selector;}
        abstract String template(Json data);
    }

    public static void main(String[] args)  {...}

    static void connectAndMigrate(Collection<Input> inputs) {...}

    static Collection<Input> initialiseInputs() {
        Collection<Input> inputs = new ArrayList<>();

        inputs.add(new Input("data/companies", "company") {...});
        inputs.add(new Input("data/people", "person") {...});
        inputs.add(new Input("data/contracts", "contract") {...});
        inputs.add(new Input("data/calls", "call") {...});

        return inputs;
    }

    static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException, XMLStreamException {...}

    static ArrayList<Json> parseDataToJson(Input input) throws UnsupportedEncodingException, XMLStreamException {
        ArrayList<Json> items = new ArrayList<>();

        XMLStreamReader r = XMLInputFactory.newInstance().createXMLStreamReader(getReader(input.getDataPath() + ".xml"));
        String key;
        String value = null;
        Boolean inSelector = false;
        Json item = null;
        while(r.hasNext()) {
            int event = r.next();

            switch (event) {
                case XMLStreamConstants.START_ELEMENT:
                    if (r.getLocalName().equals(input.getSelector())) {
                        inSelector = true;
                        item = Json.object();
                    }
                    break;

                case XMLStreamConstants.CHARACTERS:
                    value = r.getText();
                    break;

                case XMLStreamConstants.END_ELEMENT:
                    key = r.getLocalName();
                    if (inSelector && ! key.equals(input.getSelector())) {
                        item.set(key, value);
                    }
                    if (key.equals(input.getSelector())) {
                        inSelector = false;
                        items.add(item);
                    }

                    break;
            }
        }
        return items;
    }

    public static Reader getReader(String relativePath) throws UnsupportedEncodingException {...}
}
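
If the event-driven StAX flow above feels opaque, the snippet below (standalone, not part of the tutorial code) walks a tiny, made-up XML fragment using the same calls and prints the events it sees. The fragment and element names are hypothetical, chosen only to mirror the shape of the calls data.

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import java.io.StringReader;

public class StaxWalkCheck {
    public static void main(String[] args) throws XMLStreamException {
        // a hypothetical fragment in the shape the parser above expects
        String xml = "<calls><call><caller_id>+44 091 xxx</caller_id><duration>148</duration></call></calls>";
        XMLStreamReader r = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));

        while (r.hasNext()) {
            switch (r.next()) {
                case XMLStreamConstants.START_ELEMENT:
                    System.out.println("start: " + r.getLocalName()); // "call" starts a new item
                    break;
                case XMLStreamConstants.CHARACTERS:
                    System.out.println("text:  " + r.getText());      // becomes the item's value
                    break;
                case XMLStreamConstants.END_ELEMENT:
                    System.out.println("end:   " + r.getLocalName()); // sets the key, or closes the item
                    break;
            }
        }
    }
}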

Putting It All Together

Here is what our migration class looks like for loading CSV data into Grakn; you can find the equivalent classes for JSON and XML files here.

package ai.grakn.examples;

import ai.grakn.GraknTxType;
import ai.grakn.Keyspace;
import ai.grakn.client.Grakn;
import ai.grakn.util.SimpleURI;

/**
 * a collection of fast and reliable Java-based parsers for CSV, TSV and Fixed Width files
 * @see <a href="https://www.univocity.com/pages/univocity_parsers_documentation">univocity</a>
 */
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;

/**
 * a lean JSON Library for Java,
 * @see <a href="https://bolerio.github.io/mjson/">mjson</a>
 */
import mjson.Json;

import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;

public class CsvMigration {
    /**
     * representation of Input object that links an input file to its own templating function,
     * which is used to map a Json object to a Graql query string
     */
    abstract static class Input {
        String path;

        public Input(String path) {
            this.path = path;
        }

        String getDataPath(){ return path;}

        abstract String template(Json data);
    }

    /**
     * 1. creates a Grakn instance
     * 2. creates a session to the targeted keyspace
     * 3. initialises the list of Inputs, each containing details required to parse the data
     * 4. loads the csv data to Grakn for each file
     * 5. closes the session
     */
    public static void main(String[] args) {
        Collection<Input> inputs = initialiseInputs();
        connectAndMigrate(inputs);
    }

    static void connectAndMigrate(Collection<Input> inputs) {
        SimpleURI localGrakn = new SimpleURI("localhost", 48555);
        Grakn grakn = new Grakn(localGrakn);
        Keyspace keyspace = Keyspace.of("phone_calls");
        Grakn.Session session = grakn.session(keyspace);

        inputs.forEach(input -> {
            System.out.println("Loading from [" + input.getDataPath() + "] into Grakn ...");
            try {
                loadDataIntoGrakn(input, session);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        });

        session.close();
    }

    static Collection<Input> initialiseInputs() {
        Collection<Input> inputs = new ArrayList<>();

        // define template for constructing a company Graql insert query
        inputs.add(new Input("data/companies") {
            @Override
            public String template(Json company) {
                return "insert $company isa company has name " + company.at("name") + ";";
            }
        });
        // define template for constructing a person Graql insert query
        inputs.add(new Input("data/people") {
            @Override
            public String template(Json person) {
                // insert person
                String graqlInsertQuery = "insert $person isa person has phone-number " + person.at("phone_number");

                if (person.at("first_name").isNull()) {
                    // person is not a customer
                    graqlInsertQuery += " has is-customer false";
                } else {
                    // person is a customer
                    graqlInsertQuery += " has is-customer true";
                    graqlInsertQuery += " has first-name " + person.at("first_name");
                    graqlInsertQuery += " has last-name " + person.at("last_name");
                    graqlInsertQuery += " has city " + person.at("city");
                    graqlInsertQuery += " has age " + person.at("age").asInteger();
                }

                graqlInsertQuery += ";";
                return graqlInsertQuery;
            }
        });
        // define template for constructing a contract Graql insert query
        inputs.add(new Input("data/contracts") {
            @Override
            public String template(Json contract) {
                // match company
                String graqlInsertQuery = "match $company isa company has name " + contract.at("company_name") + ";";
                // match person
                graqlInsertQuery += " $customer isa person has phone-number " + contract.at("person_id") + ";";
                // insert contract
                graqlInsertQuery += " insert (provider: $company, customer: $customer) isa contract;";
                return graqlInsertQuery;
            }
        });
        // define template for constructing a call Graql insert query
        inputs.add(new Input("data/calls") {
            @Override
            public String template(Json call) {
                // match caller
                String graqlInsertQuery = "match $caller isa person has phone-number " + call.at("caller_id") + ";";
                // match callee
                graqlInsertQuery += " $callee isa person has phone-number " + call.at("callee_id") + ";";
                // insert call
                graqlInsertQuery += " insert $call(caller: $caller, callee: $callee) isa call;" +
                        " $call has started-at " + call.at("started_at").asString() + ";" +
                        " $call has duration " + call.at("duration").asInteger() + ";";
                return graqlInsertQuery;
            }
        });
        return inputs;
    }

    /**
     * loads the csv data into our Grakn phone_calls keyspace:
     * 1. gets the data items as a list of json objects
     * 2. for each json object
     *   a. creates a Grakn transaction
     *   b. constructs the corresponding Graql insert query
     *   c. runs the query
     *   d. commits the transaction
     *   e. closes the transaction
     *
     * @param input   contains details required to parse the data
     * @param session off of which a transaction will be created
     * @throws UnsupportedEncodingException
     */
    static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException {
        ArrayList<Json> items = parseDataToJson(input); // 1
        items.forEach(item -> {
            Grakn.Transaction tx = session.transaction(GraknTxType.WRITE); // 2a
            String graqlInsertQuery = input.template(item); // 2b
            System.out.println("Executing Graql Query: " + graqlInsertQuery);
            tx.graql().parse(graqlInsertQuery).execute(); // 2c
            tx.commit(); // 2d
            tx.close(); // 2e

        });
        System.out.println("\nInserted " + items.size() + " items from [ " + input.getDataPath() + "] into Grakn.\n");
    }

    /**
     * 1. reads a csv file through a stream
     * 2. parses each row to a json object
     * 3. adds the json object to the list of items
     *
     * @param input used to get the path to the data file, minus the format
     * @return the list of json objects
     * @throws UnsupportedEncodingException
     */
    static ArrayList<Json> parseDataToJson(Input input) throws UnsupportedEncodingException {
        ArrayList<Json> items = new ArrayList<>();

        CsvParserSettings settings = new CsvParserSettings();
        settings.setLineSeparatorDetectionEnabled(true);
        CsvParser parser = new CsvParser(settings);
        parser.beginParsing(getReader(input.getDataPath() + ".csv")); // 1

        String[] columns = parser.parseNext();
        String[] row;
        while ((row = parser.parseNext()) != null) {
            Json item = Json.object();
            for (int i = 0; i < row.length; i++) {
                item.set(columns[i], row[i]); // 2
            }
            items.add(item); // 3
        }
        return items;
    }

    public static Reader getReader(String relativePath) throws UnsupportedEncodingException {
        return new InputStreamReader(CsvMigration.class.getClassLoader().getResourceAsStream(relativePath), "UTF-8");
    }
}

Time to Load

Run the main method, sit back, relax and watch the logs while the data starts pouring into Grakn.

To Recap

  • We started off by setting up our project and positioning the data files.
  • Next, we went on to set up the migration mechanism, one that was independent of the data format.
  • Then, we learned how files with different data formats can be parsed into JSON objects.
  • Lastly, we ran the main method which fired the connectAndMigrate method with the given inputs. This loaded the data into our Grakn knowledge graph.

Next

In the next post (to be published soon), we'll see how we can get insights over this dataset by querying the phone_calls knowledge graph using the Graql console, Grakn Workbase, and the Java client. Stay tuned!
