DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Last call! Secure your stack and shape the future! Help dev teams across the globe navigate their software supply chain security challenges.

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workloads.

Releasing software shouldn't be stressful or risky. Learn how to leverage progressive delivery techniques to ensure safer deployments.

Avoid machine learning mistakes and boost model performance! Discover key ML patterns, anti-patterns, data strategies, and more.

Related

  • Cutting-Edge Object Detection for Autonomous Vehicles: Advanced Transformers and Multi-Sensor Fusion
  • Writing DTOs With Java8, Lombok, and Java14+
  • Graph API for Entra ID (Azure AD) Object Management
  • A Comprehensive Guide to IAM in Object Storage

Trending

  • How AI Agents Are Transforming Enterprise Automation Architecture
  • Understanding IEEE 802.11(Wi-Fi) Encryption and Authentication: Write Your Own Custom Packet Sniffer
  • Build Your First AI Model in Python: A Beginner's Guide (1 of 3)
  • Unlocking AI Coding Assistants: Generate Unit Tests
  1. DZone
  2. Coding
  3. Languages
  4. Getting Started With Chronicle Queue

Getting Started With Chronicle Queue

Applications built on Chronicle Queue cannot tell the producer to slow down in putting messages onto the queue (no back-pressure mechanics).

By 
Shaolang Ai user avatar
Shaolang Ai
·
Updated Aug. 19, 22 · Tutorial
Likes (4)
Comment
Save
Tweet
Share
8.3K Views

Join the DZone community and get the full member experience.

Join For Free

Chronicle Queue is low-latency, broker-less, durable message queue. Its closest cousin is probably 0MQ, except that 0MQ doesn't store the messages published and the open-source version of the Chronicle Queue doesn't support cross-machine communication. Chronicle Queue's biggest claim to fame is that it generates no garbage as it uses RandomAccessFile s as off-heap storage.

Chronicle Queue is producer-centric, i.e., applications built on Chronicle Queue cannot tell the producer to slow down in putting messages onto the queue (no back-pressure mechanics). Such design is useful in cases where there is little to no control over the producer's throughput, e.g., FX price updates.

Terminology

Where most message queues use the terms Producer and Consumer, Chronicle Queue uses Appender and Tailer instead to make the distinction that it always appends messages to the queue and it never "destroys/drops" any message after the trailer (read: receiver) reads the message from the queue. And instead of Message, Chronicle Queue prefers the term Excerpt because the blob written to Chronicle Queue can range from byte arrays to strings to domain models.

Hello, World!

Let's use the traditional "Hello, World!" to demonstrate basic usage. Add the following to build.gradle.kts if you are using Gradle:

Kotlin
 
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile     // line 1


plugins {
    id("org.jetbrains.kotlin.jvm") version "1.3.71" 
    application
}

repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    implementation("org.jetbrains.kotlin:kotlin-bom")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") 
    implementation("net.openhft.chronicle:chronicle-queue:5.19.8")   // line 17
    implementation("org.apache.logging.log4j:log4j-sl4fj18-impl:2.13.1")
}

application {
    mainClass = "hello.AppKt"
} 

tasks.withType<KotlinCompile> {          // line 25
    kotlinOptions.jvmTarget = "1.8"
}

Importing KotlinCompile (line 1) allows specifying Java 1.8 as the compilation target (lines 25-27). Lines 17-18 show the additional dependencies you'd need to get started with Chronicle Queue. Note that build.gradle.kts assumes the package to use is hello. Let's turn to the code demonstrating Chronicle Queue usage:

Kotlin
 
package hello

import net.openhft.chronicle.queue.ChronicleQueue

fun main(args: Array<String>) {
    val q: ChronicleQueue = ChronicleQueue.single("./build/hello-world")
  
    try {
        val appender: ExcerptAppender = q.acquireAppender()
        appender.writeText("Hello, World!")
      
        val tailer: ExcerptTailer = q.createTailer()
        println(tailer.readText())
    } finally {
      q.close()
    }
}

ChronicleQueue.single(<path>) returns a new ChronicleQueue that uses the given path for storing the excerpts. The rest of the code is pretty much self-explanatory: the acquired appender appends the excerpt"Hello, World!" to the queue; the tailer reads from the queue and prints the excerpt to standard output. The queue must always be closed at the end of the program.

Remember that the Chronicle Queues are durable? Comment out two appender lines and run the code again with gradle run. You'll see that the program outputs Hello, World! again in the standard output: the tailer is reading from the queue that was written in the previous run. Such durability allows replaying incoming excerpts when tailers crash.

Detour: Excerpt Types

Chronicle Queue only accepts the following types as excerpts:

  1. Serializable objects: note that serializing such objects are inefficient due to reliance on reflection
  2. Externalizable objects: if compatibility with Java is important but at the expense of handwritten logic
  3. net.openhft.chronicle.wire.Marshallable objects: high-performance data exchange using binary formats
  4. net.openhft.chronicle.bytes.BytesMarshallable objects: low-level binary or text encoding

As "Hello, World!" has already demonstrated strings, we detour a little and look at an example using Marshallable offered in the Chronicle Wire library.

Kotlin
 
package types

import net.openhft.chronicle.wire.Marshallable
import net.openhft.chronicle.wire.SelfDescribingMarshallable 

class Person(val name: String, val age: Int): SelfDescribingMarshallable() 

fun main(args: Array<String>) {
    val person = Person("Shaolang", 3)
    val outputString = """
    !types.Person {
      name: Shaolang
      age: 3
    }
    """.trimIndent()
  
    println(person.toString() == outputString)
  
    val p = Marshallable.fromString<Person>(outputString)
    println(person == p)
    println(person.hashCode() == p.hashCode())
}


You'll see three true printed to standard output when you run the snippet above. SelfDescribingMarshallable makes it effortless to make a class Marshallable for persistence in Chronicle Queue.

Writing and Reading Domain Objects

With the knowledge from the small detour tucked under our belt, the following demonstrates the writing and reading of Marshallable objects to and from Chronicle Queue:

Kotlin
 
package docs

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable 

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable() 

class Food(var name: String? = null): SelfDescribingMarshallable() 

fun main(args: Array<String>) {
    ChronicleQueue.single("./build/documents").use { q ->
        val appender = q.acquireAppender()                                                    
        appender.writeDocument(Person("Shaolang", 3))
        appender.writeText("Hello, World!")
        appender.writeDocument(Food("Burger"))
                                                    
        val tailer = q.createTailer()
                                                    
        val person = Person()                                                    
        tailer.readDocument(person)
        println(person)
        println("${tailer.readText()}\n")
                                                    
        val food = Food()
        tailer.readDocument(food)
        println(food)
    }
}


Although it’ll make more sense to run the appender and tailer in different VM processes, keeping both in the same VM makes it much simpler to understand the discussion without having to sieve through non-related code.  After running the above, you should see the following printed out:

Kotlin
 




xxxxxxxxxx
1
10
9


 
1
!docs.Person {
2
  name: Shaolang,
3
  age: 3
4
}
5
Hello, World!
6
!docs.Food {
7
  name: Burger,
8
}


There are a few things to note:

  1. Because the Chronicle Queue aims to generate no garbage, it requires the domain model to be a mutable object; this is why the two classes uses var instead of val in their constructors.
  2. Chronicle Queue allows appenders to write different things to the same queue.
  3. Tailers need to know what it should be reading to get the proper result back.

If we were to change the last tailer.readDocument(food) to tailer.readDocument(person) and print out person instead, we'll see the following printed (at least as of Chronicle Queue 5.19.x, it doesn’t crash/throw any exceptions):

Kotlin
 




xxxxxxxxxx
1


 
1
!docs.Person {
2
  name: Burger,
3
  age: !!null ""
4
}


Because both Person and Food have an attribute with the same name, Chronicle Queue hydrates Person with whatever it could and leave the others blank.

That last point on tailers needing to know what they're reading is troubling: they are now ladened with the burden of filtering things they want to be notified from the avalanche of data the producer keeps throwing at them. To keep our codebases sane, we need to use the observer pattern.

(Kinda) Listening Only to Things You're Interested in

Other than using the excerpt appender directly, another way is to make it reify the first-class given to its methodWriter. The following snippet focuses on this reifying of the given listener:

Kotlin
 
package listener

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.ChronicleReaderMain
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable()

interface PersonListener {
    fun onPerson(person: Person)
}

fun main(args: Array<String>) {
    val directory = "./build/listener"
  
    ChronicleQueue.single(directory).use { q ->
        val observable: PersonListener = q.acquireAppender()
              .methodWriter(PersonListener::class.java)
        observable.onPerson(Person("Shaolang", 3))
        observable.onPerson(Person("Elliot", 4))
    }

     ChronicleReaderMain.main(arrayOf("-d", directory))
}

Lines 17-18 invoke methodWriter with the given PersonListener on the acquired appender. Notice that the type assigned to observable is PersonListener, not ExcerptAppender. Now, any calls to the methods in PersonListener writes the given argument to the queue. However, there's a difference in writing to the queue using the appender directly and using a reified class. To see the difference, we'll use ChronicleReaderMain to examine the queue:

Kotlin
 




xxxxxxxxxx
1
11


 
1
0x47c900000000:
2
onPerson {
3
  name: Shaolang,
4
  age: 3
5
}
6
0x47c900000001:
7
onPerson {
8
  name: Elliot,
9
  age: 4
10
}


Notice that instead of !listener.Person { ... }, reified classes write excerpts using onPerson {...} to the queue. This difference allows tailers that implement PersonListener to be notified of new Person objects written to the queue and ignore others that they aren't interested in.

Yup, you've read that right: tailers that implement PersonListener. Unfortunately, Chronicle Queue (kinda) conflates observables and observers, thus making it a little hard in distinguishing observables from observers. I think the easiest way to tell the difference is to use the heuristics as shown in the following snippet's comments:

Kotlin
 




xxxxxxxxxx
1
22


 
1
interface PersonListener {
2
    onPerson(person: Person)
3
}
4
// this is an observer because it implements the listener interface
5
class PersonRegistry: PersonListener {
6
    override fun onPerson(person: Person) {
7
        // code omitted for brevity
8
    }
9
}
10
fun main(args: Array<String>) {
11
    // code omitted for brevity
12
    val observable: PersonListener = q.acquireAppender()    // this is an
13
            .methodWriter(PersonListener::class.java)       // observable
14
    // another way to differentiate: the observer will never call the
15
    // listener method, only observables do.
16
    observable.onPerson(Person("Shaolang", 3))
17
    // code omitted for brevity
18
}


Let's turn our focus to tailers. Even though the Chronicle Queue ensures that every tailer sees every excerpt, tailers can filter only excerpts that they want to see by implementing the listener class/interface and creating a net.openhft.chronicle.bytes.MethodReader with the implemented listener:

Kotlin
 
package listener

import net.openhft.chronicle.bytes.MethodReader
import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable 

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable() 

class Food(var name: String? = null): SelfDescribingMarshallable() 

interface PersonListener {
    fun onPerson(person: Person)
}

class PersonRegistry: PersonListener {
    override fun onPerson(person: Person) {
        println("in registry: ${person.name}")
    }
}

fun main(args: Array<String>) {
    ChronicleQueue.single("./build/listener2").use { q ->
        val appender = q.acquireAppender()
        val writer: PersonListener = appender.methodWriter(PersonListener::class.java)
        writer.onPerson(Person("Shaolang", 3))
        appender.writeDocument(Food("Burger"))
        writer.onPerson(Person("Elliot", 4))
                                                    
        val registry: PersonRegistry = PersonRegistry()
        val reader: MethodReader = q.createTailer().methodReader(registry)
        reader.readOne()
        reader.readOne()
        reader.readOne()
    }
}


What's largely new to this is the implementation of PersonRegistry that simply prints the name of the person it is given. Instead of reading off the queue using an ExcerptTailer directly, the snippet creates a MethodReader from the tailer with the given instantiated PersonRegistry. 

Unlike.methodWriter that accepts Class, .methodReader expects objects. The appender writes three excerpts to the queue: person (via a call to onPerson), food (via .writeDocument), and person. Because tailers see every excerpt, the reader also makes three calls to "read" all excerpts, but you'll see only two outputs:

in registry: Shaolang
in registry: Elliot 

If only there were only two .readOne() calls instead of three, the output will not include in registry: Elliot.

MethodReader Uses Duck Typing

Remember the outputs from ChronicleReaderMain when we examined the queue that's populated by the reified PersonListener? Instead of a class name, the outputs are similar to onPerson { ... }. That suggests MethodReader filters excerpts that match the method signature, i.e., it doesn't care about the interface/class that contains the method signature; or simply put, duck typing:

Kotlin
 
package duck

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallabl()

interface PersonListener {
    fun onPerson(person: Person)
}

interface VIPListener {
    fun onPerson(person: Person)
}

class VIPClub: VIPListener {
    override fun onPerson(person: Person) {
        println("Welcome to the club, ${person.name}!")
    }
}

fun main(args: Array<String>) {
    ChronicleQueue.single("./build/duck").use { q ->
        val writer = q.acquireAppender().methodWriter(PersonListener::class.java)
        writer.onPerson(Person("Shaolang", 3))
                                               
        val club = VIPClub()
        val reader = q.createTailer().methodReader(club)
        reader.readOne()
    }
}


Notice that VIPClub implements VIPListener that happens to have the same onPerson method signature as PersonListener. When you run the above, you'll see Welcome to the club, Shaolang! printed.

Named Tailers

In all demonstrations so far, we've been creating anonymous tailers. Because they are anonymous, every (re-)run results in reading all excerpts in the queue. Sometimes, such behavior is acceptable, or even desirable, but there are times it doesn't. To pick up reading where it last stopped is done simply by naming the tailer:

Kotlin
 




xxxxxxxxxx
1
30


 
1
package restartable
2
 
           
3
import net.openhft.chronicle.queue.ChronicleQueue
4
import net.openhft.chronicle.queue.ExcerptTailer 
5
 
           
6
fun readQueue(tailerName: String, times: Int) {
7
    ChronicleQueue.single("./build/restartable").use { q ->
8
        val tailer = q.createTailer(tailerName)       // tailer name given
9
        for (_n in 1..times) {
10
          println("$tailerName: ${tailer.readText()}")
11
        }
12
 
           
13
        println()       // to separate outputs for easier visualization
14
    }
15
}
16
 
           
17
fun main(args: Array<String>) {
18
    ChronicleQueue.single("./build/restartable").use { q ->
19
        val appender = q.acquireAppender()
20
        appender.writeText("Test Message 1")
21
        appender.writeText("Test Message 2")
22
        appender.writeText("Test Message 3")
23
        appender.writeText("Test Message 4")
24
    }
25
 
           
26
    readQueue("foo", 1)
27
    readQueue("bar", 2)
28
    readQueue("foo", 3)
29
    readQueue("bar", 1)
30
}



Notice that the tailer's name is given to createTailer method. The code above has two tailers-unimaginatively named foo and bar -reading off the queue and outputs the following when running:

Kotlin
 




xxxxxxxxxx
1
10


 
1
foo: Test Message 1
2
 
           
3
bar: Test Message 1
4
bar: Test Message 2
5
 
           
6
foo: Test Message 2
7
foo: Test Message 3
8
foo: Test Message 4
9
 
           
10
bar: Test Message 3 



Notice that the second time foo and bar read from the queue, they pick up from where they've left previously.

Roll 'Em

Chronicle Queue rolls the file it uses based on the rolling cycle defined when the queue is created; by default, it rolls the file daily. To change the rolling cycle, we cannot use the simple ChronicleQueue.single method anymore:

Kotlin
 




xxxxxxxxxx
1
13


 
1
package roll
2
 
           
3
import net.openhft.chronicle.queue.ChronicleQueue
4
import net.openhft.chronicle.queue.RollCycles
5
import net.openhft.chronicle.impl.single.SingleChronicleQueueBuilder
6
 
           
7
fun main(args: Array<String>) {
8
    var qbuilder: SingleChronicleQueueBuilder = ChronicleQueue.singleBuilder("./build/roll")
9
 
           
10
    qbuilder.rollCycle(RollCycles.HOURLY)
11
    val q: ChronicleQueue = qbuilder.build()
12
    // code omitted for brevity
13
}



First, we get an instance of SingleChronicleQueueBuilder and set the rolling cycle with.rollCycle the method. The snippet above configures the queue to roll the file hourly. When we are happy with the configuration, call.build() on the builder to get an instantiated ChronicleQueue. Note that both appender and tailer(s) must use the same roll cycle when accessing the same queue.

As SingleChronicleQueueBuilder supports the fluent interface, the code could also be simplified as follows:

Kotlin
 




xxxxxxxxxx
1


 
1
val q: ChronicleQueue = ChronicleQueue.singleBuilder("./build/roll")
2
                                      .rollCycle(RollCycles.HOURLY)
3
                                      .build() 



What's Next

This post covers the Chronicle Queue terminology and basics. The following sites have more information to dig from:

  1. Chronicle Queue GitHub repository
  2. Stack Overflow tagged questions
  3. Peter Lawrey's Blog

Have fun!

Kotlin (programming language) Object (computer science)

Published at DZone with permission of Shaolang Ai. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Cutting-Edge Object Detection for Autonomous Vehicles: Advanced Transformers and Multi-Sensor Fusion
  • Writing DTOs With Java8, Lombok, and Java14+
  • Graph API for Entra ID (Azure AD) Object Management
  • A Comprehensive Guide to IAM in Object Storage

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!