Copycat - StateMachine
Published: 2019-06-22


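Let's look at how a user registers a StateMachine:

CopycatServer.Builder builder = CopycatServer.builder(address);
builder.withStateMachine(MapStateMachine::new);

MapStateMachine::new is a constructor reference, so what gets passed in is a Supplier, not a state machine instance.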
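To make the Supplier point concrete, here is a minimal sketch in plain Java. DemoStateMachine is a hypothetical stand-in, not a Copycat class; the only thing being shown is that a constructor reference creates nothing until get() is called, and that each call yields a new instance.

```java
import java.util.function.Supplier;

public class SupplierDemo {
    // Hypothetical stand-in for a user state machine; not a Copycat class.
    static class DemoStateMachine {}

    public static void main(String[] args) {
        // A constructor reference is a Supplier: nothing is constructed yet.
        Supplier<DemoStateMachine> factory = DemoStateMachine::new;

        // Each get() call invokes the constructor and yields a new instance.
        DemoStateMachine first = factory.get();
        DemoStateMachine second = factory.get();
        System.out.println(first != second); // prints "true"
    }
}
```

This is presumably why the builder takes a factory rather than an instance: the server can ask for a fresh state machine whenever it resets.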

/**
 * Sets the Raft state machine factory.
 *
 * @param factory The Raft state machine factory.
 * @return The server builder.
 * @throws NullPointerException if the {@code factory} is {@code null}
 */
public Builder withStateMachine(Supplier<StateMachine> factory) {
  this.stateMachineFactory = Assert.notNull(factory, "factory");
  return this;
}

In build(), the factory is passed in when the ServerContext is created:

ServerContext context = new ServerContext(name, type, serverAddress, clientAddress, storage, serializer, stateMachineFactory, connections, threadContext);

In the ServerContext constructor:

this.stateMachineFactory = Assert.notNull(stateMachineFactory, "stateMachineFactory");
threadContext.execute(this::reset).join();

 

In reset():

// Create a new user state machine.
StateMachine stateMachine = stateMachineFactory.get();

// Create a new internal server state machine.
this.stateMachine = new ServerStateMachine(stateMachine, this, stateContext);

Here is how stateContext is defined:

this.stateContext = new SingleThreadContext(String.format("copycat-server-%s-%s-state", serverAddress, name), threadContext.serializer().clone());

It is also a single-threaded context, so there are two thread contexts in play: threadContext and stateContext.

stateContext is dedicated to updating state.

 

ServerStateMachine manages the user's StateMachine:

ServerStateMachine(StateMachine stateMachine, ServerContext state, ThreadContext executor) {
  this.stateMachine = Assert.notNull(stateMachine, "stateMachine");
  this.state = Assert.notNull(state, "state");
  this.log = state.getLog();
  this.executor = new ServerStateMachineExecutor(new ServerStateMachineContext(state.getConnections(), new ServerSessionManager(state)), executor);
  this.commits = new ServerCommitPool(log, this.executor.context().sessions());
  init();
}

 

ServerStateMachineExecutor

This is the execution environment for the StateMachine:

class ServerStateMachineExecutor implements StateMachineExecutor {
  private static final Logger LOGGER = LoggerFactory.getLogger(ServerStateMachineExecutor.class);
  private final ThreadContext executor;
  private final ServerStateMachineContext context;
  private final Queue<ServerTask> tasks = new ArrayDeque<>();
  private final List<ServerScheduledTask> scheduledTasks = new ArrayList<>();
  private final List<ServerScheduledTask> complete = new ArrayList<>();
  private final Map<Class, Function> operations = new HashMap<>();

 

init

/**
 * Initializes the state machine.
 */
private void init() {
  stateMachine.init(executor);
}

Note that stateMachine here is an instance of the user-defined class. The init method it runs looks like this:

public void init(StateMachineExecutor executor) {
  this.executor = Assert.notNull(executor, "executor");
  this.context = executor.context();
  this.clock = context.clock();
  this.sessions = context.sessions();
  if (this instanceof SessionListener) {
    executor.context().sessions().addListener((SessionListener) this);
  }
  configure(executor);
}

configure

protected void configure(StateMachineExecutor executor) {
  registerOperations();
}

/**
 * Registers operations for the class.
 */
private void registerOperations() {
  Class<?> type = getClass();
  for (Method method : type.getMethods()) {
    if (isOperationMethod(method)) {
      registerMethod(method);
    }
  }
}

/**
 * Returns a boolean value indicating whether the given method is an operation method.
 */
private boolean isOperationMethod(Method method) {
  Class<?>[] paramTypes = method.getParameterTypes();
  return paramTypes.length == 1 && paramTypes[0] == Commit.class;
}

 

So how does a user define operations?

public class MapStateMachine extends StateMachine {
  private Map<Object, Object> map = new HashMap<>();

  public Object put(Commit<PutCommand> commit) {
    try {
      return map.put(commit.operation().key(), commit.operation().value());
    } finally {
      commit.close();
    }
  }

  public Object get(Commit<GetQuery> commit) {
    try {
      return map.get(commit.operation().key());
    } finally {
      commit.close();
    }
  }
}

This makes it clear that operations are discovered via reflection.

The rule is simple: the method takes exactly one parameter, and that parameter's type is Commit.

If a method qualifies as an operation method, registerMethod is called:

private void registerMethod(Method method) {
  Type genericType = method.getGenericParameterTypes()[0];
  Class<?> argumentType = resolveArgument(genericType);
  if (argumentType != null && Operation.class.isAssignableFrom(argumentType)) {
    registerMethod(argumentType, method);
  }
}

This resolves the generic type argument of the Commit parameter, i.e. the Put operation type in the example above.
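The discovery rule and the generic-argument lookup can be sketched with nothing but java.lang.reflect. Everything below (Operation, Commit, Put, DemoMachine, operationTypes) is a hypothetical stand-in written for illustration; Copycat's own resolveArgument is not shown in this post and may handle more cases than the plain ParameterizedType check used here.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class OperationScan {
    // Hypothetical stand-ins; these are not the Copycat types.
    interface Operation {}
    static class Commit<T extends Operation> {}
    static class Put implements Operation {}

    static class DemoMachine {
        public Object put(Commit<Put> commit) { return null; }
        public String name() { return "demo"; } // ignored: no Commit parameter
    }

    // Collects the type argument of every public method whose single parameter is Commit<...>.
    static List<Class<?>> operationTypes(Class<?> machine) {
        List<Class<?>> found = new ArrayList<>();
        for (Method method : machine.getMethods()) {
            Class<?>[] params = method.getParameterTypes();
            if (params.length != 1 || params[0] != Commit.class) {
                continue;
            }
            // The erased parameter type is Commit; the generic type still carries <Put>.
            Type generic = method.getGenericParameterTypes()[0];
            if (generic instanceof ParameterizedType) {
                Type arg = ((ParameterizedType) generic).getActualTypeArguments()[0];
                if (arg instanceof Class && Operation.class.isAssignableFrom((Class<?>) arg)) {
                    found.add((Class<?>) arg);
                }
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(operationTypes(DemoMachine.class));
    }
}
```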

private void registerMethod(Class<?> type, Method method) {
  Class<?> returnType = method.getReturnType();
  if (returnType == void.class || returnType == Void.class) {
    registerVoidMethod(type, method);
  } else {
    registerValueMethod(type, method);
  }
}

private void registerValueMethod(Class type, Method method) {
  executor.register(type, wrapValueMethod(method));
}

/**
 * Wraps a value method.
 */
private Function wrapValueMethod(Method method) {
  return c -> {
    try {
      return method.invoke(this, c);
    } catch (InvocationTargetException e) {
      throw new CommandException(e);
    } catch (IllegalAccessException e) {
      throw new AssertionError(e);
    }
  };
}
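The wrapping step turns a reflected Method into an ordinary Function so the caller never touches reflection again. A minimal standalone sketch of that idea follows; Greeter, find and wrap are hypothetical names, and a plain RuntimeException stands in for Copycat's CommandException.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.function.Function;

public class MethodWrapper {
    // Hypothetical target class used only for this illustration.
    public static class Greeter {
        public String greet(String name) { return "hello " + name; }
    }

    // Finds a public method by name (first match); avoids checked exceptions in the demo.
    static Method find(Class<?> type, String name) {
        for (Method method : type.getMethods()) {
            if (method.getName().equals(name)) {
                return method;
            }
        }
        throw new IllegalArgumentException("no such method: " + name);
    }

    // Wraps a one-argument method as a Function bound to the given target.
    static Function<Object, Object> wrap(Object target, Method method) {
        return arg -> {
            try {
                return method.invoke(target, arg);
            } catch (InvocationTargetException e) {
                // The invoked method itself threw; rethrow with its cause.
                throw new RuntimeException(e.getCause());
            } catch (IllegalAccessException e) {
                throw new AssertionError(e);
            }
        };
    }

    public static void main(String[] args) {
        Function<Object, Object> fn = wrap(new Greeter(), find(Greeter.class, "greet"));
        System.out.println(fn.apply("raft")); // prints "hello raft"
    }
}
```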

 

ServerStateMachineExecutor.register
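@Override
public <T extends Operation<U>, U> StateMachineExecutor register(Class<T> type, Function<Commit<T>, U> callback) {
  operations.put(type, callback);
  return this;
}

This stores each operation in the operations map of ServerStateMachineExecutor, keyed by operation type, so it can be looked up and invoked later.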

 

Back to ServerStateMachine.

The core logic of ServerStateMachine is apply, i.e. applying commands to the state machine.

It can apply all commits up to a given index:

/**
 * Applies all commits up to the given index.
 * <p>
 * Calls to this method are assumed not to expect a result. This allows some optimizations to be
 * made internally since linearizable events don't have to be waited to complete the command.
 *
 * @param index The index up to which to apply commits.
 */
public void applyAll(long index) {
  // If the effective commit index is greater than the last index applied to the state machine then apply remaining entries.
  long lastIndex = Math.min(index, log.lastIndex());
  if (lastIndex > lastApplied) {
    for (long i = lastApplied + 1; i <= lastIndex; i++) { // continue from the index after the last applied one
      Entry entry = log.get(i);
      if (entry != null) {
        apply(entry).whenComplete((result, error) -> entry.release());
      }
      setLastApplied(i);
    }
  }
}
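The catch-up loop is easy to lose in the surrounding details, so here is a stripped-down sketch of just the index arithmetic. The in-memory list stands in for the Raft log and all names are hypothetical; there are no futures, sessions or entry release in this version.

```java
import java.util.ArrayList;
import java.util.List;

public class ApplyAllSketch {
    private final List<String> log = new ArrayList<>();     // entry with index i lives at position i - 1
    private final List<String> applied = new ArrayList<>(); // stand-in for the state machine
    private long lastApplied = 0;

    void append(String entry) { log.add(entry); }
    long lastIndex() { return log.size(); }
    long lastApplied() { return lastApplied; }
    List<String> applied() { return applied; }

    // Applies every not-yet-applied entry up to index, capped at the end of the log.
    void applyAll(long index) {
        long last = Math.min(index, lastIndex());
        for (long i = lastApplied + 1; i <= last; i++) {
            applied.add(log.get((int) (i - 1)));
            lastApplied = i;
        }
    }

    public static void main(String[] args) {
        ApplyAllSketch sm = new ApplyAllSketch();
        sm.append("a");
        sm.append("b");
        sm.append("c");
        sm.applyAll(2);  // applies a, b
        sm.applyAll(2);  // no-op: nothing new at or below index 2
        sm.applyAll(10); // capped at the last log index, applies c
        System.out.println(sm.applied() + " lastApplied=" + sm.lastApplied());
    }
}
```

Because the loop always starts at lastApplied + 1, calling it repeatedly with the same index is harmless, and entries are never applied out of order.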

It can also apply the single entry at a given index:

public <T> CompletableFuture<T> apply(long index) {
  // If entries remain to be applied prior to this entry then synchronously apply them.
  if (index > lastApplied + 1) {
    applyAll(index - 1); // entries are applied in order, so earlier ones must be applied first
  }

  // Read the entry from the log. If the entry is non-null then apply the entry, otherwise
  // simply update the last applied index and return a null result.
  try (Entry entry = log.get(index)) {
    if (entry != null) {
      return apply(entry);
    } else {
      return CompletableFuture.completedFuture(null);
    }
  } finally {
    setLastApplied(index);
  }
}

 

apply(entry)

/**
 * Applies an entry to the state machine.
 * <p>
 * Calls to this method are assumed to expect a result. This means linearizable session events
 * triggered by the application of the given entry will be awaited before completing the returned future.
 *
 * @param entry The entry to apply.
 * @return A completable future to be completed with the result.
 */
@SuppressWarnings("unchecked")
public <T> CompletableFuture<T> apply(Entry entry) {
  if (entry instanceof QueryEntry) {
    return (CompletableFuture<T>) apply((QueryEntry) entry);
  } else if (entry instanceof CommandEntry) {
    return (CompletableFuture<T>) apply((CommandEntry) entry);
  } else if (entry instanceof RegisterEntry) {
    return (CompletableFuture<T>) apply((RegisterEntry) entry);
  } else if (entry instanceof KeepAliveEntry) {
    return (CompletableFuture<T>) apply((KeepAliveEntry) entry);
  } else if (entry instanceof UnregisterEntry) {
    return (CompletableFuture<T>) apply((UnregisterEntry) entry);
  } else if (entry instanceof InitializeEntry) {
    return (CompletableFuture<T>) apply((InitializeEntry) entry);
  } else if (entry instanceof ConfigurationEntry) {
    return (CompletableFuture<T>) apply((ConfigurationEntry) entry);
  }
  return Futures.exceptionalFuture(new InternalException("unknown state machine operation"));
}

 

As you can see, each entry type has its own apply logic.

apply((CommandEntry) entry)

private CompletableFuture<Result> apply(CommandEntry entry) {
  final CompletableFuture<Result> future = new CompletableFuture<>();
  final ThreadContext context = ThreadContext.currentContextOrThrow(); // keep a reference to the calling thread's context

  // First check to ensure that the session exists.
  ServerSessionContext session = executor.context().sessions().getSession(entry.getSession());

  // If the session is null, return an UnknownSessionException. Commands applied to the state machine must
  // have a session. We ensure that session register/unregister entries are not compacted from the log
  // until all associated commands have been cleaned.
  if (session == null) { // the session does not exist
    log.release(entry.getIndex());
    return Futures.exceptionalFuture(new UnknownSessionException("unknown session: " + entry.getSession()));
  }
  // If the session is not in an active state, return an UnknownSessionException. Sessions are retained in the
  // session registry until all prior commands have been released by the state machine, but new commands can
  // only be applied for sessions in an active state.
  else if (!session.state().active()) { // the session is not active
    log.release(entry.getIndex());
    return Futures.exceptionalFuture(new UnknownSessionException("inactive session: " + entry.getSession()));
  }
  // If the command's sequence number is less than the next session sequence number then that indicates that
  // we've received a command that was previously applied to the state machine. Ensure linearizability by
  // returning the cached response instead of applying it to the user defined state machine.
  else if (entry.getSequence() > 0 && entry.getSequence() < session.nextCommandSequence()) { // this entry has already been applied
    // Ensure the response check is executed in the state machine thread in order to ensure the
    // command was applied, otherwise there will be a race condition and concurrent modification issues.
    long sequence = entry.getSequence();

    // Switch to the state machine thread and get the existing response.
    executor.executor().execute(() -> sequenceCommand(sequence, session, future, context)); // return the result of the earlier apply
    return future;
  }
  // If we've made it this far, the command must have been applied in the proper order as sequenced by the
  // session. This should be the case for most commands applied to the state machine.
  else {
    // Allow the executor to execute any scheduled events.
    long index = entry.getIndex();
    long sequence = entry.getSequence();

    // Calculate the updated timestamp for the command.
    long timestamp = executor.timestamp(entry.getTimestamp());

    // Execute the command in the state machine thread. Once complete, the CompletableFuture callback will be completed
    // in the state machine thread. Register the result in that thread and then complete the future in the caller's thread.
    // ServerCommitPool avoids allocating a new ServerCommit each time: take one from the pool and return it when done.
    ServerCommit commit = commits.acquire(entry, session, timestamp);
    executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

    // Update the last applied index prior to the command sequence number. This is necessary to ensure queries sequenced
    // at this index receive the index of the command.
    setLastApplied(index);

    // Update the session timestamp and command sequence number. This is done in the caller's thread since all
    // timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.
    session.setTimestamp(timestamp).setCommandSequence(sequence);
    return future;
  }
}
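The third branch is what makes retried commands safe: a sequence number below the session's next expected one means the command already ran, so the stored result is returned instead of running it again. A single-threaded sketch of that check follows. SessionDedup and its fields are hypothetical, and it assumes the next expected sequence is simply the last applied sequence plus one, which the code above implies but does not show.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SessionDedup {
    private final Map<Long, Object> results = new HashMap<>(); // cached result per sequence number
    private long nextSequence = 1;                              // assumed: last applied sequence + 1

    // Runs the command once per sequence number; replays return the cached result.
    Object apply(long sequence, Supplier<Object> command) {
        if (sequence > 0 && sequence < nextSequence) {
            return results.get(sequence); // already applied: do not touch the state machine again
        }
        Object result = command.get();
        results.put(sequence, result);
        nextSequence = sequence + 1;
        return result;
    }

    public static void main(String[] args) {
        SessionDedup session = new SessionDedup();
        int[] counter = {0};
        Supplier<Object> increment = () -> ++counter[0];

        System.out.println(session.apply(1, increment)); // prints "1"
        System.out.println(session.apply(1, increment)); // prints "1" again, counter untouched
        System.out.println(session.apply(2, increment)); // prints "2"
    }
}
```

A real session would also evict old results once the client acknowledges them; that part is left out here.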

 

executeCommand
ServerCommit commit = commits.acquire(entry, session, timestamp);
executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

Note that two threads are involved here. One is context, the ThreadContext (threadContext) that responds to server requests. The other is the stateContext held by the executor, which is the one that changes the state machine's state.

So executeCommand runs on the executor's thread, but the caller's ThreadContext is passed in so the result can be handed back on it:

/**
 * Executes a state machine command.
 */
private void executeCommand(long index, long sequence, long timestamp, ServerCommit commit, ServerSessionContext session, CompletableFuture<Result> future, ThreadContext context) {
  // Trigger scheduled callbacks in the state machine.
  executor.tick(index, timestamp);

  // Update the state machine context with the commit index and local server context. The synchronous flag
  // indicates whether the server expects linearizable completion of published events. Events will be published
  // based on the configured consistency level for the context.
  executor.init(commit.index(), commit.time(), ServerStateMachineContext.Type.COMMAND);

  // Store the event index to return in the command response.
  long eventIndex = session.getEventIndex();

  try {
    // Execute the state machine operation and get the result.
    Object output = executor.executeOperation(commit);

    // Once the operation has been applied to the state machine, commit events published by the command.
    // The state machine context will build a composite future for events published to all sessions.
    executor.commit();

    // Store the result for linearizability and complete the command.
    Result result = new Result(index, eventIndex, output);
    session.registerResult(sequence, result); // cache the result
    context.executor().execute(() -> future.complete(result)); // complete the future on the caller's thread
  } catch (Exception e) {
    // If an exception occurs during execution of the command, store the exception.
    Result result = new Result(index, eventIndex, e);
    session.registerResult(sequence, result);
    context.executor().execute(() -> future.complete(result));
  }
}
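The hand-off between the two threads can be reproduced with two single-thread executors from java.util.concurrent. This is only a sketch of the pattern: the executors named "server" and "state" are hypothetical stand-ins for threadContext and stateContext, and no Copycat classes are involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TwoThreadHandoff {
    // Runs work on the "state" thread, then completes the future back on the "server" thread.
    static String run() {
        ExecutorService server = Executors.newSingleThreadExecutor(r -> new Thread(r, "server"));
        ExecutorService state = Executors.newSingleThreadExecutor(r -> new Thread(r, "state"));
        try {
            CompletableFuture<String> future = new CompletableFuture<>();
            state.execute(() -> {
                // State mutation would happen here, on the state thread only.
                String executedOn = Thread.currentThread().getName();
                // Hop back to the server thread before completing the future.
                server.execute(() ->
                    future.complete(executedOn + "->" + Thread.currentThread().getName()));
            });
            return future.join();
        } finally {
            server.shutdown();
            state.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "state->server"
    }
}
```

Completing the future on the server thread means any callbacks chained onto it also run there, so request-handling code never executes on the state thread.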

 

ServerStateMachineExecutor.tick
Based on the given timestamp, this triggers the tasks in scheduledTasks whose scheduled time has arrived.
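The body of tick is not shown in this post, so the following is only a guess at the general shape: a scheduler driven by timestamps passed in by the caller rather than by a wall clock. TickSketch, Scheduled and their fields are hypothetical. Due tasks are collected first and run afterwards so that a callback can schedule new tasks without disturbing the iteration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TickSketch {
    // A callback with the logical time at which it becomes due.
    static final class Scheduled {
        final long time;
        final Runnable callback;

        Scheduled(long time, Runnable callback) {
            this.time = time;
            this.callback = callback;
        }
    }

    private final List<Scheduled> scheduledTasks = new ArrayList<>();

    void schedule(long time, Runnable callback) {
        scheduledTasks.add(new Scheduled(time, callback));
    }

    // Removes and runs every task whose time is at or before the given timestamp.
    void tick(long timestamp) {
        List<Scheduled> due = new ArrayList<>();
        Iterator<Scheduled> it = scheduledTasks.iterator();
        while (it.hasNext()) {
            Scheduled task = it.next();
            if (task.time <= timestamp) {
                it.remove();
                due.add(task);
            }
        }
        for (Scheduled task : due) {
            task.callback.run();
        }
    }

    public static void main(String[] args) {
        TickSketch scheduler = new TickSketch();
        scheduler.schedule(10, () -> System.out.println("task at 10"));
        scheduler.schedule(20, () -> System.out.println("task at 20"));
        scheduler.tick(15); // runs only the first task
        scheduler.tick(25); // runs the second task
    }
}
```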
 
ServerStateMachineExecutor.init
Updates the state machine's context:

void init(long index, Instant instant, ServerStateMachineContext.Type type) {
  context.update(index, instant, type);
}

// ServerStateMachineContext
void update(long index, Instant instant, Type type) {
  this.index = index;
  this.type = type;
  clock.set(instant);
}
 
ServerStateMachineExecutor.executeOperation
<T extends Operation<U>, U> U executeOperation(Commit commit) {
  // Get the function registered for the operation. If no function is registered, attempt to
  // use a global function if available.
  Function function = operations.get(commit.type()); // look up the function registered for this type

  if (function == null) {
    // If no operation function was found for the class, try to find an operation function
    // registered with a parent class.
    for (Map.Entry<Class, Function> entry : operations.entrySet()) {
      if (entry.getKey().isAssignableFrom(commit.type())) { // the registered type is a supertype of commit.type()
        function = entry.getValue();
        break;
      }
    }

    // If a parent operation function was found, store the function for future reference.
    if (function != null) {
      operations.put(commit.type(), function);
    }
  }

  if (function == null) {
    throw new IllegalStateException("unknown state machine operation: " + commit.type());
  } else {
    // Execute the operation. If the operation return value is a Future, await the result,
    // otherwise immediately complete the execution future.
    try {
      return (U) function.apply(commit); // this is where the function actually runs
    } catch (Exception e) {
      throw new ApplicationException(e, "An application error occurred");
    }
  }
}
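The lookup strategy in executeOperation (exact type first, then any registered supertype, then cache the match) works independently of Copycat. Here is a small sketch of it; OperationRegistry and its methods are hypothetical names, and it dispatches on the operation object's own class rather than on commit.type().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class OperationRegistry {
    private final Map<Class<?>, Function<Object, Object>> operations = new HashMap<>();

    void register(Class<?> type, Function<Object, Object> function) {
        operations.put(type, function);
    }

    boolean isRegistered(Class<?> type) {
        return operations.containsKey(type);
    }

    Object execute(Object operation) {
        Class<?> type = operation.getClass();
        // Fast path: a function registered for exactly this class.
        Function<Object, Object> function = operations.get(type);
        if (function == null) {
            // Slow path: a function registered for a superclass or interface.
            for (Map.Entry<Class<?>, Function<Object, Object>> entry : operations.entrySet()) {
                if (entry.getKey().isAssignableFrom(type)) {
                    function = entry.getValue();
                    break;
                }
            }
            // Cache the match so the next call for this class takes the fast path.
            if (function != null) {
                operations.put(type, function);
            }
        }
        if (function == null) {
            throw new IllegalStateException("unknown operation: " + type);
        }
        return function.apply(operation);
    }

    public static void main(String[] args) {
        OperationRegistry registry = new OperationRegistry();
        registry.register(Number.class, op -> "number:" + op);
        System.out.println(registry.execute(42));                 // prints "number:42"
        System.out.println(registry.isRegistered(Integer.class)); // prints "true"
    }
}
```

One caveat with this approach: if several registered supertypes match, the winner depends on HashMap iteration order, so it is best to register handlers for types that do not overlap.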
 
 
 
 
 

Reposted from: http://fslta.baihongyu.com/
