<返回更多

入职AI算法岗?你需要会的大数据知识

2020-09-21    
加入收藏
入职AI算法岗?你需要会的大数据知识

 

pyspark-RDD基础

spark

pyspark是spark的Python API,允许python调用spark编程模型

初始化spark

SparkContext

from pyspark import SparkContext
sc = SparkContext(master='local[2]')

核查SparkContext

./bin/spark-shell --master local[2]
./bin/pyspark --master local[4] --py-files code.py

配置

from pyspark import SparkConf,SparkContext
conf = (SparkConf().setMaster("local").setAppName("my APP").set("spark.executor.memory","1g"))
sc = SparkContext(conf=conf)

使用shell

pyspark shell已经为SparkContext创建了名为sc的变量

./bin/spark-shell --master local[2]
./bin/pyspark --master local[4] --py-files code.py

用—master参数设定Context连接到哪个Master服务器,通过传递逗号分隔列表至—py-files添加Python.zip、egg或.py文件到Runtime路径

加载数据

并行集合

rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([("a",["x","y","z"]),("b",["p","r"])])

外部数据

使用textFile()函数从HDFS、本地文件或其它支持hadoop的文件系统里读取文件,或使用wholeTextFiles()函数读取目录下所有文本文件

textFile = sc.textFile('a.txt')
textFile2 = sc.wholeTextFiles(/aa)

提取RDD信息

基础信息

rdd.getNumPatitions()								列出分区数
rdd.count()													计算RDD的实例数量
rdd.countByKey()										按键计算RDD实例数量
defaultdict(<type 'int'>,('a':2,'b':1))
rdd.countByValue()							按值计算RDD实例数量
defaultdict(<type 'int'>,(('b',2):1,('a',2):1,('a',7):1))
rdd.collectAsMap()							以字典的形式返回键值
('a':2,'b':2)
rdd.sum()									汇总RDD元素
4959
sc.parallelize([]).isEmpty()				检查RDD是否为空

汇总

rdd.max()					RDD元素的最大值
rdd.min()					RDD元素的最小值
rdd.mean()					RDD元素的平均值
rdd.stdev()					RDD元素的标准差
rdd.variance()				RDD元素的方差
rdd.histogram(3)			分箱(bin)生成直方图
rdd.stats()					综合统计包括:计数、平均值、标准差、最大值和最小值

应用函数

rdd.map(lambda x:x+(x[1],x[0])).collect()		对每个RDD元素执行函数
rdd.flatMap(lambda x:x+(x[1],x[0]))				对每个RDD元素执行函数,并拉平结果
rdd.collect()
rdd.flatMapValues(lambda x:x).collect()			不改变键,对rdd的每个键值对执行flatMap函数

选择数据

获取
rdd.collect()							返回包含所以RDD元素的列表
rdd.take(4)								提取前4个RDD元素
rdd.first()								提取第一个RDD元素
rdd.top(2)								提取前两个RDD元素
抽样rdd.sample(False,0.15,81)				返回RDD的采样子集
筛选rdd.filter(lambda x:'a' in x)			筛选RDD
rdd.distinct()							返回RDD里的唯一值
rdd.keys()								返回RDD键值对里的键

迭代

def g(x):print(x)     
rdd.foreach(g)

改变数据形状

规约
rdd.reduceByKey(lambda x,y:x+y)				合并每个键的值
rdd.reduce(lambda x,y:x+y)					合并RDD的值
分组rdd.groupBy(lambda x:x%2).mapValues(list)	返回RDD的分组值
rdd.groupByKey().mapValues(list)			按键分组RDD集合seqOp = (lambda x,y:(x[0]+y,x[1]+1))
combOP = (lambda x,y:(x[0]+y[0],x[1]+y[1]))
rdd.aggregate((0,0),seqOp,combOP)		 	汇总每个分区里的RDD元素,并输出结果
rdd.aggregeteByKey((0,0),seqOp,combOP)		汇总每个RDD的键的值
rdd.fold(0,add)								汇总每个分区里的RDD元素,并输出结果
rdd.foldByKey(0,add)						合并每个键的值
rdd,keyBy(lambda x:x+x)						通过执行函数,创建RDD元素的元组

数学运算

rdd.subtract(rdd2)							返回RDD2里没有匹配键的rdd的兼职对
rdd2.subtractByKey(rdd)						返回rdd2里的每个(键、值)对,rdd中,没有匹配的键
rdd.cartesian(rdd2)							返回rdd和rdd2的笛卡尔积

排序

rdd.sortBy(lambda x:x[1])					按给定函数排序RDD
rdd.sortByKey()								按键排序RDD的键值对

重分区

rdd.repartition(4)							新建一个含4个分区的RDD
rdd.coalesce(1)								将RDD中的分区数缩减为1个

保存

rdd.saveAsTextFile("rdd.txt")
rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",'org.Apache.hadoop.mapred.TextOutputFormat')

终止SparkContext

sc.stop()

执行程序

./bin/spark-submit examples/src/main/python/pi.py

Pyspark_sql

Pyspark与Spark SQL

Spark SQL是Apache Spark处理结构化数据的模块

初始化SparkSession

SparkSession用于创建数据框,将数据框注册为表,执行SQL查询,缓存表及读取Parquet文件

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("my app").config("spark.some.config.option","some-value").getOrCreate()

创建数据框

从RDD创建

from pyspark.sql.types import *
推断Schemasc = spark.sparkContextlines = sc.textFile("people.txt")
parts = lines.map(lambda l:l.split(","))
people = parts.map(lambda p:Row(name=p[0],age=int(p[1])))
peopledf = spark.createDataFrame(people)指定Schemapeople = parts.map(lambda p:Row(name=p[0],age=int(p[1].strip())))
schemaString = "name age"
fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split()]
schema = StructType(fields)spark.createDataFrame(people,schema).show()

从spark数据源创建

json
df = spark.read.json("customer.json")
df.show()df2 = spark.read.load("people.json",format = "json")
Parquet文件df3 = spark.read.load("users.parquet")
文本文件df4 = spark.read.text("people.txt")

查阅数据信息

df.dtypes						返回df的列名与数据类型
df.show()						显示df内容
df.head()						返回前n行数据
df.first()						返回第一行数据
df.take(2)						返回前两行数据
df.schema						返回df的schema
df.describe().show()			汇总统计数据
df.columns						返回df列名
df.count()						返回df的行数
df.distinct().count()			返回df中不重复的行数
df.printSchema()				返回df的Schema
df.explain()					返回逻辑与实体方案

重复值

df = df.dropDuplicates()

查询

from pyspark.sql import functions as F
Select
df.select("firstName").show()					显示firstName列的所有条目
df.select("firstName","lastName".show())					
df.select("firstName","age",
          explode("phoneNumber")				显示firstName、age的所有条目和类型
          .alias("contactInfo"))
			.select("ContactInfo.type","firstName","age")
df.select(df["firstName"],df["age"]+1).show()	显示firstName和age列的所有记录添加
df.select(df["age"]>24).show()					显示所有小于24的记录
When
df.select("firstName",F.when(df.age>30,1))		显示firstName,且大于30岁显示1,小于30显示0
				.otherwise(0).show()
df[df.firstName.isin("Jane","Boris")].collect()	显示符合特定条件的firstName列的记录
Like	
df.select("firstName",df.lastName,				显示lastName列中包含Smith的firstName列的记录
          like("Smith")).show()
Startswith-Endwith
df.select("firstName",df.lastName.				显示lastName列中以Sm开头的firstName列的记录	
          startswith("Sm")).show()
df.select(df.lastName.endswith("th")).show()	显示以th结尾的lastName
Substring
df.select(df.firstName.substr(1,3).alias("name"))返回firstName的子字符串
Between
df.select(df.age.between(22,24)).show()			显示介于22到24直接的age列的所有记录

添加、修改、删除列

添加列

df = df.withColumn('city',df.address.city) 
       .withColumn('postalCode',df.address.postalCode) 
    .withColumn('state',df.address.state) 
    .withColumn('streetAddress',df.address.streetAddress) 
    .withColumn('telePhoneNumber',explode(df.phoneNumber.number)) 
    .withColumn('telePhoneType',explode(df.phoneNumber.type)) 

修改列

df = df.withColumnRenamed('telePhoneNumber','phoneNumber')

删除列

df = df.drop("address","phoneNumber")
df = df.drop(df.address).drop(df.phoneNumber)

分组

df.groupBy("age").count().show()		按age列分组,统计每组人数

筛选

df.filter(df["age"]>24).show()			按age列筛选,保留年龄大于24岁的

排序

peopledf.sort(peopledf.age.desc()).collect()
df.sort("age",ascending=False).collect()
df.orderBy(["age","city"],ascending=[0,1]).collect()

替换缺失值

df.na.fill(50).show()				用一个值替换空值
df.na.drop().show()					去除df中为空值的行
df.na.replace(10,20).show()			用一个值去替换另一个值

重分区

df.repartition(10).rdd.getNumPartitions()	将df拆分为10个分区
df.coalesce(1).rdd.getNumPartitions()		将df合并为1个分区

运行SQL查询

将数据框注册为视图

peopledf.createGlobalTempView("people")
df.createTempView("customer")
df.createOrReplaceTempView("customer")

查询视图

df = spark.sql("select * from customer").show()
peopledf = spark.sql("select * from global_temp.people").show()

输出

数据结构

rdd1 = df.rdd		将df转为rdd
df.toJSON().first()	将df转为rdd字符串df.toPandas()		将df的内容转为Pandas的数据框

保存至文件

df.select("firstName","city").write.save("nameAndCity.parquet")
df.select("firstName","age").write.save("nameAndAges.json",format="json")

终止SparkSession

spark.stop()
声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>