
Inside the Java SDK: Connection Management


Want to learn more about connection management in the Java SDK? This post looks at how the SDK manages and pools connections to the various nodes and services.


In this second installment of “Inside the Java SDK,” we are going to take an in-depth look at how the SDK manages and pools sockets to the various nodes and services. While not strictly necessary for following along, I recommend you check out the first post on bootstrapping as well.

Please note that this post was written with the Java SDK 2.5.9 / 2.6.0 releases in mind, so details might change over time, but the overall approach should stay mostly the same.

In the spirit of the OSI and TCP models, I propose a three-layer model that represents the SDK’s connection stack:

+-----------------+
| Service Layer   |
+-----------------+
| Endpoint Layer  |
+-----------------+
| Channel Layer   |
+-----------------+


Higher levels build on top of lower levels, so we’ll start with the Channel layer and work our way up the stack.

The Channel Layer

The channel layer is the lowest level at which the SDK deals with networking. It is built on top of Netty, an excellent, fully asynchronous IO library. We’ve been extensive Netty users for years and have contributed patches as well as the Memcache codec to the project.

Every Netty channel corresponds to a socket and is multiplexed on top of event loops. We’ll cover the threading model in a later blog post, but, for now, it’s important to know that instead of the “one thread per socket” model of traditional blocking IO, Netty takes all open sockets and distributes them across a handful of event loops. It does this in a very efficient way, so it’s no wonder that Netty is used all over the industry for high performance and low latency networking components.
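
To make that concrete, here is a minimal, hypothetical sketch (not SDK code) of how a couple of event loop threads can serve many channels; it assumes something is listening on localhost:11210:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.NioSocketChannel;
import io.netty.channel.socket.SocketChannel;

public class EventLoopDemo {
    public static void main(String[] args) {
        // Two threads serve all channels created below, instead of one thread per socket.
        EventLoopGroup group = new NioEventLoopGroup(2);
        Bootstrap bootstrap = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    // pipeline handlers would be added here
                }
            });
        // 100 sockets multiplexed over 2 event loop threads.
        for (int i = 0; i < 100; i++) {
            bootstrap.connect("localhost", 11210);
        }
    }
}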

Since a channel is only concerned with bytes going in and out, we need a way to encode and decode application-level requests (like an N1QL query or a key/value get request) into their proper binary representation. In Netty, this is done by adding handlers to the channel pipeline. All network write operations work their way down the pipeline, and server responses come back up the pipeline (Netty calls these directions outbound and inbound, respectively).

Some handlers are added independently of the service used (like logging or encryption), and others depend on the service type (for example, for an N1QL response, we have JSON streaming parsers customized to the response structure).

If you ever wondered how to get packet-level logging output during development or debugging (for production, use tcpdump, Wireshark, or similar), all you need to do is enable the TRACE log level in your favorite logging library and you’ll see output like this:

[cb-io-1-1] 2018-06-28 14:03:34 TRACE LoggingHandler:94 - [id: 0x41407638, L:/127.0.0.1:60923 - R:localhost/127.0.0.1:11210] WRITE: 243B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 80 1f 00 db 00 00 00 00 00 00 00 e5 00 00 00 00 |................|
|00000010| 00 00 00 00 00 00 00 00 7b 22 61 22 3a 22 63 6f |........{"a":"co|
|00000020| 75 63 68 62 61 73 65 2d 6a 61 76 61 2d 63 6c 69 |uchbase-java-cli|
|00000030| 65 6e 74 2f 32 2e 36 2e 30 2d 53 4e 41 50 53 48 |ent/2.6.0-SNAPSH|
|00000040| 4f 54 20 28 67 69 74 3a 20 32 2e 36 2e 30 2d 62 |OT (git: 2.6.0-b|
|00000050| 65 74 61 2d 31 36 2d 67 35 63 65 30 38 62 30 2c |eta-16-g5ce08b0,|
|00000060| 20 63 6f 72 65 3a 20 31 2e 36 2e 30 2d 62 65 74 | core: 1.6.0-bet|
|00000070| 61 2d 33 33 2d 67 31 62 33 65 36 66 62 29 20 28 |a-33-g1b3e6fb) (|
|00000080| 4d 61 63 20 4f 53 20 58 2f 31 30 2e 31 33 2e 34 |Mac OS X/10.13.4|
|00000090| 20 78 38 36 5f 36 34 3b 20 4a 61 76 61 20 48 6f | x86_64; Java Ho|
|000000a0| 74 53 70 6f 74 28 54 4d 29 20 36 34 2d 42 69 74 |tSpot(TM) 64-Bit|
|000000b0| 20 53 65 72 76 65 72 20 56 4d 20 31 2e 38 2e 30 | Server VM 1.8.0|
|000000c0| 5f 31 30 31 2d 62 31 33 29 22 2c 22 69 22 3a 22 |_101-b13)","i":"|
|000000d0| 30 43 34 37 35 41 43 41 35 46 33 38 30 41 32 31 |0C475ACA5F380A21|
|000000e0| 2f 30 30 30 30 30 30 30 30 34 31 34 30 37 36 33 |/000000004140763|
|000000f0| 38 22 7d |8"} |
+--------+-------------------------------------------------+----------------+
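
How you enable TRACE depends on your logging setup; as a hedged example with plain log4j 1.x as the backend (assuming the SDK’s logs are routed through log4j in your project), it can be done programmatically:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// Enable packet-level TRACE output; the logger name is an assumption,
// adjust it to however your setup routes the SDK's logs.
Logger.getLogger("com.couchbase.client").setLevel(Level.TRACE);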


Note the little LoggingHandler in the trace above. The logging handler is only added to the pipeline if tracing is enabled, so you are not paying its overhead when you are not using it (which is most of the time):

bootstrap = new BootstrapAdapter(new Bootstrap()
  // *snip*
  .option(ChannelOption.ALLOCATOR, allocator)
  .option(ChannelOption.TCP_NODELAY, tcpNodelay)
  .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, env.socketConnectTimeout())
  .handler(new ChannelInitializer<Channel>() {
    @Override
    protected void initChannel(Channel channel) throws Exception {
      ChannelPipeline pipeline = channel.pipeline();
      if (env.sslEnabled()) {
        pipeline.addLast(new SslHandler(sslEngineFactory.get()));
      }
      if (LOGGER.isTraceEnabled()) {
        pipeline.addLast(LOGGING_HANDLER_INSTANCE);
      }
      customEndpointHandlers(pipeline);
    }
  }));


You can also see that, depending on the environment configuration, we make other adjustments, like adding an SSL/TLS handler to the pipeline or configuring TCP_NODELAY and the socket connect timeout.

The customEndpointHandlers method is overridden for each service. Here is the pipeline for the KV layer (slightly simplified):

if (environment().keepAliveInterval() > 0) {
    pipeline.addLast(new IdleStateHandler(environment().keepAliveInterval(), 0, 0, TimeUnit.MILLISECONDS));
}

pipeline
    .addLast(new BinaryMemcacheClientCodec())
    .addLast(new BinaryMemcacheObjectAggregator(Integer.MAX_VALUE));

pipeline
    .addLast(new KeyValueFeatureHandler(context()))
    .addLast(new KeyValueErrorMapHandler());

if (!environment().certAuthEnabled()) {
    pipeline.addLast(new KeyValueAuthHandler(username(), password(), environment().forceSaslPlain()));
}

pipeline
    .addLast(new KeyValueSelectBucketHandler(bucket()))
    .addLast(new KeyValueHandler(this, responseBuffer(), false, true));


Lots going on here! Let’s go through it one by one:

  • The IdleStateHandler is used to trigger application-level keepalives.
  • The next two handlers, BinaryMemcacheClientCodec and BinaryMemcacheObjectAggregator, deal with encoding Memcache request and response objects into their byte representations and back.
  • KeyValueFeatureHandler, KeyValueErrorMapHandler, KeyValueAuthHandler, and KeyValueSelectBucketHandler all perform handshaking, authentication, bucket selection, and so forth during the connect phase, removing themselves from the pipeline once complete (see the sketch after this list).
  • Finally, the KeyValueHandler does most of the work and “knows” all the different request types going in and out of the system.
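
To illustrate that “remove themselves once complete” pattern, here is a minimal, hypothetical handler in the same spirit (not one of the SDK’s actual handlers): it performs a single negotiation step when the channel becomes active and then steps out of the pipeline so it adds no per-request overhead afterwards:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class OneShotNegotiationHandler extends SimpleChannelInboundHandler<Object> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Placeholder for a real handshake request (a real one would need an encoder).
        ctx.writeAndFlush("NEGOTIATE");
        ctx.fireChannelActive();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Inspect the negotiation response here, then step out of the way so
        // subsequent traffic skips this handler entirely.
        ctx.pipeline().remove(this);
    }
}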

If you want to take a look at a different one, here is an example of the N1QL pipeline.

Before we move up one layer, there is one important bit: the RxJava Observable completion also happens at this layer. Once a response is decoded, it is completed either directly on the event loop or on a thread pool (the default configuration).
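
As a rough sketch of that idea (class and method names here are illustrative, not the SDK’s internals), completing an RxJava subject either in place or on a scheduler looks like this:

import rx.Scheduler;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;

public class ResponseCompleter {

    private final boolean completeOnEventLoop;
    private final Scheduler scheduler = Schedulers.computation();

    public ResponseCompleter(boolean completeOnEventLoop) {
        this.completeOnEventLoop = completeOnEventLoop;
    }

    public void complete(final AsyncSubject<String> observable, final String decoded) {
        if (completeOnEventLoop) {
            // Complete right on the IO thread: lowest latency, but user
            // callbacks must never block.
            observable.onNext(decoded);
            observable.onCompleted();
        } else {
            // Default-style behavior: hop onto a worker thread so slow user
            // callbacks cannot stall the event loop (worker cleanup omitted).
            scheduler.createWorker().schedule(() -> {
                observable.onNext(decoded);
                observable.onCompleted();
            });
        }
    }
}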

It is important to know that once a channel goes down (because the underlying socket is closed), all state at this level is gone. On a reconnect attempt, a fresh channel is created. So, who manages a channel? Let’s move up a layer.

The Endpoint Layer

The endpoint layer is responsible for managing the lifecycle of a channel including bootstrap, reconnect, and disconnect. You can find the code here.

There is always a 1:1 relationship between the endpoint and the channel it manages. But if a channel goes away and the socket needs to be reconnected, the endpoint stays the same and creates a fresh channel internally. The endpoint is also the place where a request is handed over to the event loops (simplified):

@Override
public void send(final CouchbaseRequest request) {
    if (channel.isActive() && channel.isWritable()) {
        channel.write(request, channel.voidPromise());
    } else {
        responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, request, request.observable());
    }
}


If our channel is active and writable, we’ll write the request into the pipeline. Otherwise, it is sent back and re-queued for another attempt.

Here is a very important aspect of the endpoint to keep in mind: if a channel is closed, the endpoint will try to reconnect (with the configured backoff) until it is explicitly told to stop. It stops when the manager of the endpoint calls disconnect on it, which ultimately happens when the respective service/node is not part of the config anymore. So, at the end of a rebalance or during a failover, the client receives a new cluster config, infers from it that the endpoint can be terminated, and does so accordingly. If, for whatever reason, there is a delay between a socket disconnect and that information propagating, you might see some reconnect attempts that will eventually stop.
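
Conceptually, the reconnect loop looks something like this simplified, hypothetical sketch (names like ReconnectingEndpoint are illustrative, not the SDK’s API):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReconnectingEndpoint {

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    void onChannelClosed(int attempt) {
        if (stopped.get()) {
            return; // manager called disconnect(): node/service left the config
        }
        // Capped exponential backoff between attempts.
        long delayMs = Math.min(1000L * (1L << Math.min(attempt, 5)), 32_000L);
        timer.schedule(() -> connect(attempt + 1), delayMs, TimeUnit.MILLISECONDS);
    }

    void connect(int attempt) {
        boolean success = false; // placeholder for opening a fresh channel
        if (!success) {
            onChannelClosed(attempt); // keep trying until told to stop
        }
    }

    public void disconnect() {
        stopped.set(true); // the only way the reconnect loop ends
        timer.shutdown();
    }
}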

One endpoint is all very well, but more is always better, right? So let’s go up one more layer to figure out how endpoints are pooled to create sophisticated connection pools on a per-node and per-service basis.

The Service Layer

The service layer manages one or more endpoints per node. Each service is only responsible for one node. For example, if you have a Couchbase cluster of five nodes with only the KV service enabled on each, then, if you inspect a heap dump, you’ll find five instances of the KeyValueService.

In older client versions, you could only configure a fixed number of endpoints per service through methods like kvEndpoints, queryEndpoints, and so forth. Due to more complex requirements, we’ve deprecated this “fixed” approach in favor of a more powerful connection pool implementation. This is why, instead of (for example) queryEndpoints, you should now use queryServiceConfig and its equivalents.
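
Configuring the pools on the environment looks roughly like this (a hedged sketch; double-check the import locations and exact factory signatures against your SDK version):

import com.couchbase.client.core.env.KeyValueServiceConfig;
import com.couchbase.client.core.env.QueryServiceConfig;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;

CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder()
    // Dynamic pool: between 0 and 12 query endpoints per node.
    .queryServiceConfig(QueryServiceConfig.create(0, 12))
    // Fixed pool: exactly one KV endpoint per node.
    .kvServiceConfig(KeyValueServiceConfig.create(1))
    .build();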

Here are the current default pools per service in 2.5.9 and 2.6.0:

  • KeyValueService: one endpoint per node, fixed.
  • QueryService: from zero to 12 endpoints per node, dynamic.
  • ViewService: from zero to 12 endpoints per node, dynamic.
  • AnalyticsService: from zero to 12 endpoints per node, dynamic.
  • SearchService: from zero to 12 endpoints per node, dynamic.

The reason KV is not pooled by default is that the connection handshake is much more costly (remember all the handlers in the pipeline) and the traffic pattern is usually very different from the heavier query-based services. Experience from the field has shown that increasing the number of KV endpoints only makes sense in “bulk load” scenarios and under very spiky traffic where the “pipe” of one socket is just too small. If not properly benchmarked, adding more sockets to the KV layer can even degrade your performance instead of improving it; more is not always better.

The pooling logic can be found here if you are curious, but it’s worth examining a few of its semantics.

During its connect phase, the service ensures that the minimum number of endpoints is established up front. If the minimum equals the maximum, dynamic pooling is effectively disabled and the code will pick one of the endpoints for each request:

synchronized (epMutex) {
    int numToConnect = minEndpoints - endpoints.size();
    if (numToConnect == 0) {
        LOGGER.debug("No endpoints needed to connect, skipping.");
        return Observable.just(state());
    }
    for (int i = 0; i < numToConnect; i++) {
        Endpoint endpoint = endpointFactory.create(hostname, bucket, username, password, port, ctx);
        endpoints.add(endpoint);
        endpointStates.register(endpoint, endpoint);
    }

    LOGGER.debug(logIdent(hostname, PooledService.this)
            + "New number of endpoints is {}", endpoints.size());
}


This can be observed from the logs right away during bootstrap:

[cb-computations-5] 2018-06-28 14:03:34 DEBUG Service:257 - [localhost][KeyValueService]: New number of endpoints is 1
[cb-computations-8] 2018-06-28 14:03:35 DEBUG Service:248 - [localhost][QueryService]: No endpoints needed to connect, skipping.


When a request comes in, it is either dispatched right away or, if another endpoint needs to be created and there is still room in the pool, that is handled as well (slightly simplified):

@Override
public void send(final CouchbaseRequest request) {
    Endpoint endpoint = endpoints.size() > 0 ? selectionStrategy.select(request, endpoints) : null;

    if (endpoint == null) {
        if (fixedEndpoints || (endpoints.size() >= maxEndpoints)) {
            RetryHelper.retryOrCancel(env, request, responseBuffer);
        } else {
            maybeOpenAndSend(request);
        }
    } else {
        endpoint.send(request);
    }
}


Note that if we can’t find a suitable endpoint and the pool is fixed or we have reached our ceiling, the operation is scheduled for retry. This is very similar to the endpoint logic when the channel is not active or writable.
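
The selectionStrategy used above is pluggable; a hypothetical round-robin variant (simplified from the SDK’s actual interface) could look like this:

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSelection {

    private final AtomicInteger counter = new AtomicInteger();

    public Endpoint select(List<Endpoint> endpoints) {
        int size = endpoints.size();
        for (int i = 0; i < size; i++) {
            // Advance the counter and skip endpoints that are not usable.
            Endpoint candidate = endpoints.get((counter.getAndIncrement() & 0x7fffffff) % size);
            if (candidate.isConnectedAndWritable()) { // illustrative method name
                return candidate;
            }
        }
        return null; // none usable: caller will retry or grow the pool
    }

    interface Endpoint {
        boolean isConnectedAndWritable();
    }
}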

With pooled HTTP-based services, we don’t want to keep sockets around forever, so you can configure an idle time (300s by default). Each pool runs an idle timer that regularly examines the endpoints and disconnects any that have been idle for longer than the configured interval. Importantly, the logic always ensures that we do not fall below the minimum number of endpoints.
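
A simplified, hypothetical version of such an idle reaper (all names here are illustrative, not the SDK’s) might look like this:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IdleReaper {

    private final List<PooledEndpoint> endpoints = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    public IdleReaper(final int minEndpoints, final long idleTimeMs) {
        timer.scheduleAtFixedRate(() -> {
            for (PooledEndpoint ep : endpoints) {
                boolean idleTooLong = System.nanoTime() - ep.lastResponseNanos()
                        > TimeUnit.MILLISECONDS.toNanos(idleTimeMs);
                // Disconnect idle endpoints, but never fall below the minimum.
                if (idleTooLong && endpoints.size() > minEndpoints) {
                    endpoints.remove(ep);
                    ep.disconnect();
                }
            }
        }, idleTimeMs, idleTimeMs, TimeUnit.MILLISECONDS);
    }

    interface PooledEndpoint {
        long lastResponseNanos();
        void disconnect();
    }
}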

Common Connection-Related Errors

Now that you have a good idea of how the SDK handles and pools sockets, let’s talk about a couple of error scenarios that can come up.

Request Cancellations

Let’s talk about the RequestCancelledException first.

If you are performing an operation and it fails with a RequestCancelledException, there are usually two different causes:

  • The operation circled around inside the client (without being sent over the network) for longer than the configured maxRequestLifetime (tunable as shown below).
  • A request has been written to the network but, before we got a response, the underlying channel was closed.
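
As an aside on the first cause, the lifetime is tunable on the environment builder; a hedged sketch (75000 is just an example value, in milliseconds):

import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;

// Allow requests to circle inside the client for up to 75 seconds
// before they are cancelled (example value, not a recommendation).
CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder()
    .maxRequestLifetime(75000)
    .build();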

There are other, less common reasons (e.g. issues during encoding of a request) but, for the purpose of this example, we will focus on the second cause.

So, why do we have to cancel the request rather than retry it on another socket that is still active? The reason is that we don’t know if the operation has already caused a side effect on the server (for example, a mutation being applied). If we retried non-idempotent operations, there could be weird effects that are hard to diagnose in practice. Instead, we tell the caller that the request has failed, and it’s up to the application logic to figure out what to do next. If it was a simple get request and you are still within your timeout budget, you can retry on your own. If it was a mutation, you either need extra logic to read the document and figure out whether the mutation was applied, or you know it is safe to send again right away. And there is always the option of propagating the error back to the caller of your API. In any case, it’s predictable from the SDK side and won’t cause any more harm in the background.
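
For an idempotent read, application-side handling can be as simple as this hedged sketch using the 2.x sync API:

import com.couchbase.client.core.RequestCancelledException;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.JsonDocument;

JsonDocument getWithRetry(Bucket bucket, String id, int maxAttempts) {
    for (int attempt = 1; ; attempt++) {
        try {
            return bucket.get(id); // idempotent: safe to retry
        } catch (RequestCancelledException ex) {
            if (attempt >= maxAttempts) {
                throw ex; // budget exhausted: propagate to the caller
            }
            // otherwise loop and retry; a mutation would need extra checks here
        }
    }
}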

Bootstrap Issues

The other source of errors worth knowing about is issues during the socket connect phase. Usually, you’ll find descriptive errors in the logs that tell you what is going on (for example, wrong credentials), but there are two that might be a little harder to decipher: the connect safeguard timeout and select-bucket errors during rebalance.

As you’ve seen before, the KV pipeline contains many handlers that work back and forth with the server during bootstrap to figure out all kinds of config settings and negotiate supported features. At the time of writing, each individual operation does not have its own timeout; instead, the connect safeguard timeout kicks in if the connect phase as a whole takes longer than its total budget allows.

So, if you see a ConnectTimeoutException in the logs with the message "Connect callback did not return," you hit the safeguard timeout. This means that one operation, or the sum of all of them, took longer than the budget allowed, and another reconnect attempt will be performed. This is generally not harmful, since we will reconnect, but it is a good indication that there might be some slowness on the network or somewhere else in the stack that should be looked at more carefully. A good next step is to start Wireshark or tcpdump and record the bootstrap phase to figure out where the time is spent, then pivot to the client or the server side depending on the recorded timings. By default, the safeguard timeout is the socketConnectTimeout plus the connectCallbackGracePeriod, which is set to 2 seconds and can be tuned via the com.couchbase.connectCallbackGracePeriod system property.
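
Setting the property should happen early, before the SDK is initialized; for example:

// Extend the grace period to 5 seconds (value is in milliseconds;
// the default mentioned above is 2000).
System.setProperty("com.couchbase.connectCallbackGracePeriod", "5000");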

One of the steps during bootstrap, added when we introduced support for RBAC (role-based access control), is called “select bucket” and is handled by the KeyValueSelectBucketHandler. Since there is a disconnect between authenticating and having access to a bucket, it is possible that the client connects to a KV service while the KV engine itself is not yet ready to serve it. The client handles this situation gracefully and retries, so no impact on an actual workload is observed, but since log hygiene is also a concern, we are currently improving the SDK’s algorithm here. If you want to learn more, you can follow the progress at JVMCBC-553.

Final Thoughts

By now, you should have a solid understanding of how the SDK manages its underlying sockets and pools them at the service layer. If you want to go digging into the codebase, start here and then look at the respective namespaces for the service and endpoint. All the Netty channel handlers are below the endpoint namespace as well.
