基于 Hologres 构建多模态 AI 数据分析与检索系统
- 2025-11-24 浙江
本文字数:20228 字
阅读完需:约 66 分钟
在数据驱动时代,非结构化数据(文本、图像、音视频、日志等)与结构化、半结构化数据(JSON)共同构成企业的核心数据资产。其中,非结构化数据以更原始、多元的形态蕴含着海量的业务洞察(如用户反馈、合同条款、产品缺陷图像),Hologres4.0 以“AI 时代的一站式多模态分析平台”为核心理念,全面展示了 Hologres 在结构化、半结构化与非结构化数据分析能力上的重大突破,发布全新向量索引 HGraph,登顶 VectorDBBench 性价比榜单 QPS、Recall、Latency、Load 四项第一,为 AI 应用的提供高性价比、高吞吐、低延迟、高并发的向量服务,成为全球最具性价比的向量数据库!
基于 Hologres 构建多模态 AI 数据分析与检索系统
本文将会模拟金融场景中对招股书、合同等 PDF 文件的检索与分析,以辅助业务进行下一步的精细化运营决策。基于 Hologres 4.0 对 PDF 非结构化数据的处理与检索,包含的主要能力如下:
非结构化数据(Object Table):支持通过表的形式读取 OSS 中非结构化数据(PDF/IMAGE/PPT 等)。
AI Function:在 Hologres 中可以用标准 SQL 的方式调用 AI Function,自动调用内置大模型,完成 AI 服务建设场景
数据加工:提供 Embed、Chunk 算子,可以对非结构化数据加工成结构化数据存储,无需使用外部算法就能自动 Embed。
数据检索和分析:提供
ai_gen、ai_summarize等算子,能够通过 SQL 对数据进行推理、问题总结及翻译等操作。Dynamic Table 介绍:支持增量刷新模式对非结构化数据自动加工,每次只计算增量的数据有效减少重复计算,降低资源利用率。
向量检索:支持标准 SQL 的向量检索,用于非结构化数据的相似度搜索、场景识别等,在同一个查询中可以自由地实现向量和标量的检索。
全文检索:通过倒排索引、分词等机制实现对非结构化数据的高效检索,支持关键词匹配、短语检索等丰富的检索方式,实现更加灵活的检索。
方案优势
通过如上核心能力,在 Hologres 中多模态 AI 检索与分析的核心优势如下:
完整的 AI 数据处理流程:涵盖从数据 Embed、Chunk、增量加工、检索/分析的全流程,开发人员可以使用大数据系统一样,轻松构建 AI 应用。
标准 SQL 加工和分析非结构化数据:无需使用专用开发语言,纯 SQL 就能完成非结构化数据提取、加工,也无需借助外部系统,数据处理更加高效和简单,降低开发人员学习成本。
检索更精准、灵活和智能:可以轻松构建“关键词+语义+多模态”的混合检索链路,覆盖从精准搜索到意图理解的全场景需求。还能结合 AI Function 实现对用户意图的深度理解,语义关联和上下文推理,实现更智能的检索能力。
数据不出库,更安全:不需要将数据导出到外部系统,与 hologres 的多种安全能力无缝集成,高效保护数据安全。
本实践文档将会介绍如何通过上诉核心能力在 Hologres 中对非结构化数据加工和检索,助力搭建企业级多模态 AI 数据平台,打破数据孤岛,释放全域数据价值
方案流程
本次方案的流程如下:
数据集准备。
将金融数据集中的 PDF 文件上传至 OSS 存储。
PDF 数据加工。
使用 Object Table 读取 PDF 的元数据信息,然后创建增量刷新的 Dynamic Table,并对数据进行 Embed 和 Chunk,同时也对 Dynamic Table 构建向量索引和全文索引,以便后续检索可以使用索引的能力。
使用
ai_embed算子对将自然语言的问题进行 Embedding,然后使用全文和向量双路召回结果,并对结果进行排序,结合大模型的推理能力,最终输出相似度最高的答案。
准备工作
数据准备
本文使用 ModelScope 公开的金融数据集中的 PDF 文件夹中的文件,共 80 份公司招股说明书。
环境准备
购买 Hologres V4.0 及以上版本实例并创建数据库。
购买 AI 资源。
本文以 large-96core-512GB-384GB、1 个节点为例。
模型部署。本次方案使用的模型以及分配的资源为:
说明上述模型的资源均为默认分配的资源。
操作步骤
下载 PDF 文件并上传至 OSS。
下载博金大模型挑战赛-金融千问 14b 数据集中 80 份招股书(PDF)。
登录 OSS 管理控制台,创建 Bucket 并将已下载的 PDF 文件上传至该 Bucket 路径下。上传操作详情,请参见简单上传。
账号授权。
登录 RAM 控制台,创建阿里云 RAM 角色并授予 OSS 的相关权限。
推荐授予 AliyunOSSReadOnlyAccess 权限。
为上述阿里云 RAM 角色添加登录和 Hologres 的访问权限。
阿里云账号(主账号)
修改 RAM 角色的信任策略。重点需更新如下参数:
Action:更新为 sts:AssumeRole。
Service:更新为 hologres.aliyuncs.com。
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "RAM": [ "acs:ram::1866xxxx:root" ], "Service": [ "hologres.aliyuncs.com" ] } } ], "Version": "1"}RAM 用户(子账号)
● 为 RAM 用户授权。
a. 在权限管理 > 权限策略页面,单击创建权限策略,并选择脚本编辑模式创建权限策略。具体操作,请参见创建自定义权限策略
Hologres 可通过该策略判断当前 RAM 用户是否具备创建对应 RAM 角色的权限。权限策略内容如下。
{ "Version": "1", "Statement": [ { "Effect": "Allow", "Action": "hologram:GrantAssumeRole", "Resource": "<arn账号>" } ]}b. 在身份管理 > 用户页面,单击目标 RAM 用户操作列中的添加权限,为 RAM 用户(子账号)授予上述步骤已创建的权限策略。具体操作,请参见为 RAM 用户授权。
● 为已创建的 RAM 角色授权。
修改 RAM 角色的信任策略。重点需更新如下参数:
Action:更新为
sts:AssumeRole。Service:更新为
hologres.aliyuncs.com。
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "RAM": [ "acs:ram::1866xxxx:root" ], "Service": [ "hologres.aliyuncs.com" ] } } ], "Version": "1"}对 PDF 文件进行 Embedding 和 Chunk。
需要创建 Object Table 和 Dynamic Table 对 PDF 的元数据读取以及加工。因为流程较长,Hologres 直接将其封装为存储过程。该存储过程包括的能力如下:
会创建一张 Object Table,用于取 PDF 的元数据。
创建一张增量刷新模式的 Dynamic Table 结果表,用于存储加工后的数据。同时,该表需设置向量索引和全文索引,且 Dynamic Table 不设置自动刷新,需要手动刷新。
Dynamic Table 的刷新过程中会使用
ai_embed、ai_chunk对数据进行 Embed 和切片。
该存储过程代码如下:
CALL create_rag_corpus_from_oss( oss_path => 'oss://xxxx/bs_challenge_financial_14b_dataset/pdf', oss_endpoint => 'oss-cn-hangzhou-internal.aliyuncs.com', oss_role_arn => 'acs:ram::186xxxx:role/xxxx', corpus_table => 'public.dt_bs_challenge_financial');刷新结果表。
通过如上存储过程创建的 Object Table 和 Dynamic Table 均需手动刷新,才能完成数据加工。该步骤已被封装为 PDF 加工存储过程,该存储过程包括的能力如下:
刷新一次 Object Table 获取 PDF 元数据
刷新一次 Dynamic Table,进行 PDF 的 Embedding 和 Chunk 加工。
该存储过程使用代码如下:
CALL refresh_rag_corpus_table( corpus_table => 'public.dt_bs_challenge_financial');PDF 检索。
加工好的数据可以根据业务使用场景,通过向量、全文等方式进行检索。例如:可以根据招股书来查询某个公司的业绩走势,以此来判断公司后续的走势是悲观还是乐观,以便对后续的投资意向提供辅助建议。
向量检索
在向量检索时,为了检索方便,我们将问题 Embedding 和 Prompt 构建,大模型输出答案等过程封装成为向量检索函数,直接调用如下该存储过程可以实现向量召回。
-- 向量单路召回 + AI重排SELECT qa_vector_search_retrieval( question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?', corpus_table => 'dt_bs_challenge_financial', prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}')检索答案如下:
qa_retrieval---------"根据提供的信息,对湖南国科微电子股份有限公司的业绩走势进行分析,可以得出以下结论:
### 一、业绩走势分析:悲观
#### 1. **营业收入增长乏力**- 2014年度营业收入较上年增长 **15.13%**,但2015年度营业收入却 **下降5.21%**,2016年度数据未提供,但可以看出营业收入增长趋势在2015年出现明显下滑。- 2012年至2014年营业收入的年复合增长率仅为 **4.47%**,表明公司业务扩张较为缓慢。
#### 2. **净利润增长持续下降**- 2014年度净利润增长 **5.43%**,2015年度净利润 **下降3.29%**。- 扣除非经常性损益后,2014年度归属于母公司股东的净利润增长 **-3.14%**,2015年度进一步下降 **-5.60%**,表明公司主营业务盈利能力在持续恶化。- 2012年至2014年扣除非经常性损益后净利润的年复合增长率为 **-4.38%**,远低于营业收入增长,说明公司主营业务盈利能力不足,增长主要依赖非经常性损益。
#### 3. **非经常性损益占比偏高**- 报告期内,非经常性损益占净利润的比例较高,2014年、2013年、2012年分别为 **17.54%、10.25%、8.06%**,表明公司利润中有一部分来自政策扶持、政府补贴等非经常性因素,而非核心业务的持续增长。- 依赖非经常性损益来维持利润增长,不利于公司长期的稳定发展。
#### 4. **净资产收益率下降**- 加权平均净资产收益率从2014年的 **18.10%** 下降到2015年的 **24.82%**,再到2016年的 **28.23%**,虽然数据看似增长,但需注意该指标是以扣除非经常性损益后的净利润计算的,而净利润本身在下降,因此这种增长可能与资本结构变化有关,而非盈利能力的实质性提升。
### 二、原因总结1. **主营业务增长乏力**:营业收入和净利润增长均呈现下降趋势,尤其是净利润的下降表明公司盈利能力在减弱。2. **非经常性损益依赖度高**:公司利润中非经常性损益占比较高,说明主营业务的盈利能力不足,公司业绩的持续性存疑。3. **市场竞争激烈**:公司采购的工控机、显示器、电源等产品市场竞争激烈,价格平稳,利润空间受到挤压。4. **行业环境影响**:不锈钢市场价格波动、原材料价格波动可能对公司经营业绩造成一定影响,虽然公司已采取措施降低影响,但长期来看仍需关注。
### 三、结论总体来看,湖南国科微电子股份有限公司的业绩走势偏 **悲观**。公司主营业务增长乏力,净利润持续下降,对非经常性损益依赖度高,未来盈利能力的可持续性存疑。公司需要加强核心业务的竞争力,优化成本结构,提高主营业务盈利能力,以实现长期稳健发展。"全文检索
在全文检索时,为了检索方便,我们将问题 Embedding 和 Prompt 构建,大模型输出答案等过程封装为全文检索函数,直接调用如下存储过程可以实现全文召回:
--全文检索召回SELECT qa_text_search_retrieval( question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?', corpus_table => 'dt_bs_challenge_financial', prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}');检索答案如下:
qa_text_search_retrieval----------------"根据提供的信息,湖南国科微电子股份有限公司在2014年、2015年和2016年的业绩走势整体上呈现**悲观**趋势,具体原因如下:
### 1. **营业收入增长乏力**- 2014年营业收入增长率为**15.13%**,但2015年营业收入增长率转为**-5.21%**,即出现负增长。- 2012年至2014年的营业收入年复合增长率仅为**4.47%**,说明公司营业收入增长较为缓慢,业务发展不够强劲。- 2015年上半年营业收入预测与2014年同期相比大致持平,但2015年上半年净利润较上年同期**略有下降**,表明盈利能力下降。
### 2. **净利润和扣非净利润增长不佳**- 2014年净利润增长率为**5.43%**,2015年净利润增长率下降为**-3.29%**,即净利润出现下滑。- 扣除非经常性损益后的净利润增长率在2014年为**-3.14%**,2015年进一步下降为**-5.60%**,说明公司主营业务的盈利能力持续下降。- 2012年至2014年扣非净利润的年复合增长率为**-4.38%**,明显低于营业收入的年复合增长率,说明公司利润质量不高,主营业务盈利能力较弱。
### 3. **经营活动现金流波动**- 2014年销售商品、提供劳务收到的现金占营业收入的比例较前两年有所下降,主要与部分收入确认项目的**收款跨期**有关,说明公司现金流管理存在问题。- 2013年购买商品、接受劳务支付的现金占营业成本比例较高,主要是由于当年**采购原材料并完成生产**,但部分成本在2014年才结转,导致2014年该比例较低,反映公司采购和生产节奏不够稳定。
### 4. **投资和盈利能力指标**- 加权平均净资产收益率(ROE)在2014年为**18.10%**,2015年上升至**24.82%**,2016年进一步上升至**28.23%**,虽然有所提升,但ROE的提高可能主要依赖**财务杠杆**,而非核心业务盈利能力的提升。- 考虑到净利润和扣非净利润持续下降,ROE的提升并不能完全反映公司经营质量的改善。
### 5. **2015年上半年业绩预测**- 2015年上半年营业收入预计为**8,505万元至10,395万元**,与2014年同期的**10,127.35万元**大致持平,但净利润预计为**2,340万元至2,860万元**,低于2014年同期的**2,912.66万元**,说明公司盈利能力下降。
### 总结综合来看,湖南国科微电子股份有限公司在2014年至2016年的业绩走势**偏向悲观**。虽然ROE有所提升,但营业收入增长乏力、净利润和扣非净利润持续下降、经营活动现金流波动较大,表明公司主营业务盈利能力较弱,经营质量有待提升。"向量+全文混合检索
在向量、全文结合 Rank 排序混合检索场景中,为了检索方便,Hologres 将其封装为向量+全文双路召回+Rank 排序函数,该存储过程的能力如下:
根据问题使用向量计算,召回 TOP 20 的答案。
根据问题使用全文检索,召回 TOP 20 的答案。
使用
ai_rank,对向量和全文召回的答案进行排序,最后输出 Top1 的答案。使用
ai_gen,结合大模型根据提示词以及检索的答案,生成最终答案并进行输出。
-- 全文、向量双路召回 + AI重排SELECT qa_hybrid_retrieval( question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?', corpus_table => 'dt_bs_challenge_financial', prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}');检索答案如下:
qa_hybrid_retrieval---"根据提供的信息,我们可以对湖南国科微电子股份有限公司的业绩走势进行如下分析,并判断其趋势是悲观还是乐观:
---
### 一、**营业收入走势分析**1. **2012-2014年复合增长率**: - 营业收入的年复合增长率为 **4.47%**,表明公司营业收入的增长较为平稳。 - 2014年营业收入为 **18,154.06万元**,较2013年增长 **15.13%**。 - 2015年营业收入较上年 **下降5.21%**,出现了负增长。
2. **结论**: - 营业收入的增长在2014年有所回升,但2015年出现明显下滑,表明公司业务扩张遇到了一定阻力。
---
### 二、**净利润走势分析**1. **2012-2014年复合增长率**: - 扣除非经常性损益后的净利润年复合增长率为 **-4.38%**,低于营业收入的年复合增长率,说明公司盈利能力有所下降。 - 2014年扣非净利润为 **42,731,071.18元**,较2013年下降 **3.14%**。 - 2015年扣非净利润较上年进一步下降 **5.60%**。
2. **非经常性损益影响**: - 2014年、2013年及2012年非经常性损益占净利润的比例分别为 **17.54%、10.25%、8.06%**,呈上升趋势。 - 非经常性损益的增加主要来自于政府补贴和理财产品收益,而非主营业务带来的持续增长。
3. **结论**: - 扣非净利润连续两年下降,说明公司主营业务盈利能力减弱,业绩增长依赖于非经常性损益,这是令人担忧的信号。
---
### 三、**现金流量与经营稳定性**1. **经营活动现金流**: - 2014年营业收入为 **18,154.06万元**,但销售商品、提供劳务收到的现金并未明确给出,无法判断现金流是否健康。 - 报告期内,公司银行存款分别为 **13,063.38万元、4,152.54万元、9,864.61万元**,资金流动性波动较大,但主要客户、供应商及经营模式保持稳定。
2. **结论**: - 虽然公司现金流存在波动,但客户和供应商稳定,经营模式未发生重大变化,这为公司未来的发展提供了一定保障。
---
### 四、**2015年上半年业绩预测**- 2015年1至6月预计营业收入为 **8,505万元至10,395万元**,较2014年同期的 **4,641.19万元**有明显增长。- 但2015年1至3月净利润较上年同期下降 **48.26%**,主要是因为确认收入的项目毛利率较低。
---
### 五、**综合分析与判断**1. **乐观因素**: - 2014年营业收入增长较快,达到 **15.13%**。 - 2015年上半年营业收入预计增长显著,表明公司可能正在逐步恢复。 - 主要客户、供应商及经营模式保持稳定,为公司提供了良好的运营基础。
2. **悲观因素**: - 2015年营业收入较上年 **下降5.21%**,净利润也出现下滑。 - 扣非净利润连续两年下降,表明公司主营业务盈利能力不足。 - 非经常性损益占比上升,业绩增长依赖于政府补贴和理财产品收益,缺乏内生增长动力。 - 2015年1至3月净利润大幅下滑 **48.26%**,表明短期业绩波动较大。
---
### **最终结论:整体趋势偏悲观**- 尽管公司在2014年营业收入有所回升,且2015年上半年预计增长,但 **扣非净利润连续下降**、**净利润增长依赖非经常性损益**、**短期业绩波动较大**,表明公司目前的业绩增长缺乏持续性和稳定性。- 因此,从长期来看,公司业绩走势偏 **悲观**,需关注其主营业务盈利能力的改善和非经常性损益的依赖问题。
---
### **建议**1. 关注公司未来主营业务的盈利能力是否能有所提升。2. 降低对非经常性损益的依赖,提高内生增长动力。3. 稳定客户和供应商关系,优化业务结构,提高毛利率。"向量+全文双路召回+RRF 排序
使用向量和全文检索后,通过 RRF (Reciprocal Rank Fusion)排序召回结果。为了检索方便,Hologres 已经封装为向量+全文双路召回+RRF排序函数(详细定义见下方附录),该存储过程的能力如下:
根据问题使用向量计算,召回 TOP 20 的答案。
根据问题使用全文检索,召回 TOP 20 的答案。
对向量和全文召回的答案,计算 RRF 分数,最后输出 Top N 的答案。
使用
ai_gen、大模型根据提示词,以及检索的答案,拼装成最终答案并输出。
-- 全文、向量双路召回 + RRF重排SELECT qa_hybrid_retrieval_rrf( question => '报告期内,湖南国科微电子股份有限公司2014年度、2015年度、2016年度营业收入和净利润分别较上年增长多大幅度?', corpus_table => 'dt_bs_challenge_financial', prompt => '请分析如下业绩走势是悲观还是乐观,并给出原因:${question}\n\n 参考信息:\n\n ${context}');检索答案如下:
qa_hybrid_retrieval_rrf------------------"根据提供的信息,对湖南国科微电子股份有限公司的业绩走势进行分析,可以得出以下结论:
### **业绩走势判断:悲观**#### **原因分析如下:**
1. **净利润增长低于营业收入增长:** - 提供的信息指出,公司2012年至2014年的**营业收入年复合增长率为4.47%**,表明公司整体业务增长较为平稳。 - 但**扣除非经常性损益后归属于母公司股东的净利润年复合增长率为-4.38%**,明显低于营业收入的增长率。这说明公司在收入增长的同时,盈利能力并未同步提升,甚至出现下滑,可能受到成本上升、毛利率下降或非经常性损益减少等因素影响。
2. **净利润波动较大:** - 2015年1至3月的净利润较上年同期减少了48.26%,且主要原因在于确认收入的项目毛利率较低(如无锡地铁一号线项目以模块外购为主)。这表明公司短期业绩容易受到业务结构变化的影响,存在一定的不稳定性。
3. **毛利率和盈利能力下降:** - 提到“主营业务利润对公司净利润的贡献”在2014年较2013年略有下降,而2013年又较2012年下降。这说明公司核心业务的盈利能力可能在减弱,可能受到市场竞争加剧、成本上升或产品结构变化的影响。
4. **2015年上半年预测利润下降:** - 2015年1至6月预计营业收入为8,505万元至10,395万元,与2014年同期大致持平,但净利润预计为2,340万元至2,860万元,低于2014年同期的2,912.66万元。这表明公司盈利能力在进一步下降,可能面临一定的经营压力。
5. **业务增长乏力:** - 虽然营业收入增长较为平稳,但净利润的下降表明公司业务增长的质量不高,未能有效转化为利润。这可能影响投资者对公司未来发展的信心。
### **总结:**湖南国科微电子股份有限公司的业绩走势整体偏向**悲观**。虽然营业收入保持了平稳增长,但净利润的增长明显滞后甚至出现负增长,表明公司盈利能力在下降,业务发展质量不高,且存在短期业绩波动的风险。如果公司不能有效提升毛利率、控制成本或优化产品结构,未来业绩可能继续承压。"附录:存储过程定义
上述文档中使用的存储过程定义如下,方便您做参考。
说明在 Hologres 中不支持创建 Function,如下存储过程仅做参考,无法修改后直接执行。
PDF 加工存储过程
创建 Object Table 和 Dynamic Table
CREATE OR REPLACE PROCEDURE create_rag_corpus_from_oss( oss_path TEXT, oss_endpoint TEXT, oss_role_arn TEXT, corpus_table TEXT, embedding_model TEXT DEFAULT NULL, parse_document_model TEXT DEFAULT NULL, chunk_model TEXT DEFAULT NULL, chunk_size INT DEFAULT 300, chunk_overlap INT DEFAULT 50, overwrite BOOLEAN DEFAULT FALSE)AS $$DECLARE corpus_schema TEXT; corpus_name TEXT; obj_table_name TEXT; full_corpus_ident TEXT; full_obj_ident TEXT; embed_expr TEXT; chunk_expr TEXT; parse_expr TEXT; embedding_dims INT;BEGIN -- 1. 拆 schema name + table name IF position('.' in corpus_table) > 0 THEN corpus_schema := split_part(corpus_table, '.', 1); corpus_name := split_part(corpus_table, '.', 2); ELSE corpus_schema := 'public'; corpus_name := corpus_table; END IF;
obj_table_name := corpus_name || '_obj_table';
full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name); -- 2. 如果需要覆盖,先删表和索引 IF overwrite THEN DECLARE dyn_table_exists BOOLEAN; rec RECORD; BEGIN -- 检查 dynamic table 是否存在 SELECT EXISTS ( SELECT 1 FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relname = corpus_name AND n.nspname = corpus_schema ) INTO dyn_table_exists;
IF dyn_table_exists THEN -- 2.1 关闭动态表自动刷新 -- RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident; -- EXECUTE format('ALTER TABLE IF EXISTS %s SET (auto_refresh_enable=false)', full_corpus_ident);
-- 2.2 查找 RUNNING 刷新任务并取消 FOR rec IN EXECUTE format( $f$ SELECT query_job_id FROM hologres.hg_dynamic_table_refresh_log(%L) WHERE status = 'RUNNING'; $f$, corpus_table ) LOOP RAISE NOTICE 'Found running refresh job: %', rec.query_job_id; IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id; ELSE RAISE WARNING 'Cancel job % failed.', rec.query_job_id; END IF; END LOOP;
-- 2.3 删除 Dynamic Table EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident); ELSE RAISE NOTICE 'Dynamic table % does not exist, skip cancel job and drop.', full_corpus_ident; END IF;
-- 2.4 无论如何,Object Table 都要删除 EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident); END; END IF;
-- 3. 创建 Object Table RAISE NOTICE 'Create object table: %', obj_table_name; EXECUTE format( $f$ CREATE OBJECT TABLE %s WITH ( path = %L, oss_endpoint = %L, role_arn = %L ); $f$, full_obj_ident, oss_path, oss_endpoint, oss_role_arn );
COMMIT;
-- 4. 刷新 Object Table RAISE NOTICE 'Refresh object table: %', obj_table_name; EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);
COMMIT;
-- 5. 文档解析模型选择 IF parse_document_model IS NULL OR length(trim(parse_document_model)) = 0 THEN parse_expr := 'ai_parse_document(file, ''auto'', ''markdown'')'; ELSE parse_expr := format( 'ai_parse_document(%L, file, ''auto'', ''markdown'')', parse_document_model ); END IF;
-- 6. chunk 模型选择 IF chunk_model IS NULL OR length(trim(chunk_model)) = 0 THEN chunk_expr := format('ai_chunk(doc, %s, %s)', chunk_size, chunk_overlap); ELSE chunk_expr := format( 'ai_chunk(%L, doc, %s, %s)', chunk_model, chunk_size, chunk_overlap ); END IF;
-- 7. embedding 模型选择 IF embedding_model IS NULL OR length(trim(embedding_model)) = 0 THEN embed_expr := 'ai_embed(chunk)';
EXECUTE 'SELECT array_length(ai_embed(''dummy''), 1)' INTO embedding_dims; ELSE embed_expr := format('ai_embed(%L, chunk)', embedding_model);
EXECUTE format( 'SELECT array_length(ai_embed(%L, ''dummy''), 1)', embedding_model ) INTO embedding_dims; END IF;
RAISE NOTICE 'embedding dimension is: %', embedding_dims;
-- 8. 创建 RAG 输出动态表 RAISE NOTICE 'create dynamic table: %', corpus_name; EXECUTE format( $f$ CREATE DYNAMIC TABLE %s( CHECK(array_ndims(embedding_vector) = 1 AND array_length(embedding_vector, 1) = %s) ) WITH ( vectors = '{ "embedding_vector": { "algorithm": "HGraph", "distance_method": "Cosine", "builder_params": { "base_quantization_type": "sq8_uniform", "max_degree": 64, "ef_construction": 400, "precise_quantization_type": "fp32", "use_reorder": true } } }', auto_refresh_mode = 'incremental', freshness = '5 minutes', auto_refresh_enable = 'false' ) AS WITH parsed_doc AS ( SELECT object_uri, etag, %s AS doc FROM %s ), chunked_doc AS ( SELECT object_uri, etag, unnest(%s) AS chunk FROM parsed_doc ) SELECT object_uri, etag, chunk, %s AS embedding_vector FROM chunked_doc; $f$, full_corpus_ident, embedding_dims, parse_expr, full_obj_ident, chunk_expr, embed_expr ); COMMIT;
-- 9. 创建全文索引(索引名 = 表名 || '_fulltext_idx') EXECUTE format( 'CREATE INDEX %I ON %s USING FULLTEXT (chunk);', corpus_name || '_fulltext_idx', full_corpus_ident );
RAISE NOTICE ''; RAISE NOTICE 'Create RAG corpus success to table: %', corpus_table; RAISE NOTICE ' Vector index is: %.embedding_vector', corpus_table; RAISE NOTICE ' TextSearch index is: %.chunk', corpus_table;END;$$ LANGUAGE plpgsql;刷新 Object Table 和 Dynamic Table 存储过程
CREATE OR REPLACE PROCEDURE refresh_rag_corpus_table( corpus_table TEXT)AS $$DECLARE corpus_schema TEXT; corpus_name TEXT; obj_table_name TEXT; full_corpus_ident TEXT; full_obj_ident TEXT;BEGIN -- 1. 解析 schema 和表名 IF position('.' in corpus_table) > 0 THEN corpus_schema := split_part(corpus_table, '.', 1); corpus_name := split_part(corpus_table, '.', 2); ELSE corpus_schema := 'public'; corpus_name := corpus_table; END IF;
obj_table_name := corpus_name || '_obj_table';
full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name);
-- 2. 刷新 Object Table RAISE NOTICE 'Refreshing Object Table: %', obj_table_name; EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);
-- 3. 刷新 Dynamic Table RAISE NOTICE 'Refreshing Dynamic Table: %', corpus_name; EXECUTE format('REFRESH TABLE %s;', full_corpus_ident);
RAISE NOTICE 'Refresh complete for corpus table %', corpus_table;END;$$ LANGUAGE plpgsql;删除 Object Table 和 Dynamic Table 存储过程
CREATE OR REPLACE PROCEDURE drop_rag_corpus_table( corpus_table TEXT)AS $$DECLARE corpus_schema TEXT; corpus_name TEXT; obj_table_name TEXT; full_corpus_ident TEXT; full_obj_ident TEXT; rec RECORD;BEGIN -- 1. 解析 schema 和表名 IF position('.' in corpus_table) > 0 THEN corpus_schema := split_part(corpus_table, '.', 1); corpus_name := split_part(corpus_table, '.', 2); ELSE corpus_schema := 'public'; corpus_name := corpus_table; END IF;
obj_table_name := corpus_name || '_obj_table';
full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name);
-- 2. 删除表 -- 2.1 关闭动态表自动刷新 -- RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident; -- EXECUTE format('ALTER TABLE IF EXISTS %s SET (auto_refresh_enable=false)', full_corpus_ident);
-- 2.2 查找 RUNNING 刷新任务并取消 FOR rec IN EXECUTE format( $f$ SELECT query_job_id FROM hologres.hg_dynamic_table_refresh_log(%L) WHERE status = 'RUNNING'; $f$, corpus_table ) LOOP RAISE NOTICE 'Found running refresh job: %', rec.query_job_id; IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id; ELSE RAISE WARNING 'Cancel job % failed.', rec.query_job_id; END IF; END LOOP;
-- 2.3 删除 Dynamic Table RAISE NOTICE 'Dropping Dynamic Table: %', corpus_name; EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident);
-- 2.4 删除 Object Table RAISE NOTICE 'Dropping Object Table: %', obj_table_name; EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident);
RAISE NOTICE 'Drop complete for corpus: %', corpus_table;END;$$ LANGUAGE plpgsql;向量检索函数
-- RAG向量单路召回问答CREATE OR REPLACE FUNCTION qa_vector_search_retrieval( question TEXT, corpus_table TEXT, embedding_model TEXT DEFAULT NULL, llm_model TEXT DEFAULT NULL, ranking_model TEXT DEFAULT NULL, prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}', language TEXT DEFAULT 'Chinese', vector_recall_count INT DEFAULT 20, rerank_recall_count INT DEFAULT 5, vector_col TEXT DEFAULT 'embedding_vector')RETURNS TEXT AS$$DECLARE final_answer TEXT; sql TEXT; embedding_expr TEXT; ai_rank_expr TEXT; ai_gen_expr TEXT; embedding_model_valid BOOLEAN; llm_model_valid BOOLEAN; ranking_model_valid BOOLEAN;BEGIN embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != ''); llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != ''); ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF embedding_model_valid THEN embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')'; ELSE embedding_expr := 'ai_embed(' || quote_literal(question) || ')'; END IF;
IF ranking_model_valid THEN ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)'; ELSE ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)'; END IF;
IF llm_model_valid THEN ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) || ', replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )'; ELSE ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))'; END IF;
sql := ' WITH embedding_recall AS ( SELECT chunk, approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS distance FROM ' || corpus_table || ' ORDER BY distance DESC LIMIT ' || vector_recall_count || ' ), rerank AS ( SELECT chunk, ' || ai_rank_expr || ' AS score FROM embedding_recall ORDER BY score DESC LIMIT ' || rerank_recall_count || ' ), concat_top_chunks AS ( SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank ) SELECT ' || ai_gen_expr || ' FROM concat_top_chunks; ';
EXECUTE sql INTO final_answer; RETURN final_answer;END;$$ LANGUAGE plpgsql;全文检索函数
CREATE OR REPLACE FUNCTION qa_text_search_retrieval( question TEXT, corpus_table TEXT, llm_model TEXT DEFAULT NULL, ranking_model TEXT DEFAULT NULL, prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}', language TEXT DEFAULT 'Chinese', text_search_recall_count INT DEFAULT 20, rerank_recall_count INT DEFAULT 5, text_search_col TEXT DEFAULT 'chunk')RETURNS TEXT AS$$DECLARE final_answer TEXT; sql TEXT; ai_rank_expr TEXT; ai_gen_expr TEXT; llm_model_valid BOOLEAN; ranking_model_valid BOOLEAN;BEGIN llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != ''); ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF ranking_model_valid THEN ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)'; ELSE ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)'; END IF;
IF llm_model_valid THEN ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) || ', replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )'; ELSE ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))'; END IF;
sql := ' WITH text_search_recall AS ( SELECT chunk FROM ' || corpus_table || ' ORDER BY text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC LIMIT ' || text_search_recall_count || ' ), rerank AS ( SELECT chunk, ' || ai_rank_expr || ' AS score FROM text_search_recall ORDER BY score DESC LIMIT ' || rerank_recall_count || ' ), concat_top_chunks AS ( SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank ) SELECT ' || ai_gen_expr || ' FROM concat_top_chunks; ';
EXECUTE sql INTO final_answer; RETURN final_answer;END;$$ LANGUAGE plpgsql;向量+全文双路召回+Rank 排序函数
CREATE OR REPLACE FUNCTION qa_hybrid_retrieval( question TEXT, corpus_table TEXT, embedding_model TEXT DEFAULT NULL, llm_model TEXT DEFAULT NULL, ranking_model TEXT DEFAULT NULL, prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}', language TEXT DEFAULT 'Chinese', text_search_recall_count INT DEFAULT 20, vector_recall_count INT DEFAULT 20, rerank_recall_count INT DEFAULT 5, vector_col TEXT DEFAULT 'embedding_vector', text_search_col TEXT DEFAULT 'chunk')RETURNS TEXT AS$$DECLARE final_answer TEXT; sql TEXT; embedding_expr TEXT; ai_rank_expr TEXT; ai_gen_expr TEXT; embedding_model_valid BOOLEAN; llm_model_valid BOOLEAN; ranking_model_valid BOOLEAN;BEGIN embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != ''); llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != ''); ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF embedding_model_valid THEN embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')'; ELSE embedding_expr := 'ai_embed(' || quote_literal(question) || ')'; END IF;
IF ranking_model_valid THEN ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)'; ELSE ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)'; END IF;
IF llm_model_valid THEN ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) || ', replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )'; ELSE ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))'; END IF;
sql := ' WITH embedding_recall AS ( SELECT chunk FROM ' || corpus_table || ' ORDER BY approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') DESC LIMIT ' || vector_recall_count || ' ), text_search_recall AS ( SELECT chunk FROM ' || corpus_table || ' ORDER BY text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC LIMIT ' || text_search_recall_count || ' ), union_recall AS ( SELECT chunk FROM embedding_recall UNION SELECT chunk FROM text_search_recall ), rerank AS ( SELECT chunk, ' || ai_rank_expr || ' AS score FROM union_recall ORDER BY score DESC LIMIT ' || rerank_recall_count || ' ), concat_top_chunks AS ( SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank ) SELECT ' || ai_gen_expr || ' FROM concat_top_chunks; ';
EXECUTE sql INTO final_answer; RETURN final_answer;END;$$ LANGUAGE plpgsql;向量+全文双路召回+RRF 排序函数
CREATE OR REPLACE FUNCTION qa_hybrid_retrieval_rrf( question TEXT, corpus_table TEXT, embedding_model TEXT DEFAULT NULL, llm_model TEXT DEFAULT NULL, ranking_model TEXT DEFAULT NULL, prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}', language TEXT DEFAULT 'Chinese', text_search_recall_count INT DEFAULT 20, vector_recall_count INT DEFAULT 20, rerank_recall_count INT DEFAULT 5, rrf_k INT DEFAULT 60, vector_col TEXT DEFAULT 'embedding_vector', text_search_col TEXT DEFAULT 'chunk')RETURNS TEXT AS$$DECLARE final_answer TEXT; sql TEXT; embedding_expr TEXT; ai_rank_expr TEXT; ai_gen_expr TEXT; embedding_model_valid BOOLEAN; llm_model_valid BOOLEAN; ranking_model_valid BOOLEAN;BEGIN embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) <> ''); llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) <> ''); ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) <> '');
IF embedding_model_valid THEN embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')'; ELSE embedding_expr := 'ai_embed(' || quote_literal(question) || ')'; END IF;
IF ranking_model_valid THEN ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)'; ELSE ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)'; END IF;
IF llm_model_valid THEN ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) || ', replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )'; ELSE ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) || ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))'; END IF;
sql := ' WITH embedding_recall AS ( SELECT chunk, vec_score, ROW_NUMBER() OVER (ORDER BY vec_score DESC) AS rank_vec FROM ( SELECT chunk, approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS vec_score FROM ' || corpus_table || ' ) t ORDER BY vec_score DESC LIMIT ' || vector_recall_count || ' ), text_search_recall AS ( SELECT chunk, text_score, ROW_NUMBER() OVER (ORDER BY text_score DESC) AS rank_text FROM ( SELECT chunk, text_search(' || text_search_col || ', ' || quote_literal(question) || ') AS text_score FROM ' || corpus_table || ' ) ts WHERE text_score > 0 ORDER BY text_score DESC LIMIT ' || text_search_recall_count || ' ), rrf_scores AS ( SELECT chunk, SUM(1.0 / (' || rrf_k || ' + rank_val)) AS rrf_score FROM ( SELECT chunk, rank_vec AS rank_val FROM embedding_recall UNION ALL SELECT chunk, rank_text AS rank_val FROM text_search_recall ) sub GROUP BY chunk ), top_chunks AS ( SELECT chunk FROM rrf_scores ORDER BY rrf_score DESC LIMIT ' || rerank_recall_count || ' ), concat_top_chunks AS ( SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM top_chunks ) SELECT ' || ai_gen_expr || ' FROM concat_top_chunks; ';
EXECUTE sql INTO final_answer; RETURN final_answer;END;$$ LANGUAGE plpgsql;如果想体验更多操作流程,欢迎到阿里云官网领取 Hologres 免费试用,开通 Hologres4.0,并按照操作文档实践。
阿里云大数据AI技术
还未添加个人签名 2020-10-15 加入
分享阿里云计算平台的大数据和AI方向的技术创新和趋势、实战案例、经验总结。







评论