Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)

Linux60

1 导引

我们在博客《Hadoop: 单词计数(Word Count)的MapReduce实现 》中学习了如何用Hadoop-MapReduce实现单词计数,现在我们来看如何用Spark来实现同样的功能。

2. Spark的MapReudce原理

Spark框架也是MapReduce-like模型,采用"分治-聚合"策略来对数据分布进行分布并行处理。不过该框架相比Hadoop-MapReduce,具有以下两个特点:

  • 对大数据处理框架的输入/输出,中间数据进行建模,将这些数据抽象为统一的数据结构命名为弹性分布式数据集(Resilient Distributed Dataset),并在此数据结构上构建了一系列通用的数据操作,使得用户可以简单地实现复杂的数据处理流程。
  • 采用了基于内存的数据聚合、数据缓存等机制来加速应用执行尤其适用于迭代和交互式应用。

Spark社区推荐用户使用Dataset、DataFrame等面向结构化数据的高层API(Structured API)来替代底层的RDD API,因为这些高层API含有更多的数据类型信息(Schema),支持SQL操作,并且可以利用经过高度优化的Spark SQL引擎来执行。不过,由于RDD API更基础,更适合用来展示基本概念和原理,后面我们的代码都使用RDD API。

Spark的RDD/dataset分为多个分区。RDD/Dataset的每一个分区都映射一个或多个数据文件, Spark通过该映射读取数据输入到RDD/dataset中。

因为我们这里采用的本地单机多线程调试模式,默认分区数即为本地机器使用的线程数,若在代码中设置了 local[N](使用 N个线程),则默认为 N个分区;若设为 local[*](使用本地CPU核数个线程),则默认分区数为本地CPU核数。大家可以通过调用 RDD对象的 getNumPartitions()查看实际分区个数。

我们下面的流程描述中,假设每个文件对应一个分区。

Spark的Map示意图如下:
Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)

Spark的Reduce示意图如下:
Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)

3. Word Count的Java实现

项目架构如下图:

Word-Count-Spark
├─ input
│  ├─ file1.txt
│  ├─ file2.txt
│  └─ file3.txt
├─ output
│  └─ result.txt
├─ pom.xml
├─ src
│  ├─ main
│  │  └─ java
│  │     └─ WordCount.java
│  └─ test
└─ target

WordCount.java文件如下:

package com.orion;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.io.*;
import java.nio.file.*;

public class WordCount {
    private static Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: WordCount   ");
            System.exit(1);
        }
                String input_path = args[0];
                String output_path = args[1];
        int n_threads = Integer.parseInt(args[2]);

        SparkSession spark = SparkSession.builder()
            .appName("WordCount")
            .master(String.format("local[%d]", n_threads))
            .getOrCreate();

        JavaRDD lines = spark.read().textFile(input_path).javaRDD();

        JavaRDD words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
        JavaPairRDD ones = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairRDD counts = ones.reduceByKey((i1, i2) -> i1 + i2);

        List> output = counts.collect();

                String filePath = Paths.get(output_path, "result.txt").toString();
                BufferedWriter out = new BufferedWriter(new FileWriter(filePath));
        for (Tuple2 tuple : output) {
            out.write(tuple._1() + ": " + tuple._2() + "\n");
        }
        out.close();
                spark.stop();
    }
}

pom.xml文件配置如下:


  4.0.0

  com.WordCount
  WordCount
  1.0-SNAPSHOT

  WordCount

  http://www.example.com

    2.12.10
    2.12
    UTF-8
    UTF-8
    UTC
    11
    1.4.0
    3.7.1
    3.1.2
    2.0.0
    4.4.0
    3.8.0
    3.2.0
    3.2.1
    2.8.2
    1.6.8
    3.2.0
    1.6
    2.22.2
    UTF-8
    11
    11
    3.2.1

      junit
      junit
      4.11
      test

        org.scala-lang
        scala-library
        ${scala.version}
        provided

        org.apache.spark
        spark-core_2.12
        ${spark.version}

        org.apache.spark
        spark-sql_2.12
        ${spark.version}
        provided

          maven-clean-plugin
          3.1.0

          maven-resources-plugin
          3.0.2

          maven-compiler-plugin
          3.8.0

          maven-surefire-plugin
          2.22.1

          maven-jar-plugin
          3.0.2

          maven-install-plugin
          2.5.2

          maven-deploy-plugin
          2.8.2

          maven-site-plugin
          3.7.1

          maven-project-info-reports-plugin
          3.0.0

          maven-compiler-plugin
          3.8.0

              11
              11
              true
              /Library/Java/JavaVirtualMachines/jdk-11.0.15.jdk/Contents/Home/bin/javac

记得配置输入参数 inputoutput3分别代表输入目录、输出目录和使用本地线程数(在VSCode中在 launch.json文件中配置)。编译运行后可在 output目录下查看 result.txt

Tom: 1
Hello: 3
Goodbye: 1
World: 2
David: 1

可见成功完成了单词计数功能。

4. Word Count的Python实现

先使用pip按照 pyspark==3.8.2

pip install pyspark==3.8.2

注意PySpark只支持Java 8/11,请勿使用更高级的版本。这里我使用的是Java 11。运行 java -version可查看本机Java版本。

(base) orion-orion@MacBook-Pro ~ % java -version
java version "11.0.15" 2022-04-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.15+8-LTS-149)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.15+8-LTS-149, mixed mode)

项目架构如下:

Word-Count-Spark
&#x251C;&#x2500; input
&#x2502;  &#x251C;&#x2500; file1.txt
&#x2502;  &#x251C;&#x2500; file2.txt
&#x2502;  &#x2514;&#x2500; file3.txt
&#x251C;&#x2500; output
&#x2502;  &#x2514;&#x2500; result.txt
&#x251C;&#x2500; src
&#x2502;  &#x2514;&#x2500; word_count.py

word_count.py编写如下:

from pyspark.sql import SparkSession
import sys
import os
from operator import add

if len(sys.argv) != 4:
    print("Usage: WordCount   ", file=sys.stderr)
    exit(1)

input_path, output_path, n_threads = sys.argv[1], sys.argv[2], int(sys.argv[3])

spark = SparkSession.builder.appName("WordCount").master("local[%d]" % n_threads).getOrCreate()

lines = spark.read.text(input_path).rdd.map(lambda r: r[0])

counts = lines.flatMap(lambda s: s.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(add)

output = counts.collect()

with open(os.path.join(output_path, "result.txt"), "wt") as f:
    for (word, count) in output:
        f.write(str(word) +": " + str(count) + "\n")

spark.stop()

使用 python word_count.py input output 3运行后,可在 output中查看对应的输出文件 result.txt

Hello: 3
World: 2
Goodbye: 1
David: 1
Tom: 1

可见成功完成了单词计数功能。

参考

Original: https://www.cnblogs.com/orion-orion/p/16314837.html
Author: orion-orion
Title: Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)



相关阅读1

Title: Kubernetes K8s 结合国内外文章解决 The kubelet is not running

镜像下载、域名解析、时间同步请点击阿里云开源镜像站

初学K8s,好好教程不跟着来,非要全部安装最新的docker 和 k8s,经过4小时奋战终于解决!

先说明下docker是最新版的安装,又安装K8s

cat > /etc/yum.repos.d/kubernetes.repo <<eof [kubernetes] name="Kubernetes" baseurl="https://mirrors.aliyun.com/kubernetes/yum/repos/kubernetes-el7-x86_64" enabled="1" gpgcheck="0" repo_gpgcheck="0" gpgkey="https://mirrors.aliyun.com/kubernetes/yum/doc/yum-key.gpg" https: mirrors.aliyun.com kubernetes yum doc rpm-package-key.gpg eof < code></eof>
yum install -y --nogpgcheck kubelet kubeadm kubectl
systemctl enable kubelet

这样直接安装成功,得到 1.23.5-0,版本号就是v1.23.5

然后噩梦开始,视频教程人家指定了v1.19.0直接成功安装,我这倒好,报错开始

Initial timeout of 40s passed.

[kubelet-check] It seems like the kubelet isn't running or healthy.

[kubelet-check] The HTTP call equal to 'curl -sSL http://localhost:10248/healthz' failed with error: Get "http://localhost:10248/healthz": dial tcp [::1]:10248: connect: connection refused.

[kubelet-check] It seems like the kubelet isn't running or healthy.

....

This error is likely caused by:
        - The kubelet is not running
        - The kubelet is unhealthy due to a misconfiguration of the node in some way (required cgroups disabled)

报错如上,首先先把你之前查到的资料见鬼去,反正在我这是不好使的(我这里都是新版,2022-03-18新装的)

1、systemctl start kubelet 这个在只有【主】的时候根本起不来,所以不用折腾看状态了,有就行了

2、/etc/systemd/system/kubelet.service.d/10-kubeadm.conf 文件,我相信你一定查到了,为啥我没有,因为你的在这/usr/lib/systemd/system/kubelet.service.d/10-kubeadm.conf(这个你最好没改过,改过的话改回去,保持默认就好),上面那个文件就该没有的,连那个目录都不该有的(再次强调新版!旧版不知道,v1.23.5我这是),有文章让你新建并添加什么的,删了吧没用的,启动了也是假象

[root@k8s-master ~]# cd /etc/systemd/system/kubelet.service.d/
-bash: cd: /etc/systemd/system/kubelet.service.d/: &#x6CA1;&#x6709;&#x90A3;&#x4E2A;&#x6587;&#x4EF6;&#x6216;&#x76EE;&#x5F55;

3、echo '{"exec-opts": ["native.cgroupdriver=systemd"]}' >> /etc/docker/daemon.json,我相信你已经见过这句了,是的!就是这句,这句真是重点!一开始我就对了,但是先往下看吧,真坑

4、还有让你们改docker images tag的,歇了吧,用不到的

开始噩梦安装之旅,注意这里我已经替换为国内的阿里源,所以根本不用改 docker Tag!只要能正常拉回来就没有问题!

kubeadm init \
  --image-repository registry.aliyuncs.com/google_containers \
  --kubernetes-version=v1.23.5 \
  --pod-network-cidr=10.244.0.0/16  \
  --service-cidr=10.96.0.0/12  \
  --apiserver-advertise-address=192.168.0.2

重点来了

排错,这个也是重点,你知道是错在哪! 【重点1】

journalctl -xeu kubelet | grep Failed

定位为毛线在失败(实际是一行,我换行下,这样好看)

3&#x6708; 18 20:21:04 k8s-master kubelet[36490]: E0318 20:21:04.954990   36490 server.go:302]
"Failed to run kubelet"
err="failed to run Kubelet: misconfiguration: kubelet cgroup driver: \"systemd\" is
different from docker cgroup driver: \"cgroupfs\""

实际你只要在你的/etc/docker/daemon.json 中加入 "exec-opts": ["native.cgroupdriver=systemd"]

是加入!不是追加,追加的不管用!!!我就是被坑在这了!怕你搞错,看例子! 【重点2】

  1 {
  2   "registry-mirrors": ["https://&#x4F60;&#x7684;&#x52A0;&#x901F;&#x5730;&#x5740;.mirror.aliyuncs.com"],
  3   "log-driver": "json-file",
  4   "log-opts": {
  5     "max-size": "10m",
  6     "max-file": "1"
  7    },   &#x3010;&#x770B;&#x8FD9;&#xFF01;&#x8FD9;&#x52A0;&#x4E2A;&#x9017;&#x53F7;&#xFF0C;&#x4E0B;&#x9762;&#x52A0;&#x8FD9;&#x53E5;&#xFF0C;&#x7ED3;&#x5C3E;&#x6CA1;&#x6709;&#x9017;&#x53F7;&#x3011;
  8   "exec-opts": ["native.cgroupdriver=systemd"]
  9 }

然后,你就可以

systemctl daemon-reload
systemctl restart docker
systemctl restart kubelet
kubeadm reset #&#x91CD;&#x7F6E;&#x4E86;&#x6CA1;&#x4E8B;&#x7684;&#xFF0C;&#x53CD;&#x6B63;&#x4E4B;&#x524D;&#x4E5F;&#x8D77;&#x4E0D;&#x6765;~
y

systemctl status kubelet #&#x521D;&#x59CB;&#x5C31;&#x662F;&#x542F;&#x52A8;&#x4E0D;&#x4E86;&#x7684;
#&#x9A8C;&#x8BC1;cgroupdriver &#x4FEE;&#x6539;&#x751F;&#x6548; &#xFF08;&#x8FD9;&#x4E24;&#x884C;&#x90FD;&#x662F;&#x9A8C;&#x8BC1;&#x7684;&#xFF09;&#x770B;&#x5230;systemd &#x5C31;&#x5BF9;&#x4E86;
docker info -f {{.CgroupDriver}}
docker info | grep -i cgroup

最后再执行

kubeadm init \
  --image-repository registry.aliyuncs.com/google_containers \
  --kubernetes-version=v1.23.5 \
  --pod-network-cidr=10.244.0.0/16  \
  --service-cidr=10.96.0.0/12  \
  --apiserver-advertise-address=192.168.0.2

世界核平!

Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)

好了,万马奔腾~

原文链接:https://blog.csdn.net/zhangbest5/article/details/123583927

Original: https://www.cnblogs.com/helong-123/p/16453847.html
Author: 萌褚
Title: Kubernetes K8s 结合国内外文章解决 The kubelet is not running

相关阅读2

Title: Linux Accounting(中文翻译)(2):Delay Accounting

任务在执行时等待某个内核资源会意外遇到延迟,例如可运行的任务正在等待空闲CPU。

per-task的延时统计功能测量下列情况下任务经历的延迟:

  • 正在等待CPU,waiting for a CPU (while being runnable)
  • 同步块I/O的完成,completion of synchronous block I/O initiated by the task
  • 正在页面内交换,swapping in pages
  • 内存回收,memory reclaim
  • 占满页面缓存,thrashing page cache
  • 直接压缩,direct compact
  • 写保护拷贝,write-protect copy
    这些统计功能通过taskstats接口提供给用户空间使用。

这些延迟给设置CPU优先级,IO优先级和合适的RSS限制提供了反馈。重要任务的长期延迟可以作为一个触发器来提高它的优先级。

这个功能,通过使用taskstats接口,也为所有的线程组(传统的UNIX进程)中的任务或者线程提供了延时统计,这种收集比内核收集更有效。

延迟统计使用的taskstats接口在本目录下的文档中被详细地描述。Taskstats给用户空间返回一个per-pid和per-tgid统计的通用数据结构。延迟统计功能计算这个结构中的具体数据域。参看 include/uapi/linux/taskstats.h中的与延迟统计有关的数据域描述。

它一般以计数器的形式返回累计延迟(for cpu, sync block I/O, swapin, memory reclaim, thrash page cache, direct compact, write-protect copy等)。

获取任务计数器(cpu_delay_total)的两个连续读数的差异,由于等待这个间隔内的相关资源也会带来任务延迟。

当任务退出时,per-task统计的记录被发送给用户空间。如果它是线程组中最后退出的任务,per-tgid统计也会被发送。更多细节由taskstats接口描述给出。

在tools/accounting目录下的用户空间程序getdelays.c允许运行简单的命令和显示相关的延迟统计信息。它也用作taskstats的简单示例。

编译内核:

CONFIG_TASK_DELAY_ACCT=y
CONFIG_TASKSTATS=y

延迟统计启动时默认是禁止的,要使能它,增加 delayacct到内核启动选项。要么使用sysctl kernel.task_delayacct在运行时来切换状态。注意只要在使能之后启动任务,它就有delayacct信息。

系统系统之后,使用跟getdelays.c相似的程序来访问任务或者任务组(tgid)相关的延迟。应用程序也允许执行命令和查看相关的延迟。

getdelays命令的一般格式:

getdelays [-dilv] [-t tgid] [-p pid]

获取自系统启动以来PID 10的延迟:

# ./getdelays -d -p 10
(output similar to next case)

获取自系统启动以来tgid 5中所有pid的延迟总和:

# ./getdelays -d -t 5
print delayacct stats ON
TGID    5

CPU             count     real total  virtual total    delay total  delay average
                    8        7000000        6872122        3382277          0.423ms
IO              count    delay total  delay average
                    0              0              0ms
SWAP            count    delay total  delay average
                    0              0              0ms
RECLAIM         count    delay total  delay average
                    0              0              0ms
THRASHING       count    delay total  delay average
                    0              0              0ms
COMPACT         count    delay total  delay average
                    0              0              0ms
WPCOPY          count    delay total  delay average
                    0              0              0ms

获取pid1的IO统计:

# ./getdelays -i -p 1
printing IO accounting
linuxrc: read=65536, write=0, cancelled_write=0

上述命令可以使用-v来获取等多调试信息。

Original: https://www.cnblogs.com/aosp/p/16369720.html
Author: 河东西望
Title: Linux Accounting(中文翻译)(2):Delay Accounting

相关阅读3

Title: 【Linux】使用 apt-get 查询并安装指定版本的软件

镜像下载、域名解析、时间同步请点击阿里云开源镜像站

一、通过apt-get安装指定版本的软件

$ sudo apt-get install package=version

version是软件版本号,package是要安装的软件

二、查询指定软件有多少个版本

1、通过网站搜索

https://packages.ubuntu.com/

2、使用 apt-cache madison 列出软件的所有来源

$ sudo apt-cache madison package

Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)

madison 是一个 apt-cache 子命令,可以通过man apt-cache查询更多apt-cache的用法。

3、使用 apt-cache policy 列出软件的所有来源

$ sudo apt-cache policy vim

Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)

policy 是一个 apt-cache 子命令,可以通过 man apt-cache 查询更多apt-cache用法。

4、使用 apt-cache showpkg 列出软件的所有来源

$ sudo apt-cache showpkg  vim

Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)

5、使用 apt-get install -s 安装软件

$ sudo apt-get install -s package

6、使用 apt-show-versions 列出软件所有版本,并查看是否已经安装

$ sudo apt-get install apt-show-versions
$ apt-show-versions -a vim

Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)

还可以通过apt-show-versions -u package查询是否有升级版本。

7、查询指定包的详情

$ sudo apt-cache show package

或者

$ dpkg -l package

8、显示已安装包的详情

$ dpkg -s package

或者

dpkg-query -s package

三、使用技巧

在查询后面,我们可以带上一些参数来实现筛选

$ sudo apt-cache show package | grep version

$ sudo apt-show-versions | more

原文链接:https://blog.csdn.net/Cappuccino_jay/article/details/125224053

Original: https://www.cnblogs.com/helong-123/p/16470499.html
Author: 萌褚
Title: 【Linux】使用 apt-get 查询并安装指定版本的软件