Data Flow Tutorial: Dealing With BigQuery Schema Changes

Learn how to tackle the challenge of changing requirements in your data flow system when streaming into BigQuery, Google's managed data warehouse.

By Brachi Packter · Mar. 13, 19 · Tutorial

Imagine you have a pipeline that writes data from PubSub, Kafka, or any other source, really, to BigQuery, and everything streams well.

Then, suddenly, you learn that your source JSON event has changed, and a new field needs to be added.

Now what?

You can stop the job, update it with the new field, and rerun it, but I'm sure you don't want to do this for every change, so you set out to look for a better solution.

Here I’ll describe how I overcame this very predicament.
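For context, here is a minimal sketch of the kind of streaming pipeline this article assumes. The topic and table names, the ParseJsonFn step, and the option getters are hypothetical; EventToTableRowTransformerFn is the DoFn shown later in this article.

// Minimal sketch: read JSON events from Pub/Sub, convert each one to a
// TableRow, and stream the rows into BigQuery.
Pipeline p = Pipeline.create(options);

p.apply("read-events", PubsubIO.readStrings().fromTopic("projects/my-project/topics/events"))
 .apply("parse-json", ParDo.of(new ParseJsonFn())) // String -> JsonNode (hypothetical)
 .apply("to-table-row", ParDo.of(new EventToTableRowTransformerFn(
         options.getBucketName(), options.getFilePath(), options.getCheckInterval())))
 .apply("write-bq", BigQueryIO.writeTableRows()
         .to("my-project:my_dataset.events")
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

p.run();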

The Design

We will keep a file in a Google Cloud Storage (GCS) bucket that contains the schema definition.

We will load this file in the streaming pipeline and watch it for updates.

For any event, we will take only the fields that appear in the loaded GCS schema; we will ignore any field in the event that doesn't appear in the schema configuration file.

And when we have a schema change, we will first need to add the column to the BigQuery table manually and update the blob file that contains the schema definition (we have a script that does this using bq update and gsutil cp; a sketch follows below). Our job will identify that the blob file has been updated and reload the configuration. From then on, it can extract the new field from each event and send it to BigQuery.
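The update script itself is not shown here; a minimal sketch of what it might look like, assuming a hypothetical table my_dataset.events, schema file, and bucket name, is:

# Hypothetical manual-update script: add the new column to the BigQuery
# table, then publish the updated fields configuration to the bucket that
# the pipeline watches.
bq update my_dataset.events ./table_schema_with_new_column.json
gsutil cp ./fields-config.json gs://my-config-bucket/fields-config.json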

Why Didn’t I Choose a Fully Automated Solution?

If I took any new field in a JSON event that doesn't appear in the target BigQuery table and created a column for it automatically, I'd be opening myself up to fraud. What does this mean? Imagine that some hacker sends dummy events to my server with fake fields, f1, f2, f3… f10, and I treat these as new fields, creating columns for them in my BigQuery table automatically. My table would fill up with garbage and could also hit column quota limits. I need control over this, and, therefore, I must enforce manual updating of the BigQuery schema and the blob file. This way, I can be sure no garbage columns end up in my BigQuery table.

Let's Look Into the Class That Loads the Fields Configuration

You can see that this is a singleton class that, on construction, loads the configuration and starts watching for changes.

import java.io.ByteArrayInputStream;
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FieldsConfigurationProvider {

    private final Logger logger = LoggerFactory.getLogger(FieldsConfigurationProvider.class);

    private Long lastUpdateTime;
    // volatile: written by the watcher thread, read by the processing threads
    private volatile FieldsConfiguration config;

    private static FieldsConfigurationProvider instance = null;
    public static String bucketName;
    public static String filePath;
    private static long fieldsConfigLoadInterval;
    private static final Object lock = new Object();

    // Double-checked locking: one provider (and one watcher thread) per worker JVM.
    public static FieldsConfigurationProvider getInstance() {
        if (instance == null) {
            synchronized (lock) {
                if (instance == null) {
                    instance = new FieldsConfigurationProvider();
                }
            }
        }
        return instance;
    }

    private FieldsConfigurationProvider() {
        config = loadConfig(filePath);
        watch();
    }

    // Must be called (e.g. from @Setup) before the first getInstance().
    public static void init(String bucketName, long fieldsConfigLoadInterval, String filePath) {
        FieldsConfigurationProvider.bucketName = bucketName;
        FieldsConfigurationProvider.fieldsConfigLoadInterval = fieldsConfigLoadInterval;
        FieldsConfigurationProvider.filePath = filePath;
    }

    // Poll the blob's update time; reload the configuration whenever it advances.
    private void watch() {
        new Thread(() -> {
            logger.info("start watching for bucket changes....");

            Storage storage = StorageOptions.getDefaultInstance().getService();
            Bucket bucket = storage.get(bucketName);

            while (true) {
                logger.debug("Checking for new fields configuration in the bucket");
                Blob blob = bucket.get(filePath);
                long currentTime = blob.getUpdateTime();
                // Guard against a null lastUpdateTime if the initial load failed.
                if (lastUpdateTime == null || currentTime > lastUpdateTime) {
                    logger.info("Found new fields configuration in the bucket");
                    config = loadConfig(filePath);
                } else {
                    logger.debug("Didn't find new fields configuration in bucket");
                }
                try {
                    Thread.sleep(fieldsConfigLoadInterval);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }

    private FieldsConfiguration loadConfig(String path) {
        try {
            ObjectMapper mapper = new ObjectMapper();

            logger.info("Reading fields configuration from gs://" + bucketName + "/" + path);

            Storage storage = StorageOptions.getDefaultInstance().getService();
            Bucket bucket = storage.get(bucketName);
            Blob blob = bucket.get(path);
            lastUpdateTime = blob.getUpdateTime();

            ByteArrayInputStream byteStream = new ByteArrayInputStream(blob.getContent());
            return mapper.readValue(byteStream, FieldsConfiguration.class);
        } catch (IOException e) {
            logger.error("Unable to read fields configuration file", e);
        }
        return null;
    }

    public FieldsConfiguration getConfig() {
        return config;
    }
}

Field Config JSON Example

{
  "fields": [
    {
      "jsonpath": "name",
      "columnname": "name",
      "type": "STRING"
    },
    {
      "jsonpath": "price",
      "columnname": "price",
      "type": "FLOAT64"
    },
    {
      "jsonpath": "purches_date",
      "columnname": "purches_date",
      "type": "TIMESTAMP"
    }
  ]
}
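
The FieldsConfiguration and FieldConfig classes themselves are not shown in this article; a minimal Jackson-compatible sketch that would deserialize the JSON above might look like this (hypothetical, matching the getters used below):

import java.io.Serializable;
import java.util.List;

// Field names are lowercase to match the JSON keys, so no Jackson
// annotations are needed.
public class FieldsConfiguration implements Serializable {
    private List<FieldConfig> fields;

    public List<FieldConfig> getFields() { return fields; }
    public void setFields(List<FieldConfig> fields) { this.fields = fields; }
}

public class FieldConfig implements Serializable {
    private String jsonpath;
    private String columnname;
    private FieldType type; // STRING, FLOAT64, INT64, TIMESTAMP, BOOL

    public String getJsonpath() { return jsonpath; }
    public void setJsonpath(String jsonpath) { this.jsonpath = jsonpath; }
    public String getColumnname() { return columnname; }
    public void setColumnname(String columnname) { this.columnname = columnname; }
    public FieldType getType() { return type; }
    public void setType(FieldType type) { this.type = type; }
}

public enum FieldType { STRING, FLOAT64, INT64, TIMESTAMP, BOOL }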

How We Use the FieldConfig in the Step Function

You can see here that I obtain the configuration provider in @Setup; the configuration it returns is replaced whenever a new config is loaded.

And while transforming the JsonNode event, I take only the fields that appear in the configuration and convert each one according to its type.

import java.util.function.Function;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;

public class EventToTableRowTransformerFn extends DoFn<JsonNode, TableRow> {

    private final String fieldConfigurationBucketName;
    private final String fieldConfigurationFilePath;
    private final Long fieldConfigurationCheckInterval;

    // Not serializable, so it is transient and re-created in @Setup on each worker.
    private transient FieldsConfigurationProvider fieldsConfigurationProvider;

    public EventToTableRowTransformerFn(String fieldConfigurationBucketName,
                                        String fieldConfigurationFilePath,
                                        Long fieldConfigurationCheckInterval) {
        this.fieldConfigurationBucketName = fieldConfigurationBucketName;
        this.fieldConfigurationFilePath = fieldConfigurationFilePath;
        this.fieldConfigurationCheckInterval = fieldConfigurationCheckInterval;
    }

    @Setup
    public void setUp() {
        FieldsConfigurationProvider.init(fieldConfigurationBucketName,
                fieldConfigurationCheckInterval, fieldConfigurationFilePath);
        fieldsConfigurationProvider = FieldsConfigurationProvider.getInstance();
    }

    @ProcessElement
    public void processElement(@Element JsonNode element, OutputReceiver<TableRow> out) {

        TableRow convertedRow = new TableRow();

        FieldsConfiguration config = fieldsConfigurationProvider.getConfig();
        if (config == null) {
            throw new RuntimeException("empty config, check configuration file in "
                    + FieldsConfigurationProvider.bucketName + " file " + FieldsConfigurationProvider.filePath);
        }
        for (FieldConfig fieldConfig : config.getFields()) {
            JsonNode extracted = element.get(fieldConfig.getJsonpath());

            // Pick a converter for the raw JSON value based on the configured type.
            Function<String, Object> transformer;
            switch (fieldConfig.getType()) {
                case STRING:
                    transformer = v -> v;
                    break;
                case FLOAT64:
                    transformer = Float::parseFloat;
                    break;
                case INT64:
                    transformer = Long::parseLong;
                    break;
                case TIMESTAMP:
                    transformer = new DateTransformer(); // author's date converter, not shown here
                    break;
                case BOOL:
                    transformer = Boolean::valueOf;
                    break;
                default:
                    transformer = v -> v;
            }
            insertValue(extracted, fieldConfig.getColumnname(), convertedRow, transformer);
        }
        // Emit one row per event, after all configured fields have been copied.
        out.output(convertedRow);
    }
}
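
The insertValue helper and the DateTransformer class are not shown in this article; a minimal hypothetical sketch matching the usage above might be:

// Hypothetical helper: apply the type-specific transformer to the extracted
// JSON value and write it into the TableRow under the configured column name.
private void insertValue(JsonNode extracted, String columnName, TableRow row,
                         Function<String, Object> transformer) {
    if (extracted != null && !extracted.isNull()) {
        row.set(columnName, transformer.apply(extracted.asText()));
    }
}

// Hypothetical date converter: parse the incoming value into a BigQuery-friendly
// timestamp string. Implements Function so it can be used as a transformer above.
public class DateTransformer implements Function<String, Object> {
    @Override
    public Object apply(String value) {
        // e.g. epoch millis in the event -> ISO-8601 string for TIMESTAMP columns
        return java.time.Instant.ofEpochMilli(Long.parseLong(value)).toString();
    }
}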

Do We Have a Design Pattern?! Yes!

We found a way to influence our stream through a bucket change. This is relevant to schema changes or any other configuration. For example, for Geo IP we use MaxMind's DB: the IP collection gets updated, and I want my pipeline to be aware of this change and look up IPs in the new DB file. I used the same pattern and created a provider that watches for bucket changes and loads them. If I update the bucket with a new IP DB, my stream will automatically identify the change and pick up the new file. A generic sketch of the pattern follows.
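
To make the pattern concrete, here is a hedged, generic sketch (hypothetical code, not from the original pipeline): a provider that watches any GCS blob and rebuilds an arbitrary value from its bytes whenever the blob changes.

import java.util.function.Function;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.StorageOptions;

// Hypothetical generalization: watch a GCS blob and rebuild a value of
// type T from its bytes whenever the blob's update time advances.
public class BlobWatchingProvider<T> {

    private final String bucketName;
    private final String filePath;
    private final long pollIntervalMs;
    private final Function<byte[], T> parser; // e.g. a JSON mapper or a MaxMind DB reader

    private volatile T value;
    private volatile long lastUpdateTime;

    public BlobWatchingProvider(String bucketName, String filePath,
                                long pollIntervalMs, Function<byte[], T> parser) {
        this.bucketName = bucketName;
        this.filePath = filePath;
        this.pollIntervalMs = pollIntervalMs;
        this.parser = parser;
        reload();

        Thread watcher = new Thread(() -> {
            while (true) {
                if (blob().getUpdateTime() > lastUpdateTime) {
                    reload();
                }
                try {
                    Thread.sleep(pollIntervalMs);
                } catch (InterruptedException e) {
                    return; // stop watching if interrupted
                }
            }
        });
        watcher.setDaemon(true); // don't keep the worker JVM alive
        watcher.start();
    }

    private Blob blob() {
        return StorageOptions.getDefaultInstance().getService()
                .get(bucketName).get(filePath);
    }

    private void reload() {
        Blob blob = blob();
        lastUpdateTime = blob.getUpdateTime();
        value = parser.apply(blob.getContent());
    }

    public T get() {
        return value;
    }
}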

One Disadvantage to Remember

The singleton class that opens a thread to watch for bucket changes isn't a pipeline step, so you can't inspect it in the Dataflow graph: no logs, no statistics.

You may try using a side input that watches for changes and loads them. This solution requires time-sliced windows, which can be a drawback if you are not otherwise processing your data in time windows.

Here's an example of a side input with time-slicing:

PCollection<Long> ticks = p
        // Produce 1 "tick" per second
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
        // Window the ticks into 1-second windows
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
        // Use an arbitrary per-window combiner to reduce to 1 element per window
        .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());

String bucketName = options.getBigQuerySchemaConfigBucketName().get();

PCollectionView<FieldsConfiguration> sideView = ticks
        .apply(MapElements.into(TypeDescriptor.of(FieldsConfiguration.class))
                .via((Long ignored) -> getFieldsConfiguration(bucketName)))
        .apply(View.<FieldsConfiguration>asSingleton().withDefaultValue(getFieldsConfiguration(bucketName)));

We can then wire it into the pipeline with the following code:

.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))) 
.apply("extract-events", ParDo.of(new EventsRowFn(sideView)).withSideInputs(sideView));

And then only the iteration over the fields changes; the configuration now comes from the side input:

for (FieldConfig fieldConfig : context.sideInput(sideView).getFields()) {

Opinions expressed by DZone contributors are their own.
