Tomasz Nurkiewicz

DZone Core

Senior software developer at 4Finance IT

Warsaw, PL

Joined Jan 2009

http://www.nurkiewicz.com

About

I am a software developer from Poland, currently working in the banking industry. For the past few years I have been writing software in Java, though I am actively seeking a close alternative. Certified in SCJP, SCJD, SCWCD, and SCBCD; I used to be active on Stack Overflow. I feel most comfortable on the back end, but I recently rediscovered front-end development. In my spare time I love cycling. Twitter: @tnurkiewicz

Stats

Reputation: 773
Pageviews: 3.1M
Articles: 71
Comments: 33

Articles

GraphQL Server in Java, Part II: Understanding Resolvers
Learn more about resolvers in GraphQL and Java.
October 31, 2019
· 24,429 Views · 7 Likes
GraphQL Server in Java, Part I: The Basics
Learn more about GraphQL in Java applications.
October 30, 2019
· 38,476 Views · 8 Likes
One Method to Rule Them All: Map.merge()
Learn more about Map.merge(), the one method to rule them all.
March 8, 2019
· 51,540 Views · 27 Likes
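The article's headline method in action - a minimal word-count sketch (my illustration, not taken from the article itself):

```java
import java.util.HashMap;
import java.util.Map;

public class MergeDemo {
    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : "to be or not to be".split(" ")) {
            // insert 1 for a new key, otherwise combine old and new values
            counts.merge(word, 1, Integer::sum);
        }
        System.out.println(counts); // e.g. {not=1, be=2, or=1, to=2} (order unspecified)
    }
}
```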
Thread Pool Self-Induced Deadlocks
Want to learn how a thread pool can create a self-induced deadlock? Check out this post to learn more about deadlocks in Java.
September 12, 2018
· 26,720 Views · 13 Likes
Spring Boot 2: Migrating From Dropwizard Metrics to Micrometer
Goodbye Dropwizard, hello Micrometer! Let's take a sneak peek at how to move your metrics to Micrometer for Spring Boot 2 projects.
January 24, 2018
· 50,118 Views · 26 Likes
RxJava: Fixed-Rate vs. Fixed-Delay
This dip into scheduling with RxJava will clear up how and when to use a fixed rate or a fixed delay, with considerations made for both approaches.
October 31, 2017
· 14,810 Views · 3 Likes
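For quick intuition, the same distinction exists in plain java.util.concurrent (a sketch of mine, not the article's RxJava code): fixed-rate aims to fire every period regardless of how long the task took, while fixed-delay waits for the task to finish and then sleeps the full delay.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateVsFixedDelay {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        // next run is scheduled relative to the previous run's *start*
        pool.scheduleAtFixedRate(() -> System.out.println("rate"), 0, 1, TimeUnit.SECONDS);
        // next run is scheduled relative to the previous run's *end*
        pool.scheduleWithFixedDelay(() -> System.out.println("delay"), 0, 1, TimeUnit.SECONDS);
    }
}
```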
RxJava: Idiomatic Concurrency — flatMap() vs. parallel()
When it comes to concurrent programming using the RxJava library, here's a breakdown of flatMap() and parallel() and some guidance as to which is the most helpful.
September 15, 2017
· 35,403 Views · 16 Likes
RxJava FAQ: Loading Files With Backpressure
This guide will help you read large files, XML in this case, while managing resources efficiently by utilizing RxJava Flowables.
September 2, 2017
· 18,998 Views · 20 Likes
RxJava: flatMap() vs. concatMap() vs. concatMapEager()
On the face of it, flatMap, concatMap, and concatMapEager do largely the same thing. But let's look under the hood to see which works best in a given situation.
Updated August 8, 2017
· 16,156 Views · 9 Likes
Eager Subscription: RxJava FAQs
Learning RxJava can be a challenge, what with it being a fairly substantial shift in how you think. In this post, we take a look at some FAQs that come up.
August 3, 2017
· 7,036 Views · 7 Likes
Small Scale Stream Processing Kata (Part 1): Thread Pools
Starting with thread pools, see how you can set up your own stream-processing kata. See what works, what doesn't, and what works best.
November 17, 2016
· 11,095 Views · 15 Likes
Functional Programming in Pure Java: Functor and Monad Examples
This article looks at functors and monads from a specific angle of Java data structures and libraries.
June 28, 2016
· 84,384 Views · 88 Likes
Which Thread Executes CompletableFuture's Tasks and Callbacks?
If not told which thread to use, CompletableFuture works hard to find one, including potentially the caller's thread. This can ruin the whole idea of scheduling a task asynchronously. Read on to understand how to know which thread your background task will use.
December 7, 2015
· 21,541 Views · 4 Likes
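A minimal sketch of the behavior the article examines (my illustration, not the article's code):

```java
import java.util.concurrent.CompletableFuture;

public class WhichThread {
    public static void main(String[] args) {
        // A callback on an already-completed future may run on the caller's thread:
        CompletableFuture.completedFuture("x")
                .thenAccept(v -> System.out.println(Thread.currentThread().getName())); // "main"

        // Without an explicit executor, async work lands on the common pool:
        CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .thenAccept(System.out::println) // typically "ForkJoinPool.commonPool-worker-..."
                .join(); // wait, since common-pool threads are daemons
    }
}
```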
Displaying Progress of Spring Application Startup in the Browser
When you restart your enterprise application, what do your clients see when they open the web browser?
September 21, 2015
· 17,766 Views · 3 Likes
Server-sent Events With RxJava and SseEmitter
The new release of Spring introduces features to make server-sent events simple.
August 3, 2015
· 18,108 Views · 3 Likes
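For a flavor of the API discussed, a minimal sketch assuming a Spring MVC 4.2+ application (endpoint name is hypothetical, not from the article):

```java
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
class TimeController {

    @RequestMapping(method = RequestMethod.GET, value = "/time")
    SseEmitter time() throws java.io.IOException {
        SseEmitter emitter = new SseEmitter();
        emitter.send(System.currentTimeMillis()); // push one event to the client
        emitter.complete();                       // then close the stream
        return emitter;
    }
}
```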
RESTful Considered Harmful
RESTful APIs are common across the internet, but is this a good thing?
July 25, 2015
· 67,825 Views · 31 Likes
Consuming java.util.concurrent.BlockingQueue as rx.Observable
An implementation of the producer-consumer pattern using BlockingQueues
July 17, 2015
· 14,480 Views · 9 Likes
Writing a Download Server, Part III: Headers: Content-length and Range
The Content-Length response header is tremendously helpful for clients that track download progress. There are, however, some circumstances when obtaining the precise content length is hard.
July 1, 2015
· 10,360 Views · 1 Like
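A tiny sketch of the easy case (my example, not the article's code): when the size is known up front, expose it via Content-Length so clients can track progress:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.http.ResponseEntity;

class ContentLengthSketch {
    ResponseEntity<Resource> serve(File file) throws FileNotFoundException {
        return ResponseEntity.ok()
                .contentLength(file.length()) // sets the Content-Length header
                .body(new InputStreamResource(new FileInputStream(file)));
    }
}
```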
Writing a Download Server (Part II)—Headers: Last-Modified, ETag and If-None-Match
Caching on the client side is one of the foundations of the World Wide Web. The server should inform the client about the validity of resources and the client should cache them as eagerly as possible. Without caching, the web as we see it would be insanely slow. Just hit Ctrl + F5 on any website and compare it with an ordinary F5 - the latter is much faster as it uses already cached resources. Caching is also important for downloading. If we already fetched several megabytes of data and they haven't changed, pushing them through the network is quite wasteful.

Use ETag and If-None-Match headers

The HTTP ETag header can be used to avoid repeated downloads of resources the client already has. Along with the first response the server returns an ETag header, which is typically a hash value of the contents of a file. The client can keep the ETag and send it (in the If-None-Match request header) when requesting the same resource later. If it wasn't changed in the meantime, the server can simply return a 304 Not Modified response. Let's start with an integration test for ETag support:

```groovy
def 'should send file if ETag not present'() {
    expect:
        mockMvc
            .perform(
                get('/download/' + FileExamples.TXT_FILE_UUID))
            .andExpect(
                status().isOk())
}

def 'should send file if ETag present but not matching'() {
    expect:
        mockMvc
            .perform(
                get('/download/' + FileExamples.TXT_FILE_UUID)
                    .header(IF_NONE_MATCH, '"WHATEVER"'))
            .andExpect(
                status().isOk())
}

def 'should not send file if ETag matches content'() {
    given:
        String etag = FileExamples.TXT_FILE.getEtag()
    expect:
        mockMvc
            .perform(
                get('/download/' + FileExamples.TXT_FILE_UUID)
                    .header(IF_NONE_MATCH, etag))
            .andExpect(
                status().isNotModified())
            .andExpect(
                header().string(ETAG, etag))
}
```

Interestingly, there is a built-in ShallowEtagHeaderFilter in the Spring framework. Installing it makes all the tests pass, including the last one:

```groovy
@WebAppConfiguration
@ContextConfiguration(classes = [MainApplication])
@ActiveProfiles("test")
class DownloadControllerSpec extends Specification {

    private MockMvc mockMvc

    @Autowired
    public void setWebApplicationContext(WebApplicationContext wac) {
        mockMvc = MockMvcBuilders.webAppContextSetup(wac)
                .addFilter(new Sha512ShallowEtagHeaderFilter(), "/download/*")
                .build()
    }

    //tests...
}
```

I actually plug in my own Sha512ShallowEtagHeaderFilter that uses SHA-512 instead of the default MD5. Also, the default implementation for some reason prepends 0 in front of the hash:

```java
public class ShallowEtagHeaderFilter {

    protected String generateETagHeaderValue(byte[] bytes) {
        StringBuilder builder = new StringBuilder("\"0");
        DigestUtils.appendMd5DigestAsHex(bytes, builder);
        builder.append('"');
        return builder.toString();
    }

    //...
}
```

vs.:

```java
public class Sha512ShallowEtagHeaderFilter extends ShallowEtagHeaderFilter {

    @Override
    protected String generateETagHeaderValue(byte[] bytes) {
        final HashCode hash = Hashing.sha512().hashBytes(bytes);
        return "\"" + hash + "\"";
    }
}
```

Unfortunately, we cannot use the built-in filters in our case, as they must first fully read the response body in order to compute the ETag. This basically turns off the body streaming introduced in the previous article - the whole response is stored in memory. We must implement the ETag functionality ourselves. Technically, If-None-Match can include multiple ETag values. However, neither Google Chrome nor ShallowEtagHeaderFilter supports that, so we will skip it as well.

In order to control response headers we now return ResponseEntity<Resource>:

```java
@RequestMapping(method = GET, value = "/{uuid}")
public ResponseEntity<Resource> download(
        @PathVariable UUID uuid,
        @RequestHeader(IF_NONE_MATCH) Optional<String> requestEtagOpt) {
    return storage
            .findFile(uuid)
            .map(pointer -> prepareResponse(pointer, requestEtagOpt))
            .orElseGet(() -> new ResponseEntity<>(NOT_FOUND));
}

private ResponseEntity<Resource> prepareResponse(FilePointer filePointer, Optional<String> requestEtagOpt) {
    return requestEtagOpt
            .filter(filePointer::matchesEtag)
            .map(this::notModified)
            .orElseGet(() -> serveDownload(filePointer));
}

private ResponseEntity<Resource> notModified(String etag) {
    log.trace("Cached on client side {}, returning 304", etag);
    return ResponseEntity
            .status(NOT_MODIFIED)
            .eTag(etag)
            .body(null);
}

private ResponseEntity<Resource> serveDownload(FilePointer filePointer) {
    log.debug("Serving '{}'", filePointer);
    final InputStream inputStream = filePointer.open();
    final InputStreamResource resource = new InputStreamResource(inputStream);
    return ResponseEntity
            .status(OK)
            .eTag(filePointer.getEtag())
            .body(resource);
}
```

The process is controlled by the optional requestEtagOpt. If it's present and matches whatever was sent by the client, we return 304. Otherwise, 200 OK as usual. The new methods in FilePointer introduced in this example look as follows:

```java
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;

public class FileSystemPointer implements FilePointer {

    private final File target;
    private final HashCode tag;

    public FileSystemPointer(File target) {
        try {
            this.target = target;
            this.tag = Files.hash(target, Hashing.sha512());
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    @Override
    public InputStream open() {
        try {
            return new BufferedInputStream(new FileInputStream(target));
        } catch (FileNotFoundException e) {
            throw new IllegalArgumentException(e);
        }
    }

    @Override
    public String getEtag() {
        return "\"" + tag + "\"";
    }

    @Override
    public boolean matchesEtag(String requestEtag) {
        return getEtag().equals(requestEtag);
    }
}
```

Here you see the FileSystemPointer implementation that reads files straight from the file system. The crucial part is to cache the tag instead of recalculating it on every request. The implementation above behaves as expected; for example, web browsers won't download the resource again.

Use Last-Modified header

Similar to the ETag and If-None-Match headers, there are Last-Modified and If-Modified-Since. I guess they are pretty self-explanatory: first the server returns a Last-Modified response header, indicating when a given resource was last modified (duh!). The client caches this timestamp and passes it along with subsequent requests to the same resource in the If-Modified-Since request header. If the resource wasn't changed in the meantime, the server will respond with 304, saving bandwidth. This is a fallback mechanism and it's a good practice to implement both ETags and Last-Modified.

Let's start with integration tests:

```groovy
def 'should not return file if wasn\'t modified recently'() {
    given:
        Instant lastModified = FileExamples.TXT_FILE.getLastModified()
        String dateHeader = toDateHeader(lastModified)
    expect:
        mockMvc
            .perform(
                get('/download/' + FileExamples.TXT_FILE_UUID)
                    .header(IF_MODIFIED_SINCE, dateHeader))
            .andExpect(
                status().isNotModified())
}

def 'should not return file if server has older version than the client'() {
    given:
        Instant lastModifiedLaterThanServer = FileExamples.TXT_FILE.getLastModified().plusSeconds(60)
        String dateHeader = toDateHeader(lastModifiedLaterThanServer)
    expect:
        mockMvc
            .perform(
                get('/download/' + FileExamples.TXT_FILE_UUID)
                    .header(IF_MODIFIED_SINCE, dateHeader))
            .andExpect(
                status().isNotModified())
}

def 'should return file if was modified after last retrieval'() {
    given:
        Instant lastModifiedRecently = FileExamples.TXT_FILE.getLastModified().minusSeconds(60)
        String dateHeader = toDateHeader(lastModifiedRecently)
    expect:
        mockMvc
            .perform(
                get('/download/' + FileExamples.TXT_FILE_UUID)
                    .header(IF_MODIFIED_SINCE, dateHeader))
            .andExpect(
                status().isOk())
}

private static String toDateHeader(Instant lastModified) {
    ZonedDateTime dateTime = ZonedDateTime.ofInstant(lastModified, ZoneOffset.UTC)
    DateTimeFormatter.RFC_1123_DATE_TIME.format(dateTime)
}
```

And the implementation:

```java
@RequestMapping(method = GET, value = "/{uuid}")
public ResponseEntity<Resource> download(
        @PathVariable UUID uuid,
        @RequestHeader(IF_NONE_MATCH) Optional<String> requestEtagOpt,
        @RequestHeader(IF_MODIFIED_SINCE) Optional<Date> ifModifiedSinceOpt
) {
    return storage
            .findFile(uuid)
            .map(pointer -> prepareResponse(
                    pointer,
                    requestEtagOpt,
                    ifModifiedSinceOpt.map(Date::toInstant)))
            .orElseGet(() -> new ResponseEntity<>(NOT_FOUND));
}

private ResponseEntity<Resource> prepareResponse(FilePointer filePointer, Optional<String> requestEtagOpt, Optional<Instant> ifModifiedSinceOpt) {
    if (requestEtagOpt.isPresent()) {
        final String requestEtag = requestEtagOpt.get();
        if (filePointer.matchesEtag(requestEtag)) {
            return notModified(filePointer);
        }
    }
    if (ifModifiedSinceOpt.isPresent()) {
        final Instant isModifiedSince = ifModifiedSinceOpt.get();
        if (filePointer.modifiedAfter(isModifiedSince)) {
            return notModified(filePointer);
        }
    }
    return serveDownload(filePointer);
}

private ResponseEntity<Resource> serveDownload(FilePointer filePointer) {
    log.debug("Serving '{}'", filePointer);
    final InputStream inputStream = filePointer.open();
    final InputStreamResource resource = new InputStreamResource(inputStream);
    return response(filePointer, OK, resource);
}

private ResponseEntity<Resource> notModified(FilePointer filePointer) {
    log.trace("Cached on client side {}, returning 304", filePointer);
    return response(filePointer, NOT_MODIFIED, null);
}

private ResponseEntity<Resource> response(FilePointer filePointer, HttpStatus status, Resource body) {
    return ResponseEntity
            .status(status)
            .eTag(filePointer.getEtag())
            .lastModified(filePointer.getLastModified().toEpochMilli())
            .body(body);
}
```

Sadly, using Optional idiomatically no longer looks good here, so I stick to isPresent(). We check both If-Modified-Since and If-None-Match. If neither matches, we serve the file as usual. Just to give you a taste of how these headers work, let's execute a few end-to-end tests. First request:

```
> GET /download/4a8883b6-ead6-4b9e-8979-85f9846cab4b HTTP/1.1
> ...

< HTTP/1.1 200 OK
< ETag: "8b97c678a7f1d2e0af...921228d8e"
< Last-Modified: Sun, 17 May 2015 15:45:26 GMT
< ...
```

Subsequent request with ETag (shortened):

```
> GET /download/4a8883b6-ead6-4b9e-8979-85f9846cab4b HTTP/1.1
> If-None-Match: "8b97c678a7f1d2e0af...921228d8e"
> ...

< HTTP/1.1 304 Not Modified
< ETag: "8b97c678a7f1d2e0af...921228d8e"
< Last-Modified: Sun, 17 May 2015 15:45:26 GMT
< ...
```

And in case our client supports Last-Modified only:

```
> GET /download/4a8883b6-ead6-4b9e-8979-85f9846cab4b HTTP/1.1
> If-Modified-Since: Tue, 19 May 2015 06:59:55 GMT
> ...

< HTTP/1.1 304 Not Modified
< ETag: "8b97c678a7f1d2e0af9cda473b36c21f1b68e35b93fec2eb5c38d182c7e8f43a069885ec56e127c2588f9495011fd8ce032825b6d3136df7adbaa1f921228d8e"
< Last-Modified: Sun, 17 May 2015 15:45:26 GMT
```

There are many built-in tools, such as filters, that can handle caching for you. However, if you need to be sure your files are streamed rather than pre-buffered on the server side, extra care needs to be taken. A conditional GET can also be replayed by hand, as shown in the sketch after this list.

Writing a download server:
  • Part I: Always stream, never keep fully in memory
  • Part II: headers: Last-Modified, ETag and If-None-Match
  • Part III: headers: Content-length and Range
  • Part IV: Implement HEAD operation (efficiently)
  • Part V: Throttle download speed
  • Part VI: Describe what you send (Content-type, et al.)

The sample application developed throughout these articles is available on GitHub.
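To replay the exchange above from plain Java, something like this conditional GET should work (a sketch of mine; the URL assumes a locally running instance):

```java
import java.net.HttpURLConnection;
import java.net.URL;

public class ConditionalGet {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8080/download/4a8883b6-ead6-4b9e-8979-85f9846cab4b");

        HttpURLConnection first = (HttpURLConnection) url.openConnection();
        String etag = first.getHeaderField("ETag"); // remember the validator
        first.getInputStream().close();             // drain and close the first response

        HttpURLConnection second = (HttpURLConnection) url.openConnection();
        second.setRequestProperty("If-None-Match", etag);
        System.out.println(second.getResponseCode()); // 304 if the file hasn't changed
    }
}
```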
June 26, 2015
· 3,580 Views
Writing a Download Server Part I: Always Stream, Never Keep Fully in Memory
Downloading various files (either text or binary) is the bread and butter of every enterprise application: PDF documents, attachments, media, executables, CSV, very large files, etc. Almost every application, sooner or later, will have to provide some form of download. Downloading is implemented in terms of HTTP, so it's important to fully embrace this protocol and take full advantage of it. Especially in Internet-facing applications, features like caching or user experience are worth considering. This series of articles provides a list of aspects that you might want to consider when implementing all sorts of download servers. Note that I avoid the term "best practices"; these are just guidelines that I find useful but that are not necessarily always applicable.

One of the biggest scalability issues is loading the whole file into memory before streaming it. Loading the full file into a byte[] to later return it, e.g. from a Spring MVC controller, is unpredictable and doesn't scale. The amount of memory your server will consume depends linearly on the number of concurrent connections times the average file size - factors you don't really want to depend on so much. It's extremely easy to stream the contents of a file directly from your server to the client byte-by-byte (with buffering); there are actually many techniques to achieve that. The easiest one is to copy bytes manually:

```java
@RequestMapping(method = GET)
public void download(OutputStream output) throws IOException {
    try (final InputStream myFile = openFile()) {
        IOUtils.copy(myFile, output);
    }
}
```

Your InputStream doesn't even have to be buffered; IOUtils.copy() will take care of that. However, this implementation is rather low-level and hard to unit test. Instead, I suggest returning a Resource:

```java
@RestController
@RequestMapping("/download")
public class DownloadController {

    private final FileStorage storage;

    @Autowired
    public DownloadController(FileStorage storage) {
        this.storage = storage;
    }

    @RequestMapping(method = GET, value = "/{uuid}")
    public Resource download(@PathVariable UUID uuid) {
        return storage
                .findFile(uuid)
                .map(this::prepareResponse)
                .orElseGet(this::notFound);
    }

    private Resource prepareResponse(FilePointer filePointer) {
        final InputStream inputStream = filePointer.open();
        return new InputStreamResource(inputStream);
    }

    private Resource notFound() {
        throw new NotFoundException();
    }
}

@ResponseStatus(value = HttpStatus.NOT_FOUND)
public class NotFoundException extends RuntimeException {
}
```

Two abstractions were created to decouple the Spring controller from the file storage mechanism. FilePointer is a file descriptor, irrespective of where that file came from. Currently we use one method from it:

```java
public interface FilePointer {

    InputStream open();

    //more to come
}
```

open() allows reading the actual file, no matter where it comes from (file system, database BLOB, Amazon S3, etc.). We will gradually extend FilePointer to support more advanced features, like file size and MIME type. The process of finding and creating FilePointers is governed by the FileStorage abstraction:

```java
public interface FileStorage {
    Optional<FilePointer> findFile(UUID uuid);
}
```

Streaming allows us to handle hundreds of concurrent requests without significant impact on memory and GC (only a small buffer is allocated in IOUtils). BTW, I am using UUIDs to identify files rather than names or another form of sequence number. This makes it harder to guess individual resource names, thus more secure (by obscurity). More on that in the next articles.

Having this basic setup, we can reliably serve lots of concurrent connections with minimal impact on memory. Remember that many components in the Spring framework and other libraries (e.g. servlet filters) may buffer the full response before returning it. Therefore it's really important to have an integration test trying to download a huge file (tens of GiB) and making sure the application doesn't crash - see the sketch after this list.

Writing a download server:
  • Part I: Always stream, never keep fully in memory
  • Part II: headers: Last-Modified, ETag and If-None-Match
  • Part III: headers: Content-length and Range
  • Part IV: Implement HEAD operation (efficiently)
  • Part V: Throttle download speed
  • Part VI: Describe what you send (Content-type, et al.)

The sample application developed throughout these articles is available on GitHub.
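Regarding the huge-file integration test mentioned above, here is a rough smoke-test sketch of mine against a locally running instance (URL and UUID argument are hypothetical); run it with a small -Xmx to prove nothing is buffered:

```java
import java.io.InputStream;
import java.net.URL;

public class HugeDownloadSmokeTest {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8080/download/" + args[0]); // UUID of a huge file
        long total = 0;
        byte[] buffer = new byte[8192];
        try (InputStream in = url.openStream()) {
            int read;
            while ((read = in.read(buffer)) != -1) {
                total += read; // discard the contents, count bytes only
            }
        }
        System.out.println("Streamed " + total + " bytes without OutOfMemoryError");
    }
}
```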
June 24, 2015
· 16,832 Views
Spring: Injecting Lists, Maps, Optionals and getBeansOfType() Pitfalls
If you have used the Spring framework for more than a week, you are probably aware of this feature. Suppose you have more than one bean implementing a given interface. Trying to autowire just one bean of such an interface is doomed to fail because Spring has no idea which particular instance you need. You can work around that by using the @Primary annotation to designate exactly one "most important" implementation that will have priority over others. But there are many legitimate use cases where you want to inject all beans implementing said interface. For example, you have multiple validators that all need to be executed prior to business logic, or several algorithm implementations that you want to exercise at the same time. Auto-discovering all implementations at runtime is a fantastic illustration of the open/closed principle: you can easily add new behavior to business logic (validators, algorithms, strategies - open for extension) without touching the business logic itself (closed for modification).

Just in case, I will start with a quick introduction; feel free to jump straight to the subsequent sections. So let's take a concrete example. Imagine you have a StringCallable interface and multiple implementations:

```java
interface StringCallable extends Callable<String> {
}

@Component
class Third implements StringCallable {
    @Override
    public String call() {
        return "3";
    }
}

@Component
class Forth implements StringCallable {
    @Override
    public String call() {
        return "4";
    }
}

@Component
class Fifth implements StringCallable {
    @Override
    public String call() throws Exception {
        return "5";
    }
}
```

Now we can inject a List, Set or even Map (where String represents the bean name) of StringCallable to any other class. To simplify, I'm injecting into a test case:

```groovy
@SpringBootApplication
public class Bootstrap {
}

@ContextConfiguration(classes = Bootstrap)
class BootstrapTest extends Specification {

    @Autowired
    List<StringCallable> list;

    @Autowired
    Set<StringCallable> set;

    @Autowired
    Map<String, StringCallable> map;

    def 'injecting all instances of StringCallable'() {
        expect:
            list.size() == 3
            set.size() == 3
            map.keySet() == ['third', 'forth', 'fifth'].toSet()
    }

    def 'enforcing order of injected beans in List'() {
        when:
            def result = list.collect { it.call() }
        then:
            result == ['3', '4', '5']
    }

    def 'enforcing order of injected beans in Set'() {
        when:
            def result = set.collect { it.call() }
        then:
            result == ['3', '4', '5']
    }

    def 'enforcing order of injected beans in Map'() {
        when:
            def result = map.values().collect { it.call() }
        then:
            result == ['3', '4', '5']
    }
}
```

So far so good, but only the first test passes - can you guess why?

```
Condition not satisfied:

result == ['3', '4', '5']
|      |
|      false
[3, 5, 4]
```

After all, why did we make the assumption that beans will be injected in the same order as they were... declared? Alphabetically? Luckily, one can enforce the order with the Ordered interface:

```java
interface StringCallable extends Callable<String>, Ordered {
}

@Component
class Third implements StringCallable {
    //...
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

@Component
class Forth implements StringCallable {
    //...
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 1;
    }
}

@Component
class Fifth implements StringCallable {
    //...
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 2;
    }
}
```

Interestingly, even though Spring internally injects a LinkedHashMap and a LinkedHashSet, only the List is properly ordered. I guess this is neither documented nor the least surprising behavior. To end this introduction: in Java 8 you can also inject Optional, which works as expected - it injects a dependency only if it's available.
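A minimal sketch of that Optional injection (the names here are illustrative, not from the article):

```java
import java.util.Optional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

interface AuditLog { void record(String event); } // hypothetical optional dependency

@Component
class PurchaseService {

    private final Optional<AuditLog> auditLog;

    @Autowired
    PurchaseService(Optional<AuditLog> auditLog) {
        // Optional.empty() when no AuditLog bean exists, e.g. in some profiles
        this.auditLog = auditLog;
    }

    void purchase(String item) {
        auditLog.ifPresent(log -> log.record("purchased " + item));
    }
}
```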
Optional dependencies can appear, e.g., when using profiles extensively and some beans are not bootstrapped in some profiles.

Composite pattern

Dealing with lists is quite cumbersome. Most of the time you want to iterate over them, so in order to avoid duplication it's useful to encapsulate such a list in a dedicated wrapper:

```java
@Component
public class Caller {

    private final List<StringCallable> callables;

    @Autowired
    public Caller(List<StringCallable> callables) {
        this.callables = callables;
    }

    public String doWork() {
        return callables.stream()
                .map(StringCallable::call)
                .collect(joining("|"));
    }
}
```

Our wrapper simply calls all the underlying callables one after another and joins their results:

```groovy
@ContextConfiguration(classes = Bootstrap)
class CallerTest extends Specification {

    @Autowired
    Caller caller

    def 'Caller should invoke all StringCallables'() {
        when:
            def result = caller.doWork()
        then:
            result == '3|4|5'
    }
}
```

It's somewhat controversial, but often this wrapper implements the same interface as well, effectively implementing the classic composite design pattern:

```java
@Component
@Primary
public class Caller implements StringCallable {

    private final List<StringCallable> callables;

    @Autowired
    public Caller(List<StringCallable> callables) {
        this.callables = callables;
    }

    @Override
    public String call() {
        return callables.stream()
                .map(StringCallable::call)
                .collect(joining("|"));
    }
}
```

Thanks to @Primary we can simply autowire StringCallable everywhere as if there was just one bean, while in fact there are multiple and we inject the composite. This is useful when refactoring an old application, as it preserves backward compatibility.

Why am I even starting with all these basics? If you look very closely, the code snippet above introduces a chicken-and-egg problem: an instance of StringCallable requires all instances of StringCallable, so technically speaking the callables list should include Caller as well. But Caller is currently being created, so that's impossible. This makes a lot of sense, and luckily Spring recognizes this special case. But in more advanced scenarios this can bite you. Further down the road, a new developer introduced this:

```java
@Component
public class EnterpriseyManagerFactoryProxyHelperDispatcher {

    private final Caller caller;

    @Autowired
    public EnterpriseyManagerFactoryProxyHelperDispatcher(Caller caller) {
        this.caller = caller;
    }
}
```

Nothing wrong so far, except the class name. But what happens if one of the StringCallables has a dependency on it?

```java
@Component
class Fifth implements StringCallable {

    private final EnterpriseyManagerFactoryProxyHelperDispatcher dispatcher;

    @Autowired
    public Fifth(EnterpriseyManagerFactoryProxyHelperDispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }
}
```

We have now created a circular dependency, and because we inject via constructors (as it was always meant to be), Spring slaps us in the face on startup:

```
UnsatisfiedDependencyException: Error creating bean with name 'caller' defined in file ...
UnsatisfiedDependencyException: Error creating bean with name 'fifth' defined in file ...
UnsatisfiedDependencyException: Error creating bean with name 'enterpriseyManagerFactoryProxyHelperDispatcher' defined in file ...
BeanCurrentlyInCreationException: Error creating bean with name 'caller': Requested bean is currently in creation: Is there an unresolvable circular reference?
```

Stay with me, I'm building the climax here.
This is clearly a bug that can unfortunately be "fixed" with field injection (or setter injection, for that matter):

```java
@Component
public class Caller {

    @Autowired
    private List<StringCallable> callables;

    public String doWork() {
        return callables.stream()
                .map(StringCallable::call)
                .collect(joining("|"));
    }
}
```

By decoupling bean creation from injection (impossible with constructor injection) we can now create a circular dependency graph, where Caller holds an instance of the Fifth class, which references Enterprisey..., which in turn references back to the same Caller instance. Cycles in a dependency graph are a design smell, leading to an unmaintainable graph of spaghetti relationships. Please avoid them, and if constructor injection can entirely prevent them, that's even better.

Meeting getBeansOfType()

Interestingly, there is another solution that goes straight to Spring's guts: ListableBeanFactory.getBeansOfType():

```java
@Component
public class Caller {

    private final List<StringCallable> callables;

    @Autowired
    public Caller(ListableBeanFactory beanFactory) {
        callables = new ArrayList<>(beanFactory.getBeansOfType(StringCallable.class).values());
    }

    public String doWork() {
        return callables.stream()
                .map(StringCallable::call)
                .collect(joining("|"));
    }
}
```

Problem solved? Quite the opposite! getBeansOfType() will silently skip (well, there is a TRACE and DEBUG log...) beans under creation and only return those already existing. Therefore Caller was just created and the container started successfully, while Caller no longer references the Fifth bean. You might say I asked for it because we have a circular dependency, so weird things happen. But it's an inherent feature of getBeansOfType(). In order to understand why using getBeansOfType() during container startup is a bad idea, have a look at the following scenario (unimportant code omitted):

```java
@Component
class Alpha {

    static { log.info("Class loaded"); }

    @Autowired
    public Alpha(ListableBeanFactory beanFactory) {
        log.info("Constructor");
        log.info("Constructor (beta?): {}", beanFactory.getBeansOfType(Beta.class).keySet());
        log.info("Constructor (gamma?): {}", beanFactory.getBeansOfType(Gamma.class).keySet());
    }

    @PostConstruct
    public void init() {
        log.info("@PostConstruct (beta?): {}", beanFactory.getBeansOfType(Beta.class).keySet());
        log.info("@PostConstruct (gamma?): {}", beanFactory.getBeansOfType(Gamma.class).keySet());
    }
}

@Component
class Beta {

    static { log.info("Class loaded"); }

    @Autowired
    public Beta(ListableBeanFactory beanFactory) {
        log.info("Constructor");
        log.info("Constructor (alpha?): {}", beanFactory.getBeansOfType(Alpha.class).keySet());
        log.info("Constructor (gamma?): {}", beanFactory.getBeansOfType(Gamma.class).keySet());
    }

    @PostConstruct
    public void init() {
        log.info("@PostConstruct (alpha?): {}", beanFactory.getBeansOfType(Alpha.class).keySet());
        log.info("@PostConstruct (gamma?): {}", beanFactory.getBeansOfType(Gamma.class).keySet());
    }
}

@Component
class Gamma {

    static { log.info("Class loaded"); }

    public Gamma() {
        log.info("Constructor");
    }

    @PostConstruct
    public void init() {
        log.info("@PostConstruct");
    }
}
```

The log output reveals how Spring internally loads and resolves classes:

```
Alpha: | Class loaded
Alpha: | Constructor
Beta:  | Class loaded
Beta:  | Constructor
Beta:  | Constructor (alpha?): []
Gamma: | Class loaded
Gamma: | Constructor
Gamma: | @PostConstruct
Beta:  | Constructor (gamma?): [gamma]
Beta:  | @PostConstruct (alpha?): []
Beta:  | @PostConstruct (gamma?): [gamma]
Alpha: | Constructor (beta?): [beta]
Alpha: | Constructor (gamma?): [gamma]
Alpha: | @PostConstruct (beta?): [beta]
Alpha: | @PostConstruct (gamma?): [gamma]
```

Spring framework first loads Alpha and tries to instantiate a bean. However, when running getBeansOfType(Beta.class), it discovers Beta, so it proceeds with loading and instantiating that one. Inside Beta we can immediately spot the problem: when Beta asks for beanFactory.getBeansOfType(Alpha.class), it gets no results ([]). Spring will silently ignore Alpha, because it's currently under creation. Later everything is as expected: Gamma is loaded, constructed and injected, Beta sees Gamma, and when we return to Alpha, everything is in place. Notice that even moving getBeansOfType() to a @PostConstruct method doesn't help - these callbacks aren't executed at the end, when all beans are instantiated, but while the container starts up.

Suggestions

getBeansOfType() is rarely needed and turns out to be unpredictable if you have cyclic dependencies. Of course you should avoid them in the first place, and if you properly inject dependencies via collections, Spring can predictably handle the lifecycle of all beans and either wire them correctly or fail at runtime. In the presence of circular dependencies between beans (sometimes accidental, or very long in terms of nodes and edges in the dependency graph), getBeansOfType() can yield different results depending on factors we have no control over, like CLASSPATH order.

PS: Kudos to Jakub Kubryński for troubleshooting getBeansOfType().
April 23, 2015
· 34,790 Views
CompletableFuture Can't Be Interrupted
I have already written a lot about InterruptedException and interrupting threads. In short, if you call Future.cancel(), not only will the given Future terminate the pending get(), but it will also try to interrupt the underlying thread. This is a pretty important feature that enables better thread pool utilization. I also wrote to always prefer CompletableFuture over the standard Future. It turns out the more powerful younger brother of Future doesn't handle cancel() so elegantly. Consider the following task, which we'll use later throughout the tests:

```groovy
class InterruptibleTask implements Runnable {

    private final CountDownLatch started = new CountDownLatch(1)
    private final CountDownLatch interrupted = new CountDownLatch(1)

    @Override
    void run() {
        started.countDown()
        try {
            Thread.sleep(10_000)
        } catch (InterruptedException ignored) {
            interrupted.countDown()
        }
    }

    void blockUntilStarted() {
        started.await()
    }

    void blockUntilInterrupted() {
        assert interrupted.await(1, TimeUnit.SECONDS)
    }
}
```

Client threads can examine InterruptibleTask to see whether it has started or was interrupted. First, let's see how InterruptibleTask reacts to cancel() from outside:

```groovy
def "Future is cancelled without exception"() {
    given:
        def task = new InterruptibleTask()
        def future = myThreadPool.submit(task)
        task.blockUntilStarted()
    and:
        future.cancel(true)
    when:
        future.get()
    then:
        thrown(CancellationException)
}

def "CompletableFuture is cancelled via CancellationException"() {
    given:
        def task = new InterruptibleTask()
        def future = CompletableFuture.supplyAsync({ task.run() } as Supplier, myThreadPool)
        task.blockUntilStarted()
    and:
        future.cancel(true)
    when:
        future.get()
    then:
        thrown(CancellationException)
}
```

So far so good. Clearly both Future and CompletableFuture work pretty much the same way - retrieving the result after it was cancelled throws CancellationException. But what about the thread in myThreadPool? I thought it would be interrupted and thus recycled by the pool; how wrong was I!

```groovy
def "should cancel Future"() {
    given:
        def task = new InterruptibleTask()
        def future = myThreadPool.submit(task)
        task.blockUntilStarted()
    when:
        future.cancel(true)
    then:
        task.blockUntilInterrupted()
}

@Ignore("Fails with CompletableFuture")
def "should cancel CompletableFuture"() {
    given:
        def task = new InterruptibleTask()
        def future = CompletableFuture.supplyAsync({ task.run() } as Supplier, myThreadPool)
        task.blockUntilStarted()
    when:
        future.cancel(true)
    then:
        task.blockUntilInterrupted()
}
```

The first test submits an ordinary task and waits until it has started. Later we cancel it and wait until the interruption is observed: blockUntilInterrupted() will return when the underlying thread is interrupted. The second test, however, fails. CompletableFuture.cancel() will never interrupt the underlying thread, so despite the future looking as if it was cancelled, the backing thread is still running and no InterruptedException is observed. Bug or a feature? It's documented, so unfortunately a feature:

"Parameters: mayInterruptIfRunning - this value has no effect in this implementation because interrupts are not used to control processing."

RTFM, you say, but why does CompletableFuture work this way? First, let's examine how "old" Future implementations differ from CompletableFuture. FutureTask, returned from ExecutorService.submit(), has the following cancel() implementation (I replaced Unsafe with similar non-thread-safe Java code, so treat it as pseudo-code only):

```java
public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
        return false;
    state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                state = INTERRUPTED;
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
```

FutureTask has a state variable that follows a simple state diagram: in case of cancel() we either enter the CANCELLED state or go to INTERRUPTED through INTERRUPTING. The core part is where we take the runner thread (if it exists, i.e. if the task is currently being executed) and try to interrupt it. This branch takes care of eager, forced interruption of an already running thread. In the end we must notify all threads blocked on Future.get() in finishCompletion() (irrelevant here). So it's pretty obvious how the old Future cancels already running tasks. What about CompletableFuture? Pseudo-code of cancel():

```java
public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = false;
    if (result == null) {
        result = new AltResult(new CancellationException());
        cancelled = true;
    }
    postComplete();
    return cancelled || isCancelled();
}
```

Quite disappointing: we barely set the result to CancellationException, ignoring the mayInterruptIfRunning flag. postComplete() has a similar role to finishCompletion() - it notifies all pending callbacks registered on that future. Its implementation is rather unpleasant (using a non-blocking Treiber stack), but it definitely doesn't interrupt any underlying thread.

Reasons and implications

The limited cancel() in case of CompletableFuture is not a bug, but a design decision. CompletableFuture is not inherently bound to any thread, while Future almost always represents a background task. It's perfectly fine to create a CompletableFuture from scratch (new CompletableFuture<>()) where there is simply no underlying thread to cancel. Still, I can't help the feeling that the majority of CompletableFutures will have an associated task and background thread. In that case a malfunctioning cancel() is a potential problem. I no longer advise blindly replacing Future with CompletableFuture, as it might change the behavior of applications relying on cancel(). This means CompletableFuture intentionally breaks the Liskov substitution principle - and this is a serious implication to consider.
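If you do need interruption, one workaround (my sketch, not from the article) is to keep the Future returned by the executor and propagate cancellation to it:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class Interruptibly {

    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier, ExecutorService executor) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        Future<?> task = executor.submit(() -> {
            try {
                promise.complete(supplier.get());
            } catch (Throwable t) {
                promise.completeExceptionally(t);
            }
        });
        // cancel(true) on the CompletableFuture completes it exceptionally,
        // which triggers this callback and interrupts the worker thread:
        promise.whenComplete((result, error) -> {
            if (promise.isCancelled()) {
                task.cancel(true);
            }
        });
        return promise;
    }
}
```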
March 30, 2015
· 16,980 Views · 6 Likes
Journey to Idempotency and Temporal Decoupling
Idempotency in HTTP means that the same request can be performed multiple times with the same effect as if it was executed just once. If you replace the current state of some resource with a new one, no matter how many times you do so, in the end the state will be the same as if you did it just once. To give a more concrete example: deleting a user is idempotent, because no matter how many times you delete a given user by unique identifier, in the end this user will be deleted. On the other hand, creating a new user is not idempotent, because requesting such an operation twice will create two users. In HTTP terms, here is what RFC 2616: 9.1.2 Idempotent Methods has to say:

"Methods can also have the property of "idempotence" in that [...] the side-effects of N > 0 identical requests is the same as for a single request. The methods GET, HEAD, PUT and DELETE share this property. Also, the methods OPTIONS and TRACE SHOULD NOT have side effects, and so are inherently idempotent."

Temporal coupling is an undesirable property of a system where the correct behaviour is implicitly dependent on the time dimension. In plain English, it might mean, for example, that a system only works when all components are present at the same time. Blocking request-response communication (ReST, SOAP or any other form of RPC) requires both client and server to be available at the same time, which is an example of this effect.

Having a basic understanding of what these concepts mean, let's go through a simple case study: a massively multiplayer online role-playing game. Our artificial use case is as follows: a player sends a premium-rated SMS to purchase a virtual sword inside the game. Our HTTP gateway is called when the SMS is delivered, and we need to inform InventoryService, deployed on a different machine. The current API involves ReST and looks as follows:

```java
@Slf4j
@RestController
class SmsController {

    private final RestOperations restOperations;

    @Autowired
    public SmsController(RestOperations restOperations) {
        this.restOperations = restOperations;
    }

    @RequestMapping(value = "/sms/{phoneNumber}", method = POST)
    public void handleSms(@PathVariable String phoneNumber) {
        Optional<Player> maybePlayer = phoneNumberToPlayer(phoneNumber);
        maybePlayer
                .map(Player::getId)
                .map(this::purchaseSword)
                .orElseThrow(() -> new IllegalArgumentException("Unknown player for phone number " + phoneNumber));
    }

    private long purchaseSword(long playerId) {
        Sword sword = new Sword();
        HttpEntity<String> entity = new HttpEntity<>(sword.toJson(), jsonHeaders());
        restOperations.postForObject(
                "http://inventory:8080/player/{playerId}/inventory",
                entity, Object.class, playerId);
        return playerId;
    }

    private HttpHeaders jsonHeaders() {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        return headers;
    }

    private Optional<Player> phoneNumberToPlayer(String phoneNumber) {
        //...
    }
}
```

Which in turn generates a request similar to this:

```
> POST /player/123123/inventory HTTP/1.1
> Host: inventory:8080
> Content-type: application/json
>
> {"type": "sword", "strength": 100, ...}

< HTTP/1.1 201 Created
< Content-Length: 75
< Content-Type: application/json;charset=UTF-8
< Location: http://inventory:8080/player/123123/inventory/1
```

This is fairly straightforward. SmsController simply forwards the appropriate data to the inventory:8080 service by POSTing the sword that was purchased. This service, immediately or after a while, returns a 201 Created HTTP response confirming the operation was successful. Additionally, a link to the resource is created and returned, so you can query it.
One might say: ReST state of the art. However, if you care at least a little about your customers' money and understand what ACID is (something that Bitcoin exchanges still have to learn: see [1], [2], [3] and [4]), this API is too fragile and prone to errors. Imagine all these types of errors:

  • your request never reached the inventory server
  • your request reached the server, but it refused it
  • the server accepted the connection but failed to read the request
  • the server read the request but hung
  • the server processed the request but failed to send the response
  • the server sent a 200 OK response, but it was lost and you never received it
  • the server's response was received, but the client failed to process it
  • the server's response was sent, but the client timed out earlier

In all these cases you simply get an exception on the client side and you have no idea what the server's state is. Technically you should retry failed requests, but since POST is not idempotent, you might end up rewarding the gamer with more than one sword (in cases 5-8). But without a retry, you might lose the gamer's money without giving him his precious artifact. There must be a better way.

Turning POST into idempotent PUT

In some cases it's surprisingly simple to convert from POST to idempotent PUT by basically moving ID generation from the server to the client. With POST, it was the server that generated the sword's ID and sent it back to the client in the Location header. It turns out that eagerly generating a UUID on the client side, changing the semantics a bit, and enforcing some constraints on the server side is enough:

```java
private long purchaseSword(long playerId) {
    Sword sword = new Sword();
    UUID uuid = sword.getUuid();
    HttpEntity<String> entity = new HttpEntity<>(sword.toJson(), jsonHeaders());
    asyncRetryExecutor
            .withMaxRetries(10)
            .withExponentialBackoff(100, 2.0)
            .doWithRetry(ctx -> restOperations.put(
                    "http://inventory:8080/player/{playerId}/inventory/{uuid}",
                    entity, playerId, uuid));
    return playerId;
}
```

The API looks as follows:

```
> PUT /player/123123/inventory/45e74f80-b2fb-11e4-ab27-0800200c9a66 HTTP/1.1
> Host: inventory:8080
> Content-type: application/json;charset=UTF-8
>
> {"type": "sword", "strength": 100, ...}

< HTTP/1.1 201 Created
< Content-Length: 75
< Content-Type: application/json;charset=UTF-8
< Location: http://inventory:8080/player/123123/inventory/45e74f80-b2fb-11e4-ab27-0800200c9a66
```

Why is this such a big deal? Simply put (no pun intended), the client can now retry the PUT request as many times as it wants. When the server receives a PUT for the first time, it persists the sword in the database with the client-generated UUID (45e74f80-b2fb-11e4-ab27-0800200c9a66) as the primary key. In case of a second PUT attempt we can either update or reject such a request. It wasn't possible with POST because every request was treated as a new sword purchase - now we can track whether such a PUT came before or not.
We just have to remember that a subsequent PUT is not a bug, it's an update request:

```java
@RestController
@Slf4j
public class InventoryController {

    private final PlayerRepository playerRepository;

    @Autowired
    public InventoryController(PlayerRepository playerRepository) {
        this.playerRepository = playerRepository;
    }

    @RequestMapping(value = "/player/{playerId}/inventory/{invId}", method = PUT)
    @Transactional
    public void addSword(@PathVariable UUID playerId, @PathVariable UUID invId) {
        playerRepository.findOne(playerId).addSwordWithId(invId);
    }
}

interface PlayerRepository extends JpaRepository<Player, UUID> {}

@lombok.Data
@lombok.AllArgsConstructor
@lombok.NoArgsConstructor
@Entity
class Sword {

    @Id
    @Convert(converter = UuidConverter.class)
    UUID id;

    int strength;

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Sword)) return false;
        Sword sword = (Sword) o;
        return id.equals(sword.id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

@Data
@Entity
class Player {

    @Id
    @Convert(converter = UuidConverter.class)
    UUID id = UUID.randomUUID();

    @OneToMany(cascade = ALL, fetch = EAGER)
    @JoinColumn(name = "player_id")
    Set<Sword> swords = new HashSet<>();

    public Player addSwordWithId(UUID id) {
        swords.add(new Sword(id, 100));
        return this;
    }
}
```

A few shortcuts were made in the code snippet above, like injecting the repository directly into the controller, as well as annotating it with @Transactional. But you get the idea. Also notice that this code is quite optimistic, assuming two swords with the same UUID aren't inserted at exactly the same time. Otherwise a constraint violation exception will occur.

Side note 1: I use the UUID type in both the controller and the JPA models. UUIDs aren't supported out of the box; for JPA you need a custom converter:

```java
public class UuidConverter implements AttributeConverter<UUID, String> {

    @Override
    public String convertToDatabaseColumn(UUID attribute) {
        return attribute.toString();
    }

    @Override
    public UUID convertToEntityAttribute(String dbData) {
        return UUID.fromString(dbData);
    }
}
```

Similarly for Spring MVC (one-way only):

```java
@Bean
GenericConverter uuidConverter() {
    return new GenericConverter() {
        @Override
        public Set<ConvertiblePair> getConvertibleTypes() {
            return Collections.singleton(new ConvertiblePair(String.class, UUID.class));
        }

        @Override
        public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
            return UUID.fromString(source.toString());
        }
    };
}
```

Side note 2: if you can't change the client, you can track duplicates by storing each request's hash on the server side. This way, when the same request is sent multiple times (retried by the client), it will be ignored. However, sometimes we might have a legitimate use case for sending the exact same request twice (e.g. purchasing two swords within a short period of time).

Temporal coupling - client unavailability

You think you're smart, but PUT with retries is not enough. First of all, a client can die while re-attempting failed requests. If the server is severely damaged or down, retrying might take minutes or even hours. You can't simply block your incoming HTTP request just because one of your downstream dependencies is down - you must handle such requests asynchronously in the background, if possible. But extending the retry time increases the probability of the client dying or being restarted, which would lose our request. Imagine we received a premium SMS but InventoryService is down at the moment. We can retry after one second, two, four, etc., but what if InventoryService was down for a couple of hours and it so happened that our service was restarted as well?
We just lost that SMS, and the sword was never given to the gamer. An answer to such an issue is to persist the pending request first and handle it later in the background. Upon SMS receipt, we barely store the player ID in a database table called pending_purchases. A background scheduler or an event wakes up an asynchronous thread that will collect all pending purchases and try to send them to InventoryService (maybe even in batch?). Periodic batch threads running every minute or even second and collecting all pending requests will unavoidably introduce latency and unneeded database traffic. Thus I'm going for a Quartz scheduler instead, which will schedule a retry job for each pending request:

```java
@Slf4j
@RestController
class SmsController {

    private Scheduler scheduler;

    @Autowired
    public SmsController(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @RequestMapping(value = "/sms/{phoneNumber}", method = POST)
    public void handleSms(@PathVariable String phoneNumber) {
        phoneNumberToPlayer(phoneNumber)
                .map(Player::getId)
                .map(this::purchaseSword)
                .orElseThrow(() -> new IllegalArgumentException("Unknown player for phone number " + phoneNumber));
    }

    private UUID purchaseSword(UUID playerId) {
        UUID swordId = UUID.randomUUID();
        InventoryAddJob.scheduleOn(scheduler, Duration.ZERO, playerId, swordId);
        return swordId;
    }

    //...
}
```

And the job itself:

```java
@Slf4j
public class InventoryAddJob implements Job {

    @Autowired
    private RestOperations restOperations;

    @lombok.Setter
    private UUID invId;

    @lombok.Setter
    private UUID playerId;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            tryPurchase();
        } catch (Exception e) {
            Duration delay = Duration.ofSeconds(5);
            log.error("Can't add to inventory, will retry in {}", delay, e);
            scheduleOn(context.getScheduler(), delay, playerId, invId);
        }
    }

    private void tryPurchase() {
        restOperations.put(/*...*/);
    }

    public static void scheduleOn(Scheduler scheduler, Duration delay, UUID playerId, UUID invId) {
        try {
            JobDetail job = newJob()
                    .ofType(InventoryAddJob.class)
                    .usingJobData("playerId", playerId.toString())
                    .usingJobData("invId", invId.toString())
                    .build();
            Date runTimestamp = Date.from(Instant.now().plus(delay));
            Trigger trigger = newTrigger().startAt(runTimestamp).build();
            scheduler.scheduleJob(job, trigger);
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Every time we receive a premium SMS we schedule an asynchronous job to be executed immediately. Quartz will take care of persistence (if the application goes down, the job will be executed as soon as possible after restart). Moreover, if this particular instance goes down, another one can pick up this job - or we can form a cluster and load-balance requests between the instances: one receives the SMS, another one requests the sword in InventoryService. Obviously, if the HTTP call fails, the retry is re-scheduled later; everything is transactional and fail-safe. In real code you would probably add a max retry limit as well as an exponential delay, but you get the idea.

Temporal coupling - client and server can't meet

Our struggle to implement retries correctly is a sign of an obscure temporal coupling between client and server - they must live together at the same time. Technically, this isn't necessary. Imagine the gamer sending an e-mail with an order to customer service, which they handle within 48 hours, changing his inventory manually. The same can be applied to our case, but replacing the e-mail server with some sort of message broker, e.g. JMS:

```java
@Bean
ActiveMQConnectionFactory activeMQConnectionFactory() {
    return new ActiveMQConnectionFactory("tcp://localhost:61616");
}

@Bean
JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
    return new JmsTemplate(connectionFactory);
}
```

Having the ActiveMQ connection set up, we can simply send the purchase request to the broker:

```java
private UUID purchaseSword(UUID playerId) {
    final Sword sword = new Sword(playerId);
    jmsTemplate.send("purchases", session -> {
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText(sword.toJson());
        return textMessage;
    });
    return sword.getUuid();
}
```

By entirely replacing the synchronous request-response protocol with messaging over a JMS topic, we temporally decouple the client from the server. They no longer need to live at the same time. Moreover, more than one producer and consumer can interact with each other. E.g. you can have multiple purchase channels and, more importantly, multiple interested parties, not only InventoryService. Even better, if you use a specialized messaging system like Kafka, you can technically keep days (months?) worth of messages without losing performance. The benefit is that if you add another consumer of purchase events to the system next to InventoryService, it will receive lots of historical data immediately. Moreover, now your application is temporally coupled with the broker, and since Kafka is distributed and replicated, it works better in that case.

Disadvantages of asynchronous messaging

Synchronous data exchange, as used in ReST, SOAP or any form of RPC, is easy to understand and implement. Who cares that this abstraction insanely leaks from a latency perspective (a local method call is typically orders of magnitude faster compared to a remote one, not to mention it can fail for numerous reasons unknown locally) - it's quick to develop. One true caveat of messaging is the feedback channel. You can no longer just "send" ("return") a message back, as there is no response pipe. You either need a response queue with some correlation ID or temporary one-off response queues per request. Also, we lied a little bit, claiming that putting a message broker between two systems fixes temporal coupling. It does, but now we are coupled to the messaging bus - which can just as well go down, especially since it's often under high load and sometimes not replicated properly. This article shows some challenges and partial solutions to provide guarantees in distributed systems. But at the end of the day, remember that "exactly once" semantics are nearly impossible to implement easily, so double-check whether you really need them.
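To illustrate the feedback-channel problem mentioned above, here is a bare-bones request/reply sketch over JMS using a correlation ID (my example; queue names are made up):

```java
import javax.jms.*;

public class PurchaseWithReply {

    String purchase(Session session, Queue purchases, Queue replies,
                    String swordJson, String correlationId) throws JMSException {
        MessageProducer producer = session.createProducer(purchases);
        TextMessage request = session.createTextMessage(swordJson);
        request.setJMSCorrelationID(correlationId);
        request.setJMSReplyTo(replies); // tell the consumer where to respond
        producer.send(request);

        // receive only the reply correlated with our request
        MessageConsumer consumer = session.createConsumer(
                replies, "JMSCorrelationID = '" + correlationId + "'");
        TextMessage reply = (TextMessage) consumer.receive(5_000); // wait up to 5s
        return reply == null ? null : reply.getText();
    }
}
```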
March 13, 2015
· 19,503 Views · 5 Likes
Retry-After HTTP Header in Practice
Retry-After is a lesser-known HTTP response header.
February 20, 2015
· 15,881 Views
Asynchronous Timeouts with CompletableFuture
One day I was rewriting poorly implemented multi-threaded code that was blocking at some point on Future.get(): public void serve() throws InterruptedException, ExecutionException, TimeoutException { final Future responseFuture = asyncCode(); final Response response = responseFuture.get(1, SECONDS); send(response); } private void send(Response response) { //... } This was actually an Akka application written in Java with a thread pool of 1000 threads (sic!) - all of them blocked on thisget() call. Otherwise system couldn't keep up with the number of concurrent requests. After refactoring we got rid of all these threads and introduced just one, significantly reducing memory footprint. Let's simplify a bit and show examples in Java 8. The first step is to introduceCompletableFuture instead of plain Future (see: tip 9). It's simple if: you control how tasks are submitted to ExecutorService: just use CompletableFuture.supplyAsync(..., executorService) instead of executorService.submit(...) you deal with callback-based API: use promises Otherwise (if you have blocking API or Future already) there will be some thread blocked. That's why there are so many asynchronous APIs being born right now. So let's say we somehow rewritten our code to receiveCompletableFuture: public void serve() throws InterruptedException, ExecutionException, TimeoutException { final CompletableFuture responseFuture = asyncCode(); final Response response = responseFuture.get(1, SECONDS); send(response); } Obviously that doesn't fix anything, we have to take advantage of new reactive style of programming: public void serve() { final CompletableFuture responseFuture = asyncCode(); responseFuture.thenAccept(this::send); } This is functionally equivalent, but now serve() should run in no-time (no blocking or waiting). Just remember that this::send will be executed in the same thread that completed responseFuture. If you don't want to overload some arbitrary thread pool somewhere or send() is expensive, consider separate thread pool for that:thenAcceptAsync(this::send, sendPool). Great, but we lost two important properties: error propagation and timeout. Error propagation is hard because we changed API. When serve() method exits, asynchronous operations is probably not yet finished. If you care about exceptions, consider either returning responseFuture or some alternative mechanism. At minimum, log exception because otherwise it will be swallowed: final CompletableFuture responseFuture = asyncCode(); responseFuture.exceptionally(throwable -> { log.error("Unrecoverable error", throwable); return null; }); responseFuture.thenAccept(this::send); Be careful with the code above: exceptionally() tries torecover from failure, returning alternative result. It works here but if you chain exceptionally() with thenAccept() it will send() will be called even in case of failure, but withnull argument (or whatever we return from exceptionally(): final CompletableFuture responseFuture = asyncCode(); responseFuture .exceptionally(throwable -> { log.error("Unrecoverable error", throwable); return null; }) .thenAccept(this::send); //probably not what you think Problem with lost 1 second timeout is subtle. Our original code was waiting (blocking) for at most 1 second untilFuture finishes. Otherwise TimeoutException was thrown. We lost this functionality, even worse unit tests for timeouts are inconvenient and often skipped. 
In order to port timeouts without sacrificing the event-driven spirit, we need one extra building block: a future that always fails after a given time:

public static <T> CompletableFuture<T> failAfter(Duration duration) {
    final CompletableFuture<T> promise = new CompletableFuture<>();
    scheduler.schedule(() -> {
        final TimeoutException ex = new TimeoutException("Timeout after " + duration);
        return promise.completeExceptionally(ex);
    }, duration.toMillis(), MILLISECONDS);
    return promise;
}

private static final ScheduledExecutorService scheduler =
        Executors.newScheduledThreadPool(
                1,
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("failAfter-%d")
                        .build());

That's simple: we create a promise (a future without an underlying task or thread pool) and complete it with TimeoutException after a given java.time.Duration. If you get() such a future somewhere, a TimeoutException will be thrown after blocking for at least that much time. Actually, it will be an ExecutionException wrapping TimeoutException, no way around that. Notice that I use a fixed scheduler thread pool with just one thread. It's not only for educational purposes: "1 thread ought to be enough for anybody" [1] in this scenario. failAfter() on its own is rather useless, but combine it with our responseFuture and we have a solution!

final CompletableFuture<Response> responseFuture = asyncCode();
final CompletableFuture<Response> oneSecondTimeout = failAfter(Duration.ofSeconds(1));
responseFuture
        .acceptEither(oneSecondTimeout, this::send)
        .exceptionally(throwable -> {
            log.error("Problem", throwable);
            return null;
        });

A lot is going on here. After receiving responseFuture with our background task, we also create a "synthetic" oneSecondTimeout future that will never complete successfully but always fails after 1 second. Now we combine the two by calling acceptEither. This operator will execute a block of code against the first completed future, either responseFuture or oneSecondTimeout, and simply ignore the outcome of the slower one. If asyncCode() completes within 1 second, this::send will be invoked and the exception from oneSecondTimeout will be ignored. However! If asyncCode() is really slow, oneSecondTimeout kicks in first. But since it fails with an exception, the exceptionally error handler is invoked instead of this::send. You can take for granted that either send() or exceptionally will be called, not both. Of course, if we had two "ordinary" futures completing normally, send() would be called with the response from the first one, discarding the latter. This wasn't the cleanest solution. A cleaner one would wrap the original future and make sure it finishes within a given time. Such an operator is available in com.twitter.util.Future (Scala; called within()), however it is missing in scala.concurrent.Future (supposedly inspired by the former). Let's leave Scala behind and implement a similar operator for CompletableFuture. It takes one future as input and returns a future that completes when the underlying one completes.
However, if it takes too long to complete the underlying future, an exception is thrown:

public static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration) {
    final CompletableFuture<T> timeout = failAfter(duration);
    return future.applyToEither(timeout, Function.identity());
}

This leads to the final, clean and flexible solution:

final CompletableFuture<Response> responseFuture = within(
        asyncCode(), Duration.ofSeconds(1));
responseFuture
        .thenAccept(this::send)
        .exceptionally(throwable -> {
            log.error("Unrecoverable error", throwable);
            return null;
        });

Hope you enjoyed this article; as you can see, reactive programming in Java is no longer a thing of the future (no pun intended).
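By the way, if you are on Java 9 or later, CompletableFuture ships with orTimeout() and completeOnTimeout() built in, so the failAfter()/within() machinery above is no longer necessary. A minimal sketch, reusing asyncCode(), send() and log from this article (java.util.concurrent.TimeUnit import assumed):

// Java 9+ only: orTimeout() fails the future with TimeoutException
// after the given delay, no hand-rolled scheduler needed
final CompletableFuture<Response> responseFuture = asyncCode()
        .orTimeout(1, TimeUnit.SECONDS);
responseFuture
        .thenAccept(this::send)
        .exceptionally(throwable -> {
            log.error("Unrecoverable error", throwable);
            return null;
        });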
January 2, 2015
· 60,721 Views · 36 Likes
article thumbnail
Converting Between CompletableFuture and Observable
CompletableFuture<T> from Java 8 is an advanced abstraction over a promise that a value of type T will be available in the future. Observable<T> is quite similar, but it promises an arbitrary number of items in the future, from 0 to infinity. These two representations of asynchronous results are similar to the point where an Observable with just one item can be used instead of CompletableFuture and vice versa. On the other hand, CompletableFuture is more specialized and, because it's now part of the JDK, should become prevalent quite soon. Let's celebrate the RxJava 1.0 release with a short article showing how to convert between the two, without losing their asynchronous and event-driven nature.

From CompletableFuture<T> to Observable<T>

CompletableFuture represents one value in the future, so turning it into Observable is rather simple. When the Future completes with some value, the Observable will emit that value as well, immediately, and close the stream:

class FuturesTest extends Specification {

    public static final String MSG = "Don't panic"

    def 'should convert completed Future to completed Observable'() {
        given:
            CompletableFuture<String> future = CompletableFuture.completedFuture("Abc")
        when:
            Observable<String> observable = Futures.toObservable(future)
        then:
            observable.toBlocking().toIterable().toList() == ["Abc"]
    }

    def 'should convert failed Future into Observable with failure'() {
        given:
            CompletableFuture<String> future = failedFuture(new IllegalStateException(MSG))
        when:
            Observable<String> observable = Futures.toObservable(future)
        then:
            observable
                    .onErrorReturn({ th -> th.message } as Func1)
                    .toBlocking()
                    .toIterable()
                    .toList() == [MSG]
    }

    CompletableFuture failedFuture(Exception error) {
        CompletableFuture future = new CompletableFuture()
        future.completeExceptionally(error)
        return future
    }
}

The first test of the not-yet-implemented Futures.toObservable() converts a Future into an Observable and makes sure the value is propagated correctly. The second test creates a failed Future, replaces the failure with the exception's message and makes sure the exception was propagated. The implementation is much shorter:

public static <T> Observable<T> toObservable(CompletableFuture<T> future) {
    return Observable.create(subscriber ->
            future.whenComplete((result, error) -> {
                if (error != null) {
                    subscriber.onError(error);
                } else {
                    subscriber.onNext(result);
                    subscriber.onCompleted();
                }
            }));
}

NB: Observable.from(Future) exists, however we want to take full advantage of CompletableFuture's asynchronous operators.

From Observable<T> to CompletableFuture<List<T>>

There are actually two ways to convert Observable to Future - creating CompletableFuture<List<T>> or CompletableFuture<T> (if we assume the Observable has just one item). Let's start with the former case, described by the following test cases:

def 'should convert Observable with many items to Future of list'() {
    given:
        Observable<Integer> observable = Observable.just(1, 2, 3)
    when:
        CompletableFuture<List<Integer>> future = Futures.fromObservable(observable)
    then:
        future.get() == [1, 2, 3]
}

def 'should return failed Future when after few items exception was emitted'() {
    given:
        Observable<Integer> observable = Observable.just(1, 2, 3)
                .concatWith(Observable.error(new IllegalStateException(MSG)))
    when:
        Futures.fromObservable(observable)
    then:
        def e = thrown(Exception)
        e.message == MSG
}

Obviously the Future doesn't complete until the source Observable signals the end of the stream. Thus Observable.never() would never complete the wrapping Future, rather than completing it with an empty list.
The implementation is much shorter and sweeter:

public static <T> CompletableFuture<List<T>> fromObservable(Observable<T> observable) {
    final CompletableFuture<List<T>> future = new CompletableFuture<>();
    observable
            .doOnError(future::completeExceptionally)
            .toList()
            .forEach(future::complete);
    return future;
}

The key is Observable.toList(), which conveniently converts from Observable<T> to Observable<List<T>>. The latter emits one item of List<T> type when the source Observable finishes.

From Observable<T> to CompletableFuture<T>

A special case of the previous transformation happens when we know that the Observable will emit exactly one item. In that case we can convert it directly to CompletableFuture<T>, rather than CompletableFuture<List<T>> with one item only. Tests first:

def 'should convert Observable with single item to Future'() {
    given:
        Observable<Integer> observable = Observable.just(1)
    when:
        CompletableFuture<Integer> future = Futures.fromSingleObservable(observable)
    then:
        future.get() == 1
}

def 'should create failed Future when Observable fails'() {
    given:
        Observable<String> observable = Observable.error(new IllegalStateException(MSG))
    when:
        Futures.fromSingleObservable(observable)
    then:
        def e = thrown(Exception)
        e.message == MSG
}

def 'should fail when single Observable produces too many items'() {
    given:
        Observable<Integer> observable = Observable.just(1, 2)
    when:
        Futures.fromSingleObservable(observable)
    then:
        def e = thrown(Exception)
        e.message.contains("too many elements")
}

Again the implementation is quite straightforward and almost identical; single() makes sure the source emitted exactly one item:

public static <T> CompletableFuture<T> fromSingleObservable(Observable<T> observable) {
    final CompletableFuture<T> future = new CompletableFuture<>();
    observable
            .doOnError(future::completeExceptionally)
            .single()
            .forEach(future::complete);
    return future;
}

The helper methods above aren't fully robust yet, but if you ever need to convert between the JDK 8 and RxJava styles of asynchronous computing, this article should be enough to get you started.
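As a quick usage sketch (Java, assuming the Futures helper class above is on the classpath), the two directions compose without ever blocking:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "don't panic");

// CompletableFuture -> Observable: emits exactly one item, then completes
Observable<String> observable = Futures.toObservable(future);

// Observable -> CompletableFuture: back again, still fully asynchronous
Futures.fromSingleObservable(observable.map(String::toUpperCase))
        .thenAccept(System.out::println);  // eventually prints DON'T PANIC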
November 27, 2014
· 14,708 Views · 3 Likes
article thumbnail
ExecutorService - 10 Tips and Tricks
Just a quick reminder: Java 5 and 6 are no longer supported, and in half a year Java 7 won't be either.
November 21, 2014
· 108,954 Views · 13 Likes
article thumbnail
Batching (collapsing) Requests in Hystrix
Hystrix has an advanced feature of collapsing (or batching) requests. If two or more commands run a similar request at the same time, Hystrix can combine them, run one batched request and dispatch the split results back to all commands. Let's first see how Hystrix works without collapsing. Imagine we have a service that looks up the StockPrice of a given Ticker:

import lombok.Value;
import java.math.BigDecimal;
import java.time.Instant;

@Value
class Ticker {
    String symbol;
}

@Value
class StockPrice {
    BigDecimal price;
    Instant effectiveTime;
}

interface StockPriceGateway {

    default StockPrice load(Ticker stock) {
        final Set<Ticker> oneTicker = Collections.singleton(stock);
        return loadAll(oneTicker).get(stock);
    }

    ImmutableMap<Ticker, StockPrice> loadAll(Set<Ticker> tickers);
}

The core implementation of StockPriceGateway must provide the loadAll() batch method, while the load() method is implemented for our convenience. So our gateway is capable of loading multiple prices in one batch (e.g. to reduce latency or network protocol overhead), but at the moment we are not using this feature, always loading the price of one stock at a time:

class StockPriceCommand extends HystrixCommand<StockPrice> {

    private final StockPriceGateway gateway;
    private final Ticker stock;

    StockPriceCommand(StockPriceGateway gateway, Ticker stock) {
        super(HystrixCommandGroupKey.Factory.asKey("Stock"));
        this.gateway = gateway;
        this.stock = stock;
    }

    @Override
    protected StockPrice run() throws Exception {
        return gateway.load(stock);
    }
}

Such a command will always call StockPriceGateway.load() for each and every Ticker, as illustrated by the following tests:

class StockPriceCommandTest extends Specification {

    def gateway = Mock(StockPriceGateway)

    def 'should fetch price from external service'() {
        given:
            gateway.load(TickerExamples.any()) >> StockPriceExamples.any()
            def command = new StockPriceCommand(gateway, TickerExamples.any())
        when:
            def price = command.execute()
        then:
            price == StockPriceExamples.any()
    }

    def 'should call gateway exactly once when running Hystrix command'() {
        given:
            def command = new StockPriceCommand(gateway, TickerExamples.any())
        when:
            command.execute()
        then:
            1 * gateway.load(TickerExamples.any())
    }

    def 'should call gateway twice when command executed two times'() {
        given:
            def commandOne = new StockPriceCommand(gateway, TickerExamples.any())
            def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())
        when:
            commandOne.execute()
            commandTwo.execute()
        then:
            2 * gateway.load(TickerExamples.any())
    }

    def 'should call gateway twice even when executed in parallel'() {
        given:
            def commandOne = new StockPriceCommand(gateway, TickerExamples.any())
            def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())
        when:
            Future<StockPrice> futureOne = commandOne.queue()
            Future<StockPrice> futureTwo = commandTwo.queue()
        and:
            futureOne.get()
            futureTwo.get()
        then:
            2 * gateway.load(TickerExamples.any())
    }
}

If you don't know Hystrix: by wrapping an external call in a command you gain a lot of features like timeouts, circuit breakers, etc., but that is not the focus of this article. Look at the last two tests: when asking for the price of an arbitrary ticker twice, sequentially or in parallel (queue()), our external gateway is also called twice. The last test is especially interesting - we ask for the same ticker at almost the same time, but Hystrix can't figure that out. These two commands are fully independent, will be executed in different threads and don't know anything about each other - even though they run at almost the same time. Collapsing is all about finding such similar requests and combining them.
Batching (I will use this term interchangeably with collapsing) doesn't happen automatically and requires a bit of coding. But first let's see how it behaves:

def 'should collapse two commands executed concurrently for the same stock ticker'() {
    given:
        def anyTicker = TickerExamples.any()
        def tickers = [anyTicker] as Set
    and:
        def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)
        def commandTwo = new StockTickerPriceCollapsedCommand(gateway, anyTicker)
    when:
        Future<StockPrice> futureOne = commandOne.queue()
        Future<StockPrice> futureTwo = commandTwo.queue()
    and:
        futureOne.get()
        futureTwo.get()
    then:
        0 * gateway.load(_)
        1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any())
}

def 'should collapse two commands executed concurrently for the different stock tickers'() {
    given:
        def anyTicker = TickerExamples.any()
        def otherTicker = TickerExamples.other()
        def tickers = [anyTicker, otherTicker] as Set
    and:
        def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)
        def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)
    when:
        Future<StockPrice> futureOne = commandOne.queue()
        Future<StockPrice> futureTwo = commandTwo.queue()
    and:
        futureOne.get()
        futureTwo.get()
    then:
        1 * gateway.loadAll(tickers) >> ImmutableMap.of(
                anyTicker, StockPriceExamples.any(),
                otherTicker, StockPriceExamples.other())
}

def 'should correctly map collapsed response into individual requests'() {
    given:
        def anyTicker = TickerExamples.any()
        def otherTicker = TickerExamples.other()
        def tickers = [anyTicker, otherTicker] as Set
        gateway.loadAll(tickers) >> ImmutableMap.of(
                anyTicker, StockPriceExamples.any(),
                otherTicker, StockPriceExamples.other())
    and:
        def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)
        def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)
    when:
        Future<StockPrice> futureOne = commandOne.queue()
        Future<StockPrice> futureTwo = commandTwo.queue()
    and:
        def anyPrice = futureOne.get()
        def otherPrice = futureTwo.get()
    then:
        anyPrice == StockPriceExamples.any()
        otherPrice == StockPriceExamples.other()
}

The first test proves that instead of calling load() twice, we called loadAll() just once. Also notice that since we asked for the same Ticker (from two different threads), loadAll() asks for only one ticker. The second test shows two concurrent requests for two different tickers being collapsed into one batch call. The third test makes sure we still get proper responses to each individual request. Instead of extending HystrixCommand, we must extend the more complex HystrixCollapser.
Now it's time to see the StockTickerPriceCollapsedCommand implementation, which seamlessly replaces StockPriceCommand:

class StockTickerPriceCollapsedCommand extends HystrixCollapser<ImmutableMap<Ticker, StockPrice>, StockPrice, Ticker> {

    private final StockPriceGateway gateway;
    private final Ticker stock;

    StockTickerPriceCollapsedCommand(StockPriceGateway gateway, Ticker stock) {
        super(HystrixCollapser.Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Stock"))
                .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));
        this.gateway = gateway;
        this.stock = stock;
    }

    @Override
    public Ticker getRequestArgument() {
        return stock;
    }

    @Override
    protected HystrixCommand<ImmutableMap<Ticker, StockPrice>> createCommand(Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {
        final Set<Ticker> stocks = collapsedRequests.stream()
                .map(CollapsedRequest::getArgument)
                .collect(toSet());
        return new StockPricesBatchCommand(gateway, stocks);
    }

    @Override
    protected void mapResponseToRequests(ImmutableMap<Ticker, StockPrice> batchResponse, Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {
        collapsedRequests.forEach(request -> {
            final Ticker ticker = request.getArgument();
            final StockPrice price = batchResponse.get(ticker);
            request.setResponse(price);
        });
    }
}

A lot is going on here, so let's review StockTickerPriceCollapsedCommand step by step. First, the three generic types:

- BatchReturnType (ImmutableMap<Ticker, StockPrice> in our example) is the type of the batched command response. As you will see later, the collapser turns multiple small commands into one batch command. This is the type of that batch command's response. Notice that it's the same as the StockPriceGateway.loadAll() return type.
- ResponseType (StockPrice) is the type of each individual command being collapsed. In our case we are collapsing HystrixCommand<StockPrice>. Later we will split a value of BatchReturnType into multiple StockPrice instances.
- RequestArgumentType (Ticker) is the input of each individual command we are about to collapse (batch). When multiple commands are batched together, we eventually replace all of them with one batched command. This command should receive all individual requests in order to perform one batch request.

withTimerDelayInMilliseconds(100) will be explained soon. createCommand() creates the batch command. This command should replace all individual commands and perform the batched logic. In our case, instead of multiple individual load() calls we just make one:

class StockPricesBatchCommand extends HystrixCommand<ImmutableMap<Ticker, StockPrice>> {

    private final StockPriceGateway gateway;
    private final Set<Ticker> stocks;

    StockPricesBatchCommand(StockPriceGateway gateway, Set<Ticker> stocks) {
        super(HystrixCommandGroupKey.Factory.asKey("Stock"));
        this.gateway = gateway;
        this.stocks = stocks;
    }

    @Override
    protected ImmutableMap<Ticker, StockPrice> run() throws Exception {
        return gateway.loadAll(stocks);
    }
}

The only difference between this class and StockPriceCommand is that it takes a bunch of Tickers and returns prices for all of them. Hystrix will collect a few instances of StockTickerPriceCollapsedCommand and, once it has enough (more on that later), it will create a single StockPricesBatchCommand. Hope this is clear, because mapResponseToRequests() is slightly more involved. Once our collapsed StockPricesBatchCommand finishes, we must somehow split the batch response and communicate the replies back to the individual commands, unaware of collapsing. From that perspective the mapResponseToRequests() implementation is fairly straightforward: we receive the batch response and a collection of wrapped CollapsedRequests. We must now iterate over all awaiting individual requests and complete them (setResponse()).
If we don't complete some of the requests, they will hang infinitely and eventually time out.

How it works

This is the right moment to describe how collapsing is implemented. I said before that collapsing happens when two requests occur at the same time. There is no such thing as the same time. In reality, when the first collapsible request comes in, Hystrix starts a timer. In our examples we set it to 100 milliseconds. During that period our command is suspended, waiting for other commands to join. After this configurable period Hystrix will call createCommand(), gathering all request keys (by calling getRequestArgument()) and run it. When the batched command finishes, it will let us dispatch results to all awaiting individual commands. It is also possible to limit the number of collapsed requests if we are afraid of creating a humongous batch (see the sketch at the end of this article) - on the other hand, how many concurrent requests can fit within this short time slot?

Use cases and drawbacks

Request collapsing should be used in systems with extreme load - a high frequency of requests. If you get just one request per collapsing time window (100 milliseconds in the examples), collapsing will just add overhead. That's because every time you call a collapsible command, it must wait just in case some other command wants to join and form a batch. This makes sense only when at least a couple of commands are collapsed. Time wasted on waiting is balanced by savings in network latency and/or better utilization of resources in our collaborator (very often batch requests are much faster compared to individual calls). But keep in mind that collapsing is a double-edged sword, useful in specific cases. One last thing to remember - in order to use request collapsing you need HystrixRequestContext.initializeContext() and shutdown() in a try-finally block:

HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
    //...
} finally {
    context.shutdown();
}

Collapsing vs. caching

You might think that collapsing can be replaced with proper caching. This is not true. You use a cache when:

1. the resource is likely to be accessed multiple times
2. we can safely use the previous value; it will remain valid for some period of time, or we know precisely how to invalidate it
3. we can afford concurrent requests for the same resource to compute it multiple times

On the other hand, collapsing does not enforce locality of data (1), it always hits the real service and never returns stale data (2). And finally, if we ask for the same resource from multiple threads, we will only call the backing service once (3). In the case of caching, unless your cache is really smart, two threads will independently discover the absence of a given resource in the cache and ask the backing service twice. However, collapsing can work together with caching - by consulting the cache before running a collapsible command.

Summary

Request collapsing is a useful tool, but with very limited use cases. It can significantly improve throughput in our system as well as limit the load on an external service. Collapsing can magically flatten peaks in traffic, rather than spreading it all over. Just make sure you are using it for commands running with extreme frequency.
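As promised, a sketch of capping the batch size: the collapser constructor from above, extended with withMaxRequestsInBatch() from HystrixCollapserProperties.Setter (the cap of 50 is an arbitrary example value):

super(HystrixCollapser.Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Stock"))
        .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
                .withTimerDelayInMilliseconds(100)   // wait up to 100 ms for other commands
                .withMaxRequestsInBatch(50)));       // never collapse more than 50 requests

Once a batch is full, subsequent requests within the same time window should simply start a new batch rather than grow the current one.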
November 5, 2014
· 13,050 Views · 1 Like
article thumbnail
Hazelcast's MapLoader Pitfalls
One of the core data structures provided by Hazelcast is IMap, extending java.util.concurrent.ConcurrentMap - which is basically a distributed map, often used as a cache. You can configure such a map to use a custom MapLoader - a piece of Java code that will be asked every time you try to .get() something from that map (by key) which is not yet there. This is especially useful when you use IMap as a distributed in-memory cache - if client code asks for something that wasn't cached yet, Hazelcast will transparently execute your MapLoader.load(key):

public interface MapLoader<K, V> {
    V load(K key);
    Map<K, V> loadAll(Collection<K> keys);
    Set<K> loadAllKeys();
}

The remaining two methods are used during startup to optionally warm up the cache by loading a pre-defined set of keys. Your custom MapLoader can reach out to a (No)SQL database, a web service, the file system, you name it. Working with such a cache is much more convenient because you don't have to implement the tedious "if not in cache, load and put in cache" cycle. Moreover, MapLoader has a fantastic feature: if many clients are asking at the same time for the same key (from different threads, or even different cluster members - thus machines), MapLoader is executed only once. This significantly decreases load on external dependencies, without introducing any complexity. In essence, IMap with MapLoader is similar to LoadingCache found in Guava - but distributed. However, with great power comes great frustration, especially when you don't understand the peculiarities of the API and the inherent complexity of a distributed system. First let's see how to configure a custom MapLoader. You can use hazelcast.xml for that (the map-store element), but then you have no control over the life-cycle of your loader (e.g. you can't use a Spring bean). A better idea is to configure Hazelcast directly from code and pass an instance of MapLoader:

class HazelcastTest extends Specification {

    public static final int ANY_KEY = 42
    public static final String ANY_VALUE = "Forty two"

    def 'should use custom loader'() {
        given:
            MapLoader<Integer, String> loaderMock = Mock()
            loaderMock.load(ANY_KEY) >> ANY_VALUE
            def hz = build(loaderMock)
            IMap<Integer, String> emptyCache = hz.getMap("cache")
        when:
            def value = emptyCache.get(ANY_KEY)
        then:
            value == ANY_VALUE
        cleanup:
            hz?.shutdown()
    }

Notice how we obtain an empty map, but when asked for ANY_KEY, we get ANY_VALUE in return. This is not a surprise; this is what our loaderMock was expected to do. Here is the Hazelcast configuration I left out:

def HazelcastInstance build(MapLoader<Integer, String> loader) {
    final Config config = new Config("Cluster")
    final MapConfig mapConfig = config.getMapConfig("default")
    final MapStoreConfig mapStoreConfig = new MapStoreConfig()
    mapStoreConfig.factoryImplementation = {name, props -> loader } as MapStoreFactory
    mapConfig.mapStoreConfig = mapStoreConfig
    return Hazelcast.getOrCreateHazelcastInstance(config)
}

Any IMap (identified by name) can have a different configuration. However, the special "default" map specifies the default configuration for all maps. Let's play a bit with custom loaders and see how they behave when MapLoader returns null or throws an exception:

def 'should return null when custom loader returns it'() {
    given:
        MapLoader<Integer, String> loaderMock = Mock()
        def hz = build(loaderMock)
        IMap<Integer, String> cache = hz.getMap("cache")
    when:
        def value = cache.get(ANY_KEY)
    then:
        value == null
        !cache.containsKey(ANY_KEY)
    cleanup:
        hz?.shutdown()
}

public static final String SOME_ERR_MSG = "Don't panic!"
def 'should propagate exceptions from loader'() {
    given:
        MapLoader<Integer, String> loaderMock = Mock()
        loaderMock.load(ANY_KEY) >> {throw new UnsupportedOperationException(SOME_ERR_MSG)}
        def hz = build(loaderMock)
        IMap<Integer, String> cache = hz.getMap("cache")
    when:
        cache.get(ANY_KEY)
    then:
        UnsupportedOperationException e = thrown()
        e.message.contains(SOME_ERR_MSG)
    cleanup:
        hz?.shutdown()
}

MapLoader is executed in a separate thread

So far nothing surprising. The first trap you might encounter is how threads interact here. MapLoader is never executed from the client thread, always from a separate thread pool:

def 'loader works in a different thread'() {
    given:
        MapLoader loader = Mock()
        loader.load(ANY_KEY) >> {key -> "$key: ${Thread.currentThread().name}"}
        def hz = build(loader)
        IMap cache = hz.getMap("cache")
    when:
        def value = cache.get(ANY_KEY)
    then:
        value != "$ANY_KEY: ${Thread.currentThread().name}"
    cleanup:
        hz?.shutdown()
}

This test passes because the current thread is "main" while loading occurs from within something like "hz.Cluster.partition-operation.thread-10". This is an important observation and is actually quite obvious if you remember that when many threads try to access the same absent key, the loader is called only once. But more needs to be explained here. Almost every operation on IMap is encapsulated into one of the operation objects (see also: Command pattern). This operation is later dispatched to one or all cluster members and executed remotely in a separate thread pool, or even on a different machine. Thus, don't expect loading to occur in the same thread, or even the same JVM/server (!) This leads to an interesting situation where you request a given key on one machine, but the actual loading happens on another. Or even more epic - machines A, B and C request a given key whereas machine D physically loads the value for that key. The decision which machine is responsible for loading is made based on a consistent hashing algorithm. One final remark - of course you can customize the size of the thread pools running these operations; see Advanced Configuration Properties.

IMap.remove() calls MapLoader

This one is totally surprising at first, but definitely to be expected once you think about it:

def 'IMap.remove() on non-existing key still calls loader (!)'() {
    given:
        MapLoader loaderMock = Mock()
        def hz = build(loaderMock)
        IMap emptyCache = hz.getMap("cache")
    when:
        emptyCache.remove(ANY_KEY)
    then:
        1 * loaderMock.load(ANY_KEY)
    cleanup:
        hz?.shutdown()
}

Look carefully! All we do is remove an absent key from a map. Nothing else. Yet loaderMock.load() was executed. This is a problem, especially when your custom loader is particularly slow or expensive. Why was it executed here? Look up the API of java.util.Map#remove():

V remove(Object key)

[...] Returns the value to which this map previously associated the key, or null if the map contained no mapping for the key.

Maybe it's controversial, but one might argue that Hazelcast is doing the right thing. If you consider our map with MapLoader attached as a sort of view onto external storage, it makes sense. When removing an absent key, Hazelcast actually asks our MapLoader: what could have been the previous value? It pretends as if the map contained every single value returned from MapLoader, but loaded lazily.
This is not a bug, since there is a special method IMap.delete() that works just like remove() but doesn't load the "previous" value:

@Issue("https://github.com/hazelcast/hazelcast/issues/3178")
def "IMap.delete() doesn't call loader"() {
    given:
        MapLoader loaderMock = Mock()
        def hz = build(loaderMock)
        IMap cache = hz.getMap("cache")
    when:
        cache.delete(ANY_KEY)
    then:
        0 * loaderMock.load(ANY_KEY)
    cleanup:
        hz?.shutdown()
}

Actually, there was a bug: IMap.delete() should not call MapLoader.load(), fixed in 3.2.6 and 3.3. If you haven't upgraded yet, even IMap.delete() will go to MapLoader. If you think IMap.remove() is surprising, check out how put() works!

IMap.put() calls MapLoader

If you thought remove() loading a value first is suspicious, what about an explicit put() loading a value for a given key first? After all, we are explicitly putting something into a map by key; why would Hazelcast load this value first via MapLoader?

def 'IMap.put() on non-existing key still calls loader (!)'() {
    given:
        MapLoader loaderMock = Mock()
        def hz = build(loaderMock)
        IMap emptyCache = hz.getMap("cache")
    when:
        emptyCache.put(ANY_KEY, ANY_VALUE)
    then:
        1 * loaderMock.load(ANY_KEY)
    cleanup:
        hz?.shutdown()
}

Again, let's refer to the java.util.Map.put() JavaDoc:

V put(K key, V value)

[...] Returns: the previous value associated with key, or null if there was no mapping for key.

Hazelcast pretends that IMap is just a lazy view over some external source, so when we put() something into an IMap that wasn't there before, it first loads the "previous" value so that it can return it. Again, this is a big issue when MapLoader is slow or expensive - if we can explicitly put something into the map, why load it first? Luckily there is a straightforward workaround, putTransient():

def "IMap.putTransient() doesn't call loader"() {
    given:
        MapLoader loaderMock = Mock()
        def hz = build(loaderMock)
        IMap cache = hz.getMap("cache")
    when:
        cache.putTransient(ANY_KEY, ANY_VALUE, 1, TimeUnit.HOURS)
    then:
        0 * loaderMock.load(ANY_KEY)
    cleanup:
        hz?.shutdown()
}

One caveat is that you have to provide the TTL explicitly, rather than relying on the configured IMap defaults. But this also means you can assign an arbitrary TTL to every map entry, not only globally to the whole map - useful.

IMap.containsKey() involves MapLoader, can be slow or block

Remember our analogy: IMap with a backing MapLoader behaves like a view over an external source of data. That's why it shouldn't be a surprise that containsKey() on an empty map will call MapLoader:

def "IMap.containsKey() calls loader"() {
    given:
        MapLoader loaderMock = Mock()
        def hz = build(loaderMock)
        IMap emptyMap = hz.getMap("cache")
    when:
        emptyMap.containsKey(ANY_KEY)
    then:
        1 * loaderMock.load(ANY_KEY)
    cleanup:
        hz?.shutdown()
}

Every time we ask for a key that's not present in the map, Hazelcast will ask MapLoader. Again, this is not an issue as long as your loader is fast, side-effect free and reliable. If this is not the case, this will kill you:

def "IMap.get() after IMap.containsKey() calls loader twice"() {
    given:
        MapLoader loaderMock = Mock()
        def hz = build(loaderMock)
        IMap cache = hz.getMap("cache")
    when:
        cache.containsKey(ANY_KEY)
        cache.get(ANY_KEY)
    then:
        2 * loaderMock.load(ANY_KEY)
    cleanup:
        hz?.shutdown()
}

Despite containsKey() calling MapLoader, it doesn't "cache" the loaded value to use it later. That's why containsKey() followed by get() calls MapLoader two times, quite wasteful. Luckily, if you call containsKey() on an existing key it returns almost immediately, although it will most likely require a network hop.
What is not so fortunate is the behaviour of keySet(), values(), entrySet() and a few other methods before version 3.3 of Hazelcast. These would all block in case any key was being loaded at the time. So if you have a map with thousands of keys and you ask for keySet(), one slow MapLoader.load() invocation will block the whole cluster. This was fortunately fixed in 3.3, so that IMap.keySet(), IMap.values(), etc. do not block, even when some keys are being computed at the moment. As you can see, the IMap + MapLoader combo is powerful, but also filled with traps. Some of them are dictated by the API, some by the distributed nature of Hazelcast, and finally some are implementation-specific. Be sure you understand them before implementing a loading-cache feature.
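To sum up the workarounds in one place, a minimal Java sketch (API as in Hazelcast 3.x; hz is a HazelcastInstance and process() is a hypothetical callback):

IMap<Integer, String> cache = hz.getMap("cache");

// one load() instead of two: prefer get() + null check over containsKey() + get()
String value = cache.get(42);
if (value != null) {
    process(value);  // hypothetical callback
}

// no "previous" value needed? delete() and putTransient() skip MapLoader.load()
cache.delete(42);
cache.putTransient(42, "Forty two", 1, TimeUnit.HOURS);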
September 24, 2014
· 15,612 Views · 1 Like
article thumbnail
Property-based Testing With Spock
Property-based testing is an alternative approach to testing, complementing example-based testing. The latter is what we've been doing all our lives: exercising production code against "examples" - inputs we think are representative. Picking these examples is an art of its own: "ordinary" inputs, edge cases, malformed inputs, etc. But why are we limiting ourselves to just a few examples? Why not test hundreds, millions... ALL inputs? There are at least two difficulties with that approach:

1. Scale. A pure function taking just one int input would require 4 billion tests. This means a few hundred gigabytes of test source code and several months of execution time. Square it if a function takes two ints. For String it practically goes to infinity.
2. Assume we have these tests, executed on a quantum computer or something. How do you know the expected result for each particular input? You either enter it by hand (good luck) or generate the expected output. By generate I mean write a program that produces the expected value for every input. But aren't we testing such a program already in the first place? Are we supposed to write a better, error-free version of the code under test just to test it? This is also known as the ugly mirror antipattern.

So you understand that testing every single input, although ideal, is just a mental experiment, impossible to implement. That being said, property-based testing tries to get as close as possible to this testing nirvana. Issue #1 is solved by slamming the code under test with hundreds or thousands of random inputs. Not all of them, not even a fraction. But a good, random representation. Issue #2 is surprisingly harder. Property-based testing can generate random arguments, but it can't figure out what the expected outcome for that random input should be. Thus we need a different mechanism, which gives the whole philosophy its name. We have to come up with properties (invariants, behaviours) that the code under test exhibits no matter what the input is. This sounds very theoretical, but there are many such properties in various scenarios:

- The absolute value of any number should never be negative
- Encoding and decoding any string should yield the same String back, for every symmetric encoding
- An optimized version of some old algorithm should produce the same result as the old one for any input
- The total money in a bank should remain the same after an arbitrary number of intra-bank transactions, in any order

As you can see, there are many properties we can think of that do not mention specific example inputs. This is not exhaustive, strict testing. It's more like sampling and making sure the samples are "sane". There are many, many libraries supporting property-based testing for virtually every language. In this article we will explore Spock; ScalaCheck comes later.

Spock + custom data generators

Spock does not support property-based testing out of the box. However, with help from data-driven testing and 3rd-party data generators we can go quite far. Data tables in Spock can be generalized into so-called data pipes:

def 'absolute value of #value should not be negative'() {
    expect:
        value.abs() >= 0
    where:
        value << randomInts(100)
}

private static def List<Integer> randomInts(int count) {
    final Random random = new Random()
    (1..count).collect { random.nextInt() }
}

The code above will generate 100 random integers and make sure that for all of them .abs() is non-negative. You might think this test is quite dumb, but to a great surprise it actually discovers one bug! But first let's kill some boilerplate code.
Generating random inputs, especially more complex ones, is cumbersome and boring. I found two libraries that can help us. First, spock-genesis:

import spock.genesis.Gen

def 'absolute value of #value should not be negative'() {
    expect:
        value.abs() >= 0
    where:
        value << Gen.int.take(100)
}

Looks great, but if you want to generate e.g. lists of random integers, net.java.quickcheck has a nicer API and is not Groovy-specific:

import static net.java.quickcheck.generator.CombinedGeneratorsIterables.someLists
import static net.java.quickcheck.generator.PrimitiveGenerators.integers

def 'sum of non-negative numbers from #list should not be negative'() {
    expect:
        list.findAll{it >= 0}.sum() >= 0
    where:
        list << someLists(integers(), 100)
}

This test is interesting. It makes sure a sum of non-negative numbers is never negative - by generating 100 lists of random ints. Sounds reasonable. However, multiple tests are failing. First of all, due to integer overflow, sometimes two positive ints add up to a negative one. Duh! Another type of failure that was discovered is actually frightening. While [1,2,3].sum() is 6, obviously, [].sum() is... null (WAT?) As you can see, even the silliest and most basic property-based tests can be useful in finding unusual corner cases in your data. But wait, I said testing the absolute value of an int discovered one bug. Actually it didn't, because of poor (too "random") data generators, which don't return known edge values in the first place. We will fix that in the next article.
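For the record, the bug that better generators would have caught: in two's complement arithmetic there is no positive 32-bit counterpart of Integer.MIN_VALUE, so Math.abs() (and Groovy's Integer.abs()) returns a negative number for it. A plain-Java demonstration:

public class AbsEdgeCase {
    public static void main(String[] args) {
        // -2147483648 has no positive 32-bit counterpart, so abs() overflows back to itself
        System.out.println(Math.abs(Integer.MIN_VALUE));        // prints -2147483648
        System.out.println(Math.abs(Integer.MIN_VALUE) >= 0);   // prints false
    }
}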
September 20, 2014
· 9,084 Views · 1 Like

Comments

GraphQL Server in Java, Part II: Understanding Resolvers

Mar 24, 2020 · Lindsay Burk

The last part is finally here: https://www.nurkiewicz.com/2020/03/graphql-server-in-java-part-iii.html


One Method to Rule Them All: Map.merge()

Nov 01, 2019 · Lindsay Burk

Because getOrDefault() doesn't add the item to the map.

GraphQL Server in Java, Part I: The Basics

Oct 30, 2019 · Lindsay Burk

Well, imagine your system is composed of a few independent services and you have many heterogeneous clients (web, mobile, batch). All clients evolve separately. By creating a single GraphQL endpoint (in a facade) you allow each client to request the data it needs in one go. Neither clients nor the server are coupled by API evolution. The alternative is an exponentially growing number of endpoints/versions and the possibility of over-fetching or N+1 queries.

GraphQL Server in Java, Part I: The Basics

Oct 30, 2019 · Lindsay Burk

I appreciate the irony :-). Indeed I love SQL (especially because it's not JSON) but don't want to expose your SQL database to the client. Especially when there are dozens of databases and a handful of other services that you combine into a single request.

HashMap Performance Improvements in Java 8

Jan 02, 2019 · James Sugrue

Because the number of buckets is smaller than the number of possible hash codes, so by definition two different hashes MAY end up in the same bucket.

Transaction Synchronization and Spring Application Events: Understanding @TransactionalEventListener

Aug 21, 2017 · Bartłomiej Słota

Very good article with clear narrative and examples. Thanks for sharing!

Java 8 Elvis Operator

May 06, 2014 · Robert Greathouse

The fact that you have to wrap all nullable references with Optional is actually a good thing. This forces you to handle nulls on the type system (i.e. compiler) level. In Groovy you have shorter syntax, but still it's possible to access null and get NPE. Also the code doesn't document itself.

Finally names map and flatMap aren't random, they have a long history and they come from monads, available in functional languages.

However I do agree some JVM languages deal with nulls better, e.g. Kotlin with question mark after type.

HashMap Performance Improvements in Java 8

May 05, 2014 · James Sugrue

Pierre, everything you say is obviously right. It was a mental shortcut on my side, thanks for the clarification. With regards to key/value pair distribution - it's even worse. Keys with different hash codes may still land in the same bucket, even if otherwise they could be easily distributed.


Also, I agree with your statement about the order of entries in a map. But the unspecified order in HashMap is so strongly emphasized that I don't believe it will impact reasonably written software. On the other hand, I saw unit tests breaking after migration from Java 6 to 7 because the order of entries in HashMap changed - so it wouldn't really be the first time.

Lazy sequences implementation for Java 8

May 25, 2013 · Allen Coin

Thank you for your valuable comment!

Ad. 1: I took the concept and much of implementation details from Scala (Stream). Possibility to express infinite streams of data without actually evaluating them is a core concept. In Java 8 streams are mainly just wrappers around collections (with few exceptions, e.g. java.util.Random.ints() - infinite) so laziness would give nothing. But LazySeq focuses on lazy, expensive to compute, typically infinite sequences. Therefore I believe laziness is the core concept, not a detail or option. Of course it's also not a general-purpose List implementation, it is very special purpose.

Ad. 2: Memoization helps reusing already computed values. But I explain clearly how it can blow away your heap and thus should be used with care. This applies to any collection in any language - don't try to put too much into memory. On the other hand this data structure without memoization can be easily turned into a (recursive) function.

Ad. 3: head/tail concept was taken, again, from Stream in Scala. If you look at the source code, most operations like map() and filter() are insanely simple to implement on top of that. E.g. seq.map(f) turns into f(head) concatenated with (lazy) tail.map(f). head/tail also simplifies thinking about data structure (first element is eager, rest is lazy)

But I agree that linked list is slow and wasteful. An alternative would be a small array (10-16 elements) in head and lazy rest. But this would be much more cumbersome to implement. Moreover I can't simply wrap existing collection as this could violate immutability. And did I mention that this was a toy-project to play with Java 8 and have fun? :-)

Thank you again for taking time to critique my tiny library.

Spring Data JDBC Generic DAO Implementation: Most Lightweight ORM Ever

Jan 31, 2013 · Allen Coin

Actually, the library does support many-to-one relationships, although it's a bit tricky. I hope I can collect a few eager users/developers around it so that we can better understand what typical applications need. I already got some fantastic pull requests with MSSQL and Oracle (still on the way) support.

Of course it will never be full-blown, complete solution. It's intended to be small and maybe even temporary (during transition period to more comprehensive solutions).

@Cacheable overhead in Spring

Jan 24, 2013 · Allen Coin

@Jim O'callaghan - have a look at this discussion (bottom) - seems like you have the same issue.

The Limited Usefulness of AsyncContext.start()

Nov 06, 2012 · James Sugrue

Don't get me wrong, asynchronous servlets are great. It's only the AsyncContext.start() method that is not really that useful. But the concept of asynchronous servlets (including the ability to serve multiple requests using just one thread) is a great addition to the standard.

RateLimiter - Discovering Google Guava

Nov 03, 2012 · James Sugrue

Thanks for the in-depth explanation. I hope I described the warm-up functionality clearly by saying: it will gradually increase the allowed frequency over the configured time, up to the configured maximum value, instead of allowing the maximum frequency from the very beginning (?)

RateLimiter - Discovering Google Guava

Nov 03, 2012 · James Sugrue

Indeed in some circumstances RateLimiter knows in advance that tryAcquire() cannot succeed, very smart!

String Memory Internals

Oct 20, 2012 · James Sugrue

Thanks, fixed!
Consider assertThat() in place of assertEquals()

Oct 10, 2012 · James Sugrue

Even better, use FEST assertions:

assertThat(result1).isEqualTo(42);
assertThat(result2).contains("foo");

Note that your example is a bit confusing. You are checking whether the "result" variable is equal to 42 and contains the "foo" string - at the same time.

Enabling JMX in Hibernate, Ehcache, Quartz, DBPC and Spring

Dec 22, 2011 · James Sugrue

Please correct: s/DBPC/DBCP/ - this typo is already fixed in the source article, where it originates.
New Java 7 Feature: String in Switch support

Mar 22, 2011 · James Sugrue

"compiler will likely generate more efficient bytecode as compared to the if-then-else statement" - see this. Actually I believe this approach is described in the specification itself.
Reduce Boilerplate Code for DAO's -- Hades Introduction

Oct 15, 2010 · Shekhar Gulati

Hades is a really nice library; I used it in a few projects recently and it worked well. I also wrote a similar introduction, if you want to grab a different example: http://nurkiewicz.blogspot.com/2010/07/hades-dry-principle-in-jpaspring.html
Mockito - Pros, Cons, and Best Practices

Oct 14, 2010 · James Sugrue

Recently I discovered that ArgumentCaptor is much easier to use than custom ArgumentMatcher. In most cases they can be used interchangeably, but using captor seems more obvious and straightforward.
How to Copy Bean Properties With a Single Line of Code

Sep 01, 2010 · James Sugrue

This code definitely needs more unit tests than the traditional approach. Also, there is a ready-made, more flexible and elegant solution: Dozer. But both approaches suffer from the same problem - while analyzing source code, string/reflection/other magic methods of copying data from one object to another aren't easy to spot. After doing some development with Dozer I gave it up and now use the traditional, property-by-property approach. Simplicity is king! P.S.: Although, I find this idea really cool: BEANUTILS-375.
