时序预测大模型 Chronos模型设计思想

亚马逊研究团队于2024年提出的一种时间序列预测基础模型Chronos,代表了时间序列领域向大模型范式的重要转变。

模型的核心思想是将时间序列预测重塑为一个条件生成建模或语言建模任务。

应用于Zero-shot (零时序预测(Zero-shot Time Series Forecasting) 指的是一种时间序列预测方法,模型能够在没有见过目标序列的历史数据的情况下,直接对该序列进行预测)。

Chronos是一个对时间序列数据的概率模型进行预训练的框架,它将这些值标记为与基于transformer的模型(如T5)一起使用。模型将序列的值缩放和量化到一个固定的词汇表

  • 首先Chronos 在生成上下文token之前,首先会对数据进行缩放和量化。
  • 然后,这些token可用于训练现有的语言模型,该模型将预测下一个上下文token。
  • 最后后对预测结果进行去量化和去比例化处理,以获得实际预测结果。

设计思想

Chronos旨在成为一个通用的“预训练-微调”基础模型:

  • 统一表示:将多元时间序列的数值通过分桶(bucketing)和映射,转换为离散的token序列(类似于文本中的词汇)。

  • 基于Transformer的架构:采用纯解码器或编码器-解码器架构(如T5),像处理语言一样处理时间序列token序列。

  • 大规模预训练:在包含公开数据集的大规模时间序列语料库(如Monash、UCI等来源的约50亿个时间点)上进行预训练,学习通用的时序模式

主要技术特点

  • 量化与token化:

    • 对时间序列值进行分位数分桶,每个桶对应一个token ID。

    • 例如,将历史观测值“-0.5, 0.1, 1.3”转换为像“token_15, token_41, token_9”这样的序列。

  • 上下文学习:

    • 在预测时,输入为过去观测值的token序列,模型自回归地生成未来token序列。

    • 通过反向token化将预测token转换回数值。

  • 零样本/少样本能力:

    • 得益于大规模预训练,Chronos在未见数据集上表现出优秀的零样本预测性能。

    • 可通过少量示例(少样本)或轻量微调快速适配新任务。

工作流程

  1. 输入准备:对历史时间序列进行缩放、归一化、token化。

  2. 模型推理:输入上下文长度内的token,生成未来时间步的token序列。

  3. 输出解码:将预测token反转为数值,并可能进行后处理(如调整尺度)。

优势

  • 通用性强:一个模型适应多个领域(能源、交通、经济等)。

  • 简化流程:无需复杂的特征工程或为每个数据集单独设计模型。

  • 性能优越:在零样本和少样本设定下,其表现常超越传统统计模型(如ARIMA)和特定任务模型。

 

Chronos的“特征”局限性

Chronos 预训练的数据是数值型时间序列的点位,例如“昨天气温25度,今天26度”。它不支持在输入中直接加入以下常见特征:

  • 分类特征:如星期几、节假日、产品类别、店铺ID等。

  • 文本特征:如产品描述、新闻标题。

  • 其他外部回归变量:如促销活动标志、油价、汇率等。

它的“特征”本质上就是历史数值序列本身。 它从海量的、多样的单变量序列中学习通用的时间模式(趋势、周期、季节等),然后将这种模式迁移到新的序列上。

在实践中引入其他特征

如果预测任务必须依赖其他特征,可以采取以下几种工程化的变通方法,但这些方法不再是纯粹的零样本或少样本预测,需要一定的建模工作:

方法A:特征编码到时间序列中(最常用)(Chronos的协变量方案)

将外部特征编码成额外的、同步的时间序列通道,然后用 通道独立(Channel-Independent) 的方式处理,让模型隐式地学习关系。

  • 步骤:

    1. 创建多变量序列:假设要预测销售额Y,有外部特征“是否节假日H”和“气温T”。构造一个三变量的时间序列 [Y, H, T]

    2. 分别Token化:对每个变量单独进行分桶和Token化。

    3. 交错输入:将三个变量的历史Token按时间步交错排列:[Y_t1, H_t1, T_t1, Y_t2, H_t2, T_t2, ...]

    4. 训练/微调:在目标数据集上对预训练的Chronos模型进行微调。模型通过微调来学习YHT之间的关联。

  • 优点:能利用特征信息。

  • 缺点:必须进行微调,丧失了Chronos“开箱即用”的最大优势;且模型对特征关系的捕捉能力有限。

方法B:将特征作为条件前缀

借鉴大型语言模型的“提示”技术,将特征信息以文本或特殊Token的形式,作为预测任务的条件前缀(Conditional Prefix)。

  • 示例:对于预测“北京王府井店、星期天、夏季的冰淇淋销量”,可以构造一个提示前缀Token序列,如 [LOC_BEIJING, STORE_WFJ, DAY_SUNDAY, SEASON_SUMMER],然后跟上历史销量的Token序列,让模型基于此条件生成未来预测。

  • 现状:在原版Chronos论文和官方实现中并未强调或实现此功能,但这是一种理论上的扩展方向,需要重新设计和训练模型。

方法C:分组建模

将具有相同特征组合的序列分组,对每组数据分别使用Chronos进行零样本预测或微调。

  • 示例:将所有“北京、周末”的序列归为一组,用这组历史数据作为上下文示例(少样本学习),让Chronos预测新的“北京、周末”序列。

  • 优点:无需改变模型,利用了特征。

  • 缺点:数据被分割,每组数据量可能变少,影响预测稳定性。

 

Chronos模型的协变量

工作原理:

  1. 通道独立编码:每个协变量单独进行分桶token化

  2. 交错排列:按时间步交错协变量token

  3. 微调学习:在目标数据集上微调,让模型学习变量间关系

局限性:

  • 必须微调,丧失零样本能力

  • 模型只能学习训练数据中出现的模式

  • 处理未来已知协变量困难

def predict_chronos2(df, selected_features, target, prediction_length, id_column, timestamp_column, model_path='/Users/lxh-mac/Documents/models/amazon/chronos-2'):
    pipeline: Chronos2Pipeline = BaseChronosPipeline.from_pretrained(model_path, device_map="cpu")
    energy_pred_cov_df = pipeline.predict_df(
        df[[id_column, timestamp_column, target]],  # 暂时只使用目标变量,不使用协变量
        future_df=None,
        prediction_length=prediction_length,
        quantile_levels=[0.1, 0.5, 0.9],
        id_column=id_column,
        timestamp_column=timestamp_column,
        target=target,
    )
    return energy_pred_cov_df['predictions']

 

Chronos模型的多变量

Chronos实现多变量预测有三种主要方式:

  1. 通道独立:简单快速

  2. 多变量微调:功能强大,但需要训练数据

  3. 层次预测:适合具有层级结构的数据

策略一:通道独立预测

将多变量序列拆分成多个独立的单变量序列,分别预测后再合并。

import pandas as pd
import numpy as np
from chronos import ChronosPipeline

# 假设有3个相关变量的数据
df = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=100, freq='H'),
    'temperature': np.random.normal(20, 5, 100),  # 温度
    'humidity': np.random.normal(60, 10, 100),     # 湿度
    'pressure': np.random.normal(1013, 5, 100)     # 气压
})

# 方法1:分别预测每个变量
def channel_independent_predict(df, target_columns, prediction_length=24):
    predictions = {}
    
    for target in target_columns:
        # 为每个变量创建独立的DataFrame
        single_var_df = df[['timestamp', target]].copy()
        single_var_df['id'] = f'series_{target}'  # 给每个序列一个独立ID
        
        # 使用Chronos预测
        pipeline = ChronosPipeline.from_pretrained("amazon/chronos-t5-small")
        forecast = pipeline.predict(
            context=single_var_df[target].values[-168:],  # 最后168个点作为历史
            prediction_length=prediction_length,
            num_samples=100  # 用于计算分位数
        )
        
        predictions[target] = forecast.mean  # 取均值预测
        predictions[f'{target}_lower'] = forecast.quantile(0.1)  # 10%分位数
        predictions[f'{target}_upper'] = forecast.quantile(0.9)  # 90%分位数
    
    # 合并所有预测结果
    results = pd.DataFrame(predictions)
    return results

# 使用
targets = ['temperature', 'humidity', 'pressure']
results = channel_independent_predict(df, targets, prediction_length=24)

策略二:多变量序列微调(Multivariate Fine-tuning)

需对模型进行微调,多变量数据调整为单序列数据,数据格式为["变量1_t1","变量2_t1","变量3_t1","变量1_t2","变量2_t2","变量3_t2""变量1_t3","变量2_t3","变量3_t3"...]

import torch
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
from chronos import ChronosTokenizer
import pandas as pd

class MultivariateChronos:
    def __init__(self, model_name="amazon/chronos-t5-large"):
        # 加载预训练模型
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
        self.tokenizer = ChronosTokenizer.from_pretrained(model_name)
        
        # 多变量特定的配置
        self.num_variables = 3  # 变量数量
        self.context_length = 168  # 历史长度
        
    def prepare_multivariate_data(self, df, variable_columns):
        """
        将多变量数据转换为Chronos可理解的格式
        """
        # 1. 对每个变量分别归一化和分桶
        normalized_data = {}
        for var in variable_columns:
            series = df[var].values
            # 归一化(使用每个序列的均值和标准差)
            mean, std = series.mean(), series.std()
            normalized = (series - mean) / (std + 1e-8)
            normalized_data[var] = normalized
        
        # 2. 交错排列变量
        # 格式: [var1_t1, var2_t1, var3_t1, var1_t2, var2_t2, var3_t2, ...]
        multivariate_series = []
        for i in range(len(df)):
            for var in variable_columns:
                multivariate_series.append(normalized_data[var][i])
        
        return np.array(multivariate_series), {'means': means, 'stds': stds}
    
    def multivariate_tokenize(self, multivariate_series):
        """
        将交错的多变量序列转换为token
        """
        # 将连续值离散化为token
        tokens = self.tokenizer.encode(multivariate_series)
        return torch.tensor(tokens).unsqueeze(0)  # 添加batch维度
    
    def fine_tune(self, train_data, epochs=10, learning_rate=1e-4):
        """
        在多变量数据上微调Chronos
        """
        self.model.train()
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=learning_rate)
        
        for epoch in range(epochs):
            total_loss = 0
            for batch in train_data:
                # 准备输入(历史多变量序列)
                inputs = self.multivariate_tokenize(batch['input_series'])
                
                # 准备目标(未来多变量序列)
                targets = self.multivariate_tokenize(batch['target_series'])
                
                # 前向传播
                outputs = self.model(input_ids=inputs, labels=targets)
                loss = outputs.loss
                
                # 反向传播
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
                total_loss += loss.item()
            
            print(f"Epoch {epoch+1}, Loss: {total_loss/len(train_data):.4f}")
    
    def predict_multivariate(self, context_series, prediction_length=24):
        """
        使用微调后的模型进行多变量预测
        """
        self.model.eval()
        
        # 将上下文转换为token
        context_tokens = self.multivariate_tokenize(context_series)
        
        # 自回归生成预测
        with torch.no_grad():
            generated = self.model.generate(
                input_ids=context_tokens,
                max_length=len(context_tokens[0]) + prediction_length * self.num_variables,
                num_beams=5,
                temperature=0.7,
                do_sample=True
            )
        
        # 将token解码回数值
        predictions = self.tokenizer.decode(generated[0])
        
        # 将交错的预测拆分成各个变量
        pred_array = predictions.cpu().numpy()
        pred_by_var = {}
        
        for i, var in enumerate(self.variable_names):
            # 提取该变量的所有时间步
            var_predictions = pred_array[i::self.num_variables][-prediction_length:]
            pred_by_var[var] = var_predictions
        
        return pred_by_var

# 使用示例
if __name__ == "__main__":
    # 准备数据
    data = pd.read_csv('multivariate_data.csv')
    variables = ['temperature', 'humidity', 'pressure']
    
    # 初始化多变量Chronos
    mv_chronos = MultivariateChronos()
    mv_chronos.variable_names = variables
    
    # 准备训练数据
    train_series, stats = mv_chronos.prepare_multivariate_data(data, variables)
    
    # 创建滑动窗口训练样本
    train_dataset = []
    window_size = 168  # 7天每小时
    stride = 24  # 每天
    
    for i in range(0, len(train_series) - window_size - 24*3, stride):
        input_series = train_series[i:i+window_size]
        target_series = train_series[i+window_size:i+window_size+24*3]  # 预测3天
        train_dataset.append({
            'input_series': input_series,
            'target_series': target_series
        })
    
    # 微调模型
    mv_chronos.fine_tune(train_dataset, epochs=5)
    
    # 预测
    latest_context = train_series[-168:]  # 最后168个点
    predictions = mv_chronos.predict_multivariate(latest_context, prediction_length=24)
    
    # 反归一化
    for var in variables:
        mean, std = stats['means'][var], stats['stds'][var]
        predictions[var] = predictions[var] * std + mean

策略三:层次预测(Hierarchical Forecasting)

层次预测是处理具有明确层级结构的多变量时间序列的方法。核心思想是:不同层级的时间序列之间存在聚合约束。

自底向上法:只预测底层,上层通过求和得到
优点:保持底层细节,顶层自然一致 缺点:顶层可能过于受底层噪声影响

自顶向下法:只预测顶层,按比例分配到下层
比例可以通过历史平均值、趋势等计算

最优协调法(最小二乘协调)
公式:Ŷ = S(SᵀW⁻¹S)⁻¹SᵀW⁻¹Y
其中:
Y: 基础预测向量 [n_total * horizon, 1]
Ŷ: 协调后预测
W: 协方差矩阵,通常用对角矩阵(假设误差独立)
class HierarchicalChronos:
    def __init__(self):
        self.aggregation_matrix = None  # 聚合矩阵
    
    def hierarchical_forecast(self, df, hierarchy):
        """
        层次预测实现
        hierarchy = {
            'total': ['region_A', 'region_B'],
            'region_A': ['store_A1', 'store_A2'],
            'region_B': ['store_B1', 'store_B2', 'store_B3']
        }
        """
        forecasts = {}
        
        # 1. 底层预测(叶子节点)
        leaf_nodes = ['store_A1', 'store_A2', 'store_B1', 'store_B2', 'store_B3']
        for leaf in leaf_nodes:
            leaf_data = df[df['store_id'] == leaf]['sales']
            forecast = self.channel_independent_predict(leaf_data)
            forecasts[leaf] = forecast
        
        # 2. 中层聚合
        for parent, children in hierarchy.items():
            if parent not in ['total']:  # 不是最顶层
                child_forecasts = [forecasts[child] for child in children]
                # 简单求和(或加权平均)
                forecasts[parent] = np.sum(child_forecasts, axis=0)
        
        # 3. 顶层聚合
        forecasts['total'] = np.sum([
            forecasts['region_A'], 
            forecasts['region_B']
        ], axis=0)
        
        # 4. 协调(使用最小二乘协调)
        reconciled = self.reconcile_forecasts(forecasts, hierarchy)
        
        return reconciled
    
    def reconcile_forecasts(self, forecasts, hierarchy):
        """使用最小二乘法协调层次预测"""
        # 构建求和矩阵S
        S = self.build_sum_matrix(hierarchy)
        
        # 构建基础预测向量
        b = np.concatenate([forecasts[node] for node in forecasts.keys()])
        
        # 最小二乘协调
        # y_reconciled = S * (S^T * S)^(-1) * S^T * b
        S_T = S.T
        reconciled = S @ np.linalg.inv(S_T @ S) @ S_T @ b
        
        return self.reshape_to_hierarchy(reconciled, hierarchy)

 后训练

对时间序列进行标记化处理后,将其发送到 LLM 进行训练。使用分类交叉熵损失作为模型训练的损失函数。

 

posted @ 2025-12-04 16:50  wangssd  阅读(108)  评论(0)    收藏  举报