Loading Data Into a Grakn Knowledge Graph Using the Java Client
This tutorial illustrates how a dataset in CSV, JSON, or XML format can be migrated into a Grakn knowledge graph using Grakn's Java Client.
The knowledge graph that we'll work on in this post is called phone_calls.
The schema for this knowledge graph was defined in a previous post.
If you're already familiar with Grakn and all you need is a migration example to follow, you'll find this GitHub repository useful. If, on the other hand, you're not familiar with the technology, make sure to first complete defining the schema for the phone_calls knowledge graph, then read on for a detailed guide on migrating data into Grakn using Java.
A Quick Look Into the phone_calls Schema
Before we get started with migration, let's have a quick reminder of what the schema for the phone_calls knowledge graph looks like.
Migrate Data Into Grakn
Let's go through an overview of how the migration takes place.
- First, we need to talk to our Grakn keyspace. To do this, we'll use Grakn's Java Client.
- We'll go through each data file, extracting each data item and parsing it into a JSON object.
- We'll pass each data item (in the form of a JSON object) to its corresponding template. What the template returns is the Graql query for inserting that item into Grakn.
- We'll execute each of those queries to load the data into our target keyspace, phone_calls.
Before moving on, make sure you have Java 1.8 installed and the Grakn server running on your machine.
Getting Started
Create a New Maven Project
This project uses SDK 1.8 and is named phone_calls
. I'll be using IntelliJ as the IDE.
Set Grakn as a Dependency
Modify pom.xml
to include the latest version of Grakn (1.4.2) as a dependency.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ai.grakn.examples</groupId>
<artifactId>migrate-csv-to-grakn</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<repository>
<id>releases</id>
<url>https://oss.sonatype.org/content/repositories/releases</url>
</repository>
</repositories>
<properties>
<grakn.version>1.4.2</grakn.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>ai.grakn</groupId>
<artifactId>client-java</artifactId>
<version>${grakn.version}</version>
</dependency>
</dependencies>
</project>
Configure Logging
We'd like to be able to configure what Grakn logs. To do this, modify pom.xml to exclude the slf4j binding shipped with grakn and add logback as a dependency instead.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- ... -->
<dependencies>
<dependency>
<groupId>ai.grakn</groupId>
<artifactId>client-java</artifactId>
<version>${grakn.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
</project>
Next, add a new file called logback.xml
with the content below and place it under src/main/resources
.
<configuration debug="false">
<root level="INFO"/>
</configuration>
Create the Migration Class
Under src/main
create a new file called Migration.java
. This is where we're going to write all our code.
Including the Data Files
Pick one of the data formats below and download the files. After you download each of the four files, place them under the src/main/resources/data
directory. We'll be using these to load their data into our phone_calls
knowledge graph.
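If you're working with the CSV format, here's an illustration of what a few rows of the people data file might look like. The column names match the keys used by the templates later in this post, but these rows are hypothetical, not the repository's actual data:

```csv
phone_number,first_name,last_name,city,age
+00 091 xxx,Jackie,Joe,Jimo,77
+44 091 xxx,,,,
```

Note that a non-customer has only a phone number; the remaining columns are left empty.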
All code that follows is to be written in Migration.java
.
Specifying Details for Each Data File
Before anything, we need a structure to contain the details required for reading data files and constructing Graql queries. These details include:
- The path to the data file, and
- The template function that receives a JSON object and produces a Graql insert query.
For this purpose, we create a new abstract static nested class called Input
.
import mjson.Json;
public class Migration {
abstract static class Input {
String path;
public Input(String path) {
this.path = path;
}
String getDataPath(){ return path;}
abstract String template(Json data);
}
}
Later in this article, we'll see how an instance of the Input
class can be created, but before we get to that, let's add the mjson
dependency to the dependencies
tag in our pom.xml
file.
<dependency>
<groupId>org.sharegov</groupId>
<artifactId>mjson</artifactId>
<version>1.4.0</version>
</dependency>
Time to initialise the inputs.
The code below calls the initialiseInputs() method, which returns a collection of inputs. We'll then use each input element in this collection to load each data file into Grakn.
// other imports
import java.util.ArrayList;
import java.util.Collection;
public class Migration {
abstract static class Input {...}
public static void main(String[] args) {
Collection<Input> inputs = initialiseInputs();
}
static Collection<Input> initialiseInputs() {
Collection<Input> inputs = new ArrayList<>();
// coming up next
return inputs;
}
}
Input Instance for a Company
// imports
public class Migration {
abstract static class Input {...}
public static void main(String[] args) {...}
static Collection<Input> initialiseInputs() {
Collection<Input> inputs = new ArrayList<>();
inputs.add(new Input("data/companies") {
@Override
public String template(Json company) {
return "insert $company isa company has name " + company.at("name") + ";";
}
});
return inputs;
}
}
input.getDataPath()
will return data/companies
.
Given company
is
{ name: "Telecom" }
input.template(company)
will return
insert $company isa company has name "Telecom";
Input Instance for a Person
// imports
public class Migration {
abstract static class Input {...}
public static void main(String[] args) {...}
static Collection<Input> initialiseInputs() {
Collection<Input> inputs = new ArrayList<>();
inputs.add(new Input("data/companies") {...});
inputs.add(new Input("data/people") {
@Override
public String template(Json person) {
// insert person
String graqlInsertQuery = "insert $person isa person has phone-number " + person.at("phone_number");
if (! person.has("first_name")) {
// person is not a customer
graqlInsertQuery += " has is-customer false";
} else {
// person is a customer
graqlInsertQuery += " has is-customer true";
graqlInsertQuery += " has first-name " + person.at("first_name");
graqlInsertQuery += " has last-name " + person.at("last_name");
graqlInsertQuery += " has city " + person.at("city");
graqlInsertQuery += " has age " + person.at("age").asInteger();
}
graqlInsertQuery += ";";
return graqlInsertQuery;
}
});
return inputs;
}
}
input.getDataPath()
will return data/people
.
Given person
is
{ phone_number: "+44 091 xxx" }
input.template(person)
will return
insert $person isa person has phone-number "+44 091 xxx" has is-customer false;
And given person
is
{ first_name: "Jackie", last_name: "Joe", city: "Jimo", age: 77, phone_number: "+00 091 xxx" }
input.template(person)
will return
insert $person isa person has phone-number "+00 091 xxx" has is-customer true has first-name "Jackie" has last-name "Joe" has city "Jimo" has age 77;
Input Instance for a Contract
// imports
public class Migration {
abstract static class Input {...}
public static void main(String[] args) {...}
static Collection<Input> initialiseInputs() {
Collection<Input> inputs = new ArrayList<>();
inputs.add(new Input("data/companies") {...});
inputs.add(new Input("data/people") {...});
inputs.add(new Input("data/contracts") {
@Override
public String template(Json contract) {
// match company
String graqlInsertQuery = "match $company isa company has name " + contract.at("company_name") + ";";
// match person
graqlInsertQuery += " $customer isa person has phone-number " + contract.at("person_id") + ";";
// insert contract
graqlInsertQuery += " insert (provider: $company, customer: $customer) isa contract;";
return graqlInsertQuery;
}
});
return inputs;
}
}
input.getDataPath()
will return data/contracts
.
Given contract
is
{ company_name: "Telecom", person_id: "+00 091 xxx" }
input.template(contract)
will return
match $company isa company has name "Telecom"; $customer isa person has phone-number "+00 091 xxx"; insert (provider: $company, customer: $customer) isa contract;
Input Instance for a Call
// imports
public class Migration {
abstract static class Input {...}
public static void main(String[] args) {...}
static Collection<Input> initialiseInputs() {
Collection<Input> inputs = new ArrayList<>();
inputs.add(new Input("data/companies") {...});
inputs.add(new Input("data/people") {...});
inputs.add(new Input("data/contracts") {...});
inputs.add(new Input("data/calls") {
@Override
public String template(Json call) {
// match caller
String graqlInsertQuery = "match $caller isa person has phone-number " + call.at("caller_id") + ";";
// match callee
graqlInsertQuery += " $callee isa person has phone-number " + call.at("callee_id") + ";";
// insert call
graqlInsertQuery += " insert $call(caller: $caller, callee: $callee) isa call;" +
" $call has started-at " + call.at("started_at").asString() + ";" +
" $call has duration " + call.at("duration").asInteger() + ";";
return graqlInsertQuery;
}
});
return inputs;
}
}
input.getDataPath()
will return data/calls
.
Given call
is
{ caller_id: "+44 091 xxx", callee_id: "+00 091 xxx", started_at: 2018-08-10T07:57:51, duration: 148 }
input.template(call)
will return
match $caller isa person has phone-number "+44 091 xxx"; $callee isa person has phone-number "+00 091 xxx"; insert $call(caller: $caller, callee: $callee) isa call; $call has started-at 2018-08-10T07:57:51; $call has duration 148;
Connect and Migrate
Now that we have the data path and template defined for each of our data files, we can connect to our phone_calls knowledge graph and load the data into it.
// other imports
import ai.grakn.GraknTxType;
import ai.grakn.Keyspace;
import ai.grakn.client.Grakn;
import ai.grakn.util.SimpleURI;
import java.io.UnsupportedEncodingException;
public class Migration {
abstract static class Input {...}
public static void main(String[] args) {
Collection<Input> inputs = initialiseInputs();
connectAndMigrate(inputs);
}
static void connectAndMigrate(Collection<Input> inputs) {
SimpleURI localGrakn = new SimpleURI("localhost", 48555);
Keyspace keyspace = Keyspace.of("phone_calls");
Grakn grakn = new Grakn(localGrakn);
Grakn.Session session = grakn.session(keyspace);
inputs.forEach(input -> {
System.out.println("Loading from [" + input.getDataPath() + "] into Grakn ...");
try {
loadDataIntoGrakn(input, session);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
});
session.close();
}
static Collection<Input> initialiseInputs() {...}
static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException {...}
}
connectAndMigrate(Collection<Input> inputs)
is the only method that will be fired to initiate migration of the data into the phone_calls
knowledge graph.
The following happens in this method:
- A Grakn instance grakn is created, connected to the server we have running locally at localhost:48555.
- A session is created, connected to the keyspace phone_calls.
- For each input object in the inputs collection, we call loadDataIntoGrakn(input, session). This takes care of loading the data as specified in the input object into our keyspace.
- Finally, the session is closed.
Loading the Data Into phone_calls
Now that we have a session
connected to the phone_calls
keyspace, we can move on to actually loading the data into our knowledge graph.
// imports
public class Migration {
abstract static class Input {...}
public static void main(String[] args) {
Collection<Input> inputs = initialiseInputs();
connectAndMigrate(inputs);
}
static Collection<Input> initialiseInputs() {...}
static void connectAndMigrate(Collection<Input> inputs) {...}
static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException {
ArrayList<Json> items = parseDataToJson(input);
items.forEach(item -> {
Grakn.Transaction tx = session.transaction(GraknTxType.WRITE);
String graqlInsertQuery = input.template(item);
System.out.println("Executing Graql Query: " + graqlInsertQuery);
tx.graql().parse(graqlInsertQuery).execute();
tx.commit();
tx.close();
});
System.out.println("\nInserted " + items.size() + " items from [ " + input.getDataPath() + "] into Grakn.\n");
}
static ArrayList<Json> parseDataToJson(Input input) throws UnsupportedEncodingException {...}
}
In order to load data from each file into Grakn, we need to:
- retrieve an ArrayList of JSON objects, each of which represents a data item. We do this by calling parseDataToJson(input), and
- for each JSON object in items: a) create a transaction tx, b) construct the graqlInsertQuery using the corresponding template, c) run the query, d) commit the transaction, and e) close the transaction.
Note on creating and committing transactions: To avoid running out of memory, it's recommended that every single query gets created and committed in a single transaction. However, for faster migration of large datasets, this can happen once for every n queries, where n is the maximum number of queries guaranteed to run in a single transaction.
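To make this note concrete, here is a minimal sketch of the batching arithmetic. This is illustrative only: the class and method names are hypothetical, the real Grakn calls (session.transaction(...), tx.commit()) are shown as comments, and the method simply counts how many commits a given batch size would produce.

```java
import java.util.Arrays;
import java.util.List;

public class BatchedCommit {

    // Counts the commits needed when committing once every batchSize queries.
    // The commented lines mark where the real Grakn calls would go.
    public static int commitInBatches(List<String> queries, int batchSize) {
        int commits = 0;
        int queriesInTx = 0;
        // Grakn.Transaction tx = session.transaction(GraknTxType.WRITE);
        for (String query : queries) {
            // tx.graql().parse(query).execute();
            queriesInTx++;
            if (queriesInTx == batchSize) {
                // tx.commit();
                // tx = session.transaction(GraknTxType.WRITE);
                commits++;
                queriesInTx = 0;
            }
        }
        if (queriesInTx > 0) {
            // tx.commit(); // commit the final, partially filled batch
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<String> queries = Arrays.asList(
                "q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9", "q10");
        // 10 queries with a batch size of 3 result in 4 commits
        System.out.println(commitInBatches(queries, 3));
    }
}
```

Picking a larger batch size trades memory for speed; committing every single query (batch size 1) is the safest default used in this tutorial.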
Now that we've done all the above, we're ready to read each file and parse each data item to a JSON object. It's these JSON objects that will be passed to the template
method on each Input
object.
We're going to write the implementation of parseDataToJson(input)
.
Data Format-Specific Implementation
The implementation for parseDataToJson(input)
differs based on what format our data files have.
But regardless of what the data format is, we need the right setup to read the files line by line. For this, we'll use an InputStreamReader
.
// other imports
import java.io.InputStreamReader;
import java.io.Reader;
public class Migration {
abstract static class Input {...}
public static void main(String[] args) {...}
static void connectAndMigrate(Collection<Input> inputs) {...}
static Collection<Input> initialiseInputs() {...}
static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException {...}
public static Reader getReader(String relativePath) throws UnsupportedEncodingException {
return new InputStreamReader(Migration.class.getClassLoader().getResourceAsStream(relativePath), "UTF-8");
}
}
Parsing CSV
We'll use the Univocity CSV Parser for parsing our .csv
files. Let's add the dependency for it. We need to add the following to the dependencies
tag in pom.xml
.
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>2.7.6</version>
</dependency>
Having done that, we'll write the implementation of parseDataToJson(input)
for parsing .csv
files.
// other imports
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
public class Migration {
abstract static class Input {...}
public static void main(String[] args) {...}
static void connectAndMigrate(Collection<Input> inputs) {...}
static Collection<Input> initialiseInputs() {...}
static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException {...}
static ArrayList<Json> parseDataToJson(Input input) throws UnsupportedEncodingException {
ArrayList<Json> items = new ArrayList<>();
CsvParserSettings settings = new CsvParserSettings();
settings.setLineSeparatorDetectionEnabled(true);
CsvParser parser = new CsvParser(settings);
parser.beginParsing(getReader(input.getDataPath() + ".csv"));
String[] columns = parser.parseNext();
String[] row;
while ((row = parser.parseNext()) != null) {
Json item = Json.object();
for (int i = 0; i < row.length; i++) {
item.set(columns[i], row[i]);
}
items.add(item);
}
return items;
}
public static Reader getReader(String relativePath) throws UnsupportedEncodingException {
return new InputStreamReader(Migration.class.getClassLoader().getResourceAsStream(relativePath), "UTF-8");
}
}
Besides this implementation, we need to make one more change.
Given the nature of CSV files, the JSON object produced will have all the columns of the .csv file as its keys; even when a value is missing, the key will still be present, with null as its value.
For this reason, we need to change one line in the template method of the input instance for a person.
if (! person.has("first_name")) {...}
becomes
if (person.at("first_name").isNull()) {...}
.
Reading JSON
We'll use Gson's JsonReader for reading our .json
files. Let's add the dependency for it. We need to add the following to the dependencies
tag in pom.xml
.
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>
Having done that, we'll write the implementation of parseDataToJson(input)
for reading .json
files.
// other imports
import com.google.gson.stream.JsonReader;
public class Migration {
abstract static class Input {...}
public static void main(String[] args) {...}
static void connectAndMigrate(Collection<Input> inputs) {...}
static Collection<Input> initialiseInputs() {...}
static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException {...}
static ArrayList<Json> parseDataToJson(Input input) throws IOException {
ArrayList<Json> items = new ArrayList<>();
JsonReader jsonReader = new JsonReader(getReader(input.getDataPath() + ".json"));
jsonReader.beginArray();
while (jsonReader.hasNext()) {
jsonReader.beginObject();
Json item = Json.object();
while (jsonReader.hasNext()) {
String key = jsonReader.nextName();
switch (jsonReader.peek()) {
case STRING:
item.set(key, jsonReader.nextString());
break;
case NUMBER:
item.set(key, jsonReader.nextInt());
break;
}
}
jsonReader.endObject();
items.add(item);
}
jsonReader.endArray();
return items;
}
public static Reader getReader(String relativePath) throws UnsupportedEncodingException {
return new InputStreamReader(Migration.class.getClassLoader().getResourceAsStream(relativePath), "UTF-8");
}
}
Parsing XML
We'll use Java's built-in StAX for parsing our .xml
files.
For parsing XML data, we need to know the name of the target tag. This needs to be declared in the Input
class and specified when constructing each input
object.
// imports
public class XmlMigration {
abstract static class Input {
String path;
String selector;
public Input(String path, String selector) {
this.path = path;
this.selector = selector;
}
String getDataPath(){ return path;}
String getSelector(){ return selector;}
abstract String template(Json data);
}
public static void main(String[] args) {...}
static void connectAndMigrate(Collection<Input> inputs) {...}
static Collection<Input> initialiseInputs() {
Collection<Input> inputs = new ArrayList<>();
inputs.add(new Input("data/companies", "company") {...});
inputs.add(new Input("data/people", "person") {...});
inputs.add(new Input("data/contracts", "contract") {...});
inputs.add(new Input("data/calls", "call") {...});
return inputs;
}
static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException, XMLStreamException {...}
public static Reader getReader(String relativePath) throws UnsupportedEncodingException {...}
}
And now for the implementation of parseDataToJson(input)
for parsing .xml
files.
// other imports
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
public class XmlMigration {
abstract static class Input {
String path;
String selector;
public Input(String path, String selector) {
this.path = path;
this.selector = selector;
}
String getDataPath(){ return path;}
String getSelector(){ return selector;}
abstract String template(Json data);
}
public static void main(String[] args) {...}
static void connectAndMigrate(Collection<Input> inputs) {...}
static Collection<Input> initialiseInputs() {
Collection<Input> inputs = new ArrayList<>();
inputs.add(new Input("data/companies", "company") {...});
inputs.add(new Input("data/people", "person") {...});
inputs.add(new Input("data/contracts", "contract") {...});
inputs.add(new Input("data/calls", "call") {...});
return inputs;
}
static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException, XMLStreamException {...}
static ArrayList<Json> parseDataToJson(Input input) throws UnsupportedEncodingException, XMLStreamException {
ArrayList<Json> items = new ArrayList<>();
XMLStreamReader r = XMLInputFactory.newInstance().createXMLStreamReader(getReader(input.getDataPath() + ".xml"));
String key;
String value = null;
Boolean inSelector = false;
Json item = null;
while(r.hasNext()) {
int event = r.next();
switch (event) {
case XMLStreamConstants.START_ELEMENT:
if (r.getLocalName().equals(input.getSelector())) {
inSelector = true;
item = Json.object();
}
break;
case XMLStreamConstants.CHARACTERS:
value = r.getText();
break;
case XMLStreamConstants.END_ELEMENT:
key = r.getLocalName();
if (inSelector && ! key.equals(input.getSelector())) {
item.set(key, value);
}
if (key.equals(input.getSelector())) {
inSelector = false;
items.add(item);
}
break;
}
}
return items;
}
public static Reader getReader(String relativePath) throws UnsupportedEncodingException {...}
}
Putting It All Together
Here is what our Migration.java looks like for loading CSV data into Grakn; the equivalent implementations for JSON and XML files can be found here.
package ai.grakn.examples;
import ai.grakn.GraknTxType;
import ai.grakn.Keyspace;
import ai.grakn.client.Grakn;
import ai.grakn.util.SimpleURI;
/**
* a collection of fast and reliable Java-based parsers for CSV, TSV and Fixed Width files
* @see <a href="https://www.univocity.com/pages/univocity_parsers_documentation">univocity</a>
*/
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
/**
* a lean JSON Library for Java,
* @see <a href="https://bolerio.github.io/mjson/">mjson</a>
*/
import mjson.Json;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
public class CsvMigration {
/**
* representation of Input object that links an input file to its own templating function,
* which is used to map a Json object to a Graql query string
*/
abstract static class Input {
String path;
public Input(String path) {
this.path = path;
}
String getDataPath(){ return path;}
abstract String template(Json data);
}
/**
* 1. creates a Grakn instance
* 2. creates a session to the targeted keyspace
* 3. initialises the list of Inputs, each containing details required to parse the data
* 4. loads the csv data to Grakn for each file
* 5. closes the session
*/
public static void main(String[] args) {
Collection<Input> inputs = initialiseInputs();
connectAndMigrate(inputs);
}
static void connectAndMigrate(Collection<Input> inputs) {
SimpleURI localGrakn = new SimpleURI("localhost", 48555);
Grakn grakn = new Grakn(localGrakn);
Keyspace keyspace = Keyspace.of("phone_calls");
Grakn.Session session = grakn.session(keyspace);
inputs.forEach(input -> {
System.out.println("Loading from [" + input.getDataPath() + "] into Grakn ...");
try {
loadDataIntoGrakn(input, session);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
});
session.close();
}
static Collection<Input> initialiseInputs() {
Collection<Input> inputs = new ArrayList<>();
// define template for constructing a company Graql insert query
inputs.add(new Input("data/companies") {
@Override
public String template(Json company) {
return "insert $company isa company has name " + company.at("name") + ";";
}
});
// define template for constructing a person Graql insert query
inputs.add(new Input("data/people") {
@Override
public String template(Json person) {
// insert person
String graqlInsertQuery = "insert $person isa person has phone-number " + person.at("phone_number");
if (person.at("first_name").isNull()) {
// person is not a customer
graqlInsertQuery += " has is-customer false";
} else {
// person is a customer
graqlInsertQuery += " has is-customer true";
graqlInsertQuery += " has first-name " + person.at("first_name");
graqlInsertQuery += " has last-name " + person.at("last_name");
graqlInsertQuery += " has city " + person.at("city");
graqlInsertQuery += " has age " + person.at("age").asInteger();
}
graqlInsertQuery += ";";
return graqlInsertQuery;
}
});
// define template for constructing a contract Graql insert query
inputs.add(new Input("data/contracts") {
@Override
public String template(Json contract) {
// match company
String graqlInsertQuery = "match $company isa company has name " + contract.at("company_name") + ";";
// match person
graqlInsertQuery += " $customer isa person has phone-number " + contract.at("person_id") + ";";
// insert contract
graqlInsertQuery += " insert (provider: $company, customer: $customer) isa contract;";
return graqlInsertQuery;
}
});
// define template for constructing a call Graql insert query
inputs.add(new Input("data/calls") {
@Override
public String template(Json call) {
// match caller
String graqlInsertQuery = "match $caller isa person has phone-number " + call.at("caller_id") + ";";
// match callee
graqlInsertQuery += " $callee isa person has phone-number " + call.at("callee_id") + ";";
// insert call
graqlInsertQuery += " insert $call(caller: $caller, callee: $callee) isa call;" +
" $call has started-at " + call.at("started_at").asString() + ";" +
" $call has duration " + call.at("duration").asInteger() + ";";
return graqlInsertQuery;
}
});
return inputs;
}
/**
* loads the csv data into our Grakn phone_calls keyspace:
* 1. gets the data items as a list of json objects
* 2. for each json object
* a. creates a Grakn transaction
* b. constructs the corresponding Graql insert query
* c. runs the query
* d. commits the transaction
* e. closes the transaction
*
* @param input contains details required to parse the data
* @param session off of which a transaction will be created
* @throws UnsupportedEncodingException
*/
static void loadDataIntoGrakn(Input input, Grakn.Session session) throws UnsupportedEncodingException {
ArrayList<Json> items = parseDataToJson(input); // 1
items.forEach(item -> {
Grakn.Transaction tx = session.transaction(GraknTxType.WRITE); // 2a
String graqlInsertQuery = input.template(item); // 2b
System.out.println("Executing Graql Query: " + graqlInsertQuery);
tx.graql().parse(graqlInsertQuery).execute(); // 2c
tx.commit(); // 2d
tx.close(); // 2e
});
System.out.println("\nInserted " + items.size() + " items from [ " + input.getDataPath() + "] into Grakn.\n");
}
/**
* 1. reads a csv file through a stream
* 2. parses each row to a json object
* 3. adds the json object to the list of items
*
* @param input used to get the path to the data file, minus the format
* @return the list of json objects
* @throws UnsupportedEncodingException
*/
static ArrayList<Json> parseDataToJson(Input input) throws UnsupportedEncodingException {
ArrayList<Json> items = new ArrayList<>();
CsvParserSettings settings = new CsvParserSettings();
settings.setLineSeparatorDetectionEnabled(true);
CsvParser parser = new CsvParser(settings);
parser.beginParsing(getReader(input.getDataPath() + ".csv")); // 1
String[] columns = parser.parseNext();
String[] row;
while ((row = parser.parseNext()) != null) {
Json item = Json.object();
for (int i = 0; i < row.length; i++) {
item.set(columns[i], row[i]); // 2
}
items.add(item); // 3
}
return items;
}
public static Reader getReader(String relativePath) throws UnsupportedEncodingException {
return new InputStreamReader(CsvMigration.class.getClassLoader().getResourceAsStream(relativePath), "UTF-8");
}
}
Time to Load
Run the main
method, sit back, relax and watch the logs while the data starts pouring into Grakn.
To Recap
- We started off by setting up our project and positioning the data files.
- Next, we went on to set up the migration mechanism, one that was independent of the data format.
- Then, we learned how files with different data formats can be parsed into JSON objects.
- Lastly, we ran the main method, which fired the connectAndMigrate method with the given inputs. This loaded the data into our Grakn knowledge graph.
Next
In the next post (to be published soon), we'll see how we can get insights over this dataset by querying the phone_calls
knowledge graph using the Graql console, Grakn Workbase, and the Java client. Stay tuned!
Published at DZone with permission of Soroush Saffari, DZone MVB. See the original article here.