写点什么

基于 Hologres 构建多模态 AI 数据分析与检索系统

  • 2025-11-24
    浙江
  • 本文字数:20228 字

    阅读完需:约 66 分钟

在数据驱动时代,非结构化数据(文本、图像、音视频、日志等)与结构化、半结构化数据(JSON)共同构成企业的核心数据资产。其中,非结构化数据以更原始、多元的形态蕴含着海量的业务洞察(如用户反馈、合同条款、产品缺陷图像),Hologres4.0 以“AI 时代的一站式多模态分析平台”为核心理念,全面展示了 Hologres 在结构化、半结构化与非结构化数据分析能力上的重大突破,发布全新向量索引 HGraph,登顶 VectorDBBench 性价比榜单 QPS、Recall、Latency、Load 四项第一,为 AI 应用的提供高性价比、高吞吐、低延迟、高并发的向量服务,成为全球最具性价比的向量数据库!

基于 Hologres 构建多模态 AI 数据分析与检索系统

本文将会模拟金融场景中对招股书、合同等 PDF 文件的检索与分析,以辅助业务进行下一步的精细化运营决策。基于 Hologres 4.0 对 PDF 非结构化数据的处理与检索,包含的主要能力如下:


  • 非结构化数据(Object Table):支持通过表的形式读取 OSS 中非结构化数据(PDF/IMAGE/PPT 等)。

  • AI Function:在 Hologres 中可以用标准 SQL 的方式调用 AI Function,自动调用内置大模型,完成 AI 服务建设场景

  • 数据加工:提供 Embed、Chunk 算子,可以对非结构化数据加工成结构化数据存储,无需使用外部算法就能自动 Embed。

  • 数据检索和分析:提供ai_genai_summarize等算子,能够通过 SQL 对数据进行推理、问题总结及翻译等操作。

  • Dynamic Table 介绍:支持增量刷新模式对非结构化数据自动加工,每次只计算增量的数据有效减少重复计算,降低资源利用率。

  • 向量检索:支持标准 SQL 的向量检索,用于非结构化数据的相似度搜索、场景识别等,在同一个查询中可以自由地实现向量和标量的检索。

  • 全文检索:通过倒排索引、分词等机制实现对非结构化数据的高效检索,支持关键词匹配、短语检索等丰富的检索方式,实现更加灵活的检索。

方案优势

通过如上核心能力,在 Hologres 中多模态 AI 检索与分析的核心优势如下:

  • 完整的 AI 数据处理流程:涵盖从数据 Embed、Chunk、增量加工、检索/分析的全流程,开发人员可以使用大数据系统一样,轻松构建 AI 应用。

  • 标准 SQL 加工和分析非结构化数据:无需使用专用开发语言,纯 SQL 就能完成非结构化数据提取、加工,也无需借助外部系统,数据处理更加高效和简单,降低开发人员学习成本。

  • 检索更精准、灵活和智能:可以轻松构建“关键词+语义+多模态”的混合检索链路,覆盖从精准搜索到意图理解的全场景需求。还能结合 AI Function 实现对用户意图的深度理解,语义关联和上下文推理,实现更智能的检索能力。

  • 数据不出库,更安全:不需要将数据导出到外部系统,与 hologres 的多种安全能力无缝集成,高效保护数据安全。


本实践文档将会介绍如何通过上诉核心能力在 Hologres 中对非结构化数据加工和检索,助力搭建企业级多模态 AI 数据平台,打破数据孤岛,释放全域数据价值

方案流程

本次方案的流程如下:

  1. 数据集准备。

金融数据集中的 PDF 文件上传至 OSS 存储。


  1. PDF 数据加工。

使用 Object Table 读取 PDF 的元数据信息,然后创建增量刷新的 Dynamic Table,并对数据进行 Embed 和 Chunk,同时也对 Dynamic Table 构建向量索引和全文索引,以便后续检索可以使用索引的能力。


  1. 使用ai_embed算子对将自然语言的问题进行 Embedding,然后使用全文和向量双路召回结果,并对结果进行排序,结合大模型的推理能力,最终输出相似度最高的答案。

准备工作

  • 数据准备

本文使用 ModelScope 公开的金融数据集中的 PDF 文件夹中的文件,共 80 份公司招股说明书。


  • 环境准备

  • 购买 Hologres V4.0 及以上版本实例并创建数据库。

  • 购买 AI 资源。


本文以 large-96core-512GB-384GB、1 个节点为例。

  • 模型部署。本次方案使用的模型以及分配的资源为:

说明上述模型的资源均为默认分配的资源。

操作步骤

  1. 下载 PDF 文件并上传至 OSS。

  • 下载博金大模型挑战赛-金融千问 14b 数据集中 80 份招股书(PDF)。

  • 登录 OSS 管理控制台,创建 Bucket 并将已下载的 PDF 文件上传至该 Bucket 路径下。上传操作详情,请参见简单上传。

  1. 账号授权。

  • 登录 RAM 控制台,创建阿里云 RAM 角色并授予 OSS 的相关权限。


推荐授予 AliyunOSSReadOnlyAccess 权限。

  • 为上述阿里云 RAM 角色添加登录和 Hologres 的访问权限。

  • 阿里云账号(主账号)

修改 RAM 角色的信任策略。重点需更新如下参数:

  • Action:更新为 sts:AssumeRole。

  • Service:更新为 hologres.aliyuncs.com。

{  "Statement": [    {      "Action": "sts:AssumeRole",      "Effect": "Allow",      "Principal": {        "RAM": [          "acs:ram::1866xxxx:root"        ],        "Service": [          "hologres.aliyuncs.com"        ]      }    }  ],  "Version": "1"}
复制代码


  • RAM 用户(子账号)

● 为 RAM 用户授权。

a. 在权限管理 > 权限策略页面,单击创建权限策略,并选择脚本编辑模式创建权限策略。具体操作,请参见创建自定义权限策略


Hologres 可通过该策略判断当前 RAM 用户是否具备创建对应 RAM 角色的权限。权限策略内容如下。

{  "Version": "1",  "Statement": [    {      "Effect": "Allow",      "Action": "hologram:GrantAssumeRole",      "Resource": "<arn账号>"    }  ]}
复制代码


b. 在身份管理 > 用户页面,单击目标 RAM 用户操作列中的添加权限,为 RAM 用户(子账号)授予上述步骤已创建的权限策略。具体操作,请参见为 RAM 用户授权。


● 为已创建的 RAM 角色授权。

修改 RAM 角色的信任策略。重点需更新如下参数:

  • Action:更新为sts:AssumeRole

  • Service:更新为hologres.aliyuncs.com

{  "Statement": [    {      "Action": "sts:AssumeRole",      "Effect": "Allow",      "Principal": {        "RAM": [          "acs:ram::1866xxxx:root"        ],        "Service": [          "hologres.aliyuncs.com"        ]      }    }  ],  "Version": "1"}
复制代码


  1. 对 PDF 文件进行 Embedding 和 Chunk。

需要创建 Object Table 和 Dynamic Table 对 PDF 的元数据读取以及加工。因为流程较长,Hologres 直接将其封装为存储过程。该存储过程包括的能力如下:

  • 会创建一张 Object Table,用于取 PDF 的元数据。

  • 创建一张增量刷新模式的 Dynamic Table 结果表,用于存储加工后的数据。同时,该表需设置向量索引和全文索引,且 Dynamic Table 不设置自动刷新,需要手动刷新。

  • Dynamic Table 的刷新过程中会使用ai_embedai_chunk对数据进行 Embed 和切片。

该存储过程代码如下:

CALL create_rag_corpus_from_oss(    oss_path => 'oss://xxxx/bs_challenge_financial_14b_dataset/pdf',    oss_endpoint => 'oss-cn-hangzhou-internal.aliyuncs.com',    oss_role_arn => 'acs:ram::186xxxx:role/xxxx',    corpus_table => 'public.dt_bs_challenge_financial');
复制代码


  1. 刷新结果表。

通过如上存储过程创建的 Object Table 和 Dynamic Table 均需手动刷新,才能完成数据加工。该步骤已被封装为 PDF 加工存储过程,该存储过程包括的能力如下:

  • 刷新一次 Object Table 获取 PDF 元数据

  • 刷新一次 Dynamic Table,进行 PDF 的 Embedding 和 Chunk 加工。

该存储过程使用代码如下:

CALL refresh_rag_corpus_table(    corpus_table => 'public.dt_bs_challenge_financial');
复制代码


  1. PDF 检索。

加工好的数据可以根据业务使用场景,通过向量、全文等方式进行检索。例如:可以根据招股书来查询某个公司的业绩走势,以此来判断公司后续的走势是悲观还是乐观,以便对后续的投资意向提供辅助建议。

向量检索

在向量检索时,为了检索方便,我们将问题 Embedding 和 Prompt 构建,大模型输出答案等过程封装成为向量检索函数,直接调用如下该存储过程可以实现向量召回。

-- 向量单路召回 + AI重排SELECT qa_vector_search_retrieval(  question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?',  corpus_table => 'dt_bs_challenge_financial',  prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}')
复制代码


检索答案如下:

qa_retrieval---------"根据提供的信息,对湖南国科微电子股份有限公司的业绩走势进行分析,可以得出以下结论:
### 一、业绩走势分析:悲观
#### 1. **营业收入增长乏力**- 2014年度营业收入较上年增长 **15.13%**,但2015年度营业收入却 **下降5.21%**,2016年度数据未提供,但可以看出营业收入增长趋势在2015年出现明显下滑。- 2012年至2014年营业收入的年复合增长率仅为 **4.47%**,表明公司业务扩张较为缓慢。
#### 2. **净利润增长持续下降**- 2014年度净利润增长 **5.43%**,2015年度净利润 **下降3.29%**。- 扣除非经常性损益后,2014年度归属于母公司股东的净利润增长 **-3.14%**,2015年度进一步下降 **-5.60%**,表明公司主营业务盈利能力在持续恶化。- 2012年至2014年扣除非经常性损益后净利润的年复合增长率为 **-4.38%**,远低于营业收入增长,说明公司主营业务盈利能力不足,增长主要依赖非经常性损益。
#### 3. **非经常性损益占比偏高**- 报告期内,非经常性损益占净利润的比例较高,2014年、2013年、2012年分别为 **17.54%、10.25%、8.06%**,表明公司利润中有一部分来自政策扶持、政府补贴等非经常性因素,而非核心业务的持续增长。- 依赖非经常性损益来维持利润增长,不利于公司长期的稳定发展。
#### 4. **净资产收益率下降**- 加权平均净资产收益率从2014年的 **18.10%** 下降到2015年的 **24.82%**,再到2016年的 **28.23%**,虽然数据看似增长,但需注意该指标是以扣除非经常性损益后的净利润计算的,而净利润本身在下降,因此这种增长可能与资本结构变化有关,而非盈利能力的实质性提升。
### 二、原因总结1. **主营业务增长乏力**:营业收入和净利润增长均呈现下降趋势,尤其是净利润的下降表明公司盈利能力在减弱。2. **非经常性损益依赖度高**:公司利润中非经常性损益占比较高,说明主营业务的盈利能力不足,公司业绩的持续性存疑。3. **市场竞争激烈**:公司采购的工控机、显示器、电源等产品市场竞争激烈,价格平稳,利润空间受到挤压。4. **行业环境影响**:不锈钢市场价格波动、原材料价格波动可能对公司经营业绩造成一定影响,虽然公司已采取措施降低影响,但长期来看仍需关注。
### 三、结论总体来看,湖南国科微电子股份有限公司的业绩走势偏 **悲观**。公司主营业务增长乏力,净利润持续下降,对非经常性损益依赖度高,未来盈利能力的可持续性存疑。公司需要加强核心业务的竞争力,优化成本结构,提高主营业务盈利能力,以实现长期稳健发展。"
复制代码

全文检索

在全文检索时,为了检索方便,我们将问题 Embedding 和 Prompt 构建,大模型输出答案等过程封装为全文检索函数,直接调用如下存储过程可以实现全文召回:

--全文检索召回SELECT qa_text_search_retrieval(    question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?',    corpus_table => 'dt_bs_challenge_financial',    prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}');
复制代码


检索答案如下:

qa_text_search_retrieval----------------"根据提供的信息,湖南国科微电子股份有限公司在2014年、2015年和2016年的业绩走势整体上呈现**悲观**趋势,具体原因如下:
### 1. **营业收入增长乏力**- 2014年营业收入增长率为**15.13%**,但2015年营业收入增长率转为**-5.21%**,即出现负增长。- 2012年至2014年的营业收入年复合增长率仅为**4.47%**,说明公司营业收入增长较为缓慢,业务发展不够强劲。- 2015年上半年营业收入预测与2014年同期相比大致持平,但2015年上半年净利润较上年同期**略有下降**,表明盈利能力下降。
### 2. **净利润和扣非净利润增长不佳**- 2014年净利润增长率为**5.43%**,2015年净利润增长率下降为**-3.29%**,即净利润出现下滑。- 扣除非经常性损益后的净利润增长率在2014年为**-3.14%**,2015年进一步下降为**-5.60%**,说明公司主营业务的盈利能力持续下降。- 2012年至2014年扣非净利润的年复合增长率为**-4.38%**,明显低于营业收入的年复合增长率,说明公司利润质量不高,主营业务盈利能力较弱。
### 3. **经营活动现金流波动**- 2014年销售商品、提供劳务收到的现金占营业收入的比例较前两年有所下降,主要与部分收入确认项目的**收款跨期**有关,说明公司现金流管理存在问题。- 2013年购买商品、接受劳务支付的现金占营业成本比例较高,主要是由于当年**采购原材料并完成生产**,但部分成本在2014年才结转,导致2014年该比例较低,反映公司采购和生产节奏不够稳定。
### 4. **投资和盈利能力指标**- 加权平均净资产收益率(ROE)在2014年为**18.10%**,2015年上升至**24.82%**,2016年进一步上升至**28.23%**,虽然有所提升,但ROE的提高可能主要依赖**财务杠杆**,而非核心业务盈利能力的提升。- 考虑到净利润和扣非净利润持续下降,ROE的提升并不能完全反映公司经营质量的改善。
### 5. **2015年上半年业绩预测**- 2015年上半年营业收入预计为**8,505万元至10,395万元**,与2014年同期的**10,127.35万元**大致持平,但净利润预计为**2,340万元至2,860万元**,低于2014年同期的**2,912.66万元**,说明公司盈利能力下降。
### 总结综合来看,湖南国科微电子股份有限公司在2014年至2016年的业绩走势**偏向悲观**。虽然ROE有所提升,但营业收入增长乏力、净利润和扣非净利润持续下降、经营活动现金流波动较大,表明公司主营业务盈利能力较弱,经营质量有待提升。"
复制代码

向量+全文混合检索

在向量、全文结合 Rank 排序混合检索场景中,为了检索方便,Hologres 将其封装为向量+全文双路召回+Rank 排序函数,该存储过程的能力如下:

  • 根据问题使用向量计算,召回 TOP 20 的答案。

  • 根据问题使用全文检索,召回 TOP 20 的答案。

  • 使用ai_rank,对向量和全文召回的答案进行排序,最后输出 Top1 的答案。

  • 使用ai_gen,结合大模型根据提示词以及检索的答案,生成最终答案并进行输出。

-- 全文、向量双路召回 + AI重排SELECT qa_hybrid_retrieval(    question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?',    corpus_table => 'dt_bs_challenge_financial',    prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}');
复制代码


检索答案如下:

qa_hybrid_retrieval---"根据提供的信息,我们可以对湖南国科微电子股份有限公司的业绩走势进行如下分析,并判断其趋势是悲观还是乐观:
---
### 一、**营业收入走势分析**1. **2012-2014年复合增长率**: - 营业收入的年复合增长率为 **4.47%**,表明公司营业收入的增长较为平稳。 - 2014年营业收入为 **18,154.06万元**,较2013年增长 **15.13%**。 - 2015年营业收入较上年 **下降5.21%**,出现了负增长。
2. **结论**: - 营业收入的增长在2014年有所回升,但2015年出现明显下滑,表明公司业务扩张遇到了一定阻力。
---
### 二、**净利润走势分析**1. **2012-2014年复合增长率**: - 扣除非经常性损益后的净利润年复合增长率为 **-4.38%**,低于营业收入的年复合增长率,说明公司盈利能力有所下降。 - 2014年扣非净利润为 **42,731,071.18元**,较2013年下降 **3.14%**。 - 2015年扣非净利润较上年进一步下降 **5.60%**。
2. **非经常性损益影响**: - 2014年、2013年及2012年非经常性损益占净利润的比例分别为 **17.54%、10.25%、8.06%**,呈上升趋势。 - 非经常性损益的增加主要来自于政府补贴和理财产品收益,而非主营业务带来的持续增长。
3. **结论**: - 扣非净利润连续两年下降,说明公司主营业务盈利能力减弱,业绩增长依赖于非经常性损益,这是令人担忧的信号。
---
### 三、**现金流量与经营稳定性**1. **经营活动现金流**: - 2014年营业收入为 **18,154.06万元**,但销售商品、提供劳务收到的现金并未明确给出,无法判断现金流是否健康。 - 报告期内,公司银行存款分别为 **13,063.38万元、4,152.54万元、9,864.61万元**,资金流动性波动较大,但主要客户、供应商及经营模式保持稳定。
2. **结论**: - 虽然公司现金流存在波动,但客户和供应商稳定,经营模式未发生重大变化,这为公司未来的发展提供了一定保障。
---
### 四、**2015年上半年业绩预测**- 2015年1至6月预计营业收入为 **8,505万元至10,395万元**,较2014年同期的 **4,641.19万元**有明显增长。- 但2015年1至3月净利润较上年同期下降 **48.26%**,主要是因为确认收入的项目毛利率较低。
---
### 五、**综合分析与判断**1. **乐观因素**: - 2014年营业收入增长较快,达到 **15.13%**。 - 2015年上半年营业收入预计增长显著,表明公司可能正在逐步恢复。 - 主要客户、供应商及经营模式保持稳定,为公司提供了良好的运营基础。
2. **悲观因素**: - 2015年营业收入较上年 **下降5.21%**,净利润也出现下滑。 - 扣非净利润连续两年下降,表明公司主营业务盈利能力不足。 - 非经常性损益占比上升,业绩增长依赖于政府补贴和理财产品收益,缺乏内生增长动力。 - 2015年1至3月净利润大幅下滑 **48.26%**,表明短期业绩波动较大。
---
### **最终结论:整体趋势偏悲观**- 尽管公司在2014年营业收入有所回升,且2015年上半年预计增长,但 **扣非净利润连续下降**、**净利润增长依赖非经常性损益**、**短期业绩波动较大**,表明公司目前的业绩增长缺乏持续性和稳定性。- 因此,从长期来看,公司业绩走势偏 **悲观**,需关注其主营业务盈利能力的改善和非经常性损益的依赖问题。
---
### **建议**1. 关注公司未来主营业务的盈利能力是否能有所提升。2. 降低对非经常性损益的依赖,提高内生增长动力。3. 稳定客户和供应商关系,优化业务结构,提高毛利率。"
复制代码

向量+全文双路召回+RRF 排序

使用向量和全文检索后,通过 RRF (Reciprocal Rank Fusion)排序召回结果。为了检索方便,Hologres 已经封装为向量+全文双路召回+RRF排序函数(详细定义见下方附录),该存储过程的能力如下:


  • 根据问题使用向量计算,召回 TOP 20 的答案。

  • 根据问题使用全文检索,召回 TOP 20 的答案。

  • 对向量和全文召回的答案,计算 RRF 分数,最后输出 Top N 的答案。

  • 使用ai_gen、大模型根据提示词,以及检索的答案,拼装成最终答案并输出。

-- 全文、向量双路召回 + RRF重排SELECT qa_hybrid_retrieval_rrf(    question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?',    corpus_table => 'dt_bs_challenge_financial',    prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}');
复制代码


检索答案如下:

qa_hybrid_retrieval_rrf------------------"根据提供的信息,对湖南国科微电子股份有限公司的业绩走势进行分析,可以得出以下结论:
### **业绩走势判断:悲观**#### **原因分析如下:**
1. **净利润增长低于营业收入增长:** - 提供的信息指出,公司2012年至2014年的**营业收入年复合增长率为4.47%**,表明公司整体业务增长较为平稳。 - 但**扣除非经常性损益后归属于母公司股东的净利润年复合增长率为-4.38%**,明显低于营业收入的增长率。这说明公司在收入增长的同时,盈利能力并未同步提升,甚至出现下滑,可能受到成本上升、毛利率下降或非经常性损益减少等因素影响。
2. **净利润波动较大:** - 2015年1至3月的净利润较上年同期减少了48.26%,且主要原因在于确认收入的项目毛利率较低(如无锡地铁一号线项目以模块外购为主)。这表明公司短期业绩容易受到业务结构变化的影响,存在一定的不稳定性。
3. **毛利率和盈利能力下降:** - 提到“主营业务利润对公司净利润的贡献”在2014年较2013年略有下降,而2013年又较2012年下降。这说明公司核心业务的盈利能力可能在减弱,可能受到市场竞争加剧、成本上升或产品结构变化的影响。
4. **2015年上半年预测利润下降:** - 2015年1至6月预计营业收入为8,505万元至10,395万元,与2014年同期大致持平,但净利润预计为2,340万元至2,860万元,低于2014年同期的2,912.66万元。这表明公司盈利能力在进一步下降,可能面临一定的经营压力。
5. **业务增长乏力:** - 虽然营业收入增长较为平稳,但净利润的下降表明公司业务增长的质量不高,未能有效转化为利润。这可能影响投资者对公司未来发展的信心。
### **总结:**湖南国科微电子股份有限公司的业绩走势整体偏向**悲观**。虽然营业收入保持了平稳增长,但净利润的增长明显滞后甚至出现负增长,表明公司盈利能力在下降,业务发展质量不高,且存在短期业绩波动的风险。如果公司不能有效提升毛利率、控制成本或优化产品结构,未来业绩可能继续承压。"
复制代码

附录:存储过程定义

上述文档中使用的存储过程定义如下,方便您做参考。

说明在 Hologres 中不支持创建 Function,如下存储过程仅做参考,无法修改后直接执行。

PDF 加工存储过程

  • 创建 Object Table 和 Dynamic Table

CREATE OR REPLACE PROCEDURE create_rag_corpus_from_oss(    oss_path TEXT,    oss_endpoint TEXT,    oss_role_arn TEXT,    corpus_table TEXT,    embedding_model TEXT DEFAULT NULL,    parse_document_model TEXT DEFAULT NULL,    chunk_model TEXT DEFAULT NULL,    chunk_size INT DEFAULT 300,    chunk_overlap INT DEFAULT 50,    overwrite BOOLEAN DEFAULT FALSE)AS $$DECLARE    corpus_schema TEXT;    corpus_name TEXT;    obj_table_name TEXT;    full_corpus_ident TEXT;    full_obj_ident TEXT;    embed_expr TEXT;    chunk_expr TEXT;    parse_expr TEXT;    embedding_dims INT;BEGIN    -- 1. 拆 schema name + table name    IF position('.' in corpus_table) > 0 THEN        corpus_schema := split_part(corpus_table, '.', 1);        corpus_name   := split_part(corpus_table, '.', 2);    ELSE        corpus_schema := 'public';        corpus_name   := corpus_table;    END IF;
obj_table_name := corpus_name || '_obj_table';
full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name); -- 2. 如果需要覆盖,先删表和索引 IF overwrite THEN DECLARE dyn_table_exists BOOLEAN; rec RECORD; BEGIN -- 检查 dynamic table 是否存在 SELECT EXISTS ( SELECT 1 FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relname = corpus_name AND n.nspname = corpus_schema ) INTO dyn_table_exists;
IF dyn_table_exists THEN -- 2.1 关闭动态表自动刷新 -- RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident; -- EXECUTE format('ALTER TABLE IF EXISTS %s SET (auto_refresh_enable=false)', full_corpus_ident);
-- 2.2 查找 RUNNING 刷新任务并取消 FOR rec IN EXECUTE format( $f$ SELECT query_job_id FROM hologres.hg_dynamic_table_refresh_log(%L) WHERE status = 'RUNNING'; $f$, corpus_table ) LOOP RAISE NOTICE 'Found running refresh job: %', rec.query_job_id; IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id; ELSE RAISE WARNING 'Cancel job % failed.', rec.query_job_id; END IF; END LOOP;
-- 2.3 删除 Dynamic Table EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident); ELSE RAISE NOTICE 'Dynamic table % does not exist, skip cancel job and drop.', full_corpus_ident; END IF;
-- 2.4 无论如何,Object Table 都要删除 EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident); END; END IF;
-- 3. 创建 Object Table RAISE NOTICE 'Create object table: %', obj_table_name; EXECUTE format( $f$ CREATE OBJECT TABLE %s WITH ( path = %L, oss_endpoint = %L, role_arn = %L ); $f$, full_obj_ident, oss_path, oss_endpoint, oss_role_arn );
COMMIT;
-- 4. 刷新 Object Table RAISE NOTICE 'Refresh object table: %', obj_table_name; EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);
COMMIT;
-- 5. 文档解析模型选择 IF parse_document_model IS NULL OR length(trim(parse_document_model)) = 0 THEN parse_expr := 'ai_parse_document(file, ''auto'', ''markdown'')'; ELSE parse_expr := format( 'ai_parse_document(%L, file, ''auto'', ''markdown'')', parse_document_model ); END IF;
-- 6. chunk 模型选择 IF chunk_model IS NULL OR length(trim(chunk_model)) = 0 THEN chunk_expr := format('ai_chunk(doc, %s, %s)', chunk_size, chunk_overlap); ELSE chunk_expr := format( 'ai_chunk(%L, doc, %s, %s)', chunk_model, chunk_size, chunk_overlap ); END IF;
-- 7. embedding 模型选择 IF embedding_model IS NULL OR length(trim(embedding_model)) = 0 THEN embed_expr := 'ai_embed(chunk)';
EXECUTE 'SELECT array_length(ai_embed(''dummy''), 1)' INTO embedding_dims; ELSE embed_expr := format('ai_embed(%L, chunk)', embedding_model);
EXECUTE format( 'SELECT array_length(ai_embed(%L, ''dummy''), 1)', embedding_model ) INTO embedding_dims; END IF;
RAISE NOTICE 'embedding dimension is: %', embedding_dims;
-- 8. 创建 RAG 输出动态表 RAISE NOTICE 'create dynamic table: %', corpus_name; EXECUTE format( $f$ CREATE DYNAMIC TABLE %s( CHECK(array_ndims(embedding_vector) = 1 AND array_length(embedding_vector, 1) = %s) ) WITH ( vectors = '{ "embedding_vector": { "algorithm": "HGraph", "distance_method": "Cosine", "builder_params": { "base_quantization_type": "sq8_uniform", "max_degree": 64, "ef_construction": 400, "precise_quantization_type": "fp32", "use_reorder": true } } }', auto_refresh_mode = 'incremental', freshness = '5 minutes', auto_refresh_enable = 'false' ) AS WITH parsed_doc AS ( SELECT object_uri, etag, %s AS doc FROM %s ), chunked_doc AS ( SELECT object_uri, etag, unnest(%s) AS chunk FROM parsed_doc ) SELECT object_uri, etag, chunk, %s AS embedding_vector FROM chunked_doc; $f$, full_corpus_ident, embedding_dims, parse_expr, full_obj_ident, chunk_expr, embed_expr ); COMMIT;
-- 9. 创建全文索引(索引名 = 表名 || '_fulltext_idx') EXECUTE format( 'CREATE INDEX %I ON %s USING FULLTEXT (chunk);', corpus_name || '_fulltext_idx', full_corpus_ident );
RAISE NOTICE ''; RAISE NOTICE 'Create RAG corpus success to table: %', corpus_table; RAISE NOTICE ' Vector index is: %.embedding_vector', corpus_table; RAISE NOTICE ' TextSearch index is: %.chunk', corpus_table;END;$$ LANGUAGE plpgsql;
复制代码


  • 刷新 Object Table 和 Dynamic Table 存储过程

CREATE OR REPLACE PROCEDURE refresh_rag_corpus_table(    corpus_table TEXT)AS $$DECLARE    corpus_schema TEXT;    corpus_name   TEXT;    obj_table_name TEXT;    full_corpus_ident TEXT;    full_obj_ident    TEXT;BEGIN    -- 1. 解析 schema 和表名    IF position('.' in corpus_table) > 0 THEN        corpus_schema := split_part(corpus_table, '.', 1);        corpus_name   := split_part(corpus_table, '.', 2);    ELSE        corpus_schema := 'public';        corpus_name   := corpus_table;    END IF;
obj_table_name := corpus_name || '_obj_table';
full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name);
-- 2. 刷新 Object Table RAISE NOTICE 'Refreshing Object Table: %', obj_table_name; EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);
-- 3. 刷新 Dynamic Table RAISE NOTICE 'Refreshing Dynamic Table: %', corpus_name; EXECUTE format('REFRESH TABLE %s;', full_corpus_ident);
RAISE NOTICE 'Refresh complete for corpus table %', corpus_table;END;$$ LANGUAGE plpgsql;
复制代码


  • 删除 Object Table 和 Dynamic Table 存储过程

CREATE OR REPLACE PROCEDURE drop_rag_corpus_table(    corpus_table TEXT)AS $$DECLARE    corpus_schema TEXT;    corpus_name   TEXT;    obj_table_name TEXT;    full_corpus_ident TEXT;    full_obj_ident    TEXT;    rec RECORD;BEGIN    -- 1. 解析 schema 和表名    IF position('.' in corpus_table) > 0 THEN        corpus_schema := split_part(corpus_table, '.', 1);        corpus_name   := split_part(corpus_table, '.', 2);    ELSE        corpus_schema := 'public';        corpus_name   := corpus_table;    END IF;
obj_table_name := corpus_name || '_obj_table';
full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name);
-- 2. 删除表 -- 2.1 关闭动态表自动刷新 -- RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident; -- EXECUTE format('ALTER TABLE IF EXISTS %s SET (auto_refresh_enable=false)', full_corpus_ident);
-- 2.2 查找 RUNNING 刷新任务并取消 FOR rec IN EXECUTE format( $f$ SELECT query_job_id FROM hologres.hg_dynamic_table_refresh_log(%L) WHERE status = 'RUNNING'; $f$, corpus_table ) LOOP RAISE NOTICE 'Found running refresh job: %', rec.query_job_id; IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id; ELSE RAISE WARNING 'Cancel job % failed.', rec.query_job_id; END IF; END LOOP;
-- 2.3 删除 Dynamic Table RAISE NOTICE 'Dropping Dynamic Table: %', corpus_name; EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident);
-- 2.4 删除 Object Table RAISE NOTICE 'Dropping Object Table: %', obj_table_name; EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident);
RAISE NOTICE 'Drop complete for corpus: %', corpus_table;END;$$ LANGUAGE plpgsql;
复制代码

向量检索函数

-- RAG向量单路召回问答CREATE OR REPLACE FUNCTION qa_vector_search_retrieval(    question TEXT,    corpus_table TEXT,    embedding_model TEXT DEFAULT NULL,    llm_model TEXT DEFAULT NULL,    ranking_model TEXT DEFAULT NULL,    prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',    language TEXT DEFAULT 'Chinese',    vector_recall_count INT DEFAULT 20,    rerank_recall_count INT DEFAULT 5,    vector_col TEXT DEFAULT 'embedding_vector')RETURNS TEXT AS$$DECLARE    final_answer TEXT;    sql TEXT;    embedding_expr TEXT;    ai_rank_expr TEXT;    ai_gen_expr TEXT;    embedding_model_valid BOOLEAN;    llm_model_valid BOOLEAN;    ranking_model_valid BOOLEAN;BEGIN    embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != '');    llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');    ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF embedding_model_valid THEN embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')'; ELSE embedding_expr := 'ai_embed(' || quote_literal(question) || ')'; END IF;
IF ranking_model_valid THEN ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)'; ELSE ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)'; END IF;
IF llm_model_valid THEN ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) || ', replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )'; ELSE ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))'; END IF;
sql := ' WITH embedding_recall AS ( SELECT chunk, approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS distance FROM ' || corpus_table || ' ORDER BY distance DESC LIMIT ' || vector_recall_count || ' ), rerank AS ( SELECT chunk, ' || ai_rank_expr || ' AS score FROM embedding_recall ORDER BY score DESC LIMIT ' || rerank_recall_count || ' ), concat_top_chunks AS ( SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank ) SELECT ' || ai_gen_expr || ' FROM concat_top_chunks; ';
EXECUTE sql INTO final_answer; RETURN final_answer;END;$$ LANGUAGE plpgsql;
复制代码

全文检索函数

CREATE OR REPLACE FUNCTION qa_text_search_retrieval(    question TEXT,    corpus_table TEXT,    llm_model TEXT DEFAULT NULL,    ranking_model TEXT DEFAULT NULL,    prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',    language TEXT DEFAULT 'Chinese',    text_search_recall_count INT DEFAULT 20,    rerank_recall_count INT DEFAULT 5,    text_search_col TEXT DEFAULT 'chunk')RETURNS TEXT AS$$DECLARE    final_answer TEXT;    sql TEXT;    ai_rank_expr TEXT;    ai_gen_expr TEXT;    llm_model_valid BOOLEAN;    ranking_model_valid BOOLEAN;BEGIN    llm_model_valid     := (llm_model IS NOT NULL AND trim(llm_model) != '');    ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF ranking_model_valid THEN ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)'; ELSE ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)'; END IF;
IF llm_model_valid THEN ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) || ', replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )'; ELSE ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))'; END IF;
sql := ' WITH text_search_recall AS ( SELECT chunk FROM ' || corpus_table || ' ORDER BY text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC LIMIT ' || text_search_recall_count || ' ), rerank AS ( SELECT chunk, ' || ai_rank_expr || ' AS score FROM text_search_recall ORDER BY score DESC LIMIT ' || rerank_recall_count || ' ), concat_top_chunks AS ( SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank ) SELECT ' || ai_gen_expr || ' FROM concat_top_chunks; ';
EXECUTE sql INTO final_answer; RETURN final_answer;END;$$ LANGUAGE plpgsql;
复制代码

向量+全文双路召回+Rank 排序函数

CREATE OR REPLACE FUNCTION qa_hybrid_retrieval(    question TEXT,    corpus_table TEXT,    embedding_model TEXT DEFAULT NULL,    llm_model TEXT DEFAULT NULL,    ranking_model TEXT DEFAULT NULL,    prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',    language TEXT DEFAULT 'Chinese',    text_search_recall_count INT DEFAULT 20,    vector_recall_count INT DEFAULT 20,    rerank_recall_count INT DEFAULT 5,    vector_col TEXT DEFAULT 'embedding_vector',    text_search_col TEXT DEFAULT 'chunk')RETURNS TEXT AS$$DECLARE    final_answer TEXT;    sql TEXT;    embedding_expr TEXT;    ai_rank_expr TEXT;    ai_gen_expr TEXT;    embedding_model_valid BOOLEAN;    llm_model_valid BOOLEAN;    ranking_model_valid BOOLEAN;BEGIN    embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != '');    llm_model_valid       := (llm_model IS NOT NULL AND trim(llm_model) != '');    ranking_model_valid   := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF embedding_model_valid THEN embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')'; ELSE embedding_expr := 'ai_embed(' || quote_literal(question) || ')'; END IF;
IF ranking_model_valid THEN ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)'; ELSE ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)'; END IF;
IF llm_model_valid THEN ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) || ', replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )'; ELSE ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))'; END IF;
sql := ' WITH embedding_recall AS ( SELECT chunk FROM ' || corpus_table || ' ORDER BY approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') DESC LIMIT ' || vector_recall_count || ' ), text_search_recall AS ( SELECT chunk FROM ' || corpus_table || ' ORDER BY text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC LIMIT ' || text_search_recall_count || ' ), union_recall AS ( SELECT chunk FROM embedding_recall UNION SELECT chunk FROM text_search_recall ), rerank AS ( SELECT chunk, ' || ai_rank_expr || ' AS score FROM union_recall ORDER BY score DESC LIMIT ' || rerank_recall_count || ' ), concat_top_chunks AS ( SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank ) SELECT ' || ai_gen_expr || ' FROM concat_top_chunks; ';
EXECUTE sql INTO final_answer; RETURN final_answer;END;$$ LANGUAGE plpgsql;
复制代码

向量+全文双路召回+RRF 排序函数

CREATE OR REPLACE FUNCTION qa_hybrid_retrieval_rrf(    question TEXT,    corpus_table TEXT,    embedding_model TEXT DEFAULT NULL,    llm_model TEXT DEFAULT NULL,    ranking_model TEXT DEFAULT NULL,    prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',    language TEXT DEFAULT 'Chinese',    text_search_recall_count INT DEFAULT 20,    vector_recall_count INT DEFAULT 20,    rerank_recall_count INT DEFAULT 5,    rrf_k INT DEFAULT 60,    vector_col TEXT DEFAULT 'embedding_vector',    text_search_col TEXT DEFAULT 'chunk')RETURNS TEXT AS$$DECLARE    final_answer TEXT;    sql TEXT;    embedding_expr TEXT;    ai_rank_expr TEXT;    ai_gen_expr TEXT;    embedding_model_valid BOOLEAN;    llm_model_valid BOOLEAN;    ranking_model_valid BOOLEAN;BEGIN    embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) <> '');    llm_model_valid       := (llm_model IS NOT NULL AND trim(llm_model) <> '');    ranking_model_valid   := (ranking_model IS NOT NULL AND trim(ranking_model) <> '');
IF embedding_model_valid THEN embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')'; ELSE embedding_expr := 'ai_embed(' || quote_literal(question) || ')'; END IF;
IF ranking_model_valid THEN ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)'; ELSE ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)'; END IF;
IF llm_model_valid THEN ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) || ', replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )'; ELSE ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))'; END IF;
sql := ' WITH embedding_recall AS ( SELECT chunk, vec_score, ROW_NUMBER() OVER (ORDER BY vec_score DESC) AS rank_vec FROM ( SELECT chunk, approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS vec_score FROM ' || corpus_table || ' ) t ORDER BY vec_score DESC LIMIT ' || vector_recall_count || ' ), text_search_recall AS ( SELECT chunk, text_score, ROW_NUMBER() OVER (ORDER BY text_score DESC) AS rank_text FROM ( SELECT chunk, text_search(' || text_search_col || ', ' || quote_literal(question) || ') AS text_score FROM ' || corpus_table || ' ) ts WHERE text_score > 0 ORDER BY text_score DESC LIMIT ' || text_search_recall_count || ' ), rrf_scores AS ( SELECT chunk, SUM(1.0 / (' || rrf_k || ' + rank_val)) AS rrf_score FROM ( SELECT chunk, rank_vec AS rank_val FROM embedding_recall UNION ALL SELECT chunk, rank_text AS rank_val FROM text_search_recall ) sub GROUP BY chunk ), top_chunks AS ( SELECT chunk FROM rrf_scores ORDER BY rrf_score DESC LIMIT ' || rerank_recall_count || ' ), concat_top_chunks AS ( SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM top_chunks ) SELECT ' || ai_gen_expr || ' FROM concat_top_chunks; ';
EXECUTE sql INTO final_answer; RETURN final_answer;END;$$ LANGUAGE plpgsql;
复制代码


如果想体验更多操作流程,欢迎到阿里云官网领取 Hologres 免费试用,开通 Hologres4.0,并按照操作文档实践。

https://help.aliyun.com/zh/hologres/user-guide/best-practices-financial-multimodal-ai-data-analysis-and-retrieval-system

用户头像

还未添加个人签名 2020-10-15 加入

分享阿里云计算平台的大数据和AI方向的技术创新和趋势、实战案例、经验总结。

评论

发布
暂无评论
基于Hologres构建多模态AI数据分析与检索系统_AI_阿里云大数据AI技术_InfoQ写作社区