
Data Flow Tutorial: Dealing With BigQuery Schema Changes


Learn how to tackle the challenge of changing requirements in your data flow system using the popular PaaS system, BigQuery.


Imagine you have a pipeline that reads data from a PubSub/Kafka instance, or really any other source, and writes it to BigQuery, and everything streams well.

Then, suddenly, you learn that your source JSON event has changed and a new field should be added.

Now what?

You can stop the job, update it with the new field, and rerun it. I’m sure you don’t want to do this for every change, so you set out to look for a better solution.

Here I’ll describe how I overcame this very predicament.

The Design

We will keep a blob file in Google Cloud Storage that contains the schema definition.

We will load the file in the streaming pipeline and watch it for updates.

For any event, we will take only the fields that appear in the loaded schema. We will ignore any field in the event that doesn’t appear in the schema configuration file.
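As a minimal sketch of that rule, the filter below keeps only whitelisted fields. It uses plain maps instead of the pipeline's real types, and `SchemaFilter` and `keepKnownFields` are hypothetical names that do not appear in the actual job.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: keeps only the event fields named in the schema file. */
final class SchemaFilter {

    private SchemaFilter() {}

    static Map<String, Object> keepKnownFields(Map<String, Object> event, Set<String> schemaFields) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : event.entrySet()) {
            // Fields that are missing from the schema file are dropped silently.
            if (schemaFields.contains(entry.getKey())) {
                row.put(entry.getKey(), entry.getValue());
            }
        }
        return row;
    }
}
```

With a schema of `name` and `price`, an event that also carries an unexpected `f1` field produces a row containing only `name` and `price`.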

When the schema changes, we first add the column manually to the BigQuery table and then update the blob file that contains the schema definition (we have a script that does this, using bq update and gsutil cp). Our job will detect that the blob file has been updated and reload the configuration. From then on, it can extract the new field from the event and send it to BigQuery.

Why Didn’t I Choose a Fully Automated Solution?

If I accepted any new field in a JSON event that doesn’t appear in the target BigQuery table, I would be opening myself up to abuse. What does this mean? Imagine that an attacker sends dummy events to my server with fake fields, f1, f2, f3 … f10, and I treat them as new fields, automatically creating columns for them in my BigQuery table. My table would fill up with garbage columns and could also hit quota limits. I need control over this, so I enforce manual updates of the BigQuery schema and the blob file. That way, I can be sure that no garbage columns end up in my BigQuery table.

Let’s Look at the Class That Loads the Fields Configuration

This is a singleton class. On construction, it loads the configuration and starts a background thread that watches the bucket for changes.

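import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;

public class FieldsConfigurationProvider  {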

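    private static final Logger logger = LoggerFactory.getLogger(FieldsConfigurationProvider.class);
    // volatile: written by the watcher thread and read by the pipeline's worker threads
    private volatile Long lastUpdateTime;
    private volatile FieldsConfiguration config;

    // volatile is required for the double-checked locking in getInstance() to be safe
    private static volatile FieldsConfigurationProvider instance = null;
    public static String bucketName;
    public static String filePath;
    private static long fieldsConfigLoadInterval;
    private static final Object lock = new Object();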

    public static FieldsConfigurationProvider getInstance() {
        if (instance == null) {
            synchronized (lock) {
                if (instance == null) {
                    instance = new FieldsConfigurationProvider();
                }
            }
        }
        return instance;
    }
    private FieldsConfigurationProvider(){
        config = loadConfig(filePath);
        watch();
    }

    public static void init(String bucketName, long fieldsConfigLoadInterval, String filePath ) {

        FieldsConfigurationProvider.bucketName = bucketName;
        FieldsConfigurationProvider.fieldsConfigLoadInterval = fieldsConfigLoadInterval;
        FieldsConfigurationProvider.filePath=filePath;
    }



    private void watch() {
        new Thread(() -> {

            logger.info("start watching for bucket changes....");

            Storage storage = StorageOptions.getDefaultInstance().getService();
            Bucket bucket = storage.get(bucketName);

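            while (!Thread.currentThread().isInterrupted()) {
                try {
                    logger.debug("Checking for new fields configuration in the bucket");
                    Blob blob = bucket.get(filePath);
                    // blob is null if the file was deleted; lastUpdateTime is null if the first load failed
                    if (blob != null && (lastUpdateTime == null || blob.getUpdateTime() > lastUpdateTime)) {
                        logger.info("Found new fields configuration in the bucket");
                        FieldsConfiguration reloaded = loadConfig(filePath);
                        if (reloaded != null) {
                            // Swap only on success, so a broken file never wipes the working config
                            config = reloaded;
                        }
                    } else {
                        logger.debug("Didn't find new fields configuration in bucket");
                    }
                } catch (RuntimeException e) {
                    // Keep the watcher alive on transient storage errors
                    logger.error("Failed checking fields configuration, keeping the current one", e);
                }
                try {
                    Thread.sleep(fieldsConfigLoadInterval);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the thread cleanly
                    Thread.currentThread().interrupt();
                }
            }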
        }).start();
    }



    private FieldsConfiguration loadConfig(String path) {
        try {
            ObjectMapper mapper = new ObjectMapper();

            logger.info("Reading fields configuration from gs://" + bucketName + "/" + path);

            Storage storage = StorageOptions.getDefaultInstance().getService();
            Bucket bucket = storage.get(bucketName);
            Blob blob = bucket.get(path);
            lastUpdateTime = blob.getUpdateTime();

            ByteArrayInputStream byteStream = new ByteArrayInputStream(blob.getContent());

            return mapper.readValue(byteStream, FieldsConfiguration.class);
        } catch (IOException e) {
            logger.error("Unable reading fields configuration file", e);
        }
        return null;
    }

    public FieldsConfiguration getConfig() {
        return config;
    }


}

Field Config JSON Example

{
  "fields": [
    {
      "jsonpath": "name",
      "columnname": "name",
      "type": "STRING"
    },
    {
      "jsonpath": "price",
      "columnname": "price",
      "type": "FLOAT64"
    },
    {
      "jsonpath": "purches_date",
      "columnname": "purches_date",
      "type": "TIMESTAMP"
    }
  ]
}
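The classes that this JSON maps to are not shown here. A plausible sketch, assuming plain beans whose property names match the JSON keys (which is what Jackson's default `ObjectMapper` binding expects), could look like the following. The getter names and the type constants come from the pipeline code in this article; the enum name `FieldType` is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Column types that can appear in the "type" key (enum name is assumed). */
enum FieldType { STRING, FLOAT64, INT64, TIMESTAMP, BOOL }

/** One entry of the "fields" array. */
class FieldConfig {
    private String jsonpath;    // key to read from the incoming event
    private String columnname;  // target BigQuery column
    private FieldType type;     // target BigQuery type

    public String getJsonpath() { return jsonpath; }
    public void setJsonpath(String jsonpath) { this.jsonpath = jsonpath; }

    public String getColumnname() { return columnname; }
    public void setColumnname(String columnname) { this.columnname = columnname; }

    public FieldType getType() { return type; }
    public void setType(FieldType type) { this.type = type; }
}

/** Root object of the configuration file. */
class FieldsConfiguration {
    private List<FieldConfig> fields = new ArrayList<>();

    public List<FieldConfig> getFields() { return fields; }
    public void setFields(List<FieldConfig> fields) { this.fields = fields; }
}
```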

How We Use the FieldConfig in the Step Function

You can see here that I obtain the fields configuration provider in @Setup; the configuration it returns changes whenever a new config file is loaded.

While transforming the JsonNode event, I take only the fields that appear in the configuration and convert each one according to its type.

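import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;

import java.util.function.Function;

public class EventToTableRowTransformerFn extends DoFn<JsonNode, TableRow> {

    private final String filedConfigurationBucketName;
    private final String filedConfigurationFilePath;
    private final Long filedConfigurationCheckInterval;

    // The provider is not serializable, so it is acquired in @Setup on each worker
    private transient FieldsConfigurationProvider fieldsConfigurationProvider;

    public EventToTableRowTransformerFn(String filedConfigurationBucketName, String filedConfigurationFilePath, Long filedConfigurationCheckInterval) {
        this.filedConfigurationBucketName = filedConfigurationBucketName;
        this.filedConfigurationFilePath = filedConfigurationFilePath;
        this.filedConfigurationCheckInterval = filedConfigurationCheckInterval;
    }

    @Setup
    public void setUp() {
        FieldsConfigurationProvider.init(filedConfigurationBucketName, filedConfigurationCheckInterval, filedConfigurationFilePath);
        fieldsConfigurationProvider = FieldsConfigurationProvider.getInstance();
    }

    @ProcessElement
    public void processElement(@Element JsonNode element, OutputReceiver<TableRow> out) {

        TableRow convertedRow = new TableRow();

        FieldsConfiguration config = fieldsConfigurationProvider.getConfig();
        if (config == null) {
            throw new RuntimeException("Empty config, check configuration file gs://"
                    + FieldsConfigurationProvider.bucketName + "/" + FieldsConfigurationProvider.filePath);
        }
        for (FieldConfig fieldConfig : config.getFields()) {
            JsonNode extracted = element.get(fieldConfig.getJsonpath());

            // Pick a converter for the configured BigQuery type.
            // The Function<String, Object> type is an assumption; the original listing omits the declaration.
            Function<String, Object> transformer;
            switch (fieldConfig.getType()) {
                case STRING:
                    transformer = v -> v;
                    break;
                case FLOAT64:
                    // FLOAT64 is a 64-bit type, so parse as double rather than float
                    transformer = Double::parseDouble;
                    break;
                case INT64:
                    transformer = Long::parseLong;
                    break;
                case TIMESTAMP:
                    transformer = new DateTransformer();
                    break;
                case BOOL:
                    transformer = Boolean::valueOf;
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported type: " + fieldConfig.getType());
            }
            // insertValue and DateTransformer are helpers that are not shown in this article
            insertValue(extracted, fieldConfig.getColumnname(), convertedRow, transformer);
        }
        // Emit the row once, after all configured fields have been processed
        out.output(convertedRow);
    }

}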
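The per-type conversion can also be tried in isolation. The sketch below is a standalone illustration, not the job's actual helper: `TypeConverters` and `ColumnType` are hypothetical names, and it assumes timestamps arrive as ISO-8601 strings, which may differ from what the DateTransformer above expects.

```java
import java.time.Instant;
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

final class TypeConverters {

    /** Hypothetical mirror of the type names used in the configuration file. */
    enum ColumnType { STRING, FLOAT64, INT64, TIMESTAMP, BOOL }

    private static final Map<ColumnType, Function<String, Object>> CONVERTERS =
            new EnumMap<>(ColumnType.class);

    static {
        CONVERTERS.put(ColumnType.STRING, v -> v);
        CONVERTERS.put(ColumnType.FLOAT64, Double::parseDouble);
        CONVERTERS.put(ColumnType.INT64, Long::parseLong);
        // Assumption: ISO-8601 input such as 2019-01-31T10:15:30Z
        CONVERTERS.put(ColumnType.TIMESTAMP, Instant::parse);
        CONVERTERS.put(ColumnType.BOOL, Boolean::parseBoolean);
    }

    private TypeConverters() {}

    /** Converts the raw text of a field to a Java value of the configured type. */
    static Object convert(ColumnType type, String raw) {
        if (raw == null) {
            return null; // a missing field stays null in the output row
        }
        return CONVERTERS.get(type).apply(raw);
    }
}
```

Keeping the converters in a map means that supporting a new type is a one-line change, and a malformed value fails with the parser's own exception instead of being written as a wrong type.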

Do We Have a Design Pattern?! Yes!

We found a way to influence our stream through a bucket change. This applies to schema changes as well as to any other configuration. For example, for Geo IP we use MaxMind's DB. The IP collection can be updated, and I want my pipeline to notice the change and look up IPs in the new DB file. I used the same pattern for it and created a provider that watches for bucket changes and loads them. If I update the bucket with a new IP DB, my stream will automatically detect the change and load it.
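Stripped of the storage details, the pattern might be sketched as a small generic class. `ReloadingProvider` is a hypothetical name, and the version reader and loader stand in for the blob's update time and its parsed content. This is only one possible shape for the idea, not the code used in the job.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical generic provider: reloads a value when its source's version grows. */
final class ReloadingProvider<T> {

    private final LongSupplier versionReader; // e.g. the blob's update time
    private final Supplier<T> loader;         // e.g. download and parse the blob
    private volatile T current;
    private volatile long loadedVersion;

    ReloadingProvider(LongSupplier versionReader, Supplier<T> loader) {
        this.versionReader = versionReader;
        this.loader = loader;
        this.loadedVersion = versionReader.getAsLong();
        this.current = loader.get();
    }

    T get() {
        return current;
    }

    /** Reloads if the source is newer; returns true when the value was replaced. */
    synchronized boolean refreshIfChanged() {
        long version = versionReader.getAsLong();
        if (version <= loadedVersion) {
            return false;
        }
        T reloaded = loader.get();
        if (reloaded == null) {
            return false; // keep the last good value if loading failed
        }
        current = reloaded;
        loadedVersion = version;
        return true;
    }

    /** Polls the source from a daemon thread until that thread is interrupted. */
    Thread startWatching(long intervalMillis) {
        Thread watcher = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    refreshIfChanged();
                } catch (RuntimeException e) {
                    // Keep the last good value; a real job would log this.
                }
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "config-watcher");
        watcher.setDaemon(true);
        watcher.start();
        return watcher;
    }
}
```

The same class could back both the schema file and the Geo IP database, since only the two suppliers differ between them.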

One Disadvantage to Remember

The singleton class that opens a thread to watch for bucket changes isn’t a pipeline step, so you can’t inspect it in the Dataflow graph: no logs, no statistics.

You may try using a side input, which checks for changes and loads them. This solution requires time windows, which can be a problem if you are not otherwise processing your data in time windows.

Here's an example of a side input with time-slicing:

PCollection<Long> ticks = p
        // Produce 1 "tick" per second
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
        // Window the ticks into 1-second windows
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
        // Use an arbitrary per-window combiner to reduce to 1 element per window
        .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());

String bucketName = options.getBigQuerySchemaConfigBucketName().get();

// Reload the configuration once per window and expose it as a singleton side input
PCollectionView<FieldsConfiguration> sideView = ticks
        .apply(MapElements.into(TypeDescriptor.of(FieldsConfiguration.class)).via((Long ignored) -> getFieldsConfiguration(bucketName)))
        .apply(View.<FieldsConfiguration>asSingleton().withDefaultValue(getFieldsConfiguration(bucketName)));

We can then use it in the main pipeline with the following code:

.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))) 
.apply("extract-events", ParDo.of(new EventsRowFn(sideView)).withSideInputs(sideView));

Then only the loop over the fields changes:

for (FieldConfig fieldConfig : context.sideInput(sideView).getFields()) {


