Spark Learning Day 8
1. Per-province sales metrics: total sales amount (receivable) for each province
2. Among the TOP 3 provinces by sales, how many stores have average daily sales above 1000
3. Average order amount for each of the TOP 3 provinces
4. Payment type ratio within each of the TOP 3 provinces
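The script below implements all four requirements against the mini.json order data and writes each result both to MySQL over JDBC and to the Hive warehouse as a Parquet table, using the hosts and credentials that appear in the code (node1 for MySQL, node3 for the Hive metastore).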

#coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import StringType
if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("SparkSQL Example").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", "2").\
        config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse").\
        config("hive.metastore.uris", "thrift://node3:9083").\
        enableHiveSupport().\
        getOrCreate()
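    # Load mini.json, drop rows with no storeProvince, filter out the literal 'null'
    # province and abnormally large orders (receivable >= 10000), and keep only the
    # columns needed by the four requirements.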
    df = spark.read.format("json").load("../../data/input/mini.json").\
        dropna(thresh=1, subset=['storeProvince']).\
        filter("storeProvince != 'null'").\
        filter("receivable < 10000").\
        select("storeProvince", "storeID", "receivable", "dateTS", "payType")
    province_sale_df = df.groupBy("storeProvince").sum("receivable").\
        withColumnRenamed("sum(receivable)", "money").\
        withColumn("money", F.round("money", 2)).\
        orderBy("money", ascending=False)
    province_sale_df.show(truncate=False)
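    # Write the result to MySQL over JDBC and to the Hive warehouse as a Parquet table;
    # the same write pattern repeats for the remaining requirements.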
    province_sale_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8").\
        option("dbtable", "province_sale").\
        option("user", "root").\
        option("password", "123456").\
        option("encoding", "utf-8").\
        save()
    province_sale_df.write.mode("overwrite").saveAsTable("default.province_sale", "parquet")
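    # Take the TOP 3 provinces by sales amount and join them back onto the detail data.
    # The joined DataFrame is reused by the remaining three requirements, so cache it
    # in memory with disk spill.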
    top3_province_df = province_sale_df.limit(3).select("storeProvince").\
        withColumnRenamed("storeProvince", "top3_province")
    top3_province_df_joined = df.join(top3_province_df,
                                      on=df['storeProvince'] == top3_province_df['top3_province'])
    top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)
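    # Requirement 2: per TOP 3 province, group by store and day (dateTS is a millisecond
    # timestamp; its first 10 digits are the Unix time in seconds), sum the receivable,
    # keep the day/store rows above 1000, then count the distinct stores per province.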
    province_hot_store_count_df = top3_province_df_joined.groupBy(
            "storeProvince", "storeID",
            F.from_unixtime(df['dateTS'].substr(1, 10), "yyyy-MM-dd").alias("day")).\
        sum("receivable").withColumnRenamed("sum(receivable)", "money").\
        filter("money > 1000").\
        dropDuplicates(subset=["storeID"]).\
        groupBy("storeProvince").count()
    province_hot_store_count_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8").\
        option("dbtable", "province_hot_store_count").\
        option("user", "root").\
        option("password", "123456").\
        option("encoding", "utf-8").\
        save()
    province_hot_store_count_df.write.mode("overwrite").saveAsTable("default.province_hot_store_count", "parquet")
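    # Requirement 3: average order amount (average receivable per order) for each
    # TOP 3 province, rounded to two decimals.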
    top3_province_order_avg_df = top3_province_df_joined.groupBy("storeProvince").\
        avg("receivable").\
        withColumnRenamed("avg(receivable)", "money").\
        withColumn("money", F.round("money", 2)).\
        orderBy("money", ascending=False)
    top3_province_order_avg_df.show(truncate=False)
    top3_province_order_avg_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8").\
        option("dbtable", "top3_province_order_avg").\
        option("user", "root").\
        option("password", "123456").\
        option("encoding", "utf-8").\
        save()
    top3_province_order_avg_df.write.mode("overwrite").saveAsTable("default.top3_province_order_avg", "parquet")
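    # Requirement 4: payment type ratio per TOP 3 province. A window function counts
    # all orders per province, the outer query computes each payType's share of that
    # total, and a UDF formats the ratio as a percentage string.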
    top3_province_df_joined.createTempView("province_pay")

    def udf_func(percent):
        return str(round(percent * 100, 2)) + "%"

    my_udf = F.udf(udf_func, StringType())
    pay_type_df = spark.sql("""
        SELECT storeProvince, payType, (COUNT(payType) / total) AS percent FROM
        (SELECT storeProvince, payType, count(1) OVER(PARTITION BY storeProvince) AS total FROM province_pay) AS sub
        GROUP BY storeProvince, payType, total
    """).withColumn("percent", my_udf("percent"))
    pay_type_df.show()
    pay_type_df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8").\
        option("dbtable", "pay_type").\
        option("user", "root").\
        option("password", "123456").\
        option("encoding", "utf-8").\
        save()
    pay_type_df.write.mode("overwrite").saveAsTable("default.pay_type", "parquet")
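    # All four results have been written; release the cached join result.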
    top3_province_df_joined.unpersist()
