Hadoop_MapReduce架构

大数据58

MapReduce

4.1 MapReduce原理

  • 4.1.1 概述
  • 4.1.2MapReduce 的主要功能
    • 4.1.3MapReduce 的处理流程

4.1.1 MapReduce概述

MapReduce是面向大数据并行处理的计算模型、框架和平台,它包含以下三层含义。

(1)MapReduce是一个基于集群的高性能并行计算平台。
(2)MapReduce是一个并行计算与运行软件框架。
(3)MapReduce是一个并行程序设计模型与方法。

4.1.2 MapReduce的主要功能

1.数据划分和计算任务调度

Job 和 Task

Job待处理的数据划分为多个数据块
Task自动调度计算节点来处理相应的数据块

2.数据/代码互定位

本地化数据处理:一个计算节点尽可能处理其本地磁盘上所分布存储的数据,实现代码向数据的迁移。

3.系统优化

中间结果进入Reduce节点前会进行合并处理,一个Reduce节点所处理的数据可能来自多个Map节点。

4.出错检测和恢复

系统将维护数据存储的可靠性,用多备份冗余存储机制提高数据存储的可靠性,并能及时检测和恢复出错数据。

4.1.3 MapReduce的处理流程

Map

Shuffle

Reduce

MapShuffle

ReduceShuffle

1.Map和Reduce函数用户自定义,shuffle是由系统自动实现。
2.最终得到一个分区有序文件(具有Partition值的键值对存储在一起,并按key值进行升序排序)

4.2 Mapreduce 的编程基础

4.2.1内置数据类型

  • [Hadoops数据类型的使用]
    下面展示一些 数据类型

数据类型ValueBooleanWritable布尔型ByteWritable单字节数值DoubleWritable双字节数FlowWritable浮点IntWritable整形LongWritable长整型TextUTF-8格式存储的文本NullWritable当中的key或value为NULL时使用ArrayWritable数组

1) Hadoop数据类型的使用


package com.etc;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class HadoopDataType {

    public static void testText() {
    System.out.println("testText");
    Text text = new Text ("hello hadoop!");
    System.out.println (text .getLength());
    System.out.println(text .find("a"));
    System. out.println (text.toString());
    }

    public static void testArrayWritable() {
    System. out . println ("testArrayWritable");
    ArrayWritable arr = new ArrayWritable (IntWritable.class);
    IntWritable year = new IntWritable (2017) ;
    IntWritable month = new IntWritable(07) ;
    IntWritable date = new IntWritable(01) ;
    arr.set (new IntWritable[] { year, month, date });
    System.out.println (String . format ("year=%d, month=%d, date=%d",
            ((IntWritable) arr .get()[0]).get(),
            ((IntWritable) arr.get()[1]).get(),
            ((IntWritable)arr.get() [2]) .get()));
    }

    public static void testMapWritable() {
    System.out.println("testMapWritable");
    MapWritable map = new MapWritable();
    Text k1 = new Text("name");
    Text v1 = new Text ("tonny");
    Text k2 = new Text ("password");
    map.put(k1, v1);
    map.put(k2, NullWritable.get());
    System.out.println(map.get (k1).toString());
    System.out.println(map.get(k2) .toString());
    }

    public static void main(String[] args) {

    testText();
    testArrayWritable();
    testMapWritable();
    }
}

2) 运行结果:
Hadoop_MapReduce架构

4.2.2 Hadoop MapReduce 架构

  • Hadoop MapReduce1.0体系架构 由 Client(客户端)、JobTracker(作业跟踪器)、TaskTracker (任务跟踪器)、Task(任务)组成。

Hadoop_MapReduce架构

MapReduce 设计的一个核心理念就是"计算向数据靠拢",而不是传统计算模式的"数据向计算靠拢"。这是因为移动大量数据需要的网络传输开销太大,同时也大大降低了数据处理的效率。

; 4.2.3 MapReduce的工作流程

Input

Map

Sort

Combine

Partition

Reduce

Output

4.3MapReduce示例
4.3.1 WordCont原理

Hadoop_MapReduce架构

  • *流程

Hadoop_MapReduce架构

  • 示例
    Hadoop_MapReduce架构 *程序代码
package com.etc;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
          System.err.println("Usage: wordcount  [...] ");
          System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
          FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
          new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
4.3.2 学生平均成绩
  • 示例
    *程序代码
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoopmapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib。output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericoptionsParser;

public class Score {
public static class Map extends

Mapper<LongWritable,Text,Text,IntWritable>
{

public void map(LongWritable key,Text value,Context context)
      throws IoException, InterruptedExcepti {

      string line = value . toString();

  StringTokenizer tokenizerArticle =new StringTokenizer(tokenizerArticle.nextToken());

 while (tokenizerArticle.hasMoreElements ()){

 StringTokenizer tokenizerLine = new StringTokenizer(
  TokenizerArticle.nextToken());
  String strNmae = tokenizerLine.nextToken();
  String strScore = tokenizerLine.nextToken();
  Text name = new Text (strName);
  int scoreInt = Integer.parseInt(strScore);

context.write (name, new Intwritable (scoreInt));
    }
  }
}
   public static class Reduce extends
 Reducer<Text, Intwritable, Text, IntWritable> {

   public void reduce(Text key, Iterable<IntWritable> values,
      Context context) throws IOException, InterruptedException{
    int sum= 0;
    int count = 0;
    Iterator<Intwritable> iterator = values.iterator();

    while (iterator.hasNext()) {
     Sum += iterator.next().get();
     count++;
    }

    int average- (int) sum/count;
    context .write(key, new IntWritable(average));
      }
}

public static void main(String[] args) throws Exception {

  Configuration conf = new Contguration();
  conf.set ("mapred.job.tracker","hadoop:9001");
  String[] ioArgs = new String[]
    {"hdfs://hadoop0:9000/input/score/*","hdfs://hadoop0:9000/output/scoreout"};
String[] otherArgs = new GenericOptionsParset(conf, ioArgs).getRemainingArgs();
   if (otherArgs.length !=2) {
     System.err.println("Usage: Score Average  ");
     System.exit(2);
  Job job = new Job(conf, "Score Average");
  job.setJarByClass(Score.class);

  job.setMapperClass(Map.class);
  job.setCombinerClass(Reduce.class) ;
  job.setReducerClass(Reduce.class) ;
    //设置输出类型
  job.setOutputKeyClass(Text.class);
  job.setOutputvalueClass (Intwritable.class);
    //将输入的数据集分割成小数据块splites,提供一个RecordReder的实现
  job.setInputFormatClass (TextInputFormat.class) ;
    //提供一个RecordWriter的实现,负责数据输出
  job.setOutputFormatClass (TextOutputFormat.class) ;
    //设置输入和输出目录
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]);
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

Original: https://www.cnblogs.com/lang12/p/15361029.html
Author: Aurora*
Title: Hadoop_MapReduce架构



相关阅读

Title: Docker Compose 的介绍、安装与使用

一、什么是 Docker Compose

ComposeDocker 官方的开源项目,负责实现Docker容器集群的快速编排,开源代码在 https://github.com/docker/compose 上。

我们知道使用 Dockerfile 模板文件可以让用户很方便的定义一个单独的应用容器,其实在工作中,经常会碰到需要多个容器相互配合来完成的某项任务情况,例如工作中的web服务容器本身,往往会在后端加上数据库容器,甚至会有负责均衡器。

Compose 就是来做这个事情的,它允许用户通过一个单独的 docker-compose.yml 模板文件(YAML格式)来定义一组相关联的应用容器为一个项目( project )

Compose 中有两个重要的概念:

  • 服务( service ):一个应用的容器,实际上可以包括若干运行相同镜像的容器实例
  • 项目( project ):由一组关联的应用容器组成的一个完整业务单元,在 docker-compose.yml 中定义

Compose 项目是由Python编写的,实际上就是调用了Docker服务提供的API来对容器进行管理,因此,只要所在的操作系统的平台支持Docker API,就可以在其上利用Compose来进行编排管理。

二、为什么要使用 Docker Compose

假如有几十上百个容器,并且容器之间还存在依赖,光是忙着搭建容器都耗掉一天了,还谈什么Devops,那有没有什么方便快捷的组建,可以让我们通过一个配置就搞定容器编排和运行呢?

Docker Compose 的特点

Docker Compose 就是为了简化多容器配置和管理工作而生的,可以简化大量重复的手动工作,具有以下主要特点:

  • 提供工具用于定义和运行多个docker容器应用
  • 使用yaml文件来配置应用服务( docker-compse.yml )
  • 可以通过一个简单的命令 docker-compse up 可以按照依赖关系启动所有服务
  • 可以通过一个简单的命令 docker-compose down 停止所有服务
  • 当一个服务需要的时候,可以很简单地通过 --scale 进行扩容

Docker Compose 的考虑理由

  • 可移植性Docker Compose仅需一个命令即可提供完整的开发环境: docker-compose up,然后使用 docker-compose down 轻松将其拆解。 这使我们的开发人员可以将开发环境保持在一个中立位置,并帮助我们轻松地部署应用程序。
  • 测试Compose的另一个重要功能是通过将其置于自己的环境中,以快速可重复的方式支持运行单元和E2E测试。这意味着,您可以运行与生产环境非常相似的环境,而不是在本地/主机系统上测试应用程序。
  • 单个主机上的多个隔离环境Compose使用项目名称将环境彼此隔离,这带来了以下好处:
  • 您可以在一台计算机上运行同一环境的多个副本
  • 它可以防止不同的项目和服务相互干扰

三、 Docker Compose 的使用场景

  • 单主机部署: 传统上,Compose专注于开发和测试,但现在可用于在单个主机系统上进行部署和管理容器的整个部署过程。
  • 开发环境: Compose提供了在孤立的环境中运行应用程序的能力,该环境可以在安装了Docker的任何计算机上运行。 这使测试你的应用程序变得非常容易,并提供了一种尽可能接近生产环境的工作方式。 Compose文件管理应用程序的所有依赖项(数据库,队列,缓存等),并且可以使用单个命令创建每个容器。
  • 自动化测试环境: 持续集成和整个开发过程的重要组成部分是自动化测试套件,该套件要求可以在其中执行测试的环境。Compose提供了一种方便的方法来创建和销毁与您的生产环境接近的隔离测试环境。

Hadoop_MapReduce架构

四、安装 Docker Compose

sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose

Hadoop_MapReduce架构

sudo apt  install docker-compose  # version 1.25.0-1

Hadoop_MapReduce架构
* 测试安装版本:

docker-compose --version

五、如何使用 Docker Compose

使用 Compose 基本上只要三步:

  1. 通过编辑 Dockerfile 定义应用程序发布所需的运行环境
  2. 通过编辑 docker-compose.yml 文件定义多个容器一起运行的环境和相互关系
  3. 运行 docker-compose up 开始你的整个应用系统

Docker Compose 命令

Compose 大部分命令的对象即可以是项目的本身,也可以是指定为项目中的服务或者容器。

执行 docker-compose [COMMAND] --help 或者 docker-compose help [COMMAND] 可以查看命令的帮助信息

  • 具体的使用格式 docker-compose [-f=<arg>...] [options] [COMMAND] [ARGS]</arg>
  • 参数选项

    -f, --file file指定模板文件,默认是docker-compose.yml模板文件,可以多次指定 -p, --project-name name指定项目名称,默认使用所在目录名称作为项目名称 --x-networking 使用Docker的后端可插拔网络特性 --x-networking-driver driver指定网络的后端驱动,默认使用bridge --verbose 输入更多的调试信息 -v, --version 输出版本信息

官方链接:https://docs.docker.com/compose/reference/build/

模版文件

模板文件是 Compose 的核心,涉及的指令关键字比较多,但是大部分的指令与docker run相关的参数的含义是类似的。

Original: https://www.cnblogs.com/DingyLand/p/yunjisuan_02_.html
Author: stu(dying)
Title: Docker Compose 的介绍、安装与使用