Nebula Flink Connector: Implementation and Practices

This post introduces Nebula Flink Connector. Like the pre-defined Flink connectors, it enables Flink to read data from and write data to Nebula Graph.

By Nicole Wang · Apr. 14, 2021 · Tutorial

In scenarios such as relational network analysis, relationship modeling, and real-time recommendation, graph databases are increasingly used as the backing data store, and some of these scenarios, such as recommendation systems and search engines, demand highly real-time graph data. To keep the data fresh, stream processing is widely used to incrementally process updates in real time. To support stream processing of graph data, the Nebula Graph team developed Nebula Flink Connector, which empowers Flink to run stream-processing jobs on data in Nebula Graph.

Flink is a new generation of computing engine that supports both stream and batch processing. It reads data from a third-party storage engine, processes it, and then writes it to another storage engine. A Flink connector acts as the bridge that connects the Flink computing engine to an external storage system.

Flink can use four methods to exchange data with an external source:

  • The pre-defined Source and Sink APIs.
  • The bundled connectors, such as the JDBC connector.
  • The Apache Bahir connectors. Apache Bahir was spun out of Apache Spark and is intended to provide implementations of extensions, plug-ins, connectors, and other pluggable components that are not limited to Spark.
  • Asynchronous I/O. In stream processing, it is often necessary to interact with external storage systems, for example, to join against a table in MySQL. With synchronous I/O, a lot of time is spent waiting, which hurts throughput and latency; with asynchronous I/O, multiple requests can be in flight concurrently, so throughput improves and latency drops. A minimal sketch of this pattern follows the list.
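
A minimal sketch of that asynchronous pattern, using Flink's AsyncDataStream API, is shown below. The class name and lookup logic are hypothetical placeholders, not part of Nebula Flink Connector; the sketch only illustrates how requests are dispatched without blocking the processing thread.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Hypothetical async enrichment: looks up each key in an external store
// without blocking the stream-processing thread.
public class AsyncLookupFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> externalLookup(key))   // placeholder for a real async client call
                .thenAccept(value -> resultFuture.complete(Collections.singleton(value)));
    }

    private String externalLookup(String key) {
        return key + ":enriched";                          // stand-in for, e.g., a MySQL query
    }
}

// Usage: at most 100 in-flight requests, 5-second timeout per request.
// DataStream<String> enriched = AsyncDataStream.unorderedWait(
//         input, new AsyncLookupFunction(), 5, TimeUnit.SECONDS, 100);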

This post introduces Nebula Flink Connector. Like the pre-defined Flink connectors, it enables Flink to read data from and write data to Nebula Graph.

Connector Source

As a stream-processing framework, Flink can handle both bounded and unbounded data. Unbounded data is a continuous flow of data with no defined end, and it is what real-time stream processing handles; bounded data is what batch processing handles. In Flink, the system that provides the data to be processed is called the Source.

For Nebula Flink Connector, Nebula Graph is the Source. Flink provides rich connector components that allow users to define external storage systems as Sources.

About Source

The Source gives Flink access to external data sources. The source capability of Flink is mainly implemented through read-related APIs and the addSource method. When using the addSource method to read data from an external system, you can use a Flink bundled connector or customize your own Source.

How to use a Flink Source is shown in the figure below.

[Figure: the Flink Source tree]

In this section, we focus on how to customize Nebula Graph Source.

Customize Source

In Flink, either StreamExecutionEnvironment.addSource(sourceFunction) or ExecutionEnvironment.createInput(inputFormat) can be used to add a data source to your program.

Flink provides a number of built-in SourceFunction implementations. Developers can customize a non-parallel Source by inheriting from RichSourceFunction or a parallel Source by inheriting from RichParallelSourceFunction. Both combine the features of SourceFunction and RichFunction: SourceFunction is responsible for data generation, and RichFunction is responsible for resource management. Of course, you can implement only SourceFunction to define the simplest DataSource, one that can do nothing but fetch data.

Generally, a complete Source is customized by implementing the RichSourceFunction class, which has the capabilities of both RichFunction and SourceFunction. Therefore, to enable Nebula Graph to be a Source of Flink, we must implement the methods provided in RichSourceFunction.
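
As a rough illustration of that pattern (not the actual NebulaSourceFunction code), a non-parallel custom Source built on RichSourceFunction might look like the following sketch, where an in-memory iterator stands in for the external storage connection.

import java.util.Arrays;
import java.util.Iterator;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;

// Skeleton of a non-parallel custom source. open/close come from RichFunction
// (resource management); run/cancel come from SourceFunction (data generation).
public class MyGraphSource extends RichSourceFunction<Row> {

    private volatile boolean running = true;
    private transient Iterator<String> records;

    @Override
    public void open(Configuration parameters) throws Exception {
        // In a real connector, this is where the storage connection is acquired.
        records = Arrays.asList("Tom", "Jerry").iterator();
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        while (running && records.hasNext()) {
            ctx.collect(Row.of(records.next()));   // emit each record into the stream
        }
    }

    @Override
    public void cancel() {
        running = false;                           // stop the read loop when the job is canceled
    }

    @Override
    public void close() throws Exception {
        // Release connections and other resources here.
    }
}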

Implementation of Nebula Graph Source

Two options are provided for you to use the custom Nebula Graph Source implemented in Nebula Flink Connector, namely addSource and createInput.

The class diagram of the Nebula Graph Source implementation is omitted here because the image is too large to embed.

addSource

This method is implemented through the NebulaSourceFunction class, which inherits from RichSourceFunction and realizes these methods:

  • open: Prepares information for connecting to Nebula Graph and obtains the connection to the Meta Service and the Storage Service.
  • close: Releases resources after data reading is done. Closes the connection to Nebula Graph.
  • run: Starts data reading and fills the data into sourceContext.
  • cancel: Called to release the resources when the Flink job is canceled.

createInput

This method is implemented through the NebulaInputFormat class, which inherits from RichInputFormat and implements the following methods (a bare-bones RichInputFormat skeleton follows the list):

  • openInputFormat: Prepares inputFormat and obtains the connection.
  • closeInputFormat: Releases resources after data reading is done and closes the connection to Nebula Graph.
  • getStatistics: Obtains the basic statistics of the source data.
  • createInputSplits: Creates genericInputSplit based on the configured partition number.
  • getInputSplitAssigner: Returns the input split assigner, which hands out the splits of the source in their original order.
  • open: Starts reading data of inputFormat, converts the format of the data into Flink format, and constructs iterators.
  • close: Prints the reading logs after data reading is done.
  • reachedEnd: Determines whether the reading is done or not.
  • nextRecord: Reads the next record through the iterator.
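
For comparison, a bare-bones RichInputFormat can be sketched as follows. It is an illustrative skeleton, not the NebulaInputFormat source: an in-memory array stands in for Nebula Graph, and, for simplicity, every split reads the same small array.

import java.io.IOException;

import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.types.Row;

// Minimal InputFormat skeleton: splits are created up front, each parallel
// instance opens one split and iterates until reachedEnd() returns true.
public class MyGraphInputFormat extends RichInputFormat<Row, GenericInputSplit> {

    private final String[] names = {"Tom", "Jerry"};   // stand-in for data read from storage
    private int position;

    @Override
    public void configure(Configuration parameters) { }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return cachedStatistics;                       // no cheap statistics available
    }

    @Override
    public GenericInputSplit[] createInputSplits(int minNumSplits) {
        GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
        for (int i = 0; i < minNumSplits; i++) {
            splits[i] = new GenericInputSplit(i, minNumSplits);
        }
        return splits;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        position = 0;                                  // start reading this split
    }

    @Override
    public boolean reachedEnd() {
        return position >= names.length;
    }

    @Override
    public Row nextRecord(Row reuse) {
        return Row.of(names[position++]);
    }

    @Override
    public void close() { }
}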

The data read from Source with addSource is DataStreamSource of Flink, which indicates the starting point of the data flow.

The data read with createInput is a Flink DataSource, an operator that creates new datasets and can be used for further transformations. Operator-specific configuration can be passed to a DataSource with withParameters.
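
For example, a Configuration object can be attached to the DataSource before further transformations. The snippet below is illustrative: the parameter key is made up, and inputFormat refers to the NebulaInputFormat constructed later in this section.

// Attaches operator-scoped parameters to the DataSource created by createInput.
Configuration parameters = new Configuration();
parameters.setInteger("read.timeout.ms", 3000);   // illustrative key, not a connector option

DataSource<Row> dataSource = ExecutionEnvironment.getExecutionEnvironment()
        .createInput(inputFormat)
        .withParameters(parameters);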

Use Custom Nebula Graph Source

To enable Flink to read data from Nebula Graph, a NebulaSourceFunction or NebulaInputFormat must be constructed, and Nebula Graph must be registered as the data source with the addSource or createInput method.

To construct NebulaSourceFunction and NebulaInputFormat, you must configure the client parameters and execution parameters as follows:

  • NebulaClientOptions

    • Address: Specifies the IP address and port of the Meta Service of Nebula Graph, which is required by the Nebula Graph Source.
    • username
    • password
  • VertexExecutionOptions

    • Specifies the name of the Nebula Graph space to be read.
    • Specifies the tag to be read.
    • Specifies the properties of the tag.
    • Determines whether to read all the properties. The default value is false. If this parameter is set to true, the fields configuration does not take effect.
    • Specifies the maximum size of each batch to be read. The default value is 2000.
  • EdgeExecutionOptions

    • Specifies the name of the Nebula Graph space to be read.
    • Specifies the edge type to be read.
    • Specifies the properties of the edge type.
    • Determines whether to read all the properties. The default value is false. If this parameter is set to true, the properties configuration does not take effect.
    • Specifies the maximum size of each batch to be read. The default value is 2000.
// Constructs the necessary parameters for the Nebula Graph client
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
                .NebulaClientOptionsBuilder()
                .setAddress("127.0.0.1:45500")
                .build();

// Creates connectionProvider
NebulaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);

// Constructs the necessary parameters for reading data from Nebula Graph
List<String> cols = Arrays.asList("name", "age");
VertexExecutionOptions sourceExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSource")
                .setTag(tag)
                .setFields(cols)
                .setLimit(100)
                .builder();

// Constructs NebulaInputFormat
NebulaInputFormat inputFormat = new NebulaInputFormat(metaConnectionProvider)
                .setExecutionOptions(sourceExecutionOptions);

// Method 1: Uses createInput to enable Flink to read data from Nebula Graph
DataSource<Row> dataSource1 = ExecutionEnvironment.getExecutionEnvironment()
         .createInput(inputFormat);
 
// Method 2: Uses addSource to enable Flink to read data from Nebula Graph
NebulaSourceFunction sourceFunction = new NebulaSourceFunction(metaConnectionProvider)
                .setExecutionOptions(sourceExecutionOptions);
DataStreamSource<Row> dataSource2 = StreamExecutionEnvironment.getExecutionEnvironment()
         .addSource(sourceFunction);


You can edit the Nebula Source Demo, package it, and submit it to a Flink cluster for execution.
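
For reference, the print Sink in the demo is attached with a single call. The snippet below assumes the StreamExecutionEnvironment from the previous snippet is kept in a variable named env; the job name is arbitrary.

// Prints every Row read from Nebula Graph, then launches the job.
dataSource2.print();
env.execute("Nebula Source Demo");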

The demo program reads the vertex data from Nebula Graph and prints it. In this Flink job, Nebula Graph is the Source, and print is the Sink. The execution result is as follows.

[Figure: execution result]

The Source sent 59,671,064 records and the Sink received 59,671,064 records.

Connector Sink

For Nebula Flink Connector, the Sink is Nebula Graph. Flink provides rich connector components that allow users to define external storage systems as data pools (Sinks) that receive the data flow processed by Flink.

About Sink

A Sink is the output of the data processed by Flink. It is mainly responsible for the output and persistence of real-time processing results. For example, Sinks can write data to stdout, to files, to sockets, and to external systems.

A Flink Sink works by calling write-related APIs or the DataStream.addSink method to write the data flow to an external store. Like the Source of a Flink connector, a Sink also allows users to customize an external storage system to be a data pool of Flink.

How to use a Flink Sink is shown in the figure below.

[Figure: Nebula Graph Sink diagram]

This section focuses on how to enable Nebula Graph to be a Sink of Flink.

Customize Sink

In Flink, you can use DataStream.addSink or DataStream.writeUsingOutputFormat to write Flink data flow into a user-defined data pool.

To customize a Sink, you can use the built-in Sink functions, implement SinkFunction, or inherit from RichOutputFormat.
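
As a toy illustration of the SinkFunction route (this is not the Nebula Graph Sink itself), a custom sink can be as small as the following sketch.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

// Minimal custom sink: open/close manage the connection to the external store,
// invoke is called once per incoming record.
public class MyGraphSink extends RichSinkFunction<Row> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // Acquire the connection to the external storage system here.
    }

    @Override
    public void invoke(Row value, Context context) throws Exception {
        System.out.println("writing record: " + value);   // stand-in for a real write
    }

    @Override
    public void close() throws Exception {
        // Flush pending data and release the connection here.
    }
}

// Usage: dataStream.addSink(new MyGraphSink());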

Implementation of Nebula Graph Sink

In Nebula Flink Connector, NebulaSinkFunction is implemented. Developers can call addSink on a data stream and pass a NebulaSinkFunction object as the parameter to write the Flink data flow to Nebula Graph.

Nebula Flink Connector is developed based on Flink 1.11-SNAPSHOT. In this version, writeUsingOutputFormat is deprecated (Here is its source code for your reference). If you want to use Nebula Graph Sink, you must use DataStream.addSink.

/** @deprecated */
@Deprecated
@PublicEvolving
public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
    return this.addSink(new OutputFormatSinkFunction(format));
}


The class diagram of the Nebula Graph Sink is likewise omitted because the image is too large to embed.

In the class diagram, NebulaSinkFunction and NebulaBatchOutputFormat are the most important classes.

NebulaSinkFunction inherits from AbstractRichFunction and implements these methods:

  • open: Prepares resources by calling the open method of NebulaBatchOutputFormat.
  • close: Releases resources by calling the close method of NebulaBatchOutputFormat.
  • invoke: Writes data by calling the write method of NebulaBatchOutputFormat. This is the core method of Sink.
  • flush: Submits data by calling the flush method of NebulaBatchOutputFormat.

NebulaBatchOutputFormat inherits from AbstractNebulaOutputFormat, which in turn inherits from RichOutputFormat. These methods are implemented:

  • open: Prepares the connection to the Graph Service of Nebula Graph and initializes nebulaBatchExecutor.
  • close: Submits the last batch of data, waits for the callback result, and closes the connection to Nebula Graph.
  • writeRecord: A core method. Writes data into nebulaBufferedRow and submits a job to write data when the data size reaches the configured limit. The writing process of Nebula Graph Sink is asynchronous, so a callback to obtain the execution result is necessary.
  • flush: Writes data to Nebula Graph when data exists in nebulaBufferedRow.

In AbstractNebulaOutputFormat, NebulaBatchExecutor is called for batch management and batch submission of data, and the result of a batch submission is received by defining a callback function. The code is as follows.

 /**
     * write one record to buffer
     */
    @Override
    public final synchronized void writeRecord(T row) throws IOException {
        nebulaBatchExecutor.addToBatch(row);

        if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) {
            commit();
        }
    }

    /**
     * put record into buffer
     *
     * @param record represent vertex or edge
     */
    void addToBatch(T record) {
        boolean isVertex = executionOptions.getDataType().isVertex();

        NebulaOutputFormatConverter converter;
        if (isVertex) {
            converter = new NebulaRowVertexOutputFormatConverter((VertexExecutionOptions) executionOptions);
        } else {
            converter = new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions);
        }
        String value = converter.createValue(record, executionOptions.getPolicy());
        if (value == null) {
            return;
        }
        nebulaBufferedRow.putRow(value);
    }

    /**
     * commit batch insert statements
     */
    private synchronized void commit() throws IOException {
        graphClient.switchSpace(executionOptions.getGraphSpace());
        future = nebulaBatchExecutor.executeBatch(graphClient);
        // clear waiting rows
        numPendingRow.compareAndSet(executionOptions.getBatch(),0);
    }

    /**
     * execute the insert statement
     *
     * @param client Asynchronous graph client
     */
    ListenableFuture executeBatch(AsyncGraphClientImpl client) {
        String propNames = String.join(NebulaConstant.COMMA, executionOptions.getFields());
        String values = String.join(NebulaConstant.COMMA, nebulaBufferedRow.getRows());
        // construct insert statement
        String exec = String.format(NebulaConstant.BATCH_INSERT_TEMPLATE, executionOptions.getDataType(), executionOptions.getLabel(), propNames, values);
        // execute insert statement
        ListenableFuture<Optional<Integer>> execResult = client.execute(exec);
        // define callback function
        Futures.addCallback(execResult, new FutureCallback<Optional<Integer>>() {
            @Override
            public void onSuccess(Optional<Integer> integerOptional) {
                if (integerOptional.isPresent()) {
                    if (integerOptional.get() == ErrorCode.SUCCEEDED) {
                        LOG.info("batch insert Succeed");
                    } else {
                        LOG.error(String.format("batch insert Error: %d",
                                integerOptional.get()));
                    }
                } else {
                    LOG.error("batch insert Error");
                }
            }

            @Override
            public void onFailure(Throwable throwable) {
                LOG.error("batch insert Error");
            }
        });
        nebulaBufferedRow.clean();
        return execResult;
    }


Nebula Graph Sink writes data in batches and asynchronously, so the Nebula Graph client must not be closed before the write job is submitted. To avoid closing it too early, any data remaining in the cache must be submitted in batches, and the write operations must complete before the resources are released at the end of the job. The code is as follows:

/**
     * commit the batch write operator before release connection
     */
    @Override
    public  final synchronized void close() throws IOException {
        if(numPendingRow.get() > 0){
            commit();
        }
        while(!future.isDone()){
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                LOG.error("sleep interrupted, ", e);
            }
        }

        super.close();
    }


Use Custom Nebula Graph Sink

When Flink uses the Sink to write data to Nebula Graph, the Flink data flow must be mapped to a data format that Nebula Graph Sink can recognize. To use the custom Nebula Graph Sink, pass a NebulaSinkFunction as the parameter to the addSink method to write the Flink data flow.

  • NebulaClientOptions:
    • Address: Specifies the IP addresses and ports of the Graph Service of Nebula Graph.
    • username
    • password
  • VertexExecutionOptions:
    • Specifies the name of the Nebula Graph space to write data to.
    • Specifies the tag to be written.
    • Specifies the properties of the tag.
    • Specifies the index in the Flink data row where the vertex ID to be written is located.
    • Specifies the maximum number of vertices to be written at a time. The default value is 2000.
  • EdgeExecutionOptions:
    • Specifies the name of the Nebula Graph space to write data to.
    • Specifies the edge type to be written.
    • Specifies the properties of the edge type.
    • Specifies the index in the Flink data row where the source vertex IDs are located.
    • Specifies the index in the Flink data row where the destination vertex IDs are located.
    • Specifies the index in the Flink data row where the rank values of the edges are located.
    • Specifies the maximum number of edges to be written at a time. The default value is 2000.
// The necessary parameters for connecting to the Nebula Graph client
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
                .NebulaClientOptionsBuilder()
                .setAddress("127.0.0.1:3699")
                .build();
NebulaConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);

// The necessary parameters for writing data to Nebula Graph
List<String> cols = Arrays.asList("name", "age");
ExecutionOptions sinkExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setTag(tag)
                .setFields(cols)
                .setIdIndex(0)
                .setBatch(20)
                .builder();
  
// Constructs NebulaSinkFunction from the batch output format described above
NebulaBatchOutputFormat outputFormat = new NebulaBatchOutputFormat(graphConnectionProvider)
                .setExecutionOptions(sinkExecutionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outputFormat);

// Writes data to Nebula Graph
dataSource.addSink(nebulaSinkFunction);


You can use the demo program of Nebula Graph Sink to transfer data between two graph spaces of Nebula Graph. The flinkSource graph space is the Source; after Flink reads its data and performs a map conversion (sketched below), all the data is written to another graph space, flinkSink, the Sink.
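
The map step referred to above simply reshapes each source Row into the layout the Sink expects. The sketch below is illustrative only: the field positions are assumptions, and dataSource2 and nebulaSinkFunction are the variables from the earlier Source and Sink snippets.

// Reshapes each Row read from the flinkSource space into the layout expected by
// the Sink: field 0 is the vertex ID, followed by the "name" and "age" properties.
DataStream<Row> sinkRows = dataSource2.map(new MapFunction<Row, Row>() {
    @Override
    public Row map(Row row) {
        Row record = new Row(3);
        record.setField(0, row.getField(0));   // vertex ID (assumed to be the first field)
        record.setField(1, row.getField(1));   // "name" property
        record.setField(2, row.getField(2));   // "age" property
        return record;
    }
});

sinkRows.addSink(nebulaSinkFunction);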

Catalog

Before Flink 1.11.0, to use Flink to read data from or write data to an external system, you had to manually obtain the schema of that system. For example, to read data from or write data to Nebula Graph, you had to know its schema before the read or write operation. If the schema in Nebula Graph was updated, you had to manually update the Flink job to keep the types matching; any mismatch could cause the job to fail. Such operations are redundant and tedious.

Since Flink 1.11.0, Flink connectors can automatically obtain the schema of the data source, or match the data without knowing the schema of the external storage system.

For now, Nebula Flink Connector can be used to read data from and write data to Nebula Graph, but implementing schema matching requires Catalog management. To keep the data in Nebula Graph safe, Nebula Flink Connector only supports reading the Catalog, not editing or writing it.

When Nebula Flink Connector accesses data of a specific type, the complete path is <graphSpace>.<VERTEX.tag> or <graphSpace>.<EDGE.edge>.

It can be used as follows:

String catalogName  = "testCatalog";
String defaultSpace = "flinkSink";
String username     = "root";
String password     = "nebula";
String address      = "127.0.0.1:45500";
String table        = "VERTEX.player";

// define the Nebula catalog
Catalog catalog = NebulaCatalogUtils.createNebulaCatalog(catalogName, defaultSpace, address, username, password);

// define the Flink table environment
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

// register the custom Nebula catalog
tEnv.registerCatalog(catalogName, catalog);
// use the custom Nebula catalog
tEnv.useCatalog(catalogName);

// show the graph spaces of Nebula Graph
String[] spaces = tEnv.listDatabases();

// show the tags and edge types of Nebula Graph
tEnv.useDatabase(defaultSpace);
String[] tables = tEnv.listTables();

// check that the tag player exists in defaultSpace
ObjectPath path = new ObjectPath(defaultSpace, table);
assert catalog.tableExists(path);

// get the schema of the tag
CatalogBaseTable vertexTable = catalog.getTable(path);
vertexTable.getSchema();


For more Catalog interfaces supported by Nebula Flink Connector, see NebulaCatalog.java.

Exactly-once

For a Flink connector, Exactly-once means that, with the help of the checkpoint algorithm, Flink ensures that each incoming event affects the final result exactly once; even if a fault occurs during processing, data is neither duplicated nor lost.

To provide end-to-end Exactly-once semantics, the external storage system must provide a way to commit or roll back writes that coordinates with Flink's checkpoints. Flink provides the TwoPhaseCommitSinkFunction abstraction to implement end-to-end Exactly-once with a two-phase commit protocol.

To implement Exactly-once semantics for data output, these four methods must be implemented (an illustrative sketch follows the list):

  • beginTransaction: Creates a temporary file under a temporary directory on the destination file system before the transaction begins. Data is then written to this file while it is being processed.
  • preCommit: Closes the file in the pre-commit phase so that no more data is written to it, and starts a new transaction for any data that belongs to the next checkpoint.
  • commit: Atomically moves the pre-committed file to the actual destination directory in the commit phase. The two-phase process increases the visibility latency of the output data.
  • abort: Deletes temporary files in the termination phase.
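
The following is a compressed, illustrative sketch of such a two-phase-commit sink: the "transaction" is simply the path of a temporary file that is atomically promoted on commit. The class and its file handling are not taken from Nebula Flink Connector; they only mirror the four methods above.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Illustrative two-phase-commit file sink: the "transaction" is the path of a
// temporary file that is atomically moved into place on commit.
public class TwoPhaseFileSink extends TwoPhaseCommitSinkFunction<String, String, Void> {

    private final String tmpDir;
    private final String targetDir;   // should be on the same file system as tmpDir for ATOMIC_MOVE

    public TwoPhaseFileSink(String tmpDir, String targetDir) {
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
        this.tmpDir = tmpDir;
        this.targetDir = targetDir;
    }

    @Override
    protected String beginTransaction() throws IOException {
        // Create a fresh temporary file for this checkpoint interval.
        Path tmp = Paths.get(tmpDir, "txn-" + UUID.randomUUID());
        Files.createFile(tmp);
        return tmp.toString();
    }

    @Override
    protected void invoke(String transaction, String value, Context context) throws IOException {
        // Append each record to the transaction's temporary file.
        Files.write(Paths.get(transaction), (value + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);
    }

    @Override
    protected void preCommit(String transaction) {
        // Nothing is buffered in memory here; a real sink would flush and close its writer.
    }

    @Override
    protected void commit(String transaction) {
        try {
            // Atomically promote the temporary file to the destination directory.
            Path tmp = Paths.get(transaction);
            Files.move(tmp, Paths.get(targetDir, tmp.getFileName().toString()),
                    StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    protected void abort(String transaction) {
        try {
            // Discard the temporary file if the transaction is aborted.
            Files.deleteIfExists(Paths.get(transaction));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}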

As these four methods show, Flink's two-phase commit mechanism requires the Source to support data replay and the Sink to support transactional commits and idempotent writes.

Nebula Graph v1.1.0 does not support transactions, but its writes are idempotent: a record can be written multiple times and the result stays consistent. Therefore, Nebula Flink Connector implements At-least-once delivery through the checkpoint algorithm, and the idempotency of repeated writes indirectly gives the Sink Exactly-once semantics.

To use the fault-tolerance feature of Nebula Graph Sink, make sure that checkpointing is enabled in the Flink execution environment.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000) // checkpoint every 10000 msecs
   .getCheckpointConfig()
   .setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);


Reference

  • Nebula Source Demo [testNebulaSource]: https://github.com/vesoft-inc/nebula-java/blob/v1.0/examples/src/main/java/org/apache/flink/FlinkDemo.java
  • Nebula Sink Demo [testSourceSink]: https://github.com/vesoft-inc/nebula-java/blob/v1.0/examples/src/main/java/org/apache/flink/FlinkDemo.java
  • Apache Flink: https://github.com/apache/flink
  • Flink Documentation: https://flink.apache.org/flink-architecture.html and https://ci.apache.org/projects/flink/flink-docs-release-1.12/
  • flink-connector-jdbc: https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-jdbc

If you are interested in this topic and would like to talk to Nicole about Nebula Graph and Flink, join the Nebula Graph Slack channel.


Published at DZone with permission of Nicole Wang. See the original article here.
