Stream a File to AWS S3 Using Akka Streams in Play Framework

Here's a handy guide to streaming your files to the cloud, specifically AWS S3, using Akka Streams, Play Framework, and Alpakka.

In this blog post, we’ll see how a file can be streamed from a client (e.g., a browser) to Amazon S3 (AWS S3) using Alpakka’s AWS S3 connector. Alpakka provides various Akka Streams connectors, integration patterns, and data transformations for integration use cases.

The example in this blog post uses Play Framework to provide a user interface for submitting a file from a web page directly to AWS S3, without creating any temporary files on disk during the process. The file is streamed to AWS S3 using S3’s multipart upload API.

(To follow this blog post, a basic knowledge of Play Framework and Akka Streams is required. Also, check out "What can Reactive Streams offer EE4J?" by James Roper, in particular its Servlet IO section, to understand to what extent the example in this post can be useful.)

Let’s begin by looking at the artifacts used for achieving the task at hand:

  1. Scala 2.11.11
  2. Play Framework 2.6.10
  3. Alpakka S3 0.18
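
The versions listed above might translate into sbt settings along the following lines. This is only a sketch, not the project's actual build file: the project name is a placeholder, and the split between project/plugins.sbt and build.sbt follows the usual Play layout.

```scala
// project/plugins.sbt
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.6.10")

// build.sbt
lazy val root = (project in file("."))
  .enablePlugins(PlayScala)
  .settings(
    name := "play-akka-streams-s3", // hypothetical project name
    scalaVersion := "2.11.11",
    libraryDependencies ++= Seq(
      guice, // Play's default dependency injection support
      "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.18"
    )
  )
```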

Now on to the fun part: let’s see what the code base looks like. We’ll first create a class for interacting with AWS S3 using the Alpakka S3 connector. Let’s name the class AwsS3Client.

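import javax.inject.{Inject, Singleton}
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.alpakka.s3.impl.ListBucketVersion2
import akka.stream.alpakka.s3.scaladsl.{MultipartUploadResult, S3Client}
import akka.stream.alpakka.s3.{MemoryBufferType, S3Settings}
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.AwsRegionProvider
import scala.concurrent.Future

@Singleton
class AwsS3Client @Inject()(system: ActorSystem, materializer: Materializer) {

    // Placeholders: substitute your own access key and secret key.
    private val awsCredentials = new BasicAWSCredentials("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    private val awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials)
    private val regionProvider =
        new AwsRegionProvider {
            def getRegion: String = "us-west-2"
        }
    private val settings = new S3Settings(MemoryBufferType, None, awsCredentialsProvider, regionProvider, false, None, ListBucketVersion2)
    private val s3Client = new S3Client(settings)(system, materializer)

    // Sink that streams incoming bytes to S3 via the multipart upload API.
    def s3Sink(bucketName: String, bucketKey: String): Sink[ByteString, Future[MultipartUploadResult]] =
        s3Client.multipartUpload(bucketName, bucketKey)
}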


The class is marked as a Singleton because we do not want multiple instances of it to be created. ActorSystem and Materializer are injected, since both are required for configuring Alpakka’s AWS S3 client. The next few lines configure an instance of that client, which is used for interfacing with your AWS S3 bucket. Finally, the s3Sink method returns an Akka Streams Sink of type Sink[ByteString, Future[MultipartUploadResult]]. This Sink does the job of sending the file stream to the AWS S3 bucket using the AWS multipart upload API.
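
To get a feel for a Sink of this shape without touching AWS, here is a minimal sketch that swaps the S3 sink for a stand-in. The byte-counting sink and the names SinkShapeDemo and countBytes are assumptions made for illustration; only the shape (ByteString chunks in, a Future out) mirrors s3Sink.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SinkShapeDemo {
  // Stand-in for awsS3Client.s3Sink(...): consumes ByteString chunks and
  // materializes a Future (the total byte count rather than a MultipartUploadResult).
  val byteCountingSink: Sink[ByteString, Future[Long]] =
    Sink.fold[Long, ByteString](0L)((total, chunk) => total + chunk.size)

  // An uploaded file reaches the sink as a stream of chunks.
  def countBytes(chunks: List[ByteString])(implicit mat: Materializer): Future[Long] =
    Source(chunks).runWith(byteCountingSink)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sink-shape-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val total = countBytes(List(ByteString("id,name\n"), ByteString("1,Alice\n")))
      println(Await.result(total, 5.seconds))
    } finally system.terminate()
  }
}
```

Replacing byteCountingSink with the sink returned by s3Sink would, under the same wiring, send the chunks to S3 instead of counting them.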

To make this class work, replace AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY with your AWS access key and secret key respectively, and replace us-west-2 with your AWS region.
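
Hardcoding keys in source is easy to get wrong, so one possible alternative is the AWS SDK's default credentials provider chain, which looks for credentials in environment variables, Java system properties, the shared credentials file, and instance roles. The sketch below is not part of the original example: the object name AwsSettingsSketch and the choice of reading the region from an AWS_REGION environment variable with a us-west-2 fallback are assumptions.

```scala
import com.amazonaws.auth.{AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
import com.amazonaws.regions.AwsRegionProvider

object AwsSettingsSketch {
  // Resolved lazily by the SDK when credentials are first requested.
  val credentialsProvider: AWSCredentialsProvider =
    DefaultAWSCredentialsProviderChain.getInstance()

  // Assumption: the region comes from an AWS_REGION environment variable,
  // falling back to the region used in the example above.
  def regionProvider(env: Map[String, String] = sys.env): AwsRegionProvider =
    new AwsRegionProvider {
      def getRegion: String = env.getOrElse("AWS_REGION", "us-west-2")
    }
}
```

These two values could then be passed to S3Settings in place of awsCredentialsProvider and regionProvider.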

Next, let’s look at how the s3Sink method of this class can be used to connect our Play Framework controller with the AWS S3 multipart upload API. Before doing that, a slight digression (bear with me, it builds up the example further): if you followed my previous blog post, Streaming data from PostgreSQL using Akka Streams and Slick in Play Framework (containing a Customer Management example), you might have seen how a CustomerController exposed a Play Framework route that streams customer data directly from PostgreSQL into a downloadable CSV file, without buffering the data as a file on disk. This blog post builds on top of that Customer Management example. So, we’re going to use the same CustomerController, modified to add a new Play Framework Action that accepts the file from the web page.

For simplicity, let’s name the controller Action upload. This Action accepts a file from a web page via a new route. Let’s first look at the controller code and then discuss the route.

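import javax.inject.{Inject, Singleton}
import akka.stream.alpakka.s3.scaladsl.MultipartUploadResult
import play.api.libs.streams.Accumulator
import play.api.mvc.MultipartFormData.FilePart
import play.api.mvc.{AbstractController, Action, ControllerComponents, MultipartFormData}
import play.core.parsers.Multipart
import play.core.parsers.Multipart.FileInfo
import scala.concurrent.ExecutionContext

@Singleton
class CustomerController @Inject()(cc: ControllerComponents, awsS3Client: AwsS3Client)
    (implicit ec: ExecutionContext) extends AbstractController(cc) {

    // The body parser runs the custom handler, so each file part already
    // carries its S3 upload result by the time this block executes.
    def upload: Action[MultipartFormData[MultipartUploadResult]] =
        Action(parse.multipartFormData(handleFilePartAwsUploadResult)) { request =>
            val maybeUploadResult =
                request.body.file("customers").map {
                    case FilePart(key, filename, contentType, multipartUploadResult) =>
                        multipartUploadResult
                }

            maybeUploadResult.fold(
                InternalServerError("Something went wrong!")
            )(uploadResult =>
                Ok(s"File ${uploadResult.key} uploaded to bucket ${uploadResult.bucket}")
            )
        }

    // Streams each file part straight into the S3 sink, using the file name as the key.
    private def handleFilePartAwsUploadResult: Multipart.FilePartHandler[MultipartUploadResult] = {
        case FileInfo(partName, filename, contentType) =>
            val accumulator = Accumulator(awsS3Client.s3Sink("test-ocr", filename))

            accumulator map { multipartUploadResult =>
                FilePart(partName, filename, contentType, multipartUploadResult)
            }
    }
}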


Dissecting the controller code base, it can be seen that the controller is a singleton and the AwsS3Client class that was created earlier is injected into the controller along with the Play ControllerComponents and ExecutionContext.

Let’s look at the private method of the CustomerController first, i.e., handleFilePartAwsUploadResult. Its return type is:

Multipart.FilePartHandler[MultipartUploadResult]


This is simply a type alias defined inside Play’s Multipart object:

type FilePartHandler[A] = FileInfo => Accumulator[ByteString, FilePart[A]]


Note that the example uses multipart/form-data encoding for the file upload, so Play’s multipartFormData parser is used with a custom FilePartHandler of type FilePartHandler[MultipartUploadResult]. The type parameter is MultipartUploadResult because the Alpakka AWS S3 Sink, to which the file is ultimately sent, has the type Sink[ByteString, Future[MultipartUploadResult]].

As for what this private method does: it accepts a case class of type FileInfo, creates an Accumulator from s3Sink, and finally maps the result of the Accumulator to a result of type FilePart.
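
The same three steps can be tried without S3 by plugging in a different sink. The following sketch assumes Play 2.6, where FileInfo has three fields, and uses a byte-counting sink as a stand-in; the object name ByteCountingHandler is hypothetical. The resulting FilePart carries a Long instead of a MultipartUploadResult.

```scala
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import play.api.libs.streams.Accumulator
import play.api.mvc.MultipartFormData.FilePart
import play.core.parsers.Multipart
import play.core.parsers.Multipart.FileInfo

import scala.concurrent.ExecutionContext

object ByteCountingHandler {
  // Same structure as handleFilePartAwsUploadResult: FileInfo in,
  // Accumulator built from a sink, result mapped into a FilePart.
  def handler(implicit ec: ExecutionContext): Multipart.FilePartHandler[Long] = {
    case FileInfo(partName, filename, contentType) =>
      val sink = Sink.fold[Long, ByteString](0L)((total, chunk) => total + chunk.size)
      Accumulator(sink).map(total => FilePart(partName, filename, contentType, total))
  }
}
```

Passing ByteCountingHandler.handler to parse.multipartFormData would give an Action whose body is a MultipartFormData[Long], which can be handy for checking the wiring before pointing it at a real bucket.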

NOTE: Accumulator is essentially a lightweight wrapper around Akka Sink that gets materialized to a Future. It provides convenient methods for working directly with Future as well as transforming the inputs.
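
As a small illustration of those convenience methods, the sketch below wraps a sink in an Accumulator, transforms its result with map, transforms its inputs with through, and feeds it a source with run. The object and method names (AccumulatorDemo, concat) are made up for this example.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import play.api.libs.streams.Accumulator

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AccumulatorDemo {
  // Wraps a sink that concatenates all chunks and materializes a Future[ByteString].
  val collect: Accumulator[ByteString, ByteString] =
    Accumulator(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))

  def concat(parts: List[String])(implicit mat: Materializer, ec: ExecutionContext): Future[String] =
    collect
      .map(_.utf8String)                             // transform the result
      .through(Flow[String].map(s => ByteString(s))) // transform the inputs
      .run(Source(parts))                            // feed a source, get the Future back

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("accumulator-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    try println(Await.result(concat(List("id,name\n", "1,Alice\n")), 5.seconds))
    finally system.terminate()
  }
}
```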

Moving on to the upload Action, it looks like any other Play Framework Action. The only difference is that the request body is parsed as MultipartFormData and handled via our custom FilePartHandler, i.e., handleFilePartAwsUploadResult, which was discussed earlier.

To connect everything together, we need an endpoint that accepts the file upload and a view from which a file can be submitted. Let’s add a new route to the Play routes file:

POST /upload controllers.CustomerController.upload


And a view to enable file upload from the user interface:

@import helper._

@()(implicit request: RequestHeader)

@main("Customer Management Portal") {
    <h1><b>Upload Customers to AWS S3</b></h1>
    @helper.form(CSRF(routes.CustomerController.upload()), 'enctype -> "multipart/form-data") {
        <input type="file" name="customers">
        <br>
        <input type="submit">
    }
}


Note the CSRF helper: the form needs a CSRF token because CSRF protection is enabled by default in Play Framework.

The entire code base is available at the following repository: playakkastreams.

I hope this helps! Shout out your queries in the comment section.

Topics:
play framework, akka streams, reactive streams, aws s3, streaming, java, tutorial

Published at DZone with permission of
