Spark学习 day8

统计各省销售指标:每个省份的销售额
TOP3销售省份中,有多少家店铺达到过单日销售额 > 1000
TOP3省份中,各个省份的平均订单单价
TOP3省份中,各个省份的支付类型比例

image

#coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from  pyspark.sql.types import StringType

if __name__ == '__main__':
    # Configure a local-mode session with Hive support; the warehouse
    # directory and metastore URI point at the cluster nodes named below.
    session_builder = (
        SparkSession.builder
        .appName("SparkSQL Example")
        .master("local[*]")
        .config("spark.sql.shuffle.partitions", "2")
        .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
        .config("hive.metastore.uris", "thrift://node3:9083")
        .enableHiveSupport()
    )
    # Reuse an already-running session if there is one, otherwise start it.
    spark = session_builder.getOrCreate()

    # Read the raw JSON records, then keep only usable rows and the five
    # columns the reports below need.
    raw_orders_df = spark.read.format("json").load("../../data/input/mini.json")
    df = (
        raw_orders_df
        # Drop rows whose storeProvince is missing ...
        .dropna(thresh=1, subset=['storeProvince'])
        # ... or holds the literal text 'null'.
        .filter("storeProvince != 'null'")
        # Keep only records with receivable below 10000.
        .filter("receivable < 10000")
        .select("storeProvince", "storeID", "receivable", "dateTS", "payType")
    )


    # Requirement 1: total receivable per province, rounded to two decimals
    # and sorted from the largest total to the smallest.
    province_sale_df = (
        df.groupBy("storeProvince")
        .sum("receivable")
        .withColumnRenamed("sum(receivable)", "money")
        .withColumn("money", F.round("money", 2))
        .orderBy(F.col("money").desc())
    )
    province_sale_df.show(truncate=False)

    # Write the result to the MySQL table `province_sale`, replacing any
    # previous contents.
    province_sale_jdbc = {
        "url": "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8",
        "dbtable": "province_sale",
        "user": "root",
        "password": "123456",
        "encoding": "utf-8",
    }
    province_sale_df.write.mode("overwrite").format("jdbc").options(**province_sale_jdbc).save()

    # Write the same result to the Hive table `default.province_sale` as parquet.
    province_sale_df.write.mode("overwrite").saveAsTable("default.province_sale", format="parquet")


    # Take the first three provinces of the sorted sales result and expose
    # them under a distinct column name so the join condition is unambiguous.
    top3_province_df = province_sale_df.limit(3).select(
        F.col("storeProvince").alias("top3_province")
    )

    # Inner join: keeps only the order rows that belong to a TOP3 province.
    top3_match = df['storeProvince'] == top3_province_df['top3_province']
    top3_province_df_joined = df.join(top3_province_df, on=top3_match)

    # The joined frame feeds several reports below, so mark it for caching.
    top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)

    # Requirement 2: for each TOP3 province, count the stores that had at
    # least one day whose summed receivable exceeded 1000.
    #
    # The day key is built from the joined frame's own `dateTS` column (not
    # from the pre-join `df`), and the substring starts at position 1 because
    # Spark substring positions are 1-based.
    # NOTE(review): keeping the first 10 characters presumably turns a
    # 13-digit millisecond epoch into seconds -- confirm against the data.
    sale_day = F.from_unixtime(F.col("dateTS").substr(1, 10), "yyyy-MM-dd").alias("day")

    province_hot_store_count_df = (
        top3_province_df_joined
        .groupBy("storeProvince", "storeID", sale_day)
        .sum("receivable")
        .withColumnRenamed("sum(receivable)", "money")
        .filter("money > 1000")
        # A store can qualify on several days; keep one row per store so
        # each store is counted once.
        .dropDuplicates(subset=["storeID"])
        .groupBy("storeProvince")
        .count()
    )

    # Write the result to the MySQL table `province_hot_store_count`,
    # replacing any previous contents.
    province_hot_store_count_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8").\
        option("dbtable", "province_hot_store_count").\
        option("user", "root").\
        option("password", "123456").\
        option("encoding", "utf-8").\
        save()

    # Write the same result to the Hive table as parquet.
    province_hot_store_count_df.write.mode("overwrite").saveAsTable("default.province_hot_store_count", "parquet")



    # Requirement 3: average receivable per order for each TOP3 province,
    # rounded to two decimals and sorted from highest to lowest.
    top3_province_order_avg_df = (
        top3_province_df_joined.groupBy("storeProvince")
        .avg("receivable")
        .withColumnRenamed("avg(receivable)", "money")
        .withColumn("money", F.round("money", 2))
        .orderBy(F.col("money").desc())
    )
    top3_province_order_avg_df.show(truncate=False)

    # Write the result to the MySQL table `top3_province_order_avg`,
    # replacing any previous contents.
    order_avg_jdbc = {
        "url": "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8",
        "dbtable": "top3_province_order_avg",
        "user": "root",
        "password": "123456",
        "encoding": "utf-8",
    }
    top3_province_order_avg_df.write.mode("overwrite").format("jdbc").options(**order_avg_jdbc).save()

    # Write the same result to the Hive table as parquet.
    top3_province_order_avg_df.write.mode("overwrite").saveAsTable("default.top3_province_order_avg", format="parquet")

    # Expose the TOP3-province rows to Spark SQL as the view `province_pay`.
    # createOrReplaceTempView is used so that re-running the script inside an
    # already-running session (getOrCreate may reuse one) does not fail
    # because the view name is already registered.
    top3_province_df_joined.createOrReplaceTempView("province_pay")

    def udf_func(percent):
        """Format a ratio as a percentage string, e.g. 0.1234 -> "12.34%".

        The ratio is multiplied by 100 and rounded to two decimals before the
        "%" sign is appended.

        Returns None when ``percent`` is None, so a NULL input stays NULL
        instead of raising TypeError inside the UDF.
        """
        if percent is None:
            return None
        return str(round(percent * 100, 2)) + "%"

    # Wrap the percentage formatter as a Spark UDF that returns a string.
    my_udf = F.udf(udf_func, StringType())

    # Requirement 4: share of each payment type within each TOP3 province.
    # The inner query attaches the per-province row count (`total`) to every
    # row through a window; the outer query divides the per-payType count by
    # that total.
    pay_ratio_sql = """
        SELECT storeProvince, payType, (COUNT(payType) / total) AS percent FROM
        (SELECT storeProvince, payType, count(1) OVER(PARTITION BY storeProvince) AS total FROM province_pay) AS sub
        GROUP BY storeProvince, payType, total
    """
    pay_ratio_df = spark.sql(pay_ratio_sql)
    # Replace the numeric ratio with its formatted percentage text.
    pay_type_df = pay_ratio_df.withColumn("percent", my_udf("percent"))

    pay_type_df.show()

    # Write the result to the MySQL table `pay_type`, replacing any
    # previous contents.
    pay_type_jdbc = {
        "url": "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8",
        "dbtable": "pay_type",
        "user": "root",
        "password": "123456",
        "encoding": "utf-8",
    }
    pay_type_df.write.mode("overwrite").format("jdbc").options(**pay_type_jdbc).save()

    # Write the same result to the Hive table as parquet.
    pay_type_df.write.mode("overwrite").saveAsTable("default.pay_type", format="parquet")

    # All reports are done; release the cached TOP3-province rows.
    top3_province_df_joined.unpersist()

image

posted @ 2026-02-07 15:20  呓语-MSHK  阅读(4)  评论(0)    收藏  举报