How to Run Any Dockerized Application on Spring Cloud Data Flow

Let's address the issue of including any Docker image while building data pipelines using Spring Cloud Data Flow, which can natively interpret only Spring Boot images.

By Randhir Singh · Sep. 19, 2018 · Tutorial

Spring Cloud Data Flow (SCDF) provides tools to build streaming and batch data pipelines. These pipelines can be deployed on top of a container orchestration engine such as Kubernetes. When Kubernetes is the runtime, the pipeline steps are Spring Boot applications, either starter apps or custom Boot applications, packaged as Docker images. Python, however, is a popular data munging tool, and SCDF cannot natively interpret Python Docker images. In this article, we will create a streaming data pipeline that runs a Python Docker image in one of its intermediate steps and uses the result of that computation in downstream components.

Scenario

Let us consider a scenario where we receive data from sensors that emit tags as time-series data. The sensor data is a JSON array, an example of which is shown below.

{
   "tags": [
      {
         "tagId": "d624523e-29eb-4d23-86eb-08d7b881a3a8",
         "tagType": 0,
         "timestamp": 1528927600678,
         "value": "10.0",
         "quality": 3,
         "datatype": "String",
         "additionalProperties": {}
      }
   ]
}
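
In the processor code later in the article, each tag is handled as a SensorReading POJO. The article does not list that class; the following is a hypothetical minimal sketch consistent with how it is used (getValue(), setInvalid(), and a JSON-producing toString()):

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the original article does not show this class.
public class SensorReading {

    private String tagId;
    private int tagType;
    private long timestamp;
    private String value;     // numeric value carried as a string, e.g. "10.0"
    private int quality;
    private String datatype;
    private Map<String, Object> additionalProperties = new HashMap<>();
    private Boolean invalid;  // set by the data-quality processor

    public String getValue() {
        return value;
    }

    public void setInvalid(Boolean invalid) {
        this.invalid = invalid;
    }

    // Remaining getters/setters and a JSON-producing toString() are omitted for brevity.
}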

Our data pipeline accepts this JSON array via an HTTP POST. Let there be a configurable parameter, say value-threshold, that sets the upper limit below which an event is considered valid. For each event in the array, the pipeline checks whether the value is less than value-threshold (say, 12) and adds a property to the event indicating whether it is valid. Finally, it sends the enriched event to a sink.

To develop the pipeline, we'll create a custom processor called data-quality in the familiar Spring Boot style, dockerize it, and push it to Docker Hub. The processor is then registered with the SCDF server using its Docker URI, and the pipeline can be built with the SCDF DSL as shown below.

dataflow:>app register --type processor --name data-quality --uri docker:randhirkumars/data-quality:latest
dataflow:>stream create --name dataquality --definition "http --port=8086 | splitter --expression=#jsonPath(payload,'$.tags') --outputType=application/json | data-quality --value-threshold=12 --outputType=application/json | log"
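
The splitter uses the JsonPath expression #jsonPath(payload,'$.tags') to break the incoming array into individual tag events, so the data-quality processor receives one message per tag. For the sample payload above, that message would look roughly like this:

{"tagId":"d624523e-29eb-4d23-86eb-08d7b881a3a8","tagType":0,"timestamp":1528927600678,"value":"10.0","quality":3,"datatype":"String","additionalProperties":{}}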

The corresponding graphical representation of the same data pipeline:

[Image: the dataquality stream rendered as http | splitter | data-quality | log]

The data-quality processor sets a property on each item indicating whether it is valid. The code is shown below.

@EnableConfigurationProperties(SensorReadingProperties.class)
@EnableBinding(Processor.class)
@SpringBootApplication
public class DataQualityApplication {

    public static void main(String[] args) {
        SpringApplication.run(DataQualityApplication.class, args);
    }

    @Autowired
    private SensorReadingProperties properties;

    // Marks each incoming reading as invalid when it fails the threshold check.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public SensorReading setValidity(SensorReading r) {

        Double validThreshold = properties.getValueThreshold();
        if (!isValid(r, String.valueOf(validThreshold))) {
            r.setInvalid(true);
        }
        return r;
    }

    // A reading is valid when its value is below the configured value-threshold.
    private boolean isValid(SensorReading r, String threshold) {
        return Double.compare(Double.valueOf(r.getValue()), Double.valueOf(threshold)) < 0;
    }
}
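
The value-threshold option in the stream definition is bound through SensorReadingProperties, which the article also does not show. A minimal sketch, assuming standard Spring Boot relaxed property binding, could be:

import org.springframework.boot.context.properties.ConfigurationProperties;

// Hypothetical sketch: the original article does not show this class.
@ConfigurationProperties
public class SensorReadingProperties {

    // Upper limit below which a sensor reading is considered valid.
    private Double valueThreshold;

    public Double getValueThreshold() {
        return valueThreshold;
    }

    public void setValueThreshold(Double valueThreshold) {
        this.valueThreshold = valueThreshold;
    }
}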

This is all fine for a Spring Boot processor with simple validity-checking logic. But what if the validation logic has already been developed comprehensively by analysts and is available only as a Python Docker image? For the sake of illustration, let the Python code be as shown below.

import os
import json

# The sensor reading and the threshold arrive as environment variables.
try:
    data = json.loads(os.environ["RECORD"])
    threshold = float(os.environ["THRESHOLD"])

    print("VALID" if float(data["value"]) < threshold else "INVALID")
except Exception:
    # Any malformed or missing input is treated as invalid.
    print("INVALID")

The item and the threshold parameters are passed to the container as environment variables, and the validation result is printed to the console. How can we include this Docker image in our data pipeline?

Generic Processor

To address this issue, we'll modify our Spring Boot processor to remove the validation logic written in Java. In its place, we run the Python Docker image and use its result to set the validation property of the event. To run a Docker image from Java, we use the docker-java client, a Java implementation of the Docker API. For this to work, we'll have to use Docker-in-Docker (DinD) as the base image for our Spring Boot application. To dockerize the application, the Google Jib Maven plugin is used. The plugin configuration in the application's pom.xml is shown below.

<plugin>
    <groupId>com.google.cloud.tools</groupId>
    <artifactId>jib-maven-plugin</artifactId>
    <version>0.9.9</version>
    <configuration>
        <from>
            <image>npalm/dind-java</image>
        </from>
        <to>
            <image>data-quality</image>
        </to>
    </configuration>
</plugin>
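
The Docker calls in the modified processor below come from the docker-java client library, which must also be on the classpath. A sketch of the Maven dependency (the version shown is illustrative for the time of writing):

<dependency>
    <groupId>com.github.docker-java</groupId>
    <artifactId>docker-java</artifactId>
    <version>3.0.14</version>
</dependency>

With the Jib configuration above in place, mvn compile jib:build builds the image and pushes it to the configured registry, while mvn compile jib:dockerBuild builds it into the local Docker daemon instead.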

The modified isValid method, which delegates the validity check to the Python container, is shown below.

    private boolean isValid(SensorReading r, String threshold) {

        boolean valid = true;
        String sensorReadingStr = r.toString();
        DockerClient dockerClient = DockerClientBuilder.getInstance().build();

        try {
            // Pull the analysts' validation image.
            dockerClient
                    .pullImageCmd("randhirkumars/python-docker")
                    .withTag("latest")
                    .exec(new PullImageResultCallback() {
                        @Override
                        public void onNext(PullResponseItem item) {
                            super.onNext(item);
                            System.out.println(item.getStatus());
                        }
                    }).awaitCompletion();

            // Pass the reading and the threshold as environment variables, and
            // bind the Docker socket so the DinD container can reach the daemon.
            CreateContainerResponse createContainerResponse = dockerClient
                    .createContainerCmd("randhirkumars/python-docker")
                    .withEnv("RECORD=" + sensorReadingStr, "THRESHOLD=" + threshold)
                    .withBinds(new Bind("/var/run/docker.sock", new Volume("/var/run/docker.sock")))
                    .exec();

            dockerClient
                    .startContainerCmd(createContainerResponse.getId())
                    .exec();

            // Block until the container exits.
            dockerClient
                    .waitContainerCmd(createContainerResponse.getId())
                    .exec(new WaitContainerResultCallback())
                    .awaitCompletion();

            // The Python script prints VALID or INVALID to stdout;
            // scan the container logs for the verdict.
            final List<Frame> loggingFrames = getLoggingFrames(dockerClient, createContainerResponse.getId());

            for (final Frame frame : loggingFrames) {
                if (frame.toString().contains("INVALID")) {
                    valid = false;
                }
            }
        } catch (Exception e) {
            // Treat any Docker failure as an invalid reading.
            valid = false;
        }

        return valid;
    }

    private List<Frame> getLoggingFrames(DockerClient dockerClient, String containerId) throws Exception {

        FrameReaderITestCallback collectFramesCallback = new FrameReaderITestCallback();

        dockerClient.logContainerCmd(containerId).withStdOut(true).withStdErr(true)
                .withTailAll()
                .exec(collectFramesCallback).awaitCompletion();

        return collectFramesCallback.frames;
    }

    // Collects all log frames emitted by the container.
    public static class FrameReaderITestCallback extends LogContainerResultCallback {

        public List<Frame> frames = new ArrayList<>();

        @Override
        public void onNext(Frame item) {
            frames.add(item);
            super.onNext(item);
        }
    }

To test the pipeline, find the IP of the http source's Kubernetes pod and send an HTTP POST as shown below.

dataflow:>http post --target http://<ip-of-http-source-pod>:8080 --data "{\"tags\":[{\"tagId\":\"d6\",\"tagType\":0,\"timestamp\":1528927600678,\"value\":\"10.0\",\"quality\":3,\"datatype\":\"String\",\"additionalProperties\":{}}]}"

Check the logs of the log sink.

dataflow:>! kubectl logs dataquality-log-65ccf64699-9dd7s

The logs should display the enriched sensor reading with its validity property set.

2018-09-10 11:44:19.163  INFO 1 --- [y.dataquality-1] log-sink : {"tagId":"d6","tagType":0,"timestamp":1528927600678,"value":"10.0","quality":3,"datatype":"String","additionalProperties":{},"invalid":true}

Conclusion

In this article, we addressed the issue of including any Docker image while building data pipelines with Spring Cloud Data Flow, which can natively interpret only Spring Boot images. We created a Java processor with Docker-in-Docker as the base image, which allowed us to use the docker-java client to run any Docker image as part of our processing. The code is available on GitHub.
