写点什么

Vue3 使用多线程处理文件分片任务

作者:木偶
  • 2025-01-22
    陕西
  • 本文字数:7339 字

    阅读完需:约 24 分钟

前言:文件上传在前端大概是必不可少的,如果文件过大就会影响系统性能进而影响用户的体验,本文着重介绍如何进行文件分片以及使用多线程来处理分片任务,进而提升系统整体的性能增加用户体验。

一、文件说明


  1. Chunk.ts:主要进行文件的创建分片操作;

  2. file.ts:主要进行计算分片数量、创建线程等操作;

  3. worker.ts:线程文件处理分片数据(进行上传等操作)。



二、逐层分析

1.Chunk.ts

在进行文件分片时,我们会用到spark-md5来进行文件MD5的计算,所以我们需要进行对此库进行下载,使用如下命令进行安装:


npm i spark-md5 -S
复制代码

示例代码:

// @ts-ignoreimport SparkMD5 from 'spark-md5'const createChunk = (file: File, index: number, chunkSize: number) => {    return new Promise(async (reslove) => {        const start = index * chunkSize        const end = start + chunkSize        const spark = new SparkMD5.ArrayBuffer();        const fileReader = new FileReader()        const blob = file.slice(start, end)        fileReader.onload = (e) => {            spark.append(e.target?.result)            reslove({                start,                end,                index,                hash: spark.end(),                blob            })        }        fileReader.readAsArrayBuffer(blob)    })}export {createChunk}
复制代码

代码结构解析:

import SparkMD5 from 'spark-md5'
复制代码


  • SparkMD5 是一个用于计算 MD5 哈希值的 JavaScript 库。它支持直接对字符串、ArrayBuffer、Blob 等进行哈希计算。在这个代码中,我们使用 SparkMD5.ArrayBuffer() 来处理 ArrayBuffer 类型的数据。


const createChunk = (file: File, index: number, chunkSize: number) => {    return new Promise(async (reslove) => {        const start = index * chunkSize        const end = start + chunkSize        const spark = new SparkMD5.ArrayBuffer();        const fileReader = new FileReader()        const blob = file.slice(start, end)
复制代码


  • createChunk:这是一个异步函数,接受三个参数:

  • file: 要处理的文件(File 类型)。

  • index: 当前处理的分片索引。

  • chunkSize: 每个分块的大小(单位是字节)。

  • startend:分别是当前分块的起始位置和结束位置。startindex * chunkSize,表示根据索引计算该分块的起始字节位置,endstart + chunkSize,表示该分块的结束字节位置。

  • sparkSparkMD5.ArrayBuffer()SparkMD5 提供的构造函数,用来计算 ArrayBuffer 类型的数据的 MD5 值。

  • fileReaderFileReader 是浏览器提供的 API,用来读取文件内容。通过 fileReader.readAsArrayBuffer(),可以将 File 对象的一个片段(blob)读取为 ArrayBuffer 类型。

  • blobfile.slice(start, end) 会从原始文件中提取出一个指定大小的片段,从 startend,作为一个 Blob 对象。


fileReader.onload = (e) => {    spark.append(e.target?.result)    reslove({        start,        end,        index,        hash: spark.end(),        blob    })}
复制代码


  • fileReader.onload:当文件片段被成功读取后,onload 事件被触发,e.target.result 会包含读取的内容,它是一个 ArrayBuffer 类型的数据。

  • spark.append(e.target?.result):将读取的 ArrayBuffer 数据追加到 spark 实例中,spark 会根据这个数据更新当前的 MD5 值。

  • reslove:当哈希计算完成后,调用 resolve() 将计算结果返回。这是一个 Promise,所以在 resolve 时,最终的结果会传递给调用者。返回的对象包含:

  • start:分块的起始字节位置。

  • end:分块的结束字节位置。

  • index:分块的索引。

  • hash:当前分块的 MD5 哈希值。

  • blob:当前分块的 Blob 对象,包含文件的指定片段。


fileReader.readAsArrayBuffer(blob)
复制代码


  • fileReader.readAsArrayBuffer(blob):这是 FileReader 的方法,用来读取传入的 Blob 对象。它会将文件的指定片段转化为 ArrayBuffer,并触发 onload 事件。


export { createChunk }
复制代码


  • export { createChunk }:将 createChunk 函数导出,以便其他模块使用。

作用和过程:

  1. 分割文件:该函数将文件分成多个大小为 chunkSize 的块,每个块由 file.slice(start, end) 生成。

  2. 计算哈希值:对每个分块,使用 SparkMD5 来计算该分块的 MD5 哈希值。SparkMD5append 方法会将文件片段的内容追加到 MD5 计算中,最终计算得到该片段的哈希值。

  3. 返回结果:该函数返回一个 Promise,计算完成后返回一个对象,包含了该分块的起始位置、结束位置、分块索引、哈希值和分块的 Blob



2.file.ts

这段代码的目的是实现文件的分片上传,并通过 Web Workers 来并行处理多个分片的上传任务。最终,所有分片上传完成后,将会调用合并接口将分片合并成一个完整的文件。

示例代码:

const chunkSize = 1024 * 1024 * 10   // 每个大小设置为 10Mimport apiFile from "@/api/apiFile";import { useConuntStore } from "@/store/home";const sliceFile = async (file:File) => {    return new Promise(reslove => {        const systemCount = navigator.hardwareConcurrency || 4        const size = file.size        const chunks = Math.ceil(size / chunkSize)  //总共可以分的数量        let count = 0        const countstore = useConuntStore();        countstore.processValue = 0        // 每个线程能分到的分片数量        const thread_count = Math.ceil(chunks / systemCount)        for (let i = 0; i < systemCount; i++) {            const worker = new Worker(new URL('./worker.ts', import.meta.url),{type:'module'})            let start = i * thread_count            let end = Math.min(start + thread_count, chunks) // 计算的是当前线程的“结束”索引,确保它不会超出总的分片数 chunks            worker.postMessage({                file,                start,                end,                chunkSize,                chunks,                processValue:countstore.processValue            })            worker.onmessage = async(e) => {                worker.terminate()                countstore.processValue += e.data                count ++                if(count == systemCount){                    let data = {                        fileName: file.name,                        totalChunks: chunks                    }                    let resp = await apiFile.MergeFile(data)                    reslove(resp)                }            }            worker.onerror = (err) => {                console.error('Worker error:', err)                worker.terminate()            }        }    })}export { sliceFile }
复制代码

详细解析:

1. 常量和变量初始化:

const chunkSize = 1024 * 1024 * 10   // 每个分片的大小设置为 10M
复制代码


  • chunkSize 定义了每个分片的大小为 10MB (1024 * 1024 * 10 字节)。


import apiFile from "@/api/apiFile";import { useConuntStore } from "@/store/home";
复制代码


  • apiFile 是从 API 模块导入,用于发送请求(合并文件等)。

  • useConuntStorestore/home 导入,这是 Piniastore,管理了全局的状态,主要在此展示进度条。

2. 文件分片和并行处理:

const sliceFile = async (file: File) => {    return new Promise(reslove => {        const systemCount = navigator.hardwareConcurrency || 4        const size = file.size        const chunks = Math.ceil(size / chunkSize)  // 计算文件的总分片数
复制代码


  • sliceFile 是一个异步函数,接收 file 参数(类型是 File)。

  • systemCount:表示要使用多少个并行线程来处理文件的分片,默认为浏览器支持的核心数 navigator.hardwareConcurrency,如果无法获取则默认为 4。

  • chunks:计算总共需要多少个分片。假设每个分片大小为 chunkSize,则总分片数为文件大小除以分片大小,向上取整。

3. 进度条和线程数计算:

        let count = 0        const countstore = useConuntStore();        countstore.processValue = 0        const thread_count = Math.ceil(chunks / systemCount)
复制代码


  • count:一个计数器,用于统计完成的线程数。

  • countstore.processValue:用于存储当前的上传进度值(例如用于进度条)。

  • thread_count:计算每个线程需要处理的分片数。总分片数 chunks 除以并行线程数 systemCount

4. 并行工作线程(Web Worker)初始化:

        for (let i = 0; i < systemCount; i++) {            const worker = new Worker(new URL('./worker.ts', import.meta.url),{type:'module'})            let start = i * thread_count            let end = Math.min(start + thread_count, chunks)  // 计算当前线程的“结束”索引,确保它不会超出总分片数
复制代码


  • 使用 for 循环启动 systemCount 个 Web Worker 线程来处理文件分片。

  • startend:定义当前工作线程处理的分片的起始和结束索引。确保每个线程处理不同的分片。

5. 向 Web Worker 发送数据:

            worker.postMessage({                file,                start,                end,                chunkSize,                chunks,                processValue: countstore.processValue            })
复制代码


  • 向 Web Worker 发送消息,包含以下数据:

  • file: 文件对象,供 Worker 读取文件。

  • startend: 当前线程需要处理的分片范围。

  • chunkSizechunks: 分片大小和总分片数。

  • processValue: 当前进度值,用于实时更新进度条。

6. 处理 Web Worker 返回的数据:

            worker.onmessage = async (e) => {                worker.terminate()                countstore.processValue += e.data                count ++                if(count == systemCount){                    let data = {                        fileName: file.name,                        totalChunks: chunks                    }                    let resp = await apiFile.MergeFile(data)                    reslove(resp)                }            }
复制代码


  • 当 Web Worker 完成分片上传任务时,它会通过 onmessage 事件返回结果。

  • worker.terminate():任务完成后,销毁该 Worker。

  • 更新 processValue,即上传进度,可能是每个分片上传完成后累计的进度。

  • count 增加,统计完成的 Worker 线程数。

  • 如果所有线程完成了任务(count == systemCount),则调用 apiFile.MergeFile 接口请求合并文件分片。

  • fileNametotalChunks 作为参数传给合并接口。

  • 最后,通过 resolve 返回合并后的响应。

7. 错误处理:

            worker.onerror = (err) => {                console.error('Worker error:', err)                worker.terminate()            }
复制代码


  • 如果 Web Worker 执行过程中出现错误,onerror 会被触发,错误会被输出到控制台,并终止该 Worker。

8. 最终返回的 Promise:

  • 返回一个 Promise,这意味着调用 sliceFile 时可以等待文件处理完成(包括所有分片上传和合并)。

  • 如果所有分片上传成功并且文件合并成功,返回的 Promise 将会 resolve,返回合并后的结果。



总结:

这段代码使用了 Web Workers 来并行处理文件的上传,每个 Worker 负责处理一定范围内的文件分片。每个分片上传完成后,进度会更新,所有分片上传完成后,调用合并接口将文件合并成一个完整的文件。


核心思路


  • 将文件分成多个小块进行并行上传,利用多线程提高上传速度。

  • 使用 Web Worker 处理上传任务,避免主线程被阻塞。

  • 利用进度条(processValue)实时显示上传进度。


这种方法适用于上传大文件,通过分片上传可以有效避免上传超时或文件过大导致的问题。

3.worker.ts

这段代码的目的是通过 Web Worker 实现文件分片上传,并且通过处理文件的分片并发上传,实现高效上传的同时还提供上传进度的反馈。具体来讲,这段代码是 Web Worker 的内部实现,用于执行文件的分片上传任务。

示例代码:

import apiFile from "@/api/apiFile";import {createChunk} from './Chunk'
interface resultObj { start: number; end: number; index: number; hash: string; blob: Blob;}onmessage = async (e) => { const result = [] let { file, start, end, chunkSize, chunks, processValue } = e.data for (let i = start; i < end; i++) { let pom = await createChunk(file, i, chunkSize) as resultObj; let formData = new FormData(); formData.append("file", pom.blob); formData.append("chunkIndex", (pom.index).toString()); formData.append("fileName", file.name); formData.append("hash", pom.hash); formData.append("totalChunks", chunks); let up_status = apiFile.fileUpload(formData); result.push(up_status) } const chunksResult = await Promise.all(result) processValue = chunksResult.length/chunks postMessage(processValue)}
复制代码

详细解析:

1. 初始化和参数解构:

import {createChunk} from './Chunk'
复制代码


  • 导入 Chunk 中的方法用于进行文件分片


let { file, start, end, chunkSize, chunks, processValue } = e.data
复制代码


  • 从主线程传递过来的数据 e.data 中解构出以下参数:

  • file:待上传的文件对象。

  • startend:当前 Worker 需要处理的文件分片的起始和结束索引(即分配给当前 Worker 处理的文件片段)。

  • chunkSize:每个分片的大小(字节数)。

  • chunks:文件总共被分割成的分片数量。

  • processValue:当前进度的值。

2. 创建存储上传任务的 result 数组:

const result = []
复制代码


  • result 数组用于存储每个上传请求的 Promise 对象。

3. 循环处理分片上传:

for (let i = start; i < end; i++) {    let pom = await createChunk(file, i, chunkSize) as resultObj;
复制代码


  • 循环从 startend 索引,针对每一个分片进行处理。

  • createChunk(file, i, chunkSize) 调用会返回一个 Promise,用于生成一个文件分片。它会返回一个包含分片信息的对象 resultObj,包括:

  • startend:分片的起始和结束字节位置。

  • index:当前分片的索引(从 0 开始)。

  • hash:当前分片的 MD5 校验和。

  • blob:当前分片的文件数据。

4. 准备上传数据:

let formData = new FormData();formData.append("file", pom.blob);formData.append("chunkIndex", (pom.index).toString());formData.append("fileName", file.name);formData.append("hash", pom.hash);formData.append("totalChunks", chunks);
复制代码


  • 每次处理一个分片时,构建一个 FormData 对象,用于将分片上传到服务器。

  • file:分片的 Blob 数据。

  • chunkIndex:当前分片的索引。

  • fileName:原始文件名。

  • hash:当前分片的 MD5 值。

  • totalChunks:总的分片数(方便后端合并分片时知道总共需要多少个分片)。

5. 执行上传:

let up_status = apiFile.fileUpload(formData);
复制代码


  • 使用 apiFile.fileUpload(formData) 发送上传请求。封装了上传文件的 API,返回一个 Promise

  • up_status 是上传操作的 Promise 对象。

6. 等待所有上传任务完成:

result.push(up_status)
复制代码


  • 将每个上传请求的 Promise 添加到 result 数组中,Promise.all(result) 将等待所有的上传任务完成。

7. 等待所有上传任务完成并计算上传进度:

const chunksResult = await Promise.all(result)processValue = chunksResult.length / chunkspostMessage(processValue)
复制代码


  • Promise.all(result) 等待所有上传任务完成。chunksResult.length 代表已上传的分片数量。

  • processValue 计算已上传的进度值:已上传的分片数除以总分片数,得到上传进度(范围是 0 到 1)。

  • 最后,postMessage(processValue) 将上传进度发送回主线程。

8. 总结:

  • Web Worker 主要工作:

  • 通过循环处理文件分片,使用 createChunk 函数读取并切割文件。

  • 使用 FormData 将每个分片打包成一个上传请求。

  • 使用 apiFile.fileUpload 发送请求。

  • 上传完成后更新进度,并将进度通过 postMessage 返回给主线程。

  • 主线程的作用:

  • 主线程负责创建 Web Worker,并向 Web Worker 传递文件、分片范围等信息。

  • 接收上传进度信息,并根据进度信息更新 UI(比如进度条)。

  • 上传进度计算:

  • 上传进度是通过计算已上传的分片数占总分片数的比例来实现的,即 processValue = chunksResult.length / chunks

  • 进度值通过 postMessage 传递给主线程,可以在主线程中进一步使用该值来更新上传进度的 UI。

三、使用

在需要引入的 vue 文件中使用一下语句进行引入:


import { sliceFile } from "@/utils/file";
复制代码


因为主要是在文件操作时候进行调用,本文章使用的是 vue3+element-plus,所以调用时机放在了文件上传之前的钩子函数中,如下所示:


<el-upload    v-model:file-list="fileList"    class="upload-demo"    action=""    multiple    :http-request="handleSubmit"    :before-upload="handleBeforeUpload"    :show-file-list="false"    style="margin-left:10px"  >    <el-icon><FolderAdd /></el-icon> </el-upload>
复制代码


interface resIn{  code:number,  msg:string}const handleBeforeUpload = async (file: File) => {  const startDate = Date.now();  if(file.size > 1024*1024*100){    ElMessage.warning('文件最大不能超过100M')    return  }  let res = await sliceFile(file) as resIn  const endDate = Date.now();  // code为我自己定义的,可以进行更换  if(res.code == 200){    //上传完之后可进行的操作    ElMessage.success(res.msg + '耗时: ' + (endDate - startDate) +' ms' )  }else{    //失败情况下的操作    ElMessage.error(res.msg)  }};
复制代码


发布于: 刚刚阅读数: 4
用户头像

木偶

关注

凭时间赢来的东西,时间肯定会为之作证 2022-10-21 加入

CSDN前端领域优质创作者,CSDN博客专家,InfoQ写作社区专家博主,擅长PC端以及Uniapp开发

评论

发布
暂无评论
Vue3使用多线程处理文件分片任务_JavaScript_木偶_InfoQ写作社区