使用Junit封装HFDS
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.IOUtils;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.net.URI;/** * 使用Java API操作HDFS文件系统 */public class HDFSApp { public static final String HDFS_PATH = "hdfs://localhost:9000"; Configuration conf = null; FileSystem fs = null; @Before public void open() throws Exception{ System.out.println("连接HDFS..."); conf = new Configuration(); // 副本系数为1,配置文件只对shell生效 conf.set("dfs.replication","1"); /** * 构造一个访问指定HDFS文件系统的客户端对象 * 第一个参数: HDFS的URI * 第二个参数: 客户端指定的配置参赛 * 第三个参数: 客户端的身份,说白了就是用户名 */ fs = FileSystem.get(new URI(HDFS_PATH),conf,"hadoop"); } @After public void close() throws Exception{ conf = null; fs = null; System.out.println("注销连接..."); }}
创建HDFS文件夹
/** * 创建HDFS文件夹 */@Testpublic void mkdir() throws Exception{ fs.mkdirs(new Path("input"));}
文件上传
/** * 文件上传 */@Testpublic void copyFromLocalFile() throws Exception{ fs.copyFromLocalFile(new Path("/home/hadoop/word.txt"), new Path("input/word.txt"));}
文件下载
/** * 文件下载 */@Testpublic void copyToLocalFile() throws Exception{ fs.copyToLocalFile(new Path("input/word.txt"), new Path("/home/hadoop/word2.txt"));}
查看HDFS文件内容
/** * 查看HDFS文件内容 */@Testpublic void catFileText() throws Exception{ FSDataInputStream in = fs.open(new Path("input/word.txt")); IOUtils.copyBytes(in,System.out,1024);}
列出指定文件夹下的所有内容
/** * 列出指定文件夹下的所有内容 */@Testpublic void listFile() throws Exception{ FileStatus[] listStatus = fs.listStatus(new Path("input")); for (FileStatus file : listStatus) { String isDir = file.isDirectory()?"文件夹":"文件"; String permission = file.getPermission().toString(); short replication = file.getReplication(); long len = file.getLen(); String path = file.getPath().toString(); // 输出信息 System.out.println(isDir+"\t"+permission+"\t"+ replication+"\t"+len+"\t"+path); }}
递归列出指定文件夹下的所有文件(夹)信息
/** * 递归列出指定文件夹下的所有文件(夹)信息 */@Testpublic void listAllFiles() throws Exception{ RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("input"),true); while (files.hasNext()){ LocatedFileStatus file = files.next(); // 获取信息 String isDir = file.isDirectory()?"文件夹":"文件"; String permission = file.getPermission().toString(); short replication = file.getReplication(); long len = file.getLen(); String path = file.getPath().toString(); // 输出信息 System.out.println(isDir+"\t"+permission+"\t"+ replication+"\t"+len+"\t"+path); }}
创建HDFS文件,并写入内容
/** * 创建HDFS文件,并写入内容 */@Testpublic void create() throws Exception{ FSDataOutputStream out = fs.create(new Path("input/a.txt")); out.writeUTF("Hello,HDFS!"); out.flush(); out.close();}
刪除文件/文件夾
/** * 刪除文件/文件夾 */@Testpublic void deleteFile() throws Exception{ // true递归删除文件夹,false不删除文件夹,文件则无所谓 fs.delete(new Path("input"),true);}
HDFS重命名
/** * HDFS重命名 */@Testpublic void rename() throws Exception{ fs.rename(new Path("input/word.txt"),new Path("input/input.txt"));}
列出文件块信息
/** * 列出文件块信息 */@Testpublic void getFileBlockLocations() throws Exception{ FileStatus fileStatus = fs.getFileStatus(new Path("input/word.txt")); BlockLocation[] blocks = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); for (BlockLocation block : blocks) { // 获取文件块名字(多个,被切分) for (String name:block.getNames()) { System.out.println(name+":"+block.getOffset()+":"+block.getLength()); } }}