Data Flow Tutorial: Dealing With BigQuery Schema Changes

Learn how to tackle the challenge of changing requirements in your data flow system using the popular PaaS system, BigQuery.

By Brachi Packter · Mar. 13, 19 · Tutorial

Imagine you have a pipeline that writes data from a PubSub/Kafka instance, or any other source, really, to BigQuery, and everything streams well. 

Then, suddenly, you learn that your source JSON event has changed, and a new field should be added.

Now what?

You can stop the job, update it with the new field, and rerun it. I’m sure you don’t want to do this for every change, so you set out to look for a better solution.

Here I’ll describe how I overcame this very predicament.

The Design

We will keep a GCS blob (a file in a Google Cloud Storage bucket) that contains the schema definition.

We will load the file in the streaming pipeline and watch it for updates.

For each event, we will take only the fields that appear in the loaded schema file. We will ignore any field in the event that doesn't appear in the schema configuration file.

When the schema changes, we first add the column to the BigQuery table manually and then update the Blob file that contains the schema definition (we have a script that does both, using bq update and gsutil cp). Our job detects that the Blob file has been updated and reloads the configuration. From then on, it extracts the new field from each event and sends it to BigQuery.
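The filtering rule described above can be sketched in plain Java. This is a minimal illustration, not the author's code: it assumes the event has already been parsed into a `Map<String, Object>` and that the schema file has been reduced to a set of allowed field names. The class `SchemaFieldFilter` and its methods are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper illustrating the allowlist idea: only fields listed
// in the loaded schema file are forwarded to BigQuery.
public class SchemaFieldFilter {

    // Returns a new map containing only the fields present in allowedFields,
    // preserving the event's original field order.
    public static Map<String, Object> keepKnownFields(Map<String, Object> event, Set<String> allowedFields) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : event.entrySet()) {
            if (allowedFields.contains(entry.getKey())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    // Returns the sorted names of fields the schema does not know about,
    // e.g. for logging, so an operator can decide whether to add a column manually.
    public static Set<String> unknownFields(Map<String, Object> event, Set<String> allowedFields) {
        Set<String> unknown = new TreeSet<>(event.keySet());
        unknown.removeAll(allowedFields);
        return unknown;
    }

    public static void main(String[] args) {
        Set<String> schema = Set.of("name", "price", "purchase_date");
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("name", "book");
        event.put("price", "12.5");
        event.put("f1", "junk");
        System.out.println(keepKnownFields(event, schema)); // {name=book, price=12.5}
        System.out.println(unknownFields(event, schema));   // [f1]
    }
}
```

Unknown fields are dropped rather than turned into columns, which is exactly the behavior the next section argues for; reporting them separately keeps the manual schema update informed.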

Why Didn’t I Choose a Fully Automated Solution?

If I accepted every new field in a JSON event that doesn't appear in the target BigQuery table, I would be opening myself up to abuse. What does this mean? Imagine that an attacker sends dummy events to my server with fake fields, f1, f2, f3… f10, and I treat these as new fields, automatically creating columns for them in my BigQuery table. My table would fill up with garbage columns and could also hit BigQuery quota limits (for example, the maximum number of columns per table). I need control over this, so I require the BigQuery schema and the Blob file to be updated manually. That way, I can be sure no garbage columns end up in my BigQuery table.

Let’s Look Into the Class That Loads the Fields Configuration

This is a singleton class: on construction, it loads the configuration and starts a background thread that polls the bucket for changes.

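import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FieldsConfigurationProvider {

    private static final Logger logger = LoggerFactory.getLogger(FieldsConfigurationProvider.class);
    private static final ObjectMapper MAPPER = new ObjectMapper(); // thread-safe once configured

    // volatile: written by the watcher thread, read by the DoFn threads
    private volatile long lastUpdateTime;
    private volatile FieldsConfiguration config;

    // volatile is required for double-checked locking to be safe
    private static volatile FieldsConfigurationProvider instance = null;
    public static String bucketName;
    public static String filePath;
    private static long fieldsConfigLoadInterval;
    private static final Object lock = new Object();

    public static FieldsConfigurationProvider getInstance() {
        if (instance == null) {
            synchronized (lock) {
                if (instance == null) {
                    instance = new FieldsConfigurationProvider();
                }
            }
        }
        return instance;
    }

    private FieldsConfigurationProvider() {
        config = loadConfig(filePath);
        watch();
    }

    // Must be called before the first getInstance(); it does not affect an
    // instance that has already been created.
    public static void init(String bucketName, long fieldsConfigLoadInterval, String filePath) {
        FieldsConfigurationProvider.bucketName = bucketName;
        FieldsConfigurationProvider.fieldsConfigLoadInterval = fieldsConfigLoadInterval;
        FieldsConfigurationProvider.filePath = filePath;
    }

    // Polls the blob's update time and reloads the file when it changes.
    private void watch() {
        Thread watcher = new Thread(() -> {
            logger.info("Start watching for bucket changes...");

            Storage storage = StorageOptions.getDefaultInstance().getService();

            while (!Thread.currentThread().isInterrupted()) {
                try {
                    logger.debug("Checking for new fields configuration in the bucket");
                    Blob blob = storage.get(bucketName).get(filePath);
                    if (blob != null && blob.getUpdateTime() > lastUpdateTime) {
                        logger.info("Found new fields configuration in the bucket");
                        FieldsConfiguration newConfig = loadConfig(filePath);
                        if (newConfig != null) {
                            config = newConfig; // keep the last good config if loading failed
                        }
                    } else {
                        logger.debug("Didn't find new fields configuration in the bucket");
                    }
                } catch (RuntimeException e) {
                    // e.g. a transient storage error; don't let it kill the watcher thread
                    logger.warn("Failed to check for a new fields configuration", e);
                }
                try {
                    Thread.sleep(fieldsConfigLoadInterval);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag; the loop exits
                }
            }
        }, "fields-config-watcher");
        watcher.setDaemon(true); // don't keep the worker JVM alive because of this thread
        watcher.start();
    }

    // Reads and parses the JSON file; returns null if it can't be read or parsed.
    private FieldsConfiguration loadConfig(String path) {
        try {
            logger.info("Reading fields configuration from gs://" + bucketName + "/" + path);

            Storage storage = StorageOptions.getDefaultInstance().getService();
            Blob blob = storage.get(bucketName).get(path);

            FieldsConfiguration loaded = MAPPER.readValue(blob.getContent(), FieldsConfiguration.class);
            lastUpdateTime = blob.getUpdateTime(); // only advance after a successful parse
            return loaded;
        } catch (IOException | RuntimeException e) {
            logger.error("Unable to read fields configuration file", e);
        }
        return null;
    }

    public FieldsConfiguration getConfig() {
        return config;
    }
}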
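If you want the watch loop to survive transient errors and to publish new configurations safely across threads, one option is sketched below. It is a hypothetical refactoring, not the author's code: the GCS calls are abstracted behind two `Supplier`s (in the pipeline they would wrap `blob.getUpdateTime()` and `loadConfig(...)`), which also makes the logic testable without a bucket. The class name `PollingReloader` is invented for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical, storage-agnostic version of the watch loop.
public class PollingReloader<T> {

    private final Supplier<Long> updateTime; // returns the source's last-modified time
    private final Supplier<T> loader;        // reads and parses the source
    private final AtomicReference<T> current = new AtomicReference<>();
    private long lastUpdateTime = Long.MIN_VALUE; // guarded by synchronized pollOnce()

    public PollingReloader(Supplier<Long> updateTime, Supplier<T> loader) {
        this.updateTime = updateTime;
        this.loader = loader;
        pollOnce(); // initial load
    }

    // Reloads only when the update time has advanced. Keeps the last good value
    // if loading throws or returns null. Returns true if the value was replaced.
    public synchronized boolean pollOnce() {
        try {
            long time = updateTime.get();
            if (time <= lastUpdateTime) {
                return false;
            }
            T loaded = loader.get();
            if (loaded == null) {
                return false;
            }
            current.set(loaded);
            lastUpdateTime = time; // advance only after a successful load
            return true;
        } catch (RuntimeException e) {
            // A transient storage error must not kill the polling thread.
            return false;
        }
    }

    // Polls on a single daemon thread so it never blocks JVM shutdown.
    // The returned executor can be shut down by the caller.
    public ScheduledExecutorService startPolling(long intervalMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "config-poller");
            t.setDaemon(true);
            return t;
        });
        executor.scheduleWithFixedDelay(this::pollOnce, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return executor;
    }

    // Latest successfully loaded value, or null if nothing has loaded yet.
    public T get() {
        return current.get();
    }
}
```

The `AtomicReference` ensures a reader thread calling `get()` sees a fully constructed configuration object, and `scheduleWithFixedDelay` waits the full interval after each check completes, much like the `Thread.sleep` loop in the provider.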

Field Config JSON Example

{
  "fields": [
    {
      "jsonpath": "name",
      "columnname": "name",
      "type": "STRING"
    },
    {
      "jsonpath": "price",
      "columnname": "price",
      "type": "FLOAT64"
    },
    {
      "jsonpath": "purchase_date",
      "columnname": "purchase_date",
      "type": "TIMESTAMP"
    }
  ]
}
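The article doesn't show the `FieldsConfiguration` and `FieldConfig` classes that Jackson binds this file to. The following is a hypothetical sketch of what they might look like, assuming Jackson's default bean naming (a getter `getColumnname()` maps to the JSON key `columnname`) and an enum for the `type` values. The wrapper class `FieldsConfigModel` and the enum name `FieldType` are invented for illustration; `Serializable` is included on the assumption that the configuration may also travel as a side input.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model classes for the fields configuration JSON.
public class FieldsConfigModel {

    // Column types handled by the transformer's switch statement.
    public enum FieldType { STRING, FLOAT64, INT64, TIMESTAMP, BOOL }

    public static class FieldConfig implements Serializable {
        private String jsonpath;   // key of the field in the incoming event
        private String columnname; // target BigQuery column
        private FieldType type;    // BigQuery type used to convert the value

        public FieldConfig() { } // no-arg constructor for JSON binding

        public FieldConfig(String jsonpath, String columnname, FieldType type) {
            this.jsonpath = jsonpath;
            this.columnname = columnname;
            this.type = type;
        }

        public String getJsonpath() { return jsonpath; }
        public void setJsonpath(String jsonpath) { this.jsonpath = jsonpath; }
        public String getColumnname() { return columnname; }
        public void setColumnname(String columnname) { this.columnname = columnname; }
        public FieldType getType() { return type; }
        public void setType(FieldType type) { this.type = type; }
    }

    public static class FieldsConfiguration implements Serializable {
        private List<FieldConfig> fields = new ArrayList<>();

        public List<FieldConfig> getFields() { return fields; }
        public void setFields(List<FieldConfig> fields) { this.fields = fields; }
    }
}
```

With classes shaped like this, `mapper.readValue(bytes, FieldsConfiguration.class)` would map each entry of the `fields` array to one `FieldConfig`, and the `type` strings to enum constants by name.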

How We Use the Fields Configuration in the Transform Step

The provider is obtained once, in the @Setup method. Because the provider replaces its config reference whenever a new file is loaded, each call to getConfig() returns the latest configuration.

While transforming each JsonNode event, the function takes only the fields that appear in the configuration and converts each one according to its declared type.

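import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.services.bigquery.model.TableRow;
import java.util.function.Function;
import org.apache.beam.sdk.transforms.DoFn;

public class EventToTableRowTransformerFn extends DoFn<JsonNode, TableRow> {

    private final String fieldConfigurationBucketName;
    private final String fieldConfigurationFilePath;
    private final Long fieldConfigurationCheckInterval;

    // Not serialized with the DoFn; obtained on each worker in @Setup.
    private transient FieldsConfigurationProvider fieldsConfigurationProvider;

    public EventToTableRowTransformerFn(String fieldConfigurationBucketName, String fieldConfigurationFilePath, Long fieldConfigurationCheckInterval) {
        this.fieldConfigurationBucketName = fieldConfigurationBucketName;
        this.fieldConfigurationFilePath = fieldConfigurationFilePath;
        this.fieldConfigurationCheckInterval = fieldConfigurationCheckInterval;
    }

    @Setup
    public void setUp() {
        FieldsConfigurationProvider.init(fieldConfigurationBucketName, fieldConfigurationCheckInterval, fieldConfigurationFilePath);
        fieldsConfigurationProvider = FieldsConfigurationProvider.getInstance();
    }

    @ProcessElement
    public void processElement(@Element JsonNode element, OutputReceiver<TableRow> out) {
        // Read the reference once so the whole event is converted with one consistent config.
        FieldsConfiguration config = fieldsConfigurationProvider.getConfig();
        if (config == null) {
            throw new RuntimeException("Empty config, check the configuration file gs://"
                    + FieldsConfigurationProvider.bucketName + "/" + FieldsConfigurationProvider.filePath);
        }

        TableRow convertedRow = new TableRow();

        // Only fields listed in the configuration are copied; anything else in the event is ignored.
        for (FieldConfig fieldConfig : config.getFields()) {
            // Top-level lookup by field name (nested paths are not handled here).
            JsonNode extracted = element.get(fieldConfig.getJsonpath());
            if (extracted == null || extracted.isNull()) {
                continue; // field is missing from this event
            }

            Function<String, Object> transformer;
            switch (fieldConfig.getType()) {
                case STRING:
                    transformer = v -> v;
                    break;
                case FLOAT64:
                    transformer = Double::parseDouble; // BigQuery FLOAT64 is a 64-bit double
                    break;
                case INT64:
                    transformer = Long::parseLong;
                    break;
                case TIMESTAMP:
                    // Author's helper (not shown); assumed to implement Function<String, Object>.
                    transformer = new DateTransformer();
                    break;
                case BOOL:
                    transformer = Boolean::valueOf;
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported type: " + fieldConfig.getType());
            }
            convertedRow.set(fieldConfig.getColumnname(), transformer.apply(extracted.asText()));
        }

        // Emit one row per event, after all configured fields have been set.
        out.output(convertedRow);
    }
}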
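The `switch` above selects (and, for TIMESTAMP, allocates) a converter for every field of every event. A hypothetical alternative is to build an `EnumMap` from type to converter once and look it up per field. `TypeConverters` and its `FieldType` enum are invented names, and the TIMESTAMP entry is an assumption standing in for the author's `DateTransformer`: it passes the string through unchanged, on the premise that events already carry a format BigQuery accepts, such as "2019-03-13 10:00:00".

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical alternative to the per-field switch: a type-to-converter
// table built once and shared by all events.
public class TypeConverters {

    public enum FieldType { STRING, FLOAT64, INT64, TIMESTAMP, BOOL }

    private static final Map<FieldType, Function<String, Object>> CONVERTERS = new EnumMap<>(FieldType.class);

    static {
        CONVERTERS.put(FieldType.STRING, v -> v);
        CONVERTERS.put(FieldType.FLOAT64, Double::valueOf); // FLOAT64 is a 64-bit double
        CONVERTERS.put(FieldType.INT64, Long::valueOf);
        // Assumption: timestamp strings are already in a BigQuery-compatible format.
        CONVERTERS.put(FieldType.TIMESTAMP, v -> v);
        CONVERTERS.put(FieldType.BOOL, Boolean::valueOf);
    }

    // Converts a raw string value according to the declared column type.
    public static Object convert(FieldType type, String raw) {
        Function<String, Object> converter = CONVERTERS.get(type);
        if (converter == null) {
            throw new IllegalArgumentException("No converter for type " + type);
        }
        return converter.apply(raw);
    }
}
```

Adding a new column type then means adding one map entry, and an unmapped type fails loudly instead of silently leaving the column empty.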

Do We Have a Design Pattern?! Yes!

We found a way to change the behavior of a running stream by updating a file in a bucket. This applies not only to schema changes but to any other configuration. For example, for Geo IP lookups, we use MaxMind's database. The IP database gets updated, and I want my pipeline to notice the change and start looking up IPs in the new database file. I used the same pattern for it: a provider that watches the bucket for changes and loads them. When I upload a new IP database to the bucket, the stream automatically detects the change and starts using it.

One Disadvantage to Remember

The singleton class that starts a thread to watch for bucket changes isn't a pipeline step, so it doesn't appear in the Dataflow job graph: there are no step-level logs or statistics for it.

You could instead use a side input that periodically reloads the configuration. This approach requires windowing the main input into time windows (fixed windows in the example below), which can be a drawback if you don't otherwise process your data in windows.

Here's an example of a side input with time-based windowing:

PCollection<Long> ticks = p
        // Produce 1 "tick" per second
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
        // Window the ticks into 1-second windows, matching the main input's windowing
        .apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(1))))
        // Use an arbitrary per-window combiner to reduce to 1 element per window
        .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());

String bucketName = options.getBigQuerySchemaConfigBucketName().get();

// Re-read the configuration once per window and expose it as a singleton side input
PCollectionView<FieldsConfiguration> sideView = ticks
        .apply(MapElements.into(TypeDescriptor.of(FieldsConfiguration.class))
                .via((Long ignored) -> getFieldsConfiguration(bucketName)))
        .apply(View.<FieldsConfiguration>asSingleton()
                .withDefaultValue(getFieldsConfiguration(bucketName)));

We can then apply it to the main input, windowed into the same 1-second fixed windows, using the following code:

.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))) 
.apply("extract-events", ParDo.of(new EventsRowFn(sideView)).withSideInputs(sideView));

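The only change inside the DoFn is the loop, which now reads the configuration from the side input instead of the provider:

for (FieldConfig fieldConfig : context.sideInput(sideView).getFields()) {
    // ... same per-field extraction and conversion as in EventToTableRowTransformerFn ...
}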