2026/3/3课堂小测 从 B站 评论采集到 Hive 数据清洗,再同步到 MySQL:完整大数据流程

> 本文记录一套从 B站评论数据采集 → Python 预处理 → Hive 大数据清洗 → 数据同步到 MySQL → 本地导出分析端到端完整流程

(注意本文章代码均为无保护版本,注意被Ban的风险。)


一、整体流程与架构图描述

1.1 系统整体架构(文字版架构图)

【数据源层】
    B站视频评论开放 API
        ↓
【采集层】
    Python 爬虫(requests + 分页爬取 + 延时控制)
        ↓
【原始数据层】
    本地 JSON / CSV 原始数据文件
        ↓
【预处理层】
    Python 数据格式化 → 生成 TSV 文本(适配 Hive 读取)
        ↓
【分布式存储层】
    HDFS 分布式文件系统
        ↓
【数仓清洗层】
    Hive 数据仓库(原始表 → 清洗表 → 质量治理)
        ↓
【业务存储层】
    MySQL 关系型数据库(供报表、接口、可视化使用)
        ↓
【应用层】
    本地 CSV 导出 + 可视化分析 + 数据挖掘

1.2 完整数据流向流程图(文字版)

开始
  ↓
输入 B站 BV 号
  ↓
Python 爬取评论 → 保存 JSON/CSV
  ↓
数据预处理:JSON → TSV
  ↓
上传 TSV 到 HDFS
  ↓
Hive 创建外部表映射原始数据
  ↓
Hive 数据清洗(去空、去异常、去脏数据)
  ↓
Hive 清洗结果表
  ↓
┌─────────────┴─────────────┐
↓                           ↓
Sqoop 同步到 MySQL     Hive 直接导出 CSV
↓                           ↓
MySQL 业务表            本地分析/可视化
  ↓
MySQL 导出标准 CSV
  ↓
结束

二、环境准备

2.1 Python 环境

sudo yum install -y python3 python3-devel python3-pip
pip3 install requests pandas

2.2 Hadoop & Hive 环境

start-dfs.sh
hive --service metastore &
jps

2.3 MySQL 环境(用于结果存储)

# 安装 MySQL(如已存在可跳过)
sudo yum install -y mysql-community-server
sudo systemctl start mysqld

# 安装 MySQL JDBC 驱动(Hive 同步 MySQL 必需)
cp mysql-connector-java-8.0.30.jar $HIVE_HOME/lib/

三、B站 评论爬虫(Python)

bilibili_crawler.py

import requests
import json
import time
import pandas as pd
from datetime import datetime

class BilibiliCommentCrawler:
    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': 'https://www.bilibili.com',
        }

    def get_aid(self, bvid):
        url = f"https://api.bilibili.com/x/web-interface/view?bvid={bvid}"
        res = requests.get(url, headers=self.headers, timeout=10)
        if res.status_code == 200:
            return res.json()['data']['aid']
        return None

    def get_comments(self, bvid, max_pages=10):
        aid = self.get_aid(bvid)
        if not aid:
            return []
        all_comments = []
        for page in range(1, max_pages+1):
            params = {'type':1,'oid':aid,'pn':page,'ps':20,'sort':1}
            res = requests.get("https://api.bilibili.com/x/v2/reply", 
                               headers=self.headers, params=params)
            if res.json()['code'] != 0:
                break
            for r in res.json()['data']['replies']:
                all_comments.append({
                    'comment_id': r['rpid'],
                    'user_id': r['member']['mid'],
                    'username': r['member']['uname'],
                    'user_level': r['member']['level_info']['current_level'],
                    'vip_type': r['member']['vip']['vipType'],
                    'vip_status': r['member']['vip']['vipStatus'],
                    'content': r['content']['message'],
                    'like_count': r['like'],
                    'reply_count': r['count'],
                    'create_time': r['ctime'],
                    'page': page
                })
            time.sleep(1)
        return all_comments

    def save(self, comments, bvid):
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        pd.DataFrame(comments).to_csv(f"bili_{bvid}_{ts}.csv", index=False, encoding='utf-8-sig')
        with open(f"bili_{bvid}_{ts}.json", 'w', encoding='utf-8') as f:
            json.dump(comments, f, ensure_ascii=False, indent=2)

if __name__ == "__main__":
    craw = BilibiliCommentCrawler()
    bvid = input("输入BV号:")
    comments = craw.get_comments(bvid)
    if comments:
        craw.save(comments, bvid)
        print(f"共爬取 {len(comments)} 条评论")

四、数据预处理(适配 Hive)

preprocess_data.py
把 JSON 转成 \t 分隔文本,方便 Hive 直接建表映射。

import json,sys
with open(sys.argv[1], encoding='utf-8') as f:
    data = json.load(f)
with open(sys.argv[2], 'w', encoding='utf-8') as f:
    for c in data:
        line = '\t'.join([
            str(c.get('comment_id','')),
            str(c.get('user_id',0)),
            str(c.get('username','')).replace('\t',' ').replace('\n',' '),
            str(c.get('user_level',0)),
            str(c.get('vip_type',0)),
            str(c.get('vip_status',0)),
            str(c.get('content','')).replace('\t',' ').replace('\n',' '),
            str(c.get('like_count',0)),
            str(c.get('reply_count',0)),
            str(c.get('create_time',0)),
            str(c.get('page',0))
        ])
        f.write(line+'\n')

运行:

python3 preprocess_data.py data.json processed.txt

五、Hive 建表 & 数据清洗

5.1 上传 HDFS

hdfs dfs -mkdir -p /user/hadoop/bilibili/input
hdfs dfs -put processed.txt /user/hadoop/bilibili/input

5.2 Hive 建原始表

CREATE DATABASE IF NOT EXISTS bilibili_analysis;
USE bilibili_analysis;

CREATE EXTERNAL TABLE raw_comments (
    comment_id  STRING,
    user_id     BIGINT,
    username    STRING,
    user_level  INT,
    vip_type    INT,
    vip_status  INT,
    content     STRING,
    like_count  INT,
    reply_count INT,
    create_time BIGINT,
    page        INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION '/user/hadoop/bilibili/input';

5.3 Hive 数据清洗

CREATE TABLE cleaned_comments AS
SELECT
    COALESCE(comment_id, 'unknown') AS comment_id,
    COALESCE(user_id, 0) AS user_id,
    COALESCE(username, '匿名用户') AS username,
    CASE WHEN user_level<0 OR user_level>6 THEN 0 ELSE user_level END AS user_level,
    COALESCE(vip_type,0) AS vip_type,
    COALESCE(vip_status,0) AS vip_status,
    REGEXP_REPLACE(content, '[\\x00-\\x1F]', ' ') AS content,
    CASE WHEN like_count<0 THEN 0 ELSE like_count END AS like_count,
    CASE WHEN reply_count<0 THEN 0 ELSE reply_count END AS reply_count,
    CASE WHEN create_time>0 AND create_time<UNIX_TIMESTAMP() 
         THEN create_time ELSE UNIX_TIMESTAMP() END AS create_time,
    page
FROM raw_comments
WHERE content IS NOT NULL AND content != '';

六、Hive → MySQL 数据同步(生产级方案)

6.1 MySQL 中建目标表

CREATE DATABASE IF NOT EXISTS bili_db;
USE bili_db;

DROP TABLE IF EXISTS cleaned_comments;
CREATE TABLE cleaned_comments (
    comment_id     VARCHAR(50) PRIMARY KEY,
    user_id        BIGINT,
    username       VARCHAR(100),
    user_level     INT,
    vip_type       INT,
    vip_status     INT,
    content        TEXT,
    like_count     INT,
    reply_count    INT,
    create_time    BIGINT,
    page           INT
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

6.2 方案A:Sqoop 同步(企业标准方案)

sqoop export \
--connect jdbc:mysql://localhost:3306/bili_db \
--username root \
--password your-password \
--table cleaned_comments \
--export-dir /user/hive/warehouse/bilibili_analysis.db/cleaned_comments \
--input-fields-terminated-by '\001' \
--input-null-string '\\N' \
--input-null-non-string '\\N'

6.3 方案B:Hive 直接写入 MySQL(无 Sqoop 可用)

CREATE TABLE hive_to_mysql (
    comment_id  STRING,
    user_id     BIGINT,
    username    STRING,
    user_level  INT,
    vip_type    INT,
    vip_status  INT,
    content     STRING,
    like_count  INT,
    reply_count INT,
    create_time BIGINT,
    page        INT
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
    "hive.jdbc.url" = "jdbc:mysql://localhost:3306/bili_db?useSSL=false&characterEncoding=utf8mb4",
    "hive.jdbc.driver" = "com.mysql.cj.jdbc.Driver",
    "hive.jdbc.user" = "root",
    "hive.jdbc.password" = "your-password",
    "hive.jdbc.write.batch.size" = "1000"
);

INSERT OVERWRITE TABLE hive_to_mysql SELECT * FROM cleaned_comments;

七、MySQL 导出 CSV(本地分析)

7.1 MySQL 命令导出

mysql -uroot -p -e "
USE bili_db;
SELECT * FROM cleaned_comments
INTO OUTFILE '/var/lib/mysql-files/bili_clean_result.csv'
FIELDS TERMINATED BY ',' 
ENCLOSED BY '\"'
LINES TERMINATED BY '\n';
"

7.2 Python 导出(通用、简单)

import pandas as pd
import pymysql

conn = pymysql.connect(
    host="localhost",
    user="root",
    password="your-password",
    database="bili_db",
    charset="utf8mb4"
)
df = pd.read_sql("SELECT * FROM cleaned_comments", conn)
df.to_csv("bili_clean_final.csv", index=False, encoding='utf-8-sig')
conn.close()

八、典型数据分析 SQL

-- 用户等级分布
SELECT user_level, COUNT(*) cnt
FROM cleaned_comments
GROUP BY user_level
ORDER BY user_level;

-- 24 小时评论活跃时段
SELECT HOUR(FROM_UNIXTIME(create_time)) hour, COUNT(*) cnt
FROM cleaned_comments
GROUP BY hour
ORDER BY hour;

-- VIP 用户占比
SELECT
    CASE WHEN vip_status=1 THEN 'VIP用户' ELSE '普通用户' END AS user_type,
    COUNT(*) cnt,
    ROUND(COUNT(*)/SUM(COUNT(*)) OVER(), 2) AS ratio
FROM cleaned_comments
GROUP BY user_type;

-- 点赞最高 Top10 评论
SELECT username, content, like_count, FROM_UNIXTIME(create_time)
FROM cleaned_comments
ORDER BY like_count DESC
LIMIT 10;

九、优化与规范

  1. 爬虫规范:必须加延时,遵守 B站 robots 协议,禁止高频爬取。
  2. Hive 优化:大数据量使用分区表 + ORC/Parquet 存储
  3. 数据同步:Hive → MySQL 优先使用 Sqoop,稳定性更高。
  4. MySQL 优化:建议在 user_idcreate_time 建立索引。
  5. 数据安全:用户名、用户ID 等敏感信息需做脱敏处理。

十、完整流程总结

  1. 数据采集:Python 爬取 B站 评论,保存 JSON/CSV。
  2. 数据预处理:将原始数据转为 Hive 可读取的 TSV 格式。
  3. Hive 数仓:创建外部表 → 加载数据 → 清洗数据。
  4. 数据同步:通过 Sqoop/Hive JDBC 将清洗结果写入 MySQL。
  5. 数据导出:从 MySQL 导出标准 CSV,用于本地分析与可视化。
posted @ 2026-03-03 19:55  Moonbeamsc  阅读(14)  评论(0)    收藏  举报
返回顶端