Solr Distributed Search and the Stale Check
25 Feb 2013
Yokozuna
For the past eight months I have been working on a project called Yokozuna. The goal of this project is to tightly integrate Solr with Riak to provide robust search functionality, while letting Riak handle the distributed bits it is good at. Riak handles consistent hashing, data ownership claim, handoff, anti-entropy, sibling resolution, replication, etc. However, there is one distributed aspect that is still in Solr's control: distributed search.
Distributed Search
Since version 1.3.0 Solr has provided support for Distributed Search. Distributed Search allows one to run a query against a set of shards, handling the coordination and collation of the results on behalf of the user. This allows the splitting of an index across multiple nodes while at the same time keeping the convenience of a non-distributed search.
As an index grows there inevitably comes a time when a single machine can no longer handle it adequately. The problems range from disk space constraints to inadequate processing power to large document sets with common terms. At some point splitting the index makes sense because it allows the load to be shared and some of the work to be parallelized.
Distributed Search, for the most part, hides the fact that the query is distributed. Yes, the shards must be passed in, so the user must know which shards make up the full index, but the user doesn’t have to worry about the complexity of coordination or properly merging the results.
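For concreteness, here is a minimal SolrJ sketch of such a query. The host names, port, and core name are made up for illustration; only the shape of the request matters.

```java
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;

public class DistributedQueryExample {
    public static void main(String[] args) throws SolrServerException {
        // Any shard node can act as the coordinator.
        HttpSolrServer coordinator =
            new HttpSolrServer("http://host1:8983/solr/fruit");

        SolrQuery query = new SolrQuery("apple AND grape");
        // The caller must list every shard that makes up the full index.
        query.set("shards",
            "host1:8983/solr/fruit,host2:8983/solr/fruit,host3:8983/solr/fruit");

        QueryResponse rsp = coordinator.query(query);
        System.out.println("hits: " + rsp.getResults().getNumFound());
    }
}
```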
As with anything, there is a tradeoff. In this case additional communication and coordination must be introduced; you cannot have a distributed process without these two things. If an index is split across two nodes, then those nodes must communicate with each other or with some other entity. That entity, in turn, must coordinate the communication and determine a result. In this case the communication is HTTP, sent over TCP sockets.
A Distributed Search request can be sent to any of the shard nodes, which will act as the coordinator. The coordinator uses Solr's Java client, SolrJ, to make a request to each shard, in parallel. When all of the shards have responded the coordinator merges the results and determines the winning set of document identifiers. A second request pulls back the document data needed for the response. This means every Distributed Search must perform two sequential stages. The fact that the stages are done sequentially, and not in parallel, is key. The latency of the total request will be at least as great as the sum of the latencies of the stages, so reducing the latency of each stage is paramount.
Stale Check
SolrJ uses the Apache HTTP Client to execute requests. As with any good client, a connection pool is used to take advantage of HTTP 1.1 persistent connections. A persistent connection is one that is kept open between requests so that it may be reused, avoiding the constant creation and destruction of TCP sockets, which adds latency. For each stage of the Distributed Search the coordinator leases a connection from the pool. Since workload may vary, the pool grows or shrinks as needed. On both the client (coordinator) and server (shard), application-level timeouts determine when a connection is idle and should be closed. But a problem arises. If the client and server don't agree on when a connection is idle then one side may close the socket before the other side is ready. If the server closes the socket first, the client will hit a socket reset error if it blindly sends a request over that socket. To avoid this the HTTP client performs a stale check every time a connection is leased from the pool, adding latency. On my particular benchmark rig, each stale check call accounted for an additional ~10ms. That is roughly the time it takes a mechanical drive to seek, which is a lifetime for the computer.
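For reference, this is roughly where that knob lives in a 4.x Apache HTTP client. A minimal sketch, not the actual SolrJ/Yokozuna wiring; pool sizes here are arbitrary.

```java
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;

public class StaleCheckConfig {
    public static DefaultHttpClient buildClient(boolean staleCheck) {
        PoolingClientConnectionManager pool = new PoolingClientConnectionManager();
        pool.setMaxTotal(100);            // arbitrary pool sizing for the sketch
        pool.setDefaultMaxPerRoute(20);

        HttpParams params = new BasicHttpParams();
        // When true, every connection leased from the pool is probed
        // before use -- the source of the extra ~10ms per lease.
        HttpConnectionParams.setStaleCheckingEnabled(params, staleCheck);

        return new DefaultHttpClient(pool, params);
    }
}
```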
Simply disabling the check reduces the latency, but opens the window for socket reset errors (see Reset Generation in RFC 793). The easiest way to avoid this is to have the client initiate the socket close. The server, actively reading or polling its sockets, will immediately see the close and properly finish the shutdown sequence, leaving no lingering sockets in a dubious state. For Yokozuna I did this by configuring the client idle timeout to be lower than the server's. That's half the job; the other half is to determine when a client socket has hit that idle timeout and close it. For this I added a sweeper, sketched below, which periodically sweeps the pool of idle connections, closing any that have surpassed the threshold. Now the latency of the stale check has been removed and there is no concern for socket reset errors.
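A minimal sketch of such a sweeper against the Apache HTTP client's connection manager. The one-second sweep interval and the externally supplied idle threshold are my choices for illustration, not necessarily Yokozuna's exact patch.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.http.conn.ClientConnectionManager;

public class IdleConnectionSweeper {
    public static void start(final ClientConnectionManager pool,
                             final long idleMillis) {
        ScheduledExecutorService sweeper =
            Executors.newSingleThreadScheduledExecutor();
        sweeper.scheduleAtFixedRate(new Runnable() {
            public void run() {
                // Close any pooled connection that has sat idle longer
                // than the client-side threshold, so the client is
                // always the side that closes first.
                pool.closeIdleConnections(idleMillis, TimeUnit.MILLISECONDS);
            }
        }, 1, 1, TimeUnit.SECONDS);
    }
}
```

The only requirement is that the threshold passed in stays comfortably below the server's idle timeout; as long as that holds, the server never sees a request on a socket it has already decided to close.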
Results
The results of removing the stale check are terrific. I saw a 2–4 fold increase in throughput across the board with a reduction of over 100ms in latency in almost all cases. To achieve these results I used a synthetic benchmark of my own creation. I call it the fruit benchmark. It loads Yokozuna with 1M plain text values, each a list of various fruits. A given fruit occurs a specific number of times in the corpus. This allows running boolean queries with a predetermined number of results. E.g. I know apple occurs 100K times, and grape 1 time. Thus the query apple AND grape runs a search where one subquery matches 100K documents and the other 1, resulting in 1 matching document which contains both apple and grape. In the past I've used this benchmark with great success to test Riak Search's conjunction query performance. Following is a breakdown of the result set cardinalities of each query, followed by the results.
Benchmark | Result Set Cardinalities |
---|---|
alpha | 100K, 100K, 0 |
beta | 100K, 100K, 1 |
charlie | 100K, 100K, 1, 100K, 100K, 100K |
delta | 10K, 100, 10 |
echo | 10, 100, 10 |
foxtrot | 10K, 10K, 10K |
golf | 1K, 10K, 100K |
hotel | 10K, 100K, 100K |
india | 1, 10, 1, 100, 10 |
juliet | 1, 1 |
kilo | 1M |
lima | 1 |
Figure 1 compares the throughput (y-axis) of the various benchmarks (x-axis). The green line is Yokozuna/Solr with stale check enabled and the red-orange line is with stale check removed. Notice that the red-orange line beats the green line in every benchmark, but is slightly less consistent (i.e. not as flat). My hunch is that removing some of the latency has allowed other variables to come into play, such as disk IO. There are two other patches I'm looking at making to Distributed Search that might help smooth this line out a bit. Both involve ID mapping: use a single TermEnum for the entire list, and add a cache.
The second plot is the same as the first except now the y-axis represents latency, so lower is better. It shows a 100ms drop across the board except for query alpha. The lines even follow the same pattern, rising and dipping in the same spots and staying within a 20ms range of each other, demonstrating a significant improvement from removing the stale check.
More Stale Check, For The Curious
What is it exactly about the stale check that causes this additional latency? How did I manage to come up with my figure of ~10ms stated earlier? Why does a socket reset error occur if stale check is simply turned off?
The Apache HTTP docs make it clear that the stale check may add unwanted latency and shouldn’t be used in high-throughput scenarios. But what exactly is it that eats up the time?
In the client, the method DefaultRequestDirector.execute() determines if stale check is enabled and calls isStale() on the connection leased from the pool. Most of the code related to this check is very cheap, a matter of checking the values of various member fields, but eventually there is a blocking read() call. In the case that the server has closed the socket this call will return immediately with -1 to signal EOF. If the socket is still open then it should block for approximately 1ms before returning control (before the read call is made the socket timeout is temporarily set to 1ms). It blocks because there is nothing to read: a request must be sent to the server for there to be anything to read, but a request cannot be sent without first checking the socket.
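Stripped of HttpClient's buffering, the essence of that probe looks roughly like the following sketch. This is a simplification, not the actual SocketInputBuffer code; in particular the real implementation buffers any byte it happens to read rather than discarding it.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class StaleProbe {
    // Returns true if the peer has already closed its side of the connection.
    public static boolean looksStale(Socket socket) throws IOException {
        int oldTimeout = socket.getSoTimeout();
        try {
            // Temporarily shrink the read timeout to ~1ms, as the stale check does.
            socket.setSoTimeout(1);
            InputStream in = socket.getInputStream();
            int b = in.read();
            return b == -1;   // EOF: the server closed the socket.
        } catch (SocketTimeoutException e) {
            // Nothing to read, the common case for a healthy idle connection;
            // we paid roughly a millisecond (or more) just to learn that.
            return false;
        } finally {
            socket.setSoTimeout(oldTimeout);
        }
    }
}
```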
The majority of the work of the stale check is performed by the method isDataAvailable(). To verify this method is only called when stale check is enabled I turned to DTrace. Monitoring method entry and exit with DTrace requires the JVM be started with the ExtendedDTraceProbes flag enabled. I used the following one-liner while running Distributed Search queries to verify that isDataAvailable() is not called when the stale check is disabled.
$ dtrace -n 'hotspot*:::method-entry
> /copyinstr(arg1) == "org/apache/http/impl/io/SocketInputBuffer"/
> { printf("%s:%s(%s)", copyinstr(arg1), copyinstr(arg3), copyinstr(arg5)); }'
Next I wanted to determine the amount of time the method was taking while the machine was under load. Normally I would use DTrace again, but my test rig, which is a SmartOS machine, doesn't provide hotspot probes when a JVM is running. Luckily there is a tool called BTrace which is essentially DTrace for Java. With this tool I was able to write a script to track call latency on the fly. As shown below, each call took ~10ms to complete.
isDataAvailable() took 9787 microseconds
isDataAvailable() took 9789 microseconds
isDataAvailable() took 9684 microseconds
isDataAvailable() took 9672 microseconds
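A minimal BTrace script along these lines is enough to take this kind of measurement. The class and method names come from the DTrace predicate above; the rest is my assumption, not necessarily the script I used.

```java
import com.sun.btrace.annotations.BTrace;
import com.sun.btrace.annotations.Duration;
import com.sun.btrace.annotations.Kind;
import com.sun.btrace.annotations.Location;
import com.sun.btrace.annotations.OnMethod;
import static com.sun.btrace.BTraceUtils.*;

@BTrace
public class StaleCheckTiming {
    // Fires on return from isDataAvailable(); @Duration supplies the
    // time spent in the call, in nanoseconds.
    @OnMethod(
        clazz = "org.apache.http.impl.io.SocketInputBuffer",
        method = "isDataAvailable",
        location = @Location(Kind.RETURN))
    public static void timed(@Duration long nanos) {
        println(strcat("isDataAvailable() took ",
                strcat(str(nanos / 1000), " microseconds")));
    }
}
```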
Disabling the stale check is the move to make in this case. But by disabling the check the client always assumes the connection is good and in the ESTABLISHED state. The server has an idle timeout of 50 seconds. SolrJ, on the other hand, defaults to a timeout of infinity, i.e. it never times out. As load varies the server will start closing connections. The client, not performing the stale check, will forgo the read, miss the EOF, and send a request on a socket that has since been closed. The server responds to this by sending an RST, causing the client to encounter a socket reset error upon its next read. This behavior can be verified without disabling the stale check. Run a Distributed Search and then use netstat to confirm that sockets have been opened between the coordinator and the shards and are in the ESTABLISHED state. Wait 51 seconds and run netstat again. Notice that the sockets on the coordinator are in the CLOSE-WAIT state (RFC 793 §3.5 provides more information on CLOSE-WAIT). This is telling you that the server has closed its socket but the client still hasn't closed its side. Wait another minute and netstat will report the same state. Wait as long as you want; the socket will sit there in CLOSE-WAIT.
This is why the stale check is on by default.