Jansiel Notes

分片计算文件MD5,并借助WebWorker和队列优化

之前也有做过计算文件MD5做相关校验,最近业务上再次遇上,同时需要大量文件同时计算,难免需要优化相关性能。这里用到分片计算,同时将计算过程放到webworker,避免长时间占用主线程造成卡顿,并且通过队列控制并发,避免占用过多计算机资源,造成浏览器卡顿。

整个文件直接计算MD5

 1import SparkMD5 from 'spark-md5'
 2const spark = new SparkMD5.ArrayBuffer()
 3export const createMD5 = file => {
 4    // 验证file
 5    return new Promise((resolve,reject)=>{
 6        const fileReader = new FileReader()
 7         fileReader.readAsArrayBuffer(file)
 8         fileReader.onerror = e => {
 9             console.error(e)
10             return reject(e)
11             }
12        fileReader.onload = () => {
13            try{
14            resolve(spark.hash(dataBuffer))
15            }catch(e){
16                 console.error(e)
17             return reject(e)
18            }
19
20         }
21    })
22
23}
24

直接加载整个文件,将整个文件的二进制数据arrayBuffer丢给计算工具计算MD5,如果是一个小文件就还好,但是大文件,批量的文件,一下子将大量资源放到内存里会造成明显的卡顿 .

分片计算MD5

spark-md5推荐:Incremental md5 performs a lot better for hashing large amounts of data, such as files. One could read files in chunks, using the FileReader & Blob's, and append each chunk for md5 hashing while keeping memory usage low.

增量 md5在散列大量数据(比如文件)时表现得更好。可以使用 FileReader & Blob 的块来读取文件,并为 md5哈希添加每个块,同时保持较低的内存使用。

通过File.slice将文件分片,一块一块地增量处理spark.append(chunkArrayBuffer),最后通过spark.end()获取最终的文件md5,可以保持低内存使用,当然,我们平时讲的文件分片上传等也是基于File.slice。

上代码

 1import SparkMD5 from 'spark-md5'
 2/*
 3 * @description 创建文件的MD5值,根据文件大小动态选择分片大小和快速循环的方式
 4 * @param {Object} file el文件对象或文件对象
 5 * @return {String} MD5 值
 6 * */
 7const maxChunkSize = 1024 * 1024 // 每次处理数据块的最大值,1MB
 8const chunksPerCycle = 100 // 每个计算周期中处理的数据块数
 9export const createMD5 = file => {
10  file = file.raw || file
11  let stamp = Date.now()
12  console.time('md计算耗时' + stamp)
13  return new Promise((resolve, reject) => {
14    // 分片放worker处理
15    const fileReader = new FileReader()
16    try {
17      let currentChunk = 0
18      const totalChunks = Math.ceil(file.size / maxChunkSize)
19      const spark = new SparkMD5.ArrayBuffer()
20      // 处理数据块
21      const processChunk = start => {
22        try {
23          const blobSlice = File.prototype.slice || File.prototype.mozSlice || File.prototype.webkitSlice
24          const end = Math.min(start + maxChunkSize, file.size)
25          const chunk = blobSlice.call(file, start, end)
26          fileReader.readAsArrayBuffer(chunk)
27        } catch (e) {
28          console.error(e)
29        }
30      }
31      // 文件读取成功
32      fileReader.onload = () => {
33       spark.append(fileReader.result) // 将当前数据块内容追加到 MD5 计算器
34        currentChunk += 1
35        if (currentChunk >= totalChunks) {
36          const md5Hash = spark.end() // 完成 MD5 计算
37           spark.destroy() // 销毁计算器
38           fileReader.abort() //释放资源
39           console.timeEnd('计算MD5')
40           return resolve(md5Hash)
41        } else if (currentChunk % chunksPerCycle === 0) {
42          // 在处理指定数量的数据块后,设置一个任务延迟以使 UI 线程有空间处理
43          setTimeout(() => {
44            processChunk(currentChunk * maxChunkSize)
45          }, 0)
46        } else {
47          // 继续处理下一个数据块
48          processChunk(currentChunk * maxChunkSize)
49        }
50      }
51      fileReader.onerror = e => {
52        console.error(e)
53        return reject(e)
54      }
55      // 开始处理第一个数据块
56      processChunk(0)
57    } catch (e) {
58      fileReader.abort()
59      reject(e)
60    } finally {
61    }
62  })
63}
64

借助WebWorker优化

md5计算属于高密度计算,如果直接放在js主线程,长时间占用造成卡顿难免对用户体验不好,这里也可以发挥出webworker的优势,本来webworker就是为了线程可以执行任务而不干扰用户界面而生。

这里也有两种方式,

第一种,将File直接传给worker,在worker里面分片读取计算,但是这样的话,因为File对象这种二进制数据直接传递给worker,浏览器会先给FIle做一层拷贝,但是拷贝方式多少会造成性能问题。

当然,二进制数据可以通过Transferable Objects,也就是转移数据的方法,主线程把二进制数据直接转移给子线程,但是一旦转移,主线程就无法再使用这些二进制数据了(防止出现多个线程同时修改数据的局面)。那么我们能不能将File通过这种方式传过去呢?不行,Transferable Objects只支持以下几个类型,就算支持将File转过去,我们还得想一个问题就是,将FIle转过去,主线程就不能用这个File对象了,那你后续怎么做进一步的处理,上传啥的。

以下是可以被 转移 的不同规范的对象:

第二种,反正也是切片,我这里就直接在主线程切片读取,将读取后的chunkBuffer通过Transferable Objects传递到worker线程。

worker代码:

 1import SparkMD5 from 'spark-md5'
 2const spark = new SparkMD5.ArrayBuffer()
 3self.addEventListener('message', ({ data }) => {
 4  const { dataBuffer, status } = data
 5  try {
 6    if (status === 'ing') {
 7      spark.append(dataBuffer)
 8    } else if (status === 'end') {
 9      self.postMessage({ md5: spark.end(), status: 'success' })
10      spark.destroy() // 销毁计算器
11    }
12  } catch (e) {
13    self.postMessage({ status: 'error', error: e })
14    console.error(e)
15  }
16})
17

主线程:

 1/*
 2 * @description 创建文件的MD5值,根据文件大小动态选择分片大小和快速循环的方式
 3 * @param {Object} file el文件对象或文件对象
 4 * @return {String} MD5 值
 5 * */
 6const maxChunkSize = 1024 * 1024 // 每次处理数据块的最大值,1MB
 7const chunksPerCycle = 100 // 每个计算周期中处理的数据块数
 8export const createMD5 = file => {
 9  file = file.raw || file
10  let stamp = Date.now()
11  console.time('md计算耗时' + stamp)
12  let worker = new Worker(
13    /* webpackChunkName: "md5-encode.worker" */ new URL('../worker/md5-encode.worker.js', import.meta.url)
14  )
15
16  return new Promise((resolve, reject) => {
17    // 分片放worker处理
18    const fileReader = new FileReader()
19    try {
20      let currentChunk = 0
21      const totalChunks = Math.ceil(file.size / maxChunkSize)
22      // 处理数据块
23      const processChunk = start => {
24        try {
25          let blobSlice = File.prototype.slice || File.prototype.mozSlice || File.prototype.webkitSlice
26          const end = Math.min(start + maxChunkSize, file.size)
27          const chunk = blobSlice.call(file, start, end)
28          fileReader.readAsArrayBuffer(chunk)
29        } catch (e) {
30          console.error(e)
31        }
32      }
33      // 文件读取成功
34      fileReader.onload = () => {
35        worker.postMessage({ dataBuffer: fileReader.result, status: 'ing' }, [fileReader.result])
36        currentChunk += 1
37        if (currentChunk >= totalChunks) {
38          worker.postMessage({ status: 'end' })
39          fileReader.abort()
40          // console.timeEnd('计算MD5')
41          return
42          // return resolve(md5Hash)
43        } else if (currentChunk % chunksPerCycle === 0) {
44          // 在处理指定数量的数据块后,设置一个任务延迟以使 UI 线程有空间处理
45          setTimeout(() => {
46            processChunk(currentChunk * maxChunkSize)
47          }, 0)
48        } else {
49          // 继续处理下一个数据块
50          processChunk(currentChunk * maxChunkSize)
51        }
52      }
53
54      fileReader.onerror = e => {
55        console.error(e)
56        return reject(e)
57      }
58       worker.onmessage = ({ data }) => {
59        const { md5, status, error } = data
60        if (status === 'success') {
61          resolve(md5)
62        } else {
63          reject(error)
64        }
65        console.timeEnd('md计算耗时' + stamp)
66        worker.terminate()
67      }
68      // 开始处理第一个数据块
69      processChunk(0)
70    } catch (e) {
71      fileReader.abort()
72      reject(e)
73    } finally {
74    }
75  })
76}
77

第二种方式地第一种直接传File对象会快出File拷贝的时间,处理一个300多m的文件,整体会快几百ms到1s。

借助队列函数进一步优化

现在计算md5不会堵塞主线程了,但是如果大量文件同时进行分片读取,同样会造成一定的卡顿,同时,大家也要知道,worker开到一定数量也是会造成浏览器卡顿的,和电脑的资源大小有关,所以可以通过队列控制计算md5方法的并发量,避免一下子占用太多资源。

以下是我写的一个队列函数,结合一下上面的createMD5即可达到目的。

 1
 2/**
 3 * 队列操作
 4 * @param concurrency 同时执行的数量
 5 * @param fn 操作函数 异步函数
 6 * @param fn.dataItem 操作的数据
 7 * @param fn.getRemoveQueue 撤销排队中的某一个任务
 8 * @returns {function(*=, *=): Promise<unknown>}
 9 * 使用方法
10 * let removeFn = null
11 * const getRemoveFn = fn => removeFn = fn
12 * const handleDataByQueue = createQueue(3, async (dataItem) => {
13 *  await sleep(1000)
14 *  console.log(dataItem)
15 *  return dataItem
16 *  })
17 *  handleDataByQueue(1, getRemoveFn)
18 *  // 取消排队
19 *  setTimeout(() => {
20 *  removeFn && removeFn()
21 *  }, 1000)
22 * */
23export const createQueue = (concurrency, fn) => {
24  const queue = []
25  const runningQueue = []
26  // 撤销排队中的某一个任务
27
28  const removeQueue = task => {
29    const index = queue.findIndex(item => item === task)
30    if (index !== -1) {
31      console.log('取消排队')
32      queue.splice(index, 1)
33    }
34  }
35  const process = (dataItem, getRemoveQueueSource) => {
36    return new Promise((resolve, reject) => {
37      const run = async () => {
38        if (runningQueue.length >= concurrency) {
39          queue.push(run)
40          getRemoveQueueSource && getRemoveQueueSource(() => removeQueue(run))
41          return
42        }
43        runningQueue.push(run)
44        try {
45          const result = await fn(dataItem)
46          resolve(result)
47        } catch (e) {
48          reject(e)
49        } finally {
50          runningQueue.splice(runningQueue.indexOf(run), 1)
51          if (queue.length) {
52            queue.shift()()
53          }
54        }
55      }
56      run()
57    })
58  }
59  return process
60}
61

使用:

1// 10个并发量
2const createMD5ByQueue = createQueue(10, createMD5)
3
4// createMD5ByQueue(file1)
5// createMD5ByQueue(file2)
6// createMD5ByQueue(file3)
7// createMD5ByQueue(file4)
8

如有不妥,多多指教!