Getting Started With Chronicle Queue

Chronicle Queue is a low-latency, broker-less, durable message queue. Its closest cousin is probably 0MQ, except that 0MQ doesn't store the messages published and the open-source version of Chronicle Queue doesn't support cross-machine communication. Chronicle Queue's biggest claim to fame is that it generates no garbage, as it uses RandomAccessFiles as off-heap storage.

Chronicle Queue is producer-centric, i.e., applications built on Chronicle Queue cannot tell the producer to slow down when putting messages onto the queue (there are no back-pressure mechanics). Such a design is useful in cases where there is little to no control over the producer's throughput, e.g., FX price updates.

Terminology

Where most message queues use the terms Producer and Consumer, Chronicle Queue uses Appender and Tailer instead. The distinction is that it always appends messages to the queue and never destroys or drops a message after the tailer (read: receiver) reads it. And instead of Message, Chronicle Queue prefers the term Excerpt, because the blob written to Chronicle Queue can range from byte arrays to strings to domain models.

Hello, World!

Let's use the traditional "Hello, World!" to demonstrate basic usage. Add the following to build.gradle.kts if you are using Gradle:

Kotlin

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile     // line 1

plugins {
    id("org.jetbrains.kotlin.jvm") version "1.3.71"
    application
}

repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    implementation(platform("org.jetbrains.kotlin:kotlin-bom"))
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

    implementation("net.openhft.chronicle:chronicle-queue:5.19.8")   // line 17
    implementation("org.apache.logging.log4j:log4j-slf4j18-impl:2.13.1")
}

application {
    mainClassName = "hello.AppKt"
}

tasks.withType<KotlinCompile> {          // line 25
    kotlinOptions.jvmTarget = "1.8"
}



Importing KotlinCompile (line 1) allows specifying Java 1.8 as the compilation target (lines 25-27). Lines 17-18 show the additional dependencies you need to get started with Chronicle Queue. Note that build.gradle.kts assumes the main function lives in a file named App.kt in the hello package (hence hello.AppKt). Let's turn to the code demonstrating Chronicle Queue usage:

Kotlin

package hello

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.ExcerptAppender
import net.openhft.chronicle.queue.ExcerptTailer

fun main(args: Array<String>) {
    val q: ChronicleQueue = ChronicleQueue.single("./build/hello-world")

    try {
        val appender: ExcerptAppender = q.acquireAppender()
        appender.writeText("Hello, World!")

        val tailer: ExcerptTailer = q.createTailer()
        println(tailer.readText())
    } finally {
        q.close()
    }
}



ChronicleQueue.single(<path>) returns a new ChronicleQueue that uses the given path for storing the excerpts. The rest of the code is pretty much self-explanatory: the acquired appender appends the excerpt "Hello, World!" to the queue; the tailer reads from the queue and prints the excerpt to standard output. The queue must always be closed at the end of the program.
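
Since the queue must always be closed, Kotlin's use function can stand in for the try/finally block; the later snippets in this post rely on the same idiom. The following is only a minimal sketch: the helloWithUse name and the temporary directory are assumptions made for illustration, not part of Chronicle Queue.

```kotlin
import net.openhft.chronicle.queue.ChronicleQueue
import java.nio.file.Files

// Hypothetical helper: writes one excerpt and reads it back.
// `use` closes the queue when the block exits, even on an exception,
// which mirrors the try/finally in the snippet above.
fun helloWithUse(path: String): String? =
    ChronicleQueue.single(path).use { q ->
        q.acquireAppender().writeText("Hello, World!")
        q.createTailer().readText()
    }

fun main() {
    // A fresh temporary directory (an assumption of this sketch) so that
    // excerpts from earlier runs do not get in the way.
    val dir = Files.createTempDirectory("hello-use").toString()
    println(helloWithUse(dir))
}
```

The result of the lambda becomes the result of use, so the excerpt can be returned without keeping the queue open any longer than needed.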

Remember that Chronicle Queue is durable? Comment out the two appender lines and run the code again with gradle run. You'll see that the program outputs Hello, World! again: the tailer is reading from the queue that was written in the previous run. Such durability allows replaying incoming excerpts when tailers crash.
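
The same experiment can be sketched in a single program by opening the queue directory twice: once to write, and once more only to read. The writeOnce and readAll helpers and the temporary directory are hypothetical names chosen for this sketch; it also assumes that readText() returns null once nothing is left to read.

```kotlin
import net.openhft.chronicle.queue.ChronicleQueue
import java.nio.file.Files

// First "run": append one excerpt, then close the queue.
fun writeOnce(path: String) {
    ChronicleQueue.single(path).use { q ->
        q.acquireAppender().writeText("Hello, World!")
    }
}

// Second "run": reopen the same directory and only read.
// Assumption: readText() returns null when there are no more excerpts,
// which ends the sequence.
fun readAll(path: String): List<String> =
    ChronicleQueue.single(path).use { q ->
        val tailer = q.createTailer()
        generateSequence { tailer.readText() }.toList()
    }

fun main() {
    val dir = Files.createTempDirectory("durable").toString()
    writeOnce(dir)
    println(readAll(dir))   // the excerpt survives closing and reopening
}
```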

Detour: Excerpt types

Chronicle Queue only accepts the following types as excerpts:

  1. Serializable objects: note that serializing such objects is inefficient due to the reliance on reflection
  2. Externalizable objects: if compatibility with Java is important, but at the expense of handwritten serialization logic
  3. net.openhft.chronicle.wire.Marshallable objects: high-performance data exchange using binary formats
  4. net.openhft.chronicle.bytes.BytesMarshallable objects: low-level binary or text encoding

As "Hello, World!" has already demonstrated strings, we detour a little and look at an example using Marshallable offered in the Chronicle Wire library.

Kotlin

package types

import net.openhft.chronicle.wire.Marshallable
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(val name: String, val age: Int): SelfDescribingMarshallable()

fun main(args: Array<String>) {
    val person = Person("Shaolang", 3)
    val outputString = """
    !types.Person {
      name: Shaolang
      age: 3
    }
    """.trimIndent()

    println(person.toString() == outputString)

    val p = Marshallable.fromString<Person>(outputString)
    println(person == p)
    println(person.hashCode() == p.hashCode())
}



You'll see three true printed to standard output when you run the snippet above. SelfDescribingMarshallable makes it effortless to make a class Marshallable for persistence in Chronicle Queue.

Writing and Reading Domain Objects

With the knowledge from the small detour tucked under our belt, the following demonstrates the writing and reading of Marshallable objects to and from Chronicle Queue:

Kotlin

package docs

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable()

class Food(var name: String? = null): SelfDescribingMarshallable()

fun main(args: Array<String>) {
    ChronicleQueue.single("./build/documents").use { q ->
        val appender = q.acquireAppender()
        appender.writeDocument(Person("Shaolang", 3))
        appender.writeText("Hello, World!")
        appender.writeDocument(Food("Burger"))

        val tailer = q.createTailer()

        val person = Person()
        tailer.readDocument(person)
        println(person)

        println("${tailer.readText()}\n")

        val food = Food()
        tailer.readDocument(food)
        println(food)
    }
}



Although it would make more sense to run the appender and tailer in different VM processes, keeping both in the same VM makes the discussion much easier to follow without having to sift through unrelated code. After running the above, you should see the following printed out:

!docs.Person {
  name: Shaolang,
  age: 3
}

Hello, World!

!docs.Food {
  name: Burger,
}



There are a few things to note:

  1. Because Chronicle Queue aims to generate no garbage, it requires the domain model to be a mutable object; this is why the two classes use var instead of val in their constructors.
  2. Chronicle Queue allows appenders to write different things to the same queue.
  3. Tailers need to know what they should be reading to get the proper result back.

If we were to change the last tailer.readDocument(food) to tailer.readDocument(person) and print out person instead, we'll see the following printed (at least as of Chronicle Queue 5.19.x, it doesn’t crash/throw any exceptions):

!docs.Person {
  name: Burger,
  age: !!null ""
}



Because both Person and Food have an attribute with the same name, Chronicle Queue hydrates Person with whatever it can and leaves the others blank.

That last point on tailers needing to know what they're reading is troubling: they are now burdened with filtering the things they want to be notified about from the avalanche of data the producer keeps throwing at them. To keep our codebases sane, we need to use the observer pattern.

(Kinda) Listening Only to Things You're Interested in

Other than using the excerpt appender directly, another way is to have it reify the interface (or class) given to its methodWriter. The following snippet focuses on this reifying of the given listener:

Kotlin

package listener

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.ChronicleReaderMain
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable()

interface PersonListener {
    fun onPerson(person: Person)
}

fun main(args: Array<String>) {
    val directory = "./build/listener"
    ChronicleQueue.single(directory).use { q ->
        val observable: PersonListener = q.acquireAppender()
            .methodWriter(PersonListener::class.java)
        observable.onPerson(Person("Shaolang", 3))
        observable.onPerson(Person("Elliot", 4))
    }

    ChronicleReaderMain.main(arrayOf("-d", directory))
}



The snippet invokes methodWriter with the PersonListener interface on the acquired appender. Notice that the type assigned to observable is PersonListener, not ExcerptAppender. Now, any call to a method in PersonListener writes the given argument to the queue. However, writing to the queue through a reified interface differs from writing with the appender directly. To see the difference, we'll use ChronicleReaderMain to examine the queue:

0x47c900000000:
onPerson {
  name: Shaolang,
  age: 3
}
0x47c900000001:
onPerson {
  name: Elliot,
  age: 4
}



Notice that instead of !listener.Person { ... }, reified classes write excerpts using onPerson {...} to the queue. This difference allows tailers that implement PersonListener to be notified of new Person objects written to the queue and ignore others that they aren't interested in.

Yup, you've read that right: tailers that implement PersonListener. Unfortunately, Chronicle Queue (kinda) conflates observables and observers, making it a little hard to distinguish one from the other. I think the easiest way to tell the difference is to use the heuristics shown in the following snippet's comments:

Kotlin

interface PersonListener {
    fun onPerson(person: Person)
}

// this is an observer because it implements the listener interface
class PersonRegistry: PersonListener {
    override fun onPerson(person: Person) {
        // code omitted for brevity
    }
}

fun main(args: Array<String>) {
    // code omitted for brevity

    val observable: PersonListener = q.acquireAppender()    // this is an
            .methodWriter(PersonListener::class.java)       // observable

    // another way to differentiate: the observer will never call the
    // listener method, only observables do.
    observable.onPerson(Person("Shaolang", 3))

    // code omitted for brevity
}



Let's turn our focus to tailers. Even though Chronicle Queue ensures that every tailer sees every excerpt, tailers can act on only the excerpts they want to see by implementing the listener class/interface and creating a net.openhft.chronicle.bytes.MethodReader with the implemented listener:

Kotlin

package listener

import net.openhft.chronicle.bytes.MethodReader
import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable()

class Food(var name: String? = null): SelfDescribingMarshallable()

interface PersonListener {
    fun onPerson(person: Person)
}

class PersonRegistry: PersonListener {
    override fun onPerson(person: Person) {
        println("in registry: ${person.name}")
    }
}

fun main(args: Array<String>) {
    ChronicleQueue.single("./build/listener2").use { q ->
        val appender = q.acquireAppender()
        val writer: PersonListener = appender.methodWriter(PersonListener::class.java)
        writer.onPerson(Person("Shaolang", 3))
        appender.writeDocument(Food("Burger"))
        writer.onPerson(Person("Elliot", 4))

        val registry: PersonRegistry = PersonRegistry()
        val reader: MethodReader = q.createTailer().methodReader(registry)
        reader.readOne()
        reader.readOne()
        reader.readOne()
    }
}



What's largely new here is the implementation of PersonRegistry, which simply prints the name of the person it is given. Instead of reading off the queue using an ExcerptTailer directly, the snippet creates a MethodReader from the tailer with the instantiated PersonRegistry.

Unlike .methodWriter, which accepts a Class, .methodReader expects objects. The appender writes three excerpts to the queue: person (via a call to onPerson), food (via .writeDocument), and person. Because tailers see every excerpt, the reader also makes three calls to "read" all excerpts, but you'll see only two outputs:

in registry: Shaolang
in registry: Elliot 

If there were only two .readOne() calls instead of three, the output would not include in registry: Elliot.
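
Counting .readOne() calls by hand gets brittle quickly, so a loop is the more likely shape in practice. The sketch below assumes that readOne() returns false only once no excerpt is left to read; NameCollector and drain are hypothetical names, and Person and PersonListener simply repeat the definitions used above. It collects names rather than Person objects on the further assumption that the reader may reuse the same instance between calls.

```kotlin
import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable
import java.nio.file.Files

class Person(var name: String? = null, var age: Int? = null) : SelfDescribingMarshallable()

interface PersonListener {
    fun onPerson(person: Person)
}

// Hypothetical observer that remembers the names it was notified about.
// It copies the name out because the Person instance may be reused.
class NameCollector : PersonListener {
    val names = mutableListOf<String?>()

    override fun onPerson(person: Person) {
        names.add(person.name)
    }
}

// Writes two persons, then reads until the queue has nothing more to give.
fun drain(path: String): List<String?> =
    ChronicleQueue.single(path).use { q ->
        val writer = q.acquireAppender().methodWriter(PersonListener::class.java)
        writer.onPerson(Person("Shaolang", 3))
        writer.onPerson(Person("Elliot", 4))

        val collector = NameCollector()
        val reader = q.createTailer().methodReader(collector)
        while (reader.readOne()) {
            // nothing to do here: readOne() has already called the collector
        }
        collector.names
    }

fun main() {
    val dir = Files.createTempDirectory("drain").toString()
    println(drain(dir))
}
```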

MethodReader Uses Duck Typing

Remember the outputs from ChronicleReaderMain when we examined the queue that's populated by the reified PersonListener? Instead of a class name, the outputs are similar to onPerson { ... }. That suggests MethodReader filters excerpts that match the method signature, i.e., it doesn't care about the interface/class that contains the method signature; or simply put, duck typing:

Kotlin

package duck

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable()

interface PersonListener {
    fun onPerson(person: Person)
}

interface VIPListener {
    fun onPerson(person: Person)
}

class VIPClub: VIPListener {
    override fun onPerson(person: Person) {
        println("Welcome to the club, ${person.name}!")
    }
}

fun main(args: Array<String>) {
    ChronicleQueue.single("./build/duck").use { q ->
        val writer = q.acquireAppender().methodWriter(PersonListener::class.java)
        writer.onPerson(Person("Shaolang", 3))

        val club = VIPClub()
        val reader = q.createTailer().methodReader(club)
        reader.readOne()
    }
}



Notice that VIPClub implements VIPListener that happens to have the same onPerson method signature as PersonListener. When you run the above, you'll see Welcome to the club, Shaolang! printed.

Named Tailers

In all demonstrations so far, we've been creating anonymous tailers. Because they are anonymous, every (re-)run results in reading all excerpts in the queue. Sometimes such behavior is acceptable, or even desirable, but there are times when it isn't. Picking up reading where it last stopped is done simply by naming the tailer:

Kotlin

package restartable

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.ExcerptTailer

fun readQueue(tailerName: String, times: Int) {
    ChronicleQueue.single("./build/restartable").use { q ->
        val tailer = q.createTailer(tailerName)       // tailer name given
        for (_n in 1..times) {
            println("$tailerName: ${tailer.readText()}")
        }

        println()       // to separate outputs for easier visualization
    }
}

fun main(args: Array<String>) {
    ChronicleQueue.single("./build/restartable").use { q ->
        val appender = q.acquireAppender()
        appender.writeText("Test Message 1")
        appender.writeText("Test Message 2")
        appender.writeText("Test Message 3")
        appender.writeText("Test Message 4")
    }

    readQueue("foo", 1)
    readQueue("bar", 2)
    readQueue("foo", 3)
    readQueue("bar", 1)
}



Notice that the tailer's name is given to the createTailer method. The code above has two tailers (unimaginatively named foo and bar) reading off the queue, and it outputs the following when run:

foo: Test Message 1

bar: Test Message 1
bar: Test Message 2

foo: Test Message 2
foo: Test Message 3
foo: Test Message 4

bar: Test Message 3



Notice that the second time foo and bar read from the queue, they pick up from where they left off previously.
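
A named tailer lends itself to a "give me whatever is new" helper. The sketch below is only an illustration under two assumptions: readText() returns null when the tailer has caught up, and the tailer's position is persisted as in the run above. The append and readNew names, like the temporary directory, are hypothetical.

```kotlin
import net.openhft.chronicle.queue.ChronicleQueue
import java.nio.file.Files

// Appends the given messages as text excerpts.
fun append(path: String, vararg messages: String) {
    ChronicleQueue.single(path).use { q ->
        val appender = q.acquireAppender()
        messages.forEach { appender.writeText(it) }
    }
}

// Reads every excerpt the named tailer has not seen yet.
// Assumption: readText() returns null once the tailer has caught up.
fun readNew(path: String, tailerName: String): List<String> =
    ChronicleQueue.single(path).use { q ->
        val tailer = q.createTailer(tailerName)
        generateSequence { tailer.readText() }.toList()
    }

fun main() {
    val dir = Files.createTempDirectory("catch-up").toString()

    append(dir, "Test Message 1", "Test Message 2")
    println("foo: ${readNew(dir, "foo")}")   // foo starts from the beginning

    append(dir, "Test Message 3")
    println("foo: ${readNew(dir, "foo")}")   // foo continues where it stopped
    println("bar: ${readNew(dir, "bar")}")   // bar has never read anything
}
```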

Roll 'Em

Chronicle Queue rolls the file it uses based on the rolling cycle defined when the queue is created; by default, it rolls the file daily. To change the rolling cycle, we cannot use the simple ChronicleQueue.single method anymore:

Kotlin

package roll

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.RollCycles
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder

fun main(args: Array<String>) {
    val qbuilder: SingleChronicleQueueBuilder = ChronicleQueue.singleBuilder("./build/roll")

    qbuilder.rollCycle(RollCycles.HOURLY)
    val q: ChronicleQueue = qbuilder.build()
    // code omitted for brevity
}



First, we get an instance of SingleChronicleQueueBuilder and set the rolling cycle with the .rollCycle method. The snippet above configures the queue to roll the file hourly. When we are happy with the configuration, we call .build() on the builder to get an instantiated ChronicleQueue. Note that both appender and tailer(s) must use the same roll cycle when accessing the same queue.

As SingleChronicleQueueBuilder supports the fluent interface, the code could also be simplified as follows:

Kotlin

val q: ChronicleQueue = ChronicleQueue.singleBuilder("./build/roll")
                                      .rollCycle(RollCycles.HOURLY)
                                      .build()
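
Because the appender and the tailer(s) must agree on the roll cycle, one way to keep them in step is to build the queue in a single place. The following is a sketch of that idea rather than a prescribed pattern; openHourlyQueue and roundTrip are hypothetical helper names, and the temporary directory is only there to keep the example self-contained.

```kotlin
import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.RollCycles
import java.nio.file.Files

// The only place that decides the roll cycle for this queue, so the
// writing side and the reading side cannot drift apart.
fun openHourlyQueue(path: String): ChronicleQueue =
    ChronicleQueue.singleBuilder(path)
        .rollCycle(RollCycles.HOURLY)
        .build()

// Writes with one queue instance and reads with another, both built
// through the same helper.
fun roundTrip(path: String, message: String): String? {
    openHourlyQueue(path).use { q -> q.acquireAppender().writeText(message) }
    return openHourlyQueue(path).use { q -> q.createTailer().readText() }
}

fun main() {
    val dir = Files.createTempDirectory("roll").toString()
    println(roundTrip(dir, "tick"))
}
```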



What's Next

This post covers Chronicle Queue's terminology and basics. The following sites have more information to dig into:

  1. Chronicle Queue GitHub repository
  2. Stack Overflow tagged questions
  3. Peter Lawrey's Blog

Have fun!

Topics:
chronicle wire, java, kotlin, tutorial

Published at DZone with permission of Shaolang Ai . See the original article here.
