
How to Run Any Dockerized Application on Spring Cloud Data Flow


Let's address the issue of including any Docker image while building data pipelines using Spring Cloud Data Flow, which can natively interpret only Spring Boot images.


Spring Cloud Data Flow (SCDF) provides tools to build streaming and batch data pipelines. These pipelines can be deployed on top of a container orchestration engine such as Kubernetes. When Kubernetes is the runtime, each step of a pipeline is a Spring Boot application, either one of the starter apps or a custom Boot application, packaged as a Docker image. Python, however, is a popular data munging tool, and SCDF cannot natively interpret Python Docker images. In this article, we will create a streaming data pipeline that starts a Python Docker image in one of the intermediate steps and uses the result of its computation in downstream components.

Scenario

Let us consider a scenario where we are receiving data from sensors that generate sensor data tags as time series data. The sensor data is a JSON array, an example of which is shown below.

{  
   "tags":[  
      {  
         "tagId":"d624523e-29eb-4d23-86eb-08d7b881a3a8",
         "tagType":0,
         "timestamp":1528927600678,
         "value":"10.0",
         "quality":3,
         "datatype":"String",
         "additionalProperties":{  

         }
      }
   ]
}

Our data pipeline accepts this JSON array via an HTTP POST. Let there be a configurable parameter, value-threshold, whose value is the upper limit for an event to be valid. For each event in the array, we check whether its value is less than value-threshold (say, 12) and add a property to the event indicating whether it is valid. Finally, the enriched event is sent to a sink.

To develop the pipeline, we'll create a custom processor, called data-quality, in the familiar Spring Boot style, dockerize it, and push it to Docker Hub. We then register the processor with the SCDF server using its Docker URI. The pipeline can then be built with the SCDF DSL as shown below.

dataflow:>app register --type processor --name data-quality --uri docker:randhirkumars/data-quality:latest
dataflow:>stream create --name dataquality --definition "http --port=8086 | splitter --expression=#jsonPath(payload,'$.tags') --outputType=application/json | data-quality --value-threshold=12 --outputType=application/json | log"

The SCDF dashboard shows the corresponding graphical representation of the same data pipeline: http | splitter | data-quality | log.

The data-quality processor sets a property on each item indicating whether it is valid. The code is shown below.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableConfigurationProperties(SensorReadingProperties.class)
@EnableBinding(Processor.class)
@SpringBootApplication
public class DataQualityApplication {

    public static void main(String[] args) {
        SpringApplication.run(DataQualityApplication.class, args);
    }

    @Autowired
    private SensorReadingProperties properties;

    // Enrich each incoming reading with a validity flag before passing it downstream.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public SensorReading setValidity(SensorReading r) {

        Double validThreshold = properties.getValueThreshold();
        if (!isValid(r, String.valueOf(validThreshold))) {
            r.setInvalid(true);
        }
        return r;
    }

    // A reading is valid when its value is below the configured value-threshold.
    private boolean isValid(SensorReading r, String threshold) {
        return Double.compare(Double.valueOf(r.getValue()), Double.valueOf(threshold)) < 0;
    }
}
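
The processor above refers to two classes that are not shown here: SensorReadingProperties, which exposes the configurable value-threshold, and SensorReading, the event model. A minimal sketch of what they might look like follows; the field names and the property binding are assumptions inferred from the JSON payload and the stream definition, not the original source.

import java.util.HashMap;
import java.util.Map;

import org.springframework.boot.context.properties.ConfigurationProperties;

// Sketch: binds the --value-threshold property passed in the stream definition.
@ConfigurationProperties
public class SensorReadingProperties {

    private Double valueThreshold;

    public Double getValueThreshold() {
        return valueThreshold;
    }

    public void setValueThreshold(Double valueThreshold) {
        this.valueThreshold = valueThreshold;
    }
}

// Sketch: event model mirroring the fields of the incoming JSON payload.
class SensorReading {

    private String tagId;
    private int tagType;
    private long timestamp;
    private String value;
    private int quality;
    private String datatype;
    private Map<String, Object> additionalProperties = new HashMap<>();
    private boolean invalid;

    public String getValue() {
        return value;
    }

    public void setInvalid(boolean invalid) {
        this.invalid = invalid;
    }

    // The remaining getters and setters, plus a JSON-style toString(), are assumed but omitted.
}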

This is all fine for a Spring Boot processor with simple validation logic. But what if the logic to determine validity has been developed in depth by analysts and is available only as a Python Docker image? For the sake of illustration, let the Python code be as shown below.

import os
import json

# Read the sensor reading and the threshold from environment variables
# and print whether the reading is within the valid range.
try:
    data = json.loads(os.environ["RECORD"])
    threshold = float(os.environ["THRESHOLD"])

    print("VALID" if float(data["value"]) < threshold else "INVALID")
except Exception:
    print("INVALID")

The event and the threshold are passed to the container as environment variables, and the validation result is printed to the console. How can we include this Docker image in our data pipeline?

Generic Processor

To address this issue, we'll modify our Spring Boot processor to remove the validation logic written in Java. In its place, we run the Python Docker image and use its result to set the validation property on the event. To run a Docker image from Java, we use the Docker API for Java (docker-java). For this to work, we'll use Docker-in-Docker (DinD) as the base image for our Spring Boot application. To dockerize the application, the Google Jib Maven plugin is used. The plugin configuration in the application's pom.xml is shown below.

<plugin>
    <groupId>com.google.cloud.tools</groupId>
    <artifactId>jib-maven-plugin</artifactId>
    <version>0.9.9</version>
    <configuration>
        <from>
            <image>npalm/dind-java</image>
        </from>
        <to>
            <image>data-quality</image>
        </to>
    </configuration>
</plugin>
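
With this plugin in place, running mvn compile jib:build builds the data-quality image on top of the npalm/dind-java base and pushes it to the configured registry. Note that the <to> image shown here is unqualified; to push it to Docker Hub under the URI registered with SCDF (randhirkumars/data-quality), the image name would typically need to include the repository prefix.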

The modified isValid method is shown below.

    private boolean isValid(SensorReading r, String threshold) {

        Boolean valid = true;
        String sensorReadingStr = r.toString();
        DockerClient dockerClient = DockerClientBuilder.getInstance().build();

        try {

            // Pull the analysts' Python image from Docker Hub.
            dockerClient
                    .pullImageCmd("randhirkumars/python-docker")
                    .withTag("latest")
                    .exec(new PullImageResultCallback() {
                        @Override
                        public void onNext(PullResponseItem item) {
                            super.onNext(item);
                            System.out.println(item.getStatus());
                        }
                    }).awaitCompletion();

            // Pass the reading and the threshold to the container as environment variables.
            CreateContainerResponse createContainerResponse = dockerClient
                    .createContainerCmd("randhirkumars/python-docker")
                    .withEnv("RECORD=" + sensorReadingStr, "THRESHOLD=" + threshold)
                    .withBinds(new Bind("/var/run/docker.sock", new Volume("/var/run/docker.sock")))
                    .exec();

            dockerClient
                    .startContainerCmd(createContainerResponse.getId())
                    .exec();

            // Block until the container finishes.
            dockerClient
                    .waitContainerCmd(createContainerResponse.getId())
                    .exec(new WaitContainerResultCallback())
                    .awaitCompletion();

            // Scan the container's console output for the INVALID marker.
            final List<Frame> loggingFrames = getLoggingFrames(dockerClient, createContainerResponse.getId());

            for (final Frame frame : loggingFrames) {
                if (frame.toString().contains("INVALID")) {
                    valid = false;
                }
            }
        } catch (Exception e) {
            valid = false;
        }

        return valid;
    }

    private List<Frame> getLoggingFrames(DockerClient dockerClient, String containerId) throws Exception {

        FrameReaderITestCallback collectFramesCallback = new FrameReaderITestCallback();

        dockerClient.logContainerCmd(containerId).withStdOut(true).withStdErr(true)
                .withTailAll()
                .exec(collectFramesCallback).awaitCompletion();

        return collectFramesCallback.frames;
    }

    public static class FrameReaderITestCallback extends LogContainerResultCallback {

        public List<Frame> frames = new ArrayList<>();

        @Override
        public void onNext(Frame item) {
            frames.add(item);
            super.onNext(item);
        }
    }
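
One practical consideration with this approach: every incoming message pulls the image and creates a fresh container, so containers accumulate on the Docker host over the lifetime of the stream. An optional addition, sketched below with the same docker-java client, is to remove each container once its logs have been read, for example at the end of isValid:

// Optional cleanup (sketch): remove the finished container after its logs have been
// collected so that containers do not pile up on the Docker host.
dockerClient
        .removeContainerCmd(createContainerResponse.getId())
        .withForce(true)
        .exec();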

To test the pipeline, find out the IP of the http source Kubernetes Pod, and send an HTTP POST as shown below.

dataflow:>http post --target http://<ip-of-http-source-pod>:8080 --data "{\"tags\":[{\"tagId\":\"d6\",\"tagType\":0,\"timestamp\":1528927600678,\"value\":\"10.0\",\"quality\":3,\"datatype\":\"String\",\"additionalProperties\":{}}]}"

Check the logs of the log sink.

dataflow:>! kubectl logs dataquality-log-65ccf64699-9dd7s

The logs should display the enriched sensor reading with its validity property set.

2018-09-10 11:44:19.163  INFO 1 --- [y.dataquality-1] log-sink   
: {"tagId":"d6","tagType":0,"timestamp":1528927600678,"value":"10.0","quality":3,"datatype":"String","additionalProperties":{},"invalid":true}

Conclusion

In this article, we addressed the issue of including any Docker image while building data pipelines with Spring Cloud Data Flow, which can natively interpret only Spring Boot images. We created a Java processor with Docker-in-Docker as the base image, which let us use the Docker API for Java to run any Docker image as part of our processing. The code is available on GitHub.


Topics: docker, kubernetes, spring cloud data flow, spring boot, java, python, data pipelines, tutorial, cloud

