Modern Microservices, Part 5: Persistence and Packaging
In this guide, learn about adding persistence to our microservice powered by Postgres and package it for easy distribution via Docker Compose.
Join the DZone community and get the full member experience.
Join For FreeWe have a somewhat bare-bones chat service in our series so far. Our service exposes endpoints for managing topics and letting users post messages in topics. For a demo, we have been using a makeshift in-memory store that shamelessly provides no durability guarantees. A basic and essential building block in any (web) service is a data store (for storing, organizing, and retrieving data securely and efficiently). In this tutorial, we will improve the durability, organization, and persistence of data by introducing a database. There are several choices of databases: in-memory (a very basic form of which we have used earlier), object-oriented databases, key-value stores, relational databases, and more. We will not repeat an in-depth comparison of these here and instead defer to others.
Furthermore, in this article, we will use a relational (SQL) database as our underlying data store. We will use the popular GORM library (an ORM framework) to simplify access to our database. There are several relational databases available, both free as well as commercial. We will use Postgres (a very popular, free, lightweight, and easy-to-manage database) for our service. Postgres is also an ideal choice for a primary source-of-truth data store because of the strong durability and consistency guarantees it provides.
Setting Up the Database
A typical pattern when using a database in a service is:
|---------------| |-----------| |------------| |------|
| Request Proto | <-> | Service | <-> | ORM/SQL | <-> | DB |
|---------------| |-----------| |------------| |------|
- A gRPC request is received by the service (we have not shown the REST Gateway here).
- The service converts the model proto (e.g.,
Topic
) contained in the request (e.g.,CreateTopicRequest
) into the ORM library. - The ORM library generates the necessary SQL and executes it on the DB (and returns any results).
Setting Up Postgres
We could go the traditional way of installing Postgres (by downloading and installing its binaries for the specific platforms). However, this is complicated and brittle. Instead, we will start using Docker (and Docker Compose) going forward for a compact developer-friendly setup.
Set Up Docker
Set up Docker Desktop for your platform following the instructions.
Add Postgres to Docker Compose
Now that Docker is set up, we can add different containers to this so we can build out the various components and services OneHub requires.
version: '3.9'
services:
pgadmin:
image: dpage/pgadmin4
ports:
- ${PGADMIN_LISTEN_PORT}:${PGADMIN_LISTEN_PORT}
environment:
PGADMIN_LISTEN_PORT: ${PGADMIN_LISTEN_PORT}
PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
volumes:
- ./.pgadmin:/var/lib/pgadmin
postgres:
image: postgres:15.3
environment:
POSTGRES_DB: ${POSTGRES_DB}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- ./.pgdata:/var/lib/postgresql/data
ports:
- 5432:5432
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5
That's it. A few key things to note are:
- The Docker Compose file is an easy way to get started with containers - especially on a single host without needing complicated orchestration engines (hint: Kubernetes).
- The main part of Docker Compose files are the
service
sections that describe the containers for each of the services that Docker Compose will be executing as a "single unit in a private network." This is a great way to package multiple related services needed for an application and bring them all up and down in one step instead of having to manage them one by one individually. The latter is not just cumbersome, but also error-prone (manual dependency management, logging, port checking, etc). - For now, we have added one service -
postgres
- running on port5432
. - Since the services are running in an isolated context, environment variables can be set to initialize/control the behavior of the services. These environment variables are read from a specific .env file (below). This file can also be passed as a CLI flag or as a parameter, but for now, we are using the default .env file. Some configuration parameters here are the Postgres username, password, and database name.
- .env:
POSTGRES_DB=onehubdb
POSTGRES_USER=postgres
POSTGRES_PASSWORD=docker
ONEHUB_DB_ENDOINT=postgres://postgres:docker@postgres:5432/onehubdb
PGADMIN_LISTEN_PORT=5480
PGADMIN_DEFAULT_EMAIL=admin@onehub.com
PGADMIN_DEFAULT_PASSWORD=password
- All data in a container is transient and is lost when the container is shut down. In order to make our database durable, we will store the data outside the container and map it as a volume. This way from within the container, Postgres will read/write to its local directory (
/var/lib/postgresql/data
) even though all reads/writes are sent to the host's file system (./.pgdata
) - Another great benefit of using Docker is that all the ports used by the different services are "internal" to the network that Docker creates. This means the same
postgres
service (which runs on port5432
) can be run on multiple Docker environments without having their ports changed or checked for conflicts. This works because, by default, ports used inside a Docker environment are not exposed outside the Docker environment. Here we have chosen to expose port 5432 explicitly in theports
section of docker-compose.yml.
That's it. Go ahead and bring it up:
docker compose up
If all goes well, you should see a new Postgres database created and initialized with our username, password, and DB parameters from the .env file. The database is now ready:
onehub-postgres-1 | 2023-07-28 22:52:32.199 UTC [1] LOG: starting PostgreSQL 15.3 (Debian 15.3-1.pgdg120+1) on aarch64-unknown-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
onehub-postgres-1 | 2023-07-28 22:52:32.204 UTC [1] LOG: listening on IPv4 address "0.0.0.0", port 5432
onehub-postgres-1 | 2023-07-28 22:52:32.204 UTC [1] LOG: listening on IPv6 address "::", port 5432
onehub-postgres-1 | 2023-07-28 22:52:32.209 UTC [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
onehub-postgres-1 | 2023-07-28 22:52:32.235 UTC [78] LOG: database system was shut down at 2023-07-28 22:52:32 UTC
onehub-postgres-1 | 2023-07-28 22:52:32.253 UTC [1] LOG: database system is ready to accept connections
The OneHub Docker application should now show up on the Docker desktop and should look something like this:
(Optional) Setup a DB Admin Interface
If you would like to query or interact with the database (outside code), pgAdmin and adminer are great tools. They can be downloaded as native application binaries, installed locally, and played. This is a great option if you would like to manage multiple databases (e.g., across multiple Docker environments).
... Alternatively ...
If it is for this single project and downloading yet another (native app) binary is undesirable, why not just include it as a service within Docker itself!? With that added, our docker-compose.yml now looks like this:
- docker-compose.yml:
version: '3.9'
services:
pgadmin:
image: dpage/pgadmin4
ports:
- ${PGADMIN_LISTEN_PORT}:${PGADMIN_LISTEN_PORT}
environment:
PGADMIN_LISTEN_PORT: ${PGADMIN_LISTEN_PORT}
PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
volumes:
- ./.pgadmin:/var/lib/pgadmin
postgres:
image: postgres:15.3
environment:
POSTGRES_DB: ${POSTGRES_DB}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- ./.pgdata:/var/lib/postgresql/data
ports:
- 5432:5432
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5
The accompanying environment variables are in our .env file:
- .env:
POSTGRES_DB=onehubdb
POSTGRES_USER=postgres
POSTGRES_PASSWORD=docker
ONEHUB_DB_ENDOINT=postgres://postgres:docker@postgres:5432/onehubdb
PGADMIN_LISTEN_PORT=5480
PGADMIN_DEFAULT_EMAIL=admin@onehub.com
PGADMIN_DEFAULT_PASSWORD=password
Now you can simply visit the pgAdmin web console on your browser. Use the email and password specified in the .env file and off you go! To connect to the Postgres instance running in the Docker environment, simply create a connection to postgres
(NOTE: container local DNS names within the Docker environment are the service names themselves).
- On the left-side Object Explorer panel, (right) click on
Servers >> Register >> Server...
and give a name to your server ("postgres").
- In the Connection tab, use the hostname "postgres" and set the names of the database, username, and password as set in the .env file for the
POSTGRES_DB
,POSTGRES_USER
, andPOSTGRES_PASSWORD
variables respectively.
- Click Save, and off you go!
Introducing Object Relational Mappers (ORMs)
Before we start updating our service code to access the database, you may be wondering why the gRPC service itself is not packaged in our docker-compose.yml file. Without this, we would still have to start our service from the command line (or a debugger). This will be detailed in a future post.
In a typical database, initialization (after the user and DB setup) would entail creating and running SQL scripts to create tables, checking for new versions, and so on. One example of a table creation statement (that can be executed via psql
or pgadmin
) is:
CREATE TABLE topics (
id STRING NOT NULL PRIMARY KEY,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
name STRING NOT NULL,
users TEXT[],
);
Similarly, an insertion would also have been manual construction of SQL statements, e.g.:
INSERT INTO topics
( id, name )
VALUES ( "1", "Taylor Swift" );
... followed by a verification of the saved results:
select * from topics ;
This can get pretty tedious (and error-prone with vulnerability to SQL injection attacks). SQL expertise is highly valuable but seldom feasible - especially being fluent with the different standards, different vendors, etc. Even though Postgres does a great job in being as standards-compliant as possible - for developers - some ease of use with databases is highly desirable.
Here ORM libraries are indispensable, especially for developers not dealing with SQL on a regular basis (e.g., yours truly). ORM (Object Relational Mappers) provide an object-like interface to a relational database. This simplifies access to data in our tables (i.e., rows) as application-level classes (Data Access Objects). Table creations and migrations can also be managed by ORM libraries. Behind the scenes, ORM libraries are generating and executing SQL queries on the underlying databases they accessing.
There are downsides to using an ORM:
- ORMs still incur a learning cost for developers during adoption. Interface design choices can play a role in impacting developer productivity.
- ORMs can be thought of as a schema compiler. The underlying SQL generated by them may not be straightforward or efficient. This results in ORM access to a database being slower than raw SQL, especially for complex queries. However, for complex queries or complex data pattern accesses, other scalability techniques may need to be applied (e.g., sharding, denormalization, etc.).
- The queries generated by ORMs may not be clear or straightforward, resulting in increased debugging times on slow or complex queries.
Despite these downsides, ORMs can be put to good use when not overly relied upon. We shall use a popular ORM library, GORM. GORM comes with a great set of examples and documentation and the quick start is a great starting point.
Create DB Models
GORM models are our DB models. GORM models are simple Golang structs with struct tags on each member to identify the member's database type. Our User
, Topic
and Message
models are simply this:
Topic, Message, User Models
package datastore
import (
"time"
"github.com/lib/pq"
)
type BaseModel struct {
CreatedAt time.Time
UpdatedAt time.Time
Id string `gorm:"primaryKey"`
Version int // used for optimistic locking
}
type User struct {
BaseModel
Name string
Avatar string
ProfileData map[string]interface{} `gorm:"type:json"`
}
type Topic struct {
BaseModel
CreatorId string
Name string `gorm:"index:SortedByName"`
Users pq.StringArray `gorm:"type:text[]"`
}
type Message struct {
BaseModel
ParentId string
TopicId string `gorm:"index:SortedByTopicAndCreation,priority:1"`
CreatedAt time.Time `gorm:"index:SortedByTopicAndCreation,priority:2"`
SourceId string
UserId string
ContentType string
ContentText string
ContentData map[string]interface{} `gorm:"type:json"`
}
Why are these models needed when we have already defined models in our .proto files? Recall that the models we use need to reflect the domain they are operating in. For example, our gRPC structs (in .proto files) reflect the models and programming models from the application's perspective. If/When we build a UI, view-models would reflect the UI/view perspectives (e.g., a FrontPage
view model could be a merge of multiple data models).
Similarly, when storing data in a database, the models need to convey intent and type information that can be understood and processed by the database. This is why GORM expects data models to have annotations on its (struct) member variables to convey database-specific information like column types, index definitions, index column orderings, etc. A good example of this in our data model is the SortByTopicAndCreation
index (which, as the name suggests, helps us list topics sorted by their creation timestamp).
Database indexes are one or more (re)organizations of data in a database that speed up retrievals of certain queries (at the cost of increased write times and storage space). We won't go into indexes deeply. There are fantastic resources that offer a deep dive into the various internals of database systems in great detail (and would be highly recommended).
The increased writes and storage space must be considered when creating more indexes in a database. We have (in our service) been mindful about creating more indexes and kept these to the bare minimum (to suit certain types of queries). As we scale our services (in future posts) we will revisit how to address these costs by exploring asynchronous and distributed index-building techniques.
Data Access Layer Conventions
We now have DB models. We could at this point directly call the GORM APIs from our service implementation to read and write data from our (Postgres) database; but first, a brief detail on the conventions we have decided to choose.
Motivations
Database use can be thought of as being in two extreme spectrums:
On the one hand, a "database" can be treated as a better filesystem with objects written by some key to prevent data loss. Any structure, consistency guarantees, optimization, or indexes are fully the responsibility of the application layer. This gets very complicated, error-prone, and hard very fast.
On the other extreme, use the database engine as the undisputed brain (the kitchen sink) of your application. Every data access for every view in your application is offered (only) by one or very few (possibly complex) queries. This view, while localizing data access in a single place, also makes the database a bottleneck and its scalability daunting. In reality, vertical scaling (provisioning beefier machines) is the easiest, but most expensive solution - which most vendors will happily recommend in such cases. Horizontal scaling (getting more machines) is hard as increased data coupling and probabilities of node failures (network partitions) mean more complicated and careful tradeoffs between consistency and availability.
Our sweet spot is somewhere in between. While ORMs (like GORM) provide an almost 1:1 interface compatibility between SQL and the application needs, being judicious with SQL remains advantageous and should be based on the (data and operational) needs of the application. For our chat application, some desirable (data) traits are:
- Messages from users must not be lost (durability).
- Ordering of messages is important (within a topic).
- Few standard query types:
- CRUD on Users, Topics, and Messages
- Message ordering by timestamp but limited to either within a topic or by a user (for last N messages)
Given our data "shapes" are simple and given the read usage of our system is much higher especially given the read/write application (i.e .,1 message posted is read by many participants on a Topic), we are choosing to optimize for write consistency, simplicity and read availability, within a reasonable latency).
Now we are ready to look at the query patterns/conventions.
Unified Database Object
First, we will add a simple data access layer that will encapsulate all the calls to the database for each particular model (topic, messages, users). Let us create an overarching "DB" object that represents our Postgres DB (in db/db.go):
type OneHubDB struct {
storage *gorm.DB
}
This tells GORM that we have a database object (possibly with a connection) to the underlying DB. The Topic Store, User Store, and Message Store modules all operate on this single DB instance (via GORM) to read/write data from their respective tables (topics, users, messages). Note that this is just one possible convention. We could have instead used three different DB (gorm.DB
) instances, one for each entity type: e.g., TopicDB
, UserDB
, and MessageDB
.
Use Custom IDs Instead of Auto-Generated Ones
We are choosing to generate our own primary key (IDs) for topics, users, and messages instead of depending on the auto-increment (or auto-id) generation by the database engine. This was for the following reasons:
- An auto-generated key is localized to the database instance that generates it. This means if/when we add more partitions to our databases (for horizontal scaling) these keys will need to be synchronized and migrating existing keys to avoid duplications at a global level is much harder.
- Auto increment keys offer reduced randomness, making it easy for attackers to "iterate" through all entities.
- Sometimes we may simply want string keys that are custom assignable if they are available (for SEO purposes).
- Lack of attribution to keys (e.g., a central/global key server can also allow attribution/annotation to keys for analytics purposes).
For these purposes, we have added a GenId
table that keeps track of all used IDs so we can perform collision detection, etc:
type GenId struct {
Class string `gorm:"primaryKey"`
Id string `gorm:"primaryKey"`
CreatedAt time.Time
}
Naturally, this is not a scalable solution when the data volume is large, but suffices for our demo and when needed, we can move this table to a different DB and still preserve the keys/IDs. Note that GenId
itself is also managed by GORM and uses a combination of Class
+ Id
as its primary key. An example of this is Class=Topic and Id=123
.
Random IDs are assigned by the application in a simple manner:
func randid(maxlen int) string {
max_id := int64(math.Pow(36, maxlen))
randval := rand.Int63() % max_id
return strconv.FormatInt(randval, 36)
}
func (tdb *OneHubDB) NextId(cls string) string {
for {
gid := GenId{Id: randid(), Class: cls, CreatedAt: time.Now()}
err := tdb.storage.Create(gid).Error
log.Println("ID Create Error: ", err)
if err == nil {
return gid.Id
}
}
}
- The method
randid
generates amaxlen
-sized string of random characters. This is as simple as(2^63) mod maxid
wheremaxid = 36 ^ maxlen
. - The
NextId
method is used by the different entity create methods (below) to repeatedly generate random IDs if collisions exist. In case you are worried about excessive collisions or are interested in understanding their probabilities, you can learn about them here.
Judicious Use of Indexes
Indexes are very beneficial to speed up certain data retrieval operations at the expense of increased writes and storage. We have limited our use of indexes to a very handful of cases where strong consistency was needed (and could be scaled easily):
- Topics sorted by name (for an alphabetical sorting of topics)
- Messages sorted by the topic and creation time stamps (for the message list natural ordering)
What is the impact of this on our application? Let us find out.
Topic Creations and Indexes
When a topic is created (or it is updated) an index write would be required. Topic creations/updates are relatively low-frequency operations (compared to message postings). So a slightly increased write latency is acceptable. In a more realistic chat application, a topic creation is a bit more heavyweight due to the need to check permissions, apply compliance rules, etc. So this latency hit is acceptable. Furthermore, this index would only be needed when "searching" for topics and even an asynchronous index update would have sufficed.
Message Related Indexes
To consider the usefulness of indexes related to messages, let us look at some usage numbers. This is a very simple application, so these scalability issues most likely won't be a concern (so feel free to skip this section). If your goals are a bit more lofty, looking at Slack's usage numbers we can estimate/project some usage numbers for our own demo to make it interesting:
- Number of daily active topics: 100
- Number of active users per topic: 10
- Message sent by an active user in a topic: Every 5 minutes (assume time to type, read other messages, research, think, etc.)
Thus, the number of messages created each day is:
= 100 * 10 * (1400 minutes in a day / 5 minutes)
= 280k messages per day
~ 3 messages per second
In the context of these numbers, if we were to create a message every 3 seconds, even with an extra index (or three), we can handle this load comfortably in a typical database that can handle 10k IOPS, which is rather modest.
It is easy to wonder if this scales as the number of topics or active users per topic or the creation frenzy increases. Let us consider a more intense setup (in a larger or busier organization). Instead of the numbers above, if we had 10k topics and 100 active users with a message every minute (instead of 5 minutes), our write QPS would be:
WriteQPS:
= 10000 * 100 * 1400 / 1
= 1.4B messages per day
~ 14k messages per second
That is quite a considerable blow-up. We can solve this in a couple of ways:
- Accept a higher latency on writes - For example, instead of requiring a write to happen in a few milliseconds, accept an SLO of, say, 500ms.
- Update indexes asynchronously - This doesn't get us that much further, as the number of writes in a system has not changed - only the when has changed.
- Shard our data
Let us look at sharding! Our write QPS is in aggregate. On a per-topic level, it is quite low (14k/10000 = 1.4 qps). However, user behavior for our application is that such activities on a topic are fairly isolated. We only want our messages to be consistent and ordered within a topic - not globally. We now have the opportunity to dynamically scale our databases (or the Messages
tables) to be partitioned by topic IDs. In fact, we could build a layer (a control plane) that dynamically spins up database shards and moves topics around reacting to load as and when needed. We will not go that extreme here, but this series is tending towards just that especially in the context of SaaS applications.
The _annoyed_ reader
might be wondering if this deep dive was needed right now! Perhaps not - but by understanding our data and user experience needs, we can make careful tradeoffs. Going forward, such mini-dives will benefit us immensely to quickly evaluate tradeoffs (e.g., when building/adding new features).
Store Specific Implementations
Now that we have our basic DB and common methods, we can go to each of the entity methods' implementations. For each of our entity methods, we will create the basic CRUD methods:
Create
Update
Get
Delete
List/Search
The
Create
andUpdate
methods are combined into a single "Save
" method to do the following:- If an ID is not provided then treat it as a create.
- If an ID is provided treat it as an update-or-insert (upsert) operation by using the
NextId
method if necessary.
- Since we have a base model,
Create
andUpdate
will setCreatedAt
andUpdatedAt
fields respectively. - The delete method is straightforward. The only key thing here is instead of leveraging GORM's cascading delete capabilities, we also delete the related entities in a separate call. We will not worry about consistency issues resulting from this (e.g., errors in subsequent delete methods).
- For the
Get
method, we will fetch using a standard GORM get-query-pattern based on a common id column we use for all models. If an entity does not exist, then we return a nil.
Users DB
Our user entity methods are pretty straightforward using the above conventions. The Delete
method additionally also deletes all Messages
for/by the user first before deleting the user itself. This ordering is to ensure that if the deletion of topics fails, then the user deletion won't proceed giving the caller to retry.
package datastore
import (
"errors"
"log"
"strings"
"time"
"gorm.io/gorm"
)
func (tdb *OneHubDB) SaveUser(topic *User) (err error) {
db := tdb.storage
topic.UpdatedAt = time.Now()
if strings.Trim(topic.Id, " ") == "" {
return InvalidIDError
// create a new one
}
result := db.Save(topic)
err = result.Error
if err == nil && result.RowsAffected == 0 {
topic.CreatedAt = time.Now()
err = tdb.storage.Create(topic).Error
}
return
}
func (tdb *OneHubDB) DeleteUser(topicId string) (err error) {
err = tdb.storage.Where("topic_id = ?", topicId).Delete(&Message{}).Error
if err == nil {
err = tdb.storage.Where("id = ?", topicId).Delete(&User{}).Error
}
return
}
func (tdb *OneHubDB) GetUser(id string) (*User, error) {
var out User
err := tdb.storage.First(&out, "id = ?", id).Error
if err != nil {
log.Println("GetUser Error: ", id, err)
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
} else {
return nil, err
}
}
return &out, err
}
func (tdb *OneHubDB) ListUsers(pageKey string, pageSize int) (out []*User, err error) {
query := tdb.storage.Model(&User{}).Order("name asc")
if pageKey != "" {
count := 0
query = query.Offset(count)
}
if pageSize <= 0 || pageSize > tdb.MaxPageSize {
pageSize = tdb.MaxPageSize
}
query = query.Limit(pageSize)
err = query.Find(&out).Error
return out, err
}
Topics DB
Our topic
entity methods are also pretty straightforward using the above conventions. The Delete
method additionally also deletes all messages in the topic first before deleting the user itself. This ordering is to ensure that if the deletion of topics fails then the user deletion won't proceed giving the caller a chance to retry.
package datastore
import (
"errors"
"log"
"strings"
"time"
"gorm.io/gorm"
)
/////////////////////// Topic DB
func (tdb *OneHubDB) SaveTopic(topic *Topic) (err error) {
db := tdb.storage
topic.UpdatedAt = time.Now()
if strings.Trim(topic.Id, " ") == "" {
return InvalidIDError
// create a new one
}
result := db.Save(topic)
err = result.Error
if err == nil && result.RowsAffected == 0 {
topic.CreatedAt = time.Now()
err = tdb.storage.Create(topic).Error
}
return
}
func (tdb *OneHubDB) DeleteTopic(topicId string) (err error) {
err = tdb.storage.Where("topic_id = ?", topicId).Delete(&Message{}).Error
if err == nil {
err = tdb.storage.Where("id = ?", topicId).Delete(&Topic{}).Error
}
return
}
func (tdb *OneHubDB) GetTopic(id string) (*Topic, error) {
var out Topic
err := tdb.storage.First(&out, "id = ?", id).Error
if err != nil {
log.Println("GetTopic Error: ", id, err)
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
} else {
return nil, err
}
}
return &out, err
}
func (tdb *OneHubDB) ListTopics(pageKey string, pageSize int) (out []*Topic, err error) {
query := tdb.storage.Model(&Topic{}).Order("name asc")
if pageKey != "" {
count := 0
query = query.Offset(count)
}
if pageSize <= 0 || pageSize > tdb.MaxPageSize {
pageSize = tdb.MaxPageSize
}
query = query.Limit(pageSize)
err = query.Find(&out).Error
return out, err
}
Messages DB
package datastore
import (
"errors"
"strings"
"time"
"gorm.io/gorm"
)
func (tdb *OneHubDB) GetMessages(topic_id string, user_id string, pageKey string, pageSize int) (out []*Message, err error) {
user_id = strings.Trim(user_id, " ")
topic_id = strings.Trim(topic_id, " ")
if user_id == "" && topic_id == "" {
return nil, errors.New("Either topic_id or user_id or both must be provided")
}
query := tdb.storage
if topic_id != "" {
query = query.Where("topic_id = ?", topic_id)
}
if user_id != "" {
query = query.Where("user_id = ?", user_id)
}
if pageKey != "" {
offset := 0
query = query.Offset(offset)
}
if pageSize <= 0 || pageSize > 10000 {
pageSize = 10000
}
query = query.Limit(pageSize)
err = query.Find(&out).Error
return out, err
}
// Get messages in a topic paginated and ordered by creation time stamp
func (tdb *OneHubDB) ListMessagesInTopic(topic_id string, pageKey string, pageSize int) (out []*Topic, err error) {
err = tdb.storage.Where("topic_id= ?", topic_id).Find(&out).Error
return
}
func (tdb *OneHubDB) GetMessage(msgid string) (*Message, error) {
var out Message
err := tdb.storage.First(&out, "id = ?", msgid).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
} else {
return nil, err
}
}
return &out, err
}
func (tdb *OneHubDB) ListMessages(topic_id string, pageKey string, pageSize int) (out []*Message, err error) {
query := tdb.storage.Where("topic_id = ?").Order("created_at asc")
if pageKey != "" {
count := 0
query = query.Offset(count)
}
if pageSize <= 0 || pageSize > tdb.MaxPageSize {
pageSize = tdb.MaxPageSize
}
query = query.Limit(pageSize)
err = query.Find(&out).Error
return out, err
}
func (tdb *OneHubDB) CreateMessage(msg *Message) (err error) {
msg.CreatedAt = time.Now()
msg.UpdatedAt = time.Now()
result := tdb.storage.Model(&Message{}).Create(msg)
err = result.Error
return
}
func (tdb *OneHubDB) DeleteMessage(msgId string) (err error) {
err = tdb.storage.Where("id = ?", msgId).Delete(&Message{}).Error
return
}
func (tdb *OneHubDB) SaveMessage(msg *Message) (err error) {
db := tdb.storage
q := db.Model(msg).Where("id = ? and version = ?", msg.Id, msg.Version)
msg.UpdatedAt = time.Now()
result := q.UpdateColumns(map[string]interface{}{
"updated_at": msg.UpdatedAt,
"content_type": msg.ContentType,
"content_text": msg.ContentText,
"content_data": msg.ContentData,
"user_id": msg.SourceId,
"source_id": msg.SourceId,
"parent_id": msg.ParentId,
"version": msg.Version + 1,
})
err = result.Error
if err == nil && result.RowsAffected == 0 {
// Must have failed due to versioning
err = MessageUpdateFailed
}
return
}
The Messages
entity methods are slightly more involved. Unlike the other two, Messages
entity methods also include Searching by Topic
and Searching by User
(for ease).
This is done in the GetMessages
method that provides paginated (and ordered) retrieval of messages for a topic or by a user.
Write Converters To/From Service/DB Models
We are almost there. Our database is ready to read/write data. It just needs to be invoked by the service. Going back to our original plan:
|---------------| |-----------| |--------| |------|
| Request Proto | <-> | Service | <-> | GORM | <-> | DB |
|---------------| |-----------| |--------| |------|
We have our service models (generated by protobuf tools) and we have our DB models that GORM understands. We will now add converters to convert between the two. Converters for entity X will follow these conventions:
- A method XToProto of type
func(input *datastore.X) (out *protos.X)
- A method XFromProto of type
func(input *protos.X) (out *datastore.X)
With that one of our converters (for Topics
) is quite simply (and boringly):
package services
import (
"log"
"github.com/lib/pq"
ds "github.com/panyam/onehub/datastore"
protos "github.com/panyam/onehub/gen/go/onehub/v1"
"google.golang.org/protobuf/types/known/structpb"
tspb "google.golang.org/protobuf/types/known/timestamppb"
)
func TopicToProto(input *ds.Topic) (out *protos.Topic) {
var userIds map[string]bool = make(map[string]bool)
for _, userId := range input.Users {
userIds[userId] = true
}
out = &protos.Topic{
CreatedAt: tspb.New(input.BaseModel.CreatedAt),
UpdatedAt: tspb.New(input.BaseModel.UpdatedAt),
Name: input.Name,
Id: input.BaseModel.Id,
CreatorId: input.CreatorId,
Users: userIds,
}
return
}
func TopicFromProto(input *protos.Topic) (out *ds.Topic) {
out = &ds.Topic{
BaseModel: ds.BaseModel{
CreatedAt: input.CreatedAt.AsTime(),
UpdatedAt: input.UpdatedAt.AsTime(),
Id: input.Id,
},
Name: input.Name,
CreatorId: input.CreatorId,
}
if input.Users != nil {
var userIds []string
for userId := range input.Users {
userIds = append(userIds, userId)
}
out.Users = pq.StringArray(userIds)
}
return
}
The full set of converters can be found here - Service/DB Models Converters.
Hook Up the Converters in the Service Definitions
Our last step is to invoke the converters above in the service implementation. The methods are pretty straightforward. For example, for the TopicService we have:
CreateTopic
- During creation we allow custom IDs to be passed in. If an entity with the ID exists the request is rejected. If an ID is not passed in, a random one is assigned.
Creator
andName
parameters are required fields.- The topic is converted to a "
DBTopic
" model and saved by calling theSaveTopic
method.
UpdateTopic
All our Update<Entity>
methods follow a similar pattern:
- Fetch the existing entity (by ID) from the DB.
- Update the entity fields based on fields marked in the
update_mask
(so patches are allowed). - Update with any extra entity-specific operations (e.g.,
AddUsers
,RemoveUsers
, etc.) - these are just for convenience so the caller would not have to provide an entire "final" users list each time. - Convert the updated proto to a "DB Model."
- Call
SaveTopic
on the DB.SaveTopic
uses the "version" field in our DB to perform an optimistically concurrent write. This ensures that by the time the model is loaded and it is being written, a write by another request/thread will not be overwritten.
The Delete
, List
and Get
methods are fairly straightforward. The UserService and MessageService also are implemented in a very similar way with minor differences to suit specific requirements.
Testing It All Out
We have a database up and running (go ahead and start it with docker compose up
). We have converters to/from service and database models. We have implemented our service code to access the database. We just need to connect to this (running) database and pass a connection object to our services in our runner binary (cmd/server.go):
- Add an extra flag to accept a path to the DB. This can be used to change the DB path if needed.
var (
addr = flag.String("addr", ":9000", "Address to start the onehub grpc server on.")
gw_addr = flag.String("gw_addr", ":8080", "Address to start the grpc gateway server on.")
db_endpoint = flag.String("db_endpoint", "", fmt.Sprintf("Endpoint of DB where all topics/messages state are persisted. Default value: ONEHUB_DB_ENDPOINT environment variable or %s", DEFAULT_DB_ENDPOINT))
)
- Create
*gorm.DB
instance from thedb_endpoint
value.
We have already created a little utility method for opening a GORM-compatible SQL DB given an address:
package utils
import (
// "github.com/panyam/goutils/utils"
"log"
"strings"
"github.com/panyam/goutils/utils"
"gorm.io/driver/postgres"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func OpenDB(db_endpoint string) (db *gorm.DB, err error) {
log.Println("Connecting to DB: ", db_endpoint)
if strings.HasPrefix(db_endpoint, "sqlite://") {
dbpath := utils.ExpandUserPath((db_endpoint)[len("sqlite://"):])
db, err = gorm.Open(sqlite.Open(dbpath), &gorm.Config{})
} else if strings.HasPrefix(db_endpoint, "postgres://") {
db, err = gorm.Open(postgres.Open(db_endpoint), &gorm.Config{})
}
if err != nil {
log.Println("Cannot connect DB: ", db_endpoint, err)
} else {
log.Println("Successfully connected DB: ", db_endpoint)
}
return
}
Now let us create the method OpenOHDB
, which is a simple wrapper that also checks for a db_endpoint
value from an environment variable (if it is not provided) and subsequently opens a gorm.DB
instance needed for a OneHubDB
instance:
func OpenOHDB() *ds.OneHubDB {
if *db_endpoint == "" {
*db_endpoint = cmdutils.GetEnvOrDefault("ONEHUB_DB_ENDPOINT", DEFAULT_DB_ENDPOINT)
}
db, err := cmdutils.OpenDB(*db_endpoint)
if err != nil {
log.Fatal(err)
panic(err)
}
return ds.NewOneHubDB(db)
}
With the above two, we need a simple change to our main
method:
func main() {
flag.Parse()
ohdb := OpenOHDB()
go startGRPCServer(*addr, ohdb)
startGatewayServer(*gw_addr, *addr)
}
Now we shall also pass the ohdb
instance to the GRPC service creation methods. And we are ready to test our durability! Remember we set up auth in a previous part, so we need to pass login credentials, albeit fake ones (where password = login + "123"
).
Create a Topic
curl localhost:8080/v1/topics -u auser:auser123 | json_pp
{
"nextPageKey" : "",
"topics" : []
}
That's right. We do not have any topics yet so let us create some.
curl -X POST localhost:8080/v1/topics \
-u auser:auser123 \
-H 'Content-Type: application/json' \
-d '{"topic": {"name": "First Topic"}}' | json_pp
Yielding:
{
"topic" : {
"createdAt" : "1970-01-01T00:00:00Z",
"creatorId" : "auser",
"id" : "q43u",
"name" : "First Topic",
"updatedAt" : "2023-08-04T08:14:56.413050Z",
"users" : {}
}
}
Let us create a couple more:
curl -X POST localhost:8080/v1/topics \
-u auser:auser123 \
-H 'Content-Type: application/json' \
-d '{"topic": {"name": "First Topic", "id": "1"}}' | json_pp
curl -X POST localhost:8080/v1/topics \
-u auser:auser123 \
-H 'Content-Type: application/json' \
-d '{"topic": {"name": "Second Topic", "id": "2"}}' | json_pp
curl -X POST localhost:8080/v1/topics \
-u auser:auser123 \
-H 'Content-Type: application/json' \
-d '{"topic": {"name": "Third Topic", "id": "3"}}' | json_pp
With a list query returning:
{
"nextPageKey" : "",
"topics" : [
{
"createdAt" : "1970-01-01T00:00:00Z",
"creatorId" : "auser",
"id" : "q43u",
"name" : "First Topic",
"updatedAt" : "2023-08-04T08:14:56.413050Z",
"users" : {}
},
{
"createdAt" : "1970-01-01T00:00:00Z",
"creatorId" : "auser",
"id" : "dejc",
"name" : "Second Topic",
"updatedAt" : "2023-08-05T06:52:33.923076Z",
"users" : {}
},
{
"createdAt" : "1970-01-01T00:00:00Z",
"creatorId" : "auser",
"id" : "zuoz",
"name" : "Third Topic",
"updatedAt" : "2023-08-05T06:52:35.100552Z",
"users" : {}
}
]
}
Get Topic by ID
We can do a listing as in the previous section. We can also obtain individual topics:
curl localhost:8080/v1/topics/q43u -u auser:auser123 | json_pp
{
"topic" : {
"createdAt" : "1970-01-01T00:00:00Z",
"creatorId" : "auser",
"id" : "q43u",
"name" : "First Topic",
"updatedAt" : "2023-08-04T08:14:56.413050Z",
"users" : {}
}
}
Send and List Messages on a Topic
Let us send a few messages on the "First Topic" (id = "q43u
"):
curl -X POST localhost:8080/v1/topics/q43u/messages -u 'auser:auser123' -H 'Content-Type: application/json' -d '{"message": {"content_text": "Message 1"}}'
curl -X POST localhost:8080/v1/topics/q43u/messages -u 'auser:auser123' -H 'Content-Type: application/json' -d '{"message": {"content_text": "Message 2"}}'
curl -X POST localhost:8080/v1/topics/q43u/messages -u 'auser:auser123' -H 'Content-Type: application/json' -d '{"message": {"content_text": "Message 3"}}'
Now to list them:
curl localhost:8080/v1/topics/q43u/messages -u 'auser:auser123' | json_pp
{
"messages" : [
{
"contentData" : null,
"contentText" : "Message 1",
"contentType" : "",
"createdAt" : "0001-01-01T00:00:00Z",
"id" : "hlso",
"topicId" : "q43u",
"updatedAt" : "2023-08-07T05:00:36.547072Z",
"userId" : "auser"
},
{
"contentData" : null,
"contentText" : "Message 2",
"contentType" : "",
"createdAt" : "0001-01-01T00:00:00Z",
"id" : "t3lr",
"topicId" : "q43u",
"updatedAt" : "2023-08-07T05:00:39.504294Z",
"userId" : "auser"
},
{
"contentData" : null,
"contentText" : "Message 3",
"contentType" : "",
"createdAt" : "0001-01-01T00:00:00Z",
"id" : "8ohi",
"topicId" : "q43u",
"updatedAt" : "2023-08-07T05:00:42.598521Z",
"userId" : "auser"
}
],
"nextPageKey" : ""
}
Conclusion
Who would have thought setting up and using a database would have been such a meaty topic? We covered a lot of ground here that will both give us a good "functioning" service as well as a foundation when implementing new ideas in the future:
- We chose a relational database - Postgres - for its strong modeling capabilities, consistency guarantees, performance, and versatility.
- We also chose an ORM library (GORM) to improve our velocity and portability if we need to switch to another relational data store.
- We wrote data models that GORM could use to read/write from the database.
- We eased the setup by hosting both Postgres and its admin UI (pgAdmin) in a Docker Compose file.
- We decided to use GORM carefully and judiciously to balance velocity with minimal reliance on complex queries.
- We discussed some conventions that will help us along in our application design and extensions.
- We also addressed a way to assess, analyze, and address scalability challenges as they might arise and use that to guide our tradeoff decisions (e.g., type and number of indexes, etc).
- We wrote converter methods to convert between service and data models.
- We finally used the converters in our service to offer a "real" persistent implementation of a chat service where messages can be posted and read.
Now that we have a "minimum usable app," there are a lot of useful features to add to our service and make it more and more realistic (and hopefully production-ready). Take a breather and see you soon in continuing the exciting adventure! In the next post, we will look at also including our main binary (with gRPC service and REST Gateways) in the Docker Compose environment without sacrificing hot reloading and debugging.
Published at DZone with permission of Amardeep Singh. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments