点一下关注吧!!!非常感谢!!持续更新!!!
🚀 AI 篇持续更新中!(长期更新)
目前 2025 年 06 月 05 日更新到:AI 炼丹日志-28 - Audiblez 将你的电子书 epub 转换为音频 mp3 做有声书,持续打造实用 AI 工具指南!📐🤖
💻 Java 篇正式开启!(300 篇)
目前 2025 年 06 月 11 日更新到:Java-42 深入浅出 Nginx - 缘起与发展 场景与配置快速上手 MyBatis 已完结,Spring 已完结,深入浅出助你打牢基础!
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!目前 2025 年 06 月 05 日更新到:大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解
👉 点个关注,不迷路!后续还将持续更新更多大模型+数据智能+工程实战内容,敬请期待!
章节内容
上一节完成:
HDFS 的集群启动
HDFS 的命令行操作
HDFS 上传下载移动重命名等操作
背景介绍
这里是三台公网云服务器,每台 2C4G,搭建一个 Hadoop 的学习环境,供我学习。之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的 3 台机器,赶紧尝试在公网上搭建体验一下。
2C4G 编号 h121
2C4G 编号 h122
2C2G 编号 h123
基本介绍
HDFS(Hadoop Distributed File System)是 Hadoop 的核心组件之一,提供了一个高容错、高吞吐量的分布式文件系统。HDFS Java Client 是访问和操作 HDFS 的主要方式,适合在 Java 程序中进行文件的上传、下载、删除、读取等操作。
我们需要的包大概有下面的:
org.apache.hadoop.fs.FileSystem:这是所有文件系统的抽象基类,HDFS 就是其一个实现。通过该类可以获取文件系统实例。
org.apache.hadoop.fs.Path:HDFS 中路径的封装类,不是 java.io.File。
org.apache.hadoop.conf.Configuration:Hadoop 的配置类,用于加载 core-site.xml、hdfs-site.xml 等配置文件。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
复制代码
FileSystem
这是操作 HDFS 的核心类,抽象了各种文件系统的通用行为(包括本地、HDFS、S3 等)。
FileSystem fs = FileSystem.get(URI uri, Configuration conf, String user);
复制代码
URI:HDFS 的地址,如 hdfs://namenode:9000
conf:配置类,可以从 XML 或代码中设置
user:以哪个用户身份连接(需有对应 HDFS 权限)
mkdirs(Path path):创建目录(会递归创建父目录)
delete(Path path, boolean recursive):删除文件或目录。目录需设置 recursive=true
rename(Path src, Path dst):重命名或移动文件
exists(Path path):判断路径是否存在
isDirectory(Path path):判断是否是目录
listStatus(Path path):列出路径下的所有文件和目录
open(Path path):读取文件,返回 FSDataInputStream
create(Path path):创建文件(若已存在会报错)
append(Path path):追加内容(前提是开启 append 支持)
copyFromLocalFile(Path localSrc, Path dst):上传本地文件
copyToLocalFile(Path src, Path localDst):下载到本地文件
基本方法
上传文件:copyFromLocalFile(Path src, Path dst)
下载文件:copyToLocalFile(Path src, Path dst)
创建目录:mkdirs(Path path)
删除文件或目录:delete(Path path, boolean recursive)
判断文件存在:exists(Path path)
获取文件信息:getFileStatus(Path path)
读取文件内容:FSDataInputStream in = fs.open(path)
写入文件内容:FSDataOutputStream out = fs.create(path)
注意事项
权限问题:需设置 hdfs_user 为有权限的用户。
URI 格式:必须是 hdfs://host:port,而非 HTTP。
运行环境:需有 HADOOP_HOME,或者引入依赖完整的 Hadoop 类路径。
新建工程
这里使用 IDEA 新建一个 Maven 工程即可!
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>hadoop-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Hadoop Dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>
</project>
复制代码
创建文件
package icu.wzk.demo01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
public class Main {
public static void main(String[] args) throws IOException {
// 创建文件
mkdirs();
}
public static void mkdirs() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://h121.wzk.icu:9000");
FileSystem fileSystem = FileSystem.get(configuration);
fileSystem.mkdirs(new Path("/wzk-test"));
fileSystem.close();
}
}
复制代码
我们对应的查看 HDFS 集群上的目录,是否一致
上传文件
public static void upload() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://h121.wzk.icu:9000");
FileSystem fileSystem = FileSystem.get(configuration);
Path filePath = new Path("wzk01.txt");
Path toFilePath = new Path("/wzk-test/wzk01.txt");
fileSystem.copyFromLocalFile(filePath, toFilePath);
fileSystem.close();
}
复制代码
在 HDFS 中查看对应的文件
下载文件
public static void download() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://h121.wzk.icu:9000");
FileSystem fileSystem = FileSystem.get(configuration);
Path filePath = new Path("/wzk-test/wzk01.txt");
Path toFilePath = new Path("wzk01-01.txt");
fileSystem.copyToLocalFile(filePath, toFilePath);
fileSystem.close();
}
复制代码
删除文件
public static void delete() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://h121.wzk.icu:9000");
FileSystem fileSystem = FileSystem.get(configuration);
fileSystem.delete(new Path("/wzk-test"), true);
fileSystem.close();
}
复制代码
展示列表
public static void listList() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://h121.wzk.icu:9000");
FileSystem fileSystem = FileSystem.get(configuration);
RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus status = listFiles.next();
System.out.println("文件名字: " + status.getPath().getName());
System.out.println("文件长度: " + status.getLen());
System.out.println("文件块大小: " + status.getBlockSize());
System.out.println("权限: " + status.getPermission());
System.out.println("分组: " + status.getGroup());
BlockLocation[] blockLocations = status.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("==========================");
}
fileSystem.close();
}
复制代码
扫描路径
public static void listStatus() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://h121.wzk.icu:9000");
FileSystem fileSystem = FileSystem.get(configuration);
FileStatus[] listStatus = fileSystem.listStatus(new Path("/"));
for (FileStatus fileStatus : listStatus) {
if (fileStatus.isFile()) {
System.out.println("文件: " + fileStatus.getPath().getName());
} else {
System.out.println("文件夹: " + fileStatus.getPath().getName());
}
}
fileSystem.close();
}
复制代码
PUT 操作
public static void putFile() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://h121.wzk.icu:9000");
FileSystem fileSystem = FileSystem.get(configuration);
try (FileInputStream fis= new FileInputStream("wzk02.txt")) {
FSDataOutputStream fos = fileSystem.create(new Path("/wzk02_io.txt"));
IOUtils.copyBytes(fis, fos, configuration);
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fileSystem.close();
}
}
复制代码
GET 操作
public static void getFile() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://h121.wzk.icu:9000");
FileSystem fileSystem = FileSystem.get(configuration);
try (FileOutputStream fos = new FileOutputStream("wzk02_io_get.txt")) {
FSDataInputStream fis = fileSystem.open(new Path("/wzk02_io.txt"));
IOUtils.copyBytes(fis, fos, configuration);
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fileSystem.close();
}
}
复制代码
Seek 操作
public static void seek() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://h121.wzk.icu:9000");
FileSystem fileSystem = FileSystem.get(configuration);
FSDataInputStream in = null;
try {
in = fileSystem.open(new Path("/wzk02_io.txt"));
IOUtils.copyBytes(in, System.out, 4096, false);
in.seek(0);
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
复制代码
进度显示
public static void uploadProgress() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://h121.wzk.icu:9000");
FileSystem fileSystem = FileSystem.get(configuration);
try (FileInputStream fis = new FileInputStream("music.mp3")) {
FSDataOutputStream fos = fileSystem.create(new Path("/wzk/music.mp3"),
() -> System.out.print("="));
IOUtils.copyBytes(fis, fos, configuration);
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fileSystem.close();
System.out.println("done!");
}
}
复制代码
评论