Identifying Duplicate Files in AWS S3 With Apache Spark

Using Spark, you can identify duplicate files in your S3 storage by calculating checksums. It's a quick, easy way to ensure you aren't carrying extra weight.

Amazon S3 (Simple Storage Service) is an object storage service from AWS. It stores objects in buckets, and users can perform CRUD operations on those objects through a simple web service interface. Many apps, tools, and S3 clients exist to make S3 access easy; however, none of them offers a feature to identify duplicate files in S3 (or at least I could not find one). Of course, you can do that with the AWS command line tools by comparing the ETag values of all objects, but an ETag depends on how the object was created (for example, through a multipart upload) and how it was encrypted, so you cannot fully rely on that comparison. In this post, I will explain step by step how to explore S3 and find duplicate files using Apache Spark 2.1 Datasets. So, let’s get started.
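
To make the ETag caveat concrete, here is a minimal sketch in plain Scala with no AWS calls. It assumes the commonly observed behavior that a single-part, unencrypted upload gets the MD5 of its content as its ETag, while a multipart upload gets the MD5 of the concatenated part digests followed by "-" and the part count. That behavior is an assumption for illustration, not a documented contract, and the names ETagSketch, singlePartETag, and multipartETag are hypothetical.

```scala
import java.security.MessageDigest

object ETagSketch {
  private def md5Bytes(data: Array[Byte]): Array[Byte] =
    MessageDigest.getInstance("MD5").digest(data)

  // Zero-padded lowercase hex
  private def hex(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xff}%02x").mkString

  /** ETag-like value for a single-part upload: the MD5 of the whole content. */
  def singlePartETag(content: Array[Byte]): String = hex(md5Bytes(content))

  /** ETag-like value for a multipart upload (assumed scheme): MD5 over the part digests, plus "-<parts>". */
  def multipartETag(content: Array[Byte], partSize: Int): String = {
    val parts = content.grouped(partSize).toList
    val joinedDigests = parts.flatMap(p => md5Bytes(p).toSeq).toArray
    hex(md5Bytes(joinedDigests)) + "-" + parts.size
  }
}
```

Under this assumption, the same bytes uploaded in one piece and in several parts end up with different ETags, which is why the rest of the post hashes the contents directly.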

Step 1: What’s the Logic?

Before I describe a single line of code, let’s take a look at the complete picture. We need to perform the following steps in order to achieve our goal.

  • Access S3, which requires AWS credentials.

  • Explore S3 and identify the different file paths. This exploration can cover either selected buckets or an entire S3 account.

  • For comparison of files, we need some common basis, which is the checksum of the file. This is calculated by computing the MD5 of file contents.

  • Group files based on checksum and then identify duplicate files having the same checksum.
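
The last two steps can be sketched without Spark or S3 at all. The example below is a simplified illustration on in-memory byte arrays; the object name DuplicateLogicSketch and its methods are hypothetical and are not part of the project described in this post.

```scala
import java.security.MessageDigest

object DuplicateLogicSketch {
  /** MD5 of the content as a zero-padded hex string. */
  def checksum(content: Array[Byte]): String =
    MessageDigest.getInstance("MD5").digest(content).map(b => f"${b & 0xff}%02x").mkString

  /** Groups paths by content checksum and keeps only checksums shared by more than one path. */
  def findDuplicates(files: Map[String, Array[Byte]]): Map[String, List[String]] =
    files.toList
      .map { case (path, content) => (checksum(content), path) }
      .groupBy(_._1)
      .map { case (sum, pairs) => sum -> pairs.map(_._2).sorted }
      .filter { case (_, paths) => paths.size > 1 }
}
```

The Spark version in the following steps has the same shape; the difference is that file contents are read from S3 and hashed on the executors.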

Step 2: Required Setup

We need a proper setup; it is indispensable and also the tricky part. This step explains how to do the setup using Maven. The Hadoop ecosystem (including the Hadoop AWS integration) and Apache Spark evolve quickly: Apache Spark is ranked as the most active ASF project, and new features and enhancements are added rapidly. Also, with Hadoop 2.7, support for the s3a AWS file scheme has been added. We need recent versions of all of them, because this setup won't work if any of the old JARs are used. It is also important to exclude a few of the JARs, as incorrect dependencies could lead to trouble. The following is the Maven configuration:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.8.0</version>
        <exclusions>
            <exclusion>
                <artifactId>aws-java-sdk-s3</artifactId>
                <groupId>com.amazonaws</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.10</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.8.8</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.8.8</version>
    </dependency>
    <dependency>
        <groupId>net.java.dev.jets3t</groupId>
        <artifactId>jets3t</artifactId>
        <version>0.9.4</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
</dependencies>


Step 3: Initialize s3Client

Programmatic access to S3 requires an AWS access key and an AWS secret key. It’s better to keep them in a secure place in encrypted form and load them programmatically at runtime. However, for simplicity, I am setting them directly in the code.

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client

/**
 * Initializes the S3 client for the supplied access key and secret key.
 *
 * @return s3Client The s3Client instance
*/
def initS3Client(): AmazonS3Client = {
    val credential = new BasicAWSCredentials(awsAccessKey, awsSecretKey)
    new AmazonS3Client(credential)
}
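
As an alternative to hard-coding the keys, they could be read from the environment. This is a minimal sketch that assumes the keys are exported under the conventional AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY variable names; CredentialsSketch, AwsKeys, and fromEnv are hypothetical names introduced only for illustration.

```scala
object CredentialsSketch {
  final case class AwsKeys(accessKey: String, secretKey: String)

  /** Looks up both keys in the supplied environment; returns None if either is missing or blank. */
  def fromEnv(env: Map[String, String] = sys.env): Option[AwsKeys] =
    for {
      access <- env.get("AWS_ACCESS_KEY_ID").map(_.trim).filter(_.nonEmpty)
      secret <- env.get("AWS_SECRET_ACCESS_KEY").map(_.trim).filter(_.nonEmpty)
    } yield AwsKeys(access, secret)
}
```

The resulting values could then stand in for awsAccessKey and awsSecretKey in the snippets of this post.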


Step 4: Initialize Spark

Note the usage of the s3a scheme. The s3a scheme is a drastic improvement over s3n, and it’s much faster than the s3n and s3 protocols. It also automatically supports the newer regions that require Signature Version 4 for authentication. Prior to s3a, you had to provide an endpoint in the Spark setup in order to access new regions (and it was quite annoying to maintain endpoint info in the application).

import org.apache.spark.sql.SparkSession

/**
 * Initializes the Spark session and configures the AWS access key and secret key in the Hadoop configuration.
 *
 * @return spark The Spark session instance
*/
def initSpark(): SparkSession = {
    val spark = SparkSession
      .builder
      .appName("SparkS3FindDuplicateFiles")
      .master("local[*]")
      .getOrCreate()

    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.access.key", awsAccessKey)
    hadoopConf.set("fs.s3a.secret.key", awsSecretKey)
    // Route plain s3:// URIs through the S3A file system as well
    hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    spark
}


Step 5: Explore S3

AWS S3 is an object store and not a file system. In S3, all files and directories are objects stored in a flat structure, and the AWS APIs follow the same model. Let’s see how to explore different buckets and objects.

In the following code snippet, the exploreS3 method gets client access to S3, retrieves the list of buckets, and filters it against the optional, caller-provided list of bucket names. For each requested bucket, it gets all keys (i.e., object paths) and adds them to s3Paths, provided the key is not a directory. It’s important to note that we are only retrieving paths here, not loading objects from S3.

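import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import com.amazonaws.services.s3.iterable.S3Objects

/**
 * Traverses the requested buckets (or every bucket when none are requested) and stores the path of each object found.
 *
 * @param s3Paths The list in which all object paths are stored.
 * @param inputBuckets The bucket names to restrict the search to; an empty list means all buckets.
*/
def exploreS3(s3Paths: ListBuffer[String], inputBuckets: List[String]): Unit = {
    val s3Client = initS3Client()
    // listBuckets() returns a java.util.List, so convert it before using Scala collection methods
    val allBuckets = s3Client.listBuckets().asScala
    val buckets =
      if (inputBuckets.nonEmpty) allBuckets.filter(bucket => inputBuckets.contains(bucket.getName))
      else allBuckets

    buckets.foreach { bucket =>
        // Only object summaries (keys) are listed here; no object contents are downloaded
        val s3Objects = S3Objects.withPrefix(s3Client, bucket.getName, "").asScala
        for (s3Object <- s3Objects) {
            if (!isS3Directory(s3Object.getKey)) {
                s3Paths += bucket.getName + S3FileSeparator + s3Object.getKey
            }
        }
    }
}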
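
The snippets in this post reference isS3Directory, S3FileSeparator, and S3Scheme without showing them. The sketch below gives one plausible set of definitions; the values are assumptions (a "/" separator, an "s3a://" scheme, and keys ending in "/" treated as folder placeholders), and toS3aUri is a hypothetical helper added for illustration.

```scala
object S3PathSketch {
  // Assumed values for the constants used in the snippets
  val S3FileSeparator = "/"
  val S3Scheme = "s3a://"

  /** Treats keys that end with the separator as folder placeholders rather than files. */
  def isS3Directory(key: String): Boolean = key.endsWith(S3FileSeparator)

  /** Builds the URI that Spark would read, from a bucket name and an object key. */
  def toS3aUri(bucket: String, key: String): String =
    S3Scheme + bucket + S3FileSeparator + key
}
```

With these definitions, a path stored by exploreS3 such as my-bucket/logs/a.txt becomes s3a://my-bucket/logs/a.txt once the scheme is prepended.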


Step 6: Calculate the Checksum of All Keys

Next, we iterate over all identified S3 paths and load each object from S3 as an RDD. By doing so, we instruct Spark to read the contents (which could be binary, text, or any other format). At this point we have only specified a transformation, not an action. Then, for each RDD, we calculate the checksum by computing the MD5 of the contents. Computing the MD5 triggers an action: executors receive the RDDs for different S3 paths, load the data from S3, compute the MD5 partition by partition, and return the checksums (String data) to the driver. We no longer need the individual RDDs holding file contents, so they can be safely discarded. We then create a tuple of the checksum and file path and add it to the list. This happens on the driver.

import java.security.MessageDigest
import org.apache.spark.HashPartitioner

/**
 * Calculates the checksum of the RDD as the MD5 hash of its contents.
 * `rdd` is the RDD[String] holding the S3 file's lines; step 7 invokes this as fileRDD.calculateCheckSum(100).
 * getMd5Digest() is assumed to return a fresh MD5 MessageDigest.
 *
 * @param parts The number of partitions to hash in parallel
 * @return The checksum of S3 file contents
*/
def calculateCheckSum(parts: Int = 1000): String = {
    val partitioner = new HashPartitioner(parts)
    val output = rdd
      .map(x => (x, 1))
      // Sort within partitions so that every run hashes the lines in the same order
      .repartitionAndSortWithinPartitions(partitioner)
      .map(x => x._1)
      // One MD5 per partition, computed on the executors
      .mapPartitions(x => Iterator(x.foldLeft(getMd5Digest())(md5)))
      .map(x => new java.math.BigInteger(1, x.digest()).toString(16))
      .collect()
      .sorted
      // Combine the per-partition hashes into a single digest on the driver
      .foldLeft(getMd5Digest())(md5)
    new java.math.BigInteger(1, output.digest()).toString(16)
}

/**
 * @param currMD5 The MD5 value of the file contents read so far
 * @param input The next string to add to the digest
 * @return The updated MD5 value
*/
def md5(currMD5: MessageDigest, input: String): MessageDigest = {
    val inputBytes = input.getBytes("UTF-8")
    currMD5.update(inputBytes, 0, inputBytes.length)
    currMD5
}
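
The fold used above relies on MessageDigest being incremental: feeding the chunks one at a time gives the same result as hashing the concatenated input. The sketch below shows this outside Spark. The getMd5Digest definition is an assumption about the helper referenced above, and toHex and checksumOf are hypothetical names.

```scala
import java.security.MessageDigest

object Md5FoldSketch {
  // Assumed definition of the getMd5Digest() helper referenced above
  def getMd5Digest(): MessageDigest = MessageDigest.getInstance("MD5")

  /** Feeds one more string into the running digest and returns the same digest. */
  def md5(curr: MessageDigest, input: String): MessageDigest = {
    curr.update(input.getBytes("UTF-8"))
    curr
  }

  /** Zero-padded hex, so digests with leading zero bytes keep all 32 characters. */
  def toHex(digest: MessageDigest): String =
    digest.digest().map(b => f"${b & 0xff}%02x").mkString

  /** Folds the chunks into a single MD5 and renders it as hex. */
  def checksumOf(chunks: Seq[String]): String =
    toHex(chunks.foldLeft(getMd5Digest())(md5))
}
```

Note that this sketch pads the hex string, whereas BigInteger.toString(16) drops leading zeros. Either form works for duplicate detection as long as the same one is used for every file.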


Step 7: Find Duplicate Files Based on Checksum

Now comes the last step: identifying duplicate files that share the same checksum value. From step 6, we get a list of FileChecksum(checkSum, filePath) records, one for each explored S3 object. We create a Dataset from this list (to achieve maximum parallelism; otherwise, the same operations could be done using Scala operators). We then group the data by checksum, count the number of files having the same checksum, filter out the records that have no duplicates, and finally sort the Dataset by checksum count in descending order and display the results.

import org.apache.spark.sql.functions.{collect_list, count}

/**
   * Reads every file in the path list using Spark, distributes the work across the cluster, and calculates the MD5 checksum of each file.
   * Groups the files by checksum and finds the corresponding count.
   * Orders the result by the duplication count.
   *
   * @param s3Paths The list of all s3 paths
*/
def checkDuplicateFiles(s3Paths: ListBuffer[String]): Unit = {
    val spark = initSpark()
    import spark.implicits._

    val resultList = new ListBuffer[FileChecksum]
    s3Paths.foreach { filePath =>
        // Reuse the session created above instead of calling initSpark() for every file.
        // Each RDD feeds a single action, so it is not cached.
        val fileRDD = spark.sparkContext.textFile(S3Scheme.concat(filePath))
        val checkSum = fileRDD.calculateCheckSum(100)

        resultList += FileChecksum(checkSum, filePath)
    }

    val filePathByCheckSumDS = spark.sparkContext.parallelize(resultList).toDS()

    val duplicateFiles = filePathByCheckSumDS.select($"checkSum", $"filePath")
      .groupBy($"checkSum")
      .agg(count($"filePath") as "duplicateFileCount", collect_list($"filePath") as "duplicateFilePaths")
      // Keep only checksums shared by more than one file
      .filter($"duplicateFileCount" > 1)
      .sort($"duplicateFileCount".desc)

    // Display results
    duplicateFiles.show()
}
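
For comparison, the aggregation can also be written with plain Scala collection operators, as mentioned at the start of this step. This is a sketch only: the FileChecksum case class is an assumed shape for the record used above, and DuplicateGroup and duplicates are hypothetical names that mirror the columns of the Dataset query.

```scala
object GroupingSketch {
  // Assumed shape of the FileChecksum record used in the Spark code
  final case class FileChecksum(checkSum: String, filePath: String)

  // Mirrors the columns produced by the Dataset aggregation
  final case class DuplicateGroup(
      checkSum: String,
      duplicateFileCount: Int,
      duplicateFilePaths: List[String])

  /** Groups by checksum, drops checksums seen only once, and sorts by count in descending order. */
  def duplicates(records: Seq[FileChecksum]): List[DuplicateGroup] =
    records
      .groupBy(_.checkSum)
      .toList
      .map { case (sum, group) => DuplicateGroup(sum, group.size, group.map(_.filePath).toList) }
      .filter(_.duplicateFileCount > 1)
      .sortBy(g => -g.duplicateFileCount)
}
```

This runs entirely on the driver, which is fine for a modest number of paths; the Dataset version distributes the same grouping across the cluster.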


Conclusion

If the result contains any records, then there are duplicate files in your AWS S3. This approach is certainly way faster than the AWS command line tools, and Spark makes the task very elegant by handling AWS S3 connections and distributing S3 objects over multiple partitions. In this post, I have provided the details of all steps with code snippets; you can find the complete project on my GitHub.
