# tapdata
**Repository Path**: cz_0004/tapdata
## Basic Information
- **Project Name**: tapdata
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: develop
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2024-02-01
- **Last Updated**: 2024-02-01
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## 安装准备
### 环境准备
1. 在开始之前, 请保证您的环境安装了 Docker
2. 当前工具仅在 Linux 下进行过完整测试, 其他操作系统的适配正在进行中
3. 克隆当前仓库代码到本地: `git clone https://github.com/tapdata/tapdata.git && cd tapdata`
### 快速启动
1. 执行 `bash build/quick-use.sh` 会快速启动一个使用环境, 然后会自动进入 tapshell 交互客户端
2. 下次进入环境时, 可执行 `bash bin/tapshell.sh` 进入交互命令行工具
### 从源码编译启动
1. 执行 `bash build/quick-dev.sh` 会从源码编译, 并启动一个完整的使用环境, 然后会自动进入 tapshell 交互客户端
2. 下次进入环境时, 可执行 `bash bin/tapshell.sh` 进入交互命令行工具
### 环境清理
1. 执行 `bash build/clean.sh` 会清理包括编译中间产物, 编译镜像, 运行容器在内的全部内容, 但是会保留运行的任务配置与进度等信息
2. 如果需要删除任务运行配置, 请删除主目录的 data 目录即可
## 使用说明
1. 在环境启动后, 可通过 `bash bin/tapshell.sh` 进入交互客户端
交互客户端可使用命令模式, 或者 Shell API 模式进行实时数据平台的使用
### 基本概念
1. 数据连接器: 平台支持的数据连接类型, 比如 Mysql, PG, MongoDB
2. 数据源: 通过连接器创建的具体的数据来源
3. 数据表: 具有一定数据结构的数据集合
4. 任务:
### 查看资源
1. 查看支持的数据连接器
```
>>> show connectors
1839d8 MongoDB
183a77 Mysql
183af5 PostgreSQL
```
2. 查看创建的数据源
```
>>> show datasources
id status database_type name
183afa ready MongoDB mongodb
```
3. 查看数据源下的表
```
>>> use mongodb
datasource switch to: mongodb
>>> show tables
```
4. 查看数据表的结构
```
>>> use mongodb
datasource switch to: mongodb
>>> desc CAR_CLAIM
{
"_id": "OBJECT_ID",
"SETTLED_DATE": "DATE_TIME",
"CLAIM_ID": "STRING",
"SETTLED_AMOUNT": "INT32",
"CLAIM_REASON": "STRING",
"POLICY_ID": "STRING",
"CLAIM_DATE": "DATE_TIME",
"LAST_CHANGE": "DATE_TIME",
"CLAIM_AMOUNT": "INT32"
}
```
5. 查看任务列表, 分别为 任务id, 名字, 状态, 类型
```
>>> show jobs
system has 3 jobs
18415a: migrate running sync/initial_sync+cdc
1843e1: migrate2 error sync/initial_sync+cdc
```
### 操作数据源
```
# mongodb
>>> source = DataSource("mongodb", "$name").uri("$uri")
# mysql
>>> source = DataSource("mysql", "$name").host("$host").port($port).username("$username").port($port).db("$db")
# pg
>>> source = DataSource("postgres", "$name").host("$host").port($port).username("$username").port($port).db("$db").schema("$schema").logPluginName("wal2json")
# 保存数据源, 并加载表结构
>>> source.save()
# 重新加载表结构
>>> validate datasource $name
# 删除数据源
>>> delete datasource $name
```
### 操作任务
1. 同步一张表, 默认为全量+增量同步
```
# 创建一个工作流
>>> p = Pipeline("$name")
# 使用 readFrom 从源读取数据, 使用 writeTo 将其写向目标
>>> p.readFrom("$source_name.$table").writeTo("$sink_name.$table")
# 启动任务
>>> p.start()
# 监控工作流的任务, 查看指标与日志
>>> p.monitor()
>>> p.logs()
# 停止任务
>>> p.stop()
# 列出任务
>>> show jobs
# 监控任务, 查看指标与日志
>>> monitor job $name
>>> logs job $name [tail=False] [limit=10] [t=30]
# 停止任务
>>> stop job $name
# 删除任务
>>> delete job $name
```
2. 同步一张表, 并使用自定义函数进行一些简单数据处理, 目前你可以使用 Python3 语法来进行函数的定义
```
# 1. 定义一个方法, 对 record 进行变换, 并返回 record
>>> def fn(record):
record["x"] = 1
return record
# 2. 使用 processor 算子指定变换方法
>>> p.readFrom(...).processor(fn).writeTo(...)
```
3. 同步多张表
```
# 创建一个工作流
>>> p = Pipeline("$name")
# 新建一个包含多张数据表的数据读取源, 支持正则匹配
>>> source = Source("$datasource_name", ["table1", "table2"...])
>>> source = Source("$datasource_name", table_re="xxx.*")
# 通过 writeTo 方法, 可修改同步表的前后缀
>>> p.readFrom(source).writeTo("$datasource_name", prefix="", suffix="")
```
## 开源 License
Tapdata 使用复合 License
整个项目使用 Server Side Public License, 数据源连接相关的子项目使用 Apache V2 License
## 加入我们
- 微信
- [Slack](https://join.slack.com/t/tapdatacommunity/shared_invite/zt-1biraoxpf-NRTsap0YLlAp99PHIVC9eA)