
Solr Distributed Search and the Stale Check

25 Feb 2013

Yokozuna

For the past eight months I have been working on a project called Yokozuna. The goal of this project is to tightly integrate Solr with Riak, providing robust search functionality while letting Riak handle the distributed bits it is good at: consistent hashing, data ownership claim, handoff, anti-entropy, sibling resolution, replication, etc. However, there is one distributed aspect that is still in Solr's control: distributed search.

Distributed Search

Since version 1.3.0 Solr has provided support for Distributed Search. Distributed Search allows one to run a query against a set of shards, handling the coordination and collation of the results on behalf of the user. This allows the splitting of an index across multiple nodes while at the same time keeping the convenience of a non-distributed search.

As an index grows there inevitably comes a time when a single machine can no longer adequately handle it. The problems range from disk space constraints to inadequate processing power to large document sets with common terms. At some point, splitting the index makes sense because it allows sharing the load as well as parallelizing some of the work.

Distributed Search, for the most part, hides the fact that the query is distributed. Yes, the shards must be passed in, so the user must know which shards make up the full index, but the user doesn’t have to worry about the complexity of coordination or properly merging the results.

As with anything, there is a tradeoff. In this case additional communication and coordination must be introduced; you cannot have a distributed process without these two things. If an index is split across two nodes, then both of those nodes must communicate with each other or with some other entity. This entity, in turn, must coordinate the communication and determine a result. In this case the communication is HTTP, carried over TCP sockets.

A Distributed Search request can be sent to any of the shard nodes, which will act as the coordinator. The coordinator uses Solr's Java client, SolrJ, to make a request to each shard, in parallel. When all of the shards have responded the coordinator will merge the results and determine the winning set of document identifiers. A second request pulls back the document data which is needed for the response. This means every Distributed Search performs two sequential stages. The fact that the stages are done sequentially, and not in parallel, is key. The latency for the total request will be at least as great as the sum of the latency of each stage. Reducing the latency of each stage is paramount.
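
For reference, issuing a distributed query from SolrJ looks roughly like the following sketch. The host names and the core name "fruit" are hypothetical; the shards parameter lists the cores that make up the full index, and the node the request is sent to acts as coordinator.

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.HttpSolrServer;
    import org.apache.solr.client.solrj.response.QueryResponse;

    public class DistributedQueryExample {
        public static void main(String[] args) throws SolrServerException {
            // Any shard node may be the coordinator; this one receives the request.
            HttpSolrServer coordinator = new HttpSolrServer("http://host1:8983/solr/fruit");

            SolrQuery q = new SolrQuery("*:*");
            // The shards parameter names every core making up the full index.
            // The coordinator fans the query out to each shard and merges the results.
            q.set("shards", "host1:8983/solr/fruit,host2:8983/solr/fruit");

            QueryResponse rsp = coordinator.query(q);
            System.out.println("numFound: " + rsp.getResults().getNumFound());
        }
    }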

Stale Check

SolrJ uses the Apache HTTP Client to execute requests. As with any good client, a connection pool is used to take advantage of HTTP 1.1 persistent connections. A persistent connection is one that is kept open between requests so that it may be reused. This avoids the constant creation and destruction of TCP sockets, which adds latency. For each stage of the Distributed Search the coordinator will lease a connection from the pool. Since workload may vary, the pool will grow or shrink as needed. On both the client (coordinator) and server (shard), application-level timeouts are put in place to determine when a connection is idle and should be closed.

But a problem arises. If the client and server don't agree on when a connection is idle, then one side may decide to close the socket before the other side is ready. If the server closes the socket first, the client will hit a socket reset error if it blindly sends a request over that socket. To avoid this the HTTP client performs a stale check every time a connection is leased from the pool, and that check adds latency. On my particular benchmark rig, each stale check call accounted for an additional ~10ms. That is roughly the time it takes a mechanical drive to seek, a lifetime for a computer.
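
Turning the check off is straightforward on the underlying Apache HTTP Client. A minimal sketch, assuming the HttpClient 4.x parameter API; how this gets wired into SolrJ's internal client is omitted here.

    import org.apache.http.impl.client.DefaultHttpClient;
    import org.apache.http.params.HttpConnectionParams;
    import org.apache.http.params.HttpParams;

    public class NoStaleCheckClient {
        public static DefaultHttpClient create() {
            DefaultHttpClient client = new DefaultHttpClient();
            HttpParams params = client.getParams();
            // Skip the pre-request isStale() probe; the leased connection is
            // assumed good. This removes the per-request penalty but makes the
            // client responsible for closing idle connections before the server
            // does (see below).
            HttpConnectionParams.setStaleCheckingEnabled(params, false);
            return client;
        }
    }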

Simply disabling the check reduces the latency, but opens the window for socket reset errors (see Reset Generation in RFC 793). The easiest way to avoid this is to have the client initiate the socket close. The server, actively reading or polling its sockets, will immediately see the close and properly finish the shutdown sequence, leaving no lingering sockets in a dubious state. For Yokozuna I did this by configuring the client's idle timeout to be lower than the server's. That's half the job; the other half is to determine when a client socket has hit that idle timeout and close it. For this I added a sweeper which periodically sweeps the pool of idle connections, closing any that have surpassed the threshold. Now the latency of the stale check has been removed and there is no concern about socket reset errors.
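
A minimal sketch of such a sweeper, assuming HttpClient 4.x's PoolingClientConnectionManager and an illustrative client idle threshold of 30 seconds, chosen to sit safely under the server's 50-second timeout:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.apache.http.impl.conn.PoolingClientConnectionManager;

    public class IdleConnectionSweeper {
        public static ScheduledExecutorService start(final PoolingClientConnectionManager pool) {
            ScheduledExecutorService sweeper = Executors.newSingleThreadScheduledExecutor();
            sweeper.scheduleWithFixedDelay(new Runnable() {
                public void run() {
                    // Close any pooled connection idle longer than 30 seconds,
                    // so the client is always the side that initiates the close.
                    pool.closeIdleConnections(30, TimeUnit.SECONDS);
                }
            }, 5, 5, TimeUnit.SECONDS);
            return sweeper;
        }
    }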

Results

The results of removing the stale check are terrific. I saw a 2–4 fold increase in throughput across the board with a reduction of over 100ms in latency in almost all cases. To achieve these results I used a synthetic benchmark of my own creation. I call it the fruit benchmark. It loads Yokozuna with 1M plain text values, each a list of various fruits. A given fruit will occur a specific number of times in the corpus. This allows running boolean queries with a predetermined number of results. E.g. I know apple occurs 100K times, and grape 1 time. Thus the query apple AND grape will run a search where one subquery matches 100K and the other 1, resulting in 1 matching document which contains both apple and grape. In the past I've used this benchmark with great success to test Riak Search's conjunction query performance. Following is a breakdown of the cardinalities of each query along with the results; a sketch of the apple-and-grape sanity check follows the table.

Benchmark Result Set Cardinalities
alpha 100K, 100K, 0
beta 100K, 100K, 1
charlie 100K, 100K, 1, 100K, 100K, 100K
delta 10K, 100, 10
echo 10, 100, 10
foxtrot 10K, 10K, 10K
golf 1K, 10K, 100K
hotel 10K, 100K, 100K
india 1, 10, 1, 100, 10
juliet 1, 1
kilo 1M
lima 1
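
The benchmark's invariant can be checked directly. A sketch, again with hypothetical host and core names:

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.HttpSolrServer;

    public class FruitQueryCheck {
        public static void main(String[] args) throws SolrServerException {
            // Hypothetical core holding the 1M fruit documents.
            HttpSolrServer solr = new HttpSolrServer("http://host1:8983/solr/fruit");

            // apple occurs 100K times and grape once, so the conjunction
            // must match exactly one document.
            SolrQuery q = new SolrQuery("apple AND grape");
            long numFound = solr.query(q).getResults().getNumFound();
            System.out.println("apple AND grape matched " + numFound + " document(s)");
        }
    }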

Figure 1 compares the throughput (y-axis) of various benchmarks (x-axis). The green line is Yokozuna/Solr with stale check enabled and the red-orange line is with stale check removed. Notice that the red-orange line beats the green line in every benchmark, but is slightly less consistent (i.e. not as flat). My hunch is that removing some of the latency has allowed other variables to come into play, such as disk I/O. There are two other patches I'm looking at making to Distributed Search that might help smooth this line out a bit. Both involve ID mapping: use a single TermEnum for the entire list, and add a cache.

This plot is the same as the last except now the y-axis represents latency, so lower is better. It shows a 100ms drop across the board except for query alpha. The lines even follow the same pattern, rising and dipping in the same spots and staying within a 20ms range of each other. Overall, a significant improvement from removing the stale check.

More Stale Check, For The Curious

What exactly about the stale check causes this additional latency? How did I come up with the ~10ms figure stated earlier? Why does a socket reset error occur if the stale check is simply turned off?

The Apache HTTP docs make it clear that the stale check may add unwanted latency and shouldn’t be used in high-throughput scenarios. But what exactly is it that eats up the time?

In the client, the method DefaultRequestDirector.execute() will determine if the stale check is enabled and call isStale() on the connection leased from the pool. Most of the code related to this check is very cheap, a matter of checking the values of various member fields. But eventually there is a blocking read() call. In the case that the server has closed the socket this call will return immediately with -1 to signal EOF. If the socket is still open then it should block for approximately 1ms before returning control (before the read call is made the socket timeout is temporarily set to 1ms). It blocks because there is nothing to read. A request must be sent to the server for there to be anything to read, but a request cannot be sent without first checking the socket.
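
Conceptually the check boils down to something like the following sketch. This is a simplification, not the actual HttpClient code: drop the socket timeout to 1ms, attempt a read, and interpret the outcome.

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.Socket;
    import java.net.SocketTimeoutException;

    public class StaleCheckSketch {
        // Returns true if the peer has already closed the connection.
        static boolean isStale(Socket socket) throws IOException {
            int oldTimeout = socket.getSoTimeout();
            try {
                // Block for at most 1 ms waiting for data that will never
                // arrive on a healthy idle connection.
                socket.setSoTimeout(1);
                InputStream in = socket.getInputStream();
                // -1 means the server already sent FIN: the connection is stale.
                return in.read() == -1;
            } catch (SocketTimeoutException e) {
                // Nothing to read within 1 ms: the connection looks healthy.
                return false;
            } finally {
                socket.setSoTimeout(oldTimeout);
            }
        }
    }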

The majority of the work of the stale check is performed by the method isDataAvailable(). To verify this method is only called when the stale check is enabled I turned to DTrace. Monitoring method entries and exits with DTrace requires that the JVM be started with the ExtendedDTraceProbes flag enabled. I used the following one-liner while running Distributed Search queries to verify that isDataAvailable() is not called when the stale check is disabled.

$ dtrace -n 'hotspot*:::method-entry
    /copyinstr(arg1) == "org/apache/http/impl/io/SocketInputBuffer"/
    { printf("%s:%s(%s)", copyinstr(arg1), copyinstr(arg3), copyinstr(arg5)); }'

Next I wanted to determine the amount of time the method was taking while the machine is under load. Normally I would use DTrace again, but my test rig, which is a SmartOS machine, doesn't provide hotspot probes when a JVM is running. Luckily there is a tool called BTrace which is essentially DTrace for Java. With this tool I was able to write a script to track call latency on the fly. As the output below shows, each call took ~10ms to complete.
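
The script was along these lines; a sketch, assuming BTrace's @OnMethod and @Duration annotations (the latter reports the call duration in nanoseconds at method return):

    import com.sun.btrace.annotations.BTrace;
    import com.sun.btrace.annotations.Duration;
    import com.sun.btrace.annotations.Kind;
    import com.sun.btrace.annotations.Location;
    import com.sun.btrace.annotations.OnMethod;
    import static com.sun.btrace.BTraceUtils.println;
    import static com.sun.btrace.BTraceUtils.str;
    import static com.sun.btrace.BTraceUtils.strcat;

    @BTrace
    public class IsDataAvailableTimer {
        // Fires on return from isDataAvailable(); duration is in nanoseconds.
        @OnMethod(clazz = "org.apache.http.impl.io.SocketInputBuffer",
                  method = "isDataAvailable",
                  location = @Location(Kind.RETURN))
        public static void onReturn(@Duration long ns) {
            println(strcat("isDataAvailable() took ",
                    strcat(str(ns / 1000), " microseconds")));
        }
    }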

isDataAvailable() took 9787 microseconds
isDataAvailable() took 9789 microseconds
isDataAvailable() took 9684 microseconds
isDataAvailable() took 9672 microseconds

Disabling the stale check is the move to make in this case. But with the check disabled, the client always assumes the connection is good and in the ESTABLISHED state. The server has an idle timeout of 50 seconds. SolrJ, on the other hand, defaults to a timeout of infinity, i.e. it never times out. As load varies the server will start closing connections. The client, not performing the stale check, will forgo the read, miss the EOF, and send a request on a socket that has since been closed. The server responds by sending an RST, causing the client to encounter a socket reset error upon its next read.

This behavior can be verified without disabling the stale check. Run a Distributed Search and then use netstat to confirm that sockets have been opened between the coordinator and the shards and are in the ESTABLISHED state. Wait 51 seconds and run netstat again. Notice that the sockets on the coordinator are in the CLOSE-WAIT state (RFC 793 §3.5 provides more information on CLOSE-WAIT). This is telling you that the server has closed its socket but the client still hasn't closed its side. Wait another minute and netstat will report the same state. Wait as long as you want; the socket will sit there in CLOSE-WAIT. This is why the stale check is on by default.