写点什么

k8s 上运行我们的 springboot 服务之——大文件读写

用户头像
柠檬
关注
发布于: 2020 年 05 月 29 日

说明:

一般的我们很少遇到在代码里面一次性去读取几个G的大文件(4g的txt文件等等),如果遇到了我们还是按照一般的方法去读取要么是oom了,要么是文件读不允许超过2g的限制,要么速度非常慢。



有一个经典的面试题:有一个4G大小的txt文件,文件里面的数据是多个不同且乱序的英文字母,要求:1、读取这个文件2、统计这个文件中字母“a”的个数。3、用jdk提供的方法以更快的方式完成。4、统计我们代码的执行时长



思考:



1、文件太大我们需要来分开读

2、使用多线程来完成,注意并发、数量和计时的统计



解决方法:

1、使用RandomAccessFile 可以读取文件的某段数据

2、计算出分段读取文件的start,end坐标

3、使用多线程来读和统计



实现:

1、首先把大文件分段度的区间计算出来

2、使用parallelStream来读

3、使用自定义线程池来读



测试数据:

1、文件大小3.95G,“A”的总数2125000000

2、使用stream:21376ms

3、普通for循环:21037ms

3:parallelStream:14907ms

4:线程池:14771ms



结论:

1、多线程的确能够加快速度

2、合理的设置线程池大小(也就是在做分段划分时设置的大小),我这里是设置的4(和我电脑配置一样)

3、parallelStream其实用的就是多线程,和map reduce原理类似



代码:



获得文件分段:



/**
* 获得文件分段信息
*
* @param file:
* @param count:
* @throws
* @return: java.util.List<com.zhy.frame.core.vo.SplitFileVo>
* @author: lvmoney /XXXXXX科技有限公司
* @date: 2020/5/29 18:55
*/
public static List<SplitFileVo> getSplit(String file, int count) {

List<SplitFileVo> result = new ArrayList<>();
RandomAccessFile raf = null;
try {
//获取目标文件 预分配文件所占的空间 在磁盘中创建一个指定大小的文件 r 是只读
raf = new RandomAccessFile(new File(file), "r");
//文件的总长度
long length = raf.length();
if (count > length) {
SplitFileVo splitFileVo = new SplitFileVo();
splitFileVo.setBegin(0);
splitFileVo.setEnd(length);
result.add(splitFileVo);
return result;
}
//文件切片后的长度
long maxSize = length / count;

for (int i = 0; i < count; i++) {
SplitFileVo splitFileVo = new SplitFileVo();
//最后一片单独处理
long begin = 0;
if (i > 0) {
begin = result.get(i - 1).getEnd();
}
long end = begin + maxSize;
splitFileVo.setBegin(begin);
splitFileVo.setEnd(end);
result.add(splitFileVo);
}
if (length % count > 0) {
SplitFileVo splitFileVo = new SplitFileVo();
splitFileVo.setBegin(result.get(count - 1).getEnd());
splitFileVo.setEnd(length);
result.add(splitFileVo);
}
} catch (FileNotFoundException e) {
LOGGER.error("没有找到对应的文件:{}", file);
} catch (IOException e) {
LOGGER.error("文件切割失败:{},错误原因是:{}", file, e.getMessage());
} finally {
try {
raf.close();
} catch (IOException e) {
LOGGER.error("文件切割失败:{},错误原因是:{}", file, e.getMessage());
}
}
return result;
}



分段读并统计:

/**
* 读取文件某一段数据
*
* @param file:
* @param begin: 开始
* @param end: 结束
* @throws
* @return: boolean
* @author: lvmoney /XXXXXX科技有限公司
* @date: 2020/5/29 10:08
*/
public static Long readFile(String file, long begin, long end) {
String a = file.split(FileUtil.getFileSuffix(file))[0];
Long result = 0L;
try {
//申明文件切割后的文件磁盘
RandomAccessFile in = new RandomAccessFile(new File(file), "r");
//定义一个可读,可写的文件并且后缀名为的二进制文件
SnowflakeIdFactoryUtil snowflakeIdFactoryUtil = new SnowflakeIdFactoryUtil(1, 2);
String tempFile = a + BaseConstant.CONNECTOR_UNDERLINE + snowflakeIdFactoryUtil.nextId() + FileUtil.getFileSuffix(file);
File delFile = new File(tempFile);
//申明具体每一文件的字节数组

int n = 0;
//从指定位置读取文件字节流
in.seek(begin);

//判断文件流读取的边界
Long seq = end - begin;
if (seq <= FILE_BYTE_LENGTH) {
byte[] w = new byte[Integer.parseInt(String.valueOf(seq))];
n = in.read(w);
result += getCharacterCount(new String(w, 0, n), "A");
} else {
byte[] b = new byte[FILE_BYTE_LENGTH];
int loop = (int) (seq / FILE_BYTE_LENGTH);
for (int i = 0; i < loop; i++) {
n = in.read(b);
String temp = new String(b, 0, n);
result += getCharacterCount(temp, "A");
}
if (seq % FILE_BYTE_LENGTH > 0) {
byte[] w = new byte[Integer.parseInt(String.valueOf(seq % FILE_BYTE_LENGTH))];
n = in.read(w);
String temp = new String(w, 0, n);
result += getCharacterCount(temp, "A");
}

}


//关闭输入流
in.close();
if (delFile.exists()) {
delFile.delete();
}
} catch (
Exception e) {
LOGGER.error("获得文件位置指针报错:{}", e);
}
return result;
}



统计:

/**
* 获得某个字符在字符串中出现的次数
*
* @param res:
* @param s:
* @throws
* @return: java.lang.Long
* @author: lvmoney /XXXXXX科技有限公司
* @date: 2020/5/27 11:28
*/
public static Long getCharacterCount(String res, String s) {
Long count = 0L;
for (char c : res.toCharArray()) {
if (String.valueOf(c).equals(s)) {
count += 1;
}
}
return count;
}



parallelStream测试:

public class Test1 {
public static void main(String[] args) {
long starTime = System.currentTimeMillis();
AtomicLong count = new AtomicLong(0);
String file = "D:\\data\\temp\\test.txt";
List<SplitFileVo> list = SplitFile.getSplit(file, 4);
System.out.println(JsonUtil.t2JsonString(list));
list.parallelStream().forEach(e -> count.getAndAdd(FileUtil.readFile(file, e.getBegin(), e.getEnd())));
System.out.println(count);
long endTime = System.currentTimeMillis();
long Time = endTime - starTime;
System.out.println("执行时间:" + Time);
// FileUtil.readFile(file, 0, 13);
}
}



线程池测试:

public class FileReadThreadPoolExecutor {
private static Long time;

private ThreadPoolExecutor pool = null;

/**
* 线程池初始化方法
* <p>
* corePoolSize 核心线程池大小----1
* maximumPoolSize 最大线程池大小----3
* keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit
* TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES
* workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(5)==== 5容量的阻塞队列
* threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂
* rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,
* 即当提交第9个任务时(前面线程都没有执行完,此测试方法中用sleep(100)),
* 任务会交给RejectedExecutionHandler来处理
*/

public void init() {

pool = new ThreadPoolExecutor(4, 8, 30,
TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(8), new FileReadThreadPoolExecutor.FileThreadFactory(), new FileReadThreadPoolExecutor.CustomRejectedExecutionHandler());
}

public void destory() {
if (pool != null) {
pool.shutdownNow();
}
}

public ExecutorService getCustomThreadPoolExecutor() {
return this.pool;
}


private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//核心改造点,由blockingqueue的offer改成put阻塞方法
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private class FileThreadFactory implements ThreadFactory {

private AtomicInteger count = new AtomicInteger(-1);

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = CustomUnblockThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
t.setName(threadName);
return t;
}
}

public static void main(String[] args) {
FileReadThreadPoolExecutor exec = new FileReadThreadPoolExecutor();

//1. 初始化
exec.init();
ExecutorService pool = exec.getCustomThreadPoolExecutor();
long start = System.currentTimeMillis();
AtomicLong count = new AtomicLong(0);
String file = "D:\\data\\temp\\test.txt";
List<SplitFileVo> list = SplitFile.getSplit(file, 4);
for (int i = 0; i < list.size(); i++) {
SplitFileVo vo = list.get(i);
Runnable r = () -> {
count.getAndAdd(FileUtil.readFile(file, vo.getBegin(), vo.getEnd()));
};
pool.execute(r);
}

//2. 销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行
pool.shutdown();
while (!pool.isTerminated()) {
}
long end = System.currentTimeMillis(); //获取结束时间
System.out.println(count);
System.out.println(end - start);


}

}




其他详见:https://gitee.com/lvmoney/zhy-frame-parent

发布于: 2020 年 05 月 29 日阅读数: 58
用户头像

柠檬

关注

人生尚未成功,朋友仍需努力 2020.05.21 加入

长期从事微服务,中台等后台开发和架构设计。一些见解和实现可查看https://gitee.com/lvmoney/zhy-frame-parent

评论

发布
暂无评论
k8s 上运行我们的 springboot 服务之——大文件读写