
How to Run Any Dockerized Application on Spring Cloud Data Flow

Let's address the issue of including any Docker image while building data pipelines using Spring Cloud Data Flow, which can natively interpret only Spring Boot images.

By Randhir Singh · Updated Sep. 19, 2018 · Tutorial

Spring Cloud Data Flow (SCDF) provides tools to build streaming and batch data pipelines. These pipelines can be deployed on top of a container orchestration engine such as Kubernetes. When Kubernetes is the runtime, each pipeline step is a Spring Boot application, either one of the starter apps or a custom Boot application, packaged as a Docker image. Python, however, is a popular data munging tool, and SCDF cannot natively interpret Python Docker images. In this article, we will create a streaming data pipeline that starts a Python Docker image in one of its intermediate steps and uses the result of the computation in downstream components.

Scenario

Let us consider a scenario where we are receiving data from sensors that generate sensor data tags as time series data. The sensor data is a JSON array, an example of which is shown below.

{
  "tags": [
    {
      "tagId": "d624523e-29eb-4d23-86eb-08d7b881a3a8",
      "tagType": 0,
      "timestamp": 1528927600678,
      "value": "10.0",
      "quality": 3,
      "datatype": "String",
      "additionalProperties": {}
    }
  ]
}

Our data pipeline accepts this JSON array via an HTTP POST. Let there be a configurable parameter, say value-threshold, whose value is the upper limit for an event to be valid. For each event in the array, we check whether the value is less than value-threshold, say 12, and add a property to the event indicating whether it is valid. Finally, we send the enriched event to a sink.
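Before wiring this into SCDF, the enrichment rule itself is easy to state. The sketch below (plain Python, with a hypothetical enrich helper that is not part of the SCDF processor) shows the intended behavior: an event is valid when its value is below the configured value-threshold, and otherwise gets an "invalid" flag.

```python
# Minimal sketch of the enrichment rule described above. The enrich() helper
# is hypothetical; it only illustrates the intended input/output contract.
import json

def enrich(event, value_threshold):
    """Return a copy of the event, flagged invalid when value >= threshold."""
    enriched = dict(event)
    if float(event["value"]) >= value_threshold:
        enriched["invalid"] = True
    return enriched

event = {"tagId": "d6", "value": "10.0", "quality": 3}
print(json.dumps(enrich(event, 12)))  # 10.0 < 12, so no flag is added
print(json.dumps(enrich(event, 9)))   # 10.0 >= 9, so "invalid": true is added
```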

To develop the pipeline, we'll create a custom processor in the familiar Spring Boot style, called data-quality, dockerize it, and push it to Docker Hub. We then register the processor with the SCDF server using its Docker URI. The pipeline can be built with the SCDF DSL as shown below.

dataflow:>app register --type processor --name data-quality --uri docker:randhirkumars/data-quality:latest
dataflow:>stream create --name dataquality --definition "http --port=8086 | splitter --expression=#jsonPath(payload,'$.tags') --outputType=application/json | data-quality --value-threshold=12 --outputType=application/json | log"
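The splitter step is what turns one HTTP payload into many messages: the #jsonPath(payload,'$.tags') expression extracts the tags array, and each element is forwarded downstream as its own message. A plain-Python stand-in for that step looks like this:

```python
# Sketch of what the splitter does to the incoming payload. Plain Python
# stands in here for the SpEL expression #jsonPath(payload,'$.tags'):
# the tags array is extracted and each element becomes its own message.
import json

payload = '{"tags":[{"tagId":"d6","value":"10.0"},{"tagId":"a1","value":"15.0"}]}'
messages = json.loads(payload)["tags"]  # one downstream message per element

for m in messages:
    print(json.dumps(m))
```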

The SCDF dashboard can render the corresponding graphical representation of the same pipeline.

The data-quality processor adds a property to each item indicating whether it is valid. The code is shown below.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableConfigurationProperties(SensorReadingProperties.class)
@EnableBinding(Processor.class)
@SpringBootApplication
public class DataQualityApplication {

    public static void main(String[] args) {
        SpringApplication.run(DataQualityApplication.class, args);
    }

    @Autowired
    private SensorReadingProperties properties;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public SensorReading setValidity(SensorReading r) {
        Double validThreshold = properties.getValueThreshold();
        if (!isValid(r, String.valueOf(validThreshold))) {
            r.setInvalid(true);
        }
        return r;
    }

    // A reading is valid when its value is below the configured threshold.
    private boolean isValid(SensorReading r, String threshold) {
        return Double.compare(Double.valueOf(r.getValue()), Double.valueOf(threshold)) < 0;
    }
}

This is all fine for a Spring Boot processor with simple validity logic. But what if the logic to determine validity has been comprehensively developed by analysts and is available only as a Python Docker image? For the sake of illustration, let the Python code be as shown below.

import json
import os

try:
    data = json.loads(os.environ["RECORD"])
    threshold = float(os.environ["THRESHOLD"])
    print("VALID" if float(data["value"]) < threshold else "INVALID")
except Exception:
    print("INVALID")

The item and the threshold parameter are passed to the Docker container as environment variables, and the validation result is printed to the console. How can we include this Docker image in our data pipeline?
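That contract is worth pinning down before we wire it into Java: RECORD and THRESHOLD arrive as environment variables, and the verdict goes to stdout. The sketch below (a hypothetical check helper, not part of the container image) mirrors the container's decision logic so it can be exercised locally:

```python
# Hedged sketch of the container's input/output contract. check() mirrors
# the containerized logic: VALID when value < threshold, INVALID otherwise
# or on any parse failure.
import json
import os

def check(record_json, threshold):
    """Mirror of the containerized check: VALID when value < threshold."""
    try:
        data = json.loads(record_json)
        return "VALID" if float(data["value"]) < float(threshold) else "INVALID"
    except (KeyError, ValueError, TypeError):
        return "INVALID"

# Simulate how the container receives its inputs.
os.environ["RECORD"] = '{"value": "10.0"}'
os.environ["THRESHOLD"] = "12"
print(check(os.environ["RECORD"], os.environ["THRESHOLD"]))  # prints VALID
```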

Generic Processor

To address this issue, we'll modify our Spring Boot processor to remove the validation logic written in Java. In its place, we run the Python Docker image and use its result to set the validation property of the event. To run a Docker image from Java, we use the Docker API for Java. For this to work, we use Docker-in-Docker (DinD) as the base image for our Spring Boot application. To dockerize the application, the Google Jib Maven plugin is used. The plugin configuration in the application's pom.xml is shown below.

<plugin>
    <groupId>com.google.cloud.tools</groupId>
    <artifactId>jib-maven-plugin</artifactId>
    <version>0.9.9</version>
    <configuration>
        <from>
            <image>npalm/dind-java</image>
        </from>
        <to>
            <image>data-quality</image>
        </to>
    </configuration>
</plugin>

The modified isValid method is shown below.

    private boolean isValid(SensorReading r, String threshold) {

        boolean valid = true;
        String sensorReadingStr = r.toString();
        DockerClient dockerClient = DockerClientBuilder.getInstance().build();

        try {

            dockerClient
                    .pullImageCmd("randhirkumars/python-docker")
                    .withTag("latest")
                    .exec(new PullImageResultCallback() {
                        @Override
                        public void onNext(PullResponseItem item) {
                            super.onNext(item);
                            System.out.println(item.getStatus());
                        }
                    }).awaitCompletion();

            CreateContainerResponse createContainerResponse = dockerClient
                    .createContainerCmd("randhirkumars/python-docker")
                    .withEnv("RECORD=" + sensorReadingStr, "THRESHOLD=" + threshold)
                    .withBinds(new Bind("/var/run/docker.sock", new Volume("/var/run/docker.sock")))
                    .exec();

            dockerClient
                    .startContainerCmd(createContainerResponse.getId())
                    .exec();

            dockerClient
                    .waitContainerCmd(createContainerResponse.getId())
                    .exec(new WaitContainerResultCallback())
                    .awaitCompletion();

            final List<Frame> loggingFrames = getLoggingFrames(dockerClient, createContainerResponse.getId());

            for (final Frame frame : loggingFrames) {
                // Frames render like "STDOUT: INVALID"; contains() is safer
                // than indexOf(...) > 0, which would miss a match at index 0.
                if (frame.toString().contains("INVALID")) {
                    valid = false;
                }
            }
        } catch (Exception e) {
            valid = false;
        }

        return valid;
    }

    private List<Frame> getLoggingFrames(DockerClient dockerClient, String containerId) throws Exception {

        FrameReaderITestCallback collectFramesCallback = new FrameReaderITestCallback();

        dockerClient.logContainerCmd(containerId).withStdOut(true).withStdErr(true)
                .withTailAll()
                .exec(collectFramesCallback).awaitCompletion();

        return collectFramesCallback.frames;
    }

    public static class FrameReaderITestCallback extends LogContainerResultCallback {

        public List<Frame> frames = new ArrayList<>();

        @Override
        public void onNext(Frame item) {
            frames.add(item);
            super.onNext(item);
        }
    }
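The crux of the Java code above is the final step: scan the container's log frames and treat the reading as invalid as soon as any frame reports INVALID. A hedged Python equivalent of just that parsing step (the helper name is hypothetical) looks like this:

```python
# Sketch of the log-frame scan performed by the Java processor above. Each
# frame renders like "STDOUT: VALID" or "STDOUT: INVALID"; the reading is
# invalid as soon as any frame contains "INVALID".
def is_valid_from_frames(frames):
    """Return False if any container log frame reports INVALID."""
    return not any("INVALID" in frame for frame in frames)

print(is_valid_from_frames(["STDOUT: VALID"]))    # True
print(is_valid_from_frames(["STDOUT: INVALID"]))  # False
```

Note that checking for "INVALID" rather than "VALID" is deliberate: "VALID" is a substring of "INVALID", so matching on the positive verdict would misclassify invalid readings.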

To test the pipeline, find out the IP of the http source Kubernetes Pod, and send an HTTP POST as shown below.

dataflow:>http post --target http://<ip-of-http-source-pod>:8080 --data "{\"tags\":[{\"tagId\":\"d6\",\"tagType\":0,\"timestamp\":1528927600678,\"value\":\"10.0\",\"quality\":3,\"datatype\":\"String\",\"additionalProperties\":{}}]}"

Check the logs of the log sink.

dataflow:>! kubectl logs dataquality-log-65ccf64699-9dd7s

The logs should display the enriched sensor reading with its validity property set.

2018-09-10 11:44:19.163  INFO 1 --- [y.dataquality-1] log-sink   
: {"tagId":"d6","tagType":0,"timestamp":1528927600678,"value":"10.0","quality":3,"datatype":"String","additionalProperties":{},"invalid":true}

Conclusion

In this article, we addressed the issue of including an arbitrary Docker image in data pipelines built with Spring Cloud Data Flow, which can natively interpret only Spring Boot images. We created a Java processor with Docker-in-Docker as the base image, which allowed us to use the Docker API for Java to run any Docker image as part of our processing. The code is available on GitHub.


Opinions expressed by DZone contributors are their own.
