Blink SQL之创建数据结果表

大数据10

Blink使用 CREATE TABLE作为输出结果数据的格式定义,同时定义数据如何写入到目的数据结果表。

结果表有两种类型:

  • Append类型:输出存储是日志系统、消息系统、操作日志类的RDS数据库等,数据流输出结果追加到存储中,不会修改原有数据。
  • Update类型:输出存储生命了主键的数据库(如:RDS、HBase等),数据流的输出存在Upsert操作。
CREATE TABLE tableName
    (columnName dataType [, columnName dataType ]*)
    [ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];
CREATE TABLE rds_output(
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
type='rds',
url='yourDatabaseURL',
tableName='yourTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);

创建Oracle数据库结果表

  • 实时计算将每行结果数据拼成一行SQL写入目标数据库。
  • 数据库中存在需要的表时,Oracle结果表会向该表写入或更新数据;不存在需要的表时,Oracle结果表会新建一个用于写入结果的表。
  • 逻辑表和物理表的主键不必一致,但是逻辑表的主键必须包含物理表主键。
  • 如果未定义主键,以Append方式插入数据;如果已经定义主键,以Upsert方式插入数据。
CREATE TABLE oracle_sink(
  employee_id BIGINT,
  employee_name VARCHAR,
  employee_age INT,
  PRIMARY KEY(employee_id)
) WITH (
    type = 'oracle',
    url = '',
    userName = '',
    password = '',
    tableName = ''
);

参数描述是否必选示例值

结果表类型是固定值为oracle。

数据库连接串是

登录数据库的用户名是无

登录数据库的密码是无

数据库的表名是无

向结果表插入数据的最大尝试次数否默认值为10。

单次写入数据的批次大小。否默认值为50。

去重的缓存大小。否默认值为500。

写超时时间,单位为毫秒。否默认值为500。

更新表的某行数据时,是否不更新该行指定的列数据。否默认不填写。

是否忽略删除操作。否默认值为false。

注意

  • batchSize和bufferSize在指定主键后参数才生效。参数在达到任一阈值时都会触发数据写入。
  • flushIntervalMs 如果在写超时时间内没有向数据库写入数据,系统会将缓存的数据再写一次。
  • excludeUpdateColumns在更新表的某行数据时,默认不更新主键数据。

Oracle 字段类型实时计算Flink版字段类型CHAR、VARCHAR、VARCHAR2VARCHARFLOATDOUBLENUMBERBIGINTDECIMALDECIMAL

CREATE TABLE oracle_source (
  employee_id BIGINT,
  employee_name VARCHAR,
  employ_age INT
) WITH (
  type = 'random'
);

CREATE TABLE oracle_sink(
  employee_id BIGINT,
  employee_name VARCHAR,
  employ_age INT,
  primary key(employee_id)
)with(
  type = 'oracle',
  url = 'jdbc:oracle:thin:@192.168.171.62:1521:sit0',
  userName = 'blink_test',
  password = 'blink_test',
  tableName = 'oracle_sink'
);

INSERT INTO oracle_sink
SELECT * FROM oracle_source;

创建交互式分析Hologres结果表

由于Hologres是异步写入数据的,因此需要添加 blink.checkpoint.fail_on_checkpoint_error=true作业参数,作业异常时才会触发Failover。

create table Hologres_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT
) with (
  type='hologres',
  dbname='',
  tablename='',
  username='',
  password='',
  endpoint='',
  field_delimiter='|'
);

参数说明是否必填备注

结果表类型是固定值为hologres。

数据库名称。是无

表名称是无

用户名是无

密码是无

Hologres VPC 端点信息是参考

列表。

导出数据时,不同行之间使用的分隔符。否默认值为"\u0002"。

流式写入语义。否默认值为insertorignore。

分区表写入否默认值为false。

是否忽略撤回消息。否默认值为false。

当写入分区表时,是否根据分区值自动创建不存在的分区表。否false(默认值):不会自动创建。true:自动创建。

注意

  • 如果Schema不为Public时,则tableName需要填写为 schema.tableName
  • field_delimiter参数不能在数据中插入分隔符,且需要与 bulkload语义一同使用。
  • createPartTable参数如果分区值中存在短划线(-),暂不支持自动创建分区表。

流处理,也称为流数据或流事件处理,即对一系列无界数据或事件连续处理。

根据Hologres Sink的配置和Hologres表的属性,流式语义分为以下两种:

  • Exactly-once(仅一次):即使在发生各种故障的情况下,系统只处理一次数据或事件。
  • At-least-once(至少一次):如果在系统完全处理之前丢失了数据或事件,则从源头重新传输,因此可以多次处理数据或事件。如果第一次重试成功,则不必进行后续重试。

在Hologres结果表中使用流式语义,注意事项:

  • 如果Hologres物理表未设置主键,则Hologres Sink使用 At-least-once语义。
  • 如果Hologres物理表已设置主键,则Hologres Sink通过主键确保 Exactly-once语义。当同主键数据出现多次时,需要设置 mutateType参数确定更新结果表的方式。

mutateType取值如下:

  • insertorignore(默认值):保留首次出现的数据,忽略后续所有数据。仅第一次数据流触发更新,后续忽略。无法实现局部更新。
  • insertorreplace:使用后续出现的数据整行替换已有数据。根据主键覆盖更新。无法实现局部更新。
  • insertorupdate:使用后续出现的数据选择性替换已有数据。根据主键覆盖更新。可以局部更新。

默认情况下,Hologres Sink只能向一张表导入数据。如果导入数据至分区表的父表,即使导入成功,也会查询数据失败。可以设置参数 partitionRouter为true,开启自动将数据路由到对应分区表的功能。

注意事项:

  • tablename参数需要填写为父表的表名。
  • Blink Connector不会自动创建分区表,需要提前手动创建需要导入数据的分区表,否则会导入失败。

在把多个流的数据写到一张Hologres宽表的场景中,会涉及到宽表Merge和数据的局部更新。示例如下:

假设有两个Flink数据流,一个数据流中包含A、B和C字段,另一个数据流中包含A、D和E字段,Hologres宽表WIDE_TABLE包含A、B、C、D和E字段,其中A字段为主键。具体操作如下:

注意

  • 宽表必须有主键。
  • 每个数据流的数据都必须包含完整的主键字段。
  • 列存模式的宽表Merge场景在高RPS的情况下,CPU使用率会偏高,建议关闭表中字段的Dictionary encoding功能。

HologresBLINKINTINTINT[]ARRAY

创建云原生数据仓库AnalyticDB MySQL版2.0结果表

云原生数据仓库AnalyticDB MySQL版是阿里巴巴自主研发的海量数据实时高并发在线分析(Realtime OLAP)云计算服务,支持在毫秒级单位时间内,对千亿级数据进行实时地多维分析透视和业务探索。

CREATE TABLE stream_test_hotline_agent (
id INTEGER,
len BIGINT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
type='ads',
url='yourDatabaseURL',
tableName='',
userName='',
password='',
batchSize='20'
);

参数说明备注type结果表类型固定值为ads。urlJDBC连接地址云原生数据仓库AnalyticDB MySQL版数据库地址。示例:

。tableName表名无username账号无password密码无maxRetryTimes写入重试次数可选,默认值为10。bufferSize流入多少条数据后开始输出。可选,默认值为5000,表示输入的数据达到5000条就开始输出。batchSize一次批量写入的条数可选,默认值为1000。batchWriteTimeoutMs写入超时时间可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。connectionMaxActive单连接池最大连接数可选,默认值为30。ignoreDelete是否忽略delete操作默认值为false。

注意

  • 如果错误码是20015,则表示batchSize设置过大。云原生数据仓库AnalyticDB MySQL版batchSize不能超过1 MB。例如,batchSize设置为 1000,则平均每条记录大小不能超过1 KB。

云原生数据仓库AnalyticDB MySQL版字段类型实时计算Flink版字段类型BOOLEANBOOLEANTINYINT、SMALLINT、INTINTBIGINTBIGINTDOUBELDOUBLEVARCHARVARCHARDATEDATETIMETIMETIMESTAMPTIMESTAMP

创建数据总线DataHub结果表

create table datahub_output(
  name VARCHAR,
  age BIGINT,
  birthday BIGINT
)with(
  type='datahub',
  endPoint=',
  project='<yourProjectName>',
  topic='<yourTopicName>',
  accessId='<yourAccessId>',
  accessKey='<yourAccessKey>',
  batchSize='<yourBatchSize>',
  batchWriteTimeoutMs='1000'
);

参数参数说明是否必填备注type源表类型是固定值为

。endPointendPoint地址是DataHub的endPoint地址,请参考

。projectDataHub项目名称是无topicDataHub中Topic名称是无accessIdAccessKey ID是无accessKeyAccessKey Secret是无maxRetryTimes读取最大重试次数否Blink 2.2.7以下版本:默认3;Blink 2.2.7及以上版本:默认20batchSize一次批量写入的条数否Blink 3.3以下版本:默认300;Blink 3.3及以上版本:默认100batchWriteTimeoutMs缓存数据的超时时间否可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。maxBlockMessages每次写入的最大Block数否默认值为100。reserveMilliSecondTIMESTAMP类型是否保留毫秒否默认值为false。partitionBy写入结果表前会根据该值进行Hash分类,数据会流向对应的结果表。否默认值为空,随机进行数据分配。

决定数据流到Blink的哪个Subtask。hashFields指定了列名之后,相同列的值会写入到同一个Shard。否默认值为Null,即随机写入。可以指明多个列值,用逗号(,)分隔。例如,

决定数据流写到DataHub的哪个Shard。

DataHub字段类型实时计算字段类型BIGINTBIGINTTIMESTAMPBIGINTSTRINGVARCHARDOUBLEDOUBLEBOOLEANBOOLEANDECIMALDECIMAL

注意
DataHub的TIMESTAMP精确到微秒,在Unix时间戳中为16位,但实时计算定义的TIMESTAMP精确到毫秒,在Unix时间戳中为13位,所以建议您使用BIGINT进行映射。如果您需要使用TIMESTAMP,建议使用计算列进行转换。

create table datahub_input(
  name VARCHAR
) with (
  type='datahub',
  endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
  project='test1',
  topic='topic1',
  accessId='',
  accessKey='',
  startTime='2018-06-01 00:00:00'
);

create table datahub_output(
  name varchar
)with(
  type='datahub',
  endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
  project='test2',
  topic='topic2',
  accessId='',
  accessKey='',
  batchSize='1000',
  batchWriteTimeoutMs='500'
);

INSERT INTO datahub_output
SELECT
  LOWER(name)
from datahub_input;

创建日志服务SLS结果表

日志服务SLS结果表仅支持VARCHAR类型的字段。

日志服务SLS是针对日志类数据的一站式服务。日志服务可以帮助您快捷地完成数据采集、消费、投递以及查询分析,提升运维和运营效率,建立海量日志处理能力。日志服务本身是流数据存储,实时计算Flink版能将其作为流式数据的输入。

create table sls_stream(
 `name` VARCHAR,
 age BIGINT,
 birthday BIGINT
)with(
 type='sls',
 endPoint='http://cn-hangzhou-corp.sls.aliyuncs.com',
 accessId='',
 accessKey='',
 project='',
 logstore=''
);

参数注释说明是否必填备注endPointEndPoint地址是

project项目名是无logstore表名是无accessIdAccessKey ID是无accessKeyAccessKey Secret是无topic属性字段否默认值为空,可以将选定的字段作为属性字段

填充。timestampColumn属性字段否默认值为空,可以将选定的字段作为属性字段

填充(类型必须为INT)。如果未指定,则默认填充当前时间。source属性字段。日志的来源地,例如产生该日志机器的IP地址。否默认值为空,可以将选定的字段作为属性字段

填充。partitionColumn分区列否如果

,则该参数必填。flushIntervalMs触发数据写入的周期否默认值为2000,单位为毫秒。reserveMilliSecondTIMESTAMP类型是否保留毫秒值。否默认值为false,不保留。

日志服务字段类型实时计算Flink版字段类型STRINGVARCHAR

CREATE TABLE random_input (
  a VARCHAR,
  b VARCHAR) with (
    type = 'random'
);

create table sls_output(
 a varchar,
 b varchar
)with(
 type='sls',
 endPoint='http://cn-hangzhou-corp.sls.aliyuncs.com',
 accessId='',
 accessKey='',
 project='ali-cloud-streamtest',
 logStore='stream-test2'
);

INSERT INTO sls_output
SELECT a, b
FROM random_input;

创建消息队列MQ结果表

CREATE TABLE stream_test_hotline_agent (
id INTEGER,
len BIGINT,
content VARCHAR
) WITH (
type='mq',
endpoint='',
accessID='',
accessKey='',
topic='',
producerGroup='',
tag='',
encoding='utf-8',
fieldDelimiter=',',
retryTimes='5',
sleepTimeMs='500'
);
CREATE TABLE source_table (
  commodity VARCHAR
)WITH(
  type='random'
);

CREATE TABLE result_table (
  mess VARBINARY
) WITH (
  type = 'mq',
  endpoint='',
  accessID='',
  accessKey='',
  topic='',
  producerGroup=''
);

INSERT INTO result_table
SELECT
CAST(SUBSTRING(commodity,0,5) AS VARBINARY) AS mess
FROM source_table;

参数说明备注type结果表类型固定值为mq。topicMessage Queue队列名称无endpoint地址阿里云消息队列提供内网服务MQ(非公网region)和公网服务MQ(公网region)两种类型。accessIDAccessKey ID无accessKeyAccessKey Secret无producerGroup写入的群组无tag写入的标签可选,默认值为空。fieldDelimiter字段分割符可选,默认值为

。分隔符的使用情况如下所示:1.只读模式:以

作为分隔符,

在只读模式不可见。2.编辑模式:以

作为分隔符。encoding编码类型可选,默认值为

。retryTimes写入的重试次数可选,默认值为10。sleepTimeMs重试间隔时间可选,默认值为1000(毫秒)。instanceIDMQ实例ID如果MQ实例无独立命名空间,则不可以使用

参数。如果MQ实例有独立命名空间,则

参数必选。

创建表格存储Tablestore结果表

表格存储Tablestore是基于阿里云飞天分布式系统的分布式NoSQL数据存储服务。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问服务。其本质上就类似于市面上开源的HBase。

CREATE TABLE stream_test_hotline_agent (
 name VARCHAR,
 age BIGINT,
 birthday BIGINT,
 PRIMARY KEY (name,age)
) WITH (
 type='ots',
 instanceName='',
 tableName='',
 accessId='',
 accessKey='',
 endPoint='',
 valueColumns='birthday'
);

注意

  • valueColumns值不能是声明的主键,可以是主键之外的任意字段。
  • Tablestore结果表声明中,除主键列外,至少包含一个属性列。

参数说明备注type结果表类型固定值为ots。instanceName实例名无tableName表名无endPoint实例访问地址参见

。accessIdAccessKey ID无accessKeyAccessKey Secret无valueColumns指定插入的字段列名插入多个字段以英文逗号(,)分割。例如

。bufferSize流入多少条数据后开始输出可选,默认值为5000,表示输入的数据达到5000条就开始输出。batchWriteTimeoutMs写入超时的时间可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。batchSize一次批量写入的条数可选,默认值为100。retryIntervalMs重试间隔时间可选,单位毫秒,默认值为1000。maxRetryTimes最大重试次数可选,默认值为100。ignoreDelete是否忽略DELETE操作默认值为False。

注意

  • bufferSize根据Tablestore主键对结果数据进行去重后,再在bufferSize的基础上进行batchSize。
  • Tablestore结果表必须定义有 Primary Key,以Update方式写入结果数据到Tablestore表。

创建云数据库RDS MySQL版结果表

CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   url='',
   tableName='',
   userName='',
   password=''
);

注意

  • 实时计算写入RDS MySQL数据库结果表原理:针对实时计算Flink版每行结果数据,拼接成一行SQL语句,输入至目标端数据库,然后执行。如果使用批量写,需要在URL后面加上参数 ?rewriteBatchedStatements=true,以提高系统性能。
  • 如果实时计算写入数据支持自增主键,则在DDL中不声明该自增字段即可。例如,ID是自增字段,实时计算Flink版DDL不声明该自增字段,则数据库在一行数据写入过程中会自动填补相关自增字段。
  • 如果DRDS有分区表,拆分键必须在实时计算Flink版DDL里PRIMARY KEY()中声明,否则拆分的表无法写入。
  • DDL声明的字段必须至少存在一个非主键的字段,否则产生报错。

参数说明是否必填备注type结果表类型是固定值为

。urlJDBC(Java DataBase Connectivity)连接地址是URL的格式为:

,其中databaseName为对应的数据库名称。tableName表名是无userName用户名是无password密码是无maxRetryTimes最大重试次数否默认值为10。batchSize一次批量写入的条数否默认值为4096。bufferSize流入多少条数据后开始去重否默认值为10000。flushIntervalMs清空缓存的时间间隔否默认值为2000,单位为毫秒。表示如果缓存中的数据在等待2秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。excludeUpdateColumns忽略指定字段的更新否默认值为空(默认忽略PRIMARY KEY字段)。表示更新主键值相同的数据时,忽略指定字段的更新。ignoreDelete是否忽略delete操作否默认值为false,表示支持delete功能。partitionBy分区否默认为空。表示写入Sink节点前,会根据该值进行Hash分区,数据会流向相应的Hash节点。

RDS字段类型实时计算Flink版字段类型BOOLEANBOOLEANTINYINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTFLOATFLOATDECIMALDECIMALDOUBLEDOUBLEDATEDATETIMETIMETIMESTAMPTIMESTAMPVARCHARVARCHARVARBINARYVARBINARY

参数名称说明默认值最低版本要求useUnicode是否使用Unicode字符集,如果参数characterEncoding设置为GB2312或GBK,本参数值必须设置为true。false1.1gcharacterEncoding当useUnicode设置为true时,指定字符编码。例如可设置为GB2312或GBK。false1.1gautoReconnect当数据库连接异常中断时,是否自动重新连接。false1.1autoReconnectForPools是否使用针对数据库连接池的重连策略。false3.1.3failOverReadOnly自动重连成功后,连接是否设置为只读。true3.0.12maxReconnectsautoReconnect设置为true时,重试连接的次数。31.1initialTimeoutautoReconnect设置为true时,两次重连之间的时间间隔,单位为秒。21.1connectTimeout和数据库服务器建立socket连接时的连接超时时长,单位为毫秒。0表示永不超时,适用于JDK 1.4及以上版本。03.0.1socketTimeoutsocket操作(读写)超时,单位:毫秒。0表示永不超时。03.0.1

CREATE TABLE source (
   id INT,
   len INT,
   content VARCHAR
) with (
   type = 'random'
);

CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   url='',
   tableName='',
   userName='',
   password=''
);

INSERT INTO rds_output
SELECT id, len, content FROM source;

创建MaxCompute结果表

MaxCompute中的Clustered Table表不支持作为MaxCompute结果表。

MaxCompute Sink可以分为以下两个阶段:

Commit方法不能提供 原子性。因此,MaxCompute Sink提供的是 At least Once方式,而不是Exactly Once方式。

DDL中定义的字段需要与MaxCompute物理表中的字段名称、顺序以及类型保持一致,否则可能导致MaxCompute物理表中查询的数据为 /n

create table odps_output(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) with (
  type = 'odps',
  endPoint = '',
  project = '',
  tableName = '',
  accessId = '',
  accessKey = '',
  `partition` = 'ds=2018****'
);

参数说明是否必填备注type结果表类型。是固定值为

。endPointMaxCompute服务地址。是请参见

。tunnelEndpointMaxCompute Tunnel服务的连接地址。是请参见

。projectMaxCompute项目名称。是无。tableName表名。是无。accessIdAccessKey ID。是无。accessKeyAccessKey Secret。是无。partition分区名。否参见注意事项。flushIntervalMsOdps tunnel writer缓冲区Flush间隔,单位毫秒。MaxCompute Sink写入记录时,先放入数据到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs)时,再把缓冲区里的数据写到目标 MaxCompute表。否默认值是30000毫秒,即30秒。partitionBy写入Sink节点前,系统会根据该值做Hash Shuffle,数据就会流向对应的Sink节点。否系统按照多个列进行Hash Shuffle,各个列名之间使用逗号(,)分割。默认为空。isOverwrite写入Sink节点前,是否将结果表清空。否默认参数值为

。在声明MaxCompute的流式作业结果表时,

参数必须为

,否则在编译时会抛出异常。dynamicPartitionLimit分区数目最大值。否默认值是100,内存中会把出现过的分区和Tunnel/writer的映射关系维护到一个Map里,如果这个Map的大小超过了

设定值,则会出现

报错。

注意
如果存在分区表,则必填 partition参数:

  • 固定分区:例如`partition= 'ds=20180905'表示将数据写入分区ds= 20180905`。
  • 动态分区:如果不明文显示分区的值,则会根据写入数据中的分区列具体的值,写入到不同的分区中。例如`partition='ds'表示根据ds`字段的值写入分区。 如果要创建多级动态分区,With参数中Partition的字段顺序和结果表的DDL中的分区字段顺序,必须与物理表一致,各个分区字段之间使用逗号(,)分割。 动态分区列需要显式写在建表语句中。*
    Stream模式的MaxCompute结果表具备
    At Least Once**数据保障机制,在作业运行失败后,可能会出现数据重复。

MaxCompute字段类型实时计算Flink版字段类型TINYINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTFLOATFLOATDOUBLEDOUBLEBOOLEANBOOLEANDATETIMETIMESTAMPTIMESTAMPTIMESTAMPVARCHARVARCHARSTRINGVARCHARDECIMALDECIMAL

  • 写入固定分区
CREATE TABLE source (
   id INT,
   len INT,
   content VARCHAR
) with (
   type = 'random'
);

create table odps_sink (
   id INT,
   len INT,
   content VARCHAR
) with (
   type = 'odps',
   endPoint = '',
   project = '',
   tableName = '',
   accessId = '',
   accessKey = '',
   `partition` = 'ds=20180418'
);

INSERT INTO odps_sink
SELECT
   id, len, content
FROM source;
  • 写入动态分区
CREATE TABLE source (
  id INT,
  len INT,
  content VARCHAR,
  c TIMESTAMP
) with (
  type = 'random'
);

create table odps_sink (
  id INT,
  len INT,
  content VARCHAR,
  ds VARCHAR
) with (
  type = 'odps',
  endPoint = '',
  project = '',
  tableName = '',
  accessId = '',
  accessKey = '',
  `partition`='ds'
);

INSERT INTO odps_sink
SELECT
   id,
   len,
   content,
   DATE_FORMAT(c, 'yyMMdd') as ds
FROM source;

创建云数据库HBase版结果表

实时计算HBase结果表不支持自建的开源HBase。

HBase标准版示例代码如下。

create table liuxd_user_behavior_test_front (
    row_key varchar,
    from_topic varchar,
    origin_data varchar,
    record_create_time varchar,
    primary key (row_key)
) with (
    type = 'cloudhbase',
    zkQuorum = '2',
    columnFamily = '',
    tableName = '',
    batchSize = '500'
);

HBase增强版示例代码如下。

create table liuxd_user_behavior_test_front (
    row_key varchar,
    from_topic varchar,
    origin_data varchar,
    record_create_time varchar,
    primary key (row_key)
) with (
    type = 'cloudhbase',
    endPoint = '',
    userName  = 'root',
    password = 'root',
    columnFamily = '',
    tableName = '',
    batchSize = '500'
);

Blink 3.5.0及以上HBase增强版示例代码如下。

create table liuxd_user_behavior_test_front (
    row_key varchar,
    from_topic varchar,
    origin_data varchar,
    record_create_time varchar,
    primary key (row_key)
) with (
    type = 'cloudhbase',
    zkQuorum = '',
    userName  = 'root',
    password = 'root',
    columnFamily = '',
    tableName = '',
    batchSize = '500'
);

Blink 3.5.0及以上HBase写入主备切换示例代码如下。

create table liuxd_user_behavior_test_front (
    row_key varchar,
    from_topic varchar,
    origin_data varchar,
    record_create_time varchar,
    primary key (row_key)
) with (
    type = 'cloudhbase',
    zkQuorum = '',
    haClusterID = 'ha-xxx',
    userName  = 'root',
    password = 'root',
    columnFamily = '',
    tableName = '',
    batchSize = '500'
);

注意

  • PRIMARY KEY支持定义多字段。多字段以 rowkeyDelimiter(默认为 :)作为分隔符进行连接。
  • HBase执行撤回删除操作时,如果COLUMN定义了多版本,将清空所有版本的COLUMN值。
  • HBase标准版和HBase增强版DDL的区别为连接参数不同:
  • HBase标准版使用连接参数 zkQuorum
  • HBase增强版使用连接参数 endPoint

参数说明是否必填备注type结果表类型是固定值为cloudhbase。zkQuorumHBase集群配置的zk地址是可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。仅在HBase标准版中生效。zkNodeParent集群配置在zk上的路径否可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。仅在HBase标准版中生效。endPointHBase地域名称是可在购买的HBase实例控制台中获取。仅在HBase增强版中生效。userName用户名否仅在HBase增强版中生效。password密码否仅在HBase增强版中生效。tableNameHBase表名是无columnFamily列族名是仅支持插入同一列族。maxRetryTimes最大尝试次数否默认值为10。bufferSize流入多少条数据后进行去重否默认值为5000。batchSize一次批量写入的条数否默认值为100。建议batchSize参数值为200~300。过大的batchSize值可能导致任务OOM(内存不足)报错。flushIntervalMs周期性清理buffer的间隔,可以减少写入HBase的延迟。否默认值为2000,单位为毫秒。writePkValue是否写入主键值否默认值为false。stringWriteMod是否都按照STRING插入否默认值为false。rowkeyDelimiter

的分隔符否默认值为冒号(:)。isDynamicTable是否为动态表否默认值为false。haClustserIDHBase高可用实例ID否只有访问同城主备实例时才需要配置。

实时计算部分结果数据需要按某列的值,作为动态列输入HBase。HBase中,以每小时的成交额作为动态列的数据,示例如下。

rowkeycf:0cf:1cf:220170707100cf:1300

isDynamicTable参数值为true时,表明该表为支持动态列的HBase表。

动态表仅支持3列输出,例如,ROW_KEY、COLUMN和VALUE。此时第2列(本示例中的COLUMN)为动态列,动态表中的其它参数与HBase的WITH参数一致。

使用动态表时,所有数据类型需要先转换为STRING类型,再进行输入。

CREATE TABLE stream_test_hotline_agent (
  name varchar,
  age varchar,
  birthday varchar,
  primary key (name)
) WITH (
  type = 'cloudhbase',
  ...

  columnFamily = 'cf',
  isDynamicTable ='true'
);

以上声明将把 birthday插入到以 name为ROW_KEY的 cf:age列中。例如, &#xFF08;wang,18,2016-12-12)会插入ROW_KEY为 wang的行, cf:18列。

DDL中必选按照从上到下的顺序,声明ROW_KEY(本示例中的 name)、COLUMN(本示例中的 age)和VALUE(本示例中的 birthday),且声明ROW_KEY为PRIMARY KEY。

create table source (
  id   TINYINT,
  name BIGINT
) with (
  type = 'random'
);

create table sink (
  id    TINYINT,
  name  BIGINT,
  primary key (id)
) with (
  type = 'cloudhbase',
  zkQuorum = '',
  columnFamily = '',
  tableName = ''
);

INSERT INTO sink
SELECT id, name FROM source;

创建Elasticsearch结果表

 CREATE TABLE es_stream_sink(
  field1 LONG,
  field2 VARBINARY,
  field3 VARCHAR,
  PRIMARY KEY(field1)
)WITH(
  type ='elasticsearch',
  endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
  accessId = '',
  accessKey = '',
  index = '',
  typeName = ''
);
  • ES支持根据PRIMARY KEY进行UPDATE,且PRIMARY KEY只能为1个字段。
  • 在指定PRIMARY KEY后,Document的ID为PRIMARY KEY字段的值。
  • 在未指定PRIMARY KEY时,Document的ID为随机。
  • 在full更新模式下,新增的doc会完全覆盖已存在的doc。
  • 在inc更新模式下,系统会依据输入的字段值更新对应的字段。
  • 所有的更新默认为UPSERT语义,即INSERT或UPDATE。

参数说明是否必选默认值typeconnector类型。是elasticsearchendPointServer地址,例入:http://127.0.0.1:9211。是无accessId创建ES时的登录名。如果通过Kibana插件操作ES,请填写Kibana登录ID。是无accessKey创建ES时的登录密码。如果通过Kibana插件操作ES,请填写Kibana登录密码。是无index索引名称。是无typeName文档类型。是

bufferSize流入多少条数据后开始去重。否1000maxRetryTimes异常重试次数。否30timeout读超时,单位为毫秒。否600000discovery是否开启节点发现。如果开启,客户端每5分钟刷新一次Server List。否falsecompression是否使用GZIP压缩Request Bodies。否truemultiThread是否开启JestClient多线程。否trueignoreWriteError是否忽略写入异常。否falsesettings创建Index的Settings配置。否无updateMode指定主键(PRIMARY KEY)后的更新模式:full:全量覆盖。inc:增量更新。否full

参数说明是否必选备注dynamicIndex是否开启动态索引。否参数取值如下:true:开启。false(默认值):不开启。indexField抽取索引的字段名。

为true时必填。只支持TIMESTAMP(以秒为单位)、DATE和LONG3种数据类型。indexInterval切换索引的周期。

为true时必填。参数值如下:d(默认值):天。m:月。w:周mapping启用动态索引时,设置文档各字段的类型与格式。例如,设置名为sendTime字段的格式:

否默认值为空。

  • 当开启动态索引后,基本配置中的 index名称会作为后续创建索引的统一Alias,Alias和索引为一对多关系。
  • 不同的 indexInterval对应的真实索引名称:
  • d -> Alias "yyyyMMdd"
  • m -> Alias "yyyyMM"
  • w -> Alias "yyyyMMW"
  • 对于单个的真实索引可使用Index API进行修改,但对于Alias只支持 get功能。
CREATE TABLE es_stream_sink(
  field1 LONG,
  field2 VARBINARY,
  field3 TIMESTAMP,
  PRIMARY KEY(field1)
)WITH(
  type ='elasticsearch',
  endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
  accessId = '',
  accessKey = '',
  index = '',
  typeName = '',
  dynamicIndex = 'true',
  indexField = 'field3',
  indexInterval = 'd'
);

创建时序数据库结果表

实时计算引用时序数据库(TSDB)结果表需要配置数据存储白名单。

阿里云时序数据库(Time Series Database,简称TSDB)是一种集时序数据高效读写、压缩存储、实时计算能力为一体的数据库服务,可以广泛应用于物联网和互联网领域,实现对设备及业务服务的实时监控,实时预测告警。

CREATE TABLE stream_test_hitsdb (
    metric      VARCHAR,
    `timestamp` INTEGER,
    `value`     DOUBLE,
    tagk1       VARCHAR,
    tagk2       VARCHAR,
    tagk3       VARCHAR
) WITH (
    type='hitsdb',
    host='',
    virtualDomainSwitch = 'false',
    httpConnectionPool = '20',
    batchPutSize = '1000'
);

建表默认格式:

  • 第0列:metric(VARCHAR)。
  • 第1列:timestamp(INTEGER),单位为秒。
  • 第2列:value(DOUBLE)。
  • 第3~N列:TagKey,即时间序列数据库中的fieldName。
  • tag可以为多列。
  • 必须声明 metrictimestampvalue,且字段名称、字段顺序和字段数据类型必须和TSDB保持完全一致。

参数说明备注type结果表类型固定值为

。hostIP或VIP域名填写注册实例的Host。port端口默认值为8242。virtualDomainSwitch是否使用VIPServer默认值为false,如果需要使用VIPServer,则virtualDomainSwitch设置为true。httpConnectionPoolHTTP连接池默认值为10。httpCompress使用GZIP压缩默认值为false,即不压缩。httpConnectTimeoutHTTP连接超时时间默认值为0。ioThreadCountIO线程数默认值为1。batchPutBufferSize缓冲区大小默认值为10000。batchPutRetryCount写入重试次数默认值为3。batchPutSize每次提交数据量默认每次提交500个数据点。batchPutTimeLimit缓冲区等待时间默认值为200,单位为毫秒。batchPutConsumerThreadCount序列化线程数默认值为1。

Blink 3.2 及以上版本支持6种Blink写入TSDB的模型,分别是:

  • 支持单值无tag数据点的写入,其schema格式如下所示,包括3个字段,这3个字段名称不可使用其他名称代替。
metric,timestamp,value
  • 支持单值带tag数据点的写入,其schema格式如下所示,除metric、timestamp和value关键词名字必须保持一致外,tag的名称可以任意指定。
metric,timestamp,value,tagKey1,....,tagKeyN
  • 支持单值带不确定tag个数的数据点的写入,其schema格式如下所示,包括4个字段,这4个字段名称不可使用其他名称代替。
metric,timestamp,value,tags

其中,tags的内容为形如如下格式的JSON字符串,便于绕过Blink table schema需要固定tag个数的限制。

{"tagKey1":"tagValue1","tagKey2":"tagValue2",......,"tagKeyN":"tagValueN"}
  • 支持多值无tag数据点的写入,其schema格式如下所示。
metric,timestamp,field_name1,field_name2,......,field_nameN

其中metric和timestamp字段名称不可使用其他名称代替。对于多值的fields,由于需要区分tag和field,同时兼容之前的单值写入,这里约定需要给每个field加上固定前缀field,自动识别field字段。例如,对于一个field名称 field_name1,在写入TSDB时,会自动将前缀 field_去掉,只保留name1,即对于上面的schema,实际写入TSDB的格式如下,name1和name2是多值field的名称。

metric,timestamp,name1,name2,......,nameN
  • 支持多值带tag数据点的写入,其schema格式如下所示,除metric和timestamp关键词名字必须保持一致外,tag的名称可以任意指定。
metric,timestamp,tagKey1,....,tagKeyN,field_name1,field_name2,......,field_nameN
  • 支持单值带不确定tag个数的数据点的写入,其schema格式如下所示。
metric,timestamp,tags,field_name1,field_name2,......,field_nameN

其中,tags的内容类似如下JSON字符串,便于绕过Blink table schema需要固定tag个数的限制。

{"tagKey1":"tagValue1","tagKey2":"tagValue2",......,"tagKeyN":"tagValueN"}

创建消息队列Kafka结果表

Kafka结果表支持写入自建Kafka集群。

create table sink_kafka (
  messageKey VARBINARY,
  `message` VARBINARY,
  PRIMARY KEY (messageKey)
) with (
  type = 'kafka010',
  topic = '',
  bootstrap.servers = ''
);
  • 创建Kafka结果表时,必须明文指定 PRIMARY KEY (messageKey)

  • 通用配置参数说明是否必选备注typeKafka对应版本是必须是Kafka08、Kafka09、Kafka010、Kafka011中的一种。topicTopic名称是无

  • 必选配置
  • Kafka08必选配置参数说明zookeeper.connectzk连接ID
  • Kafka09/Kafka010/Kafka011必选配置参数说明bootstrap.serversKafka集群地址
  • 可选配置参数
  • consumer.id
  • socket.timeout.ms
  • fetch.message.max.bytes
  • num.consumer.fetchers
  • auto.commit.enable
  • auto.commit.interval.ms
  • queued.max.message.chunks
  • rebalance.max.retries
  • fetch.min.bytes
  • fetch.wait.max.ms
  • rebalance.backoff.ms
  • refresh.leader.backoff.ms
  • auto.offset.reset
  • consumer.timeout.ms
  • exclude.internal.topics
  • partition.assignment.strategy
  • client.id
  • zookeeper.session.timeout.ms
  • zookeeper.connection.timeout.ms
  • zookeeper.sync.time.ms
  • offsets.storage
  • offsets.channel.backoff.ms
  • offsets.channel.socket.timeout.ms
  • offsets.commit.max.retries
  • dual.commit.enabled
  • partition.assignment.strategy
  • socket.receive.buffer.bytes
  • fetch.min.bytes

Kafka官方文档:

typeKafka版本Kafka080.8.22Kafka090.9.0.1Kafka0100.10.2.1Kafka0110.11.0.2及以上

create table datahub_input (
  id VARCHAR,
  nm VARCHAR
) with (
  type = 'datahub'
);

create table sink_kafka (
  messageKey VARBINARY,
  `message` VARBINARY,
  PRIMARY KEY (messageKey)
) with (
  type = 'kafka010',
  topic = '',
  bootstrap.servers = ''
);

INSERT INTO
  sink_kafka
SELECT
  cast(id as VARBINARY) as messageKey,
  cast(nm as VARBINARY) as `message`
FROM
  datahub_input;

创建云数据库HybridDB for MySQL结果表

云数据库HybridDB for MySQL(原名PetaData)是同时支持海量数据在线事务(OLTP)和在线分析(OLAP)的HTAP(Hybrid Transaction/Analytical Processing)关系型数据库。HybridDB for MySQL采用一份数据存储来进行OLTP和OLAP处理,避免把一份数据复制多次进行数据分析,极大地降低了数据存储的成本。

create table petadata_output(
 id INT,
 len INT,
 content VARCHAR,
 primary key(id,len)
) with (
 type='petaData',
 url='yourDatabaseURL',
 tableName='yourTableName',
 userName='yourDatabaseUserName',
 password='yourDatabasePassword'
);
  • 实时计算写入PetaData数据库结果表原理:针对实时计算Flink版每行结果数据,拼接成一行SQL语句,输入至目标端数据库。
  • bufferSize默认值是1000,如果到达bufferSize阈值,则会触发写出。因此配置batchSize的同时还需要配置bufferSize。bufferSize和batchSize大小相同即可。
  • batchSize数值不建议设置过大,建议设置 batchSize='4096'

参数说明是否必填备注type结果表类型是固定值为petaData。url地址是

。tableName表名是无userName用户名是无password密码是无maxRetryTimes最大重试次数否默认值为3。batchSize一次批量写入的条数否默认值为1000,表示每次写多少条。bufferSize流入多少条数据后开始去重否无flushIntervalMs清空缓存的时间间隔否单位为毫秒。默认值为3000,表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。ignoreDelete是否忽略Delete操作否默认值为false。

创建云数据库RDS SQL Server版结果表

create table ss_output(
  id INT,
  len INT,
  content VARCHAR,
  primary key(id,len)
) with (
  type='jdbc',
  url='jdbc:sqlserver://ip:port;database=****',
  tableName='',
  userName='',
  password=''
);
  • 实时计算Flink版写入RDS和DRDS数据库结果表原理:针对实时计算Flink版每行结果数据,拼接成一行SQL语句,输入至目标端数据库。如果使用批量写,需要在URL后面加上参数 ?rewriteBatchedStatements=true,以提高系统性能。
  • RDS SQL Server数据库支持自增主键。如果需要让实时计算Flink版写入数据支持自增主键,则在DDL中不声明该自增字段。
  • 如果DRDS有分区表,拆分键必须在实时计算Flink版DDL里primary key()中声明,否则拆分的表无法写入。
  • DDL声明的字段必选至少存在一个非主键的字段,否则产生报错。

参数说明是否必填备注type结果表类型是固定值为jdbc。urlJDBC(Java DataBase Connectivity)连接地址是无tableName表名是无username账号是无password密码是无maxRetryTimes写入重试次数否默认值为10。bufferSize流入多少条数据后开始去重否默认值为10000,表示输入的数据达到10000条开始去重。flushIntervalMs清空缓存的时间间隔否单位为毫秒,默认值为2000,表示如果缓存中的数据在等待2秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。excludeUpdateColumns忽略指定字段的更新否可选,默认值为空(默认忽略Primary Key字段),表示更新主键值相同的数据时,忽略指定字段的更新。ignoreDelete是否忽略Delete操作否默认值为False。

RDS字段类型实时计算Flink版字段类型BOOLEANBOOLEANTINYINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTFLOATFLOATDECIMALDECIMALDOUBLEDOUBLEDATEDATETIMETIMETIMESTAMPTIMESTAMPVARCHARVARCHARVARBINARYVARBINARY

参数说明默认值最低版本要求useUnicode是否使用Unicode字符集。如果参数CharacterEncoding设置为GB2312或GBK,useUnicode值必须设置为True。False1.1gcharacterEncoding当useUnicode设置为True时,指定字符编码。例如可设置为GB2312或GBK。False1.1gautoReconnect当数据库连接异常中断时,是否自动重新连接。False1.1autoReconnectForPools是否使用针对数据库连接池的重连策略。False3.1.3failOverReadOnly自动重连成功后,连接是否设置为只读。True3.0.12maxReconnectsautoReconnect设置为True时,重试连接的次数。31.1initialTimeoutautoReconnect设置为True时,两次重连之间的时间间隔,单位为秒。21.1connectTimeout和数据库服务器建立socket连接时的超时,单位为毫秒。0,表示永不超时,适用于JDK 1.4及以上版本。3.0.1socketTimeoutsocket操作(读写)超时,单位为毫秒。0,表示永不超时。3.0.1

CREATE TABLE source (
  id INT,
  len INT,
  content VARCHAR
) with (
  type = 'random'
);

CREATE TABLE rds_output(
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id,len)
) WITH (
  type='jdbc',
  url='',
  tableName='',
  userName='',
  password=''
);

INSERT INTO rds_output
SELECT id, len, content FROM source;

创建云数据库Redis版结果表

阿里云数据库Redis版是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求。

云数据库Redis版结果表支持5种Redis数据结构,其DDL定义如下:

DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为 set key value

create table resik_output (
  a varchar,
  b varchar,
  primary key(a)
) with (
  type = 'redis',
  mode = 'string',
  host = '${redisHost}',
  port = '${redisPort}',
  dbNum = '${dbNum}',
  ignoreDelete = 'true'
);

DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为 lpush key value

create table resik_output (
  a varchar,
  b varchar,
  primary key(a)
) with (
  type = 'redis',
  mode = 'list',
  host = '${redisHost}',
  port = '${redisPort}',
  dbNum = '${dbNum}',
  ignoreDelete = 'true'
);

DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为 sadd key value

create table resik_output (
  a varchar,
  b varchar,
  primary key(a)
) with (
  type = 'redis',
  mode = 'set',
  host = '${redisHost}',
  port = '${redisPort}',
  dbNum = '${dbNum}',
  ignoreDelete = 'true'
);

DDL为三列:第1列为key,第2列为hash_key,第3列为hash_key对应的hash_value。Redis插入数据的命令为 hmset key hash_key hash_value

create table resik_output (
  a varchar,
  b varchar,
  c varchar,
  primary key(a)
) with (
  type = 'redis',
  mode = 'hashmap',
  host = '${redisHost}',
  port = '${redisPort}',
  dbNum = '${dbNum}',
  ignoreDelete = 'true'
);

DDL为三列:第1列为key,第2列为score,第3列为value。Redis插入数据的命令为 add key score value

create table resik_output (
  a varchar,
  b double,
  c varchar,
  primary key(a)
) with (
  type = 'redis',
  mode = 'sortedset',
  host = '${redisHost}',
  port = '${redisPort}',
  dbNum = '${dbNum}',
  ignoreDelete = 'true'
);

参数参数说明是否必填取值type结果表类型是固定值为

。portRedis Server对应端口否默认值为6379。dbNumRedis Server对应数据库序号否默认值为0。ignoreDelete是否忽略Retraction消息否默认值为false,可取值为true或false。如果设置为false,收到Retraction时,同时删除数据对应的key及已插入的数据。passwordRedis Server对应的密码否默认值为空,不进行权限验证。clusterModeRedis是否为集群模式否取值如下:true:Redis为集群模式。false(默认值):Redis为单机模式。

Redis字段类型实时计算Flink版字段类型STRINGVARCHARSCOREDOUBLE

因为Redis的SCORE类型应用于SORTEDSET(有序集合),所以需要手动为每个Value设置一个DOUBLE类型的SCORE,Value才能按照该SCORE从小到大进行排序。

CREATE TABLE random_stream (
  v VARCHAR,
  p VARCHAR
) with (
    type = 'random'
);

create table resik_output (
  a VARCHAR,
  b VARCHAR,
  primary key(a)
) with (
  type = 'redis',
  mode = 'string',
  host = '',
  password = ''
);

INSERT INTO resik_output
SELECT v, p
FROM random_stream;

创建云数据库MongoDB版结果表

MongoDB结果表不支持主键更新,数据输入形式为重复插入。

CREATE TABLE mongodb_sink (
  `a`  VARCHAR
) WITH (
   type = 'mongodb',
   database = '',
   collection= '',
   uri='mongodb://{}:{}@{host}:****?replicaSet=mgset-1224****',
   keepAlive='true',
   maxConnectionIdleTime='20000',
   batchSize='2000'
);

参数说明是否必填备注typeConnector类型是固定值为mongodb。database数据库名称是无collection数据集合是无uriMongoDB连接串是无keepAlive是否保持长连接否默认值为true。maxConnectionIdleTime连接超时时长否整型值,不能为负数,单位为毫秒。默认值为60000。0表示无连接超时限制。batchSize每次批量写入的条数否整型值,默认值为1024。系统会设定一个大小为batchSize的缓冲条数,当数据的条数达到batchSize时,触发数据的输出。

当Checkpoint时间达到时,即使数据未到达batchSize值,也将触发数据的输出。

创建云原生数据仓库AnalyticDB MySQL版3.0结果表

云原生数据仓库AnalyticDB MySQL版3.0结果表暂不支持注册存储功能。

云原生数据仓库AnalyticDB MySQL版3.0数据库支持自增主键。如果实时计算Flink版写入数据支持自增主键,则在DDL中不声明该自增字段。

CREATE TABLE rds_output (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id,len)
) WITH (
type='ADB30',
url='jdbc:mysql://:/',
tableName='',
userName='',
password=''
);

实时计算Flink版写入云原生数据仓库AnalyticDB MySQL版3.0结果表分为以下两个阶段:

参数注释说明是否必选备注typeconnector类型是固定值为

。urljdbc连接地址是云原生数据仓库AnalyticDB MySQL版数据库地址。示例:

。tableName表名是无username账号是无password密码是无maxRetryTimes写入重试次数否默认值为3。bufferSize流入多少条数据后开始去重否默认值为1000,表示输入的数据达到1000条则开始输出。batchSize一次批量写入的条数否默认值为1000。flushIntervalMs清空缓存的时间间隔否单位为毫秒,默认值为3000。表示如果缓存中的数据在等待3秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。ignoreDelete是否忽略DELETE操作否默认值为false,表示支持DELETE功能。replaceMode是否采用replace into语法插入数据否取值:true(默认):采用。 false:采用

。reserveMilliSecondTimeStamp类型是否保留毫秒否默认值为false,不保留毫秒数值。

注意bufferSizebatchSize参数需要指定主键后才能生效。

创建自定义结果表

自定义结果表Class需要继承自定义Sink插件的基类CustomSinkBase,并使用如下方法实现。

protected Map<String,String> userParamsMap;
protected Set<String> primaryKeys;
protected List<String> headerFields;
protected RowTypeInfo rowTypeInfo;

public abstract void open(int taskNumber,int numTasks) throws IOException;

public abstract void close() throws IOException;

public abstract void writeAddRecord(Row row) throws IOException;

public abstract void writeDeleteRecord(Row row) throws IOException;

public abstract void sync() throws IOException;

public String getName();

进入 blink_customersink_3x目录,执行 mvn clean package命令,再在实时计算Flink版开发控制台上传刚编译成功后的JAR包 blink_customersink_3x/target/blink-customersink-3.x-1.0-SNAPSHOT-jar-with-dependencies.jar,引用资源之后,对于自定义的Sink插件,需要指明 type = 'custom',并且指明实现接口的Class。

create table in_table(
    kv varchar
)with(
    type = 'random'
);

create table out_table(
    `key` varchar,
    `value` varchar
)with(
    type = 'custom',
    class = 'com.alibaba.blink.customersink.RedisSink',

    host = 'r-uf****.redis.rds.aliyuncs.com',
    port = '6379',
    db = '0',
    batchsize = '10',
    password = ''
);

insert into out_table
select
substring(kv,0,4) as `key`,
substring(kv,0,6) as `value`
from in_table;

Redis Sink插件的参数说明如下。

参数说明是否必填备注hostRedis实例内网连接地址(host)是无portRedis实例端口号是无passwordRedis连接密码是无dbRedis Database编号否默认值为0,表示db0。batchsize每次批量写入的条数否默认值为1,表示不批量写入。

创建InfluxDB结果表

create table stream_test_influxdb(
    `metric` varchar,
    `timestamp` BIGINT,
    `tag_value1` varchar,
    `field_fieldValue1` Double
)with(
    type = 'influxdb',
    endpoint = 'http://service.cn.influxdb.aliyuncs.com:****',
    database = '',
    batchPutsize = '1',
    username = '',
    password = ''
);

建表默认格式:

  • 第0列:metric(VARCHAR),必填。
  • 第1列:timestamp(BIGINT),必填,单位为毫秒。
  • 第2列:tag_value1(VARCHAR),必填,最少填写一个。
  • 第3列:field_fieldValue1(DOUBLE),必填,最少填写一个。 写入多个field_fieldValue时,需要按照如下格式填写。
field_fieldValue1 类型,
field_fieldValue2 类型,
...

field_fieldValueN 类型

示例如下。

field_fieldValue1 Double,
field_fieldValue2 INTEGER,
...

field_fieldValueN INTEGER

结果表中只支持 metrictimestamptag_和 field_,不能出现其他的字段。

参数说明是否必填备注type结果表类型是固定值为InfluxDB。endpointInfluxDB的Endpoint是在InfluxDB中,Endpoint是VPC网络地址,例如:https://localhost:3242或http://localhost:8086。Endpoint支持HTTP和HTTPS。databaseInfluxDB的数据库名是例如:db-blink或者blink。batchPutSize批量提交的记录条数否默认每次提交500个数据点。usernameInfluxDB的用户名是需要对写入的数据库有写权限。passwordInfluxDB的密码是默认值为0。retentionPolicy保留策略否为空时,默认填写为每个database的默认保留策略。

InfluxDB字段类型实时计算Flink版字段类型BOOLEANBOOLEANINTINTBIGINTBIGINTFLOATFLOATDECIMALDECIMALDOUBLEDOUBLEDATEDATETIMETIMETIMESTAMPTIMESTAMPVARCHARVARCHAR

创建Phoenix5结果表

Phoenix5是云数据库HBase实例中的一种HBase SQL服务,须在云数据库HBase实例中开启HBase SQL服务后才可以使用Phoenix5。

create table US_POPULATION_SINK (
  `STATE` varchar,
  CITY varchar,
  POPULATION BIGINT,
  PRIMARY KEY (`STATE`, CITY)
) WITH (
  type = 'PHOENIX5',
  serverUrl = '',
  tableName = ''
);

参数说明是否必填备注type结果表类型是固定值为

。serverUrlPhoenix5的Query Server地址:如果集群中创建的,则是负载均衡服务的URL地址。如果是在单机中创建的,则是单机的URL地址。是serverUrl格式为

,其中:host:Phoenix5服务的域名。port:Phoenix5服务的端口号,固定值为8765。tableName读取Phoenix5表名。是Phoenix5表名格式为SchemaName.TableName,其中:SchemaName:模式名,可以为空,即不写模式名,仅写表名,表示使用数据库的默认模式。TableName:表名。

create table `source` (
  `id` varchar,
  `name` varchar,
  `age` varchar,
  `birthday` varchar
) WITH (
   type = 'random'
);

create table sink (
  `id` varchar,
  `name` varchar,
  `age` varchar,
  `birthday` varchar,
  primary key (id)
) WITH (
  type = 'PHOENIX5',
  serverUrl = '',
  tableName = ''
);

INSERT INTO sink
  SELECT  `id` ,`name` , `age` ,`birthday`
FROM `source`;

创建分析型数据库PostgreSQL版结果表

实时计算Flink版写入分析型数据库PostgreSQL结果表分为以下两个阶段:

create table rds_output(
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY(id)
) with (
  type='adbpg',
  url='jdbc:postgresql://:/',
  tableName='',
  userName='',
  password=''
);

参数说明是否必填备注type源表类型。是固定值为

。urlJDBC连接地址。是分析型数据库PostgreSQL版数据库的JDBC连接地址。格式为

,其中:yourNetworkAddress:内网地址。PortId:连接端口。yourDatabaseName:连接的数据库名称。示例:

tableName表名。是无。username账号。是无。password密码。是无。maxRetryTimes写入重试次数。否默认值为3。useCopy是否采用Copy API写入数据。否参数取值如下:0(默认值):采用INSERT方式写入数据。1:采用copy API方式写入数据。batchSize一次批量写入的条数。否默认值为5000。exceptionMode数据写入过程中出现异常时的处理策略。否支持以下两种处理策略:ignore(默认值):忽略出现异常时写入的数据。strict:数据写入异常时,Failover报错。conflictMode当主键冲突或唯一索引出现冲突时的处理策略。否支持以下三种处理策略:ignore(默认值):忽略主键冲突,保留之前的数据。strict:主键冲突时,Failover报错。update:主键冲突时,更新新到的数据。targetSchemaSchema名称。否默认值为public。connectionMaxActive单个task允许的最大连接数。否请根据实际的并发task个数,以及目标端数据库允许的最大连接数进行设置。

分析型数据库PostgreSQL版字段类型实时计算Flink版字段类型BOOLEANBOOLEANSMALLINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTDOUBLE PRECISIONDOUBLETEXTVARCHARTIMESTAMPDATETIMEDATEDATEREALFLOATDOUBLE PRECISIONDECIMALTIMETIMETIMESTAMPTIMESTAMP

Original: https://blog.csdn.net/shenglishuguang/article/details/123882481
Author: 胜利的曙光
Title: Blink SQL之创建数据结果表