Data Flow Tutorial: Dealing With BigQuery Schema Changes

Learn how to tackle the challenge of changing requirements in your data flow system using the popular PaaS system, BigQuery.

By Brachi Packter · Mar. 13, 19 · Tutorial


Imagine you have a pipeline that writes data from Pub/Sub, Kafka, or really any other source to BigQuery, and everything streams well.

Then, suddenly, you learn that your source JSON event has changed and a new field needs to be added.

Now what?

You can stop the job, update it with the new field, and rerun it. I’m sure you don’t want to do this for every change, so you set out to look for a better solution.

Here I’ll describe how I overcame this very predicament.

The Design

We will keep a file (a blob in Google Cloud Storage) that contains the schema definition.

We will load the file in the streaming pipeline and watch it for updates.

For any event, we will take only the fields that appear in the loaded schema; any field in the event that doesn't appear in the schema configuration file will be ignored.

When there is a schema change, we first add the column manually to the BigQuery table and update the blob file that contains the schema definition (we have a script that does this, using bq update and gsutil cp). Our job identifies that the blob file has been updated and reloads the configuration. From that point on, it can extract the new field from incoming events and send it to BigQuery.
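The article's own script uses the bq and gsutil CLIs; purely as an illustration of the same two steps, here is a hedged Java sketch using the BigQuery and Cloud Storage client libraries. The dataset, table, column, bucket, and file names are placeholders, not values from the original setup.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class SchemaUpdater {

    public static void main(String[] args) throws Exception {
        // 1. Add the new column to the BigQuery table (the equivalent of `bq update`).
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        Table table = bigquery.getTable(TableId.of("my_dataset", "my_table"));   // placeholder names
        Schema oldSchema = table.getDefinition().getSchema();
        List<Field> fields = new ArrayList<>(oldSchema.getFields());
        fields.add(Field.of("new_column", LegacySQLTypeName.STRING));            // hypothetical new field
        table.toBuilder()
             .setDefinition(StandardTableDefinition.of(Schema.of(fields)))
             .build()
             .update();

        // 2. Upload the updated fields configuration file (the equivalent of `gsutil cp`).
        Storage storage = StorageOptions.getDefaultInstance().getService();
        BlobId blobId = BlobId.of("my-config-bucket", "config/fields.json");     // placeholder location
        storage.create(BlobInfo.newBuilder(blobId).setContentType("application/json").build(),
                Files.readAllBytes(Paths.get("fields.json")));
    }
}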

Why Didn’t I Choose a Fully Automated Solution?

If I took any new field in a JSON event that doesn't appear in the target BigQuery table and created a column for it automatically, I'd be opening myself up to abuse. What does this mean? Imagine that some attacker sends dummy events to my server with fake fields f1, f2, f3 … f10, and I treat these as new fields, automatically creating columns for them in my BigQuery table. My table would fill up with garbage columns and could also hit quota limits. I need control over this, and, therefore, I must enforce the manual updating of the BigQuery schema and the blob file. This way, I can be sure no garbage columns end up in my BigQuery table.

Let’s Look Into the Class That Loads the Fields Configuration

You can see that this is a singleton class; on construction, it loads the configuration and starts watching for changes.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;

public class FieldsConfigurationProvider {

    private Logger logger = LoggerFactory.getLogger(FieldsConfigurationProvider.class);
    private Long lastUpdateTime;
    private FieldsConfiguration config;

    private static FieldsConfigurationProvider instance = null;
    public static String bucketName;
    public static  String filePath;
    private static long fieldsConfigLoadInterval;
    private final static Object lock = new Object();

    public static FieldsConfigurationProvider getInstance() {
        if (instance == null) {
            synchronized (lock) {
                if (instance == null) {
                    instance = new FieldsConfigurationProvider();
                }
            }
        }
        return instance;
    }
    private FieldsConfigurationProvider(){
        config = loadConfig(filePath);
        watch();
    }

    public static void init(String bucketName, long fieldsConfigLoadInterval, String filePath ) {

        FieldsConfigurationProvider.bucketName = bucketName;
        FieldsConfigurationProvider.fieldsConfigLoadInterval = fieldsConfigLoadInterval;
        FieldsConfigurationProvider.filePath = filePath;
    }



    // Background thread: polls the blob's update time and reloads the configuration when it changes.
    private void watch() {
        new Thread(() -> {

            logger.info("start watching for bucket changes....");

            Storage storage = StorageOptions.getDefaultInstance().getService();
            Bucket bucket = storage.get(bucketName);

            while(true) {
                logger.debug("Checking for new fields configuration in the bucket");
                Blob blob = bucket.get(filePath);
                long currentTime = blob.getUpdateTime();
                if (lastUpdateTime == null || currentTime > lastUpdateTime) {
                    logger.info("Found new fields configuration in the bucket");
                    config = loadConfig(filePath);
                } else {
                    logger.debug("Didn't find new fields configuration in bucket");
                }
                try {
                    Thread.sleep(fieldsConfigLoadInterval);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }



    // Reads the fields configuration JSON from the bucket and records its last update time.
    private FieldsConfiguration loadConfig(String path) {
        try {
            ObjectMapper mapper = new ObjectMapper();

            logger.info("Reading fields configuration from gs://" + bucketName + "/" + path);

            Storage storage = StorageOptions.getDefaultInstance().getService();
            Bucket bucket = storage.get(bucketName);
            Blob blob = bucket.get(path);
            lastUpdateTime = blob.getUpdateTime();

            ByteArrayInputStream byteStream = new ByteArrayInputStream(blob.getContent());

            return mapper.readValue(byteStream, FieldsConfiguration.class);
        } catch (IOException e) {
            logger.error("Unable reading fields configuration file", e);
        }
        return null;
    }

    public FieldsConfiguration getConfig() {
        return config;
    }


}

Field Config JSON Example

{
  "fields": [
    {
      "jsonpath": "name",
      "columnname": "name",
      "type": "STRING"
    },
    {
      "jsonpath": "price",
      "columnname": "price",
      "type": "FLOAT64"
    },
    {
      "jsonpath": "purches_date",
      "columnname": "purches_date",
      "type": "TIMESTAMP"
    }
  ]
}
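The FieldsConfiguration and FieldConfig classes themselves are not shown in the article; a minimal Jackson-friendly sketch consistent with this JSON and with the code below might look like this (the exact shape is an assumption):

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.io.Serializable;
import java.util.List;

// Assumed shape of the configuration classes; Serializable so the object can also be
// used as a Beam side input (see the side-input variant at the end of the article).
@JsonIgnoreProperties(ignoreUnknown = true)
public class FieldsConfiguration implements Serializable {

    private List<FieldConfig> fields;

    public List<FieldConfig> getFields() { return fields; }
    public void setFields(List<FieldConfig> fields) { this.fields = fields; }
}

@JsonIgnoreProperties(ignoreUnknown = true)
class FieldConfig implements Serializable {

    public enum FieldType { STRING, FLOAT64, INT64, TIMESTAMP, BOOL }

    private String jsonpath;   // where to find the value in the incoming JSON event
    private String columnname; // target column in the BigQuery table
    private FieldType type;    // BigQuery column type

    public String getJsonpath() { return jsonpath; }
    public void setJsonpath(String jsonpath) { this.jsonpath = jsonpath; }
    public String getColumnname() { return columnname; }
    public void setColumnname(String columnname) { this.columnname = columnname; }
    public FieldType getType() { return type; }
    public void setType(FieldType type) { this.type = type; }
}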

How We Use the FieldsConfiguration in the DoFn Step

You can see here that I obtain the configuration provider in @Setup; the configuration it returns is replaced whenever a new version is loaded from the bucket.

While transforming the JsonNode event, I take only the fields that appear in the configuration and convert each one according to its type.

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;

import java.util.function.Function;

public class EventToTableRowTransformerFn extends DoFn<JsonNode, TableRow> {

    private FieldsConfigurationProvider fieldsConfigurationProvider;
    private final String filedConfigurationBucketName;
    private final String filedConfigurationFilePath;
    private final Long filedConfigurationCheckInterval;

    public EventToTableRowTransformerFn(String filedConfigurationBucketName, String filedConfigurationFilePath, Long filedConfigurationCheckInterval) {
        this.filedConfigurationBucketName = filedConfigurationBucketName;
        this.filedConfigurationFilePath = filedConfigurationFilePath;
        this.filedConfigurationCheckInterval = filedConfigurationCheckInterval;
    }

   @Setup
    public void setUp() {
        FieldsConfigurationProvider.init(filedConfigurationBucketName, filedConfigurationCheckInterval, filedConfigurationFilePath);
        fieldsConfigurationProvider = FieldsConfigurationProvider.getInstance();
    }

    @ProcessElement
    public void processElement(@Element JsonNode element, OutputReceiver<TableRow> out, ProcessContext context) {

        TableRow convertedRow = new TableRow();

        FieldsConfiguration config = fieldsConfigurationProvider.getConfig();
        if (config == null) {
            throw new RuntimeException("empty config, check configuration file in " + FieldsConfigurationProvider.bucketName + "/" + FieldsConfigurationProvider.filePath);
        }
        for (FieldConfig fieldConfig : config.getFields()) {
            JsonNode extracted = element.get(fieldConfig.getJsonpath());

            // default to pass-through (STRING); overridden per column type below
            Function<String, Object> transformer = v -> v;
            switch (fieldConfig.getType()) {
                case STRING:
                    transformer = v -> v;
                    break;
                case FLOAT64:
                    transformer = Float::parseFloat;
                    break;
                case INT64:
                    transformer = Long::parseLong;
                    break;
                case TIMESTAMP:
                    // DateTransformer converts the raw value into a TIMESTAMP value
                    transformer = new DateTransformer();
                    break;
                case BOOL:
                    transformer = Boolean::valueOf;
                    break;
            }
            // insertValue applies the transformer to the extracted value and sets it
            // on the row under the configured column name
            insertValue(extracted, fieldConfig.getColumnname(), convertedRow, transformer);
        }
        context.output(convertedRow);
    }


}
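Neither insertValue nor DateTransformer appears in the article; a hedged sketch of what they might look like, matching the call above, is shown here. The insertValue method would live inside EventToTableRowTransformerFn, and the epoch-milliseconds assumption for timestamps is mine, not the author's.

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.services.bigquery.model.TableRow;

import java.io.Serializable;
import java.time.Instant;
import java.util.function.Function;

// Assumed helper: applies the per-type transformer to the extracted JSON value
// and stores the result in the TableRow under the configured column name.
private void insertValue(JsonNode extracted, String columnName, TableRow row,
                         Function<String, Object> transformer) {
    if (extracted == null || extracted.isNull()) {
        return; // the event doesn't carry this field: skip it
    }
    row.set(columnName, transformer.apply(extracted.asText()));
}

// Assumed implementation of DateTransformer: converts a raw value (here assumed to be
// epoch milliseconds) into a BigQuery-compatible TIMESTAMP string.
public class DateTransformer implements Function<String, Object>, Serializable {
    @Override
    public Object apply(String value) {
        return Instant.ofEpochMilli(Long.parseLong(value)).toString(); // ISO-8601
    }
}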

Do We Have a Design Pattern?! Yes!

We found a way to let a bucket change influence our stream. This is relevant not only to schema changes but to any other configuration. For example, for geo IP lookups we use MaxMind's database; the IP collection gets updated, and I want my pipeline to be aware of that change and look up IPs in the new database file. I used the same pattern and created a provider that watches for bucket changes and reloads the file, so if I upload a new IP database to the bucket, my stream automatically picks up the change.

One Disadvantage to Remember

The singleton class that opens a thread to watch for bucket changes isn't a pipeline step, so you can't inspect it in the Dataflow graph: no logs, no statistics.

You may try using a side input that checks for changes and loads them. This solution requires time-based windows, which can be a drawback if you are not otherwise processing your data in windows.

Here's an example of a side input with time-slicing:

PCollection<Long> ticks = p
        // Produce 1 "tick" per second
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
        // Window the ticks into 1-second fixed windows
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
        // Use an arbitrary per-window combiner to reduce to 1 element per window
        .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());

String bucketName = options.getBigQuerySchemaConfigBucketName().get();

PCollectionView<FieldsConfiguration> sideView = ticks
        .apply(MapElements.into(TypeDescriptor.of(FieldsConfiguration.class))
                .via((Long ignored) -> getFieldsConfiguration(bucketName)))
        .apply(View.<FieldsConfiguration>asSingleton().withDefaultValue(getFieldsConfiguration(bucketName)));

We can then wire it into the main pipeline using the following code:

.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))) 
.apply("extract-events", ParDo.of(new EventsRowFn(sideView)).withSideInputs(sideView));

And then only the iteration over the fields changes:

for (FieldConfig fieldConfig : context.sideInput(sideView).getFields()) {
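EventsRowFn itself isn't shown in the article; a hedged sketch of how such a DoFn might receive the side input and reuse the same per-field logic (it is assumed to mirror EventToTableRowTransformerFn) could look like this:

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;

// Hypothetical sketch: the same transformation as EventToTableRowTransformerFn,
// but reading the configuration from the side input instead of the singleton provider.
public class EventsRowFn extends DoFn<JsonNode, TableRow> {

    private final PCollectionView<FieldsConfiguration> sideView;

    public EventsRowFn(PCollectionView<FieldsConfiguration> sideView) {
        this.sideView = sideView;
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
        JsonNode element = context.element();
        TableRow convertedRow = new TableRow();
        for (FieldConfig fieldConfig : context.sideInput(sideView).getFields()) {
            // ...same per-field extraction and type conversion as shown earlier...
        }
        context.output(convertedRow);
    }
}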