package org.rain;
import redis.clients.jedis.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
public static void readTxtFile(String filePath, JedisPool pool) {
try {
File file = new File(filePath);
if (file.isFile() && file.exists()) {
InputStreamReader isr = new InputStreamReader(Files.newInputStream(file.toPath()), StandardCharsets.UTF_8);
BufferedReader br = new BufferedReader(isr);
String lineTxt = null;
AtomicInteger i = new AtomicInteger();
ExecutorService executorService = new ThreadPoolExecutor(50, 50, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
while ((lineTxt = br.readLine()) != null) {
String[] textArr = lineTxt.split("\t");
if (textArr.length <= 6) {
continue;
}
executorService.submit(() -> {
try (Jedis jedis = pool.getResource()) {
jedis.setex(textArr[0] + "-" + i.getAndIncrement(), 14400, textArr[6]);
// System.out.println("已处理第" + i);
}
});
executorService.submit(() -> {
try (Jedis jedis = pool.getResource()) {
jedis.setex(textArr[1] + "-" + i.getAndIncrement(), 14400, textArr[6]);
// System.out.println("已处理第" + i);
}
});
}
System.out.println("处理完毕,共处理:" + i);
executorService.shutdown();
br.close();
} else {
System.out.println("文件不存在!");
}
} catch (Exception e) {
System.out.println("文件读取错误!" + e);
}
}
public static void main(String[] args) {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(200);
config.setMaxTotal(300);
config.setTestOnBorrow(false);
config.setTestOnReturn(false);
String redisHost = "r-.redis.rds.aliyuncs.com";
String redisUser = "r-";
String redisPassword = "";
String tairHost = "r-.redis.rds.aliyuncs.com";
String tairUser = "r-";
String tairPassword = "";
JedisPool pool;
if ("redis".equals(args[0])) {
System.out.println("redis");
pool = new JedisPool(config, redisHost, 6379, redisUser, redisPassword);
} else {
System.out.println("tair");
pool = new JedisPool(config, tairHost, 6379, tairUser, tairPassword);
}
String filePath = "local".equals(args[1]) ? "/Users/Downloads/weibo_train_data.txt" : "/home/weibo_train_data.txt";
System.out.println(filePath);
readTxtFile(filePath, pool);
}
}
评论