扩展DataX的动态传参机制实现Avaitor表达式支持

大数据16

DataX 是阿里开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

DataX一般和DataX-Web联合使用,实现对任意数据库之间数据同步的调度执行。对于数据的同步,可以是全量更新和增量更新两种方式,对于大数据量的事物数据,例如:销售记录数据的同步,一般都是选择增量更新方式。

DataX-Web提供的增量更新支持基于自增ID的增量更新和基于时间的增量更新。在调度执行job任务时,dataX-Web会动态生成参数并在调用DataX执行时传入,例如:

Python /opt/module/datax/bin/datax.py -p "-DlastTime=2022-01-01 -DcurrentTime=2022-06-04"

DataX会把传入的参数写入环境变量,读入json脚本时会通过正则表达式查找${}包含的变量,并搜索环境变量进行替换。

扩展DataX的动态传参机制实现Avaitor表达式支持

扩展DataX的动态传参机制实现Avaitor表达式支持

扩展DataX的动态传参机制实现Avaitor表达式支持

通过以上代码分析,可知在json任意的位置定义${}变量,都能够被替换,如:

扩展DataX的动态传参机制实现Avaitor表达式支持

DataX的动态变量替换机制不够灵活,只能实现简单的变量替换,如果需要复杂的变量支持时,DataX就无法实现。例如:DataX-Web只能提供lastTime和currentTime两个参数,如果还需要支持其他参数,就没有办法了。

针对这块考虑对DataX进行修改,引入avaitor表达式框架,通过Avaitor表达式丰富的功能,实现复杂的动态参数机制。

实现代码:

  1. 下载DataX源码,并打开datax-common/pom.xml。添加avaitor表达式依赖。

            org.apache.commons
            commons-lang3

            com.alibaba
            fastjson

            commons-io
            commons-io

            com.googlecode.aviator
            aviator
            5.2.7

  1. 在com.alibaba.datax.common.util包下添加FieldTokenProcessor.java类。
package com.alibaba.datax.common.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang3.StringUtils;

import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;

/**
 * @author zouxuemo
 *
 * 提供对字符串中定义的表达式进行处理(通过Avaitor执行表达式计算)并重新拼装字符串的工具类
 * List expression = FieldTokenProcessor.parseExpression("aa${1+2}bb${3*100}");
 * System.out.println(FieldTokenProcessor.evaluateExpression(expression, null));
 */
public class FieldTokenProcessor {
    private static FieldTokenProcessor precossor = new FieldTokenProcessor();

    static {
        try {
            AviatorEvaluator.addStaticFunctions("s", StringUtils.class);
            AviatorEvaluator.addStaticFunctions("util", DapAviatorUtil.class);
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        }
    }

    public static FieldTokenProcessor instance() {
        return precossor;
    }

    /**
     * 分析表达式,返回分析结果
     * 实例:
     *  List expressionFragment = parseExpression("my name is {name}, i have age {datediff(sysdate(), string_to_date(birthday, 'yyyy-MM-dd'), 'y')}, thank you!")
     *  //结果:["my name is ", Expression("name"), ", i have age ", Expression("datediff(sysdate(), string_to_date(birthday, 'yyyy-MM-dd'), 'y')"), ", thank you!"]
     *
     * @param expression
     * @return
     */
    public List parseExpression(String expression)  {
        List expressionFragment = new ArrayList();

        if (expression == null)
            return expressionFragment;

        int tokenBegin = 0, end = expression.length() - 1;
        StringBuffer buf = new StringBuffer();
        boolean find = false;
        for (int i = 0; i  0) {
                    expressionFragment.add(buf.toString());

                    buf.setLength(0);
                }

                i++;
                tokenBegin = i + 1;
                find = true;
            } else if (expression.charAt(i) == '}' && find) {
                String exp = expression.substring(tokenBegin, i);
                if (exp.length() > 0) {
                    expressionFragment.add(AviatorEvaluator.compile(exp, true));
                }

                find = false;
            } else if (!find){
                buf.append(ch);
            }
        }

        if (find) {
            throw new RuntimeException("表达式变量未设置结束嵌套符 - " + expression);
        } else if (buf.length() > 0) {
            expressionFragment.add(buf.toString());
        }

        return expressionFragment;
    }

    public String evaluateExpression(List expressionFragment, Map env) {
        if (expressionFragment == null)
            return "";

        StringBuffer buf = new StringBuffer();
        for (Object o : expressionFragment) {
            if (o instanceof String) {
                buf.append(o);
            } else {
                Expression exp = (Expression)o;
                Object result = exp.execute(env);

                buf.append(result);
            }
        }

        return buf.toString();
    }
}
  1. 打开com.alibaba.datax.common.util包下StrUtil.java源码,修改原动态变量替换代码。
//    public static String replaceVariable(final String param) {
//        Map mapping = new HashMap();
//
//        Matcher matcher = VARIABLE_PATTERN.matcher(param);
//        while (matcher.find()) {
//            String variable = matcher.group(2);
//            String value = System.getProperty(variable);
//            if (StringUtils.isBlank(value)) {
//                value = matcher.group();
//            }
//            mapping.put(matcher.group(), value);
//        }
//
//        String retString = param;
//        for (final String key : mapping.keySet()) {
//            retString = retString.replace(key, mapping.get(key));
//        }
//
//        return retString;
//    }

    public static String replaceVariable(final String param) {
        Map env = buildEnv();

        List expression = FieldTokenProcessor.instance().parseExpression(param);
        return FieldTokenProcessor.instance().evaluateExpression(expression, env);
    }

    private static SimpleDateFormat dateSdf = new SimpleDateFormat("yyyy-MM-dd");
    private static Map buildEnv() {
        Map env = new HashMap<>();
        Calendar cal = Calendar.getInstance();
        env.put("date", dateSdf.format(new Date()));
        env.put("year", cal.get(Calendar.YEAR));
        env.put("quarter", (cal.get(Calendar.MONTH) / 3) + 1);
        env.put("month", cal.get(Calendar.MONTH) + 1);
        env.put("week", cal.get(Calendar.WEEK_OF_YEAR));
        env.put("day", cal.get(Calendar.DATE));
        env.put("hour", cal.get(Calendar.HOUR_OF_DAY));
        env.put("minute", cal.get(Calendar.MINUTE));

        // 遍历环境变量,查找变量名里不带'.'的,认为是系统传入的变量,例如:lastTime
        // TODO 有可能覆盖前面预设值的变量
        Properties p = System.getProperties();
        for (Map.Entry entry : p.entrySet()) {
            String key = (String)entry.getKey();
            if (key.indexOf('.') == -1) {
                env.put(key, entry.getValue());
            }
        }

        return env;
    }
  1. datax-common项目打包,部署:找到datax的lib目录,把这两个文件拷贝进去

扩展DataX的动态传参机制实现Avaitor表达式支持

使用示例:

扩展DataX的动态传参机制实现Avaitor表达式支持

执行结果:

扩展DataX的动态传参机制实现Avaitor表达式支持

Original: https://blog.csdn.net/zouxuemo/article/details/125123214
Author: 大卫·科波菲
Title: 扩展DataX的动态传参机制实现Avaitor表达式支持