Spark Learning Day 7
Movie Rating Data Analysis
#coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    schema = StructType().add("user_id", StringType(), nullable=True).\
        add("movie_id", IntegerType(), nullable=True).\
        add("rank", IntegerType(), nullable=True).\
        add("ts", StringType(), nullable=True)
    df = spark.read.format("csv").\
        option("sep", "\t").\
        option("header", False).\
        option("encoding", "utf-8").\
        schema(schema=schema).\
        load("../data/input/sql/u.data")
    # Query: each user's average rating, highest first
    df.groupBy("user_id").\
        avg("rank").\
        withColumnRenamed("avg(rank)", "avg_rank").\
        withColumn("avg_rank", F.round("avg_rank", 2)).\
        orderBy("avg_rank", ascending=False).\
        show()
    # Query: each movie's average rating (SQL style)
    df.createTempView("movie")
    spark.sql("""
        SELECT movie_id, ROUND(AVG(rank), 2) AS avg_rank
        FROM movie
        GROUP BY movie_id
        ORDER BY avg_rank DESC
    """).show()
    # Query: how many ratings are above the overall average rating
    overall_avg = df.select(F.avg(df['rank'])).first()['avg(rank)']
    print("Number of ratings above the overall average:",
          df.where(df['rank'] > overall_avg).count())
#查询高分电影中(>3)打分次数最多的用户,此人打分平均分
user_id = df.where("rank > 3").\
groupBy("user_id").\
count().\
withColumnRenamed("count", "cnt").\
orderBy("cnt", ascending=False).\
limit(1).\
first()['user_id']
df.filter(df['user_id'] == user_id).\
select(F.round(F.avg("rank"), 2)).show()
    # Query: each user's average, minimum, and maximum rating
    df.groupBy("user_id").\
        agg(
            F.round(F.avg("rank"), 2).alias("avg_rank"),
            F.min("rank").alias("min_rank"),
            F.max("rank").alias("max_rank")
        ).show()
    # Query: top 10 movies by average rating, among movies rated more than 100 times
    df.groupBy("movie_id").\
        agg(
            F.count("movie_id").alias("cnt"),
            F.round(F.avg("rank"), 2).alias("avg_rank")
        ).where("cnt > 100").\
        orderBy("avg_rank", ascending=False).\
        limit(10).\
        show()
"""
1.agg:GroupData对象的API,在里面可以写多个集合
2.alias:Column的API,可以针对一个列进行改名
3.withColumnRename:DataFrame的API,可以对DF中的列进行改名
4.orderBy:DataFrame的API,进行排序,参数1是被排序的列,参数2是 升序(True)或降序(False)
5.first:DataFrame的API,取出DF的第一行数据,返回值结果是Row对象
"""



Cleaning dirty data:

#coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # Without an explicit schema (and no inferSchema), every column loads as StringType
    df = spark.read.format("csv").\
        option("sep", ";").\
        option("header", True).\
        load("../data/input/sql/people.csv")
    # Deduplicate: with no arguments, drop rows where every column matches;
    # with column names, deduplicate on those columns only
    df.dropDuplicates().show()
    df.dropDuplicates(['age', 'job']).show()

    # Drop missing values: dropna() drops any row containing a null;
    # thresh=N keeps a row only if it has at least N non-null values
    # (counted within subset, when subset is given)
    df.dropna().show()
    df.dropna(thresh=3).show()
    df.dropna(thresh=2, subset=['name', 'age']).show()

    # Fill all missing values with one value
    df.fillna("loss").show()
    # Fill only the specified column
    df.fillna("N/A", subset=['job']).show()
    # Per-column fill values; note a fill value is applied only where its type
    # matches the column's type (all columns here are strings)
    df.fillna({"name": "unknown", "age": 1, "job": "worker"}).show()
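Since thresh is the easiest of these parameters to misread, here is a minimal sketch showing which rows survive (the three rows and the dropna_demo app name are made up for illustration):

#coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("dropna_demo").\
        master("local[*]").\
        getOrCreate()
    # Three hypothetical rows with 3, 2, and 1 non-null values respectively
    df = spark.createDataFrame(
        [("Tom", 20, "teacher"), ("Jerry", None, "driver"), (None, None, "worker")],
        ["name", "age", "job"]
    )
    df.dropna(thresh=3).show()  # keeps only the Tom row (3 non-null values)
    df.dropna(thresh=2).show()  # keeps the Tom and Jerry rows (>= 2 non-null values)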



