On Some Aspects of Big Data Processing in Apache Spark, Part 3: How To Deal With Malformed Data?

In this post, I present solutions for dealing with malformed date/time data and for setting a default value for malformed data.

By Alexander Eleseev · DZone Core · Aug. 23, 2022 · Tutorial

In my previous post, I presented design patterns to program Spark applications in a modular, maintainable, and serializable way. This time, I demonstrate how to deal with malformed date/time data and how to set a default value for malformed data.

When I worked on a big data project, my tasks were to load data in different formats (JSON, ORC, etc.) from different sources (Kafka, Hadoop Distributed File System, Apache Hive, Postgres, Oracle), transform the data, and save it to the same or different sources. The simplest task was to load data from a single source (Postgres) and save it to another source (Hive) without any transformations.

Even in this simplest case, there was a lot of malformed data! Malformed date/time data, in particular, took our team a lot of time to deal with. There were also frequent null values and, sometimes, empty arrays. So, it is worthwhile to have compact and versatile solutions to process such irregularities.

This post is organized as follows:

  • Section 1 describes how to deal with malformed date/time data,
  • Section 2 describes a simple decorator pattern to assign a default value to a malformed piece of data.

Fully workable code can be found here. Let's go.

I. Malformed Date/Time Data.

In my case, data was received in JSON format either from Kafka or from JSON files. JSON-formatted data is built on two structures: a collection (object) of key-value pairs and an ordered list (array) of values. A value can be an object, an array, a string, a number, a boolean, or null. So, a piece of date/time data usually comes as a string.

JSON has no built-in date type, so the date/time parsing is left to Java. Java's java.time package contains the LocalDate, LocalTime, LocalDateTime, ZonedDateTime, etc. classes to store date/time data, and there is also the legacy java.util.Date class. To parse date/time strings, there are parsers like SimpleDateFormat (which produces Date objects) and DateTimeFormatter (which works with the java.time types). These parsers accept a pattern string, like "dd.MM.yyyy HH:mm", and an input string, like "20.10.2020 12:30", and return a Date or LocalDateTime object built from the input string. These objects can then act as fields in a Hibernate @Entity. Looks pretty straightforward, right?
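
For instance, here is a minimal sketch of both parsing routes (the pattern and input strings are illustrations only, not from the project):

Java
 
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;

public class ParsingSketch {
    public static void main(String[] args) throws Exception {
        // Legacy route: SimpleDateFormat parses into java.util.Date.
        Date legacy = new SimpleDateFormat("dd.MM.yyyy HH:mm").parse("20.10.2020 12:30");

        // Modern route: DateTimeFormatter parses into java.time types.
        LocalDateTime modern = LocalDateTime.parse("20.10.2020 12:30",
                DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm"));

        System.out.println(legacy + " / " + modern); // both represent 2020-10-20T12:30
    }
}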

Unfortunately, no. In many big data projects, date/time data comes in many different formats. Moreover, I encountered examples where date/time strings contained substrings in a different language! So, it may not be possible to parse such date strings with a single pattern or even with a single parsing tool.
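
As a hypothetical illustration of the language problem, a string with a French month name needs its own locale-aware formatter (the example below is mine, not from the project):

Java
 
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class LocaleParsingSketch {
    public static void main(String[] args) {
        // "20 octobre 2020" cannot be parsed by a default-locale formatter on an English JVM,
        // so a formatter built for the French locale is required.
        DateTimeFormatter french = DateTimeFormatter.ofPattern("d MMMM yyyy", Locale.FRENCH);
        System.out.println(LocalDate.parse("20 octobre 2020", french)); // 2020-10-20
    }
}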

To attack this problem, let's recall the Chain of Responsibility design pattern. The pattern consists of a Handler interface and ConcreteHandler implementations. A ConcreteHandler refers to another ConcreteHandler; all the ConcreteHandlers form a linked list, and the last ConcreteHandler refers to null.

In our case, this pattern is implemented as follows. Our Handler interface is called IChainDT:

Java
 
import java.time.LocalDate;
import java.util.Date;

public interface IChainDT {
    LocalDate getDateTime(String input);
    Date parseDateTime(String input);
    void setNextChain(IChainDT element);
}

Here, the parseDateTime method parses date/time strings, getDateTime converts the resulting Date object to a more convenient LocalDate object, and the setNextChain method sets a link to another parser. The converter is added to demonstrate how the parser can make output dates "prettier" before the dates are returned.

The SimpleDateTimeParser class implements the IChainDT interface:

Java
 
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;

public class SimpleDateTimeParser implements IChainDT {
    String shortDateTimePattern;
    IChainDT dateTimeParser = null;
    Date defaultTime = new Date(0L);

    public SimpleDateTimeParser(String pattern) {
        shortDateTimePattern = pattern;
    }

    public SimpleDateTimeParser(String pattern, IChainDT nextValidator) {
        this(pattern);
        this.dateTimeParser = nextValidator;
    }

    public LocalDate getDateTime(String json) {
        Date result = parseDateTime(json);
        return result.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    }

    public void setNextChain(IChainDT validator) {
        this.dateTimeParser = validator;
    }

    public Date parseDateTime(String input) {
        // The formatter is a local variable so that the parser stays serializable.
        DateFormat simpleDateFormatter = new SimpleDateFormat(shortDateTimePattern);
        try {
            return simpleDateFormatter.parse(input);
        } catch (Exception e) {
            if (this.dateTimeParser != null) return this.dateTimeParser.parseDateTime(input);
            else return defaultTime;
        }
    }
}

Here, IChainDT dateTimeParser is a reference to another parser, and String shortDateTimePattern is a date/time pattern string. The next parser reference can be set either via the two-argument constructor or via the setNextChain setter.

Notice how the parseDateTime method works. First, the method creates a SimpleDateFormat instance with a specific pattern; we need the instance to be a local variable for the SimpleDateTimeParser to be serializable (this post explains how Spark serializes tasks). If the simpleDateFormatter (with the specified pattern) fails to parse the input string, it throws an exception.

The exception gets caught in the catch block. If there is a next dateTimeParser in the chain, dateTimeParser.parseDateTime(input) gets called. If the current parser is the last one in the chain, its default value is returned; the value may be null.

Finally, let's see how this parser is called.

Java
 
@Test
  public void parserTest(){
      String pattern1 = "yyyy-MM-dd";
      String pattern2 = "yyyy.MM.dd";
    
      IChainDT validator1 = new SimpleDateTimeParser(pattern1);  
      IChainDT validator2 = new SimpleDateTimeParser(pattern2);
      validator1.setNextChain(validator2);

      String testString = "2020-10-19";
      LocalDate result = validator1.getDateTime(testString);     
      assertEquals(result.getYear(),2020);
      
      testString = "2020.10.19";
      result = validator1.getDateTime(testString);
      assertEquals(result.getYear(),2020);
      
      testString="10/19/2020";
      result = validator1.getDateTime(testString);
      assertEquals(result.getYear(),1969);
  }

First, we create a parser for every pattern string. Next, we chain the parsers. Finally, we call the first parser in the chain on a date/time string. If none of the parsers succeeds in parsing the string, the default value of the last parser in the chain (new Date(0L) in this case) is returned and converted to a LocalDate. Depending on the system time zone, this epoch date maps to 1970-01-01 or 1969-12-31, which is why the last assertion expects the year 1969.
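
To see where serializability matters in practice, here is a minimal sketch (my own, not from the linked repository) of the chain used as a callback on a Spark RDD. It assumes IChainDT is declared to extend Serializable, since otherwise Spark could not ship the captured parser to executors; the application setup and input values are illustrative only:

Java
 
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ChainOnSparkSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("chain-sketch").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // First try "yyyy-MM-dd", then fall back to "yyyy.MM.dd".
            IChainDT parser = new SimpleDateTimeParser("yyyy-MM-dd",
                    new SimpleDateTimeParser("yyyy.MM.dd"));

            List<String> raw = Arrays.asList("2020-10-19", "2020.10.19", "not a date");
            // The parser is captured by the closure, so it must be serializable.
            JavaRDD<LocalDate> parsed = sc.parallelize(raw).map(parser::getDateTime);
            parsed.collect().forEach(System.out::println);
        }
    }
}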

This parser can also be implemented via an abstract class. In this case, we define an abstract class AChainDT instead of the interface IChainDT:

Java
 
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;

public abstract class AChainDT {
    protected AChainDT nextParser = null;
    protected String shortDateTimePattern;
    protected Date defaultTime = new Date(0L);

    public AChainDT(String shortDateTimePattern) {
        this.shortDateTimePattern = shortDateTimePattern;
    }

    public void setNextParser(AChainDT nextParser) {
        this.nextParser = nextParser;
    }

    public LocalDate getDateTime(String input) {
        Date result = parseDateTime(input);
        return result.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    }

    public abstract Date parseDateTime(String input);
}

Here, the abstract class contains the common part of all parsers: a reference to another parser, a pattern string, and the Date-to-LocalDate converter. A ConcreteHandler now looks more concise:

Java
 
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

public class SimpleDateTimeParserA extends AChainDT {

    public SimpleDateTimeParserA(String shortDateTimePattern) {
        super(shortDateTimePattern);
    }

    @Override
    public Date parseDateTime(String input) {
        // Again, the formatter is local so the parser stays serializable.
        DateFormat simpleDateFormatter = new SimpleDateFormat(shortDateTimePattern);
        try {
            return simpleDateFormatter.parse(input);
        } catch (Exception e) {
            if (nextParser != null) return nextParser.parseDateTime(input);
            else return defaultTime;
        }
    }
}

Again, we create the SimpleDateFormat instance as a local variable for the parser to be serializable. This parser runs as before, except we replace IChainDT with AChainDT and SimpleDateTimeParser with SimpleDateTimeParserA, as sketched below. See the code for details.
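
For completeness, a minimal sketch of chaining the abstract-class-based parsers (the test name and values are mine, mirroring the earlier test):

Java
 
@Test
public void parserATest() {
    // Chain the parsers: try "yyyy-MM-dd" first, then fall back to "yyyy.MM.dd".
    AChainDT parser = new SimpleDateTimeParserA("yyyy-MM-dd");
    parser.setNextParser(new SimpleDateTimeParserA("yyyy.MM.dd"));

    LocalDate result = parser.getDateTime("2020.10.19");
    assertEquals(result.getYear(), 2020);
}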

II. Default Value Decorator.

As I mentioned in the introduction, a lot of nulls and empty arrays come as values in JSON strings. Also, sometimes when data is transferred from one database to another, the data types need to be converted, like Integer to BigDecimal. In all these cases, NullPointerException, ArrayIndexOutOfBoundsException, and other exceptions need to be caught and processed.

A common scenario is a functional interface that is fed as a callback to RDD transformations or actions. Let's decorate such an interface to catch and process exceptions.

Java
 
import org.apache.spark.api.java.function.Function;

public interface IExceptionDecoratorSpark {
    static <Input, Output> Function<Input, Output> process(Function<Input, Output> fun, Output def) {
        return new Function<Input, Output>() {
            @Override
            public Output call(Input o) {
                try {
                    return fun.call(o);
                } catch (NullPointerException e) {
                    return null;
                } catch (Exception e) {
                    return def;
                }
            }
        };
    }
}

Here, fun is an input function that implements the Function interface. The function's input is an Input-type object, and its output is an Output-type object. The anonymous class returned by the process method overrides the call method; inside call, fun is invoked. If there are exceptions, they get caught in the catch blocks, and either null or the provided default value def is returned. As usual in Java, more specific exceptions must be caught first.

This decorator is called in the following way:

Java
 
@Test
    public void basicProcessorSparkTest() throws Exception {
        Double def = 10000.0;
        Double shouldBe = 0.5;
        Function<Integer, Double> fun = (x) -> 1.0 / x;
        Function<Integer, Double> outFun = IExceptionDecoratorSpark.process(fun, def);
        Double result = outFun.call(2);
        assertEquals(result, shouldBe);

    }

In this case, fun returns the inverse of its input and works as expected.

On the other hand, here is an example where an exception is thrown and processed; the provided default value is returned as a result:

Java
 
 @Test
    public void exceptionProcessorSparkTest() throws Exception {
        Integer def = 10;
        Integer[] input = new Integer[0];
        Function<Integer[], Integer> fun = (x) -> x[1];
        Function<Integer[], Integer> outFun = IExceptionDecoratorSpark.process(fun, def);
        Integer result = outFun.call(input);
        assertEquals(result, def);
    }

Here, fun returns the second element of an input array of integers. If such an element doesn't exist, the provided default value is returned.

Notice that the org.apache.spark.api.java.function.Function interface is not the same as java.util.function.Function. The former declares a call method and extends Serializable; the latter declares an apply method and is not Serializable. The presented approach also works for java.util.function.Function, if we replace the input function type and swap the call method for apply, roughly as sketched below. See the code for details.
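
A rough sketch of what the java.util.function variant might look like (the interface name here is mine; the linked repository has the author's version):

Java
 
import java.util.function.Function;

public interface IExceptionDecorator {
    static <Input, Output> Function<Input, Output> process(Function<Input, Output> fun, Output def) {
        return input -> {
            try {
                return fun.apply(input);
            } catch (NullPointerException e) {
                return null;   // nulls are mapped to null
            } catch (Exception e) {
                return def;    // any other failure falls back to the provided default
            }
        };
    }
}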

Conclusions

In this post, I demonstrated possible ways to process malformed date/time data and how to create a default value decorator. The date/time processor can handle date/time strings that cannot be parsed with a single pattern string. The decorator returns different default values for different exceptions thrown. I hope these tricks will be helpful for you.
