Over a million developers have joined DZone.

Caching Salesforce Data in Redis With StreamSets Data Collector

DZone 's Guide to

Caching Salesforce Data in Redis With StreamSets Data Collector

Learn how the Jedis library and the StreamSets Data Collector (SDC) can help you keep your Redis NoSQL database up-to-date.

· Database Zone ·
Free Resource

Redis is an open-source, in-memory, NoSQL database implementing a networked key-value store with optional persistence to disk. Perhaps the most popular key-value database, Redis is widely used for caching web pages, sessions and other objects that require blazingly fast access — lookups are typically in the millisecond range.

At RedisConf 2017, I presented a session called Cache All The Things! Data Integration via Jedis (slides), looking at how the open-source Jedis library provides a small, sane, easy-to-use Java interface to Redis, and how a StreamSets Data Collector (SDC) pipeline can read data from a platform such as Salesforce, write it to Redis via Jedis, and keep Redis up-to-date by subscribing to notifications of changes in Salesforce, writing new and updated data to Redis. In this blog entry, I'll describe how I built the SDC pipeline I showed during my session.

Reading Business Data From Salesforce

Salesforce is the de facto system-of-record for customer data in many enterprises. Salesforce offers a variety of APIs for reading data, presenting trade-offs between latency, throughput and convenience (for example, REST vs. SOAP). But access to data is certainly not in the millisecond range. Salesforce's performance metrics show that the platform is currently serving around five billion transactions/day, with an average response time of about 150 ms. Salesforce API calls are also a limited resource; depending on the Salesforce edition, subscribers are allowed between 1,000 and 5,000 API calls/day/user license, with further caps on the total number of API calls/day for some editions.

Given Salesforce's position as the system-of-record, we'd like to be able to use it to look up information such as customer details. But its latency and limits can be an obstacle, particularly when lookups are part of an automated workflow. Fortunately, we can combine the Salesforce APIs, StreamSets Data Collector, and Redis to build a cache that not only provides millisecond access time but is also continuously updated.

StreamSets Data Collector Salesforce Origin

The Salesforce Origin, released last year in SDC, allows you to both configure a SOQL query to read existing data, and subscribe to a Salesforce Streaming API PushTopic for notifications as the data changes.

My example use case was looking up account details from an account number. The first step of building a pipeline, then, is to read the desired data from Salesforce. In StreamSets Data Collector, create a new pipeline, install the Salesforce stage library, and restart if necessary. Set the pipeline's error handling, then drag a Salesforce origin onto the pipeline canvas.

Salesforce Origin

The Salesforce origin's Salesforce tab needs a username and password. If you're using a Salesforce sandbox, you'll also need to change the Auth Endpoint from login.salesforce.com to test.salesforce.com. Select Query Existing Data, leaving Subscribe for Notifications unchecked for now. The Salesforce Bulk API is very efficient for large volumes of data, but the SOAP API is more responsive for interactive use; we'll want to preview the pipeline a few times as we get things working. So, on the Query tab, uncheck Use Bulk API. You can use the following SOQL query to get all of the fields that are accessible by the current user:

FROM Account

Although SOQL does not natively support SELECT *, SDC retrieves the account metadata and expands the * wildcard to the complete list of fields to which the current user has access.

Leave the remaining configuration items with their default values.

Hit the Preview button and you should see some account data from Salesforce. I'm using a Developer Edition, so I see the standard Salesforce sample accounts:


StreamSets Data Collector Redis Destination

Now let's write some data to Redis! Drop a Redis destination onto the canvas, connecting its input to the origin's output.

Redis Destination

Configure the destination with your Redis URI; if you're building a test/demo with a default Redis install on your machine, then redis://localhost:6379 should work. We want to index account details by account number, so set the Key field to AccountNumber/, the Value to / (i.e. the entire record), and the Data Type to Hash. We want Redis to store the entire account record as a collection of name-value pairs.

If you run the pipeline right now, you'll notice that there is a problem:


Clicking into the stack trace and scrolling down to the cause, we can see the issue: we are trying to write null hash values to Redis.

Error Detail

We can easily rectify this by adding a Field Remover in between the origin and destination, with Action set to Remove Listed Fields If Their Values Are Null and Fields set to /* — we want to remove any field if its value is null.

Run the pipeline now and you should see that all of the accounts are successfully written to Redis:

Remove Null Fields

In redis-cli, we can explore the data:> INFO Keyspace
# Keyspace
db0:keys=14,expires=0,avg_ttl=0> KEYS *
 1) "SF111111"
 2) "CD355120-B"
 3) "CD451796"
 4) "AB123456"
 5) "CC978213"
 6) "CC634267"
 7) "CD355119-A"
 8) "NW654321"
 9) "CD355118"
10) "CD439877"
11) "CC947211"
12) "CC213425"
13) "CD736025"
14) "CD656092"> HGETALL AB123456
 1) "Id"
 2) "0013600000gnbjJAAQ"
 3) "IsDeleted"
 4) "false"
 5) "Name"
 6) "StreamSets, Inc."
...> HGET AB123456 Industry

Great! We've populated our Redis cache. But how do we keep it up to date as the data in Salesforce changes?

Subscribing for Notifications From Salesforce

The Salesforce Origin can subscribe to notifications via the Salesforce Streaming API. You will need to create a PushTopic via the Salesforce Developer Console. Here is the Apex I used in the Execute Anonymous window:

PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'AccountUpdates';
pushTopic.Query = 'SELECT Id FROM Account';
pushTopic.ApiVersion = 40.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = true;
pushTopic.NotifyForOperationDelete = true;
pushTopic.NotifyForFields = 'All';
insert pushTopic;

This Apex code creates a new PushTopic named AccountUpdates that will notify subscribers of any changes to account records, sending the Id of the relevant account. We could list all of the account fields in the query (Salesforce doesn't natively support SELECT *), but then we would have to update the PushTopic if any account fields were later added or removed. A more maintainable option is to SELECT just the record Id and let the subscriber retrieve the desired fields in an API call.

We can tell the Salesforce origin to subscribe to this PushTopic by enabling Subscribe for Notifications on the Salesforce tab, then setting the Push Topic to AccountUpdates on the Subscribe tab. Since we only receive the record Id in the notification, we need to add a Salesforce Lookup processor to our pipeline. Use a Stream Selector to separate out records from the initial load so that they do not trigger a lookup:

Stream Selector

Records received via the Streaming API have the attribute salesforce.cdc.type set to created, updated, deleted, or undeleted, so the Stream Selector can separate out these records with the condition ${str:length(record:attribute('salesforce.cdc.type')) > 0}.

The Salesforce Lookup processor simply reads all the available fields for the notified record:

FROM Account
WHERE Id = '${record:value('/Id')}'

Reset the origin and restart the pipeline, and you should see the initial records load into Redis. Leave the pipeline running, go to Salesforce, and make some modification to an account — create a new one, delete, or just change any field on an existing account.

Edit StreamSets Account Record

You should see the pipeline's record count increment. Go to redis-cli, retrieve the changed field via the account number and field name, and you should see the new value:> HGET AB123456 Industry

Success! We now have a high-speed cache of Salesforce account data, automatically updated as data in Salesforce changes. Watch this short video to see the pipeline in action:

What data are you caching in Redis? Let us know in the comments!

database ,tutorial ,salesforce ,redis ,streamsets ,nosql

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}