Class CommitProcessor
java.lang.Object
java.lang.Thread
org.apache.zookeeper.server.ZooKeeperThread
org.apache.zookeeper.server.ZooKeeperCriticalThread
org.apache.zookeeper.server.quorum.CommitProcessor
- All Implemented Interfaces:
Runnable, RequestProcessor
This RequestProcessor matches the incoming committed requests with the
locally submitted requests. The trick is that locally submitted requests that
change the state of the system will come back as incoming committed requests,
so we need to match them up. Instead of just waiting for the committed requests,
we process the uncommitted requests that belong to other sessions.
The CommitProcessor is multi-threaded. Communication between threads is
handled via queues, atomics, and wait/notifyAll synchronized on the
processor. The CommitProcessor acts as a gateway for allowing requests to
continue with the remainder of the processing pipeline. It will allow many
read requests but only a single write request to be in flight simultaneously,
thus ensuring that write requests are processed in transaction id order.
- 1 commit processor main thread, which watches the request queues and
assigns requests to worker threads based on their sessionId so that
read and write requests for a particular session are always assigned
to the same thread (and hence are guaranteed to run in order).
- 0-N worker threads, which run the rest of the request processor pipeline
on the requests. If configured with 0 worker threads, the primary
commit processor thread runs the pipeline directly.
Typical (default) thread counts are: on a 32 core machine, 1 commit
processor thread and 32 worker threads.
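The session-based assignment described above can be sketched as follows. This is only an illustration of pinning a session to one worker, not the ZooKeeper implementation: the class `SessionAffinity`, the method `workerFor`, and the choice of a simple modulo are all assumptions made for the example.

```java
final class SessionAffinity {
    private SessionAffinity() {}

    /**
     * Hypothetical helper: maps a session id to a worker index in
     * [0, numWorkers). The same session id always yields the same index,
     * so requests of one session stay on one thread and run in order.
     */
    static int workerFor(long sessionId, int numWorkers) {
        if (numWorkers <= 0) {
            // With 0 workers the documented behaviour is that the main
            // thread runs the pipeline itself, so no index is needed.
            throw new IllegalArgumentException("numWorkers must be positive");
        }
        // floorMod keeps the result non-negative even for negative ids.
        return (int) Math.floorMod(sessionId, (long) numWorkers);
    }
}
```

Any deterministic function of the session id would give the same ordering guarantee; modulo is used here only because it is the shortest to read.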
Multi-threading constraints:
- Each session's requests must be processed in order.
- Write requests must be processed in zxid order.
- Must ensure no race condition between writes in one session that would
trigger a watch being set by a read request in another session.
The current implementation solves the third constraint by simply allowing no
read requests to be processed in parallel with write requests.
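The gateway rule described above (many reads in flight, or exactly one write, never both) can be sketched as a small admission check. This is a simplified illustration under stated assumptions: `ReadWriteGate` and its method names are hypothetical, and the sketch is non-blocking, whereas the description above says the real processor coordinates threads with queues, atomics, and wait/notifyAll.

```java
final class ReadWriteGate {
    private int readsInFlight;      // number of admitted, unfinished reads
    private boolean writeInFlight;  // true while the single write is running

    /** Admits a read unless a write is currently in flight. */
    synchronized boolean tryStartRead() {
        if (writeInFlight) {
            return false;
        }
        readsInFlight++;
        return true;
    }

    /** Admits a write only when nothing else is in flight. */
    synchronized boolean tryStartWrite() {
        if (writeInFlight || readsInFlight > 0) {
            return false;
        }
        writeInFlight = true;
        return true;
    }

    synchronized void finishRead() {
        if (readsInFlight <= 0) {
            throw new IllegalStateException("no read in flight");
        }
        readsInFlight--;
    }

    synchronized void finishWrite() {
        if (!writeInFlight) {
            throw new IllegalStateException("no write in flight");
        }
        writeInFlight = false;
    }
}
```

Because a write is admitted only when the gate is idle, writes are serialized, which is what lets them be applied in zxid order, and no read can set a watch while a write is being applied.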
-
Nested Class Summary
Nested classes/interfaces inherited from class java.lang.Thread
Thread.State, Thread.UncaughtExceptionHandler
Nested classes/interfaces inherited from interface org.apache.zookeeper.server.RequestProcessor
RequestProcessor.RequestProcessorException
-
Field Summary
Fields (Modifier and Type, Field, Description):
- protected final LinkedBlockingQueue<Request> committedRequests: Requests that have been committed.
- protected final AtomicInteger numRequestsProcessing: The number of requests currently being processed.
- pendingRequests: Requests that we are holding until commit comes in.
- protected LinkedBlockingQueue<Request> queuedRequests: Incoming requests.
- protected final LinkedBlockingQueue<Request> queuedWriteRequests: Incoming requests that are waiting on a commit, contained in order of arrival.
- protected boolean stopped
- protected boolean stoppedMainLoop: For testing purposes, we use a separate stopping condition for the outer loop.
- protected WorkerService workerPool
- static final String ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE: Default max commit batch size: 1
- static final String ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE: Default max read batch size: -1 to disable the feature
- static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS: Default: numCores
- static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT: Default worker pool shutdown timeout in ms: 5000 (5s)
Fields inherited from class java.lang.Thread
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
-
Constructor Summary
Constructors
CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs, ZooKeeperServerListener listener)
-
Method Summary
Methods (Modifier and Type, Method):
- void commit
- protected void endOfIteration()
- static int getMaxCommitBatchSize()
- static int getMaxReadBatchSize()
- protected boolean needCommit(Request request)
- void processRequest(Request request)
- void run()
- static void setMaxCommitBatchSize(int size)
- static void setMaxReadBatchSize(int size)
- void shutdown()
- void start()
- protected void waitForEmptyPool
Methods inherited from class org.apache.zookeeper.server.ZooKeeperCriticalThread
handleException
Methods inherited from class java.lang.Thread
activeCount, checkAccess, clone, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, stop, stop, suspend, toString, yield
-
Field Details
-
ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS
Default: numCores
-
ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT
Default worker pool shutdown timeout in ms: 5000 (5s)
-
ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE
Default max read batch size: -1 to disable the feature
-
ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE
Default max commit batch size: 1
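The four constants above name settings whose documented defaults are numCores, 5000 ms, -1, and 1. A sketch of how such settings might be resolved from JVM system properties is shown below. The class `CommitProcessorDefaults` is hypothetical, and the property key strings are assumptions (this page does not show the constants' values); only the default values come from the descriptions above.

```java
final class CommitProcessorDefaults {
    private CommitProcessorDefaults() {}

    // Assumed key strings; check the constant values in your ZooKeeper version.
    static final String NUM_WORKER_THREADS = "zookeeper.commitProcessor.numWorkerThreads";
    static final String SHUTDOWN_TIMEOUT = "zookeeper.commitProcessor.shutdownTimeout";
    static final String MAX_READ_BATCH_SIZE = "zookeeper.commitProcessor.maxReadBatchSize";
    static final String MAX_COMMIT_BATCH_SIZE = "zookeeper.commitProcessor.maxCommitBatchSize";

    /** Default: numCores, taken here as the available processor count. */
    static int numWorkerThreads() {
        return Integer.getInteger(NUM_WORKER_THREADS,
                Runtime.getRuntime().availableProcessors());
    }

    /** Default: 5000 ms. */
    static long shutdownTimeoutMs() {
        return Long.getLong(SHUTDOWN_TIMEOUT, 5000L);
    }

    /** Default: -1, which disables read batching. */
    static int maxReadBatchSize() {
        return Integer.getInteger(MAX_READ_BATCH_SIZE, -1);
    }

    /** Default: 1. */
    static int maxCommitBatchSize() {
        return Integer.getInteger(MAX_COMMIT_BATCH_SIZE, 1);
    }
}
```

With these assumed keys, a value could be overridden at JVM startup with a `-D` flag, for example `-Dzookeeper.commitProcessor.maxCommitBatchSize=8`.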
-
queuedRequests
Incoming requests. -
queuedWriteRequests
Incoming requests that are waiting on a commit, contained in order of arrival -
committedRequests
Requests that have been committed. -
pendingRequests
Requests that we are holding until commit comes in. Keys represent session ids, each value is a linked list of the session's requests. -
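The per-session layout described for pendingRequests (session id as key, a linked list of that session's requests as value) can be sketched as below. The class `PendingRequests` and its methods are hypothetical names for illustration, the request type is left generic, and the sketch is not thread-safe.

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

final class PendingRequests<R> {
    // Session id -> that session's held requests, oldest first.
    private final Map<Long, Deque<R>> bySession = new HashMap<>();

    /** Holds a request behind any earlier requests of the same session. */
    void hold(long sessionId, R request) {
        bySession.computeIfAbsent(sessionId, id -> new LinkedList<>())
                 .addLast(request);
    }

    /** Releases the oldest held request of the session, or null if none. */
    R releaseNext(long sessionId) {
        Deque<R> held = bySession.get(sessionId);
        if (held == null) {
            return null;
        }
        R next = held.pollFirst();
        if (held.isEmpty()) {
            // Drop empty lists so hasPending stays accurate.
            bySession.remove(sessionId);
        }
        return next;
    }

    boolean hasPending(long sessionId) {
        return bySession.containsKey(sessionId);
    }
}
```

Keeping one FIFO list per session preserves each session's request order while leaving other sessions free to proceed.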
numRequestsProcessing
The number of requests currently being processed -
stoppedMainLoop
protected volatile boolean stoppedMainLoop
For testing purposes, we use a separate stopping condition for the outer loop. -
stopped
protected volatile boolean stopped -
workerPool
-
-
Constructor Details
-
CommitProcessor
public CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs, ZooKeeperServerListener listener)
-
-
Method Details
-
needCommit
-
run
public void run() -
endOfIteration
protected void endOfIteration() -
waitForEmptyPool
- Throws:
InterruptedException
-
start
public void start() -
getMaxReadBatchSize
public static int getMaxReadBatchSize() -
getMaxCommitBatchSize
public static int getMaxCommitBatchSize() -
setMaxReadBatchSize
public static void setMaxReadBatchSize(int size) -
setMaxCommitBatchSize
public static void setMaxCommitBatchSize(int size) -
commit
-
processRequest
- Specified by:
processRequest in interface RequestProcessor
-
shutdown
public void shutdown()
- Specified by:
shutdown in interface RequestProcessor
-