k8s 上运行我们的 springboot 服务之——大文件读写
说明:
一般的我们很少遇到在代码里面一次性去读取几个G的大文件(4g的txt文件等等),如果遇到了我们还是按照一般的方法去读取要么是oom了,要么是文件读不允许超过2g的限制,要么速度非常慢。
有一个经典的面试题:有一个4G大小的txt文件,文件里面的数据是多个不同且乱序的英文字母,要求:1、读取这个文件2、统计这个文件中字母“a”的个数。3、用jdk提供的方法以更快的方式完成。4、统计我们代码的执行时长
思考:
1、文件太大我们需要来分开读
2、使用多线程来完成,注意并发、数量和计时的统计
解决方法:
1、使用RandomAccessFile 可以读取文件的某段数据
2、计算出分段读取文件的start,end坐标
3、使用多线程来读和统计
实现:
1、首先把大文件分段度的区间计算出来
2、使用parallelStream来读
3、使用自定义线程池来读
测试数据:
1、文件大小3.95G,“A”的总数2125000000
2、使用stream:21376ms
3、普通for循环:21037ms
3:parallelStream:14907ms
4:线程池:14771ms
结论:
1、多线程的确能够加快速度
2、合理的设置线程池大小(也就是在做分段划分时设置的大小),我这里是设置的4(和我电脑配置一样)
3、parallelStream其实用的就是多线程,和map reduce原理类似
代码:
获得文件分段:
/** * 获得文件分段信息 * * @param file: * @param count: * @throws * @return: java.util.List<com.zhy.frame.core.vo.SplitFileVo> * @author: lvmoney /XXXXXX科技有限公司 * @date: 2020/5/29 18:55 */ public static List<SplitFileVo> getSplit(String file, int count) { List<SplitFileVo> result = new ArrayList<>(); RandomAccessFile raf = null; try { //获取目标文件 预分配文件所占的空间 在磁盘中创建一个指定大小的文件 r 是只读 raf = new RandomAccessFile(new File(file), "r"); //文件的总长度 long length = raf.length(); if (count > length) { SplitFileVo splitFileVo = new SplitFileVo(); splitFileVo.setBegin(0); splitFileVo.setEnd(length); result.add(splitFileVo); return result; } //文件切片后的长度 long maxSize = length / count; for (int i = 0; i < count; i++) { SplitFileVo splitFileVo = new SplitFileVo(); //最后一片单独处理 long begin = 0; if (i > 0) { begin = result.get(i - 1).getEnd(); } long end = begin + maxSize; splitFileVo.setBegin(begin); splitFileVo.setEnd(end); result.add(splitFileVo); } if (length % count > 0) { SplitFileVo splitFileVo = new SplitFileVo(); splitFileVo.setBegin(result.get(count - 1).getEnd()); splitFileVo.setEnd(length); result.add(splitFileVo); } } catch (FileNotFoundException e) { LOGGER.error("没有找到对应的文件:{}", file); } catch (IOException e) { LOGGER.error("文件切割失败:{},错误原因是:{}", file, e.getMessage()); } finally { try { raf.close(); } catch (IOException e) { LOGGER.error("文件切割失败:{},错误原因是:{}", file, e.getMessage()); } } return result; }
分段读并统计:
/** * 读取文件某一段数据 * * @param file: * @param begin: 开始 * @param end: 结束 * @throws * @return: boolean * @author: lvmoney /XXXXXX科技有限公司 * @date: 2020/5/29 10:08 */ public static Long readFile(String file, long begin, long end) { String a = file.split(FileUtil.getFileSuffix(file))[0]; Long result = 0L; try { //申明文件切割后的文件磁盘 RandomAccessFile in = new RandomAccessFile(new File(file), "r"); //定义一个可读,可写的文件并且后缀名为的二进制文件 SnowflakeIdFactoryUtil snowflakeIdFactoryUtil = new SnowflakeIdFactoryUtil(1, 2); String tempFile = a + BaseConstant.CONNECTOR_UNDERLINE + snowflakeIdFactoryUtil.nextId() + FileUtil.getFileSuffix(file); File delFile = new File(tempFile); //申明具体每一文件的字节数组 int n = 0; //从指定位置读取文件字节流 in.seek(begin); //判断文件流读取的边界 Long seq = end - begin; if (seq <= FILE_BYTE_LENGTH) { byte[] w = new byte[Integer.parseInt(String.valueOf(seq))]; n = in.read(w); result += getCharacterCount(new String(w, 0, n), "A"); } else { byte[] b = new byte[FILE_BYTE_LENGTH]; int loop = (int) (seq / FILE_BYTE_LENGTH); for (int i = 0; i < loop; i++) { n = in.read(b); String temp = new String(b, 0, n); result += getCharacterCount(temp, "A"); } if (seq % FILE_BYTE_LENGTH > 0) { byte[] w = new byte[Integer.parseInt(String.valueOf(seq % FILE_BYTE_LENGTH))]; n = in.read(w); String temp = new String(w, 0, n); result += getCharacterCount(temp, "A"); } } //关闭输入流 in.close(); if (delFile.exists()) { delFile.delete(); } } catch ( Exception e) { LOGGER.error("获得文件位置指针报错:{}", e); } return result; }
统计:
/** * 获得某个字符在字符串中出现的次数 * * @param res: * @param s: * @throws * @return: java.lang.Long * @author: lvmoney /XXXXXX科技有限公司 * @date: 2020/5/27 11:28 */ public static Long getCharacterCount(String res, String s) { Long count = 0L; for (char c : res.toCharArray()) { if (String.valueOf(c).equals(s)) { count += 1; } } return count; }
parallelStream测试:
public class Test1 { public static void main(String[] args) { long starTime = System.currentTimeMillis(); AtomicLong count = new AtomicLong(0); String file = "D:\\data\\temp\\test.txt"; List<SplitFileVo> list = SplitFile.getSplit(file, 4); System.out.println(JsonUtil.t2JsonString(list)); list.parallelStream().forEach(e -> count.getAndAdd(FileUtil.readFile(file, e.getBegin(), e.getEnd()))); System.out.println(count); long endTime = System.currentTimeMillis(); long Time = endTime - starTime; System.out.println("执行时间:" + Time);// FileUtil.readFile(file, 0, 13); }}
线程池测试:
public class FileReadThreadPoolExecutor { private static Long time; private ThreadPoolExecutor pool = null; /** * 线程池初始化方法 * <p> * corePoolSize 核心线程池大小----1 * maximumPoolSize 最大线程池大小----3 * keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit * TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES * workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(5)==== 5容量的阻塞队列 * threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂 * rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时, * 即当提交第9个任务时(前面线程都没有执行完,此测试方法中用sleep(100)), * 任务会交给RejectedExecutionHandler来处理 */ public void init() { pool = new ThreadPoolExecutor(4, 8, 30, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(8), new FileReadThreadPoolExecutor.FileThreadFactory(), new FileReadThreadPoolExecutor.CustomRejectedExecutionHandler()); } public void destory() { if (pool != null) { pool.shutdownNow(); } } public ExecutorService getCustomThreadPoolExecutor() { return this.pool; } private class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //核心改造点,由blockingqueue的offer改成put阻塞方法 try { executor.getQueue().put(r); } catch (InterruptedException e) { e.printStackTrace(); } } } private class FileThreadFactory implements ThreadFactory { private AtomicInteger count = new AtomicInteger(-1); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); String threadName = CustomUnblockThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1); t.setName(threadName); return t; } } public static void main(String[] args) { FileReadThreadPoolExecutor exec = new FileReadThreadPoolExecutor(); //1. 初始化 exec.init(); ExecutorService pool = exec.getCustomThreadPoolExecutor(); long start = System.currentTimeMillis(); AtomicLong count = new AtomicLong(0); String file = "D:\\data\\temp\\test.txt"; List<SplitFileVo> list = SplitFile.getSplit(file, 4); for (int i = 0; i < list.size(); i++) { SplitFileVo vo = list.get(i); Runnable r = () -> { count.getAndAdd(FileUtil.readFile(file, vo.getBegin(), vo.getEnd())); }; pool.execute(r); } //2. 销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行 pool.shutdown(); while (!pool.isTerminated()) { } long end = System.currentTimeMillis(); //获取结束时间 System.out.println(count); System.out.println(end - start); }}
版权声明: 本文为 InfoQ 作者【柠檬】的原创文章。
原文链接:【http://xie.infoq.cn/article/2d9d7f7327a3ba0436f7a1d94】。
本文遵守【CC BY-NC】协议,转载请保留原文出处及本版权声明。
柠檬
人生尚未成功,朋友仍需努力 2020.05.21 加入
长期从事微服务,中台等后台开发和架构设计。一些见解和实现可查看https://gitee.com/lvmoney/zhy-frame-parent
评论