
Building a Data Mart With Spring Cloud Data Flow


Lightweight and easy to use, Spring Cloud Data Flow can be used to build streams and connect your data for integrated use with your projects.


As a fan of Spring frameworks, I often find myself perusing their projects to see what's updated or new. Recently, I came across the Spring Cloud Data Flow project and initially thought to myself, "Yes, I need this in my life." I appreciate the relatively lightweight approach it takes to using microservices for data orchestration. As I do with many new projects, I decided to get crackin' on a practical example. In this case, I wanted to see what it would look like to support a basic data mart with a star schema using Spring Cloud Data Flow (for the sake of brevity, I'll use "SCDF").

At the moment, SCDF supports the following platforms: Cloud Foundry, Apache YARN, Apache Mesos, and Kubernetes. For simplicity, though, this article will only use a local deployment of SCDF. Also, to make things easier, this project uses Docker Compose to stand up our stack locally, so if you don't already have it installed, see the Docker Compose documentation.

Okay, let's get started. You can find the source code for this example here. The simplest way to explain SCDF is that it's an easy way to tie together Spring Cloud Stream applications, which are obtained through Maven as executable JARs or as Docker images. These applications fall into three categories: source, processor, and sink. A source application brings data into the stream, either by fetching it itself or by accepting it from an external client. For this example, we'll create an HTTP source that accepts a JSON message via a POST. The message is then sent to a processor, the car-fact-processor, which takes information from the JSON payload and looks up the ID values of the dimensions in order to populate a fact message. Finally, the fact message is sent to a JDBC sink, which persists it to the car_fact table.
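
The message flowing through the stream is deserialized into a Car DTO. That class isn't shown in this article, so here is a minimal sketch of what it might look like; I'm assuming plain String fields for the engine code and make name (the real class lives in com.github.wkennedy.dto in the example project).

// Hypothetical sketch of the Car DTO; the field names match the JSON
// payload we POST later ({ "engine":"SR20", "make":"Nissan" }).
public class Car {

    private String engine; // engine code, e.g. "SR20"
    private String make;   // make name, e.g. "Nissan"

    public String getEngine() { return engine; }
    public void setEngine(String engine) { this.engine = engine; }

    public String getMake() { return make; }
    public void setMake(String make) { this.make = make; }

    @Override
    public String toString() {
        return "Car{engine='" + engine + "', make='" + make + "'}";
    }
}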

Because we are using out-of-the-box applications for the sink and the source (which are already documented by Spring), let's talk about the custom processor, car-fact-processor. 

import java.text.ParseException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

// Binds this application to the Processor interface, which supplies the
// "input" and "output" channels of a Spring Cloud Stream processor.
@EnableBinding(Processor.class)
public class CarFactProcessor {

    private static final Logger logger = LoggerFactory.getLogger(CarFactProcessor.class);

    @Autowired
    private EngineDimRepository engineDimRepository;

    @Autowired
    private MakeDimRepository makeDimRepository;

    // Receives a Car from the input channel, resolves the dimension IDs,
    // and emits a populated car fact message on the output channel.
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Message<String> transform(Car payload) throws ParseException {
        logger.debug("Transforming payload: {}", payload);

        String engineCode = payload.getEngine();
        String make = payload.getMake();

        // Look up the primary keys of the dimension rows for this fact.
        EngineDimEntity engineDimEntity = engineDimRepository.findByCode(engineCode);
        MakeDimEntity makeDimEntity = makeDimRepository.findByName(make);

        // Build the fact message with the resolved dimension IDs.
        String message = "{" +
                "   \"engine\":\"" + engineDimEntity.getId() + "\"," +
                "   \"make\":" + makeDimEntity.getId() +
                "}";

        logger.debug("CarFactProcessor transformed result: {}", message);
        return MessageBuilder.withPayload(message).build();
    }
}


This is a simple Spring Cloud Stream application that takes a Car object, finds the primary keys of the "engine" and "make" dimensions, and returns a populated car fact message. The dimension lookups are handled by a separate module, star-data-jpa, which contains the Spring Data repositories and entities. Caching is valuable here: looking up the dimensions from the database on every message would be expensive, so we set up caching, and the CacheConfig class configures the processor to use Redis as the cache store.
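
The repositories themselves aren't listed in this article, but here is a minimal sketch of what a cacheable dimension lookup might look like. The findByCode method name comes from the processor above; the CrudRepository base type, the Long ID type, and the cache name are my assumptions.

import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.repository.CrudRepository;

// Hypothetical sketch of a dimension repository from star-data-jpa.
// @Cacheable stores lookups in the cache configured by CacheConfig
// (Redis), so repeated facts don't hit MySQL for the same dimension row.
public interface EngineDimRepository extends CrudRepository<EngineDimEntity, Long> {

    @Cacheable("engineDimByCode") // cache name is an assumption
    EngineDimEntity findByCode(String code);
}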

Running All the Things

First, run mvn install on the parent POM, spring-data-flow-example. This builds the custom processor used in your new data stream.

Now, let's get this thing up and running. First, navigate to /spring-data-flow-example/docker/ and run docker-compose up. This starts MySQL, Redis, Zookeeper, and Kafka as defined in the docker-compose.yml file. Once these are up and running, you can start the local SCDF server and SCDF shell. Navigate to /spring-data-flow-example/local-dataflow-server/ and run mvn spring-boot:run. This starts your SCDF server, and after a moment you can view the dashboard at http://localhost:9393/dashboard/.
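
For reference, here is a rough sketch of the shape that docker-compose.yml might take. The image tags, ports, and most environment values below are assumptions (apart from the galaxy_schema database and the user/pass credentials, which match the stream definitions later), so use the file from the example repo.

# Hypothetical sketch of docker-compose.yml; use the repo's actual file.
version: '2'
services:
  mysql:
    image: mysql:5.7
    environment:
      MYSQL_DATABASE: galaxy_schema
      MYSQL_USER: user
      MYSQL_PASSWORD: pass
      MYSQL_ROOT_PASSWORD: root
    ports:
      - "3306:3306"
  redis:
    image: redis:3
    ports:
      - "6379:6379"
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    ports:
      - "9092:9092"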

You can create your streams through the dashboard, but for the sake of simplicity, we'll create them with commands via the SCDF shell. Start the shell the same way you started the server, by running mvn spring-boot:run, but from the shell module's directory of the project.

In order to create streams, first we need to register Spring Cloud Stream applications. In your SCDF shell, run:

app import --uri http://bit.ly/stream-applications-kafka-maven 

This will register all the currently available stream applications. Next, we need to register our custom fact processor, which was built when we ran the Maven install. Run:

app register --name car-fact-processor --type processor --uri maven://com.github.wkennedy:car-fact-processor:jar:1.0.0-SNAPSHOT

That registers the car-fact-processor from our local Maven repository. Now that our apps are ready to go, we can create our stream. The following command creates and deploys a stream (named car-fact-stream here) composed of an HTTP endpoint that accepts JSON, the car fact processor, and a JDBC sink that writes the car fact to a table in MySQL.

stream create car-fact-stream --definition "http --port=10101 --spring.cloud.stream.bindings.output.contentType='application/json' | car-fact-processor --spring.cloud.stream.bindings.input.contentType='application/x-java-object;type=com.github.wkennedy.dto.Car' | jdbc --spring.datasource.url=jdbc:mysql://127.0.0.1:3306/galaxy_schema?user=user --spring.datasource.password=pass --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver --jdbc.table-name=car_fact --jdbc.columns=engine,make" --deploy


Once the stream successfully deploys, we can test it out. The stream above created an HTTP endpoint on port 10101 that we can now POST data to.

curl -X POST -H "Content-Type: application/json" -H "Cache-Control: no-cache" -d '{ "engine":"SR20", "make":"Nissan" }' "http://localhost:10101"


After running this curl statement, you should see a new row in your galaxy_schema.car_fact table. This example comes with the dimensions already populated, but what if we want to add more dimensions? With the built-in apps, it's easily done:

stream create engine-dim-stream --definition "http --port=10102 --spring.cloud.stream.bindings.output.contentType='application/json' | jdbc --spring.datasource.url=jdbc:mysql://127.0.0.1:3306/galaxy_schema?user=user --spring.datasource.password=pass --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver --jdbc.table-name=engine_dim --jdbc.columns=code,displacement,fuel" --deploy


You now have a new endpoint to add engines to your engine_dim table. For instance:

curl -X POST -H "Content-Type: application/json" -H "Cache-Control: no-cache" -d '{ "code":"ISB 6.7", "displacement":"6.7 litres", "fuel":"diesel" }' "http://localhost:10102"


From here, you can easily create more facts and dimensions. SCDF clears the way to ingest and route data by different means for whatever scenario you might encounter.


Topics:
cloud, microservices, spring cloud data flow, stream

