Triggering Dataflow Pipelines With Cloud Functions

Love cronjobs? No? Oh. Good! You can combine Google Cloud Functions with a Dataflow Pipeline to make them a thing of the past, not to mention their VMs.

By Gareth Jones · May. 01, 17 · Tutorial

Do you have an unreasonable fear of cronjobs? Find spinning up VMs to be a colossal waste of your towering intellect? Does the thought of checking a folder regularly for updates fill you with an apoplectic rage? If so, you should probably get some help. Maybe find another line of work.

In the meantime, here’s one way to ease your regular file processing anxieties. With just one application of Google Cloud Functions, eased gently up your Dataflow Pipeline, you can find lasting relief from troublesome cronjobs.

But first, some assumptions.

Assumptions?

You’ve got gcloud installed, you’ve created a project, and you’ve enabled the Dataflow and Cloud Functions APIs. If you haven’t, a few minutes with the Google docs should get you started.
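
If you’d rather do that setup from the command line, something along these lines should cover it (the project ID is a placeholder, and the exact gcloud incantation for enabling services has shifted between SDK releases, so treat this as a sketch):

gcloud config set project your-project-id
gcloud services enable dataflow.googleapis.com cloudfunctions.googleapis.com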

I’m Ready Now

We are going to set up a Google Cloud Function that will get called every time a cloud storage bucket gets updated. That function will kick off a dataflow pipeline (using the handy new templates) to process the file with whatever complicated data transformations somebody further up the food chain has specified.

Hang on, You’ve Lost Me

OK, maybe I should have been a bit more explicit in my assumptions. Here’s a quick cheat sheet:

  • Cloud Functions: serverless little bits of code (like Amazon’s Lambda functions)
  • Dataflow: a managed service for processing large amounts of data
  • Cloud Storage: Google’s equivalent of S3

OK, Carry on, We’re Good

The first step is to create a few buckets (you can call them whatever you want, but these are the names I’ve used in the rest of this article):

  • One for keeping the code for your cloud function: cloud-fn-staging
  • One for keeping the code for your pipeline: pipeline-staging
  • The bucket you want to monitor for updates: incoming-files

If you’re not sure how to create buckets, Google’s docs can help. Or just start clicking around randomly in the cloud console. You’ll work it out. I believe in you, champ.
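
If you’d rather stay on the command line, gsutil will do it. A quick sketch (the bucket names match the ones above, but bucket names are globally unique, so you’ll need your own variants):

gsutil mb gs://cloud-fn-staging
gsutil mb gs://pipeline-staging
gsutil mb gs://incoming-files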

Now What?

Now we’re going to make a Dataflow Template. If you’ve already got a pipeline, you can use that one, but I mangled one of the example pipelines that Google gives you. There’s a little fiddly detail here: Templates were introduced in version 1.9.0 of the Dataflow Java libraries, so you’ll need at least that version. However, if you go for the latest 2.x version (which is also the default version that the maven archetypes generate), the way to create your template changes. But the Google docs haven’t been updated for that version.

To create the example pipeline, I ran the following maven command:

mvn archetype:generate \
    -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
    -DarchetypeGroupId=com.google.cloud.dataflow \
    -DarchetypeVersion=1.9.0 \
    -DgroupId=com.shinesolutions \
    -DartifactId=dataflow-template-poc \
    -Dversion="0.1" \
    -DinteractiveMode=false \
    -Dpackage=com.shinesolutions


Notice the -DarchetypeVersion=1.9.0 option, which ensures that I’m using that version of the libs. Without that option, you’ll end up with the 2.x versions (which I will explain how to use as well, don’t worry).

This generates some sample code, including the Dataflow standard WordCount. I edited this to make it templatable. I also invented the word templatable, just now. You can use it, though; it’s cool. Here are the important changes, first in the WordCountOptions interface (in the WordCount class):

public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Validation.Required
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);

    @Description("Path of the file to write to")
    @Validation.Required
    ValueProvider<String> getOutputFile();
    void setOutputFile(ValueProvider<String> value);
}


Instead of the options just being String values, they are now ValueProvider types. This lets the Runner know that these values will be provided later.

The main method looks like this:

public static void main(String[] args) {
    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()).withoutValidation())
        .apply(new CountWords())
        .apply(ParDo.of(new FormatAsTextFn()))
        .apply(TextIO.Write.named("WriteCounts").to(options.getOutputFile()).withoutValidation());

    p.run();
}


The important thing to note here is the .withoutValidation() modifier on the TextIO.Read. If you don’t do that, your template won’t get created, because TextIO will try to validate the option values before they’ve been supplied. We’re going to use TextIO in this example, but Pub/Sub and BigQuery input/output in Dataflow also support passing in options from templates.

To create our template, run this command:

mvn compile exec:java \
    -Dexec.mainClass=com.shinesolutions.WordCount \
    -Dexec.args="--project=<your project> \
    --stagingLocation=gs://pipeline-staging \
    --dataflowJobFile=gs://pipeline-staging/templates/WordCount \
    --runner=TemplatingDataflowPipelineRunner"


If that succeeded, your template has been created. Remember the value you supplied for the dataflowJobFile option; you’ll need it in your cloud function.
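
If you want to sanity-check the template before wiring up the cloud function, you should be able to launch it by hand with gcloud. This is a sketch rather than a tested command (the job name and file paths are placeholders, and the flags assume the beta Dataflow commands of roughly this era):

gcloud beta dataflow jobs run manual-wordcount-test \
    --gcs-location=gs://pipeline-staging/templates/WordCount \
    --parameters inputFile=gs://incoming-files/test.txt,outputFile=gs://incoming-files/test.txt-wordcount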

If you chose to use the 2.x version of the libraries, you probably got an error. I did tell you not to, but you knew better, didn’t you? The command for 2.x looks like this:

mvn compile exec:java \
    -Dexec.mainClass=com.shinesolutions.WordCount \
    -Dexec.args="--project=<your project> \
    --stagingLocation=gs://pipeline-staging \
    --templateLocation=gs://pipeline-staging/templates/WordCount \
    --runner=DataflowRunner"


I haven’t tested this with version 2.x, so don’t blame me if that command deletes the Internet or something.

Template's Done. What Next?

Now you need your cloud function. Cloud functions are written in JavaScript and run on Node.js, so you should probably install that. Then, in a suitable directory, run npm init to set up a package.json file, which will tell Google what your dependencies are. It will ask you a lot of questions, but don’t stress about the answers; they’re not a big deal.
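
For reference, once the Dataflow client dependency from the next step is installed, the resulting package.json only needs to look something like this (the name and version numbers are just illustrative):

{
  "name": "dataflow-trigger",
  "version": "0.0.1",
  "dependencies": {
    "googleapis": "^19.0.0"
  }
}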

Our cloud function is going to talk to the Dataflow API, so you’ll need to install that dependency. Run npm install --save googleapis to get that done. (Confusingly, Google has two flavors of Node.js support; the @google-cloud packages don’t support Dataflow yet.) The cloud function looks like this:

const google = require('googleapis');

exports.goWithTheDataFlow = function(event, callback) {
    const file = event.data;
    // Only react to files that exist; we also get events for deletions.
    if (file.resourceState === 'exists' && file.name) {
        // Cloud functions get application default credentials for free.
        google.auth.getApplicationDefault(function(err, authClient, projectId) {
            if (err) {
                throw err;
            }

            if (authClient.createScopedRequired && authClient.createScopedRequired()) {
                authClient = authClient.createScoped([
                    'https://www.googleapis.com/auth/cloud-platform',
                    'https://www.googleapis.com/auth/userinfo.email'
                ]);
            }

            // Build a Dataflow API client using those credentials.
            const dataflow = google.dataflow({
                version: 'v1b3',
                auth: authClient
            });

            // Launch a job from the template we staged earlier, using the
            // uploaded file as input and writing the counts alongside it.
            dataflow.projects.templates.create({
                projectId: projectId,
                resource: {
                    parameters: {
                        inputFile: `gs://${file.bucket}/${file.name}`,
                        outputFile: `gs://${file.bucket}/${file.name}-wordcount`
                    },
                    jobName: 'cloud-fn-dataflow-test',
                    gcsPath: 'gs://pipeline-staging/templates/WordCount'
                }
            }, function(err, response) {
                if (err) {
                    console.error("problem running dataflow template, error was: ", err);
                }
                console.log("Dataflow template response: ", response);
                callback();
            });

        });
    }
};


We’re going to trigger our cloud function from files being uploaded to a GCS bucket, so goWithTheDataFlow gets called with an event that has a few useful properties. The main one is event.data, which contains the information about the updated resource. We check that the resource exists (because we also get notifications of deletes from the bucket). Then we authenticate (because we’re in a cloud function, default application credentials are already set up for us) and create a Dataflow API client. Make the call to launch our Dataflow template and we are done. Easy.

Now we upload our function to Google’s cloud with a command that looks like this:

gcloud beta functions deploy goWithTheDataFlow \
    --stage-bucket cloud-fn-staging \
    --trigger-bucket incoming-files


If that all went OK, we should be good to test with a file.
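
Any old text file will do; something like this (with a file of your own) kicks the whole thing off:

gsutil cp lorem-ipsum.txt gs://incoming-files/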

Awesome. How Do I Know It’s Working?

The cloud function logs go to Stackdriver Logging in your Google Cloud console. Upload a file to the bucket, and in a few seconds you should see some output there. Any errors will appear there too and, by default, will also trigger an email to you. You can see your Dataflow pipeline in the usual Dataflow area of the cloud console, and you can see the output files (if you used the WordCount code from above) in the same bucket as your source file.
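
If you prefer the command line, you can pull the same information from there. The exact gcloud spelling has moved around between SDK releases (the beta commands used for the deploy above had their own variant), so treat this as a sketch; the gsutil listing just shows the WordCount output next to your source file:

gcloud functions logs read goWithTheDataFlow
gsutil ls gs://incoming-files/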

Great. What Should I Do Now?

You’ll probably want to get rid of that test cloud function at some point. Do that with a command like this:

gcloud beta functions delete goWithTheDataFlow


Then make a cup of tea. Put your feet up, close your eyes, let your mind relax and just think about all those cronjobs you can delete. Imagine grinding them under your heel, and setting fire to the VMs they run on. Better now? Good.

Published at DZone with permission of Gareth Jones, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
