ES创建index报错cluster currently has 4/2 maximum shard
原创
无锋剑客©著作权
文章标签 数据 elasticsearch 解决方案 文章分类 Hadoop 大数据
©著作权归作者所有:来自51CTO博客作者无锋剑客的原创作品,请联系作者获取转载授权,否则将追究法律责任
前提
调试业务服务期间,服务日志需要写入es集群,发现数据写入报错。
报错内容如下
ElasticsearchDeprecationWarning: In a future major version, this request will fail because this action would add [10] total shards, but this cluster currently has [20242]/[20000] maximum shards open. Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node].
报错内容分析
根据报错内容描述可以看出,是es集群分片数量达到上限导致报错。
解决办法有以下几种:
1:删除集群无用index,减少集群分片数量;2:修改集群index 默认数量,或者修改现有index分片数量;3:修改集群分片数量上限;
调整集群最大分片数
本次采用临时快速解决方案,解决办法如下:
查看现在集群分片数
以下操作在kibana devtool 空间操作
GET _cluster/stats?filter_path=*indices*{ "indices" : { "count" : 1740, "shards" : { "total" : 20242, <<<<<
修改集群最大分片数
PUT _cluster/settings{ "transient" : { "cluster.max_shards_per_node": "2000" }}
结论:
以上操作完成,可以快速应对集群创建index 报分片数量过多问题。另外es集群分片数量设置是否合理,需要参考集群节点数量,数据量、节点内存大小等信息,不在本文阐述。
- 打赏
- 赞
- 收藏
- 评论
- *举报
上一篇:开源BI平台软件特性对比
Original: https://blog.51cto.com/michaelkang/5580749
Author: 无锋剑客
Title: ES创建index报错cluster currently has 4/2 maximum shard
相关阅读1
Title: Clickhouse上用Order By保证绝对正确结果但代价是性能
一些聚合函数的结果跟流入数据的顺序有关,CH文档明确说明这样的函数的结果是不确定的。这是为什么呢?让我们用 explain pipeline
来一探究竟。
以一个很简单的查询为例:
select any( step ) from events group by request_id;
events表的定义如下:
CREATE TABLE default.events
(
`ID` UInt64,
`request_id` String,
`step_id` Int64,
`step` String
)
ENGINE = MergeTree
ORDER BY ID
该查询从events表里面读取数据步骤 step
和请求ID request_id
,按照 request_id
分组并取第一个 step
。
我们看一下这个查询的pipeline:
localhost :) explain pipeline select any( `step`) from events group by request_id
┌─explain────────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Aggregating) │
│ Resize 32 → 1 │
│ AggregatingTransform × 32 │
│ StrictResize 32 → 32 │
│ (Expression) │
│ ExpressionTransform × 32 │
│ (SettingQuotaAndLimits) │
│ (ReadFromMergeTree) │
│ MergeTreeThread × 32 0 → 1 │
└────────────────────────────────────────┘
可以看出没有sorting步骤。这个查询在多核服务器中速度是相当快的,因为充分利用了多核,直到最后一步才归并成一个数据流由一个线程来处理。
可是要注意 这个查询的结果每次都不一样,可以用加过滤条件的计数来测试,测试的SQL如下:
select countIf(A='step1') from (select any( `step`) as A from (select * from events) group by request_id)
结果是:2500579, 2500635,2500660。结果差距都不大,但都不是绝对正确的结果。这是因为多线程执行时并不能严格保证是按照engine=MergeTree 的表的存储顺序来处理数据的。如果能容忍误差就没问题,因为这个查询的效率是非常高的。
但如果要追求绝对的正确结果。则需要显示地指定顺序,改造查询如下:
查询的pipeline变成这样:
localhost :) explain pipeline select any( step ) from (select * from events order by ID) group by request_id;
┌─explain─────────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Aggregating) │
│ AggregatingTransform │
│ (Expression) │
│ ExpressionTransform │
│ (Sorting) │
│ MergingSortedTransform 36 → 1 │
│ (Expression) │
│ ExpressionTransform × 36 │
│ (SettingQuotaAndLimits) │
│ (ReadFromMergeTree) │
│ MergeTreeInOrder × 36 0 → 1 │
└─────────────────────────────────────────┘
注意到pipeline中增加了重要的一步 MergingSortedTransform 36 → 1
,这一步保证了查询的正确性,但是将多个线程的数据流归集到一起,排序后继续由一个线程完成剩下的处理步骤,效率上受到很大的影响。测试结果表示:加了ORDER BY 子句的查询能够得到一致的正确结果,但效率差了至少10倍。越是核数多的服务器,其差距越大。
Original: https://www.cnblogs.com/chengxin1985/p/16055559.html
Author: 程鑫
Title: Clickhouse上用Order By保证绝对正确结果但代价是性能
相关阅读2
Title: Flink Checkpoint & Savepoint
Checkpoint是Flink实现容错机制最核心的功能,能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("hdfs://ip:8020/flink/flink-checkpoints"))
val config = env.getCheckpointConfig
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
config.setCheckpointInterval(60000)
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理。
上面代码配置了执行Checkpointing的时间间隔为1分钟。
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint
Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:
state.checkpoints.num-retained: 20
如果希望会退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现。
如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点,比如chk-860进行回放,执行如下命令
bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
- 所有的Checkpoint文件都在以Job ID为名称的目录里面
- 当Job停掉后,重新从某个Checkpoint点(chk-860)进行恢复时,重新生成Job ID
-
Checkpoint编号会从该次运行基于的编号继续连续生成:chk-861、chk-862、chk-863
-
Checkpoint 间隔不要太短
- 过短的间对于底层分布式文件系统而言,会带来很大的压力。
- Flink 作业处理 record 与执行 checkpoint 存在互斥锁,过于频繁的checkpoint,可能会影响整体的性能。
- 合理设置超时时间
Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpointing机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中
Flink程序中包含两种状态数据:
- 用户定义的状态(User-defined State)是基于Flink的Transformation函数来创建或者修改得到的状态数据
- 系统状态(System State),是指作为Operator计算一部分的数据Buffer等状态数据,比如在使用Window Function时,在Window内部缓存Streaming数据记录
Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。
设置Operator ID:
DataStream<string> stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
// Stateful mapper with ID
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); // Auto-generated ID
</string>
创建一个Savepoint,需要指定对应Savepoint目录,有两种方式来指定
state.savepoints.dir: hdfs://namenode01.td.com/flink/flink-savepoints
bin/flink savepoint :jobId [:targetDirectory]
使用默认配置
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038
为正在运行的Flink Job指定一个目录存储Savepoint数据
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints
bin/flink run -s :savepointPath [:runArgs]
以上面保存的Savepoint为例,恢复Job运行
bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar
会启动一个新的Flink Job,ID为cdbae3af1b7441839e7c03bab0d0eefd
- 1bbc5是Flink Job ID字符串前6个字符,后面bd967f90709b是随机生成的字符串
- _metadata文件包含了Savepoint的元数据信息
- 其他文件内容都是序列化的状态信息
Original: https://www.cnblogs.com/bigdata1024/p/16284295.html
Author: chaplinthink
Title: Flink Checkpoint & Savepoint
相关阅读3
Title: CentOS Docker 安装
Docker 支持以下的 64 位 CentOS 版本:
- CentOS 7
- CentOS 8
- 更高版本...
使用官方安装脚本自动安装
安装命令如下:
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
也可以使用国内 daocloud 一键安装命令:
curl -sSL https://get.daocloud.io/docker | sh
手动安装
较旧的 Docker 版本称为 docker 或 docker-engine 。如果已安装这些程序,请卸载它们以及相关的依赖项。
在新主机上首次安装 Docker Engine-Community 之前,需要设置 Docker 仓库。之后,您可以从仓库安装和更新 Docker。
设置仓库
安装所需的软件包。yum-utils 提供了 yum-config-manager ,并且 device mapper 存储驱动程序需要 device-mapper-persistent-data 和 lvm2。
使用以下命令来设置稳定的仓库。
可以选择国内的一些源地址:
安装最新版本的 Docker Engine-Community 和 containerd,或者转到下一步安装特定版本:
$ sudo yum install docker-ce docker-ce-cli containerd.io
如果提示您接受 GPG 密钥,请选是。
有多个 Docker 仓库吗?
如果启用了多个 Docker 仓库,则在未在 yum install 或 yum update 命令中指定版本的情况下,进行的安装或更新将始终安装最高版本,这可能不适合您的稳定性需求。
Docker 安装完默认未启动。并且已经创建好 docker 用户组,但该用户组下没有用户。
要安装特定版本的 Docker Engine-Community,请在存储库中列出可用版本,然后选择并安装:
1、列出并排序您存储库中可用的版本。此示例按版本号(从高到低)对结果进行排序。
2、通过其完整的软件包名称安装特定版本,该软件包名称是软件包名称(docker-ce)加上版本字符串(第二列),从第一个冒号(:)一直到第一个连字符,并用连字符(-)分隔。例如:docker-ce-18.09.1。
$ sudo yum install docker-ce-<VERSION_STRING> docker-ce-cli-<VERSION_STRING> containerd.io
启动 Docker。
$ sudo systemctl start docker
通过运行 hello-world 映像来验证是否正确安装了 Docker Engine-Community 。
$ sudo docker run hello-world
删除安装包:
yum remove docker-ce
删除镜像、容器、配置文件等内容:
rm -rf /var/lib/docker
Original: https://www.cnblogs.com/cheyunhua/p/16504477.html
Author: 技术颜良
Title: CentOS Docker 安装