
Streaming Data From PostgreSQL Using Akka Streams and Slick in Play Framework


Read this tutorial to learn how to stream data directly from a PostgreSQL database using Scala Slick and Akka Streams in the Play Framework.


In this blog post, I’ll explain how you can stream data directly from a PostgreSQL database using Scala Slick (Scala’s database access/query library) and Akka Streams (an implementation of the Reactive Streams specification on top of the Akka toolkit) in the Play Framework. The implementation is fairly straightforward: data is read from one of the tables in your SQL database as a stream, and that stream is then sent to a REST endpoint configured to download the data.


For a better understanding, let’s take the example of an application or service used for administering the huge customer base of an organization. The person administering the customer base wants to export the entire customer dataset, say, for auditing purposes. Depending on requirements, it can make sense to stream this data directly into a downloadable file, which is what we are going to do in this blog post.

(For this blog post, you should have basic knowledge of the Play Framework and the Slick library.)


The example uses the following dependencies (a minimal sbt wiring is sketched after the list):

  1. Play Framework 2.6.10 ("com.typesafe.play" % "sbt-plugin" % "2.6.10")

  2. Play-Slick 3.0.1 ("com.typesafe.play" %% "play-slick" % "3.0.1")

  3. Akka Streams 2.5.8 ("com.typesafe.akka" %% "akka-stream" % "2.5.8")

  4. PostgreSQL 42.1.4 ("org.postgresql" % "postgresql" % "42.1.4")
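For reference, here is a minimal sketch of how these could be wired into an sbt build; the Play plugin goes into project/plugins.sbt, the rest are ordinary library dependencies:

// project/plugins.sbt
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.6.10")

// build.sbt
libraryDependencies ++= Seq(
  "com.typesafe.play" %% "play-slick"  % "3.0.1",
  "com.typesafe.akka" %% "akka-stream" % "2.5.8",
  "org.postgresql"    %  "postgresql"  % "42.1.4"
)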


Let’s start by assuming we have a customer table in our PostgreSQL database that has the following structure:

CREATE TABLE customers (
  id        BIGSERIAL PRIMARY KEY,
  firstname VARCHAR(255) NOT NULL,
  lastname  VARCHAR(255) NOT NULL,
  email     VARCHAR(255) NOT NULL
);

(The columns are declared NOT NULL because the Slick mapping below uses plain String fields rather than Option[String].)
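To have something to stream later, you can seed the table with a few sample rows (hypothetical data):

INSERT INTO customers (firstname, lastname, email) VALUES
  ('John', 'Doe', 'john.doe@example.com'),
  ('Jane', 'Roe', 'jane.roe@example.com');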

Slick’s functional relational mapping corresponding to this table structure should look like this:

// This lives in the models package (note the private[models] modifier below).
import play.api.db.slick.HasDatabaseConfigProvider
import slick.lifted.ProvenShape

case class Customer(id: Long,
                    firstName: String,
                    lastName: String,
                    email: String)

trait CustomerTable extends HasDatabaseConfigProvider[slick.jdbc.JdbcProfile] {

  import profile.api._

  val customerQuery: TableQuery[CustomerMapping] = TableQuery[CustomerMapping]

  private[models] class CustomerMapping(tag: Tag) extends Table[Customer](tag, "customers") {

    def id: Rep[Long] = column[Long]("id", O.PrimaryKey, O.AutoInc)

    def firstName: Rep[String] = column[String]("firstname")

    def lastName: Rep[String] = column[String]("lastname")

    def email: Rep[String] = column[String]("email")

    def * : ProvenShape[Customer] = (id, firstName, lastName, email) <>(Customer.tupled, Customer.unapply)

  }

}
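HasDatabaseConfigProvider pulls its database configuration from Play's application.conf. A minimal Slick configuration for PostgreSQL might look like this (the database name and credentials are placeholders; adjust them to your environment):

# conf/application.conf
slick.dbs.default.profile = "slick.jdbc.PostgresProfile$"
slick.dbs.default.db.driver = "org.postgresql.Driver"
slick.dbs.default.db.url = "jdbc:postgresql://localhost:5432/customersdb"
slick.dbs.default.db.user = "postgres"
slick.dbs.default.db.password = "postgres"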

Now, let's use customerQuery to get data from the customers table as a DatabasePublisher of type Customer, i.e., DatabasePublisher[Customer]. This is Slick's implementation of Reactive Streams' Publisher: a (potentially) unbounded sequence of elements that are published according to the demand signaled by the subscriber. We will define this inside a CustomerRepository.


import slick.basic.DatabasePublisher
import slick.jdbc.{ResultSetConcurrency, ResultSetType}

def customers: DatabasePublisher[Customer] =
  db.stream(
    customerQuery
      .result
      .withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 10000)
      .transactionally)

A few things need to be noted when streaming data/records from PostgreSQL, as also called out in Slick's official documentation:

  1. transactionally forces the code to run on a single connection with auto-commit disabled [setAutoCommit(false)]; by default, Slick runs in auto-commit mode.

  2. fetchSize makes the JDBC driver fetch the specified number of rows at a time instead of pulling all rows into memory (i.e., onto the client side) at once.

  3. ResultSetType.ForwardOnly sets the type so that results are read sequentially and the cursor only moves forward.

  4. ResultSetConcurrency.ReadOnly makes sure that the ResultSet may not be updated.

Only if all of the above are in place will streaming work properly for PostgreSQL; otherwise, the driver will materialize the entire dataset in memory at once. The sketch below shows the method in its enclosing repository.
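Constructor-injecting Play's DatabaseConfigProvider is what satisfies the HasDatabaseConfigProvider trait mixed in through CustomerTable; beyond that, this is a minimal sketch, so names not shown in the article are assumptions:

import javax.inject.{Inject, Singleton}

import play.api.db.slick.DatabaseConfigProvider
import slick.basic.DatabasePublisher
import slick.jdbc.{ResultSetConcurrency, ResultSetType}

@Singleton
class CustomerRepository @Inject()(protected val dbConfigProvider: DatabaseConfigProvider)
  extends CustomerTable {

  import profile.api._

  // The streaming query shown above lives here.
  def customers: DatabasePublisher[Customer] =
    db.stream(
      customerQuery
        .result
        .withStatementParameters(
          rsType = ResultSetType.ForwardOnly,
          rsConcurrency = ResultSetConcurrency.ReadOnly,
          fetchSize = 10000)
        .transactionally)
}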

So, the database repository code is now sorted out. Let's focus on the controller and how it will stream this data into a downloadable file.

We can create a new Play controller to manage all customer-related APIs. This controller has access to the CustomerRepository we created earlier, in which the customers method is defined and implemented.

We'll use Play's simple Result to stream the data to our client, i.e., the person administering the customers, via the /customers API (added to Play's routes file, shown further below), by providing the customer stream to the HttpEntity.Streamed case class like this:

Result(
  header = ResponseHeader(OK, Map(CONTENT_DISPOSITION -> "attachment; filename=customers.csv")),
  body = HttpEntity.Streamed(csvSource, None, None))
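For this endpoint to be reachable, a matching entry goes into conf/routes; the controller class name here is an assumption:

GET     /customers      controllers.CustomerController.customers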

The entire controller method would look something like this:

import akka.stream.scaladsl.{Concat, Source}
import akka.util.ByteString
import play.api.http.HttpEntity
import play.api.mvc._

// Inside a controller class (e.g., one extending AbstractController) with an
// injected customerRepository; OK and CONTENT_DISPOSITION are inherited helpers.
def customers: Action[AnyContent] = Action { implicit request =>
  val customerDatabasePublisher = customerRepository.customers
  // Bridge Slick's DatabasePublisher into an Akka Streams Source.
  val customerSource = Source.fromPublisher(customerDatabasePublisher)

  val headerCSVSource = Source.single(ByteString(""""First Name","Last Name","Email"""" + "\n"))
  val customerCSVSource =
    customerSource.map(data => ByteString(s""""${data.firstName}","${data.lastName}","${data.email}"""" + "\n"))

  // Emit the header line first, then the data rows.
  val csvSource = Source.combine(headerCSVSource, customerCSVSource)(Concat[ByteString])

  Result(
    header = ResponseHeader(OK, Map(CONTENT_DISPOSITION -> "attachment; filename=customers.csv")),
    body = HttpEntity.Streamed(csvSource, None, None))
}
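With the route in place, hitting the endpoint streams the CSV straight down to a file, for example with curl (-O saves to a file, -J honors our Content-Disposition filename):

curl -OJ http://localhost:9000/customers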

Note that the DatabasePublisher[Customer] is converted to a Source of Customer using the Source.fromPublisher helper method, which creates a Source from any Reactive Streams Publisher. The rest of the manipulations are done on the Source to convert the data into a readable CSV file format.

Also, note the use of the Source.combine method, which combines sources with a fan-in strategy; in our case Concat, which emits the first source in full before the second (see the small demo below).
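If Concat's behavior isn't familiar, here is a tiny standalone sketch (assuming the same Akka 2.5.x dependency) showing that the combined source emits everything from the first source before the second, which is exactly why the CSV header row comes out first:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Concat, Source}

object ConcatDemo extends App {
  implicit val system: ActorSystem = ActorSystem("concat-demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val first  = Source(List("header"))
  val second = Source(List("row1", "row2"))

  // Prints: header, row1, row2, in that order.
  Source.combine(first, second)(Concat(_))
    .runForeach(println)
    .onComplete(_ => system.terminate())(system.dispatcher)
}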

Hope this helps :)

The entire code base is available in the following repository: playakkastreams.


