Spark Learning, Day 7

Movie Rating Data Analysis

#coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()

    sc = spark.sparkContext

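    # u.data is tab-separated: user id, movie id, rating (called "rank" below), timestamp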
    schema = StructType().add("user_id", StringType(), nullable=True).\
        add("movie_id", IntegerType(), nullable=True).\
        add("rank", IntegerType(), nullable=True).\
        add("ts", StringType(), nullable=True)


    df = spark.read.format("csv").\
        option("sep", "\t").\
        option("header", False).\
        option("encoding", "utf-8").\
        schema(schema=schema).\
        load("../data/input/sql/u.data")


    # Query each user's average rating
    df.groupBy("user_id").\
        avg("rank").\
        withColumnRenamed("avg(rank)", "avg_rank").\
        withColumn("avg_rank", F.round("avg_rank", 2)).\
        orderBy("avg_rank", ascending=False).\
        show()


    # Query each movie's average rating, SQL style
    df.createTempView("movie")
    spark.sql("""
        SELECT movie_id,ROUND(AVG(rank),2) AS avg_rank FROM movie GROUP BY movie_id ORDER BY avg_rank DESC 
    """).show()


    # Count the rating records whose score is above the overall average
    avg_rank = df.select(F.avg(df['rank']).alias("avg_rank")).first()['avg_rank']
    print("Number of ratings above the overall average:", df.where(df['rank'] > avg_rank).count())


    # Among high ratings (rank > 3), find the user who rated most often, then compute that user's overall average rating
    user_id = df.where("rank > 3").\
        groupBy("user_id").\
        count().\
        withColumnRenamed("count", "cnt").\
        orderBy("cnt", ascending=False).\
        limit(1).\
        first()['user_id']

    df.filter(df['user_id'] == user_id).\
        select(F.round(F.avg("rank"), 2)).show()


    # Query each user's average, minimum, and maximum rating
    df.groupBy("user_id").\
        agg(
            F.round(F.avg("rank"), 2).alias("avg_rank"),
            F.min("rank").alias("min_rank"),
            F.max("rank").alias("max_rank")
    ).show()


    # Average rating of movies rated more than 100 times, top 10
    df.groupBy("movie_id").\
        agg(
            F.count("movie_id").alias("cnt"),
            F.round(F.avg("rank"), 2 ).alias("avg_rank")
        ).where("cnt > 100").\
        orderBy("avg_rank", ascending=False).\
        limit(10).\
        show()


    """
    1.agg:GroupData对象的API,在里面可以写多个集合
    2.alias:Column的API,可以针对一个列进行改名
    3.withColumnRename:DataFrame的API,可以对DF中的列进行改名
    4.orderBy:DataFrame的API,进行排序,参数1是被排序的列,参数2是 升序(True)或降序(False)
    5.first:DataFrame的API,取出DF的第一行数据,返回值结果是Row对象
    """


Cleaning anomalous data:

#coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()

    sc = spark.sparkContext

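    # people.csv is ';'-separated with a header row; the columns used below are name, age, job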
    df = spark.read.format("csv").\
        option("sep", ";").\
        option("header", True).\
        load("../data/input/sql/people.csv")


    # Deduplicate the data
    df.dropDuplicates().show()

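    # Deduplicate considering only the age and job columns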
    df.dropDuplicates(['age', 'job']).show()

    # Drop rows with missing values
    df.dropna().show()

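    # thresh=3: keep only rows that have at least 3 non-null values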
    df.dropna(thresh=3).show()

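    # thresh=2 with subset: keep rows with at least 2 non-null values among name and age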
    df.dropna(thresh=2, subset=['name', 'age']).show()

    # Fill missing values: replace every null with a constant
    df.fillna("loss").show()

    # Fill only the specified column(s)
    df.fillna("N/A", subset=['job']).show()

    df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()

