HDFS文件系统操作JAVA-API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
public class HDFSIO {

/**
* 创建文件夹
* @param path
* @throws IOException
*/

public static void mkdir(String path)throws IOException{

//读取配置文件
Configuration configuration = new Configuration();
//获取文件系统
FileSystem fSystem = FileSystem.get(URI.create("hdfs://192.168.32.128:9000/"), configuration);
Path srcPath = new Path(path);
boolean flag = fSystem.mkdirs(srcPath);
if(flag){
System.out.println("create dir ok!");
}else {
System.out.println("create dir failure");
}
fSystem.close();
}
/**
* 删除文件或者文件目录
* @throws IOException
* **/
public static void rmdir(String filePath) throws IOException {
//读取配置文件
Configuration conf = new Configuration();
//获取文件系统
FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.32.128:9000"),conf);
Path path = new Path(filePath);

//调用deleteOnExit()
boolean flag = fs.deleteOnExit(path);
// fs.delete(path);
if(flag) {
System.out.println("delete ok!");
}else {
System.out.println("delete failure");
}

//关闭文件系统
fs.close();
}
/**
* 创建文件
* @param dsString
* @param contents
* @throws IOException
*/
@SuppressWarnings("unused")
private static void createFile(String dsString ,byte[] contents) throws IOException{

// TODO Auto-generated method stub
Configuration configuration = new Configuration();
FileSystem fSystem = FileSystem.get(URI.create("hdfs://192.168.32.128:9000/"),configuration);
Path dsPath = new Path(dsString);
//打开一个输出流
FSDataOutputStream outputStream = fSystem.create(dsPath);
outputStream.write(contents);

//close
outputStream.close();
fSystem.close();
System.out.println("create success!");
}
/**
* 列出目录下的文件
* @param path
* @throws IOException
*/
public static void listFile(String path) throws IOException{

//读取配置文件
Configuration conf = new Configuration();
//获取文件系统
FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.32.128:9000"),conf);
//获取文件或目录状态
FileStatus[] fileStatus = fs.listStatus(new Path(path));
//打印文件的路径
for (FileStatus file : fileStatus) {
System.out.println(file.getPath());
}

//关闭文件系统
fs.close();
}
/**
* 上传文件
* @param src
* @param dst
* @throws IOException
*/
private static void uploadFile(String src,String dst) throws IOException{

// TODO Auto-generated method stub
Configuration configuration = new Configuration();
FileSystem fsFileSystem = FileSystem.get(URI.create("hdfs:192.168.32.128:9000"),configuration);
Path srcPath = new Path(src);
Path dstPath = new Path(dst);
//调用文件系统的文件复制函数,前面参数是指是否删除原文件,true为删除,默认为false
fsFileSystem.copyFromLocalFile(false, srcPath,dstPath);
//打印文件路径
System.out.println("Upload to "+configuration.get("fs.default.name"));
System.out.println("------------list files------------"+"\n");
FileStatus [] fileStatus = fsFileSystem.listStatus(dstPath);
for(FileStatus file: fileStatus){
System.out.println(file.getPath());
}
fsFileSystem.close();
}
/**
* 重命名
* @param oldName
* @param newName
* @throws IOException
*/
private static void renameFile(String oldName,String newName)throws IOException {
// TODO Auto-generated method stub
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.32.128:9000"),configuration);
Path oldPath = new Path(oldName);
Path newPath = new Path(newName);

boolean flag = fs.rename(oldPath, newPath);
if(flag){
System.out.println("rename ok!");
}else {
System.out.println("rename failure!");
}
fs.close();
}
/**
* 读取文件
* @param uriString
* @throws IOException
*/
private static void readfile(String uriString) throws IOException {
// TODO Auto-generated method stub
Configuration configuration = new Configuration();
FileSystem fSystem = FileSystem.get(URI.create("hdfs://192.168.32.128:9000"), configuration);

InputStream inputStream = null;
try {
inputStream = fSystem.open(new Path(uriString));
//复制到标准输出流
IOUtils.copyBytes(inputStream, System.out,4096,false );
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}finally{
IOUtils.closeStream(inputStream);
}
}
/**
* 添加到文件的末尾(src为本地地址,dst为hdfs文件地址)
* 追加到文件末尾
* @param srcString
* @param dsString
* @throws IOException
*/
private static void appendFile(String srcString,String dsString)throws IOException {
// TODO Auto-generated method stub
Configuration configuration = new Configuration();
FileSystem fsFileSystem = FileSystem.get(URI.create("hdfs://192.168.32.128:9000"),configuration);
Path dstPath = new Path(dsString);
//创建需要写入的文件流
InputStream inputStream = new BufferedInputStream(new FileInputStream(srcString));
//文件输出流写入
FSDataOutputStream outputStream = fsFileSystem.append(dstPath);
IOUtils.copyBytes(inputStream, outputStream,4096,true);
}

FileSystem的get()方法有两个。
FileSystem fs = FileSystem.get(URI.create(“hdfs://localhost:9000”),conf); //默认在hdfs上读取文件
FileSystem fs = FileSystem.get(conf); //默认从本地上读取文件