DataX 是阿里开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
DataX一般和DataX-Web联合使用,实现对任意数据库之间数据同步的调度执行。对于数据的同步,可以是全量更新和增量更新两种方式,对于大数据量的事物数据,例如:销售记录数据的同步,一般都是选择增量更新方式。
DataX-Web提供的增量更新支持基于自增ID的增量更新和基于时间的增量更新。在调度执行job任务时,dataX-Web会动态生成参数并在调用DataX执行时传入,例如:
Python /opt/module/datax/bin/datax.py -p "-DlastTime=2022-01-01 -DcurrentTime=2022-06-04"
DataX会把传入的参数写入环境变量,读入json脚本时会通过正则表达式查找${}包含的变量,并搜索环境变量进行替换。
通过以上代码分析,可知在json任意的位置定义${}变量,都能够被替换,如:
DataX的动态变量替换机制不够灵活,只能实现简单的变量替换,如果需要复杂的变量支持时,DataX就无法实现。例如:DataX-Web只能提供lastTime和currentTime两个参数,如果还需要支持其他参数,就没有办法了。
针对这块考虑对DataX进行修改,引入avaitor表达式框架,通过Avaitor表达式丰富的功能,实现复杂的动态参数机制。
实现代码:
- 下载DataX源码,并打开datax-common/pom.xml。添加avaitor表达式依赖。
org.apache.commons
commons-lang3
com.alibaba
fastjson
commons-io
commons-io
com.googlecode.aviator
aviator
5.2.7
- 在com.alibaba.datax.common.util包下添加FieldTokenProcessor.java类。
package com.alibaba.datax.common.util;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
/**
* @author zouxuemo
*
* 提供对字符串中定义的表达式进行处理(通过Avaitor执行表达式计算)并重新拼装字符串的工具类
* List expression = FieldTokenProcessor.parseExpression("aa${1+2}bb${3*100}");
* System.out.println(FieldTokenProcessor.evaluateExpression(expression, null));
*/
public class FieldTokenProcessor {
private static FieldTokenProcessor precossor = new FieldTokenProcessor();
static {
try {
AviatorEvaluator.addStaticFunctions("s", StringUtils.class);
AviatorEvaluator.addStaticFunctions("util", DapAviatorUtil.class);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
}
public static FieldTokenProcessor instance() {
return precossor;
}
/**
* 分析表达式,返回分析结果
* 实例:
* List expressionFragment = parseExpression("my name is {name}, i have age {datediff(sysdate(), string_to_date(birthday, 'yyyy-MM-dd'), 'y')}, thank you!")
* //结果:["my name is ", Expression("name"), ", i have age ", Expression("datediff(sysdate(), string_to_date(birthday, 'yyyy-MM-dd'), 'y')"), ", thank you!"]
*
* @param expression
* @return
*/
public List parseExpression(String expression) {
List expressionFragment = new ArrayList();
if (expression == null)
return expressionFragment;
int tokenBegin = 0, end = expression.length() - 1;
StringBuffer buf = new StringBuffer();
boolean find = false;
for (int i = 0; i 0) {
expressionFragment.add(buf.toString());
buf.setLength(0);
}
i++;
tokenBegin = i + 1;
find = true;
} else if (expression.charAt(i) == '}' && find) {
String exp = expression.substring(tokenBegin, i);
if (exp.length() > 0) {
expressionFragment.add(AviatorEvaluator.compile(exp, true));
}
find = false;
} else if (!find){
buf.append(ch);
}
}
if (find) {
throw new RuntimeException("表达式变量未设置结束嵌套符 - " + expression);
} else if (buf.length() > 0) {
expressionFragment.add(buf.toString());
}
return expressionFragment;
}
public String evaluateExpression(List expressionFragment, Map env) {
if (expressionFragment == null)
return "";
StringBuffer buf = new StringBuffer();
for (Object o : expressionFragment) {
if (o instanceof String) {
buf.append(o);
} else {
Expression exp = (Expression)o;
Object result = exp.execute(env);
buf.append(result);
}
}
return buf.toString();
}
}
- 打开com.alibaba.datax.common.util包下StrUtil.java源码,修改原动态变量替换代码。
// public static String replaceVariable(final String param) {
// Map mapping = new HashMap();
//
// Matcher matcher = VARIABLE_PATTERN.matcher(param);
// while (matcher.find()) {
// String variable = matcher.group(2);
// String value = System.getProperty(variable);
// if (StringUtils.isBlank(value)) {
// value = matcher.group();
// }
// mapping.put(matcher.group(), value);
// }
//
// String retString = param;
// for (final String key : mapping.keySet()) {
// retString = retString.replace(key, mapping.get(key));
// }
//
// return retString;
// }
public static String replaceVariable(final String param) {
Map env = buildEnv();
List expression = FieldTokenProcessor.instance().parseExpression(param);
return FieldTokenProcessor.instance().evaluateExpression(expression, env);
}
private static SimpleDateFormat dateSdf = new SimpleDateFormat("yyyy-MM-dd");
private static Map buildEnv() {
Map env = new HashMap<>();
Calendar cal = Calendar.getInstance();
env.put("date", dateSdf.format(new Date()));
env.put("year", cal.get(Calendar.YEAR));
env.put("quarter", (cal.get(Calendar.MONTH) / 3) + 1);
env.put("month", cal.get(Calendar.MONTH) + 1);
env.put("week", cal.get(Calendar.WEEK_OF_YEAR));
env.put("day", cal.get(Calendar.DATE));
env.put("hour", cal.get(Calendar.HOUR_OF_DAY));
env.put("minute", cal.get(Calendar.MINUTE));
// 遍历环境变量,查找变量名里不带'.'的,认为是系统传入的变量,例如:lastTime
// TODO 有可能覆盖前面预设值的变量
Properties p = System.getProperties();
for (Map.Entry entry : p.entrySet()) {
String key = (String)entry.getKey();
if (key.indexOf('.') == -1) {
env.put(key, entry.getValue());
}
}
return env;
}
- datax-common项目打包,部署:找到datax的lib目录,把这两个文件拷贝进去
使用示例:
执行结果:
Original: https://blog.csdn.net/zouxuemo/article/details/125123214
Author: 大卫·科波菲
Title: 扩展DataX的动态传参机制实现Avaitor表达式支持