写点什么

昇腾 msmodelslim w8a8 量化代码解析

作者:AI布道Mr.Jin
  • 2025-03-31
    上海
  • 本文字数:11016 字

    阅读完需:约 36 分钟

​最近有很多朋友都在部署 deepseek 模型,而且都用到了模型量化这个功能,目的是减少显存占用、提升推理速度。


上图是 w8a8 量化算法流程,主要包含 4 步:


①,使用昇腾 msmodelslim 仓库提供的量化接口对原始模型权重进行量化,生成 int8 格式的权重文件,以及后续在推理的时候要用到的激活值的量化参数和 matmul 结果的反量化参数;


②,推理执行过程中,把 Matmul 的激活值(也就是输入 X)进行 int8 量化;


③,执行 int8 格式的 Matmul 计算;


④,把 int8 的乘法结果进行反量化。


这篇文章讲解第①步的内容。msmodelslim 提供的 deepseek 模型量化的参考脚本的链接如下:


Ascend/msit​gitee.com/ascend/msit/tree/br_noncom_MindStudio_8.0.0_POC_20251231/msmodelslim/example/DeepSeek​编辑


入口脚本 quant_deepseek_w8a8.py 的代码内容如下(br_noncom_MindStudio_8.0.0_POC_20251231 分支,commit 06a6e8):


#Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved.import argparseimport functoolsimport jsonimport torchimport torch.nn.functional as Ffrom tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoConfig
from msmodelslim.tools.convert_fp8_to_bf16 import auto_convert_model_fp8_to_bf16, OpsTypefrom msmodelslim.tools.copy_config_files import copy_config_files, modify_config_jsonfrom msmodelslim.pytorch.llm_ptq.anti_outlier import AntiOutlierConfig, AntiOutlierfrom msmodelslim.pytorch.llm_ptq.llm_ptq_tools import Calibrator, QuantConfigfrom msmodelslim.tools.logger import set_logger_level
def parse_args(): parser = argparse.ArgumentParser() parser.add_argument('--model_path', type=str, help="model and tokenizer path"), parser.add_argument('--save_path', type=str, help="save path"), parser.add_argument('--layer_count', type=int, default=0) parser.add_argument('--anti_dataset', type=str, default="./anti_prompt.json") parser.add_argument('--calib_dataset', type=str, default="./calib_prompt.json") parser.add_argument('--fp8', action='store_true') parser.add_argument('--bf16', action='store_true') return parser.parse_args()
def custom_hook(model_config): model_config["mla_quantize"] = "w8a8"
args = parse_args()set_logger_level("warning")pbar = tqdm(total=4, position=0, desc="Total Process")model_path = args.model_pathconfig = AutoConfig.from_pretrained(pretrained_model_name_or_path=model_path, trust_remote_code=True)config.num_hidden_layers = args.layer_count if args.layer_count != 0 else config.num_hidden_layers
tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_path, config=config, trust_remote_code=True, use_fast=True, add_eos_token=True)model = AutoModelForCausalLM.from_pretrained(pretrained_model_name_or_path=model_path, config=config, trust_remote_code=True, device_map="auto", torch_dtype="auto", max_memory={ 0: "50GiB", "cpu": "1500GiB" }, attn_implementation='eager')
auto_convert_model_fp8_to_bf16(model, model_path, OpsType.get_ops_type(args.bf16, args.fp8))
pbar.update(1)
def get_anti_dataset(tokenizer, calib_list, device="npu"): calib_dataset = [] max_len = 0 for calib_data in calib_list: inputs = tokenizer(calib_data, return_tensors='pt') calib_dataset.append(inputs.data['input_ids'].to(device)) max_len = max(max_len, inputs.data['input_ids'].size(1)) for i in range(len(calib_dataset)): calib_dataset[i] = F.pad(calib_dataset[i], (0, max_len - calib_dataset[i].size(1)), value=0) return torch.cat(calib_dataset)
def get_calib_dataset(tokenizer, calib_list, device="npu"): calib_dataset = [] for calib_data in calib_list: inputs = tokenizer(calib_data, return_tensors='pt').to(device) calib_dataset.append([inputs.data['input_ids']]) return calib_dataset
with open(args.anti_dataset, "r") as file: anti_prompt = json.load(file)with open(args.calib_dataset, "r") as file: calib_prompt = json.load(file)
anti_data = []for i in range(len(anti_prompt)): tmp = get_anti_dataset(tokenizer, anti_prompt[i]) anti_data.append(tmp)
anti_dataset = []for data in anti_data: anti_dataset.append([data])
dataset_calib = []for i in range(len(calib_prompt)): tmp = get_calib_dataset(tokenizer,calib_prompt[i]) dataset_calib += (tmp)
with torch.no_grad(): anti_config = AntiOutlierConfig(w_bit=8, a_bit=8, anti_method='m4', dev_type='npu', dev_id=model.device.index) anti_outlier = AntiOutlier(model, calib_data=anti_dataset, cfg=anti_config) anti_outlier.process()pbar.update(1)
disable_names = []for ids in range(config.num_hidden_layers): disable_names.append("model.layers." + str(ids) + ".self_attn.kv_b_proj")
quant_config = QuantConfig( a_bit=8, w_bit=8, disable_names=disable_names, dev_type='npu', dev_id=model.device.index, act_method=1, pr=1.0, w_sym=True, mm_tensor=False, is_dynamic=True)
calibrator = Calibrator(model, quant_config, calib_data=dataset_calib, disable_level="L0")calibrator.run()pbar.update(1)calibrator.save(args.save_path, save_type=["safe_tensor"], part_file_size=4)
custom_hooks = { 'config.json': functools.partial(modify_config_json, custom_hook=custom_hook)}copy_config_files(input_path=args.model_path, output_path=args.save_path, quant_config=quant_config, custom_hooks=custom_hooks)pbar.update(1)
复制代码


这篇文章会从入口脚本出发,对 w8a8 量化的技术原理和代码进行解析。

1. 算法原理

上面的代码涉及到 2 个类:AntiOutlier 和 Calibrator,AntiOutlier 代表的是激活异常值抑制,Calibrator 是激活值和权重量化。

1.1 激活异常值抑制

对于 int8 量化算法,浮点数量化后的取值是有限的(-128、-127、...、127),所以浮点数的分布范围越广的话,量化步长就越大,那么就有更多的浮点数会被量化成同一个数值,也就会引入更大的误差。而且大家发现,对于大模型里面的 Matmul,激活值 X 和权重 W 的浮点数分布很不相同,X 的分布范围更大,这样就会导致激活值 X 的量化误差较大。


为了解决激活值的量化误差问题,有人提出了 smoothQuant 算法。这个算法的原理很简单:让 X 除以一个值 s,W 乘以 s,这样的话,X/s 和 W/s 的分布就会更加平滑,同时(X/s)(Ws)=X*W,保证乘积不变。s 的计算公式如下:



其中 Xj 代表 X 的第 j 列, Wj 代表 W 的第 j 行, α 一般取 0.5。示例如下:



在代码实现中,因为 X 是 norm 层的输出,所以会把 X 除以 s 的操作转移到 norm 层,让 norm 层的权重除以 s,这样就不用在推理的过程中,再做一个除法。

1.2 w8a8 量化

这部分没什么好说的,quant_deepseek_w8a8.py 中用到的就是 min-max 量化算法。对于一个权重 tensor 或者激活 X 的 tensor 来说,假如它的最大、最小值分别为 max、min,那么首先可以求出 scale=(max-min)/255,然后得出量化公式为


x 是 tensor 的每个元素。当然,除了以 tensor 粒度求解 scale,还有以 tensor 的每个通道值分布求 scale 的,我们称作 per-channel。


需要注意的是,权重在推理之前就是已知的,所以不需要做模型推理、直接对权重文件的数据进行量化即可;但是激活值 X 是在推理的时候才能获取的,所以我们还需要准备一些“校准数据集”,让模型做一些前向推理,以此确定激活值的量化参数。


算法理论部分到这里就结束了,比较简单,接下来我们看看代码层面是如何实现的。

2. 代码解析


上图是 deepseek w8a8 量化入口脚本,主要包含 3 个部分:异常值抑制、w8a8 量化、保存量化权重和相关参数。

2.1 异常值抑制

anti_config 包含的参数如下:



anti_config = AntiOutlierConfig(w_bit=8, a_bit=8, anti_method='m4', dev_type='npu', dev_id=model.device.index)
复制代码


其中 w_bit 和 a_bit 代表权重量化位数和激活值量化位数,anti_method 代表抑制算法,'m4'是 smooth_quant(m1) 的改进方法,相比于 smooth_quant 增加了量化层。dev_type 和 dev_id 代表运行异常值抑制使用的设备。


anti_outlier 的核心代码在 msmodelslim\msmodelslim\pytorch\llm_ptq\anti_outlier\anti_outlier.py。init()函数和 process()函数的核心逻辑流程图如下:


步骤 1 是在 AntiOutlier 类的 init()函数中完成的,后续步骤是在 process()函数中完成的。


初始化有向无环图是在 init()函数的这个部分执行的:


        try:            self.init_dag()        except Exception as e:            raise Exception("Please check your config, model and input!", e) from e
复制代码


对于 attention 模型,构建 DAG 图的过程就是找出 RMSNorm 算子和它们连接的 linear 层。对于抑制算法"m1",我们做 w8a8 量化的目标层是 qkv 乘法和 up、gate 的全连接层;对于抑制算法"m4",还包含了 O 层和 down 层(O 层的激活值 scale 转移到 V 层,down 层的激活值 scale 转移到 up 层)。


init_dag()函数核心代码如下:


        if self.norm_class_name is not None:  # 可以手动指定norm层            norm_class = list(OrderedDict.fromkeys([m.__class__ for m in self.model.modules() if                                                    self.norm_class_name.lower() == m.__class__.__name__.lower()]))        else:            # 查找包含“norm”字段的层            norm_class = list(                OrderedDict.fromkeys(                    [m.__class__ for m in self.model.modules() if "norm" in m.__class__.__name__.lower()]))            norm_class = [norm_class[0]]            self.norm_class_name = norm_class[0].__name__.lower()        if ProcessHook.GET_NORM_LINEAR_SUBGRAPH not in self.hooks or self.hooks[            ProcessHook.GET_NORM_LINEAR_SUBGRAPH] is None:            # 调用extract_dag()获取DAG图            dag = extract_dag(self.model, dummy_input,                            hook_nodes=norm_class, anti_method=self.cfg.anti_method)            self.norm_linear_subgraph = dag.get_norm_linear_subgraph()            if self.cfg.anti_method == 'm4':                self.linear_linear_subgraph = dag.get_linear_linear_subgraph()                self.norm_linear_subgraph.update(self.linear_linear_subgraph)            del dag
复制代码


上面的代码中主要调用了 extract_dag() 函数获取 DAG 图,然后得到 norm_linear_subgraph。extract_dag 调用的又是 TorchDAGAdapter 类,这篇文章不做详细分析。norm_linear_subgraph 的格式如下所示:norm_linear_subgraph{'model.layers0.input_layernorm': ['model.layers0.attn.q_proj', 'model.layers0.attn.k_proj', 'model.layers0.attn.j_proj'], 'model.layers0.post_attention_layernorm': ['model.layers0.mlp.gate_proj', 'model.layers0.mlp.up_proj'], ...}


anti_outlier 的 process 的核心代码如下:


    def _process(self):        ...        # 给模型层注册hook,执行推理,记录每层的输入输出        act_stats = self.os_stats()        ....        # 遍历需要做量化的层        for norm_name_group in tqdm(iterable=self.norm_linear_subgraph.keys(), desc="AntiOutlier Process", position=1):            linear_names = self.norm_linear_subgraph[norm_name_group]            if isinstance(norm_name_group, str):                norm_module = PatternProcess.get_module_by_name(self.model, norm_name_group)            ...            stats = act_stats[linear_name]
is_expert = any("expert" in name.lower() for name in linear_names) if (is_expert): continue
self.logger.debug(f"smooth {norm_name_group} -> {linear_names}")
for name in linear_names: mod = PatternProcess.get_module_by_name(self.model, name) linear_modules.append(mod)
... # 对权重进行smooth if Multiplier is not None and norm_module is None: norm_module = Multiplier( torch.ones_like(stats[STAT_KEY_SMOOTH_SCALE]).to(linear_modules[0].weight.device) )
prepare_list = [PrepareWeight(norm_module, post_force=True, post_recurse=True)] prepare_list += [PrepareWeight(mod, post_force=True) for mod in linear_modules] # 对norm层权重进行smooth with ResListToRelease(*prepare_list): if self.cfg.anti_method == 'm1' or self.cfg.anti_method == 'm5': smooth_ln_fcs(self.cfg, norm_module, linear_modules, stats, alpha=self.cfg.alpha) elif self.cfg.anti_method == 'm2': os_ln_fcs(self.cfg, norm_module, linear_modules, stats, os_k=self.cfg.os_k) elif self.cfg.anti_method == 'm3': weight_aware(self.cfg, norm_module, linear_modules, stats) elif self.cfg.anti_method == 'm4': if 'scale_min' in inspect.signature(iter_smooth).parameters: fusion_kwargs.update({"scale_min": scale_min}) if 'check_group_fusions' not in inspect.signature(iter_smooth).parameters: fusion_kwargs.pop("check_group_fusions", None) self.logger.debug(f"fusion_kwargs is {fusion_kwargs}") iter_smooth( self.cfg, norm_module, linear_modules, stats, num_attention_heads, **fusion_kwargs ) if attach_op is not None and Multiplier is not None and isinstance(norm_module, Multiplier): attach_op(self.model, norm_module, linear_modules, linear_names)
复制代码


上面的代码,首先执行 self.os_stats(),这个函数的功能是在模型层上注册 hook,然后使用校准数据进行推理,收集每层的输入输出。


然后遍历 norm_linear_subgraph 的值,把 norm 层和对应的 linear 层找出来,先把权重乘以 s,然后使用 CANN 包路径 /usr/local/Ascend/ascend-toolkit/latest/python/site-packages/msmodelslim 下的 so 里面的方法做激活层的 smooth,也就是对 norm 层的权重进行处理。


以上就是异常值处理的主要逻辑,完成异常值处理后,model 里面的 norm 层和 linear 层的权重已经发生了变化,model 会继续传给后续的 calibrator 处理。

2.2 w8a8 量化代码

首先需要设置量化方法的参数:


quant_config = QuantConfig(    a_bit=8,    w_bit=8,    disable_names=disable_names,    dev_type='npu',    dev_id=model.device.index,    act_method=1,    pr=1.0,    w_sym=True,    mm_tensor=False,    is_dynamic=True)
复制代码


a_bit 和 w_bit 代表量化 bit 数;disable_names 代表不做量化层的名称;act_method 代表激活值的量化方法,“1”代表 min-max;pr 是概率参数,非 1 时量化生成的参数带有随机性;w_sym 是指权重是否做对称量化;mm_tensor=False 代表权重是 per-channel 量化;is_dynamic=True 代表激活量化使用动态量化,也就是量化参数是在推理的时候生成,is_dynamic=False 代表静态量化,在调用 calibrator 量化权重的时候就把激活值的量化参数计算好,动态量化精度更高,但是性能更差。


再来看一下实例化 calibrator:


calibrator = Calibrator(model, quant_config, calib_data=dataset_calib, disable_level="L0")
复制代码


传入了异常值抑制后的 model、quant_config、校准数据集和 disable_level。disable_level='“Ln”代表模型结构从最后一层往前数的 n 层不做量化。校准数据集一般采用模型实际应用场景的数据,而且在调试精度的时候,如果发现量化模型在某条数据上精度较差,可以把该条数据加入校准数据集,再进行校准量化。


再来看一下 init()函数做了哪些事情:


    def __init__(self, model,                 cfg: QuantConfig,                 calib_data=None,                 disable_level='L0',                 all_tensors=None):        ...        # 获取校准数据集        self.calib_data = self.get_calib_data([]) if calib_data is None else self.get_calib_data(calib_data)        self.use_kvcache_quant = cfg.use_kvcache_quant  # false        self.norm_class_name = cfg.norm_class_name
... # 创建字典记录量化参数 self.quant_param_dict = AutoSaveDict(self.cfg, max_gb_size=1) # 记录被量化module名称,相关的scale、offset等参数名称 key:weight的名称, value:scale、offset等参数的名称 self.quantized_module_param_dict = defaultdict(list) self.fa_module_param_dict = defaultdict(list) ... # 初始化模型权重json描述 self.quant_model_json_description = QuantModelJsonDescription(self.cfg.model_quant_type, self.cfg.use_kvcache_quant, self.cfg.use_fa_quant) if not re.match(r'^L((?!0)\d+|0)$', disable_level): raise ValueError('Please check the `disable_level` configuration.') self.disable_level = disable_level
model = self.init_model_device(model) self.last_layer_name = None self.rollback_names = None self.quant_linear_names = None # 记录激活值量化相关的参数 self.act_states = None # 确认不参与量化的层 self.rollback_names_process(model) ... # 对模型做量化的层进行替换,替换成可以计算量化参数的“quant modules” try: self.model = self.quantize(model) if self.calib_data: self.enable_quant() except Exception as e: raise Exception("Please check the model and configuration.", e) from e
self.named_module_count = len(list(self.model.named_modules())) self.logger.info("Quantizer initialized successful!")
复制代码


首先创建了一个字典 self.quant_param_dict 用来保存量化参数(scale、offset 等值),然后创建了一个文件 quant_model_json_description,这个其实就是量化完成之后,文件夹下面生成的 quant_model_description_w8a8.json。接着 rollback_names_process() 就是量化目标层回滚。如果不回滚的话,就是模型所有层都做量化。这个函数是根据 disable_names 和 disable_levels 把不量化的层剔除掉。做量化精度调试的时候,一个主要的方式就是尝试回滚不同的层数。


接着就到了 self.quantize(model),这是一个重要的步骤。它调用了 quantize_model()函数,主要内容如下:


        for name, mod in model.named_modules():            with PrepareWeight(mod):                # 跳过不做量化的层                if name in self.rollback_names:                    continue                if isinstance(mod, nn.Linear) or isinstance(mod, nn.modules.linear.NonDynamicallyQuantizableLinear):                    ...                    elif self.cfg.model_quant_type is not QuantType.W8A8S:                        is_dynamic = self.cfg.is_dynamic                        if "mlp" in name and self.is_deepseek_v2:                            if self.cfg.model_quant_type is QuantType.W8A8:                                is_dynamic = True                        # 生成“quant modules”                        quant_mod = LinearQuantizer(cfg=self.cfg, logger=self.logger, is_dynamic=is_dynamic)                    else:                        quant_mod = LinearSparseQuantizer(cfg=self.cfg, logger=self.logger)                    quant_mod.set_param(mod)                    move_update_weight_hook_if_need(mod, quant_mod)                    # 把需要做量化的线性层替换成具备量化功能的quant modules                    _set_module(model, name, quant_mod)        ...
复制代码


这段代码的功能主要是遍历模型的所有模块,把需要做量化的层找到,然后把它们替换成 LinearQuantizer 类,这个类的 forward 函数在做前向推理的过程中会计算量化参数。


calibrator.run()调用了 self._run,self._run 主要调用的是 self.run_calib_mode(),run_calib_mode()函数的核心代码如下:


        for data in tqdm(iterable=self.calib_data, position=1, desc="Calibrator Process"):            if not amp_done and self.cfg.fa_amp:                enable_fa_quantizer_record(self.model)
if isinstance(data, tuple) or isinstance(data, list): self.model(*data) elif isinstance(data, dict): self.model(**data)
...
复制代码


上面的代码主要是遍历校准数据,把每条数据传给模型做一次前向推理。在前向推理的过程中,LinearQuantizer 的 forward()函数会进行关键的量化操作。我们可以看一下相关代码:


首先看一下 LinearQuantizer 的初始化函数:


    def __init__(self, cfg=None, logger=None, is_dynamic=False):        """        cfg: quantizaton configuration        """        super(LinearQuantizer, self).__init__()        self.in_features = None        self.out_features = None        self.weight = None        self.bias = None        # 激活值的Tensor量化器        self.quant_input = TensorQuantizer(            bit=cfg.a_bit, is_signed=cfg.a_signed, is_enable=True,            is_input=True, cfg=cfg, logger=logger, is_dynamic=is_dynamic        )        # 权重的Tensor量化器        self.quant_weight = TensorQuantizer(            bit=cfg.w_bit, is_signed=cfg.w_signed, is_enable=True,            is_input=False, cfg=cfg, logger=logger        )
复制代码


可以看到,代码中初始化了激活值量化器和权重量化器,我们继续看一下它们的 forward 函数逻辑:


    def forward(self, x):        if self.quant_weight.int_infer and (not self.quant_weight.is_calib):            return self._int_infer_forward(x)        else:            if self.quant_input.w_hessian:  # gptq                weight = self.quant_weight(self.weight, y=x.clone())            else:                weight = self.quant_weight(self.weight)            if self.quant_input.bit <= 8:                x = self.quant_input(x)            return F.linear(x, weight, self.bias)
复制代码


首先是调用 self.quant_weight 获得了量化后的权重,然后调用 self.quant_input 获得了量化后的激活值,再执行 linear 操作。self.quant_weight 和 self.quant_input 都是 TensorQuantizer 类,TensorQuantizer 的 forward 函数代码如下:


    def tensor_forward(self, tensor, y=None):        ...        # weight quantization        with torch.no_grad():            # 对权重进行量化            if not self.is_input:                return self._quant_weight_forward(tensor, y)
# activation quantization if self.is_dynamic: self._stat_dynamic_input(tensor) # 对输入进行量化 return self._quant_activation_forward(tensor)
复制代码


_quant_weight_forward 和 _quant_activation_forward 调用的 _init_weight_quant_normal、fake_quantize 和 linear_quantization_params 都是在 CANN 包中的 so 里面实现的,这里不做解析,其原理就是 int8 量化。


上面的代码在执行过程中会生成权重量化和激活值量化的参数,并作为 TensorQuantizer 的属性保存在内存中,后续调用 calibrator.save()的收获收集保存。


执行完 calibrator.run()后,就可以执行 calibrator.save() 保存权重和量化参数了,save() 函数中主要调用 self.get_quant_params() 来获取量化权重和相关参数,代码比较简单,在这里不做讲解。


以上就是这篇文章的所有内容,下面内容会解析 mindie 做量化推理的代码逻辑,敬请期待!


用户头像

还未添加个人签名 2020-11-13 加入

还未添加个人简介

评论

发布
暂无评论
昇腾 msmodelslim w8a8量化代码解析_AI布道Mr.Jin_InfoQ写作社区