1 导引
我们在博客《Hadoop: 单词计数(Word Count)的MapReduce实现 》中学习了如何用Hadoop-MapReduce实现单词计数,现在我们来看如何用Spark来实现同样的功能。
2. Spark的MapReudce原理
Spark框架也是MapReduce-like模型,采用"分治-聚合"策略来对数据分布进行分布并行处理。不过该框架相比Hadoop-MapReduce,具有以下两个特点:
- 对大数据处理框架的输入/输出,中间数据进行建模,将这些数据抽象为统一的数据结构命名为弹性分布式数据集(Resilient Distributed Dataset),并在此数据结构上构建了一系列通用的数据操作,使得用户可以简单地实现复杂的数据处理流程。
- 采用了基于内存的数据聚合、数据缓存等机制来加速应用执行尤其适用于迭代和交互式应用。
Spark社区推荐用户使用Dataset、DataFrame等面向结构化数据的高层API(Structured API)来替代底层的RDD API,因为这些高层API含有更多的数据类型信息(Schema),支持SQL操作,并且可以利用经过高度优化的Spark SQL引擎来执行。不过,由于RDD API更基础,更适合用来展示基本概念和原理,后面我们的代码都使用RDD API。
Spark的RDD/dataset分为多个分区。RDD/Dataset的每一个分区都映射一个或多个数据文件, Spark通过该映射读取数据输入到RDD/dataset中。
因为我们这里采用的本地单机多线程调试模式,默认分区数即为本地机器使用的线程数,若在代码中设置了 local[N]
(使用 N
个线程),则默认为 N
个分区;若设为 local[*]
(使用本地CPU核数个线程),则默认分区数为本地CPU核数。大家可以通过调用 RDD
对象的 getNumPartitions()
查看实际分区个数。
我们下面的流程描述中,假设每个文件对应一个分区。
Spark的Map示意图如下:
Spark的Reduce示意图如下:
3. Word Count的Java实现
项目架构如下图:
Word-Count-Spark
├─ input
│ ├─ file1.txt
│ ├─ file2.txt
│ └─ file3.txt
├─ output
│ └─ result.txt
├─ pom.xml
├─ src
│ ├─ main
│ │ └─ java
│ │ └─ WordCount.java
│ └─ test
└─ target
WordCount.java
文件如下:
package com.orion;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.io.*;
import java.nio.file.*;
public class WordCount {
private static Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
if (args.length != 3) {
System.err.println("Usage: WordCount ");
System.exit(1);
}
String input_path = args[0];
String output_path = args[1];
int n_threads = Integer.parseInt(args[2]);
SparkSession spark = SparkSession.builder()
.appName("WordCount")
.master(String.format("local[%d]", n_threads))
.getOrCreate();
JavaRDD lines = spark.read().textFile(input_path).javaRDD();
JavaRDD words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
JavaPairRDD ones = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD counts = ones.reduceByKey((i1, i2) -> i1 + i2);
List> output = counts.collect();
String filePath = Paths.get(output_path, "result.txt").toString();
BufferedWriter out = new BufferedWriter(new FileWriter(filePath));
for (Tuple2 tuple : output) {
out.write(tuple._1() + ": " + tuple._2() + "\n");
}
out.close();
spark.stop();
}
}
pom.xml
文件配置如下:
4.0.0
com.WordCount
WordCount
1.0-SNAPSHOT
WordCount
http://www.example.com
2.12.10
2.12
UTF-8
UTF-8
UTC
11
1.4.0
3.7.1
3.1.2
2.0.0
4.4.0
3.8.0
3.2.0
3.2.1
2.8.2
1.6.8
3.2.0
1.6
2.22.2
UTF-8
11
11
3.2.1
junit
junit
4.11
test
org.scala-lang
scala-library
${scala.version}
provided
org.apache.spark
spark-core_2.12
${spark.version}
org.apache.spark
spark-sql_2.12
${spark.version}
provided
maven-clean-plugin
3.1.0
maven-resources-plugin
3.0.2
maven-compiler-plugin
3.8.0
maven-surefire-plugin
2.22.1
maven-jar-plugin
3.0.2
maven-install-plugin
2.5.2
maven-deploy-plugin
2.8.2
maven-site-plugin
3.7.1
maven-project-info-reports-plugin
3.0.0
maven-compiler-plugin
3.8.0
11
11
true
/Library/Java/JavaVirtualMachines/jdk-11.0.15.jdk/Contents/Home/bin/javac
记得配置输入参数 input
、 output
、 3
分别代表输入目录、输出目录和使用本地线程数(在VSCode中在 launch.json
文件中配置)。编译运行后可在 output
目录下查看 result.txt
:
Tom: 1
Hello: 3
Goodbye: 1
World: 2
David: 1
可见成功完成了单词计数功能。
4. Word Count的Python实现
先使用pip按照 pyspark==3.8.2
:
pip install pyspark==3.8.2
注意PySpark只支持Java 8/11,请勿使用更高级的版本。这里我使用的是Java 11。运行 java -version
可查看本机Java版本。
(base) orion-orion@MacBook-Pro ~ % java -version
java version "11.0.15" 2022-04-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.15+8-LTS-149)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.15+8-LTS-149, mixed mode)
项目架构如下:
Word-Count-Spark
├─ input
│ ├─ file1.txt
│ ├─ file2.txt
│ └─ file3.txt
├─ output
│ └─ result.txt
├─ src
│ └─ word_count.py
word_count.py
编写如下:
from pyspark.sql import SparkSession
import sys
import os
from operator import add
if len(sys.argv) != 4:
print("Usage: WordCount ", file=sys.stderr)
exit(1)
input_path, output_path, n_threads = sys.argv[1], sys.argv[2], int(sys.argv[3])
spark = SparkSession.builder.appName("WordCount").master("local[%d]" % n_threads).getOrCreate()
lines = spark.read.text(input_path).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda s: s.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKey(add)
output = counts.collect()
with open(os.path.join(output_path, "result.txt"), "wt") as f:
for (word, count) in output:
f.write(str(word) +": " + str(count) + "\n")
spark.stop()
使用 python word_count.py input output 3
运行后,可在 output
中查看对应的输出文件 result.txt
:
Hello: 3
World: 2
Goodbye: 1
David: 1
Tom: 1
可见成功完成了单词计数功能。
参考
- [1] Spark官方文档: Quick Start
-
[2] 许利杰,方亚芬. 大数据处理框架Apache Spark设计与实现[M]. 电子工业出版社, 2021.
- [4] similarface: Spark数据分区数量的原理
Original: https://www.cnblogs.com/orion-orion/p/16314837.html
Author: orion-orion
Title: Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)
相关阅读1
Title: Kubernetes K8s 结合国内外文章解决 The kubelet is not running
镜像下载、域名解析、时间同步请点击阿里云开源镜像站
初学K8s,好好教程不跟着来,非要全部安装最新的docker 和 k8s,经过4小时奋战终于解决!
先说明下docker是最新版的安装,又安装K8s
cat > /etc/yum.repos.d/kubernetes.repo <<eof [kubernetes] name="Kubernetes" baseurl="https://mirrors.aliyun.com/kubernetes/yum/repos/kubernetes-el7-x86_64" enabled="1" gpgcheck="0" repo_gpgcheck="0" gpgkey="https://mirrors.aliyun.com/kubernetes/yum/doc/yum-key.gpg" https: mirrors.aliyun.com kubernetes yum doc rpm-package-key.gpg eof < code></eof>
yum install -y --nogpgcheck kubelet kubeadm kubectl
systemctl enable kubelet
这样直接安装成功,得到 1.23.5-0,版本号就是v1.23.5
然后噩梦开始,视频教程人家指定了v1.19.0直接成功安装,我这倒好,报错开始
Initial timeout of 40s passed.
[kubelet-check] It seems like the kubelet isn't running or healthy.
[kubelet-check] The HTTP call equal to 'curl -sSL http://localhost:10248/healthz' failed with error: Get "http://localhost:10248/healthz": dial tcp [::1]:10248: connect: connection refused.
[kubelet-check] It seems like the kubelet isn't running or healthy.
....
This error is likely caused by:
- The kubelet is not running
- The kubelet is unhealthy due to a misconfiguration of the node in some way (required cgroups disabled)
报错如上,首先先把你之前查到的资料见鬼去,反正在我这是不好使的(我这里都是新版,2022-03-18新装的)
1、systemctl start kubelet 这个在只有【主】的时候根本起不来,所以不用折腾看状态了,有就行了
2、/etc/systemd/system/kubelet.service.d/10-kubeadm.conf 文件,我相信你一定查到了,为啥我没有,因为你的在这/usr/lib/systemd/system/kubelet.service.d/10-kubeadm.conf(这个你最好没改过,改过的话改回去,保持默认就好),上面那个文件就该没有的,连那个目录都不该有的(再次强调新版!旧版不知道,v1.23.5我这是),有文章让你新建并添加什么的,删了吧没用的,启动了也是假象
[root@k8s-master ~]# cd /etc/systemd/system/kubelet.service.d/
-bash: cd: /etc/systemd/system/kubelet.service.d/: 没有那个文件或目录
3、echo '{"exec-opts": ["native.cgroupdriver=systemd"]}' >> /etc/docker/daemon.json,我相信你已经见过这句了,是的!就是这句,这句真是重点!一开始我就对了,但是先往下看吧,真坑
4、还有让你们改docker images tag的,歇了吧,用不到的
开始噩梦安装之旅,注意这里我已经替换为国内的阿里源,所以根本不用改 docker Tag!只要能正常拉回来就没有问题!
kubeadm init \
--image-repository registry.aliyuncs.com/google_containers \
--kubernetes-version=v1.23.5 \
--pod-network-cidr=10.244.0.0/16 \
--service-cidr=10.96.0.0/12 \
--apiserver-advertise-address=192.168.0.2
重点来了
排错,这个也是重点,你知道是错在哪! 【重点1】
journalctl -xeu kubelet | grep Failed
定位为毛线在失败(实际是一行,我换行下,这样好看)
3月 18 20:21:04 k8s-master kubelet[36490]: E0318 20:21:04.954990 36490 server.go:302]
"Failed to run kubelet"
err="failed to run Kubelet: misconfiguration: kubelet cgroup driver: \"systemd\" is
different from docker cgroup driver: \"cgroupfs\""
实际你只要在你的/etc/docker/daemon.json 中加入 "exec-opts": ["native.cgroupdriver=systemd"]
是加入!不是追加,追加的不管用!!!我就是被坑在这了!怕你搞错,看例子! 【重点2】
1 {
2 "registry-mirrors": ["https://你的加速地址.mirror.aliyuncs.com"],
3 "log-driver": "json-file",
4 "log-opts": {
5 "max-size": "10m",
6 "max-file": "1"
7 }, 【看这!这加个逗号,下面加这句,结尾没有逗号】
8 "exec-opts": ["native.cgroupdriver=systemd"]
9 }
然后,你就可以
systemctl daemon-reload
systemctl restart docker
systemctl restart kubelet
kubeadm reset #重置了没事的,反正之前也起不来~
y
systemctl status kubelet #初始就是启动不了的
#验证cgroupdriver 修改生效 (这两行都是验证的)看到systemd 就对了
docker info -f {{.CgroupDriver}}
docker info | grep -i cgroup
最后再执行
kubeadm init \
--image-repository registry.aliyuncs.com/google_containers \
--kubernetes-version=v1.23.5 \
--pod-network-cidr=10.244.0.0/16 \
--service-cidr=10.96.0.0/12 \
--apiserver-advertise-address=192.168.0.2
世界核平!
好了,万马奔腾~
原文链接:https://blog.csdn.net/zhangbest5/article/details/123583927
Original: https://www.cnblogs.com/helong-123/p/16453847.html
Author: 萌褚
Title: Kubernetes K8s 结合国内外文章解决 The kubelet is not running
相关阅读2
Title: Linux Accounting(中文翻译)(2):Delay Accounting
任务在执行时等待某个内核资源会意外遇到延迟,例如可运行的任务正在等待空闲CPU。
per-task的延时统计功能测量下列情况下任务经历的延迟:
- 正在等待CPU,waiting for a CPU (while being runnable)
- 同步块I/O的完成,completion of synchronous block I/O initiated by the task
- 正在页面内交换,swapping in pages
- 内存回收,memory reclaim
- 占满页面缓存,thrashing page cache
- 直接压缩,direct compact
- 写保护拷贝,write-protect copy
这些统计功能通过taskstats接口提供给用户空间使用。
这些延迟给设置CPU优先级,IO优先级和合适的RSS限制提供了反馈。重要任务的长期延迟可以作为一个触发器来提高它的优先级。
这个功能,通过使用taskstats接口,也为所有的线程组(传统的UNIX进程)中的任务或者线程提供了延时统计,这种收集比内核收集更有效。
延迟统计使用的taskstats接口在本目录下的文档中被详细地描述。Taskstats给用户空间返回一个per-pid和per-tgid统计的通用数据结构。延迟统计功能计算这个结构中的具体数据域。参看 include/uapi/linux/taskstats.h
中的与延迟统计有关的数据域描述。
它一般以计数器的形式返回累计延迟(for cpu, sync block I/O, swapin, memory reclaim, thrash page cache, direct compact, write-protect copy等)。
获取任务计数器(cpu_delay_total)的两个连续读数的差异,由于等待这个间隔内的相关资源也会带来任务延迟。
当任务退出时,per-task统计的记录被发送给用户空间。如果它是线程组中最后退出的任务,per-tgid统计也会被发送。更多细节由taskstats接口描述给出。
在tools/accounting目录下的用户空间程序getdelays.c允许运行简单的命令和显示相关的延迟统计信息。它也用作taskstats的简单示例。
编译内核:
CONFIG_TASK_DELAY_ACCT=y
CONFIG_TASKSTATS=y
延迟统计启动时默认是禁止的,要使能它,增加 delayacct
到内核启动选项。要么使用sysctl kernel.task_delayacct在运行时来切换状态。注意只要在使能之后启动任务,它就有delayacct信息。
系统系统之后,使用跟getdelays.c相似的程序来访问任务或者任务组(tgid)相关的延迟。应用程序也允许执行命令和查看相关的延迟。
getdelays命令的一般格式:
getdelays [-dilv] [-t tgid] [-p pid]
获取自系统启动以来PID 10的延迟:
# ./getdelays -d -p 10
(output similar to next case)
获取自系统启动以来tgid 5中所有pid的延迟总和:
# ./getdelays -d -t 5
print delayacct stats ON
TGID 5
CPU count real total virtual total delay total delay average
8 7000000 6872122 3382277 0.423ms
IO count delay total delay average
0 0 0ms
SWAP count delay total delay average
0 0 0ms
RECLAIM count delay total delay average
0 0 0ms
THRASHING count delay total delay average
0 0 0ms
COMPACT count delay total delay average
0 0 0ms
WPCOPY count delay total delay average
0 0 0ms
获取pid1的IO统计:
# ./getdelays -i -p 1
printing IO accounting
linuxrc: read=65536, write=0, cancelled_write=0
上述命令可以使用-v来获取等多调试信息。
Original: https://www.cnblogs.com/aosp/p/16369720.html
Author: 河东西望
Title: Linux Accounting(中文翻译)(2):Delay Accounting
相关阅读3
Title: 【Linux】使用 apt-get 查询并安装指定版本的软件
镜像下载、域名解析、时间同步请点击阿里云开源镜像站
一、通过apt-get安装指定版本的软件
$ sudo apt-get install package=version
version是软件版本号,package是要安装的软件
二、查询指定软件有多少个版本
1、通过网站搜索
https://packages.ubuntu.com/
2、使用 apt-cache madison 列出软件的所有来源
$ sudo apt-cache madison package
madison 是一个 apt-cache 子命令,可以通过man apt-cache查询更多apt-cache的用法。
3、使用 apt-cache policy 列出软件的所有来源
$ sudo apt-cache policy vim
policy 是一个 apt-cache 子命令,可以通过 man apt-cache 查询更多apt-cache用法。
4、使用 apt-cache showpkg 列出软件的所有来源
$ sudo apt-cache showpkg vim
5、使用 apt-get install -s 安装软件
$ sudo apt-get install -s package
6、使用 apt-show-versions 列出软件所有版本,并查看是否已经安装
$ sudo apt-get install apt-show-versions
$ apt-show-versions -a vim
还可以通过apt-show-versions -u package查询是否有升级版本。
7、查询指定包的详情
$ sudo apt-cache show package
或者
$ dpkg -l package
8、显示已安装包的详情
$ dpkg -s package
或者
dpkg-query -s package
三、使用技巧
在查询后面,我们可以带上一些参数来实现筛选
$ sudo apt-cache show package | grep version
$ sudo apt-show-versions | more
原文链接:https://blog.csdn.net/Cappuccino_jay/article/details/125224053
Original: https://www.cnblogs.com/helong-123/p/16470499.html
Author: 萌褚
Title: 【Linux】使用 apt-get 查询并安装指定版本的软件