# statser **Repository Path**: tjopenlab/statser ## Basic Information - **Project Name**: statser - **Description**: 统计分析语言StatisticsReport - **Primary Language**: Python - **License**: BSD-3-Clause - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-01-04 - **Last Updated**: 2023-10-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README StatisticsReporter统计分析引擎 ===== StatisticsReporter是一个统计分析引擎,用户通过一个基于python语法的分析角本,描述出想得到的数据,引擎将对角本进行解析、计算。 StatisticsReporter有时被简写为SR,它支持的数据处理语言为SRL(Statistics Reporter Language)。 * Pandas是按需处理原始数据,StatisticsReporter拿原始数据按照数据处理的要求填表 * 数据源可以是jdbc或dataframe * 填表要求是一个象SQL的非过程性描述语言 * 以报表为中心的计算系统,方便你计算报表,而不是方便你设计报表 提供下列功能: * 统计分析 * 数据源支持:SQL、Dataframe、Stream * 统计结果生成二维表、Pandas Dataframe ------ ## 安装引擎 一共两个python包:statser, sr_yaml。sr_yaml是自己定制的yaml解释器,可以不关注。 ``` python setup.py install ``` ### 示例程序 ``` sample.py ``` ### 起动从结点(Slave)进程 ``` SR_MaterSlave ``` 或者自己调用 SR_MaterSlave.start_Slaves_Process()起动从结点进程。 #### SR_MaterSlave,单独运行这个程序,将起动从结点(Slave) ``` '-c','--config' 配置文件 '-n','--slavesnum' 从结点数量 '-h','--host' 主结点IP地址 '-p','--port' 主结点端口 '-a','--authkey' 验证用key值 ``` ------ ## 主要编程接口API ### StatisticsReporter - df_StatisticsReporter ```python df_StatisticsReporter(sr_name, sr_filename, df=None, debug=False) ``` debug : 是否处于调试状态,调试状态SRL引擎给出的信息带行列号 - sql_StatisticsReporter ``` sql_StatisticsReporter(sr_name, sr_filename, host='127.0.0.1', user='root', password='', dbname='dbname', params={}, sql='', gb_table='', debug=False) ``` - stream_StatisticsReporter ```python stream_StatisticsReporter(sr_filename, _src=None, parameters={}, srd=None, debug=False, ifDoFinal=SR_Utils.IfDoFinal.DoFinal, fun_require=deal_require, dump=False) ``` - merge_src, 合并统计结果 ```python def merge_src(_src_master, _src_slaves=None) _src_master 目标 _src_slaves 源,可以是_src数组 ``` ### SR_MaterSlave,多进程(结点)计算API 结点,一般指的是一个独立的计算机(或虚拟机),一个结点上可以跑多个从(Slave)服务进程,缺省情况下是每CPU核(Core)一个服务进程。 StatisticsReporter在概念不造成混乱的情况下,每个执行SR任务的,都叫一个结点。 * 根据配置文件初始化程序环境 ```python ctl_Master_cfg(startManager, config_ini) ``` * 配置文件config.ini ``` [database] host = 192.168.1.110 user = root password = Elastos182 dbname = op_einvoicedb [SR_Mater] host = 127.0.0.1 port = 4800 authkey = tongji_4800 ``` ``` * 环境数据库准备 - 如果没有指定数据库连接池,则用的ctl_Master_cfg()配置出来的环境中的数据库连接池。 ​```python ctl_ConnPool(conn_pool) ``` * sql_SR_Master_Go ``` sql_SR_Master_Go(sr_filename, _src=None, parameters={}, conn_pool=None, dictResult=False, params={}, sql='', limitRows=0, fun_require=deal_require, nslaves=1, timeoutSeconds=100) ``` * sql可以是一个数组,这些sql执行的结果将被合并 * df_SR_Master_Go ``` df_SR_Master_Go(sr_filename, _src=None, parameters={}, df=None, fun_require=deal_require, nslaves=0) ``` ### 分析结果相关 #### SR_Utils * getResultDf ``` def getResultDf(_src, xMultiIndex=True, yMultiIndex=True, separator='/', nonVal='.', keepIndexInData=False) ``` #### Dict2Html * dict2Html ``` def dict2Html(_src, dict_=None, valNone='x', id_='sr', css_class='sr_default_style', css_style='', numFormatStr="{:.2f}", src_moredata=False, dataSep=' ', fmtYtitle=True, hideFlag='*') ``` * df2Html ``` def df2Html(df, id_='sr', css_class='sr_default_style', css_style='', numFormatStr="{:.2f}"): ``` ### 程序应用相关 #### RuleTemplate ``` def register_rule(name, rule) ``` #### SQL_Utils ``` ``` #### ActionTemplate ``` def register_template(name, template) ``` #### SR_Context ``` ``` #### SR_Thread * df_SR_Thread_Go ``` def df_SR_Thread_Go(sr_filename, _src=None, parameters={}, df=None, fun_require=deal_require, npartitions=0) 用数据组去生成角本中间码,均分各个数据结到多个线程,所有线程结束后,把内容汇集,执行final动作 :param sr_filename: :param _src: :param parameters: :param df: :param fun_require: :param npartitions: :return: ``` * stream_SR_Thread_Go ``` def stream_SR_Thread_Go(sr_filename, _src=None, parameters={}, srd=None, fun_require=deal_require, npartitions=0) :param sr_filename: :param _src: :param parameters: :param srd: :param fun_require: :param npartitions: :return: ``` * sql_SR_Thread_Go ``` def sql_SR_Thread_Go(sr_filename, _src=None, parameters={}, conn_pool=None, dictResult=False, params={}, sql='', limitRows=0, fun_require=deal_require, npartitions=0): Mysqldb是线程安全的,所以每个连接将能够访问它们各自的游标,查询和结果集,而不影响其他连接,但每个线程或进程将需要自己的连接。 :param sr_filename: :param _src: :param parameters: :param conn_pool: :param dictResult: :param params: :param sql: :param limitRows: :param fun_require: :param npartitions: ``` ### 程序辅助相关 #### MiscUtils ``` ``` #### StrUtils ``` ``` #### WebHelper ``` ``` #### StreamRegularDict ``` ``` ------ ## SRL(Statistics Reporter Language)语言 SQL是讲怎么处理数据的,SRL(Statistics Reporter Language)是讲我想要什么数据的,SQL面向数据源,SRL面向需求。SRL更象中国人的思维方式,我告诉你我要啥,至于如何得到啥,那是你引擎(engine)的事。SRL,与做统计报告的人交流的语言。 SRL中,输入流通过FROM关键字定义,在引擎中,FROM是一个向外输出Python Dict数据结构的一个生成器,我们称之为“FROM生成器”。实际引擎实现时,不一定真的把所有数据转成Dict格式,如有约定顺序的字段集合(Python Set数据结构),就是直接通过顺序值直接访问,这样实现是为了计算效率。 Python Dict 字典是无序(3.6版本后有序),可修改,它能存储任何数据,数据结构和json类似。字典的每个值都具备自己独有的名称即键( key ),每个 key 都有对应的值( value ),字典中的 key 和 value 必须成对出现 。如果一个key不在Dict中,则它可能具有缺省值(通常是Null)。 FROM生成器是一个Python生成器。 通过列表生成式,我们可以直接创建一个列表,但是,受到内存限制,列表容量肯定是有限的,而且创建一个包含很多个元素的列表,不仅占用很大的存储空间,如果我们仅仅需要访问前面几个元素,那后面绝大多数元素占用的空间都白白浪费了。所以,如果列表元素可以按照某种算法推算出来,那我们是否可以在循环的过程中不断推算出后续的元素呢?这样就不必创建完整的list,从而节省大量的空间,在Python中,这种一边循环一边计算的机制,称为生成器:generator 生成器是一个特殊的程序,可以被用作控制循环的迭代行为,python中生成器是迭代器的一种,使用yield返回值函数,每次调用yield会暂停,而可以使用next()函数和send()函数恢复生成器。 生成器类似于返回值为数组的一个函数,这个函数可以接受参数,可以被调用,但是,不同于一般的函数会一次性返回包括了所有数值的数组,生成器一次只能产生一个值,这样消耗的内存数量将大大减小,而且允许调用函数可以很快的处理前几个返回值,因此生成器看起来像是一个函数,但是表现得却像是迭代器。 ### 注释语句 - 支持注释语句(//后面的内容),这个注释可能是生成报表标题时用的标题,这种利用注释扩充一个语言的表达能力的手法,可能会让人困惑; ```python $.database !='sql' or $.database == 'sql' // 这里是这句话的注释 ``` - 放弃当前项的注释,//后面什么也没有,表明当前条件只是个标题,不参与实际条件的构成。如: ```python $.database !='sql' or $.database == 'sql' // 关于这句话该写点注释 // ``` ### 变量引用 - $.variable 表示: 变量引用,对应df['variable']。df代表FROM生成器的当前行。因为最早的FROM生成器是DataFrame,所以给了它这样一个名字。 - $.[y,x] 表示: `_src.data[y,x]` `_src` 是类SR_Context的实例 - 当前行,x列的计算存储(data)、计数存储(extdata)、额外存储(moredata)访问方式如下: `_src.extdata[_src.curY, x]` `_src.extdata[_src.curY, x]` `_src.moredata[_src.curY, x]` #额外存储(moredata)是list类型,向其添加数据需要用append, `_src.moredata[_src.curY, x].append(val)` ### 续行 一条语句可以写多行,注意续行要缩进至少一个空格 ```python Y // this is comment 这是一个注释,是生成报表标题时用的标题 cond_1 // 这是一个注释,是生成报表标题时用的标题 - Y1 : age > 10 and sex == '1' // 这些一些注释,是生成报表标题时用的标题 // 注释abstract #name: Y2 ``` 多行字符串可以使用|保留换行符,也可以使用>折叠换行。 ```python this: | Foo Bar that: > Foo Bar ``` ### $:pyton 语句 支持 $:pyton 语句,这些语句装被嵌入生成的python角本,需要注意的是: - 因为"冒号加空格"在yaml中有特别意思,所以python需要的冒号,写成中文全角冒号':', 或者冒号后面不要跟空格; - python语句“def”(函数定义)尾部的冒号会自动检查加上,也就是你可以写也可以不写; - $:........python语句,这里(python语句前面)的 ‘,‘ 将被转换为空格。 ### 条件动作表达式 - 那些不属于X条件(每行的真值执行一次),也不属于Y条件(每行执行一次)的表达式,是放在main函数中执行的,main函数在执行角本时,先于X/Y条件执行。 - pass表达式,一个空的表达式(如果连分号都没有,语法上就错了),或者是内容只有pass一个单词表达式,为空表达式, 表示当前条件什么也不做。这个设计主要是为表格的描述文字提供办法。 ```python X2 // A4 // A4 : ; X2 // A4 // A4 : pass; ``` ### 行Y(或 列X)条件 - 基于DataFrame生成行Y(或 列X)条件,语法格式为 ```python y/x groupby : groupby列1名字,groupby列2名字... ``` - 基于Dict生成行Y(或 列X)条件,语法格式为 y/x dict : dict表达式。 可以通过这个手段引入规则(Rule)库 - 如果条件表达式为空(''),则把它处理为永真条件 ```python y : ... x : ... ``` 这个示例中的y、x条件就是条件表达式为空(''),被处理为永真条件 - 下面两种写法,表示行、列号自动顺沿,注意别与强制写了行列号的语句的冲突 ```python y : ... x : ... ``` - x(/y)条件可以有多行。而x(/y)只能有一个,如有多个,后面的一个有效。 - \$x(/y) 写法的意思是: $x(/y): 代表 x: 或 y: - 支持纵轴(Y)方向动态Key式条件 - y dict : 语法格式: - y dict : 计算字典的表达式 //注释,(注释当然可以省略) - 根据dict字典生成Y轴定位的key的表达式,定位的方式是关键字,getDictPos(),表达式计算结果到list中去找。 - dict 是这样的 {'key1' : ['1', '2', '3']},则得到优化。 - dict的key本身也是一个表达式,如是字段名:\$.fieldName “\$.”代表当前源数据中的当前行。 - y range : 语法格式: - y range : 计算字典的表达式 //注释,(注释当然可以省略) - 根据dict字典生成Y轴定位的key的表达式,定位的方式是区间,sortKeysList(),有序list的区间,(keys[i] >= key > keys[i-1])。 - dict 是这样的 {'key1' : ['1', '2', '3']},则得到优化。 - dict的key本身也是一个表达式,如是字段名:\$.fieldName - y rule : 语法格式: - y rule : \$rule //注释,(注释当然可以省略) - 生成Y轴的表达式 - y dynamic : 语法格式: - y dynamic : key //注释,(注释当然可以省略) - 生成Y轴定位的key的表达式, 如果后面的key值未曾出现过,将被加到key列表中,也就是说,y轴是根据key的多少自动扩展的 * 如果key是None,那么key等价于:'_None__'+当前行号,这样设计的目的是可以通过 dynamic: None的方式, 让StatisticsReporter生成纵向无限的列表 - 让无法返回值的python表达式返回值 通过这种方式把表达式返回的值是tuple类型 s = eval("name, name.update({'a':1})", requireDict) - 如果一个Y条件都没有,StatisticsReporter引擎会强行增加一个永真的一行 ### 动作表达式 - 示例: 条件表达式 : \$act1 valExpr1; ​\$act2 valExpr2; ​\$act3 valExpr3 - y(/x) groupby/dict/rule 都可以指定动作表达式, 动作表达式用{x}代表当前行或列 ### final函数 - final函数是在整个统计工作结束时用来做收尾工作的,它的工作对象是data、extdata,这两个存在SR_Context中的统计结果,可能是一个放sum,一个放count,而最后要算平均值。 - final函数的定义方法有两种: (1) ```python $: final = lambda a, b: a + b ``` ​ (2) ```python $: def final(data, extdata): $: ....data += extdata $: ....pass ``` ### \$开始的关键字 - $require SR需要的外部量,如数据字典,http://192.168.1.106:8080/Openparts-web/dict/getDictsByCode?code=EDUCATION。 这样得来的变量,可以直接在表达式中引用, 如: X(/Y) dict : requiredVariable。 - $final - $sql - $action: - CommonUtils.stateY(y),设置行状态,这个是自动生成的,不需要在角本中描述 - CommonUtils.actionX(x, actionExpr),这里的actionExpr,动作表达式,X条件计算结果不True执行函数引用的表达式 - SR_Context.curY,当前行。 ### 动作表达式模板ActionTemplate规范 - actionTemplate条件、动作表达式等都可以是多行的 ```python template_max = { 'plan' : ''' for i in range(sr.numY): sr.data[i, {x}] = float('-inf') ''', 'actor' : '''sr.getCurData({x})[0] = max(sr.getCurData({x})[0], {valExpr})''', 'final' : '''''' } ``` - 动作模板的引用 动作模板引用可以是分号隔开的多个引用,示例如下: ```python - X1 // A3 // A3 : 1; $pearsonr([float($.C13), float($.C41)]) #; ...... - X1 // A3 // A3 : 1; $pearsonr({'_p__': [float($.C13), float($.C41)], '其它参数': 值}) #; ...... ``` - 如果参数中包括给动作模板的更多变量的,用dict包装:把真正的参数放在'_p__'变量中。 - 动作表达式主关键字($引导的那个关键字)后接的是一个python表达式 ### 规则(Rule)表达式规范 - ruleTemplate条件、动作表达式等都可以是多行的 ```python rule_sample = { 'name' : 'a sample rule', 'actor' : '''_src.getCurData({x})[0] = np.sum([_src.getCurData({x})[0], {valExpr}])''' } ``` - 规则模板的引用 动作模板引用可以是分号隔开的多个引用,示例如下: ```python $.C5 !='A0' // A3 // : $rule_sample 参数 $.C5 !='A0' // A3 // : $rule_sample {'_p__': 参数, '其它参数': 值} ``` - 如果参数中包括给规则模板的更多变量的,用dict包装:把真正的参数放在'_p__'变量中。 - 规则表达式主关键字($引导的那个关键字)后接的是一个python表达式 ### 灵活的python 这里介绍一些python语言特有的一些特性。 - y range : name.update({}), name // comment * 表达式:name.update({}), name; 返回的结果是tuple(None, name值),作为y(/x) dict(/range)条件,这时是想得到一个dict字典,StatisticsReporter是取这个tuple中第一个为dict的元素。 ### 分析角本中间格式dump - df_StatisticsReporter()、sql_StatisticsReporter()、stream_StatisticsReporter()角本分析时,已经使用parameter、fun_require,所以基于dump中间格式的继续分析,这些参数不再起作用。 ### 集群(Cluster)计算支持 - 一个计算事务,如果是要跨机器访问数据,网络是瓶颈,如果是数据在内存,事务处理的主要时间是操作这块内存,这时,你搞多进程、多线程等等并行计算策略,都没什么意义。所以, 对你要处理的事务是什么模式的,架构师要有综合的分析,不能觉得计算群集是解决问题的万能神器,网络、存储、访问通道(计算机里的总线)都可能需要提升。 - src_Dump()支持把数据dump出来,merge_src()支持自己提供_src数据,把slaves中的数据合并当前计算中来 ```python # 所有线程结束后,把内容汇集,执行final动作 _src_master = SR_Utils.src_Load(_src.dump) for thread_ in threads: _src_master.registerSlaves(thread_._src) _src_master.py_module = threads[0]._src.py_module merge_src(_src_master) ``` 由于GIL的存在,python中的多线程其实并不是真正意义上的多线程, I/O密集型使用多线程并发执行提高效率、计算密集型使用多进程并行执行提高效率。 - SR_MasterSlave启动Slave ``` -n, --slavesnum -c, --config -h, --host -a, --authkey -p, --port ``` ctl_Master()、ctl_ConnPool()这两个函数的功能是在当前python计算环境内初始化出一个连接或数据库连接,这样当后续如果相关数没有指定时,就可以用这个环境中的设置,程序执行效率会高些。 ```python conn_pool = SQL_Utils.getDbpool(host='192.168.1.110', user='root', password='Elastos182', dbname='op_einvoicedb', dictResult=False) # 初始化并发环境 ctl_Master(True) ctl_ConnPool(conn_pool) start = time() status, res, _src = sql_SR_Master_Go(grammer, _src, {}, None, dictResult=False, limitRows=10000, nslaves=4) #status, res, _src = sql_StatisticsReporter(grammer, _src, {}, conn, dictResult=False, debug=False, dump=True) finish = time() print ("-1-----> {:10.6}s".format(finish - start)) ``` ------ ## 约定 - LIMIT_offset_rows__ - \_p_o_s__ ------ ## 版权 ### BSD 3-Clause License - 如不特殊注明,所有模块都以此协议授权使用。 - 任何使用了StatisticsReporter的全部或部分功能的项目、产品或文章等形式的成果必须显式注明StatisticsReporter及此项目主页。