DataX的安装及使用

Python43

DataX的安装及使用

DataX的安装

DataX不需要依赖其他服务,直接上传、解压、安装、配置环境变量即可
也可以直接在windows上解压

DataX的使用

stream2stream

编写配置文件stream2stream.json
# stream2stream.json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界-DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
       }
    }
  }
}
执行同步任务
datax.py stream2stream.json
执行结果

DataX的安装及使用

mysql2mysql

需要新建student2数据库,并创建student表

编写配置文件mysql2mysql.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "last_mod"
                        ],
                        "splitPk": "age",
                        "connection": [
                            {
                                "table": [
                                    "student"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "last_mod"
                        ],
                        "preSql": [
                            "truncate student2"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "student2"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}
执行同步任务
datax.py mysql2mysql.json

mysql2hdfs

写hive跟hdfs时一样的

编写配置文件mysql2hdfs.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "last_mod"
                        ],
                        "splitPk": "age",
                        "connection": [
                            {
                                "table": [
                                    "student"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://master:9000",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/datax.db/students",
                        "fileName": "student",
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "gender",
                                "type": "string"
                            },
                            {
                                "name": "clazz",
                                "type": "string"
                            },
                            {
                                "name": "last_mod",
                                "type": "string"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": ","
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}

hbase2mysql

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hbase11xreader",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.quorum": "master:2181"
                        },
                        "table": "student",
                        "encoding": "utf-8",
                        "mode": "normal",
                        "column": [
                            {
                                "name": "rowkey",
                                "type": "string"
                            },
                            {
                                "name": "cf1:name",
                                "type": "string"
                            },
                            {
                                "name": "cf1:age",
                                "type": "string"
                            },
                            {
                                "name": "cf1:gender",
                                "type": "string"
                            },
                            {
                                "name": "cf1:clazz",
                                "type": "string"
                            }
                        ],
                        "range": {
                            "startRowkey": "",
                            "endRowkey": "",
                            "isBinaryRowkey": false
                        }
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz"
                        ],
                        "preSql": [
                            "truncate student2"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "student2"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}

mysql2hbase

mysql中的score表需将cource_id改为course_id,并将student_id、course_id设为主键,并将所有字段的类型改为int
hbase需先创建score表:create 'score','cf1'

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "student_id",
                            "course_id",
                            "score"
                        ],
                        "splitPk": "course_id",
                        "connection": [
                            {
                                "table": [
                                    "score"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hbase11xwriter",
                    "parameter": {
                      "hbaseConfig": {
                        "hbase.zookeeper.quorum": "master:2181"
                      },
                      "table": "score",
                      "mode": "normal",
                      "rowkeyColumn": [
                          {
                            "index":0,
                            "type":"string"
                          },
                          {
                            "index":-1,
                            "type":"string",
                            "value":"_"
                          },
                          {
                            "index":1,
                            "type":"string"
                          }
                      ],
                      "column": [
                        {
                          "index":2,
                          "name": "cf1:score",
                          "type": "int"
                        }
                      ],
                      "encoding": "utf-8"
                    }
                  }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}

mysql2Phoenix

在Phoenix中创建STUDENT表
CREATE TABLE IF NOT EXISTS STUDENT (
 ID VARCHAR NOT NULL PRIMARY KEY,
 NAME VARCHAR,
 AGE BIGINT,
 GENDER VARCHAR ,
 CLAZZ VARCHAR
);
编写配置文件MySQLToPhoenix.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "student"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student?useSSL=false"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hbase11xsqlwriter",
                    "parameter": {
                        "batchSize": "256",
                        "column": [
                            "ID",
                            "NAME",
                            "AGE",
                            "GENDER",
                            "CLAZZ"
                        ],
                        "hbaseConfig": {
                            "hbase.zookeeper.quorum": "master,node1,node2",
                            "zookeeper.znode.parent": "/hbase"
                        },
                        "nullMode": "skip",
                        "table": "STUDENT"
                    }
                }
            }
        ]
    }
}

HDFSToHBase

将students.txt数据上传至HDFS的 /data/student1/目录
在HBase中创建datax表: create 'datax','cf1'

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/data/student1/",
                        "defaultFS": "hdfs://master:9000",
                        "column": [
                            {
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "index": 1,
                                "type": "string"
                            },
                            {
                                "index": 2,
                                "type": "string"
                            },
                            {
                                "index": 3,
                                "type": "string"
                            },
                            {
                                "index": 4,
                                "type": "string"
                            },
                            {
                                "index": 5,
                                "type": "string"
                            }
                        ],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "hbase11xwriter",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.quorum": "master,node1,node2"
                        },
                        "table": "datax",
                        "mode": "normal",
                        "rowkeyColumn": [
                            {
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "index": -1,
                                "type": "string",
                                "value": "_"
                            },
                            {
                                "index": 1,
                                "type": "string"
                            }
                        ],
                        "column": [
                            {
                                "index": 2,
                                "name": "cf1:age",
                                "type": "string"
                            },
                            {
                                "index": 3,
                                "name": "cf1:gender",
                                "type": "string"
                            },
                            {
                                "index": 4,
                                "name": "cf1:clazz",
                                "type": "string"
                            },
                            {
                                "index": 5,
                                "name": "cf1:ts",
                                "type": "string"
                            }
                        ],
                        "versionColumn": {
                            "index": 5
                        },
                        "encoding": "utf-8"
                    }
                }
            }
        ]
    }
}

Original: https://www.cnblogs.com/lmandcc/p/15435407.html
Author: lmandcc
Title: DataX的安装及使用



相关阅读

Title: Python之进程+线程+协程(同步对象、信号量、队列)

文章目录

本篇是关于Python进程方面的内容了,主要是Event同步对象,信号量和队列

Event同步对象

1、概念:
我们可以对一个线程set一个值来等待,在等待期间,其他线程都不能继续往下执行,直到这个值被clear,其他的线程才能接着往下执行

就比如考试,监考老师进入教室,一群学生开始考试;
然后只有老师说停笔,你才能够走出考场;
即使你卷子已经做完了,也得等着老师的通知才能离开。

2、测试代码:

import threading,time#老师类class Teacher(threading.Thread):    def run(self):        print("老师:开始考试,今天考到12:30!")        print(event.isSet())  #查看是否有设置值        #开始设置值        event.set()          time.sleep(5)  #设置标准考试时间为5秒钟        #老师的动作        print("老师:12:30到了,考试结束!")        #老师的动作结束了,所以再次查看是否设置了值还是False                print(event.isSet())  #再次查看是否设置了值                event.set()#学生类      class Student(threading.Thread):    def run(self):        event.wait()        print("学生:唉,开考了......")        time.sleep(1)  #假设学生考试只花1秒中        event.clear()  #清除        event.wait()        print("学生:终于考完了......")        if __name__=="__main__":    #实例化一个Event对象    event=threading.Event()    L= []        for i in range(5):        L.append(Student())            L.append(Teacher())    #启动线程    for t in L:        t.start()    for t in L:        t.join()

3、测试结果:

DataX的安装及使用

可以看到,几个学生同时开考,等老师这个任务结束后,这群学生才可以结束

semaphore信号量

1、引用概念:

信号量用来控制可以同时开启线程的个数,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

2、测试代码:

import threading,time#继承线程的类class MyThread(threading.Thread):    def run(self):        if semaphore.acquire():            print(self.name)  #输出线程名            time.sleep(5)  #睡5秒,每一个线程都会停留5秒            #释放信号量            semaphore.release()             if __name__=="__main__":    #设置信号量,为5表明可以一次性执行的线程是5个    semaphore=threading.Semaphore(5)    L= []        for i in range(100):        L.append(MyThread())            for t in L:  #启动线程        t.start()

3、测试结果:

DataX的安装及使用

可以看到,线程每五个五个得出来

队列

1、先进先出型:

import queue  #导入线程队列L= []#创建线程队列对象q= queue.Queue(5)#能装5个对象的队列(不指定,则任意大小),block=False表示队列满了会提示错误信息#在线程队列放入值q.put([1,233333])q.put([2,'little girl'])q.put([3,{'name':'初音'}])#q.put({'name2':'初音2'})#取值while True:    data= q.get(block=True)    #block=False表示如果卡住了会提示错误,    #因为该线程队列已经没有数据可以取了,所以会提示队列空的信息    print(data,'------')

DataX的安装及使用

2、先进后出型:

import queue#后进先出队列q= queue.LifoQueue()#在线程队列放入值q.put([1,233333])q.put([2,'little girl'])q.put([3,{'name':'初音'}])#取值while True:    data= q.get(block=False)    #block=False表示如果卡住了会提示错误,    #因为该线程队列已经没有数据可以取了,所以会提示队列空的信息    print(data,'-----')队列的其他方法import queue#创建队列q= queue.Queue()#在线程队列放入值q.put([1,233333])q.put([2,'little girl'])q.put([3,{'name':'初音'}])#取值print(q.qsize())  #队列值的个数print(q.empty())  #是否为空print(q.full())  #是否满q.task_done()

DataX的安装及使用

Original: https://blog.51cto.com/u_15738244/5535523
Author: mb62e7593c01ba5
Title: Python之进程+线程+协程(同步对象、信号量、队列)