Blink使用 CREATE TABLE
作为输出结果数据的格式定义,同时定义数据如何写入到目的数据结果表。
结果表有两种类型:
- Append类型:输出存储是日志系统、消息系统、操作日志类的RDS数据库等,数据流输出结果追加到存储中,不会修改原有数据。
- Update类型:输出存储生命了主键的数据库(如:RDS、HBase等),数据流的输出存在Upsert操作。
CREATE TABLE tableName
(columnName dataType [, columnName dataType ]*)
[ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];
CREATE TABLE rds_output(
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
type='rds',
url='yourDatabaseURL',
tableName='yourTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);
创建Oracle数据库结果表
- 实时计算将每行结果数据拼成一行SQL写入目标数据库。
- 数据库中存在需要的表时,Oracle结果表会向该表写入或更新数据;不存在需要的表时,Oracle结果表会新建一个用于写入结果的表。
- 逻辑表和物理表的主键不必一致,但是逻辑表的主键必须包含物理表主键。
- 如果未定义主键,以Append方式插入数据;如果已经定义主键,以Upsert方式插入数据。
CREATE TABLE oracle_sink(
employee_id BIGINT,
employee_name VARCHAR,
employee_age INT,
PRIMARY KEY(employee_id)
) WITH (
type = 'oracle',
url = '',
userName = '',
password = '',
tableName = ''
);
参数描述是否必选示例值
结果表类型是固定值为oracle。
数据库连接串是
登录数据库的用户名是无
登录数据库的密码是无
数据库的表名是无
向结果表插入数据的最大尝试次数否默认值为10。
单次写入数据的批次大小。否默认值为50。
去重的缓存大小。否默认值为500。
写超时时间,单位为毫秒。否默认值为500。
更新表的某行数据时,是否不更新该行指定的列数据。否默认不填写。
是否忽略删除操作。否默认值为false。
注意
- batchSize和bufferSize在指定主键后参数才生效。参数在达到任一阈值时都会触发数据写入。
- flushIntervalMs 如果在写超时时间内没有向数据库写入数据,系统会将缓存的数据再写一次。
- excludeUpdateColumns在更新表的某行数据时,默认不更新主键数据。
Oracle 字段类型实时计算Flink版字段类型CHAR、VARCHAR、VARCHAR2VARCHARFLOATDOUBLENUMBERBIGINTDECIMALDECIMAL
CREATE TABLE oracle_source (
employee_id BIGINT,
employee_name VARCHAR,
employ_age INT
) WITH (
type = 'random'
);
CREATE TABLE oracle_sink(
employee_id BIGINT,
employee_name VARCHAR,
employ_age INT,
primary key(employee_id)
)with(
type = 'oracle',
url = 'jdbc:oracle:thin:@192.168.171.62:1521:sit0',
userName = 'blink_test',
password = 'blink_test',
tableName = 'oracle_sink'
);
INSERT INTO oracle_sink
SELECT * FROM oracle_source;
创建交互式分析Hologres结果表
由于Hologres是异步写入数据的,因此需要添加 blink.checkpoint.fail_on_checkpoint_error=true
作业参数,作业异常时才会触发Failover。
create table Hologres_sink(
name varchar,
age BIGINT,
birthday BIGINT
) with (
type='hologres',
dbname='',
tablename='',
username='',
password='',
endpoint='',
field_delimiter='|'
);
参数说明是否必填备注
结果表类型是固定值为hologres。
数据库名称。是无
表名称是无
用户名是无
密码是无
Hologres VPC 端点信息是参考
列表。
导出数据时,不同行之间使用的分隔符。否默认值为"\u0002"。
流式写入语义。否默认值为insertorignore。
分区表写入否默认值为false。
是否忽略撤回消息。否默认值为false。
当写入分区表时,是否根据分区值自动创建不存在的分区表。否false(默认值):不会自动创建。true:自动创建。
注意
- 如果Schema不为Public时,则tableName需要填写为 schema.tableName。
- field_delimiter参数不能在数据中插入分隔符,且需要与 bulkload语义一同使用。
- createPartTable参数如果分区值中存在短划线(-),暂不支持自动创建分区表。
流处理,也称为流数据或流事件处理,即对一系列无界数据或事件连续处理。
根据Hologres Sink的配置和Hologres表的属性,流式语义分为以下两种:
- Exactly-once(仅一次):即使在发生各种故障的情况下,系统只处理一次数据或事件。
- At-least-once(至少一次):如果在系统完全处理之前丢失了数据或事件,则从源头重新传输,因此可以多次处理数据或事件。如果第一次重试成功,则不必进行后续重试。
在Hologres结果表中使用流式语义,注意事项:
- 如果Hologres物理表未设置主键,则Hologres Sink使用 At-least-once语义。
- 如果Hologres物理表已设置主键,则Hologres Sink通过主键确保 Exactly-once语义。当同主键数据出现多次时,需要设置 mutateType参数确定更新结果表的方式。
mutateType取值如下:
- insertorignore(默认值):保留首次出现的数据,忽略后续所有数据。仅第一次数据流触发更新,后续忽略。无法实现局部更新。
- insertorreplace:使用后续出现的数据整行替换已有数据。根据主键覆盖更新。无法实现局部更新。
- insertorupdate:使用后续出现的数据选择性替换已有数据。根据主键覆盖更新。可以局部更新。
默认情况下,Hologres Sink只能向一张表导入数据。如果导入数据至分区表的父表,即使导入成功,也会查询数据失败。可以设置参数 partitionRouter为true,开启自动将数据路由到对应分区表的功能。
注意事项:
- tablename参数需要填写为父表的表名。
- Blink Connector不会自动创建分区表,需要提前手动创建需要导入数据的分区表,否则会导入失败。
在把多个流的数据写到一张Hologres宽表的场景中,会涉及到宽表Merge和数据的局部更新。示例如下:
假设有两个Flink数据流,一个数据流中包含A、B和C字段,另一个数据流中包含A、D和E字段,Hologres宽表WIDE_TABLE包含A、B、C、D和E字段,其中A字段为主键。具体操作如下:
注意
- 宽表必须有主键。
- 每个数据流的数据都必须包含完整的主键字段。
- 列存模式的宽表Merge场景在高RPS的情况下,CPU使用率会偏高,建议关闭表中字段的Dictionary encoding功能。
HologresBLINKINTINTINT[]ARRAY
创建云原生数据仓库AnalyticDB MySQL版2.0结果表
云原生数据仓库AnalyticDB MySQL版是阿里巴巴自主研发的海量数据实时高并发在线分析(Realtime OLAP)云计算服务,支持在毫秒级单位时间内,对千亿级数据进行实时地多维分析透视和业务探索。
CREATE TABLE stream_test_hotline_agent (
id INTEGER,
len BIGINT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
type='ads',
url='yourDatabaseURL',
tableName='',
userName='',
password='',
batchSize='20'
);
参数说明备注type结果表类型固定值为ads。urlJDBC连接地址云原生数据仓库AnalyticDB MySQL版数据库地址。示例:
。tableName表名无username账号无password密码无maxRetryTimes写入重试次数可选,默认值为10。bufferSize流入多少条数据后开始输出。可选,默认值为5000,表示输入的数据达到5000条就开始输出。batchSize一次批量写入的条数可选,默认值为1000。batchWriteTimeoutMs写入超时时间可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。connectionMaxActive单连接池最大连接数可选,默认值为30。ignoreDelete是否忽略delete操作默认值为false。
注意
- 如果错误码是20015,则表示batchSize设置过大。云原生数据仓库AnalyticDB MySQL版batchSize不能超过1 MB。例如,batchSize设置为
1000
,则平均每条记录大小不能超过1 KB。
云原生数据仓库AnalyticDB MySQL版字段类型实时计算Flink版字段类型BOOLEANBOOLEANTINYINT、SMALLINT、INTINTBIGINTBIGINTDOUBELDOUBLEVARCHARVARCHARDATEDATETIMETIMETIMESTAMPTIMESTAMP
创建数据总线DataHub结果表
create table datahub_output(
name VARCHAR,
age BIGINT,
birthday BIGINT
)with(
type='datahub',
endPoint=',
project='<yourProjectName>',
topic='<yourTopicName>',
accessId='<yourAccessId>',
accessKey='<yourAccessKey>',
batchSize='<yourBatchSize>',
batchWriteTimeoutMs='1000'
);
参数参数说明是否必填备注type源表类型是固定值为
。endPointendPoint地址是DataHub的endPoint地址,请参考
。projectDataHub项目名称是无topicDataHub中Topic名称是无accessIdAccessKey ID是无accessKeyAccessKey Secret是无maxRetryTimes读取最大重试次数否Blink 2.2.7以下版本:默认3;Blink 2.2.7及以上版本:默认20batchSize一次批量写入的条数否Blink 3.3以下版本:默认300;Blink 3.3及以上版本:默认100batchWriteTimeoutMs缓存数据的超时时间否可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。maxBlockMessages每次写入的最大Block数否默认值为100。reserveMilliSecondTIMESTAMP类型是否保留毫秒否默认值为false。partitionBy写入结果表前会根据该值进行Hash分类,数据会流向对应的结果表。否默认值为空,随机进行数据分配。
决定数据流到Blink的哪个Subtask。hashFields指定了列名之后,相同列的值会写入到同一个Shard。否默认值为Null,即随机写入。可以指明多个列值,用逗号(,)分隔。例如,
决定数据流写到DataHub的哪个Shard。
DataHub字段类型实时计算字段类型BIGINTBIGINTTIMESTAMPBIGINTSTRINGVARCHARDOUBLEDOUBLEBOOLEANBOOLEANDECIMALDECIMAL
注意
DataHub的TIMESTAMP精确到微秒,在Unix时间戳中为16位,但实时计算定义的TIMESTAMP精确到毫秒,在Unix时间戳中为13位,所以建议您使用BIGINT进行映射。如果您需要使用TIMESTAMP,建议使用计算列进行转换。
create table datahub_input(
name VARCHAR
) with (
type='datahub',
endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
project='test1',
topic='topic1',
accessId='',
accessKey='',
startTime='2018-06-01 00:00:00'
);
create table datahub_output(
name varchar
)with(
type='datahub',
endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
project='test2',
topic='topic2',
accessId='',
accessKey='',
batchSize='1000',
batchWriteTimeoutMs='500'
);
INSERT INTO datahub_output
SELECT
LOWER(name)
from datahub_input;
创建日志服务SLS结果表
日志服务SLS结果表仅支持VARCHAR类型的字段。
日志服务SLS是针对日志类数据的一站式服务。日志服务可以帮助您快捷地完成数据采集、消费、投递以及查询分析,提升运维和运营效率,建立海量日志处理能力。日志服务本身是流数据存储,实时计算Flink版能将其作为流式数据的输入。
create table sls_stream(
`name` VARCHAR,
age BIGINT,
birthday BIGINT
)with(
type='sls',
endPoint='http://cn-hangzhou-corp.sls.aliyuncs.com',
accessId='',
accessKey='',
project='',
logstore=''
);
参数注释说明是否必填备注endPointEndPoint地址是
project项目名是无logstore表名是无accessIdAccessKey ID是无accessKeyAccessKey Secret是无topic属性字段否默认值为空,可以将选定的字段作为属性字段
填充。timestampColumn属性字段否默认值为空,可以将选定的字段作为属性字段
填充(类型必须为INT)。如果未指定,则默认填充当前时间。source属性字段。日志的来源地,例如产生该日志机器的IP地址。否默认值为空,可以将选定的字段作为属性字段
填充。partitionColumn分区列否如果
,则该参数必填。flushIntervalMs触发数据写入的周期否默认值为2000,单位为毫秒。reserveMilliSecondTIMESTAMP类型是否保留毫秒值。否默认值为false,不保留。
日志服务字段类型实时计算Flink版字段类型STRINGVARCHAR
CREATE TABLE random_input (
a VARCHAR,
b VARCHAR) with (
type = 'random'
);
create table sls_output(
a varchar,
b varchar
)with(
type='sls',
endPoint='http://cn-hangzhou-corp.sls.aliyuncs.com',
accessId='',
accessKey='',
project='ali-cloud-streamtest',
logStore='stream-test2'
);
INSERT INTO sls_output
SELECT a, b
FROM random_input;
创建消息队列MQ结果表
CREATE TABLE stream_test_hotline_agent (
id INTEGER,
len BIGINT,
content VARCHAR
) WITH (
type='mq',
endpoint='',
accessID='',
accessKey='',
topic='',
producerGroup='',
tag='',
encoding='utf-8',
fieldDelimiter=',',
retryTimes='5',
sleepTimeMs='500'
);
CREATE TABLE source_table (
commodity VARCHAR
)WITH(
type='random'
);
CREATE TABLE result_table (
mess VARBINARY
) WITH (
type = 'mq',
endpoint='',
accessID='',
accessKey='',
topic='',
producerGroup=''
);
INSERT INTO result_table
SELECT
CAST(SUBSTRING(commodity,0,5) AS VARBINARY) AS mess
FROM source_table;
参数说明备注type结果表类型固定值为mq。topicMessage Queue队列名称无endpoint地址阿里云消息队列提供内网服务MQ(非公网region)和公网服务MQ(公网region)两种类型。accessIDAccessKey ID无accessKeyAccessKey Secret无producerGroup写入的群组无tag写入的标签可选,默认值为空。fieldDelimiter字段分割符可选,默认值为
。分隔符的使用情况如下所示:1.只读模式:以
作为分隔符,
在只读模式不可见。2.编辑模式:以
作为分隔符。encoding编码类型可选,默认值为
。retryTimes写入的重试次数可选,默认值为10。sleepTimeMs重试间隔时间可选,默认值为1000(毫秒)。instanceIDMQ实例ID如果MQ实例无独立命名空间,则不可以使用
参数。如果MQ实例有独立命名空间,则
参数必选。
创建表格存储Tablestore结果表
表格存储Tablestore是基于阿里云飞天分布式系统的分布式NoSQL数据存储服务。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问服务。其本质上就类似于市面上开源的HBase。
CREATE TABLE stream_test_hotline_agent (
name VARCHAR,
age BIGINT,
birthday BIGINT,
PRIMARY KEY (name,age)
) WITH (
type='ots',
instanceName='',
tableName='',
accessId='',
accessKey='',
endPoint='',
valueColumns='birthday'
);
注意
- valueColumns值不能是声明的主键,可以是主键之外的任意字段。
- Tablestore结果表声明中,除主键列外,至少包含一个属性列。
参数说明备注type结果表类型固定值为ots。instanceName实例名无tableName表名无endPoint实例访问地址参见
。accessIdAccessKey ID无accessKeyAccessKey Secret无valueColumns指定插入的字段列名插入多个字段以英文逗号(,)分割。例如
。bufferSize流入多少条数据后开始输出可选,默认值为5000,表示输入的数据达到5000条就开始输出。batchWriteTimeoutMs写入超时的时间可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。batchSize一次批量写入的条数可选,默认值为100。retryIntervalMs重试间隔时间可选,单位毫秒,默认值为1000。maxRetryTimes最大重试次数可选,默认值为100。ignoreDelete是否忽略DELETE操作默认值为False。
注意
- bufferSize根据Tablestore主键对结果数据进行去重后,再在bufferSize的基础上进行batchSize。
- Tablestore结果表必须定义有
Primary Key
,以Update方式写入结果数据到Tablestore表。
创建云数据库RDS MySQL版结果表
CREATE TABLE rds_output(
id INT,
len INT,
content VARCHAR,
PRIMARY KEY (id,len)
) WITH (
type='rds',
url='',
tableName='',
userName='',
password=''
);
注意
- 实时计算写入RDS MySQL数据库结果表原理:针对实时计算Flink版每行结果数据,拼接成一行SQL语句,输入至目标端数据库,然后执行。如果使用批量写,需要在URL后面加上参数
?rewriteBatchedStatements=true
,以提高系统性能。 - 如果实时计算写入数据支持自增主键,则在DDL中不声明该自增字段即可。例如,ID是自增字段,实时计算Flink版DDL不声明该自增字段,则数据库在一行数据写入过程中会自动填补相关自增字段。
- 如果DRDS有分区表,拆分键必须在实时计算Flink版DDL里PRIMARY KEY()中声明,否则拆分的表无法写入。
- DDL声明的字段必须至少存在一个非主键的字段,否则产生报错。
参数说明是否必填备注type结果表类型是固定值为
。urlJDBC(Java DataBase Connectivity)连接地址是URL的格式为:
,其中databaseName为对应的数据库名称。tableName表名是无userName用户名是无password密码是无maxRetryTimes最大重试次数否默认值为10。batchSize一次批量写入的条数否默认值为4096。bufferSize流入多少条数据后开始去重否默认值为10000。flushIntervalMs清空缓存的时间间隔否默认值为2000,单位为毫秒。表示如果缓存中的数据在等待2秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。excludeUpdateColumns忽略指定字段的更新否默认值为空(默认忽略PRIMARY KEY字段)。表示更新主键值相同的数据时,忽略指定字段的更新。ignoreDelete是否忽略delete操作否默认值为false,表示支持delete功能。partitionBy分区否默认为空。表示写入Sink节点前,会根据该值进行Hash分区,数据会流向相应的Hash节点。
RDS字段类型实时计算Flink版字段类型BOOLEANBOOLEANTINYINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTFLOATFLOATDECIMALDECIMALDOUBLEDOUBLEDATEDATETIMETIMETIMESTAMPTIMESTAMPVARCHARVARCHARVARBINARYVARBINARY
参数名称说明默认值最低版本要求useUnicode是否使用Unicode字符集,如果参数characterEncoding设置为GB2312或GBK,本参数值必须设置为true。false1.1gcharacterEncoding当useUnicode设置为true时,指定字符编码。例如可设置为GB2312或GBK。false1.1gautoReconnect当数据库连接异常中断时,是否自动重新连接。false1.1autoReconnectForPools是否使用针对数据库连接池的重连策略。false3.1.3failOverReadOnly自动重连成功后,连接是否设置为只读。true3.0.12maxReconnectsautoReconnect设置为true时,重试连接的次数。31.1initialTimeoutautoReconnect设置为true时,两次重连之间的时间间隔,单位为秒。21.1connectTimeout和数据库服务器建立socket连接时的连接超时时长,单位为毫秒。0表示永不超时,适用于JDK 1.4及以上版本。03.0.1socketTimeoutsocket操作(读写)超时,单位:毫秒。0表示永不超时。03.0.1
CREATE TABLE source (
id INT,
len INT,
content VARCHAR
) with (
type = 'random'
);
CREATE TABLE rds_output(
id INT,
len INT,
content VARCHAR,
PRIMARY KEY (id,len)
) WITH (
type='rds',
url='',
tableName='',
userName='',
password=''
);
INSERT INTO rds_output
SELECT id, len, content FROM source;
创建MaxCompute结果表
MaxCompute中的Clustered Table表不支持作为MaxCompute结果表。
MaxCompute Sink可以分为以下两个阶段:
Commit方法不能提供 原子性。因此,MaxCompute Sink提供的是 At least Once方式,而不是Exactly Once方式。
DDL中定义的字段需要与MaxCompute物理表中的字段名称、顺序以及类型保持一致,否则可能导致MaxCompute物理表中查询的数据为 /n
。
create table odps_output(
id INT,
user_name VARCHAR,
content VARCHAR
) with (
type = 'odps',
endPoint = '',
project = '',
tableName = '',
accessId = '',
accessKey = '',
`partition` = 'ds=2018****'
);
参数说明是否必填备注type结果表类型。是固定值为
。endPointMaxCompute服务地址。是请参见
。tunnelEndpointMaxCompute Tunnel服务的连接地址。是请参见
。projectMaxCompute项目名称。是无。tableName表名。是无。accessIdAccessKey ID。是无。accessKeyAccessKey Secret。是无。partition分区名。否参见注意事项。flushIntervalMsOdps tunnel writer缓冲区Flush间隔,单位毫秒。MaxCompute Sink写入记录时,先放入数据到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs)时,再把缓冲区里的数据写到目标 MaxCompute表。否默认值是30000毫秒,即30秒。partitionBy写入Sink节点前,系统会根据该值做Hash Shuffle,数据就会流向对应的Sink节点。否系统按照多个列进行Hash Shuffle,各个列名之间使用逗号(,)分割。默认为空。isOverwrite写入Sink节点前,是否将结果表清空。否默认参数值为
。在声明MaxCompute的流式作业结果表时,
参数必须为
,否则在编译时会抛出异常。dynamicPartitionLimit分区数目最大值。否默认值是100,内存中会把出现过的分区和Tunnel/writer的映射关系维护到一个Map里,如果这个Map的大小超过了
设定值,则会出现
报错。
注意
如果存在分区表,则必填partition
参数:
- 固定分区:例如
`partition
= 'ds=20180905'表示将数据写入分区
ds= 20180905`。 - 动态分区:如果不明文显示分区的值,则会根据写入数据中的分区列具体的值,写入到不同的分区中。例如
`partition
='ds'表示根据
ds`字段的值写入分区。 如果要创建多级动态分区,With参数中Partition的字段顺序和结果表的DDL中的分区字段顺序,必须与物理表一致,各个分区字段之间使用逗号(,)分割。 动态分区列需要显式写在建表语句中。*
Stream模式的MaxCompute结果表具备 At Least Once**数据保障机制,在作业运行失败后,可能会出现数据重复。
MaxCompute字段类型实时计算Flink版字段类型TINYINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTFLOATFLOATDOUBLEDOUBLEBOOLEANBOOLEANDATETIMETIMESTAMPTIMESTAMPTIMESTAMPVARCHARVARCHARSTRINGVARCHARDECIMALDECIMAL
- 写入固定分区
CREATE TABLE source (
id INT,
len INT,
content VARCHAR
) with (
type = 'random'
);
create table odps_sink (
id INT,
len INT,
content VARCHAR
) with (
type = 'odps',
endPoint = '',
project = '',
tableName = '',
accessId = '',
accessKey = '',
`partition` = 'ds=20180418'
);
INSERT INTO odps_sink
SELECT
id, len, content
FROM source;
- 写入动态分区
CREATE TABLE source (
id INT,
len INT,
content VARCHAR,
c TIMESTAMP
) with (
type = 'random'
);
create table odps_sink (
id INT,
len INT,
content VARCHAR,
ds VARCHAR
) with (
type = 'odps',
endPoint = '',
project = '',
tableName = '',
accessId = '',
accessKey = '',
`partition`='ds'
);
INSERT INTO odps_sink
SELECT
id,
len,
content,
DATE_FORMAT(c, 'yyMMdd') as ds
FROM source;
创建云数据库HBase版结果表
实时计算HBase结果表不支持自建的开源HBase。
HBase标准版示例代码如下。
create table liuxd_user_behavior_test_front (
row_key varchar,
from_topic varchar,
origin_data varchar,
record_create_time varchar,
primary key (row_key)
) with (
type = 'cloudhbase',
zkQuorum = '2',
columnFamily = '',
tableName = '',
batchSize = '500'
);
HBase增强版示例代码如下。
create table liuxd_user_behavior_test_front (
row_key varchar,
from_topic varchar,
origin_data varchar,
record_create_time varchar,
primary key (row_key)
) with (
type = 'cloudhbase',
endPoint = '',
userName = 'root',
password = 'root',
columnFamily = '',
tableName = '',
batchSize = '500'
);
Blink 3.5.0及以上HBase增强版示例代码如下。
create table liuxd_user_behavior_test_front (
row_key varchar,
from_topic varchar,
origin_data varchar,
record_create_time varchar,
primary key (row_key)
) with (
type = 'cloudhbase',
zkQuorum = '',
userName = 'root',
password = 'root',
columnFamily = '',
tableName = '',
batchSize = '500'
);
Blink 3.5.0及以上HBase写入主备切换示例代码如下。
create table liuxd_user_behavior_test_front (
row_key varchar,
from_topic varchar,
origin_data varchar,
record_create_time varchar,
primary key (row_key)
) with (
type = 'cloudhbase',
zkQuorum = '',
haClusterID = 'ha-xxx',
userName = 'root',
password = 'root',
columnFamily = '',
tableName = '',
batchSize = '500'
);
注意
- PRIMARY KEY支持定义多字段。多字段以
rowkeyDelimiter
(默认为:
)作为分隔符进行连接。 - HBase执行撤回删除操作时,如果COLUMN定义了多版本,将清空所有版本的COLUMN值。
- HBase标准版和HBase增强版DDL的区别为连接参数不同:
- HBase标准版使用连接参数
zkQuorum
。 - HBase增强版使用连接参数
endPoint
。
参数说明是否必填备注type结果表类型是固定值为cloudhbase。zkQuorumHBase集群配置的zk地址是可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。仅在HBase标准版中生效。zkNodeParent集群配置在zk上的路径否可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。仅在HBase标准版中生效。endPointHBase地域名称是可在购买的HBase实例控制台中获取。仅在HBase增强版中生效。userName用户名否仅在HBase增强版中生效。password密码否仅在HBase增强版中生效。tableNameHBase表名是无columnFamily列族名是仅支持插入同一列族。maxRetryTimes最大尝试次数否默认值为10。bufferSize流入多少条数据后进行去重否默认值为5000。batchSize一次批量写入的条数否默认值为100。建议batchSize参数值为200~300。过大的batchSize值可能导致任务OOM(内存不足)报错。flushIntervalMs周期性清理buffer的间隔,可以减少写入HBase的延迟。否默认值为2000,单位为毫秒。writePkValue是否写入主键值否默认值为false。stringWriteMod是否都按照STRING插入否默认值为false。rowkeyDelimiter
的分隔符否默认值为冒号(:)。isDynamicTable是否为动态表否默认值为false。haClustserIDHBase高可用实例ID否只有访问同城主备实例时才需要配置。
实时计算部分结果数据需要按某列的值,作为动态列输入HBase。HBase中,以每小时的成交额作为动态列的数据,示例如下。
rowkeycf:0cf:1cf:220170707100cf:1300
当 isDynamicTable参数值为true时,表明该表为支持动态列的HBase表。
动态表仅支持3列输出,例如,ROW_KEY、COLUMN和VALUE。此时第2列(本示例中的COLUMN)为动态列,动态表中的其它参数与HBase的WITH参数一致。
使用动态表时,所有数据类型需要先转换为STRING类型,再进行输入。
CREATE TABLE stream_test_hotline_agent (
name varchar,
age varchar,
birthday varchar,
primary key (name)
) WITH (
type = 'cloudhbase',
...
columnFamily = 'cf',
isDynamicTable ='true'
);
以上声明将把 birthday
插入到以 name
为ROW_KEY的 cf:age
列中。例如, (wang,18,2016-12-12)
会插入ROW_KEY为 wang
的行, cf:18
列。
DDL中必选按照从上到下的顺序,声明ROW_KEY(本示例中的 name
)、COLUMN(本示例中的 age
)和VALUE(本示例中的 birthday
),且声明ROW_KEY为PRIMARY KEY。
create table source (
id TINYINT,
name BIGINT
) with (
type = 'random'
);
create table sink (
id TINYINT,
name BIGINT,
primary key (id)
) with (
type = 'cloudhbase',
zkQuorum = '',
columnFamily = '',
tableName = ''
);
INSERT INTO sink
SELECT id, name FROM source;
创建Elasticsearch结果表
CREATE TABLE es_stream_sink(
field1 LONG,
field2 VARBINARY,
field3 VARCHAR,
PRIMARY KEY(field1)
)WITH(
type ='elasticsearch',
endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
accessId = '',
accessKey = '',
index = '',
typeName = ''
);
- ES支持根据PRIMARY KEY进行UPDATE,且PRIMARY KEY只能为1个字段。
- 在指定PRIMARY KEY后,Document的ID为PRIMARY KEY字段的值。
- 在未指定PRIMARY KEY时,Document的ID为随机。
- 在full更新模式下,新增的doc会完全覆盖已存在的doc。
- 在inc更新模式下,系统会依据输入的字段值更新对应的字段。
- 所有的更新默认为UPSERT语义,即INSERT或UPDATE。
参数说明是否必选默认值typeconnector类型。是elasticsearchendPointServer地址,例入:http://127.0.0.1:9211。是无accessId创建ES时的登录名。如果通过Kibana插件操作ES,请填写Kibana登录ID。是无accessKey创建ES时的登录密码。如果通过Kibana插件操作ES,请填写Kibana登录密码。是无index索引名称。是无typeName文档类型。是
bufferSize流入多少条数据后开始去重。否1000maxRetryTimes异常重试次数。否30timeout读超时,单位为毫秒。否600000discovery是否开启节点发现。如果开启,客户端每5分钟刷新一次Server List。否falsecompression是否使用GZIP压缩Request Bodies。否truemultiThread是否开启JestClient多线程。否trueignoreWriteError是否忽略写入异常。否falsesettings创建Index的Settings配置。否无updateMode指定主键(PRIMARY KEY)后的更新模式:full:全量覆盖。inc:增量更新。否full
参数说明是否必选备注dynamicIndex是否开启动态索引。否参数取值如下:true:开启。false(默认值):不开启。indexField抽取索引的字段名。
为true时必填。只支持TIMESTAMP(以秒为单位)、DATE和LONG3种数据类型。indexInterval切换索引的周期。
为true时必填。参数值如下:d(默认值):天。m:月。w:周mapping启用动态索引时,设置文档各字段的类型与格式。例如,设置名为sendTime字段的格式:
否默认值为空。
- 当开启动态索引后,基本配置中的
index
名称会作为后续创建索引的统一Alias,Alias和索引为一对多关系。 - 不同的
indexInterval
对应的真实索引名称: - d -> Alias "yyyyMMdd"
- m -> Alias "yyyyMM"
- w -> Alias "yyyyMMW"
- 对于单个的真实索引可使用Index API进行修改,但对于Alias只支持
get
功能。
CREATE TABLE es_stream_sink(
field1 LONG,
field2 VARBINARY,
field3 TIMESTAMP,
PRIMARY KEY(field1)
)WITH(
type ='elasticsearch',
endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
accessId = '',
accessKey = '',
index = '',
typeName = '',
dynamicIndex = 'true',
indexField = 'field3',
indexInterval = 'd'
);
创建时序数据库结果表
实时计算引用时序数据库(TSDB)结果表需要配置数据存储白名单。
阿里云时序数据库(Time Series Database,简称TSDB)是一种集时序数据高效读写、压缩存储、实时计算能力为一体的数据库服务,可以广泛应用于物联网和互联网领域,实现对设备及业务服务的实时监控,实时预测告警。
CREATE TABLE stream_test_hitsdb (
metric VARCHAR,
`timestamp` INTEGER,
`value` DOUBLE,
tagk1 VARCHAR,
tagk2 VARCHAR,
tagk3 VARCHAR
) WITH (
type='hitsdb',
host='',
virtualDomainSwitch = 'false',
httpConnectionPool = '20',
batchPutSize = '1000'
);
建表默认格式:
- 第0列:metric(VARCHAR)。
- 第1列:timestamp(INTEGER),单位为秒。
- 第2列:value(DOUBLE)。
- 第3~N列:TagKey,即时间序列数据库中的fieldName。
- tag可以为多列。
- 必须声明 metric、 timestamp和 value,且字段名称、字段顺序和字段数据类型必须和TSDB保持完全一致。
参数说明备注type结果表类型固定值为
。hostIP或VIP域名填写注册实例的Host。port端口默认值为8242。virtualDomainSwitch是否使用VIPServer默认值为false,如果需要使用VIPServer,则virtualDomainSwitch设置为true。httpConnectionPoolHTTP连接池默认值为10。httpCompress使用GZIP压缩默认值为false,即不压缩。httpConnectTimeoutHTTP连接超时时间默认值为0。ioThreadCountIO线程数默认值为1。batchPutBufferSize缓冲区大小默认值为10000。batchPutRetryCount写入重试次数默认值为3。batchPutSize每次提交数据量默认每次提交500个数据点。batchPutTimeLimit缓冲区等待时间默认值为200,单位为毫秒。batchPutConsumerThreadCount序列化线程数默认值为1。
Blink 3.2 及以上版本支持6种Blink写入TSDB的模型,分别是:
- 支持单值无tag数据点的写入,其schema格式如下所示,包括3个字段,这3个字段名称不可使用其他名称代替。
metric,timestamp,value
- 支持单值带tag数据点的写入,其schema格式如下所示,除metric、timestamp和value关键词名字必须保持一致外,tag的名称可以任意指定。
metric,timestamp,value,tagKey1,....,tagKeyN
- 支持单值带不确定tag个数的数据点的写入,其schema格式如下所示,包括4个字段,这4个字段名称不可使用其他名称代替。
metric,timestamp,value,tags
其中,tags的内容为形如如下格式的JSON字符串,便于绕过Blink table schema需要固定tag个数的限制。
{"tagKey1":"tagValue1","tagKey2":"tagValue2",......,"tagKeyN":"tagValueN"}
- 支持多值无tag数据点的写入,其schema格式如下所示。
metric,timestamp,field_name1,field_name2,......,field_nameN
其中metric和timestamp字段名称不可使用其他名称代替。对于多值的fields,由于需要区分tag和field,同时兼容之前的单值写入,这里约定需要给每个field加上固定前缀field,自动识别field字段。例如,对于一个field名称 field_name1
,在写入TSDB时,会自动将前缀 field_
去掉,只保留name1,即对于上面的schema,实际写入TSDB的格式如下,name1和name2是多值field的名称。
metric,timestamp,name1,name2,......,nameN
- 支持多值带tag数据点的写入,其schema格式如下所示,除metric和timestamp关键词名字必须保持一致外,tag的名称可以任意指定。
metric,timestamp,tagKey1,....,tagKeyN,field_name1,field_name2,......,field_nameN
- 支持单值带不确定tag个数的数据点的写入,其schema格式如下所示。
metric,timestamp,tags,field_name1,field_name2,......,field_nameN
其中,tags的内容类似如下JSON字符串,便于绕过Blink table schema需要固定tag个数的限制。
{"tagKey1":"tagValue1","tagKey2":"tagValue2",......,"tagKeyN":"tagValueN"}
创建消息队列Kafka结果表
Kafka结果表支持写入自建Kafka集群。
create table sink_kafka (
messageKey VARBINARY,
`message` VARBINARY,
PRIMARY KEY (messageKey)
) with (
type = 'kafka010',
topic = '',
bootstrap.servers = ''
);
-
创建Kafka结果表时,必须明文指定
PRIMARY KEY (messageKey)
。 -
通用配置参数说明是否必选备注typeKafka对应版本是必须是Kafka08、Kafka09、Kafka010、Kafka011中的一种。topicTopic名称是无
- 必选配置
- Kafka08必选配置参数说明zookeeper.connectzk连接ID
- Kafka09/Kafka010/Kafka011必选配置参数说明bootstrap.serversKafka集群地址
- 可选配置参数
consumer.id
socket.timeout.ms
fetch.message.max.bytes
num.consumer.fetchers
auto.commit.enable
auto.commit.interval.ms
queued.max.message.chunks
rebalance.max.retries
fetch.min.bytes
fetch.wait.max.ms
rebalance.backoff.ms
refresh.leader.backoff.ms
auto.offset.reset
consumer.timeout.ms
exclude.internal.topics
partition.assignment.strategy
client.id
zookeeper.session.timeout.ms
zookeeper.connection.timeout.ms
zookeeper.sync.time.ms
offsets.storage
offsets.channel.backoff.ms
offsets.channel.socket.timeout.ms
offsets.commit.max.retries
dual.commit.enabled
partition.assignment.strategy
socket.receive.buffer.bytes
fetch.min.bytes
Kafka官方文档:
typeKafka版本Kafka080.8.22Kafka090.9.0.1Kafka0100.10.2.1Kafka0110.11.0.2及以上
create table datahub_input (
id VARCHAR,
nm VARCHAR
) with (
type = 'datahub'
);
create table sink_kafka (
messageKey VARBINARY,
`message` VARBINARY,
PRIMARY KEY (messageKey)
) with (
type = 'kafka010',
topic = '',
bootstrap.servers = ''
);
INSERT INTO
sink_kafka
SELECT
cast(id as VARBINARY) as messageKey,
cast(nm as VARBINARY) as `message`
FROM
datahub_input;
创建云数据库HybridDB for MySQL结果表
云数据库HybridDB for MySQL(原名PetaData)是同时支持海量数据在线事务(OLTP)和在线分析(OLAP)的HTAP(Hybrid Transaction/Analytical Processing)关系型数据库。HybridDB for MySQL采用一份数据存储来进行OLTP和OLAP处理,避免把一份数据复制多次进行数据分析,极大地降低了数据存储的成本。
create table petadata_output(
id INT,
len INT,
content VARCHAR,
primary key(id,len)
) with (
type='petaData',
url='yourDatabaseURL',
tableName='yourTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);
- 实时计算写入PetaData数据库结果表原理:针对实时计算Flink版每行结果数据,拼接成一行SQL语句,输入至目标端数据库。
- bufferSize默认值是1000,如果到达bufferSize阈值,则会触发写出。因此配置batchSize的同时还需要配置bufferSize。bufferSize和batchSize大小相同即可。
- batchSize数值不建议设置过大,建议设置
batchSize='4096'
。
参数说明是否必填备注type结果表类型是固定值为petaData。url地址是
。tableName表名是无userName用户名是无password密码是无maxRetryTimes最大重试次数否默认值为3。batchSize一次批量写入的条数否默认值为1000,表示每次写多少条。bufferSize流入多少条数据后开始去重否无flushIntervalMs清空缓存的时间间隔否单位为毫秒。默认值为3000,表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。ignoreDelete是否忽略Delete操作否默认值为false。
创建云数据库RDS SQL Server版结果表
create table ss_output(
id INT,
len INT,
content VARCHAR,
primary key(id,len)
) with (
type='jdbc',
url='jdbc:sqlserver://ip:port;database=****',
tableName='',
userName='',
password=''
);
- 实时计算Flink版写入RDS和DRDS数据库结果表原理:针对实时计算Flink版每行结果数据,拼接成一行SQL语句,输入至目标端数据库。如果使用批量写,需要在URL后面加上参数
?rewriteBatchedStatements=true
,以提高系统性能。 - RDS SQL Server数据库支持自增主键。如果需要让实时计算Flink版写入数据支持自增主键,则在DDL中不声明该自增字段。
- 如果DRDS有分区表,拆分键必须在实时计算Flink版DDL里primary key()中声明,否则拆分的表无法写入。
- DDL声明的字段必选至少存在一个非主键的字段,否则产生报错。
参数说明是否必填备注type结果表类型是固定值为jdbc。urlJDBC(Java DataBase Connectivity)连接地址是无tableName表名是无username账号是无password密码是无maxRetryTimes写入重试次数否默认值为10。bufferSize流入多少条数据后开始去重否默认值为10000,表示输入的数据达到10000条开始去重。flushIntervalMs清空缓存的时间间隔否单位为毫秒,默认值为2000,表示如果缓存中的数据在等待2秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。excludeUpdateColumns忽略指定字段的更新否可选,默认值为空(默认忽略Primary Key字段),表示更新主键值相同的数据时,忽略指定字段的更新。ignoreDelete是否忽略Delete操作否默认值为False。
RDS字段类型实时计算Flink版字段类型BOOLEANBOOLEANTINYINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTFLOATFLOATDECIMALDECIMALDOUBLEDOUBLEDATEDATETIMETIMETIMESTAMPTIMESTAMPVARCHARVARCHARVARBINARYVARBINARY
参数说明默认值最低版本要求useUnicode是否使用Unicode字符集。如果参数CharacterEncoding设置为GB2312或GBK,useUnicode值必须设置为True。False1.1gcharacterEncoding当useUnicode设置为True时,指定字符编码。例如可设置为GB2312或GBK。False1.1gautoReconnect当数据库连接异常中断时,是否自动重新连接。False1.1autoReconnectForPools是否使用针对数据库连接池的重连策略。False3.1.3failOverReadOnly自动重连成功后,连接是否设置为只读。True3.0.12maxReconnectsautoReconnect设置为True时,重试连接的次数。31.1initialTimeoutautoReconnect设置为True时,两次重连之间的时间间隔,单位为秒。21.1connectTimeout和数据库服务器建立socket连接时的超时,单位为毫秒。0,表示永不超时,适用于JDK 1.4及以上版本。3.0.1socketTimeoutsocket操作(读写)超时,单位为毫秒。0,表示永不超时。3.0.1
CREATE TABLE source (
id INT,
len INT,
content VARCHAR
) with (
type = 'random'
);
CREATE TABLE rds_output(
id INT,
len INT,
content VARCHAR,
PRIMARY KEY (id,len)
) WITH (
type='jdbc',
url='',
tableName='',
userName='',
password=''
);
INSERT INTO rds_output
SELECT id, len, content FROM source;
创建云数据库Redis版结果表
阿里云数据库Redis版是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求。
云数据库Redis版结果表支持5种Redis数据结构,其DDL定义如下:
DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为 set key value
。
create table resik_output (
a varchar,
b varchar,
primary key(a)
) with (
type = 'redis',
mode = 'string',
host = '${redisHost}',
port = '${redisPort}',
dbNum = '${dbNum}',
ignoreDelete = 'true'
);
DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为 lpush key value
。
create table resik_output (
a varchar,
b varchar,
primary key(a)
) with (
type = 'redis',
mode = 'list',
host = '${redisHost}',
port = '${redisPort}',
dbNum = '${dbNum}',
ignoreDelete = 'true'
);
DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为 sadd key value
。
create table resik_output (
a varchar,
b varchar,
primary key(a)
) with (
type = 'redis',
mode = 'set',
host = '${redisHost}',
port = '${redisPort}',
dbNum = '${dbNum}',
ignoreDelete = 'true'
);
DDL为三列:第1列为key,第2列为hash_key,第3列为hash_key对应的hash_value。Redis插入数据的命令为 hmset key hash_key hash_value
。
create table resik_output (
a varchar,
b varchar,
c varchar,
primary key(a)
) with (
type = 'redis',
mode = 'hashmap',
host = '${redisHost}',
port = '${redisPort}',
dbNum = '${dbNum}',
ignoreDelete = 'true'
);
DDL为三列:第1列为key,第2列为score,第3列为value。Redis插入数据的命令为 add key score value
。
create table resik_output (
a varchar,
b double,
c varchar,
primary key(a)
) with (
type = 'redis',
mode = 'sortedset',
host = '${redisHost}',
port = '${redisPort}',
dbNum = '${dbNum}',
ignoreDelete = 'true'
);
参数参数说明是否必填取值type结果表类型是固定值为
。portRedis Server对应端口否默认值为6379。dbNumRedis Server对应数据库序号否默认值为0。ignoreDelete是否忽略Retraction消息否默认值为false,可取值为true或false。如果设置为false,收到Retraction时,同时删除数据对应的key及已插入的数据。passwordRedis Server对应的密码否默认值为空,不进行权限验证。clusterModeRedis是否为集群模式否取值如下:true:Redis为集群模式。false(默认值):Redis为单机模式。
Redis字段类型实时计算Flink版字段类型STRINGVARCHARSCOREDOUBLE
因为Redis的SCORE类型应用于SORTEDSET(有序集合),所以需要手动为每个Value设置一个DOUBLE类型的SCORE,Value才能按照该SCORE从小到大进行排序。
CREATE TABLE random_stream (
v VARCHAR,
p VARCHAR
) with (
type = 'random'
);
create table resik_output (
a VARCHAR,
b VARCHAR,
primary key(a)
) with (
type = 'redis',
mode = 'string',
host = '',
password = ''
);
INSERT INTO resik_output
SELECT v, p
FROM random_stream;
创建云数据库MongoDB版结果表
MongoDB结果表不支持主键更新,数据输入形式为重复插入。
CREATE TABLE mongodb_sink (
`a` VARCHAR
) WITH (
type = 'mongodb',
database = '',
collection= '',
uri='mongodb://{}:{}@{host}:****?replicaSet=mgset-1224****',
keepAlive='true',
maxConnectionIdleTime='20000',
batchSize='2000'
);
参数说明是否必填备注typeConnector类型是固定值为mongodb。database数据库名称是无collection数据集合是无uriMongoDB连接串是无keepAlive是否保持长连接否默认值为true。maxConnectionIdleTime连接超时时长否整型值,不能为负数,单位为毫秒。默认值为60000。0表示无连接超时限制。batchSize每次批量写入的条数否整型值,默认值为1024。系统会设定一个大小为batchSize的缓冲条数,当数据的条数达到batchSize时,触发数据的输出。
当Checkpoint时间达到时,即使数据未到达batchSize值,也将触发数据的输出。
创建云原生数据仓库AnalyticDB MySQL版3.0结果表
云原生数据仓库AnalyticDB MySQL版3.0结果表暂不支持注册存储功能。
云原生数据仓库AnalyticDB MySQL版3.0数据库支持自增主键。如果实时计算Flink版写入数据支持自增主键,则在DDL中不声明该自增字段。
CREATE TABLE rds_output (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id,len)
) WITH (
type='ADB30',
url='jdbc:mysql://:/',
tableName='',
userName='',
password=''
);
实时计算Flink版写入云原生数据仓库AnalyticDB MySQL版3.0结果表分为以下两个阶段:
参数注释说明是否必选备注typeconnector类型是固定值为
。urljdbc连接地址是云原生数据仓库AnalyticDB MySQL版数据库地址。示例:
。tableName表名是无username账号是无password密码是无maxRetryTimes写入重试次数否默认值为3。bufferSize流入多少条数据后开始去重否默认值为1000,表示输入的数据达到1000条则开始输出。batchSize一次批量写入的条数否默认值为1000。flushIntervalMs清空缓存的时间间隔否单位为毫秒,默认值为3000。表示如果缓存中的数据在等待3秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。ignoreDelete是否忽略DELETE操作否默认值为false,表示支持DELETE功能。replaceMode是否采用replace into语法插入数据否取值:true(默认):采用。 false:采用
。reserveMilliSecondTimeStamp类型是否保留毫秒否默认值为false,不保留毫秒数值。
注意: bufferSize
和 batchSize
参数需要指定主键后才能生效。
创建自定义结果表
自定义结果表Class需要继承自定义Sink插件的基类CustomSinkBase,并使用如下方法实现。
protected Map<String,String> userParamsMap;
protected Set<String> primaryKeys;
protected List<String> headerFields;
protected RowTypeInfo rowTypeInfo;
public abstract void open(int taskNumber,int numTasks) throws IOException;
public abstract void close() throws IOException;
public abstract void writeAddRecord(Row row) throws IOException;
public abstract void writeDeleteRecord(Row row) throws IOException;
public abstract void sync() throws IOException;
public String getName();
进入 blink_customersink_3x目录,执行 mvn clean package
命令,再在实时计算Flink版开发控制台上传刚编译成功后的JAR包 blink_customersink_3x/target/blink-customersink-3.x-1.0-SNAPSHOT-jar-with-dependencies.jar,引用资源之后,对于自定义的Sink插件,需要指明 type = 'custom'
,并且指明实现接口的Class。
create table in_table(
kv varchar
)with(
type = 'random'
);
create table out_table(
`key` varchar,
`value` varchar
)with(
type = 'custom',
class = 'com.alibaba.blink.customersink.RedisSink',
host = 'r-uf****.redis.rds.aliyuncs.com',
port = '6379',
db = '0',
batchsize = '10',
password = ''
);
insert into out_table
select
substring(kv,0,4) as `key`,
substring(kv,0,6) as `value`
from in_table;
Redis Sink插件的参数说明如下。
参数说明是否必填备注hostRedis实例内网连接地址(host)是无portRedis实例端口号是无passwordRedis连接密码是无dbRedis Database编号否默认值为0,表示db0。batchsize每次批量写入的条数否默认值为1,表示不批量写入。
创建InfluxDB结果表
create table stream_test_influxdb(
`metric` varchar,
`timestamp` BIGINT,
`tag_value1` varchar,
`field_fieldValue1` Double
)with(
type = 'influxdb',
endpoint = 'http://service.cn.influxdb.aliyuncs.com:****',
database = '',
batchPutsize = '1',
username = '',
password = ''
);
建表默认格式:
- 第0列:metric(VARCHAR),必填。
- 第1列:timestamp(BIGINT),必填,单位为毫秒。
- 第2列:tag_value1(VARCHAR),必填,最少填写一个。
- 第3列:field_fieldValue1(DOUBLE),必填,最少填写一个。 写入多个field_fieldValue时,需要按照如下格式填写。
field_fieldValue1 类型,
field_fieldValue2 类型,
...
field_fieldValueN 类型
示例如下。
field_fieldValue1 Double,
field_fieldValue2 INTEGER,
...
field_fieldValueN INTEGER
结果表中只支持 metric、 timestamp、 tag_和 field_,不能出现其他的字段。
参数说明是否必填备注type结果表类型是固定值为InfluxDB。endpointInfluxDB的Endpoint是在InfluxDB中,Endpoint是VPC网络地址,例如:https://localhost:3242或http://localhost:8086。Endpoint支持HTTP和HTTPS。databaseInfluxDB的数据库名是例如:db-blink或者blink。batchPutSize批量提交的记录条数否默认每次提交500个数据点。usernameInfluxDB的用户名是需要对写入的数据库有写权限。passwordInfluxDB的密码是默认值为0。retentionPolicy保留策略否为空时,默认填写为每个database的默认保留策略。
InfluxDB字段类型实时计算Flink版字段类型BOOLEANBOOLEANINTINTBIGINTBIGINTFLOATFLOATDECIMALDECIMALDOUBLEDOUBLEDATEDATETIMETIMETIMESTAMPTIMESTAMPVARCHARVARCHAR
创建Phoenix5结果表
Phoenix5是云数据库HBase实例中的一种HBase SQL服务,须在云数据库HBase实例中开启HBase SQL服务后才可以使用Phoenix5。
create table US_POPULATION_SINK (
`STATE` varchar,
CITY varchar,
POPULATION BIGINT,
PRIMARY KEY (`STATE`, CITY)
) WITH (
type = 'PHOENIX5',
serverUrl = '',
tableName = ''
);
参数说明是否必填备注type结果表类型是固定值为
。serverUrlPhoenix5的Query Server地址:如果集群中创建的,则是负载均衡服务的URL地址。如果是在单机中创建的,则是单机的URL地址。是serverUrl格式为
,其中:host:Phoenix5服务的域名。port:Phoenix5服务的端口号,固定值为8765。tableName读取Phoenix5表名。是Phoenix5表名格式为SchemaName.TableName,其中:SchemaName:模式名,可以为空,即不写模式名,仅写表名,表示使用数据库的默认模式。TableName:表名。
create table `source` (
`id` varchar,
`name` varchar,
`age` varchar,
`birthday` varchar
) WITH (
type = 'random'
);
create table sink (
`id` varchar,
`name` varchar,
`age` varchar,
`birthday` varchar,
primary key (id)
) WITH (
type = 'PHOENIX5',
serverUrl = '',
tableName = ''
);
INSERT INTO sink
SELECT `id` ,`name` , `age` ,`birthday`
FROM `source`;
创建分析型数据库PostgreSQL版结果表
实时计算Flink版写入分析型数据库PostgreSQL结果表分为以下两个阶段:
create table rds_output(
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) with (
type='adbpg',
url='jdbc:postgresql://:/',
tableName='',
userName='',
password=''
);
参数说明是否必填备注type源表类型。是固定值为
。urlJDBC连接地址。是分析型数据库PostgreSQL版数据库的JDBC连接地址。格式为
,其中:yourNetworkAddress:内网地址。PortId:连接端口。yourDatabaseName:连接的数据库名称。示例:
tableName表名。是无。username账号。是无。password密码。是无。maxRetryTimes写入重试次数。否默认值为3。useCopy是否采用Copy API写入数据。否参数取值如下:0(默认值):采用INSERT方式写入数据。1:采用copy API方式写入数据。batchSize一次批量写入的条数。否默认值为5000。exceptionMode数据写入过程中出现异常时的处理策略。否支持以下两种处理策略:ignore(默认值):忽略出现异常时写入的数据。strict:数据写入异常时,Failover报错。conflictMode当主键冲突或唯一索引出现冲突时的处理策略。否支持以下三种处理策略:ignore(默认值):忽略主键冲突,保留之前的数据。strict:主键冲突时,Failover报错。update:主键冲突时,更新新到的数据。targetSchemaSchema名称。否默认值为public。connectionMaxActive单个task允许的最大连接数。否请根据实际的并发task个数,以及目标端数据库允许的最大连接数进行设置。
分析型数据库PostgreSQL版字段类型实时计算Flink版字段类型BOOLEANBOOLEANSMALLINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTDOUBLE PRECISIONDOUBLETEXTVARCHARTIMESTAMPDATETIMEDATEDATEREALFLOATDOUBLE PRECISIONDECIMALTIMETIMETIMESTAMPTIMESTAMP
Original: https://blog.csdn.net/shenglishuguang/article/details/123882481
Author: 胜利的曙光
Title: Blink SQL之创建数据结果表