ViT-KNO: 基于Koopman神经算子的全球中期天气预报

DownloadNotebookDownloadCodeViewSource

概述

现代数据天气预报(Numerical Weather Prediction, NWP)可以追溯到1920年,其基于物理原理,整合了几代气象学者的成果经验,是各国气象部门所采用主流的天气预报方法。其中来自欧洲中期天气预报中心(ECMWF)的高分辨率综合系统模型(IFS)表现效果最佳。

直到2022年英伟达研发了一种基于傅里叶神经网络的预测模型FourCastNet,它能以0.25°的分辨率生成全球关键天气指标的预测,这相当于赤道附近约30×30km的空间分辨率和720×1440像素的权重网格大小,与IFS系统一致。这项成果使得AI气象模型首次与传统的物理模型IFS进行直接比较。更多信息可参考:“FourCastNet: A Global Data-driven High-resolution Weather Model using Adaptive Fourier Neural Operators”

但是基于傅里叶神经算子(Fourier Neural Operator, FNO)构建的预测模型FourCastNet在预测中长期天气时,变得不够准确和缺乏可解释性。ViT-KNO充分利用Vision Transformer结构和Koopman理论,学习Koopman Operator去预测非线性动力学系统,通过在线性结构中嵌入复杂的动力学去约束重建过程,ViT-KNO能够捕获复杂的非线性行为,同时保持模型轻量级和计算有效性。ViT-KNO有清晰的数学理论支撑,很好的克服了同类方法在数学和物理上可解释性和理论依据不足的问题。更多信息可参考:“KoopmanLab: machine learning for solving complex physics equations”

技术路径

MindSpore求解该问题的具体流程如下:

  1. 创建数据集

  2. 模型构建

  3. 损失函数

  4. 模型训练

  5. 模型验证和可视化

ViT-KNO

ViT-KNO模型构架如下图所示,主要包含两个分支,上路分支负责结果预测,由Encoder模块,Koopman Layer模块,Decoder模块组成,其中Koopman Layer模块结构如虚线框所示,可重复堆叠;下路分支由Encoder模块,Decoder模块组成,负责输入信息的重构。

ViT-KNO

模型的训练流程如下:

[1]:
import os
import numpy as np
import matplotlib.pyplot as plt

from mindspore import context, Model
from mindspore import dtype as mstype
from mindspore.train import load_checkpoint, load_param_into_net
from mindspore.train import DynamicLossScaleManager

from mindearth.cell import ViTKNO
from mindearth.utils import load_yaml_config, create_logger, plt_global_field_data
from mindearth.data import Dataset, Era5Data
from mindearth.module import Trainer

src 文件可以从ViT-KNO/src下载。

[2]:
from src.callback import EvaluateCallBack, InferenceModule, Lploss, CustomWithLossCell

context.set_context(mode=context.GRAPH_MODE, device_target="Ascend", device_id=1)

model、data和optimizer的参数可以通过加载yaml文件获取(vit_kno.yaml)。

[3]:
config = load_yaml_config('vit_kno.yaml')
config['model']['data_sink'] = True  # 是否使用data sink特性

config['train']['distribute'] = False  # 是否执行分布式任务
config['train']['amp_level'] = 'O2'  # 设置混合精度等级

config['data']['num_workers'] = 1  # 设置并行计算的进程数量
config['data']['grid_resolution'] = 1.4  # 设置气象分辨率参数

config['optimizer']['epochs'] = 100  # 设置epoch数量
config['optimizer']['finetune_epochs'] = 1  # 设置微调epoch数量
config['optimizer']['warmup_epochs'] = 1  # 设置预热epoch的数量
config['optimizer']['initial_lr'] = 0.0001  # 设置初始化学习率

config['summary']["valid_frequency"] = 10  # 设置验证的频率
config['summary']["summary_dir"] = './summary'  # 设置模型checkpoint的存储路径
logger = create_logger(path=os.path.join(config['summary']["summary_dir"], "results.log"))

创建数据集

dataset路径下,下载正则化参数、训练数据集、验证数据集到 ./dataset目录。

修改vit_kno.yaml配置文件中的root_dir参数,该参数设置了数据集的路径。

./dataset中的目录结构如下所示:

.
├── statistic
│   ├── mean.npy
│   ├── mean_s.npy
│   ├── std.npy
│   └── std_s.npy
├── train
│   └── 2015
├── train_static
│   └── 2015
├── train_surface
│   └── 2015
├── train_surface_static
│   └── 2015
├── valid
│   └── 2016
├── valid_static
│   └── 2016
├── valid_surface
│   └── 2016
├── valid_surface_static
│   └── 2016

模型构建

加载相关的数据参数和模型参数,并完成ViT-KNO模型构建。

[4]:
data_params = config["data"]
model_params = config["model"]
compute_type = mstype.float32

model = ViTKNO(image_size=(data_params["h_size"], data_params["w_size"]),
               in_channels=data_params["feature_dims"],
               out_channels=data_params["feature_dims"],
               patch_size=data_params["patch_size"],
               encoder_depths=model_params["encoder_depth"],
               encoder_embed_dims=model_params["encoder_embed_dim"],
               mlp_ratio=model_params["mlp_ratio"],
               dropout_rate=model_params["dropout_rate"],
               num_blocks=model_params["num_blocks"],
               high_freq=True,
               encoder_network=model_params["encoder_network"],
               compute_dtype=compute_type)

损失函数

ViT-KNO使用多loss的训练方法,包括Prediction loss,Reconstruction loss,两者均基于均方误差(Mean Squared Error)。

[5]:
loss_fn = Lploss()
loss_net = CustomWithLossCell(model, loss_fn)

模型训练

模型训练阶段继承了Trainer类,同时重写了get_dataset,get_callback,get_solver三个成员函数,以便于能在训练阶段执行测试验证;

[6]:
class ViTKNOEra5Data(Era5Data):
    def _patch(self, x, img_size, patch_size, output_dims):
        """ Partition the data into patches. """
        if self.run_mode == 'valid' or self.run_mode == 'test':
            x = x.transpose(1, 0, 2, 3)
        return x

class ViTKNOTrainer(Trainer):
    def __init__(self, config, model, loss_fn, logger):
        super(ViTKNOTrainer, self).__init__(config, model, loss_fn, logger)
        self.pred_cb = self.get_callback()

    def get_dataset(self):
        """
        Get train and valid dataset.

        Returns:
            Dataset, train dataset.
            Dataset, valid dataset.
        """
        train_dataset_generator = ViTKNOEra5Data(data_params=self.data_params, run_mode='train')
        valid_dataset_generator = ViTKNOEra5Data(data_params=self.data_params, run_mode='valid')

        train_dataset = Dataset(train_dataset_generator, distribute=self.train_params['distribute'],
                                num_workers=self.data_params['num_workers'])
        valid_dataset = Dataset(valid_dataset_generator, distribute=False, num_workers=self.data_params['num_workers'],
                                shuffle=False)
        train_dataset = train_dataset.create_dataset(self.data_params['batch_size'])
        valid_dataset = valid_dataset.create_dataset(self.data_params['batch_size'])
        return train_dataset, valid_dataset

    def get_callback(self):
        pred_cb = EvaluateCallBack(self.model, self.valid_dataset, self.config, self.logger)
        return pred_cb

    def get_solver(self):
        loss_scale = DynamicLossScaleManager()
        solver = Model(self.loss_fn,
                       optimizer=self.optimizer,
                       loss_scale_manager=loss_scale,
                       amp_level=self.train_params['amp_level']
                       )
        return solver


trainer = ViTKNOTrainer(config, model, loss_net, logger)
2023-09-07 02:22:28,644 - pretrain.py[line:211] - INFO: steps_per_epoch: 404
[7]:
trainer.train()
epoch: 1 step: 404, loss is 0.3572
Train epoch time: 113870.065 ms, per step time: 281.857 ms
epoch: 2 step: 404, loss is 0.2883
Train epoch time: 38169.970 ms, per step time: 94.480 ms
epoch: 3 step: 404, loss is 0.2776
Train epoch time: 38192.446 ms, per step time: 94.536 ms
...
epoch: 98 step: 404, loss is 0.1279
Train epoch time: 38254.867 ms, per step time: 94.690 ms
epoch: 99 step: 404, loss is 0.1306
Train epoch time: 38264.715 ms, per step time: 94.715 ms
epoch: 100 step: 404, loss is 0.1301
Train epoch time: 41886.174 ms, per step time: 103.679 ms
2023-09-07 03:38:51,759 - forecast.py[line:209] - INFO: ================================Start Evaluation================================
2023-09-07 03:39:57,551 - forecast.py[line:227] - INFO: test dataset size: 9
2023-09-07 03:39:57,555 - forecast.py[line:177] - INFO: t = 6 hour:
2023-09-07 03:39:57,555 - forecast.py[line:188] - INFO:  RMSE of Z500: 199.04419938873764, T2m: 2.44011585143782, T850: 1.45654734158296, U10: 1.636622237572019
2023-09-07 03:39:57,556 - forecast.py[line:189] - INFO:  ACC  of Z500: 0.9898813962936401, T2m: 0.9677559733390808, T850: 0.9703396558761597, U10: 0.9609741568565369
2023-09-07 03:39:57,557 - forecast.py[line:177] - INFO: t = 72 hour:
2023-09-07 03:39:57,557 - forecast.py[line:188] - INFO:  RMSE of Z500: 925.158453845783, T2m: 4.638264378699863, T850: 4.385266743972255, U10: 4.761954010777025
2023-09-07 03:39:57,558 - forecast.py[line:189] - INFO:  ACC  of Z500: 0.7650538682937622, T2m: 0.8762193918228149, T850: 0.7014696598052979, U10: 0.6434637904167175
2023-09-07 03:39:57,559 - forecast.py[line:177] - INFO: t = 120 hour:
2023-09-07 03:39:57,559 - forecast.py[line:188] - INFO:  RMSE of Z500: 1105.3634480837272, T2m: 5.488261092294651, T850: 5.120214326468169, U10: 5.424460568523809
2023-09-07 03:39:57,560 - forecast.py[line:189] - INFO:  ACC  of Z500: 0.6540136337280273, T2m: 0.8196010589599609, T850: 0.5682352781295776, U10: 0.5316879749298096
2023-09-07 03:39:57,561 - forecast.py[line:237] - INFO: ================================End Evaluation================================

模型推理及可视化

完成训练后,我们使用第100个ckpt进行推理。

[8]:
params = load_checkpoint('./summary/ckpt/step_1/koopman_vit_1-100_404.ckpt')
load_param_into_net(model, params)
inference_module = InferenceModule(model, config, logger)
[9]:
def plt_data(pred, label, root_dir, index=0):
    """ Visualize the forecast results """
    std = np.load(os.path.join(root_dir, 'statistic/std.npy'))
    mean = np.load(os.path.join(root_dir, 'statistic/mean.npy'))
    std_s = np.load(os.path.join(root_dir, 'statistic/std_s.npy'))
    mean_s = np.load(os.path.join(root_dir, 'statistic/mean_s.npy'))

    plt.figure(num='e_imshow', figsize=(100, 50), dpi=50)

    plt.subplot(4, 3, 1)
    plt_global_field_data(label, 'Z500', std, mean, 'Ground Truth')  # Z500
    plt.subplot(4, 3, 2)
    plt_global_field_data(pred, 'Z500', std, mean, 'Pred')  # Z500
    plt.subplot(4, 3, 3)
    plt_global_field_data(label - pred, 'Z500', std, mean, 'Error')  # Z500

    plt.subplot(4, 3, 4)
    plt_global_field_data(label, 'T850', std, mean, 'Ground Truth')  # T850
    plt.subplot(4, 3, 5)
    plt_global_field_data(pred, 'T850', std, mean, 'Pred')  # T850
    plt.subplot(4, 3, 6)
    plt_global_field_data(label - pred, 'T850', std, mean, 'Error')  # T850

    plt.subplot(4, 3, 7)
    plt_global_field_data(label, 'U10', std_s, mean_s, 'Ground Truth', is_surface=True)  # U10
    plt.subplot(4, 3, 8)
    plt_global_field_data(pred, 'U10', std_s, mean_s, 'Pred', is_surface=True)  # U10
    plt.subplot(4, 3, 9)
    plt_global_field_data(label - pred, 'U10', std_s, mean_s, 'Error', is_surface=True)  # U10

    plt.subplot(4, 3, 10)
    plt_global_field_data(label, 'T2M', std_s, mean_s, 'Ground Truth', is_surface=True)  # T2M
    plt.subplot(4, 3, 11)
    plt_global_field_data(pred, 'T2M', std_s, mean_s, 'Pred', is_surface=True)  # T2M
    plt.subplot(4, 3, 12)
    plt_global_field_data(label - pred, 'T2M', std_s, mean_s, 'Error', is_surface=True)  # T2M

    plt.savefig(f'pred_result.png', bbox_inches='tight')
    plt.show()
[10]:
test_dataset_generator = Era5Data(data_params=config["data"], run_mode='test')
test_dataset = Dataset(test_dataset_generator, distribute=False,
                       num_workers=config["data"]['num_workers'], shuffle=False)
test_dataset = test_dataset.create_dataset(config["data"]['batch_size'])
data = next(test_dataset.create_dict_iterator())
inputs = data['inputs']
labels = data['labels']
pred_time_index = 0
pred = inference_module.forecast(inputs)
pred = pred[pred_time_index].asnumpy()
ground_truth = labels[..., pred_time_index, :, :].asnumpy()
plt_data(pred, ground_truth, config['data']['root_dir'])

下述展示了第100个ckpt的真实值、预测值和他们之间的误差可视化。

plot result