【sparkSQL】创建DataFrame及保存

人工智能65

首先我们要创建SparkSession

```java;gutter:true;
val spark = SparkSession.builder()
.appName("test")
.master("local")
.getOrCreate()
import spark.implicits._ //将RDD转化成为DataFrame并支持SQL操作


然后我们通过SparkSession来创建DataFrame

**1.使用 `toDF` 函数创建DataFrame**

通过导入(importing)spark.implicits, 就可以将本地序列(seq), 数组或者RDD转为DataFrame。

只要这些数据的内容能指定数据类型即可。

```java;gutter:true;
import spark.implicits._
val df = Seq(
  (1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
  (2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
).toDF("id", "name", "created_time")

注意:如果直接用toDF()而不指定列名字,那么默认列名为"_1", "_2"

可以通过df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")进行修改列名

2.使用 createDataFrame 函数创建DataFrame

通过schema + row 来创建

我们可以通俗的理解为schema为表的表头,row为表的数据记录

```java;gutter:true;
import org.apache.spark.sql.types._
//定义dataframe的结构的schema
val schema = StructType(List(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("create_time", DateType, nullable = true)
))
//定义dataframe内容的rdd
val rdd = sc.parallelize(Seq(
Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
))
//创建dataframe
val df = spark.createDataFrame(rdd, schema)


![【sparkSQL】创建DataFrame及保存](https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20221130/1336360-20180515124341925-237609173.png)

不过,我们可以把文件结构当做参数来使用,通过rdd自动产生schema和row,不用自己手动生成。

```java;gutter:true;
import org.apache.spark.sql.types._

//传入属性参数
val schemaString = " id name create_time"
//解析参数变成StructField
val fields = schemaString.split(" ")
                         .map(fieldName => StructField(fieldname, StringType, nullable = true))
//定义dataframe的结构的schema
val schema = StructType(fields)

//定义dataframe内容的rdd
val lines = sc.textFile("file:///people.txt")
val rdd = lines.spilt(_.split(","))
               .map(attributes=>ROW(attributes(0),attributes(1).trim) )

//创建dataframe
val df = spark.createDataFrame(rdd, schema)

3.通过反射机制创建DataFrame

首先要定义一个case class,因为只有case class才能被Spark隐式转化为DataFrame

```java;gutter:true;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits.
//创建匹配类
case class Person(id:Int,name:String,age:Long)
//读取文件生成rdd
val rdd = sc.textFile("file:///")
//通过匹配类把rdd转化成dataframe
val df = rdd.map(
.split(","))
.map(attributes => Person(attributes(0),attributes(1),attributes(2).trim.toInt)) .toDF()


**4.通过文件直接创建DataFrame**

(1)使用parquet文件read创建

```java;gutter:true;
val df = spark.read.parquet("hdfs:/path/to/file")

(2)使用json文件read创建

```java;gutter:true;
val df = spark.read.json("examples/src/main/resources/people.json")


(3)使用csv文件load创建

```java;gutter:true;
val df = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true") //reading the headers
        .option("mode", "DROPMALFORMED")
        .load("csv/file/path")

(4)使用Hive表创建

```java;gutter:true;
spark.table("test.person") // 库名.表名 的格式
.registerTempTable("person") // 注册成临时表
spark.sql(
"""
| select *
| from person
| limit 10
""".stripMargin).show()


记得,最后我们要调用spark.stop()来关闭SparkSession。

**5.保存**

**(1)通过df.write.format().save("file:///")保存**

write.format()支持输出的格式有 JSON、parquet、JDBC、orc、csv、text等文件格式

,save()定义保存的位置

<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'></font>*</details>

里面的内容一般为

![【sparkSQL】创建DataFrame及保存](https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20221130/1336360-20180516102542287-483165641.png)

不用担心,这是没错的。

我们读取的时候,并不需要使用文件夹里面的part-xxxx文件,直接读取目录即可。

**(2)通过df.rdd.saveAsTextFile("file:///")转化成rdd再保存**

**我们对于不同格式的文件读写来说,我们一般使用两套对应方式**

```java;gutter:true;
val df = spark.read.格式("file:///")//读取文件
df.write.格式("file:///")//保存文件

```java;gutter:true;
val df = spark.read.format("").load("file:///")//读取文件
df.write.save("file:///")//保存文件


具体read和load方法有什么不同,我还不是很清楚,弄明白了回来补充。

**6.通过JDBC创建DataFrame**

我们在启动Spark-shell或者提交任务的时候需要添加相应的jar包

spark-shell(spark-submit)

--jars /usr/local/spark/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \

--driver-class-path /usr/local/spark/mysql-connector-java-5.1.40-bin.jar

```java;gutter:true;
val jdbcDf = spark.read.format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")   //驱动
    .option("url", "jdbc:mysql://ip:3306")  //数据库地址
    .option("dbtable", "db.user_test") //表名:数据库名.表名
    .option("user", "test") //用户名
    .option("password", "123456")  //密码
    .load()
jdbcDf.show()

Original: https://www.cnblogs.com/zzhangyuhang/p/9040442.html
Author: zzhangyuhang
Title: 【sparkSQL】创建DataFrame及保存



相关阅读

Title: SRILM语言模型库安装

参考、执行后总结以便下次使用:

主要参考1,其余为解决问题项

1、srilm主要安装步骤及流程,https://bbs.huaweicloud.com/blogs/detail/183445
2、gwak安装详细问题:http://www.mamicode.com/info-detail-226395.html

3、make install时候权限不够

解决 Linux 系统,出现"不在sudoers文件中,此事将被报告"的问题 - 知乎

流程如下:

1、安装tcl依赖项

【tcl8.6.12-src.tar.gz】http://www.tcl.tk/software/tcltk/download.html

tar zxvf tcl8.6.8-src.tar.gz
cd  tcl8.6.8/
cd unix
./config
make
make install

使用用户账号,权限不够,make install执行失败。使用sudo 命令,输入当前用户密码, 提示不在sudoer文件中,此事将被警告。执行失败,为用户增加权限,参考第3个知乎连接解决问题。

2、安装其它依赖项

先使用,如gcc --version的命令,查看这些项是否已经安装(应该是最后两个没有被安装)

apt-get install gcc
apt-get install make
apt-get install gzip
apt-get install bzip2
apt-get install p7zip
apt-get install gawk

根据系统提示安装这些依赖项。

gawk提示定位不到位置,即找不到安装包,需手动安装【gawk-5.1.1.tar.gz】Index of /gnu/gawk

1、解压
tar zxvf  gawk-5.1.1.tar.gz
cd gawk-5.1.1
./configure --prefix=/usr --libexecdir=/usr/lib
2、编译
make
make check
make install
3、修改
mv -v /usr/bin/find  /bin
cp -v doc/{awkforai.txt,*.{eps,pdf,jpg}}  /usr/share/doc/gawk-5.1.1

#注意参见参考链接2,上述第3步骤执行出错没有关系。执行gawk --version 出现版本信息即可

3、安装SRILM

下载安装包【srilm-1.7.2.tar.gz】http://www.speech.sri.com/projects/srilm/download.html

(1)解压

pwd
$ /home/user2/
mkdir srilm
mv srilm-1.7.2.tar.gz srilm/
tar zxvf  srilm-1.7.2.tar.gz

(2)修改文件

cd srilm
vim Makefile
# 在:# SRILM = /home/speech/stolcke/project/srilm/devel 后一行添加
    SRILM =$(PWD)

#查询机器类型x86_64修改文件,输入命令
uname -i
$ x86_64
# 打开文件
vim common/Makefile.machine.i686-m64
# 将其中两行命令的值修改如下
    NO_TCL = X
    GAWK = /usr/bin/gawk

(3)、编译

pwd
$ /home/user2/srilm/
make World

(4)、修改环境变量

# 打开
pwd
$ /home/user2/
vim .bashrc
# 添加
export PATH="/home/user2/srilm/bin/i686-m64:$PATH"
# 生效
source .bashrc

(5)、测试

pwd
$ /home/user2/srilm
make test

第五步,出现结果出现很多IDENTICAL就说明安装成功,DEFFERNT即不成功应该是依赖项没爱安装好,查看依赖项后,再重新安装

【sparkSQL】创建DataFrame及保存

4、简单使用

参考链接1

Original: https://blog.csdn.net/qq_41447652/article/details/122454485
Author: 小白羊000
Title: SRILM语言模型库安装