Java-API对HDFS的操作(IDEA版)

芒果1年前技术文章843

前期工作
首先就是安装maven
在win系统下不配置hadoop环境,直接运行代码会报错,显示缺少winutils.exe 和 hadoop.dll 两个文件
首先添加pom.xml文件
  <dependencies>
        <!-- Hadoop所需依赖包 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!-- junit测试依赖 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
</project>

创建一个HDFSJavaAPI的类
创建目录
package com.hdfsdemo;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
* HDFS Java API文件操作
*/
public class HDFSJavaAPI {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        createDir();
        createFile();
        outFile();
        deleteFile();
        copyFromLocalFile();
        copyToLocalFile();
        updateFlileProgress();
        deleteFile();
    }
    /**
     * 定义创建目录方法
     */
    public static void createDir() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://172.16.121.194:8020");
        FileSystem hdfs = FileSystem.get(conf);
        // 创建目录
        boolean isok = hdfs.mkdirs(new Path("hdfs:/mydir"));
        if (isok) {
            System.out.println("创建目录成功!");
        } else {
            System.out.println("创建目录失败!");
        }
        hdfs.close();
    }
    /**
     * 定义创建文件方法
     */
    public static void createFile() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://172.16.121.194:8020");
        FileSystem fs = FileSystem.get(conf);
        // 打开一个输出流
        FSDataOutputStream outputStream = fs.create(new Path(
                "hdfs:/newfile2.txt"));
        // 写入文件内容
        outputStream.write("我是文件内容1\n我是文件内容2\n我是文件内容3".getBytes());
        outputStream.close();
        fs.close();
        System.out.println("文件创建成功!");
    }
    // 删除文件
    public static void deleteFile() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://172.16.121.194:8020");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("hdfs:/newfile2.txt");
        boolean isok = fs.deleteOnExit(path);
        if (isok) {
            System.out.println("删除成功!");
        } else {
            System.out.println("删除失败!");
        }
        fs.close();
    }
    // 复制上传本地文件
    public static void copyFromLocalFile() throws Exception {
        // 1.创建配置器
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://172.16.121.194:8020");
        // 2.取得FileSystem文件系统 实例
        FileSystem fs = FileSystem.get(conf);
        // 3.创建可供hadoop使用的文件系统路径
        Path src = new Path("D:/copy_test.txt"); // 本地目录/文件
        Path dst = new Path("hdfs:/"); // 目标目录/文件
        // 4.拷贝上传本地文件(本地文件,目标路径) 至HDFS文件系统中
        fs.copyFromLocalFile(src, dst);
        System.out.println("文件上传成功!");
    }
    // 监控文件上传进度
    public static void updateFlileProgress() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://172.16.121.194:8020");
        InputStream in = new BufferedInputStream(
                new FileInputStream("D:/soft/test.zip"));
        FileSystem fs = FileSystem.get(conf);
        //上传文件并监控上传进度
        FSDataOutputStream outputStream = fs.create(new Path("hdfs:/test.zip"),
                new Progressable() {
                    public void progress() {//回调方法显示进度
                        System.out.print(".");
                    }
                });
        IOUtils.copyBytes(in, outputStream, 4096, false);
    }
    // 复制下载文件
    public static void copyToLocalFile() throws Exception {
        // 1.创建配置器
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://172.16.121.194:8020");
        // 2.取得FileSystem文件系统 实例
        FileSystem fs = FileSystem.get(conf);
        // 3.创建可供hadoop使用的文件系统路径
        Path src = new Path("hdfs:/newfile2.txt");// 目标目录/文件
        Path dst = new Path("D:/new.txt"); // 本地目录/文件
        // 4.从HDFS文件系统中拷贝下载文件(目标路径,本地文件)至本地
        // fs.copyToLocalFile(src, dst);
        fs.copyToLocalFile(false, src, dst, true);
        System.out.println("文件下载成功!");
    }
    // 查看文件内容并输出
    public static void outFile() throws Exception {
        // 1.创建配置器
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://172.16.121.194:8020");
        // 2.取得FileSystem文件系统 实例
        FileSystem fs = FileSystem.get(conf);
        InputStream in = fs.open(new Path("hdfs:/newfile2.txt"));
        IOUtils.copyBytes(in, System.out, 4096, false);
        IOUtils.closeStream(in);
    }
}
上面代码中的参数"hdfs://hadoop1:8020"是hadoop配置文件中core-site.xml的配置信息:fs.defaultFS,
例如要创建一个.txt文件
只需要调用createFile();

AFBCD3C0-EE7F-43AE-AE12-980DA610866C.png

220A78C9-90AA-4505-BA02-FF96EAD13429.png


例如调用上传文件只需要调用copyFromLocalFile();


C970A709-3AD7-4645-B71F-C16972455FE7.png

CC954257-FD6B-40AD-B302-601794FD2F45.png

标签: 大数据运维

相关文章

kafka模拟消费报错 ISR缺失 指定offset提取数据失败场景

kafka模拟消费报错 ISR缺失 指定offset提取数据失败场景

测试集群信息kafka版本:3.0.0172.16.120.236  kafka-id: 0172.16.121.150  kafka-id: 1172.16.121.225  kafka-id: 2...

CDH实操--集成 freeipa

CDH实操--集成 freeipa

1 概述环境准备: 1)安装cdh6.2.1 2)安装FreeIPA,server和client(在所有cdh节点)2 集成2.1 krb5.conf修改注释:default_ccache_nam...

trino组件对接alluxio(三)

trino组件对接alluxio(三)

本文是基于已经部署了trino和alluxio的基础上,进行的trino与alluxio的组件对接,alluxio已经开启了高可用模式。安装部署1、增加alluxio配置在core-site.xml和...

Debezium部署以及同步之DB2数据到Kafka的同步

Debezium部署以及同步之DB2数据到Kafka的同步

因为Debezium依赖于kafka之上,所以我们先部署kafka和zookeeper(忽略)。1 环境介绍Debezium1.9版本 Db2 11.5版本  附官网:http...

远程DEBUG HADOOP源码方法

远程DEBUG HADOOP源码方法

1. 安装IDEA2. 下载hadoop源码,必须与集群服务代码版本一致,否则会导致有的类无法找到3. 将源码导入IDEA工程并完成build4. 点击 菜单栏--运行--编辑配置 进行相关debug...

Hive优化之SQL的优化(三)

Hive优化之SQL的优化(三)

     Hive是大数据领域常用的组件之一,主要是大数据离线数仓的运算,关于Hive的性能调优在日常工作和面试中是经常涉及的一个点,因此掌握一些Hi...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。