# tiny_distributed_fsm **Repository Path**: uk224/tiny_distributed_fsm ## Basic Information - **Project Name**: tiny_distributed_fsm - **Description**: 迷你分布式状态机 - **Primary Language**: Python - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2022-07-28 - **Last Updated**: 2023-06-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # tiny_distributed_fsm #### 项目后续规划 1. 增加一个可用的实例(如qqbot),便于理解项目的实际使用和意义 2. 修复BUG(现在还不少) 3. 对任何原理,代码等的疑问,欢迎随时联系我,qq:380470827 #### 介绍 迷你分布式状态机 具体来讲,是所有流程类的程序(回想一下离校系统,淘宝的下单系统)的一种抽象(节点和节点相连),所以可以接入到任何流程类的系统中。 开发语言是python,但只是为了能快速理解系统原理而作出的选择。使用任何语言都是可以实现的。 #### 项目架构 tiny_distributed_fsm 是一个通用的小型分布式状态机。 重流程的系统(如交易系统)往往需要关注一个订单的状态,保证其能正常从开始到结束。在大型系统中,还需要保证其状态在不同实例上的正确性。 相较于传统的链式程序,使用一个分布式状态机可以带来以下的好处: 1. 所有的逻辑被封装为不同的节点,整个逻辑清晰明了,并且能够很好地支持逻辑上的修改,增加,删除等。 2. 通过推单机制可以保证订单能够被分布式系统中不同的实例处理,并且可以扩展出一系列报警及时帮助使用者发现订单的异常状态。 #### 详细设计 tiny_distributed_fsm 主要由状态机和推单系统组成。 状态机:负责控制节点的流转,核心逻辑存放在fsm.py中,主要由General和Soldier两个类实现。 -General:负责读取配置和初始化。 -Soldier:负责控制节点的流转,根据订单的状态选择调用do还是retry方法,并且根据函数执行返回的结果决定订单的下一个节点。 推单系统:负责订单的推送,包括LocalPusher和GlobalPusher两个部分组成。 -LocalPusher: 本地推单机制,通过一个优先队列实现,会按照next_exec_time逐个推送订单。在本地执行出现异常的时候,订单会被推送到GlobalPusher。 -GlobalPusher: 分布式的全局推单机制。每台实例会有一个GlobalPusher,其中包含一个PusherWorker。PusherWorker分为Leader和Worker两种。 Leader负责确认所有Worker的存活情况,并且为每个Worker推送各自的分片起点和范围。而每个Worker负责扫描各自范围内需要推单(未达到终态的节点) 并进行推单。 Leader和Worker每秒钟都需要进行打卡。连续三次没有打卡的被视为没有存活。Leader不存活的情况下,所有Worker会去尝试抢夺Leader,从而保障系统的 可用性。 #### 使用说明 1. 配置数据库,示例如下: ``` { "host": "localhost", "port": "3306", "user": "root", "password": "123456", "database": "testdb" } ``` 2. 在自己选定的database中执行sql下的语句(sql文件夹下的两个sql文件) ``` CREATE TABLE `fsm_record` ( id bigint NOT NULL AUTO_INCREMENT COMMENT '自增id', record_id varchar(255) NOT NULL COMMENT 'id', cur_node varchar(255) NOT NULL COMMENT '当前节点', status varchar(255) NOT NULL COMMENT '节点状态', exec_times bigint NOT NULL COMMENT '执行次数', lock_time bigint NOT NULL COMMENT '是否锁定,0:未锁定,非0:锁定的时间戳', next_exec_time bigint NOT NULL COMMENT '下次执行时间', mode varchar(255) NOT NULL COMMENT '订单推送模式', UNIQUE KEY (record_id), PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='记录表'; CREATE TABLE `workers` ( id bigint NOT NULL AUTO_INCREMENT COMMENT '自增id', worker_id varchar(255) NOT NULL COMMENT 'id', checkin_time bigint NOT NULL COMMENT '打卡时间', shards_id bigint NOT NULL COMMENT '分片id', sharding_range bigint NOT NULL COMMENT '分片范围', PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='工作线程表'; ``` 3. 配置节点流转 ``` { "nodes": [ { "name": "start", "type": "start", "jump_node": "success_operation", "retry_node": "start", "fail_node": "fail_operation" }, { "name": "success_operation", "type": "normal", "jump_node": "end", "retry_node": "success_operation", "fail_node": "fail_operation" }, { "name": "fail_operation", "type": "normal", "jump_node": "end", "retry_node": "fail_operation", "fail_node": "fail_operation" }, { "name": "end", "type": "final" } ], "max_workers": 500 } ``` 4. 继承node_method, 实现每个节点对应的do,retry方法, 在最后注册到sys.modules中 ``` import node_method import sys import const import time sys.path.append("../") class StartNode(node_method.NodeMethod): def __init__(self, context): self.context = context def do(self): print("start new_node do") time.sleep(1) int_time = int(time.time()) if int_time % 3 == 0: return const.NODE_FAIL elif int_time % 3 == 1: return const.NODE_SUCCESS else: return const.NODE_RETRY def retry(self): print("start new_node retry") time.sleep(1) int_time = int(time.time()) if int_time % 3 == 0: return const.NODE_FAIL elif int_time % 3 == 1: return const.NODE_SUCCESS else: return const.NODE_RETRY sys.modules['start'] = StartNode ``` 5. 初始化状态机,完成配置。 ``` logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') DB("./dbconfig.json") w = General('route_test.json') w.bootstrap() ```