Let's walk through how a user registers a StateMachine:
CopycatServer.Builder builder = CopycatServer.builder(address);
builder.withStateMachine(MapStateMachine::new);
The constructor reference MapStateMachine::new produces a Supplier:
/**
 * Sets the Raft state machine factory.
 *
 * @param factory The Raft state machine factory.
 * @return The server builder.
 * @throws NullPointerException if the {@code factory} is {@code null}
 */
public Builder withStateMachine(Supplier<StateMachine> factory) {
  this.stateMachineFactory = Assert.notNull(factory, "factory");
  return this;
}
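As a minimal sketch of why a constructor reference works as a factory: each call to get() on the Supplier invokes the constructor again, so the server can create a fresh state machine whenever it needs one. DemoStateMachine and SupplierSketch are hypothetical stand-ins, not Copycat classes.

```java
import java.util.function.Supplier;

class SupplierSketch {
  // Hypothetical stand-in for a user-defined state machine (not the Copycat class).
  static class DemoStateMachine {
  }

  // A constructor reference is a Supplier: every get() calls the constructor again.
  static Supplier<DemoStateMachine> factory() {
    return DemoStateMachine::new;
  }

  public static void main(String[] args) {
    Supplier<DemoStateMachine> factory = factory();
    DemoStateMachine first = factory.get();
    DemoStateMachine second = factory.get();
    System.out.println("distinct instances: " + (first != second));
  }
}
```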
In build(), the factory is passed in when the ServerContext is constructed:
ServerContext context = new ServerContext(name, type, serverAddress, clientAddress, storage, serializer, stateMachineFactory, connections, threadContext);
In the ServerContext constructor:
this.stateMachineFactory = Assert.notNull(stateMachineFactory, "stateMachineFactory");
threadContext.execute(this::reset).join();
In reset():
// Create a new user state machine.
StateMachine stateMachine = stateMachineFactory.get();
// Create a new internal server state machine.
this.stateMachine = new ServerStateMachine(stateMachine, this, stateContext);
Here is how stateContext is defined:
this.stateContext = new SingleThreadContext(String.format("copycat-server-%s-%s-state", serverAddress, name), threadContext.serializer().clone());
It is also single-threaded, so there are two thread contexts here: threadContext and stateContext.
stateContext is dedicated to updating the state machine's state.
ServerStateMachine manages the StateMachine:
ServerStateMachine(StateMachine stateMachine, ServerContext state, ThreadContext executor) {
  this.stateMachine = Assert.notNull(stateMachine, "stateMachine");
  this.state = Assert.notNull(state, "state");
  this.log = state.getLog();
  this.executor = new ServerStateMachineExecutor(new ServerStateMachineContext(state.getConnections(), new ServerSessionManager(state)), executor);
  this.commits = new ServerCommitPool(log, this.executor.context().sessions());
  init();
}
ServerStateMachineExecutor
This serves as the execution environment for the StateMachine:
class ServerStateMachineExecutor implements StateMachineExecutor {
  private static final Logger LOGGER = LoggerFactory.getLogger(ServerStateMachineExecutor.class);
  private final ThreadContext executor;
  private final ServerStateMachineContext context;
  private final Queue tasks = new ArrayDeque<>();
  private final List scheduledTasks = new ArrayList<>();
  private final List complete = new ArrayList<>();
  private final Map<Class, Function> operations = new HashMap<>();
init
/**
 * Initializes the state machine.
 */
private void init() {
  stateMachine.init(executor);
}
Note that stateMachine here is the user-defined class; its init method comes from the StateMachine base class:
public void init(StateMachineExecutor executor) {
  this.executor = Assert.notNull(executor, "executor");
  this.context = executor.context();
  this.clock = context.clock();
  this.sessions = context.sessions();
  if (this instanceof SessionListener) {
    executor.context().sessions().addListener((SessionListener) this);
  }
  configure(executor);
}
configure
protected void configure(StateMachineExecutor executor) {
  registerOperations();
}

/**
 * Registers operations for the class.
 */
private void registerOperations() {
  Class<?> type = getClass();
  for (Method method : type.getMethods()) {
    if (isOperationMethod(method)) {
      registerMethod(method);
    }
  }
}

/**
 * Returns a boolean value indicating whether the given method is an operation method.
 */
private boolean isOperationMethod(Method method) {
  Class<?>[] paramTypes = method.getParameterTypes();
  return paramTypes.length == 1 && paramTypes[0] == Commit.class;
}
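The reflection scan above can be tried in isolation. The following is a rough sketch under stated assumptions: Commit and DemoMachine are hypothetical stand-ins for the Copycat types, and operationNames is a helper added only for illustration. It keeps every public method whose single parameter has the raw type Commit.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class OperationScanSketch {
  // Hypothetical stand-in for Copycat's Commit type.
  static class Commit<T> {
  }

  // Hypothetical user state machine: two operation methods and one unrelated helper.
  public static class DemoMachine {
    public String put(Commit<String> commit) { return "put"; }
    public void remove(Commit<Integer> commit) { }
    public int size() { return 0; }
  }

  // Same rule as in the walkthrough: exactly one parameter, and its raw type is Commit.
  static boolean isOperationMethod(Method method) {
    Class<?>[] paramTypes = method.getParameterTypes();
    return paramTypes.length == 1 && paramTypes[0] == Commit.class;
  }

  // Collects the names of all operation methods; sorted because getMethods() has no defined order.
  static List<String> operationNames(Class<?> type) {
    List<String> names = new ArrayList<>();
    for (Method method : type.getMethods()) {
      if (isOperationMethod(method)) {
        names.add(method.getName());
      }
    }
    Collections.sort(names);
    return names;
  }

  public static void main(String[] args) {
    System.out.println(operationNames(DemoMachine.class));
  }
}
```

Inherited methods such as equals(Object) also take one parameter, but they are skipped because the parameter type is not Commit.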
So how does a user define operations?
public class MapStateMachine extends StateMachine { private Map
This makes it clear that operations are discovered via reflection:
a method qualifies if it takes exactly one parameter, and that parameter's type is Commit.
For each such method, registerMethod is called:
private void registerMethod(Method method) {
  Type genericType = method.getGenericParameterTypes()[0];
  Class argumentType = resolveArgument(genericType);
  if (argumentType != null && Operation.class.isAssignableFrom(argumentType)) {
    registerMethod(argumentType, method);
  }
}
This extracts the generic type argument of the Commit parameter, which is Put in the example.
private void registerMethod(Class type, Method method) {
  Class returnType = method.getReturnType();
  if (returnType == void.class || returnType == Void.class) {
    registerVoidMethod(type, method);
  } else {
    registerValueMethod(type, method);
  }
}

private void registerValueMethod(Class type, Method method) {
  executor.register(type, wrapValueMethod(method));
}

/**
 * Wraps a value method.
 */
private Function wrapValueMethod(Method method) {
  return c -> {
    try {
      return method.invoke(this, c);
    } catch (InvocationTargetException e) {
      throw new CommandException(e);
    } catch (IllegalAccessException e) {
      throw new AssertionError(e);
    }
  };
}
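The body of resolveArgument is not shown in this walkthrough. As a simplified sketch of the idea, assuming the parameter is always declared as Commit of a concrete class, the type argument can be read from the ParameterizedType. Commit, Put, DemoMachine and operationTypeOf are hypothetical names used only for this example; the real method may handle more cases.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class GenericArgumentSketch {
  // Hypothetical stand-ins for Copycat's Commit type and a user-defined operation.
  static class Commit<T> {
  }

  static class Put {
  }

  public static class DemoMachine {
    public Object put(Commit<Put> commit) { return null; }
  }

  // Simplified assumption: only handles a parameter declared as Commit<SomeConcreteClass>.
  static Class<?> resolveArgument(Type genericType) {
    if (genericType instanceof ParameterizedType) {
      Type[] arguments = ((ParameterizedType) genericType).getActualTypeArguments();
      if (arguments.length == 1 && arguments[0] instanceof Class) {
        return (Class<?>) arguments[0];
      }
    }
    return null; // raw Commit, wildcard, or type variable: not resolved in this sketch
  }

  // Looks up a one-argument operation method and returns the type inside Commit<...>.
  static Class<?> operationTypeOf(Class<?> machine, String methodName) {
    try {
      Method method = machine.getMethod(methodName, Commit.class);
      return resolveArgument(method.getGenericParameterTypes()[0]);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(operationTypeOf(DemoMachine.class, "put").getSimpleName());
  }
}
```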
ServerStateMachineExecutor.register
@Override
public <T extends Operation<U>, U> StateMachineExecutor register(Class<T> type, Function<Commit<T>, U> callback) {
  operations.put(type, callback);
  return this;
}
Here the operations are registered in ServerStateMachineExecutor so that they can be invoked later.
Back to ServerStateMachine.
The core logic of ServerStateMachine is apply, that is, applying commands to the state machine.
It can apply all commits up to a given index:
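Putting the pieces together, registration amounts to storing a function per operation class and invoking the reflected method through it. The sketch below is only an illustration of that flow: RegisterSketch, Commit, Put and DemoMachine are hypothetical, and IllegalStateException stands in for Copycat's CommandException.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class RegisterSketch {
  // Hypothetical stand-in for Copycat's Commit: it just carries the operation.
  static class Commit<T> {
    final T operation;
    Commit(T operation) { this.operation = operation; }
  }

  // Hypothetical operation type.
  static class Put {
    final String value;
    Put(String value) { this.value = value; }
  }

  // Hypothetical user state machine with one value-returning operation.
  public static class DemoMachine {
    public String put(Commit<Put> commit) { return "stored " + commit.operation.value; }
  }

  // Operation class -> callback, mirroring the operations map in the executor.
  private final Map<Class<?>, Function<Commit<?>, Object>> operations = new HashMap<>();

  void register(Class<?> type, Function<Commit<?>, Object> callback) {
    operations.put(type, callback);
  }

  // Wraps a reflected method so it can be called like an ordinary function.
  static Function<Commit<?>, Object> wrapValueMethod(Object target, Method method) {
    return commit -> {
      try {
        return method.invoke(target, commit);
      } catch (InvocationTargetException e) {
        throw new IllegalStateException(e.getCause()); // stand-in for CommandException
      } catch (IllegalAccessException e) {
        throw new AssertionError(e);
      }
    };
  }

  // Finds the callback by the operation's class and runs it.
  Object execute(Commit<?> commit) {
    return operations.get(commit.operation.getClass()).apply(commit);
  }

  static Object demo() {
    try {
      RegisterSketch executor = new RegisterSketch();
      Method put = DemoMachine.class.getMethod("put", Commit.class);
      executor.register(Put.class, wrapValueMethod(new DemoMachine(), put));
      return executor.execute(new Commit<>(new Put("a")));
    } catch (NoSuchMethodException e) {
      throw new AssertionError(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```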
/**
 * Applies all commits up to the given index.
 *
 * Calls to this method are assumed not to expect a result. This allows some optimizations to be
 * made internally since linearizable events don't have to be waited to complete the command.
 *
 * @param index The index up to which to apply commits.
 */
public void applyAll(long index) {
  // If the effective commit index is greater than the last index applied to the state machine then apply remaining entries.
  long lastIndex = Math.min(index, log.lastIndex());
  if (lastIndex > lastApplied) {
    for (long i = lastApplied + 1; i <= lastIndex; i++) { // continue from the last applied index
      Entry entry = log.get(i);
      if (entry != null) {
        apply(entry).whenComplete((result, error) -> entry.release());
      }
      setLastApplied(i);
    }
  }
}
A single entry can also be applied by its index:
public <T> CompletableFuture<T> apply(long index) {
  // If entries remain to be applied prior to this entry then synchronously apply them.
  if (index > lastApplied + 1) {
    applyAll(index - 1); // entries are applied in order, so earlier ones must be applied first
  }

  // Read the entry from the log. If the entry is non-null them apply the entry, otherwise
  // simply update the last applied index and return a null result.
  try (Entry entry = log.get(index)) {
    if (entry != null) {
      return apply(entry);
    } else {
      return CompletableFuture.completedFuture(null);
    }
  } finally {
    setLastApplied(index);
  }
}
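The bookkeeping in applyAll can be reduced to a small model: clamp the target to the end of the log, resume at lastApplied + 1, skip missing entries, and advance lastApplied either way. This is a sketch with a hypothetical in-memory log of strings, not the Copycat Log class.

```java
import java.util.ArrayList;
import java.util.List;

class ApplyAllSketch {
  // Hypothetical in-memory log: the entry with index i is stored at position i - 1.
  private final List<String> log = new ArrayList<>();
  private final List<String> applied = new ArrayList<>();
  private long lastApplied = 0;

  void append(String entry) { log.add(entry); }

  long lastIndex() { return log.size(); }

  long lastApplied() { return lastApplied; }

  List<String> applied() { return applied; }

  // Applies every entry in (lastApplied, min(index, lastIndex)], in order.
  void applyAll(long index) {
    long lastIndex = Math.min(index, lastIndex());
    for (long i = lastApplied + 1; i <= lastIndex; i++) {
      String entry = log.get((int) (i - 1));
      if (entry != null) {
        applied.add(entry); // stands in for apply(entry)
      }
      lastApplied = i; // advance even when the entry is missing
    }
  }

  static ApplyAllSketch demo() {
    ApplyAllSketch machine = new ApplyAllSketch();
    machine.append("a");
    machine.append(null); // a missing entry, skipped but still counted
    machine.append("c");
    machine.applyAll(2);  // applies "a", skips index 2
    machine.applyAll(2);  // nothing new to apply
    machine.applyAll(10); // clamped to the last index, applies "c"
    return machine;
  }

  public static void main(String[] args) {
    ApplyAllSketch machine = demo();
    System.out.println(machine.applied() + " lastApplied=" + machine.lastApplied());
  }
}
```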
apply(entry)
/**
 * Applies an entry to the state machine.
 *
 * Calls to this method are assumed to expect a result. This means linearizable session events
 * triggered by the application of the given entry will be awaited before completing the returned future.
 *
 * @param entry The entry to apply.
 * @return A completable future to be completed with the result.
 */
@SuppressWarnings("unchecked")
public <T> CompletableFuture<T> apply(Entry entry) {
  if (entry instanceof QueryEntry) {
    return (CompletableFuture<T>) apply((QueryEntry) entry);
  } else if (entry instanceof CommandEntry) {
    return (CompletableFuture<T>) apply((CommandEntry) entry);
  } else if (entry instanceof RegisterEntry) {
    return (CompletableFuture<T>) apply((RegisterEntry) entry);
  } else if (entry instanceof KeepAliveEntry) {
    return (CompletableFuture<T>) apply((KeepAliveEntry) entry);
  } else if (entry instanceof UnregisterEntry) {
    return (CompletableFuture<T>) apply((UnregisterEntry) entry);
  } else if (entry instanceof InitializeEntry) {
    return (CompletableFuture<T>) apply((InitializeEntry) entry);
  } else if (entry instanceof ConfigurationEntry) {
    return (CompletableFuture<T>) apply((ConfigurationEntry) entry);
  }
  return Futures.exceptionalFuture(new InternalException("unknown state machine operation"));
}
Each entry type has its own apply logic.
apply((CommandEntry) entry)
private CompletableFuture<Result> apply(CommandEntry entry) {
  final CompletableFuture<Result> future = new CompletableFuture<>();
  final ThreadContext context = ThreadContext.currentContextOrThrow(); // keep a reference to the calling thread's context

  // First check to ensure that the session exists.
  ServerSessionContext session = executor.context().sessions().getSession(entry.getSession());

  // If the session is null, return an UnknownSessionException. Commands applied to the state machine must
  // have a session. We ensure that session register/unregister entries are not compacted from the log
  // until all associated commands have been cleaned.
  if (session == null) { // the session does not exist
    log.release(entry.getIndex());
    return Futures.exceptionalFuture(new UnknownSessionException("unknown session: " + entry.getSession()));
  }
  // If the session is not in an active state, return an UnknownSessionException. Sessions are retained in the
  // session registry until all prior commands have been released by the state machine, but new commands can
  // only be applied for sessions in an active state.
  else if (!session.state().active()) { // the session is not active
    log.release(entry.getIndex());
    return Futures.exceptionalFuture(new UnknownSessionException("inactive session: " + entry.getSession()));
  }
  // If the command's sequence number is less than the next session sequence number then that indicates that
  // we've received a command that was previously applied to the state machine. Ensure linearizability by
  // returning the cached response instead of applying it to the user defined state machine.
  else if (entry.getSequence() > 0 && entry.getSequence() < session.nextCommandSequence()) { // the entry was already applied
    // Ensure the response check is executed in the state machine thread in order to ensure the
    // command was applied, otherwise there will be a race condition and concurrent modification issues.
    long sequence = entry.getSequence();

    // Switch to the state machine thread and get the existing response.
    executor.executor().execute(() -> sequenceCommand(sequence, session, future, context)); // return the previously cached result
    return future;
  }
  // If we've made it this far, the command must have been applied in the proper order as sequenced by the
  // session. This should be the case for most commands applied to the state machine.
  else {
    // Allow the executor to execute any scheduled events.
    long index = entry.getIndex();
    long sequence = entry.getSequence();

    // Calculate the updated timestamp for the command.
    long timestamp = executor.timestamp(entry.getTimestamp());

    // Execute the command in the state machine thread. Once complete, the CompletableFuture callback will be completed
    // in the state machine thread. Register the result in that thread and then complete the future in the caller's thread.
    // ServerCommitPool avoids allocating a new ServerCommit each time: one is taken from the pool and returned after use.
    ServerCommit commit = commits.acquire(entry, session, timestamp);
    executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

    // Update the last applied index prior to the command sequence number. This is necessary to ensure queries sequenced
    // at this index receive the index of the command.
    setLastApplied(index);

    // Update the session timestamp and command sequence number. This is done in the caller's thread since all
    // timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.
    session.setTimestamp(timestamp).setCommandSequence(sequence);
    return future;
  }
}
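The duplicate-command branch can be illustrated with a much smaller model. In this sketch a session caches results by sequence number and replays the cached value when a sequence below the next expected one arrives again. SessionSequenceSketch is hypothetical, and it assumes the next expected sequence is the last applied sequence plus one, which the code above does not show.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class SessionSequenceSketch {
  // Cached results keyed by command sequence number.
  private final Map<Long, Object> results = new HashMap<>();
  private long commandSequence = 0;

  // Assumption for this sketch: the next expected sequence is the last applied one plus one.
  long nextCommandSequence() {
    return commandSequence + 1;
  }

  // Applies the command once per sequence number; replays return the cached result.
  Object apply(long sequence, Supplier<Object> command) {
    if (sequence > 0 && sequence < nextCommandSequence()) {
      return results.get(sequence); // already applied: do not run the command again
    }
    Object result = command.get();
    results.put(sequence, result);
    commandSequence = sequence;
    return result;
  }

  // Returns {number of times the command body ran, value returned by the replay}.
  static int[] demo() {
    SessionSequenceSketch session = new SessionSequenceSketch();
    int[] counter = {0};
    session.apply(1, () -> ++counter[0]);                 // first delivery runs the command
    Object replay = session.apply(1, () -> ++counter[0]); // retry of the same sequence
    return new int[] {counter[0], (Integer) replay};
  }

  public static void main(String[] args) {
    int[] outcome = demo();
    System.out.println("executions=" + outcome[0] + " replayed=" + outcome[1]);
  }
}
```

Replaying the cached result is what keeps a client retry from mutating the state machine twice.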
executeCommand
ServerCommit commit = commits.acquire(entry, session, timestamp);
executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));
Note that two threads are involved here.
One is context, the server's ThreadContext threadContext, which handles server requests.
The other is the stateContext inside executor, which mutates the state machine's state.
So executeCommand runs on the executor's thread, and the caller's ThreadContext is passed in so that the future can be completed back on the server thread:
/**
 * Executes a state machine command.
 */
private void executeCommand(long index, long sequence, long timestamp, ServerCommit commit, ServerSessionContext session, CompletableFuture<Result> future, ThreadContext context) {
  // Trigger scheduled callbacks in the state machine.
  executor.tick(index, timestamp);

  // Update the state machine context with the commit index and local server context. The synchronous flag
  // indicates whether the server expects linearizable completion of published events. Events will be published
  // based on the configured consistency level for the context.
  executor.init(commit.index(), commit.time(), ServerStateMachineContext.Type.COMMAND);

  // Store the event index to return in the command response.
  long eventIndex = session.getEventIndex();

  try {
    // Execute the state machine operation and get the result.
    Object output = executor.executeOperation(commit);

    // Once the operation has been applied to the state machine, commit events published by the command.
    // The state machine context will build a composite future for events published to all sessions.
    executor.commit();

    // Store the result for linearizability and complete the command.
    Result result = new Result(index, eventIndex, output);
    session.registerResult(sequence, result); // cache the result
    context.executor().execute(() -> future.complete(result)); // complete the future on the caller's thread
  } catch (Exception e) {
    // If an exception occurs during execution of the command, store the exception.
    Result result = new Result(index, eventIndex, e);
    session.registerResult(sequence, result);
    context.executor().execute(() -> future.complete(result));
  }
}
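The hand-off between the two threads can be sketched with plain single-thread executors from java.util.concurrent. This is an illustration only, with Copycat's ThreadContext replaced by ExecutorService and hypothetical thread names "server" and "state": the command body runs on the state thread, and the future is completed back on the server thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TwoContextSketch {
  // Returns {thread that ran the command, thread that completed the future}.
  static String[] demo() {
    ExecutorService server = Executors.newSingleThreadExecutor(r -> new Thread(r, "server"));
    ExecutorService state = Executors.newSingleThreadExecutor(r -> new Thread(r, "state"));
    CompletableFuture<String> future = new CompletableFuture<>();
    String[] threads = new String[2];
    try {
      // The request arrives on the server thread, which hands the command to the state thread.
      server.execute(() ->
          state.execute(() -> {
            threads[0] = Thread.currentThread().getName(); // the command runs here
            String result = "ok";
            // Hop back to the server thread before completing the caller's future.
            server.execute(() -> {
              threads[1] = Thread.currentThread().getName();
              future.complete(result);
            });
          }));
      future.join(); // wait until the whole round trip has finished
      return threads;
    } finally {
      server.shutdown();
      state.shutdown();
    }
  }

  public static void main(String[] args) {
    String[] threads = demo();
    System.out.println("command on " + threads[0] + ", completion on " + threads[1]);
  }
}
```

Keeping all state mutation on one thread means the state machine itself needs no locking, while callbacks registered on the future still run on the thread that issued the request.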
ServerStateMachineExecutor.tick
Based on the given timestamp, this triggers the tasks in scheduledTasks whose time has come.
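The body of tick is not reproduced in this walkthrough, so the following is only a sketch of the behaviour described: tasks carry a due time, and each tick with a timestamp runs and removes the tasks that are due. TickSketch and ScheduledTask are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class TickSketch {
  // Hypothetical scheduled task: a callback plus the logical time at which it becomes due.
  private static final class ScheduledTask {
    final long time;
    final Runnable callback;

    ScheduledTask(long time, Runnable callback) {
      this.time = time;
      this.callback = callback;
    }
  }

  private final List<ScheduledTask> scheduledTasks = new ArrayList<>();

  void schedule(long time, Runnable callback) {
    scheduledTasks.add(new ScheduledTask(time, callback));
  }

  // Runs every task whose time is at or before the given timestamp, each at most once.
  void tick(long timestamp) {
    List<ScheduledTask> due = new ArrayList<>();
    for (ScheduledTask task : scheduledTasks) {
      if (task.time <= timestamp) {
        due.add(task);
      }
    }
    // Remove before running so a callback may safely schedule new tasks.
    scheduledTasks.removeAll(due);
    for (ScheduledTask task : due) {
      task.callback.run();
    }
  }

  static List<String> demo() {
    TickSketch executor = new TickSketch();
    List<String> fired = new ArrayList<>();
    executor.schedule(10, () -> fired.add("a"));
    executor.schedule(20, () -> fired.add("b"));
    executor.tick(15); // only "a" is due
    executor.tick(15); // "a" must not fire twice
    executor.tick(25); // now "b" is due
    return fired;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Driving the schedule from the timestamps of log entries rather than the wall clock is what lets every replica fire the same tasks at the same point in the log.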
ServerStateMachineExecutor.init
This updates the state machine's context:
void init(long index, Instant instant, ServerStateMachineContext.Type type) {
  context.update(index, instant, type);
}

// ServerStateMachineContext
void update(long index, Instant instant, Type type) {
  this.index = index;
  this.type = type;
  clock.set(instant);
}
ServerStateMachineExecutor.executeOperation
<T extends Operation<U>, U> U executeOperation(Commit commit) {
  // Get the function registered for the operation. If no function is registered, attempt to
  // use a global function if available.
  Function function = operations.get(commit.type()); // look up the function registered for this type

  if (function == null) {
    // If no operation function was found for the class, try to find an operation function
    // registered with a parent class.
    for (Map.Entry<Class, Function> entry : operations.entrySet()) {
      if (entry.getKey().isAssignableFrom(commit.type())) { // the registered type is a supertype of commit.type()
        function = entry.getValue();
        break;
      }
    }

    // If a parent operation function was found, store the function for future reference.
    if (function != null) {
      operations.put(commit.type(), function);
    }
  }

  if (function == null) {
    throw new IllegalStateException("unknown state machine operation: " + commit.type());
  } else {
    // Execute the operation. If the operation return value is a Future, await the result,
    // otherwise immediately complete the execution future.
    try {
      return (U) function.apply(commit); // actually run the function
    } catch (Exception e) {
      throw new ApplicationException(e, "An application error occurred");
    }
  }
}
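The lookup strategy in executeOperation (exact match first, then a registered supertype, then caching the match) can be sketched on its own. The classes Operation, Put and TtlPut and the helper names below are hypothetical and stand in for real operation types.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class OperationLookupSketch {
  // Hypothetical operation hierarchy: TtlPut is a subclass of a registered type.
  static class Operation { }
  static class Put extends Operation { }
  static class TtlPut extends Put { }

  private final Map<Class<?>, Function<Object, String>> operations = new LinkedHashMap<>();

  void register(Class<?> type, Function<Object, String> function) {
    operations.put(type, function);
  }

  boolean isRegistered(Class<?> type) {
    return operations.containsKey(type);
  }

  // Exact match first, then the first registered supertype; a supertype hit is cached.
  Function<Object, String> lookup(Class<?> type) {
    Function<Object, String> function = operations.get(type);
    if (function == null) {
      for (Map.Entry<Class<?>, Function<Object, String>> entry : operations.entrySet()) {
        if (entry.getKey().isAssignableFrom(type)) {
          function = entry.getValue();
          break;
        }
      }
      if (function != null) {
        operations.put(type, function); // the next lookup for this type is a direct hit
      }
    }
    if (function == null) {
      throw new IllegalStateException("unknown state machine operation: " + type);
    }
    return function;
  }

  // Returns {handler result, whether TtlPut was cached after the first lookup}.
  static Object[] demo() {
    OperationLookupSketch executor = new OperationLookupSketch();
    executor.register(Put.class, operation -> "put handler");
    String result = executor.lookup(TtlPut.class).apply(new TtlPut());
    return new Object[] {result, executor.isRegistered(TtlPut.class)};
  }

  public static void main(String[] args) {
    Object[] outcome = demo();
    System.out.println(outcome[0] + ", cached=" + outcome[1]);
  }
}
```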