Gentle Introduction to Hystrix – Wrap Up
Learn behavior tweaks and request collapsing in this final part of the gentle introduction to Netflix OSS Hystrix.
This is a follow-up to two other posts: the motivation for why something like Hystrix is needed in a distributed system, and a basic intro to Hystrix.
This post wraps up my Hystrix journey with details of the properties that can be tweaked to change the behavior of Hystrix, and touches on a few advanced concepts.
Tweaking Hystrix Behavior
Hystrix configuration is explained in the Hystrix wiki. In brief, two broad groups of properties control the behavior of Hystrix:
1. Command Properties
2. ThreadPool properties
The properties follow an order of precedence that is explained in the wiki. Here I will concentrate on the ones specified through a properties file.
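The precedence idea can be illustrated with a small plain-Java lookup. This is only a sketch and not Hystrix's own resolution code: the class PropertyLookupSketch and its resolve method are hypothetical, and the model is simplified to three levels (command-specific property, then the hystrix.command.default property, then a default supplied in code), leaving out the per-instance defaults that can be set in a command's constructor.

```java
import java.util.Properties;

// Hypothetical helper, not part of Hystrix: resolves one command setting
// by checking the most specific property first.
class PropertyLookupSketch {

    static String resolve(Properties props, String commandKey, String suffix, String codeDefault) {
        // 1. Command-specific property, e.g. hystrix.command.HelloWorldCommand.<suffix>
        String instanceValue = props.getProperty("hystrix.command." + commandKey + "." + suffix);
        if (instanceValue != null) {
            return instanceValue;
        }
        // 2. Global default property, e.g. hystrix.command.default.<suffix>
        String globalValue = props.getProperty("hystrix.command.default." + suffix);
        if (globalValue != null) {
            return globalValue;
        }
        // 3. Fall back to the default supplied in code.
        return codeDefault;
    }

    public static void main(String[] args) {
        String suffix = "execution.isolation.thread.timeoutInMilliseconds";
        Properties props = new Properties();
        props.setProperty("hystrix.command.default." + suffix, "2000");
        props.setProperty("hystrix.command.HelloWorldCommand." + suffix, "1000");

        System.out.println(resolve(props, "HelloWorldCommand", suffix, "500")); // command-specific value
        System.out.println(resolve(props, "OtherCommand", suffix, "500"));      // global default value
    }
}
```

The point of the sketch is only that a command-specific entry in the properties file wins over the default entry for all commands.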
For a sample Command defined the following way:
public class HelloWorldCommand extends HystrixCommand<String> {

    private static final Logger logger = LoggerFactory.getLogger(HelloWorldCommand.class);

    private final String name;

    public HelloWorldCommand(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        logger.info("HelloWorld Command Invoked");
        return "Hello " + name;
    }
}
The first behavior that can be tweaked is whether to execute the command in a thread pool (THREAD strategy) or in the same thread of execution as the caller (SEMAPHORE strategy). If the execution is in a thread pool, then a timeout for the request can be set:
hystrix.command.HelloWorldCommand.execution.isolation.strategy=THREAD
hystrix.command.HelloWorldCommand.execution.isolation.thread.timeoutInMilliseconds=1000
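To make the effect of thread isolation with a timeout more concrete, here is a minimal plain-Java sketch of the idea: the work runs on a pool thread, and the caller stops waiting once the timeout elapses. It is not how Hystrix is implemented internally; ThreadIsolationSketch, callWithTimeout, and the fallback parameter are all hypothetical names used for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of thread isolation with a timeout, not Hystrix code.
class ThreadIsolationSketch {

    // Runs the task on a pool thread and waits at most timeoutMillis for it.
    // On timeout or failure the caller gets the fallback value instead.
    static String callWithTimeout(ExecutorService pool, Callable<String> task,
                                  long timeoutMillis, String fallback) {
        Future<String> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true); // interrupt the worker if it is still running
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return fallback;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Completes well within the 1000 ms timeout.
            System.out.println(callWithTimeout(pool, () -> "Hello World", 1000, "fallback"));
            // Simulates a slow remote call that exceeds a 100 ms timeout.
            System.out.println(callWithTimeout(pool, () -> {
                Thread.sleep(5000);
                return "too late";
            }, 100, "fallback"));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Because the caller only waits on a Future, a slow dependency ties up a pool thread rather than the calling thread, which is the main appeal of the THREAD strategy.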
The second behavior is the circuit breaker, which works based on information collected during a rolling window of time. The window is configured this way, say for 10 seconds:
hystrix.command.HelloWorldCommand.metrics.rollingStats.timeInMilliseconds=10000
If, within this window, a certain percentage of requests fail (say 50%) once a threshold number of requests has been reached (say 20 in 10 seconds), then the circuit is broken. The configuration looks like this:
hystrix.command.HelloWorldCommand.circuitBreaker.requestVolumeThreshold=20
hystrix.command.HelloWorldCommand.circuitBreaker.errorThresholdPercentage=50
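The way the two thresholds combine can be sketched as a small decision function. This is a simplified illustration under my own assumptions and not the Hystrix source: CircuitTripSketch and shouldTrip are hypothetical names, and the counts are assumed to come from the rolling window described above.

```java
// Hypothetical illustration of the trip decision, not Hystrix source code.
class CircuitTripSketch {

    // totalRequests and failedRequests are the counts seen in the rolling window.
    static boolean shouldTrip(int totalRequests, int failedRequests,
                              int requestVolumeThreshold, int errorThresholdPercentage) {
        // Too little traffic in the window: never trip, whatever the error rate.
        if (totalRequests == 0 || totalRequests < requestVolumeThreshold) {
            return false;
        }
        int errorPercentage = failedRequests * 100 / totalRequests;
        return errorPercentage >= errorThresholdPercentage;
    }

    public static void main(String[] args) {
        System.out.println(shouldTrip(20, 10, 20, 50)); // true: 20 requests, 50% failed
        System.out.println(shouldTrip(19, 19, 20, 50)); // false: below the volume threshold
        System.out.println(shouldTrip(40, 10, 20, 50)); // false: only 25% failed
    }
}
```

Note how the second case never trips even though every request failed: the volume threshold guards against opening the circuit on a handful of unlucky calls.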
Once a circuit is broken, it stays that way for a configured time, 5 seconds in this instance, after which a trial request is let through to check whether the circuit can be closed again:
hystrix.command.HelloWorldCommand.circuitBreaker.sleepWindowInMilliseconds=5000
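The sleep window can be pictured as a tiny state holder that rejects requests while the circuit is open and lets a single trial request through once the window has passed. The sketch below is an assumption-laden simplification and not Hystrix's implementation: SleepWindowSketch and its methods are hypothetical, it is not thread-safe, and time is passed in explicitly to keep it easy to follow.

```java
// Hypothetical illustration of the sleep window, not Hystrix source code.
class SleepWindowSketch {

    private final long sleepWindowMillis;
    private boolean open = false;
    private long openedOrLastTestedAt = 0L;

    SleepWindowSketch(long sleepWindowMillis) {
        this.sleepWindowMillis = sleepWindowMillis;
    }

    // Opens the circuit at the given time.
    void trip(long nowMillis) {
        open = true;
        openedOrLastTestedAt = nowMillis;
    }

    // Returns true if a request may proceed at the given time.
    boolean allowRequest(long nowMillis) {
        if (!open) {
            return true;
        }
        if (nowMillis - openedOrLastTestedAt > sleepWindowMillis) {
            // Let one trial request through and restart the window.
            openedOrLastTestedAt = nowMillis;
            return true;
        }
        return false;
    }

    // A successful trial request closes the circuit again.
    void markSuccess() {
        open = false;
    }

    public static void main(String[] args) {
        SleepWindowSketch breaker = new SleepWindowSketch(5000);
        breaker.trip(0);
        System.out.println(breaker.allowRequest(1000)); // false: still inside the sleep window
        System.out.println(breaker.allowRequest(5001)); // true: trial request
        System.out.println(breaker.allowRequest(5002)); // false: window restarted by the trial
        breaker.markSuccess();
        System.out.println(breaker.allowRequest(5003)); // true: circuit closed again
    }
}
```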
The thread pool settings are controlled using the group key that was specified, called default in this sample. A specific thread pool key could also have been specified as part of the constructor.
hystrix.threadpool.default.coreSize=10
hystrix.threadpool.default.queueSizeRejectionThreshold=5
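Here up to 10 commands can run in parallel. The rejection threshold of 5 caps how many more are held in a queue before further requests are rejected. Note that it only takes effect when hystrix.threadpool.default.maxQueueSize is set to a positive value; with the default of -1 no queue is used, and requests beyond the 10 running ones are rejected immediately.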
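The combined effect of a fixed number of threads and a bounded queue can be reproduced with a plain java.util.concurrent.ThreadPoolExecutor. This is only an analogy under stated assumptions, not Hystrix's thread pool code: PoolRejectionSketch and countRejections are hypothetical, and a real bounded queue stands in for the rejection threshold.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of pool size plus bounded queue, not Hystrix code.
class PoolRejectionSketch {

    // Submits blocking tasks to a pool with coreSize threads and queueSize
    // queue slots, and returns how many submissions were rejected.
    static int countRejections(int coreSize, int queueSize, int submissions) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, coreSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize)); // default handler throws on rejection
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let all accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 10 tasks run, 5 wait in the queue, the 16th is rejected.
        System.out.println(countRejections(10, 5, 16)); // 1
    }
}
```

Rejecting quickly once the pool and queue are full is what keeps a slow dependency from piling up work in the caller.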
Request Collapsing
Tomasz Nurkiewicz has done an excellent job of explaining Request Collapsing on his blog, NoBlogDefFound. My example is a little simplistic. Consider a case where a lot of requests are being made to retrieve a Person given an ID, the following way:
public class PersonService {

    public Person findPerson(Integer id) {
        return new Person(id, "name : " + id);
    }

    public List<Person> findPeople(List<Integer> ids) {
        return ids
                .stream()
                .map(i -> new Person(i, "name : " + i))
                .collect(Collectors.toList());
    }
}
The service responds with a canned response, but assume that the call was to a remote datastore. Note also that this service implements a batched method to retrieve a list of people given a list of IDs.
Request Collapsing is a feature which batches multiple user requests occurring over a time period into a single such remote call and then fans the response back out to the individual callers.
A Hystrix command which takes a list of IDs and returns the corresponding people can be defined the following way:
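Before looking at the Hystrix classes, the batching and fan-out idea itself can be sketched in plain Java. This is a hypothetical illustration and not how Hystrix implements collapsing: CollapserSketch, submit, and flush are made-up names, and flush is called by hand where Hystrix would use a timer to close the batching window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration of request collapsing, not Hystrix code.
class CollapserSketch<K, V> {

    private final Function<List<K>, Map<K, V>> batchCall;
    private final List<K> pendingKeys = new ArrayList<>();
    private final List<CompletableFuture<V>> pendingResults = new ArrayList<>();
    private int batchCalls = 0;

    CollapserSketch(Function<List<K>, Map<K, V>> batchCall) {
        this.batchCall = batchCall;
    }

    // Queues a single request; the caller gets a future instead of an immediate result.
    synchronized CompletableFuture<V> submit(K key) {
        CompletableFuture<V> result = new CompletableFuture<>();
        pendingKeys.add(key);
        pendingResults.add(result);
        return result;
    }

    // Ends the batching window: one batch call, then fan the results back out.
    synchronized void flush() {
        if (pendingKeys.isEmpty()) {
            return;
        }
        Map<K, V> response = batchCall.apply(new ArrayList<>(pendingKeys));
        batchCalls++;
        for (int i = 0; i < pendingKeys.size(); i++) {
            pendingResults.get(i).complete(response.get(pendingKeys.get(i)));
        }
        pendingKeys.clear();
        pendingResults.clear();
    }

    synchronized int getBatchCalls() {
        return batchCalls;
    }

    public static void main(String[] args) {
        // The batch function stands in for a remote call such as findPeople.
        CollapserSketch<Integer, String> collapser = new CollapserSketch<>(ids -> {
            Map<Integer, String> people = new HashMap<>();
            for (Integer id : ids) {
                people.put(id, "name : " + id);
            }
            return people;
        });
        CompletableFuture<String> first = collapser.submit(1);
        CompletableFuture<String> second = collapser.submit(2);
        collapser.flush();
        System.out.println(first.join() + ", " + second.join());
        System.out.println("batch calls: " + collapser.getBatchCalls());
    }
}
```

Each caller still asks for one item and receives one item; only the call to the backing service is shared.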
public class PersonRequestCommand extends HystrixCommand<List<Person>> {

    private final List<Integer> ids;
    private final PersonService personService = new PersonService();
    private static final Logger logger = LoggerFactory.getLogger(PersonRequestCommand.class);

    public PersonRequestCommand(List<Integer> ids) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.ids = ids;
    }

    @Override
    protected List<Person> run() throws Exception {
        logger.info("Retrieving details for : " + this.ids);
        return personService.findPeople(this.ids);
    }
}
This is fairly straightforward up to this point. The complicated logic is in the request collapser, which looks like this:
package aggregate.commands.collapsed;

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PersonRequestCollapser extends HystrixCollapser<List<Person>, Person, Integer> {

    private final Integer id;

    public PersonRequestCollapser(Integer id) {
        super(Setter
                .withCollapserKey(HystrixCollapserKey.Factory.asKey("personRequestCollapser"))
                .andCollapserPropertiesDefaults(
                        HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(2000)));
        this.id = id;
    }

    @Override
    public Integer getRequestArgument() {
        return this.id;
    }

    @Override
    protected HystrixCommand<List<Person>> createCommand(
            Collection<CollapsedRequest<Person, Integer>> collapsedRequests) {
        List<Integer> ids = collapsedRequests.stream()
                .map(cr -> cr.getArgument())
                .collect(Collectors.toList());
        return new PersonRequestCommand(ids);
    }

    @Override
    protected void mapResponseToRequests(
            List<Person> batchResponse,
            Collection<CollapsedRequest<Person, Integer>> collapsedRequests) {
        Map<Integer, Person> personMap = batchResponse.stream()
                .collect(Collectors.toMap(Person::getId, Function.identity()));
        for (CollapsedRequest<Person, Integer> cr : collapsedRequests) {
            cr.setResponse(personMap.get(cr.getArgument()));
        }
    }
}
There are a few things going on here. First, the three type parameters in the class signature indicate the response type of the batch command (List<Person>), the response type expected by each caller (Person), and the type of the request argument (Integer, the ID of the person). The constructor also sets a timer delay of 2000 milliseconds, which is the window over which requests are batched. Besides getRequestArgument, which exposes the ID of a single request, there are two methods: one creates the batch command, and the other maps the batch response back to the original requests.
Given this, nothing much changes from a user's perspective: the call is made as if to a single command, and Request Collapsing handles batching, dispatching, and mapping back the responses. This is what a sample test looks like:
@Test
public void testCollapse() throws Exception {
    HystrixRequestContext requestContext = HystrixRequestContext.initializeContext();
    logger.info("About to execute Collapsed command");
    List<Observable<Person>> result = new ArrayList<>();
    CountDownLatch cl = new CountDownLatch(1);
    for (int i = 1; i <= 100; i++) {
        result.add(new PersonRequestCollapser(i).observe());
    }
    Observable.merge(result).subscribe(
            p -> logger.info(p.toString()),
            t -> logger.error(t.getMessage(), t),
            () -> cl.countDown());
    cl.await();
    logger.info("Completed executing Collapsed Command");
    requestContext.shutdown();
}
Conclusion
There is far more to Hystrix than what I have covered here. It is truly an awesome library, essential in creating a resilient system, and I have come to appreciate the amount of thought that has gone into designing it.
Reference
Here is my github repo with all the samples - https://github.com/bijukunjummen/hystrixdemo
Published at DZone with permission of Biju Kunjummen, DZone MVB. See the original article here.