DataX的安装及使用
DataX的安装
DataX不需要依赖其他服务,直接上传、解压、安装、配置环境变量即可
也可以直接在windows上解压
DataX的使用
stream2stream
编写配置文件stream2stream.json
# stream2stream.json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
执行同步任务
datax.py stream2stream.json
执行结果
mysql2mysql
需要新建student2数据库,并在其中创建student2表(字段与student表保持一致,表名需与下方writer配置中的table一致)
编写配置文件mysql2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"splitPk": "age",
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"preSql": [
"truncate student2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
"table": [
"student2"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
执行同步任务
datax.py mysql2mysql.json
mysql2hdfs
写Hive跟写HDFS是一样的:直接把数据写到Hive表对应的HDFS目录即可
编写配置文件mysql2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"splitPk": "age",
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://master:9000",
"fileType": "text",
"path": "/user/hive/warehouse/datax.db/students",
"fileName": "student",
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "INT"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
},
{
"name": "last_mod",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
hbase2mysql
{
"job": {
"content": [
{
"reader": {
"name": "hbase11xreader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181"
},
"table": "student",
"encoding": "utf-8",
"mode": "normal",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "cf1:name",
"type": "string"
},
{
"name": "cf1:age",
"type": "string"
},
{
"name": "cf1:gender",
"type": "string"
},
{
"name": "cf1:clazz",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": false
}
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz"
],
"preSql": [
"truncate student2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
"table": [
"student2"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
mysql2hbase
mysql中的score表需将cource_id改为course_id,并将student_id、course_id设为主键,并将所有字段的类型改为int
hbase需先创建score表:create 'score','cf1'
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"student_id",
"course_id",
"score"
],
"splitPk": "course_id",
"connection": [
{
"table": [
"score"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181"
},
"table": "score",
"mode": "normal",
"rowkeyColumn": [
{
"index":0,
"type":"string"
},
{
"index":-1,
"type":"string",
"value":"_"
},
{
"index":1,
"type":"string"
}
],
"column": [
{
"index":2,
"name": "cf1:score",
"type": "int"
}
],
"encoding": "utf-8"
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
mysql2Phoenix
在Phoenix中创建STUDENT表
CREATE TABLE IF NOT EXISTS STUDENT (
ID VARCHAR NOT NULL PRIMARY KEY,
NAME VARCHAR,
AGE BIGINT,
GENDER VARCHAR ,
CLAZZ VARCHAR
);
编写配置文件MySQLToPhoenix.json
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz"
],
"splitPk": "id",
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student?useSSL=false"
]
}
]
}
},
"writer": {
"name": "hbase11xsqlwriter",
"parameter": {
"batchSize": "256",
"column": [
"ID",
"NAME",
"AGE",
"GENDER",
"CLAZZ"
],
"hbaseConfig": {
"hbase.zookeeper.quorum": "master,node1,node2",
"zookeeper.znode.parent": "/hbase"
},
"nullMode": "skip",
"table": "STUDENT"
}
}
}
]
}
}
HDFSToHBase
将students.txt数据上传至HDFS的 /data/student1/ 目录
在HBase中创建datax表:create 'datax','cf1'
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/data/student1/",
"defaultFS": "hdfs://master:9000",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master,node1,node2"
},
"table": "datax",
"mode": "normal",
"rowkeyColumn": [
{
"index": 0,
"type": "string"
},
{
"index": -1,
"type": "string",
"value": "_"
},
{
"index": 1,
"type": "string"
}
],
"column": [
{
"index": 2,
"name": "cf1:age",
"type": "string"
},
{
"index": 3,
"name": "cf1:gender",
"type": "string"
},
{
"index": 4,
"name": "cf1:clazz",
"type": "string"
},
{
"index": 5,
"name": "cf1:ts",
"type": "string"
}
],
"versionColumn": {
"index": 5
},
"encoding": "utf-8"
}
}
}
]
}
}
Original: https://www.cnblogs.com/lmandcc/p/15435407.html
Author: lmandcc
Title: DataX的安装及使用
相关阅读
Title: Python之进程+线程+协程(同步对象、信号量、队列)
文章目录
- Event同步对象
- semaphore信号量
- 队列
本篇是关于Python线程方面的内容,主要是Event同步对象、信号量和队列
Event同步对象
1、概念:
Event对象内部维护一个标志位,初始为False。线程调用wait()时,如果标志位为False就会阻塞等待;直到其他线程调用set()把标志位设为True,所有等待中的线程才会被唤醒并继续往下执行。调用clear()会把标志位重新置为False,此后再调用wait()的线程又会被阻塞
就比如考试,监考老师进入教室,一群学生开始考试;
然后只有老师说停笔,你才能够走出考场;
即使你卷子已经做完了,也得等着老师的通知才能离开。
2、测试代码:
import threading
import time


class Teacher(threading.Thread):
    """Invigilator thread: opens the exam, waits five seconds, then ends it.

    Relies on the module-level ``event`` (a ``threading.Event``) that the
    ``__main__`` section creates before any thread is started.
    """

    def run(self):
        print("老师:开始考试,今天考到12:30!")
        # Nothing has set the flag yet, so this is expected to print False.
        print(event.is_set())
        # Setting the flag wakes every student blocked in event.wait().
        event.set()
        time.sleep(5)  # the exam lasts five seconds
        print("老师:12:30到了,考试结束!")
        # The students clear the flag after one second, so this is expected
        # to print False again.
        print(event.is_set())
        # Second set(): lets the students that are waiting again leave.
        event.set()


class Student(threading.Thread):
    """Examinee thread: waits for the start signal, works, waits for the end.

    Relies on the same module-level ``event`` as ``Teacher``.
    """

    def run(self):
        event.wait()  # block until the teacher starts the exam
        print("学生:唉,开考了......")
        time.sleep(1)  # a student needs only one second to finish
        event.clear()  # reset the flag so the next wait() blocks again
        event.wait()  # block until the teacher announces the end
        print("学生:终于考完了......")


if __name__ == "__main__":
    # One Event shared by all threads through the module-level name.
    event = threading.Event()

    threads = [Student() for _ in range(5)]
    threads.append(Teacher())

    for t in threads:
        t.start()
    for t in threads:
        t.join()
3、测试结果:
可以看到,几个学生同时开考,等老师这个任务结束后,这群学生才可以结束
semaphore信号量
1、引用概念:
信号量用来控制可以同时开启线程的个数,BoundedSemaphore或Semaphore管理一个内置的计数器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
2、测试代码:
import threading
import time


class MyThread(threading.Thread):
    """Worker that holds one semaphore slot for five seconds.

    Relies on the module-level ``semaphore`` that the ``__main__`` section
    creates before any thread is started.
    """

    def run(self):
        # ``with`` acquires a slot and guarantees it is released, even if
        # the body raises.
        with semaphore:
            print(self.name)  # thread name
            time.sleep(5)  # every thread keeps its slot for five seconds


if __name__ == "__main__":
    # At most five threads may be inside the ``with`` block at once.
    semaphore = threading.Semaphore(5)

    workers = [MyThread() for _ in range(100)]
    for t in workers:
        t.start()
3、测试结果:
可以看到,线程是五个一组地输出的
队列
1、先进先出型:
import queue

# FIFO queue holding at most 5 items (unbounded when maxsize is omitted).
# put(..., block=False) on a full queue raises queue.Full instead of waiting.
q = queue.Queue(5)

# Enqueue three items.
q.put([1, 233333])
q.put([2, 'little girl'])
q.put([3, {'name': '初音'}])

# Drain the queue in insertion order.
while True:
    try:
        # block=False raises queue.Empty instead of waiting forever once
        # nothing is left to fetch.
        data = q.get(block=False)
    except queue.Empty:
        break
    print(data, '------')
2、先进后出型:
import queue

# LIFO queue: the item put last is fetched first.
q = queue.LifoQueue()

# Enqueue three items.
q.put([1, 233333])
q.put([2, 'little girl'])
q.put([3, {'name': '初音'}])

# Drain the queue, newest item first.
while True:
    try:
        # block=False raises queue.Empty once nothing is left to fetch.
        data = q.get(block=False)
    except queue.Empty:
        # Stop cleanly so the rest of the script still runs.
        break
    print(data, '-----')

# --- Other queue methods ---
q = queue.Queue()

# Enqueue three items.
q.put([1, 233333])
q.put([2, 'little girl'])
q.put([3, {'name': '初音'}])

print(q.qsize())  # number of items currently queued
print(q.empty())  # whether the queue is empty
print(q.full())  # whether the queue is full
# Marks one enqueued item as processed; normally paired with a get().
q.task_done()
Original: https://blog.51cto.com/u_15738244/5535523
Author: mb62e7593c01ba5
Title: Python之进程+线程+协程(同步对象、信号量、队列)