# clickhouse-example
**Repository Path**: cjiangbo/clickhouse-example
## Basic Information
- **Project Name**: clickhouse-example
- **Description**: ClickHouse 笔记
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 1
- **Created**: 2022-12-19
- **Last Updated**: 2024-06-01
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# ClickHouse - MergeTree
### 1、基础表引擎 - MergeTree
MergeTree 系列的引擎被设计用于插入极大量的数据到一张表当中,数据可以以数据片段的形式一个接着一个的快速写入,数据片段在后台按照一定的规则进行合并,相比在插入时不断修改(重写)已存储的数据,这种策略会高效很多
- 存储的数据按主键排序(能够创建一个小型的稀疏索引来加快数据检索)
- 可以指定分区键,在相同数据集和相同结果集的情况下 ClickHouse 中某些带分区的操作会比普通操作更快,查询中指定了分区键时 ClickHouse 会自动截取分区数据
- 支持数据副本(ReplicatedMergeTree 系列)
- 支持数据采样
### 2、建表语句
```SQL
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
) ENGINE = MergeTree()
ORDER BY expr
[PARTITION BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[TTL expr [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx'], ...]
[SETTINGS name=value, ...]
```
- ENGINE :指定表引擎和参数,MergeTree 引擎无参数
- PARTITON BY:选填,分区键,用于指定表数据以何种标准进行分区
- 分区键既可以是单个字段,也可以通过元组的形式指定多个字段,同时也支持使用列表达式
- 未设置则 ClickHouse 会生成一个名称为 all 的分区
- ORDER BY:必填,排序键,用于指定在一个分区内,数据以何种标准进行排序
- 排序键既可以是单个字段,例如 ORDER BY CounterID,也可以是通过元组声明的多个字段,例如 ORDER BY (CounterID, EventDate)
- 同一个分区内数据是按照排序键排好序的,多个分区之间就无法按照排序键排序
- PRIMARY KEY:选填,表示主键,ClickHouse 按照主键字段生成一级索引
- 未设置主键默认和排序键相同,通常直接使用 ORDER BY 代为指定主键,而无须单独使用 PRIMARY KEY 声明
- MergeTree 允许主键有重复数据(可以通过 ReplacingMergeTree 实现去重)
### 3、存储结构
#### **3.1、存储结构**
##### 3.1.1 目录及文件说明
test.ALARM_INFO 总大小约为 7.6 M,共 728319 行数据

- **分区目录:** 各类数据文件(primary.idx、data.mrk、data.bin 等)都是以分区目录的形式存储,属于相同分区的数据,最终会被合并到同一个分区目录,而不同分区的数据永远不会被合并在一起
- **checksums.txt:** 校验文件,使用二进制的格式进行存储,保存了各类文件(primary.txt、count.txt 等)的 size 大小以及哈希值,用于快速校验文件的完整性和正确性
- **columns.txt:** 列信息文件,使用明文格式存储,用于保存此分区下的列字段信息
- **count.txt:** 计数文件,使用明文格式存储,用于记录当前分区下的数据总数
- **primary.idx:** 一级索引文件,使用二进制格式存储,用于存储稀疏索引,MergeTree 表只能声明一次一级索引(通过 ORDER BY 或 PRIMARY KEY)
- **data.bin:** 数据文件,使用压缩格式存储,默认为 LZ4 格式,用于存储表的数据
- 旧版本中每一个列字段都有自己独立的 .bin 数据文件,并以列字段命名,新版本只有一个 data.bin
- **data.mrk:** 标记文件,使用二进制格式存储,标记文件中保存了 data.bin 文件中数据的偏移量信息,并且标记文件与稀疏索引对齐,因此 MergeTree 通过标记文件建立了稀疏索引(primary.idx)与数据文件(data.bin)之间的映射关系
- 在读取数据的时候,首先会通过稀疏索引(primary.idx)找到对应数据的偏移量信息(data.mrk),再根据偏移量信息直接从 data.bin 文件中读取数据
- **data.mrk3:** 如果使用了自适应大小的索引间隔,则标记文件会以 data.mrk3 结尾,工作原理和 data.mrk 文件相同
- **partition.dat & minmax_[column].idx:** 如果使用了分区键会额外生成 partition.dat 与 minmax_column.idx 索引文件,均使用二进制格式存储
- partition.dat:用于保存当前分区下分区表达式最终生成的值
- minmax_[column].idx:记录当前分区下分区字段对应原始数据的最小值和最大值
- **skp_idx\_[IndexName].idx 和 skp_idx_[IndexName].mrk3:** 如果指定了二级索引,则会生成相应的二级索引文件与标记文件,使用二进制存储
- 二级索引在 ClickHouse 中又被称为跳数索引,目前拥有 minmax、set、ngrambf_v1 和 token_v1 四种类型,这些种类的跳数索引的目的和一级索引相同,都是为了进一步减少数据的扫描范围,从而加速整个查询过程
##### 3.1.2 数据文件存储格式(.bin)
- **compact:** 所有字段的数据都存储到 data.bin 中(紧凑格式),只会有一个数据文件,如 3.1.1 中的示例 test.ALARM_INFO
- **wide:** 每个字段都存储到单独的文件中,存储格式受参数 min_bytes_for_wide_part 和 min_rows_for_wide_part 控制,只有当纪录数或记录占用的空间超过配置参数,才以wide格式存储
- min_bytes_for_wide_part:默认 10485760 bytes(10M),以字节为单位的最小未压缩大小
- min_rows_for_wide_part:默认 0 即不限制,最小行数
- min_bytes_for_wide_part & min_rows_for_wide_part 可在建表的时候通过 SETTINGS 指定
```SQL
# 可通过系统表 merge_tree_settings 查看默认配置
SELECT * FROM system.merge_tree_settings WHERE name IN ('min_bytes_for_wide_part', 'min_rows_for_wide_part')
```
wide 示例:
- test.SKIP_INDEX 总大小约为 769 M,共 100000000 行数据
- 共有三个存储文件:KEY1.bin、KEY2.bin、VALUE.bin
```sql
CREATE TABLE test.SKIP_INDEX
(
KEY1 UInt64,
KEY2 UInt64,
VALUE UInt64
)
ENGINE MergeTree primary key KEY1
SETTINGS index_granularity=8192;
```

#### 3.2、数据分区
在 MergeTree 数据表中,数据是以分区目录的形式进行存储的,每个分区的数据独立分开存储,MergeTree 在查询数据时,可以跳过无用的数据文件,只在最小分区目录子集中查询
##### 3.2.1 分区 ID 生成规则
MergeTree 数据表的分区规则由分区 ID 决定,而具体到每个分区对应的 ID 则是由分区键的取值决定的,分区键支持使用任何一个或一组字段表达式声明,其业务语义可以是年、月、日或者组织单位等任何一种规则,而针对取值数据的类型不同,分区 ID 的生成逻辑拥有四种规则
1. 如果不使用分区键,即不使用 PARTITION BY 声明任何分区表达式,则分区 ID 默认为 all,所有的数据都会被写入 all 这个分区
2. 整型,如果分区键的取值为整型(UInt64、Int8 等),且无法转成日期类型 YYYYMMDD 格式,则直接按照该整型的字符串形式作为分区 ID 的取值
3. 日期,如果分区键的取值为日期类型,或者是能够转换为 YYYYMMDD 格式的整型,则按照 YYYYMMDD 进行格式化后的字符串形式作为分区 ID 的取值
4. 其它,如果分区键取值既不是整型,也不是日期类型,如 String、Float 等,则通过 128 位 Hash 算法取其 Hash 值作为分区 ID 的取值
注:如果分区字段有多个,那么会按照对应的规则为每个字段都生成一个分区 ID,最后再将这些分区 ID 使用减号合并起来,作为最终的分区 ID
##### 3.2.2 分区目录命名规则
命令规则:**PartitionID_MinBlockNum_MaxBlockNum_Level_DataVersion**
- **PartitionID:** 分区 ID,根据指定的分区键生成
- **MinBlockNum & MaxBlockNum:** 最小数据块编号和最大数据块编号,自增的整数(面对所有分区,全局自增),从 1 开始,每当创建一个新的分区时就会自增 1,并且对于一个新的分区目录而言,MinBlockNum 和 MaxBlockNum 是相等的,如 202005_1_1_0、202006_2_2_0
- **Level:** 合并的层级,分区被合并的次数,每个新创建的分区目录其初始值都为 0,如果相同分区发生合并动作,则该分区对应的 Level 加 1
- **DataVersion:** 表示 mutate 操作的 DataVersion,每一次 mutate 都会生成一个新的 DataVersion 的分区目录
- DataVersion 和 MinBlockNum、MaxBlockNum 共用自增 ID 空间,通过这个值可以判断每个分区是否包含在本次 mutate 操作的影响范围内
##### 3.2.3 分区目录合并
MergeTree 的分区目录并不是在数据表被创建之后就存在的,而是在数据写入的过程中被创建的,如果一张表中没有任何数据,那么也就不会有任何的分区目录;
MergeTree 的分区目录也不是一成不变的,MergeTree 的每一次数据写入,都会生成一批新的分区目录,即使不同批次写入的数据属于相同的分区,也会生成不同的分区目录,也就是说对于同一个分区而言,会存在对应多个分区目录的情况,而在之后的某个时刻(一般 10 到 15 分钟),ClickHouse 会通过后台任务将属于相同分区的多个目录合并(Merge)成一个新的目录,当然也可以通过 optimize TABLE table_name FINAL 语句立即合并;
合并之前的旧目录会在之后的某个时刻(默认 8 分钟)被删除,但已不再是激活状态(active = 0),数据查询时会被过滤掉;
属于同一个分区的多个目录,在合并之后会生成一个全新的目录,目录中的索引和数据文件也会相应地进行合并,而新目录名称的生成方式遵循如下规则:
- **PartitionID:** 保持不变
- **MinBlockNum:** 取同一分区内所有目录中最小的 MinBlockNum
- **MaxBlockNum:** 取同一分区内所有目录中最大的 MaxBlockNum
- **Level:** 取同一分区内最大 Level 值并加 1
- **DataVersion:** 未有 mutate 操作则不带 DataVersion 后缀,如果有 mutate 操作则取 DataVersion 最大值
##### 3.2.4 分区示例
###### 3.2.4.1 创表语句
```sql
CREATE TABLE test.ALARM_INFO
(
`KEY` String,
`CREATE_TIME` DateTime,
`DST_IP` String,
`TYPE2` Int32,
`TYPE1` Int32,
`LEVEL` Int32,
`NAME` String,
`ID` Int64,
`SRC_IP` String,
`STATUS` Int32,
`TOTAL` Int64,
`TENANT_ID` Int64,
INDEX IDX_ID ID TYPE minmax GRANULARITY 1
)
ENGINE = MergeTree
PARTITION BY (toYYYYMM(toDate(CREATE_TIME)), TYPE1, TYPE2, TENANT_ID)
ORDER BY (SRC_IP, CREATE_TIME, KEY, DST_IP)
SETTINGS index_granularity = 8192
```
###### 3.2.4.2 所有分区
test.ALARM_INFO 下的所有分区如下:

也可通过系统表 system.parts 查看分区情况
```sql
SELECT name, partition_id, min_block_number, max_block_number, level, data_version
FROM system.parts
WHERE table = 'ALARM_INFO' and active;
```

###### 3.2.4.3 分区目录命名
示例分区 **202212-100000-109999--1_138_138_0**:

- 分区键:PARTITION BY (toYYYYMM(toDate(CREATE_TIME)), TYPE1, TYPE2, TENANT_ID)
- 分区 ID:202212-100000-109999--1
- toYYYYMM(toDate(CREATE_TIME)):CREATE_TIME 数据类型为 DateTime,生成的分区 ID 为 202212
- TYPE1:TYPE1 数据类型为 Int32,生成的分区 ID 为 100000
- TYPE2:TYPE2 数据类型为 Int32,生成的分区 ID 为 109999
- TENANT_ID:TENANT_ID 数据类型为 Int64,生成的分区 ID 为 -1
- MinBlockNum & MaxBlockNum:新分区 MinBlockNum 和 MaxBlockNum 默认相等,为 138
- Level:初始值为 0
###### 3.2.4.4 columns.txt & count.txt

###### 3.2.4.5 partition.dat & minmax_[column].idx
202212-100000-109999--1_138_138_0 分区下:
**partition.dat**,记录分区表达式最终生成的值,此示例分区表达式值为 202212-100000-109999--1
**minmax_[column].idx**,记录当前分区下分区字段对应原始数据的最小值和最大值,各 minmax\_[column].idx 记录的数值如下
- **minmax_CREATE_TIME.idx:** CREATE_TIME 分区表达式的最大最小值,便于查找时过滤数据

- **minmax_TENANT_ID.idx:** 分区直接指定字段,最大最小值固定为 -1
- **minmax_TYPE1.idx:**分区直接指定字段,最大最小值固定为 100000
- **minmax_TYPE2.idx:**分区直接指定字段,最大最小值固定为 109999
###### 3.2.4.6 分区合并
示例分区:
- **202212-100000-109999--1_138_138_0**
- toYYYYMM(toDate(CREATE_TIME)) = 202212
- TYPE1 = 100000
- TYPE2 = 109999
- TENANT_ID = -1
- **202212-10000-10001--1_129_129_0**
- toYYYYMM(toDate(CREATE_TIME)) = 202212
- TYPE1 = 10000
- TYPE2 = 10001
- TENANT_ID = -1
新增数据:
```sql
# 202212-100000-109999--1(分区 ID) 新增如下数据
# 执行顺序 1
INSERT INTO test.ALARM_INFO
(`KEY`, CREATE_TIME, DST_IP, TYPE2, TYPE1, `LEVEL`, NAME, ID, SRC_IP, STATUS, TOTAL, TENANT_ID)
VALUES('1671098400000', '2022-12-15 18:45:00', '192.168.1.1', 109999, 100000, 2, '其它APT事件(合并测试1)', 1604798696220835841, '192.168.1.1', 0, 1, -1);
# 执行顺序 4
INSERT INTO test.ALARM_INFO
(`KEY`, CREATE_TIME, DST_IP, TYPE2, TYPE1, `LEVEL`, NAME, ID, SRC_IP, STATUS, TOTAL, TENANT_ID)
VALUES('1671098400000', '2022-12-15 18:45:00', '192.168.1.1', 109999, 100000, 2, '其它APT事件(合并测试2)', 1604798809794154498, '192.168.1.1', 0, 1, -1);
# 202212-10000-10001--1(分区 ID) 新增如下数据
# 执行顺序 2
INSERT INTO test.ALARM_INFO
(`KEY`, CREATE_TIME, DST_IP, TYPE2, TYPE1, `LEVEL`, NAME, ID, SRC_IP, STATUS, TOTAL, TENANT_ID)
VALUES('1671094800000', '2022-12-15 17:35:00', '192.168.1.1', 10001, 10000, 2, '目录遍历攻击(合并测试3)', 1604799242579218433, '192.168.1.1', 0, 13, -1);
# 执行顺序 3
INSERT INTO test.ALARM_INFO
(`KEY`, CREATE_TIME, DST_IP, TYPE2, TYPE1, `LEVEL`, NAME, ID, SRC_IP, STATUS, TOTAL, TENANT_ID)
VALUES('1671094800000', '2022-12-15 17:35:00', '192.168.1.1', 10001, 10000, 2, '目录遍历攻击(合并测试4)', 1604799273151619073, '192.168.1.1', 0, 13, -1);
# 执行合并命令
optimize TABLE test.ALARM_INFO FINAL;
```
**202212-100000-109999--1**(分区 ID )新增数据后分区如下:

**202212-100000-109999--1**(分区 ID )执行合并后分区如下:

说明(PartitionID_MinBlockNum_MaxBlockNum_Level):
- 202212-100000-109999--1_138_138_0 为原有的分区
- 202212-100000-109999--1_168_168_0 & 202212-100000-109999--1_165_165_0 为新增的分区
- MinBlockNum & MaxBlockNum 根据执行顺序依次全局递增,分别为 165(执行顺序 1)、168(执行顺序 4)
- Level 未合并时为 0
- 合并后只存在一个分区 202212-100000-109999--1_138_168_1
- 取同一分区内所有目录中最小的 MinBlockNum 为 138
- 取同一分区内所有目录中最大的 MaxBlockNum 为 168
- 取同一分区内最大 Level 值并加 1 为 1
**202212-10000-10001--1**(分区 ID )新增数据后分区如下:

**202212-10000-10001--1**(分区 ID )执行合并后分区如下:

说明(PartitionID_MinBlockNum_MaxBlockNum_Level):
- 202212-10000-10001--1_129_129_0 为原有的分区
- 202212-10000-10001--1_166_166_0 & 202212-10000-10001--1_167_167_0 为新增的分区
- MinBlockNum & MaxBlockNum 根据执行顺序依次全局递增,分别为 166(执行顺序 2)、167(执行顺序 3)
- Level 未合并时为 0
- 合并后只存在一个分区 202212-10000-10001--1_129_167_1
- 取同一分区内所有目录中最小的 MinBlockNum 为 129
- 取同一分区内所有目录中最大的 MaxBlockNum 为 167
- 取同一分区内最大 Level 值并加 1 为 1
###### 3.2.4.7 mutate 示例
示例数据:
- **202212-100000-109999--1_138_168_1**
- toYYYYMM(toDate(CREATE_TIME)) = 202212
- TYPE1 = 100000
- TYPE2 = 109999
- TENANT_ID = -1
- ID = 6406723454606794851
修改数据:
```sql
# 修改数据
ALTER TABLE test.ALARM_INFO UPDATE STATUS = 1 WHERE ID = 6406723454606794851;
# 执行合并命令
optimize TABLE test.ALARM_INFO FINAL;
```
修改数据之前分区如下:

修改数据之后分区如下:多了个带 data_version 后缀的分区

执行合并后分区如下:

说明(PartitionID_MinBlockNum_MaxBlockNum_Level_DataVersion):
- 202212-100000-109999--1_138_168_1 为原有的分区
- 202212-100000-109999--1_138_168_1_169 为新增的分区
- DataVersion = 169,DataVersion 和 MinBlockNum、MaxBlockNum 共用自增 ID 空间,自增后为 169
- Level 未合并时为 1
- 合并后只存在一个分区 202212-100000-109999--1_138_168_2_169
- 取同一分区内所有目录中最小的 MinBlockNum 为 138
- 取同一分区内所有目录中最大的 MaxBlockNum 为 168
- 取同一分区内最大 Level 值并加 1 为 2
- 取同一分区内所有目录中最大的 DataVersion 为 169
#### 3.3、一级索引
MergeTree 的主键使用 PRIMARY KEY 定义,主键定义之后,MergeTree 会依据 index_granularity 间隔(默认 8192 行)为数据表生成一级索引并保存至 primary.idx 文件中,并按照主键进行排序,如果不指定 PAIMARY KEY,那么主键默认和排序键相同,在这种情况下,索引(primary.idx)和数据(data.bin)会按照完全相同的规则排序
##### 3.3.1 稀疏索引
primary.idx文件内的一级索引采用稀疏索引实现,在稀疏索引中,每一行索引标记对应的是一段数据,而不是一行,仅需使用少量的索引标记就能够记录大量数据的区间位置信息,且数据量越大优势越为明显,由于稀疏索引占用空间小,所以primary.idx内的索引数据常驻内存
##### 3.3.2 索引粒度
索引粒度由 index_granularity 参数控制,表示索引与索引之间的间隔,ClickHouse 也提供了自适应粒度大小的特性
- index_granularity 可在建表的时候通过 SETTINGS 指定
- index_granularity 也影响数据标记文件(.mrk)的间隔粒度和数据文件(.bin)的数据压缩
- 数据以 index_granularity 的粒度(默认 8192 )被标记成多个小的区间,其中每个区间最多 index_granularity 行数据

MarkRange 说明:
- MarkRange 表示一个具体的区间,与索引编号对应
- start 和 end 属性对应索引编号的取值,表示此区间范围
##### 3.3.3 索引生成规则
MergeTree 间隔 index_granularity 行数据生成一条索引记录,其索引值会依据声明的主键字段获取
如:
单个主键
- PARTITION BY toYYYYMM(EventDate)
- ORDER BY CounterID

多个主键
- PARTITION BY toYYYYMM(EventDate)
- ORDER BY(CounterID,EventDate)

##### 3.3.4 索引查询过程
1. 根据 where 条件生成查询条件区间,即便是单个值的查询条件,也会被转换成区间的形式
- WHERE ID = 'A003':['A003', 'A003']
- WHERE ID > 'A000':('A000', +inf)
- WHERE ID < 'A188:(-inf, 'A188')
- WHERE ID LIKE 'A006%':['A006', 'A007')
2. 递归交集判断,以递归的形式,依次对 MarkRange 的数值区间与条件区间做交集判断,从最大的区间开始
- 如果不存在交集,则直接通过剪枝算法优化此整段 MarkRange
- 如果存在交集,且 MarkRange 步长大于 8 (end-start),则将此区间进一步拆分成 8 个子区间(由 merge_tree_coarse_index_granularity 指定),并重复此规则继续做递归交集判断
- 如果存在交集,且 MarkRange 不可再分解(步长小于8),则记录 MarkRange 并返回
3. 合并 MarkRange 区间,将最终匹配的 MarkRange 聚在一起,合并它们的范围
merge_tree_coarse_index_granularity:
搜索数据时 ClickHouse 检查索引文件中的数据标记,如果 ClickHouse 发现所需键在某个范围内,则会将该范围划分为merge_tree_coarse_index_granularity 子范围,然后在该范围内递归搜索所需键,默认8,可选任何正偶数整数
示例:

#### 3.4、二级索引(跳数索引)
二级索引又称跳数索引,由数据的聚合信息构建而成,根据索引类型的不同,其聚合信息的内容也不同,跳数索引的目的与一级索引一样,也是查询时减少数据扫描的范围;
声明跳数索引会额外生成相应的索引文件(skp_idx\_[IndexName].idx)和索引标记文件(skp_idx_[IndexName].mrk3);
旧版本中跳数索引在默认情况下是关闭的,需要配置开启:
```sql
SET allow_experimental_data_skipping_indices = 1
```
##### 3.4.1 索引声明
创表时声明:
```sql
CREATE TABLE table_name (
column1 type,
column2 type,
......
INDEX index_name expr TYPE index_type(...) GRANULARITY granularity
)
```
ALTER TABLE 声明:
```sql
ALTER TABLE table_name ADD INDEX index_name expr TYPE index_type(...) GRANULARITY granularity;
# 注:该方式声明跳数索引只应用于新插入的数据,对旧数据无效果,要使旧数据生效,需执行如下语句
ALTER TABLE table_name MATERIALIZE INDEX index_name;
```
说明:
- index_name:索引名称,用于在每个分区中创建索引文件,在删除或具体化索引时也需要将其作为参数
- expr:索引表达式,用于计算存储在索引中的值集,可以是列、简单操作符、函数的子集的组合
- index_type(...):索引类型,决定索引生成的计算方式
- granularity:索引汇总粒度,每种跳数索引都有一个 granularity 参数,表示每隔 granularity 个索引粒度 (index_granularity ) 才会生成一次跳数索引
示例:
```sql
# 索引名称 minmax_test
# 应用字段 total
# 索引类型 minmax
# 索引汇总粒度 3
ALTER TABLE table_name ADD INDEX minmax_test total TYPE minmax GRANULARITY 3;
```
跨 3 个 index_granularity 粒度,取这 3 个 index_granularity 粒度中的最大最小值汇总后再次取最大最小值生成索引:[1, 9]

##### 3.4.2 索引类型
测试数据 1:
```sql
CREATE TABLE test.SKIP_INDEX
(
KEY1 UInt64,
KEY2 UInt64,
VALUE UInt64
)
ENGINE MergeTree primary key KEY1
SETTINGS index_granularity=8192;
# numbers(N) 返回 0 ~ N-1 的整数
# intDiv 计算数值的商,向下舍入取整(按绝对值)
INSERT INTO test.SKIP_INDEX SELECT number, number, intDiv(number,4096) FROM numbers(100000000);
```
test.SKIP_INDEX 表最终生成的数据:
- 总共生成 100000000 行数据
- KEY1 与 KEY2 值一致,取值依次为 0 ~ 99999999,KEY1 为一级索引
- VALUE 取值从 0 开始,每增加 4096 行取值加 1
- KEY1(0 ~ 4095):取值 0
- KEY1 (4096 ~ 8191):取值 1
- 索引粒度 index_granularity 为 8192
###### 3.4.2.1 minmax
- 记录了一段数据内的最小值和最大值
- 如果表达式是一个元组,它分别存储元组元素的每个成员的值
- 只适用于标量或元组表达式,不适用返回值是数组或 map 数据类型的表达式
- 查询数据时可以先基于 minmax 值判断,从而跳过大多数不需要扫描的索引粒度
示例:
1、引用测试数据 1
2、无跳数索引执行如下 SQL
```sql
# clickhouse-client --host 192.168.200.138 --port 9000 --password=123456 --send_logs_level=trace <<< "SELECT * FROM test.SKIP_INDEX WHERE KEY2 = 831647" > /dev/null
SELECT * FROM test.SKIP_INDEX WHERE KEY2 = 831647;
```
3、查看执行日志:
扫描了全表数据,耗时 ≈ 0.56s
```
executeQuery: Read 100000000 rows, 763.00 MiB in 0.557068758 sec., 179511054 rows/sec., 1.34 GiB/sec.
```
4、增加跳数索引
```sql
# 建立索引
ALTER TABLE test.SKIP_INDEX ADD INDEX IDX_KEY2 KEY2 TYPE minmax GRANULARITY 1;
# 使已经存在的数据生效
ALTER TABLE test.SKIP_INDEX MATERIALIZE INDEX IDX_KEY2;
```
- 在 KEY2 上建立索引
- 索引类型为 minmax
- 索引汇总粒度 granularity 为 1
5、重新执行SQL(步骤 2 )查看日志:
只扫描了 8192 行数据,耗时 ≈ 0.008s
```
executeQuery: Read 8192 rows, 192.00 KiB in 0.008414033 sec., 973611 rows/sec., 22.28 MiB/sec.
```
###### 3.4.2.2 set(max_rows)
- 记录了声明字段或表达式的取值(唯一值,无重复)
- max_rows 指在一个 index_granularity 内生成索引记录的行数上限,为 0 表示无限制
- 适用重复性较高的列
示例:
1、引用测试数据 1
2、无跳数索引执行如下 SQL
```sql
# clickhouse-client --host 192.168.200.138 --port 9000 --password=123456 --send_logs_level=trace <<< "SELECT * FROM test.SKIP_INDEX WHERE VALUE IN (125, 700)" > /dev/null
SELECT * FROM test.SKIP_INDEX WHERE VALUE IN (125, 700);
```
3、查看执行日志:
扫描了全表数据,耗时 ≈ 0.17s
```
executeQuery: Read 100000000 rows, 763.13 MiB in 0.156677674 sec., 638253028 rows/sec., 4.76 GiB/sec.
```
4、增加跳数索引
```sql
# 建立索引
ALTER TABLE test.SKIP_INDEX ADD INDEX IDX_VALUE VALUE TYPE set(100) GRANULARITY 2;
# 使已经存在的数据生效
ALTER TABLE test.SKIP_INDEX MATERIALIZE INDEX IDX_VALUE;
```
- 在 VALUE 上建立索引
- 索引类型为 set,单个 index_granularity 只生成 100 条索引,测试数据 1 VALUE 字段取值在单个 index_granularity 排重后只会有两个值
- 索引汇总粒度 granularity 为 2,在两个 index_granularity 内排重
5、重新执行SQL(步骤 2 )查看日志:
只扫描了 32768 行数据,耗时 ≈ 0.02s
```
executeQuery: Read 32768 rows, 448.00 KiB in 0.020263033 sec., 1617132 rows/sec., 21.59 MiB/sec.
```
定位 VALUE = 125 数据:

###### 3.4.2.3 ngrambf_v1
待补充
###### 3.4.2.4 tokenbf_v1
待补充
##### 3.4.3 函数支持
在使用 where 条件查询的时候,如果 where 条件表达式中包含跳数索引列,ClickHouse 会在执行函数时尝试使用索引,不同的函数对索引的支持是不同的,其中 set 索引对所有函数都生效,其他跳数索引支持如下

#### 3.5、数据存储
在 MergeTree 中数据按列存储,在旧版本中每列独立存储即每个列字段都拥有一个与之对应的 .bin 文件,新版本中这些列字段对应的 .bin 文件合并在一起了,只有一个 data.bin(分区数据量达到阀值依旧是分开存储);
按列存储优势:
- 可以更好地进行数据压缩,相同类型的数据放在一起,对压缩更加友好
- 能够最小化数据扫描的范围
.bin 文件中的数据是压缩后的数据,数据会事先依照 ORDER BY 的声明排序,然后以压缩数据块的形式被组织并写入.bin文件中,目前支持 LZ4、ZSTD、Multiple 和 Delta 几种算法,默认使用LZ4算法进行压缩
压缩数据块:
一个压缩数据块由头信息和压缩数据两部分组成,头信息固定使用 9 位字节表示,具体由 1 个 UInt8(1字节)整型和 2 个 UInt32(4字节)整型组成,分别代表使用的压缩算法类型、压缩后的数据大小和压缩前的数据大小;

每个压缩数据块的体积,按照其压缩前的数据字节大小,都被严格控制在64KB~1MB,其上下限分别由 min_compress_block_size(默认65536)与 max_compress_block_size(默认1048576)参数指定;
而一个压缩数据块最终的大小,则和一个索引粒度(index_granularity)内数据的实际大小相关,MergeTree 在数据具体的写入过程中,会依照索引粒度(默认情况下,每次取8192行),按批次获取数据并进行处理,一批次获取一个索引粒度的数据:
- 单个批次数据 < 64KB:如果单个批次数据小于 64KB,则继续获取下一批数据,直至累积到 >= 64KB 时生成压缩数据块
- 单个批次数据 64KB <= size <= 1MB:如果单个批次数据大小恰好在64KB与1MB之间,则直接生成压缩数据块
- 单个批次数据 > 1MB:如果单个批次数据超过 1MB,则首先按照 1MB 大小截断并生成压缩数据块,剩余数据继续依照上述规则执行,此时会出现一个批次数据生成多个压缩数据块的情况
在具体读取某一列数据时(.bin文件),首先需要将压缩数据加载到内存并解压,然后才进行后续的数据处理,通过压缩数据块,可以在不读取整个 .bin 文件的情况下将读取粒度降低到压缩数据块级别,从而进一步缩小数据读取的范围
#### 3.6、数据标记文件(.mrk)
数据标记和索引区间是对齐的,均按照 index_granularity 的粒度间隔,只需简单通过索引区间的下标编号就可以直接找到对应的数据标记;
数据标记文件与 .bin 文件一一对应,每一个 .bin 文件都有一个与之对应的 .mrk 数据标记文件,用于记录数据在 .bin 文件中的偏移量信息,一行标记数据使用一个元组表示,元组内包含两个整型数值的偏移量信息,分别表示在此段数据区间内,在对应的 .bin 压缩文件中,压缩数据块的起始偏移量,以及将该数据压缩块解压后其未压缩数据的起始偏移量;
MergeTree 定位压缩数据块并读取数据:
1. 在查询某一列数据时,MergeTree 无须一次性加载整个 .bin 文件,而是可以根据需要,只加载特定的压缩数据块,而这项特性需要借助标记文件中所保存的压缩文件中的偏移量
2. 在读取解压后的数据时,MergeTree 并不需要一次性扫描整段解压数据,它可以根据需要,以 index_granularity 的粒度加载特定的一小段,为了实现这项特性,需要借助标记文件中保存的解压数据块中的偏移量
#### 3.7、数据查询过程
如果一条查询语句没有指定任何 WHERE 条件,或是指定了 WHERE 条件,但条件没有匹配到任何索引(分区索引、一级索引和二级索引),那么 MergeTree 就不能预先减小数据范围,在后续进行数据查询时,它会扫描所有分区目录,以及目录内索引段的最大区间,虽然不能减少数据范围,但是 MergeTree 仍然能够借助数据标记,以多线程的形式同时读取多个压缩数据块,以提升性能
### 4、数据副本
#### 4.1、概述
Replicated 系列的表引擎均支持数据副本
特点:
- 依赖 zookeeper,以实现副本之间的同步
- 表级别的副本,每张表的副本配置都可以按照它的实际需求进行个性化定义,包括副本的数量,以及副本在集群内的分布位置等
- 多主架构(Multi Master),可以在任意一个副本上执行 INSERT 和 ALTER,它们的效果是相同的,这些操作会借助ZooKeeper的协同能力被分发至每个副本以本地形式执行
- Block数据块,在执行 INSERT 命令写入数据时,会依据 max_insert_block_size 的大小(默认1048576行)将数据切分成若干个 Block 数据块,Block 数据块是数据写入的基本单元,具有写入的原子性和唯一性
- 原子性,在数据写入时,一个 Block 块内的数据要么全部写入成功,要么全部失败
- 唯一性,按照当前 Block 数据块的数据顺序、数据行和数据大小等指标,计算 Hash 信息,根据 Hash 信息预防 Block 数据块重复写入
- 支持单集群和多集群架构,既可以将所有节点组成一个单一集群,也可以按照业务的诉求,把节点划分为多个小集群
#### 4.2、配置
##### 4.2.1、Zookeeper 配置
ClickHouse 使用一组 Zookeeper 标签定义相关配置,默认情况下,在全局配置 config.xml 中定义即可,也可将该配置抽离出来,独立使用配置文件
示例:
1. 目录:/etc/clickhouse-server/config.d
2. 配置文件:metrika.xml
3. 配置内容:
```xml
192.168.200.143
2181
```
4. 全局配置 config.xml 中使用 标签导入 metrika.xml 配置
```xml
/etc/clickhouse-server/config.d/metrika.xml
```
5. 引用 zookeeper 配置,incl 与 metrika.xml 配置文件内的节点名称要彼此对应
```xml
```
6. 重启服务
- systemctl restart clickhouse-server
- systemctl status clickhouse-server
可通过系统表 system.zookeeper 查看 zookeeper 信息:
```sql
-- clickhouse-client --host=127.0.0.1 --port=9000 --user=default --password=123456
SELECT * FROM system.zookeeper WHERE path = '/';
```
****
##### 4.2.2、Replicated 系列表定义
```sql
ENGINE = ReplicatedMergeTree('zk_path', 'replica_name')
```
- zk_path:指定在 ZooKeeper 中创建的数据表的路径,可自定义,一般参考约定路径命名,同一张数据表的同一个分片的不同副本 定义相同的 zk_path
- 约定路径为:/clickhouse/tables/{shard}/table_name
- clickhouse/tables/:固定前缀
- {shard}:分片编号,通常用数值替代,例如 01、02、03
- table_name:数据表名称
- replica_name:在 ZooKeeper 中副本的名称,该名称是区分不同副本实例的唯一标识,同一张数据表的同一个分片的不同副本定义不同的 replica_name,即不应该有副本的 replica_name 相同
示例:
```sql
# REPLICATED_TABLE 表拥有一个分片,一个副本,zk_path 相同,replica_name 不同
ENGINE = ReplicatedMergeTree('clickhouse/tables/01/REPLICATED_TABLE', '192.168.200.142')
ENGINE = ReplicatedMergeTree('clickhouse/tables/01/REPLICATED_TABLE', '192.168.200.143')
```
完整示例:
```sql
CREATE TABLE test.REPLICATED_TABLE
(
`KEY` String,
`CREATE_TIME` DateTime,
`DST_IP` String,
`TYPE2` Int32,
`TYPE1` Int32,
`LEVEL` Int32,
`NAME` String,
`ID` Int64,
`SRC_IP` String,
`STATUS` Int32,
`TOTAL` Int64,
`TENANT_ID` Int64,
INDEX IDX_ID ID TYPE minmax GRANULARITY 1
)
ENGINE = ReplicatedMergeTree('clickhouse/tables/01/REPLICATED_TABLE', '192.168.200.142')
PARTITION BY (toYYYYMM(toDate(CREATE_TIME)), TYPE1, TYPE2, TENANT_ID)
ORDER BY (SRC_IP, CREATE_TIME, KEY, DST_IP)
SETTINGS index_granularity = 8192
CREATE TABLE test.REPLICATED_TABLE
(
`KEY` String,
`CREATE_TIME` DateTime,
`DST_IP` String,
`TYPE2` Int32,
`TYPE1` Int32,
`LEVEL` Int32,
`NAME` String,
`ID` Int64,
`SRC_IP` String,
`STATUS` Int32,
`TOTAL` Int64,
`TENANT_ID` Int64,
INDEX IDX_ID ID TYPE minmax GRANULARITY 1
)
ENGINE = ReplicatedMergeTree('clickhouse/tables/01/REPLICATED_TABLE', '192.168.200.143')
PARTITION BY (toYYYYMM(toDate(CREATE_TIME)), TYPE1, TYPE2, TENANT_ID)
ORDER BY (SRC_IP, CREATE_TIME, KEY, DST_IP)
SETTINGS index_granularity = 8192
```
#### 4.3、Zookeeper 目录说明
Zookeeper 只负责副本之间的协同通信(INSERT 数据写入、MERGE 分区、MUTATION 操作、ALTER 修改元数据),不会涉及表数据的传输,查询也不需要通过 Zookeeper,即 Zookeeper 并不会承受太大压力
- /metadata:元数据信息,包括主键、分区键、采样表达式等
- /columns:元数据信息 - 列字段信息,包括列名称、数据类型
- /replicas:元数据信息 - 副本名称,对应 replica_name
- /leader_election:用于主副本选举,主副本主导 MERGE 分区和 MUTATION 操作,这些操作在主副本完成之后再借助 ZooKeeper 将消息事件分发至其他副本,选举的方式是向 /leader_election 插入子节点,第一个插入成功的副本就是主副本
- /blocks:记录 Block 数据块的 Hash 信息和对应的 partition_id,Hash 信息可判断 Block 数据块是否重复,partition_id 可找到需要同步的数据分区
- /block_numbers:按照分区的写入顺序,以相同的顺序记录 partition_id,各个副本在本地进行 MERGE 时,都会依照相同的block_numbers 顺序进行
- /quorum:记录 quorum 的数量,当至少有 quorum 数量的副本写入成功后,整个写操作才算成功,quorum 的数量由insert_quorum 参数控制,默认值为 0
- /log:常规操作日志节点(INSERT、MERGE 分区、DROP PARTITION),保存副本需要执行的任务指令,log 使用 ZooKeeper 的持久顺序型节点,每条指令的名称以 log- 为前缀递增,如log-0000000000、log-0000000001,每个副本实例都会监听/log节点,当有新的指令加入时,它们会把指令加入副本各自的任务队列,并执行任务
- /mutations:MUTATION 操作日志节点,当执行 ALERT DELETE 或 ALERT UPDATE 查询时,操作指令会被添加到这个节点mutations 使用 ZooKeeper 的持久顺序型节点,命名无前缀,每条指令直接以递增数字的形式保存,如0000000000、0000000001
- /replicas/{replica_name}/*:每个副本各自的节点下的一组监听节点,用于指导副本在本地执行具体的任务指令
- /replicas/{replica_name}/queue:任务队列节点,用于执行具体的操作任务,当副本从 /log 或 /mutations 节点监听到操作指令时,会将执行任务添加至该节点下,基于队列执行
- /replicas/{replica_name}/log_pointer:log 日志指针节点,记录最后一次执行的 log 日志下标,如 log_pointer=4 则指向 /log/log-0000000003(从0开始计数)
- /replicas/{replica_name}/mutation_pointer:mutations 日志指针节点,记录最后一次执行的 mutations 日志,如mutation_pointer=0000000000 指向 /mutations/000000000
#### 4.4、INSERT 执行流程
某个数据表拥有一个分片和一个副本,副本(副本 01、副本 02),现向任意一个副本中写入数据,假设在副本 01 上执行 INSERT 操作:
1. **副本 01 在本地完成分区目录的写入**
- 在 Zookeeper 的 /blocks 节点中写入数据分区的 block_id,用于去重(重复执行同一条 INSERT 只会产生一条记录)
- 如果设置了 insert_quorum 参数(默认为 0 ),并且 insert_quorum >= 2,则副本 01 会监控已完成写入操作的副本个数,只有当写入副本个数大于或等于 insert_quorum 时,整个写入操作才算成功
2. **副本 01 推送 log 日志,由执行 INSERT 的副本向 Zookeeper 的 /log 节点推送操作日志**
3. **其他副本实例拉取 log 日志**
- 其他副本(副本 02)监听 /log 节点变化,当副本 01 推送了 log 之后,副本 02 触发日志的拉取任务并更新 log_pointer,将其指向最新日志下标
- 在拉取了 log 之后,并不会直接执行,而是将其转为任务对象放至队列(/replicas/{replica_name}/queue)
4. **执行 /queue 队列任务,发起下载请求,副本 02 选择一个远端的其他副本作为数据的下载来源,选择算法大致如下**
- 从 /replicas 节点拿到所有的副本节点
- 遍历所有副本节点,选取其中一个,选取的副本需要拥有最大的 log_pointer 下标,并且 /queue 子节点数量最少,log_pointer 下标最大意味该副本执行的日志最多数据更加完整,/queue最小意味该副本目前的任务执行负担较小
- 如果下载请求失败,默认情况下将重新请求,一共会尝试 5 次(由 max_fetch_partition_retries_count 参数控制,默认为5)
- 本示例只有两个副本,所有只能选择副本 01
5. **下载数据并完成写入,副本 01 响应副本 02 的下载请求,副本 02 下载数据并写入本地**
#### 4.5、MERGE 分区合并执行流程
无论 MERGE 操作从哪个副本发起,其合并计划都会交由主副本来制定;
某个数据表拥有一个分片和一个副本,副本(副本 01、副本 02,主副本为副本 01),现在副本 02 上执行 OPTIMIZE,强制触发 MERGE 操作:
1. **发起 MERGE 操作的副本与主副本建立连接**
- 副本 02 通过 /replicas 节点信息找到主副本-副本 01,并尝试建立远程连接
2. **主副本接收通信,副本 01 接收来自副本 02 的通信,建立连接**
3. **主副本制定 MERGE 计划并推送 log 日志**
- 主副本还会锁住执行线程,对日志的接收情况进行监听,其监听行为由 replication_alter_partitions_sync 参数控制,默认值为1,当此参数为 0 时不做任何等待,为 1 时只等待主副本自身完成,为 2 时等待所有副本拉取完成(拉取 log 中的任务)
4. **各个副本分别拉取 log 日志,并推送至各自的 /queue 队列**
5. **各个副本分别在本地执行 MERGE 操作**
注:主副本选举,ClickHouse 使用 Zookeeper 来进行主副本选举,在集群中,每个分片都有一个主副本和零个或多个备份副本 ,主副本是分片数据的主要拥有者,负责执行写操作和一些读操作,备份副本仅用于读操作和容错,如果主副本失效,则备份副本会自动接管
当一个主副本失效时,Zookeeper 将会在备份副本之间选出一个新的主副本
- 备份副本会定期向主副本发送心跳信号,以检查主副本的状态,如果主副本长时间没有响应,备份副本就会认为主副本已经失效,开始进行主副本选举
- 在 Zookeeper 中,每个备份副本都创建一个临时节点,表示它们是可用的备份副本
- 如果当前没有主副本或者主副本已经失效,则备份副本会尝试获取一个 zookeeper 锁,如果它成功获取了锁,则它成为新的主副本
#### 4.6、MUTATION 操作执行流程
对 ReplicatedMergeTree 执行 ALTER DELETE 或者 ALTER UPDATE 操作,即为 MUTATION 操作;
无论 MUTATION 操作从哪个副本发起,都由主副本响应;
某个数据表拥有一个分片和一个副本,副本(副本 01、副本 02,主副本为副本 01),现在副本 02 上执行 ALTER UPDATE,触发 MUTATION 操作:
1. **推送 mutations 日志,副本 02 推送 mutations 日志**
2. **其他副本监听 mutations 日志,副本 01、副本 02 都会监听**
3. **由主副本响应 mutations 日志并再次推送 log 日志**
- 并不是所有副本都会响应 mutations 日志,只有主副本会响应 mutations 日志,主副本响应 mutations 日志并转换为 log 日志,以通知各个副本执行具体的操作
- 此示例由副本 01 响应 mutations 日志并再次推送 log 日志
4. **各个副本分别拉取 log 日志,副本 01、副本 02 分别拉取 log 日志并推送至执行队列 /queue**
5. **各个副本分别在本地执行 MUTATION 操作**
#### **4.7、ALTER 修改元数据**
某个数据表拥有一个分片和一个副本,副本(副本 01、副本 02),现在副本 02 上执行 ALTER 新增字段:
1. **修改共享元数据,在副本 02 上通过 ALTER 新增字段,副本 02 会修改 Zookeeper 中的共享元数据节点 /metadata,/columns 等**
- 数据修改后,节点的版本号也会同时提升
- 副本 02 还需负责监听所有副本的修改完成情况
2. **其他副本监听共享元数据变更并各自执行本地修改**
- 副本 01 和副本 02 两个副本分别监听共享元数据的变更,然后对本地的元数据版本号与共享版本号进行对比,最后进行更改
3. **发起副本确认所有副本完成修改**
#### **4.8、负载均衡器(clickhouse-balance)**
clickhouse-balance是ClickHouse官方提供的一个负载均衡器,它可以自动发现ClickHouse集群中的主副本和备份副本,并将客户端请求路由到合适的副本
- 自动发现主副本和备份副本:自动发现集群中的主副本和备份副本,并以动态方式管理它们的状态
- 支持多种负载均衡算法:如随机、轮询、最少连接等
- 支持多种协议和格式:如 HTTP、TCP、JSON、TabSeparated 等,可以与各种客户端和应用程序集成
- 支持健康检查和故障恢复:可以检测副本状态并在副本失效时自动切换到备份副本
负载均衡算法:
- 轮询算法(Round Robin):按照顺序依次将请求分配给每个副本,当所有副本都被分配了一次后,再从头开始分配
- 随机算法(Random):随机选择一个副本来处理请求,由于是随机选择,因此不一定能够实现均衡的负载分配
- 最少连接算法(Least Connections):选择连接数最少的副本来处理请求,这种算法可以实现对连接数较少的副本进行负载分配
- IP哈希算法(IP Hash):根据客户端IP地址的哈希值选择一个副本来处理请求,这种算法可以保证同一个客户端的请求都被分配到同一个副本处理,适用于需要保持会话状态的应用场景
- 一致性哈希算法(Consistent Hashing):通过将副本和请求映射到一个环上,根据请求的哈希值选择一个副本来处理请求,这种算法可以保证在添加或删除副本时,只有少量的请求需要重新分配,因此具有良好的可扩展性
- 加权轮询算法(Weighted Round Robin):按照副本的权重依次将请求分配给每个副本,权重越高的副本处理的请求越多,可以根据服务器的性能和负载情况来调整权重,以实现更为灵活的负载均衡
配置示例:
```ini
# Zookeeper配置
zookeeper {
address = "localhost:2181"
session_timeout = 30s
}
# 副本配置
replicas {
# 主副本
my_cluster_1 {
host = "192.168.1.1"
port = 9000
}
# 备份副本
my_cluster_2 {
host = "192.168.1.2"
port = 9000
}
}
# 负载均衡算法配置
load_balancing {
# 使用轮询算法
method = round_robin
}
# 健康检查和故障恢复配置
health_check {
# 检查间隔为10秒
interval = 10s
# 超时时间为5秒
timeout = 5s
# 最大重试次数为3次
max_retries = 3
}
# 日志和监控配置
logging {
# 输出日志到控制台
console {
level = info
}
# 输出监控信息到Prometheus
prometheus {
port = 9090
}
}
```
### 5、Distributed 表引擎
Distributed 表引擎是分布式表的代名词,它自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各
个节点;
- **对于分布式表与本地表之间表结构的一致性检查,Distributed 表引擎采用了读时检查的机制,这意味着如果它们的表结构不兼容,只有在查询时才会抛出错误,而在创建表时并不会进行检查;**
- INSERT 和 SELECT 查询,Distributed 将会以分布式的方式作用于 local 本地表
- Distributed 支持部分元数据操作,包括:CREATE、DROP、RENAME、ALTER,其中 ALTER 并不包括分区的操作(ATTACH PARTITION、REPLACE、PARTITION 等),这些只会修改 Distributed 表自身,并不会修改本地表
- **Distributed 表不支持任何 MUTATION 类型的操作,包括 ALTER DELETE 和 ALTER UPDATE**
#### 5.1、表引擎定义
```
ENGINE = Distributed(cluster, database, table [,sharding_key])
```
- cluster:集群名称,与集群配置中的自定义名称相对应,在对分布式表执行写入和查询的过程中,使用集群的配置信息来找到对应的节点
- database:数据库名称
- table:本地表名称
- sharding_key:分片键,选填参数
#### 5.2、分片键
- 可以是整型类型字段的取值,包括 Int 系列和 UInt 系列
- 可以是返回整型的表达式
```sql
-- 按照字段取模,字段需是整型类型的取值
Distributed(cluster, database, table ,userid)
-- 也可以是返回整型的表达式
-- 按照随机数划分
Distributed(cluster, database, table ,rand())
-- 按照用户 id 的 hash 值划分
Distributed(cluster, database, table , intHash64(userid))
```
注:如果集群配置了权重,则按权重取模
```xml
10
……
20
……
```
#### 5.3、写入流程
##### 5.3.1、配置说明
集群配置:
两个分片,无副本
```xml
10
ch5.nauu.com
9000
20
ch6.nauu.com
9000
```
分布式表:
```sql
CREATE TABLE test_shard_2_all ON CLUSTER sharding_simple (
id UInt64
)ENGINE = Distributed(sharding_simple, default, test_shard_2_local, id)
```
##### 5.3.2、写入流程
假设在 **ch5 节点**,通过分布式表 **test_shard_2_all** 写入 **10、30、55、200** 四条记录
- 10 % 30 = 10,写入分片 ch6
- 30 % 30 = 0,写入分片 ch5
- 55 % 30 = 25,写入分片 ch6
- 200 % 30 = 20,写入分片 ch6
写入过程:
1. 在 ch5 执行 INSERT
- 根据分片规则划分数据,**30** 写入 **ch5**,**10、55、200** 写入 **ch6**
- 属于当前分片的数据写入本地表,即 **30 写入 ch5 节点的 test_shard_2_local**
2. 归属其他节点的数据,在当前节点生成临时文件,并与远端节点**(ch6)**建立连接,准备发送数据
- 临时文件目录为当前节点的分布式表目录下,即**在 ch5 的 test_shard_2_all 目录下生成临时文件**
- 文件命令格式:/database@host:port/[increase_num].bin
- 此示例 ch6 节点数据在 ch5 中的临时文件:/test_shard_2_all/default@ch6.nauu.com:9000/1.bin
3. 当前节点**(ch5)**向远端节点**(ch6)**发送数据
- 每份目录将会由独立的线程负责发送
- 数据在传输之前会被压缩
4. 远端节点**(ch6)**接收数据并写入本地
5. 当前节点**(ch5)**确认是否写入完毕
**注:在由 Distributed 表负责向远端分片发送数据时,有异步写和同步写两种模式**
- 异步写:在 Distributed 表写完本地分片之后,INSERT 就会返回成功写入的信息
- 同步写:在执行 INSERT 之后,会等待所有分片完成写入
- 参数配置:由 insert_distributed_sync 参数控制,默认为 false,即异步写
- 如果将其设置为 true,可通过 insert_distributed_timeout 参数控制同步等待的超时时间
##### 5.3.3、副本数据复制
通过 Distributed 复制数据:
- 在这种实现方式下,即使本地表不使用 ReplicatedMergeTree 表引擎,也能实现数据副本的功能,Distributed 会同时负责分片和副本的数据写入工作,而副本数据的写入流程与分片逻辑相同
- Distributed 节点需要同时负责分片和副本的数据写入工作,它很有可能会成为写入的单点瓶颈
通过 ReplicatedMergeTree 复制数据:
- 在集群的 shard 配置中增加 internal_replication 参数并将其设置为 true(默认为 false),那么 Distributed 表在该 shard 中只会选择一个合适的 replica 并对其写入数据
- 配合使用 ReplicatedMergeTree 作为本地表的引擎,则在该 shard 内,多个 replica 副本之间的数据复制会交由ReplicatedMergeTree 处理,不再由 Distributed 负责,从而为其减负
- 在 shard 中选择 replica 的算法大致如下:在 ClickHouse 的服务节点中,拥有一个全局计数器 errors_count,当服务出现任何异常时,该计数累积加 1,当 shard 拥有多个 replica 时,选择 errors_count 错误最少的那个
配置示例:
```xml
true
ch5.nauu.com
9000
ch6.nauu.com
9000
```
#### 5.4、查询流程
##### 5.4.1、副本路由规则
在查询数据的时候,如果集群中的一个 shard,拥有多个 replica,那么 Distributed 表引擎需要面临副本选择的问题,会使用负载均衡算法从众多 replica 中选择一个,而具体使用何种负载均衡算法,则由 load_balancing 参数控制
- random:默认的负载均衡算法,在 ClickHouse 的服务节点中,拥有一个全局计数器 errors_count,当服务发生任何异常时,该计数累积加1,random 算法会选择 errors_count 错误数量最少的 replica,如果多个 replica 的 errors_count 计数相同,则在它们之中随机选择一个
- nearest_hostname:首先选择 errors_count 错误数量最少的 replica,多个 replica 的 errors_count 计数相同,则选择集群配置中 host 名称与当前host 最相似的一个,相似规则是以当前 host 名称为基准按字节逐位比较,找出不同字节数最少的一个
- in_order:首先选择 errors_count 错误数量最少的 replica,多个 replica 的 errors_count 计数相同,则按照集群配置中 replica 的定义顺序逐个选择
- first_or_random:首先选择 errors_count 错误数量最少的 replica,多个 replica 的 errors_count 计数相同,选择集群配置中第一个定义的 replica,如果该replica不可用,则进一步随机选择一个其他的 replica
load_balancing 配置可在 config.xml 的 部分中配置(修改后需重启):
```xml
random
```
##### 5.4.2、多分片查询
- 首先由接收 SELECT 查询的 Distributed 表负责串联起整个过程,先将 SQL 语句,按照分片数量将查询拆分成若干个针对本地表的子查询,然后向各个分片发起查询,最后再汇总各个分片的返回结果
- 注意使用 Global 优化分布式子查询