
Making S3A Hadoop Connector Work With Druid


In this post, we learn how to use two powerful, open source tools for performing real-time data analysis.


Apache Druid is a high-performance, real-time analytics database. Druid is a unique type of database that combines ideas from OLAP/analytic databases, time series databases, and search systems to enable new use cases in real-time architectures. To build a framework for time series trend analysis, prediction modeling, and anomaly detection, I decided to use Druid. As per the requirements, apart from real-time data ingestion, there was also a need for batch-based data ingestion in Druid. After reading several blogs and articles about production Druid cluster setups for handling petabytes of data, I decided to follow the architecture below:

  • Two Druid master nodes, each running the Coordinator and Overlord processes. Two nodes provide high availability.
  • Two Druid query server nodes running the Broker process. Two nodes provide high availability and allow defining two query tiers, i.e. 'hot' and '_default_tier'. Additionally, a Router process on one node acts as a single gateway for all Druid API access.
  • Three Druid data server nodes running the Historical and MiddleManager processes. The three ZooKeeper nodes are also leveraged to run the Historical process. Caffeine is used for caching query results.
  • Three ZooKeeper nodes for managing the current Druid cluster state.
  • A Druid metadata database node running Postgres, along with Grafana for visualizing Druid cluster metrics.
  • S3 as Druid deep storage and also for job logs storage (a sample configuration sketch follows this list).
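For context, here is a minimal sketch of how deep storage and job log storage on S3 might look in Druid's common.runtime.properties. It assumes the druid-s3-extensions extension is loaded; the bucket name and prefixes are illustrative only:

# S3 deep storage (bucket and prefix are illustrative)
druid.storage.type=s3
druid.storage.bucket=experiment-druid
druid.storage.baseKey=deepstorage

# Index job logs on S3
druid.indexer.logs.type=s3
druid.indexer.logs.s3Bucket=experiment-druid
druid.indexer.logs.s3Prefix=job-logs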

Why Is Druid With a Workable S3A Hadoop Connector Required?

  • First, S3A is the newest Hadoop S3 connector. As per the Amazon S3 Hadoop wiki, the previous connectors, i.e., S3 and S3N, are now deprecated. With the S3A connector, there is no need to explicitly provide AWS_ACCESS_KEY and AWS_SECRET_KEY; it determines the credentials and role from the EC2 IAM instance profile, which makes Druid's common runtime properties files much simpler and more generic.
  • Second, even if you are not planning to run Druid indexing jobs on a Hadoop cluster, ingesting Parquet-format data into Druid requires an index job of type index_hadoop, and hence a workable S3A connector. Druid supports the Parquet format through a Parquet extension. Since, in my case, the data is in Parquet format, I needed to make it work (see the extension sketch after this list).
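As an illustration, the extensions line in common.runtime.properties might look like the sketch below. The exact list depends on your setup, and in this Druid version the Parquet extension is a community extension that also requires the Avro extension:

druid.extensions.loadList=["druid-s3-extensions", "druid-hdfs-storage", "druid-avro-extensions", "druid-parquet-extensions"]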

Druid's Current State With an S3A Hadoop Connector

I am writing this article based on Druid version 0.12.3. Druid recently released a newer version, but I haven't evaluated it yet.

  • If you try to run an `index_hadoop` job using a Druid index job spec like the one below:
 "tuningConfig": {

    "type": "hadoop",

    "jobProperties": {

       "fs.s3a.endpoint": "s3.ca-central-1.amazonaws.com",

       "fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",

       "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",

       "io.compression.codecs":  "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec"

    }

 }

 you will get an exception with a stack trace like the one below:

Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
     at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287) ~[?:?]
     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669) ~[?:?]
     at org.apache.hadoop.fs.FileSystem.access$000(FileSystem.java:94) ~[?:?]
     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703) ~[?:?]
     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685) ~[?:?]
     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373) ~[?:?]
     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) ~[?:?]

This is caused by incompatible versions of the hadoop-client libraries and the aws-sdk library in the Druid code. The issue can be fixed either by bumping the hadoop.compile.version property from '2.7.3' to '2.8.3' or by downgrading aws.sdk.version to something lower than '1.10.77' in Druid's Maven pom.xml file. I decided to follow the first option and, after bumping the version, rebuilt the Druid distribution.
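For reference, here is a minimal sketch of the relevant entries in the properties section of Druid's root pom.xml; the surrounding entries are omitted and may look slightly different in your checkout:

<properties>
  <!-- Bumped from 2.7.3 so hadoop-aws agrees with the bundled aws-sdk -->
  <hadoop.compile.version>2.8.3</hadoop.compile.version>
  <!-- Alternative fix: leave Hadoop at 2.7.3 and downgrade this instead -->
  <aws.sdk.version>1.10.77</aws.sdk.version>
</properties>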

After fixing the library incompatibility issue, I faced another issue related to the storage path of segment files. Since Druid follows the S3N connector by default, the generated segmentOutputPath value is based on an s3n:// URI instead of s3a://. Below is a sample generated job spec for the index job.

"ioConfig" : {

      "type" : "hadoop",

      "inputSpec" : {

        "type" : "static",

        "paths" : "s3a://experiment-druid/input_data/wikiticker-2015-09-12-sampled.json.gz"

      },

      "metadataUpdateSpec" : null,

      "segmentOutputPath" : "s3n://daas-experiment-druid/deepstorage"

    },

As per the Hadoop-based indexing documentation, we can provide segmentOutputPath in the ioConfig section of the index job spec. However, I was getting an error when providing a segment path this way. Digging into Druid's source code, I found the property useS3aSchema in the class S3DataSegmentPusherConfig. Setting the following property in Druid's properties files fixes this issue:

druid.storage.useS3aSchema=true
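With this flag set, the generated segmentOutputPath should switch to the s3a:// scheme, for example (the bucket and prefix here simply mirror the sample spec above):

"segmentOutputPath" : "s3a://daas-experiment-druid/deepstorage"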

The final issue I faced in making it workable was a runtime exception while pushing a segment to S3 deep storage. The following is the stack trace of the exception:

java.io.IOException: Output Stream closed
at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:83) ~[hadoop-aws-2.8.3.jar:?]
at org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:89) ~[hadoop-aws-2.8.3.jar:?]
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140) ~[?:1.8.0_191]
at java.io.DataOutputStream.flush(DataOutputStream.java:123) ~[?:1.8.0_191]

Thanks to the Druid user discussion and this great pull request, I was able to fix the problem by commenting out the flush statement.

Making the S3A Hadoop Connector Workable With Druid

Compiling the fixes for all the issues mentioned above, follow these steps to make the S3A Hadoop connector work with Druid:

  • Rebuild the Druid 0.12.3 source code branch with the command mvn clean package -DskipTests after changing the hadoop.compile.version property value to '2.8.3' and commenting out the flush statement in the JobHelper class.
  • Replace the druid-hdfs-storage extension in Druid's `extensions` folder with the newly built artifacts.
  • Copy the hadoop-client '2.8.3' libraries into Druid's 'hadoop-dependencies' folder (see the command sketch after this list).
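A rough sketch of these steps as shell commands follows. The paths are illustrative only and depend on where Druid is installed and where the build places its artifacts:

# Build Druid 0.12.3 after editing pom.xml (hadoop.compile.version -> 2.8.3)
# and commenting out the flush call in JobHelper
mvn clean package -DskipTests

# Replace the druid-hdfs-storage extension with the newly built artifacts
rm -rf $DRUID_HOME/extensions/druid-hdfs-storage
cp -r <path-to-built-druid-hdfs-storage> $DRUID_HOME/extensions/druid-hdfs-storage

# Make the hadoop-client 2.8.3 libraries available to indexing tasks
mkdir -p $DRUID_HOME/hadoop-dependencies/hadoop-client/2.8.3
cp <path-to-hadoop-client-2.8.3-jars>/*.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.8.3/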

Finally, for testing, you can use the example index-jobs published in my GitHub repo.
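As an example, one of those specs can be submitted to the Overlord's task endpoint roughly as follows; the host, port, and file name are placeholders for a default setup:

curl -X POST -H 'Content-Type: application/json' \
     -d @wikiticker-index-job.json \
     http://<overlord-host>:8090/druid/indexer/v1/task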

Happy Druid exploration!
