数据同步工具datax配置与示例


前言

  • 什么是Datax?
    DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。
  • Datax特点
    DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

一、部署步骤

1、jdk环境

查看jdk的版本,推荐使用1.8,1.8以上也可。
在这里插入图片描述

2、python环境

datax是以python脚本形式的语言,需要python的运行环境,python2,3都可。
以python3为例,mac安装。

步骤一:安装

方式一:官网下载安装包

访问Python官方网站(Python安装包)下载适用于Mac的Python安装包。根据你的操作系统版本选择合适的安装包,安装步骤与正常软件安装步骤一致。
在这里插入图片描述

方式二:brew命令安装

首先mac中要有Homebrew(包管理器),如果未安装可以通过命令行进行安装。
安装步骤:

  1. 打开终端,输入命令
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
  1. 更新Homebrew的配方和包
brew update
  1. 查看python版本
brew search python@

在这里插入图片描述

  1. 安装python,可以指定版本,如果不指定版本,会安装最新版
brew install python3
  1. 安装时可能会提示git认证过期,若没有跳至第7步,需要重新认证,获取token地址:https://github.com/settings/tokens/new?scopes=gist,public_repo&description=Homebrew
    在这里插入图片描述在这里插入图片描述

  2. 在终端执行,第五步获取的token值

export HOMEBREW_GITHUB_API_TOKEN=

步骤二:配置环境变量

安装好python环境后,配置环境变量。

  • 配置编辑 vim ~/.bash_profile,加入alias python="/opt/homebrew/bin/python3",后面路径为安装的执行路径,可用which python3获取;
  • 刷新配置文件,source ~/.bash_profile,关闭终端重新打开

步骤三:验证

  • 配置完成后,验证是否安装成功
python --version

3、maven环境(可选)

如果下载源码进行编译的话,需要maven环境,直接下载datax工具包则不需要。
在这里插入图片描述

二、下载安装datax

方式1、2任选其一。

1、下载datax源码

  1. git地址:Datax源码
    在这里插入图片描述

  2. 本地打开源码后可能有依赖引用不到,需要手动添加,更换pom中的路径。
    在这里插入图片描述
    jconn4 -jar包下载地址:http://www.java2s.com/Code/Jar/j/Downloadjconn3jar.htm

  3. idea中添加jar包
    在这里插入图片描述

  4. 通过maven打包

cd  代码路径
mvn -U clean package assembly:assembly -Dmaven.test.skip=true

在这里插入图片描述

  1. 打包执行后,在target目录下
    在这里插入图片描述

2、下载datax安装文件

  1. 下载地址:Datax工具包
  2. 下载后解压
tar -zxvf datax.tar.gz -C /Users/wangzhaokun/Downloads/google

3、验证

  1. 自测脚本
# python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json
 
# 例如:
python /Users/wangzhaokun/Downloads/google/datax/bin/datax.py /Users/wangzhaokun/Downloads/google/datax/job/job.json

在这里插入图片描述
在这里插入图片描述
2. 异常处理
若执行脚本中出现缺少文件xxx/datax/plugin/reader/._drdsreader/plugin.json类似文件存在,则需要删除 plugin 目录下的所有的以 _ 开头的文件

#进入/datax/plugin目录下
find ./* -type f -name ".*er" | xargs rm -rf

三、配置示例

场景:oracle数据库中,视图数据同步到表中。

1、生成配置模版

生成配置文件可选的reader\writer在plugin目录下

  1. 执行python datax.py -r oraclereader -w oraclewriter,生成模版文件
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "oraclereader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": "",
                                "table": []
                            }
                        ],
                        "password": "",
                        "preSql": [],
                        "session": [],
                        "username": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
} 

2、根据配置模版生成配置文件

文件放在job目录下,模版文件中可通过表对表,也可能过sql查询结果,然后写入结果表。

#test.json
{
	"content":[
		{
			"reader":{
				"name":"oraclereader",
				"parameter":{
					"connection":[
						{
							"jdbcUrl":[
								"jdbc:oracle:thin:@//ip:port/gfmisdev"
							],
							"querySql":[
								" SELECT * from  MV_LEDGER_AGENCY"
							]
						}
					],
					"password":"*",
					"username":"KF_PAY_0512"
				}
			},
			"writer":{
				"name":"oraclewriter",
				"parameter":{
					"column":[
						"*"
					],
					"connection":[
						{
							"jdbcUrl":"jdbc:oracle:thin:@//ip:port/gfmisdev",
							"table":[
								"FM_LEDGER_AGENCY_TEST"
							]
						}
					],
					"password":"*",
					"preSql":[
						
					],
					"username":"KF_PAY_0512"
				}
			}
		}
	],
	"setting":{
		"speed":{
			"channel":"1"
		}
	}
}

3、运行datax

在bin目录下执行,也可通过全路径执行。

python datax.py ../job/test.json

4、执行成功后查看结果表,数据正常进入

在这里插入图片描述

四、任务配置说明

1、任务配置参数说明

  1. jdbcUrl

描述:目的数据库的 JDBC 连接信息。作业运行时,DataX 会在你提供的 jdbcUrl 后面追加如下属性:yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true
注意:1、在一个数据库上只能配置一个 jdbcUrl 值。这与 MysqlReader 支持多个备库探测不同,因为此处不支持同一个数据库存在多个主库的情况(双主导入数据情况)
2、jdbcUrl按照Mysql官方规范,并可以填写连接附加控制信息,比如想指定连接编码为 gbk ,则在 jdbcUrl 后面追加属性 useUnicode=true&characterEncoding=gbk。

  1. username

目的数据库的用户名

  1. password

目的数据库的密码

  1. table

描述:目的表的表名称。支持写入一个或者多个表。当配置为多张表时,必须确保所有表结构保持一致。
注意:table 和 jdbcUrl 必须包含在 connection 配置单元中

  1. column

描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: “column”: [“id”,“name”,“age”]。如果要依次写入全部列,使用表示, 例如: “column”: [“”]。
column配置项必须指定,不能留空!
注意:1、不推荐你这样配置,因为当你目的表字段个数、类型等有改动时,你的任务可能运行不正确或者失败
2、 column 不能配置任何常量值

  1. session

DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性

  1. preSql

写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 @table 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, … datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:“preSql”:[“delete from 表名”],效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称

  1. postSql

写入数据到目的表后,会执行这里的标准语句

  1. writeMode

控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句,默认值:insert,所有选项:insert/replace/update

  1. batchSize

一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。默认值:1024.

2、任务配置示例

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": ["id","name","age"],//结果列,逗号分隔,若为空	或者"*",表示所有列,不建议"*"
            "connection": [
              {
                "querySql": ["SELECT * FROM user"], //可写多个sql,逗号分隔,查询多张表,及做其他操作
                "jdbcUrl": ["jdbc:mysql://localhost:3306/test"] //可配置多个数据源,逗号分隔
              }
            ],
             "where": "" // 可选,用于指定增量同步时的筛选条件,如时间戳字段大于某个值,多用于表同步,非sql查询时
             "preSql": [], // 可选,执行前的SQL语句,如创建表或清空表等
             "postSql": [] // 可选,执行后的SQL语句
            
            
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": ["id","name","age"],//结果列,逗号分隔,若为空	或者"*",表示所有列,不建议"*"
            "writeMode": "insert",//insert:插入模式,eplace:替换模式,truncateinsert:清空并插入模式,update:更新模式
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://localhost:3306/test",//可配置多个数据源,逗号分隔
                "table": ["user_copy"], //结果表,可多张,逗号分隔
              }
            ],
             
              "preSql": [], // 可选,执行前的SQL语句,如创建表或清空表等
              "postSql": [] // 可选,执行后的SQL语句
          }
        }
      }
    ]
  },
  "setting": {
    "speed": {
      "channel": 3  //并行度。channel 参数的值为 3,表示使用 3 个并行通道进行数据传输
    }
  }
}

3、动态参数说明

lastRuntime为动态参数,在json文件中需要用${}定义,例如 ${lastRuntime} 。

(1)示例

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "yourUsername",
                        "password": "yourPassword",
                        "column": ["*"],
                        "connection": [
                            {
                                "querySql": ["select * from yourTable where time > ${lastRuntime}"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "yourUsername",
                        "password": "yourPassword",
                        "column": ["*"],
                        "writeMode": "insert",
                        "session": ["set session sql_mode='ANSI'"],
                        "preSql": ["delete from yourTable where time > ${lastRuntime}"]
                    }
                }
            }
        ]
    }
}

(2)执行

方式一:

python datax.py -p lastRuntime=2022-01-01 your_job_config.json

方式二:
定义一个参数的jparams.json文件,如:

{
    "lastRuntime": "2022-01-01"
}
python datax.py -jparams.json your_job_config.json

五、官方文档地址

相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-08 18:20:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-08 18:20:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-04-08 18:20:04       82 阅读
  4. Python语言-面向对象

    2024-04-08 18:20:04       91 阅读

热门阅读

  1. AI技术创业机会之金融科技

    2024-04-08 18:20:04       29 阅读
  2. MQ5之CCI交叉信号

    2024-04-08 18:20:04       31 阅读
  3. P8605 [蓝桥杯 2013 国 AC] 网络寻路

    2024-04-08 18:20:04       28 阅读
  4. UDP协议

    UDP协议

    2024-04-08 18:20:04      30 阅读
  5. 跨域问题及解决方案

    2024-04-08 18:20:04       35 阅读
  6. 【软件架构学习】一、基本概念

    2024-04-08 18:20:04       33 阅读
  7. 《空对象模式(极简c++)》

    2024-04-08 18:20:04       34 阅读
  8. QString()和QString(““)的区别

    2024-04-08 18:20:04       31 阅读