首页 技术 正文
技术 2022年11月15日
0 收藏 781 点赞 3,022 浏览 6404 个字

本项目主要实现Windows下利用代码实现Hadoop中文件上传至HDFS

实现上传文本文件中单词个数的计数

1、项目结构

2、相关代码

  • CopyFromLocalFile
  1 package com.hadoop.worldcount;
2
3 import java.io.FileInputStream;
4
5 import java.io.IOException;
6
7 import org.apache.hadoop.conf.Configuration;
8
9 import org.apache.hadoop.fs.FSDataOutputStream;
10
11 import org.apache.hadoop.fs.FileSystem;
12
13 import org.apache.hadoop.fs.Path;
14
15 public class CopyFromLocalFile {
16
17 /**
18
19 * 判断路径是否存在
20
21 */
22
23 public static boolean test(Configuration conf, String path) {
24
25 try (FileSystem fs = FileSystem.get(conf)) {
26
27 return fs.exists(new Path(path));
28
29 } catch (IOException e) {
30
31
32
33 e.printStackTrace();
34
35 return false;
36
37 }
38
39
40
41 }
42
43 /**
44
45
46
47 * 复制文件到指定路径 若路径已存在,则进行覆盖
48
49
50
51 */
52
53
54
55 public static void copyFromLocalFile(Configuration conf,
56
57 String localFilePath, String remoteFilePath) {
58
59 Path localPath = new Path(localFilePath);
60
61 Path remotePath = new Path(remoteFilePath);
62
63 try (FileSystem fs = FileSystem.get(conf)) {
64
65 /* fs.copyFromLocalFile 第一个参数表示是否删除源文件,第二个参数表示是否覆盖 */
66
67 fs.copyFromLocalFile(false, true, localPath, remotePath);
68
69 } catch (IOException e) {
70
71 e.printStackTrace();
72
73 }
74
75 }
76
77
78
79 /**
80
81
82
83 * 追加文件内容
84
85
86
87 */
88
89
90
91 public static void appendToFile(Configuration conf, String localFilePath,
92
93 String remoteFilePath) {
94
95 Path remotePath = new Path(remoteFilePath);
96
97 try (FileSystem fs = FileSystem.get(conf);
98
99 FileInputStream in = new FileInputStream(localFilePath);) {
100
101 FSDataOutputStream out = fs.append(remotePath);
102
103 byte[] data = new byte[1024];
104
105 int read = -1;
106
107 while ((read = in.read(data)) > 0) {
108
109 out.write(data, 0, read);
110
111 }
112
113 out.close();
114
115 } catch (IOException e) {
116
117 e.printStackTrace();
118
119 }
120
121 }
122
123
124
125 /**
126
127
128
129 * 主函数
130
131
132
133 */
134
135
136
137 public static void main(String[] args) {
138
139 Configuration conf = new Configuration();
140
141 conf.set("fs.defaultFS", "hdfs://localhost:9000");
142
143 String localFilePath = "/usr/hadoop/test/test.txt"; // 本地路径
144
145 String remoteFilePath = "/user/hadoop/test/test.txt"; // HDFS路径
146
147 String choice = "append"; // 若文件存在则追加到文件末尾
148
149 //String choice = "overwrite"; // 若文件存在则覆盖
150
151 try {
152
153 /* 判断文件是否存在 */
154
155 boolean fileExists = false;
156
157 if (CopyFromLocalFile.test(conf, remoteFilePath)) {
158
159 fileExists = true;
160
161 System.out.println(remoteFilePath + " 已存在.");
162
163 } else {
164
165 System.out.println(remoteFilePath + " 不存在.");
166
167 }
168
169 /* 进行处理 */
170
171 if (!fileExists) { // 文件不存在,则上传
172
173 CopyFromLocalFile.copyFromLocalFile(conf, localFilePath,
174
175 remoteFilePath);
176
177 System.out.println(localFilePath + " 已上传至 " + remoteFilePath);
178
179 } else if (choice.equals("overwrite")) { // 选择覆盖
180
181 CopyFromLocalFile.copyFromLocalFile(conf, localFilePath,
182
183 remoteFilePath);
184
185 System.out.println(localFilePath + " 已覆盖 " + remoteFilePath);
186
187 } else if (choice.equals("append")) { // 选择追加
188
189 CopyFromLocalFile.appendToFile(conf, localFilePath,
190
191 remoteFilePath);
192
193 System.out.println(localFilePath + " 已追加至 " + remoteFilePath);
194
195 }
196
197 } catch (Exception e) {
198
199
200
201 e.printStackTrace();
202
203 }
204
205 }
206
207 }
  • File
package com.hadoop.worldcount;import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;public class File {/**
* @param args
* @throws IOException
*/public static void main(String[] args) throws Exception {
String localSrc = "E:\\Hadoop\\work\\bashrc.txt";//本地文件
String dst = "hdfs://localhost:9000/user/hadoop/test/bashrc.txt";//复制到hdfs目录下
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
OutputStream out = fs.create(new Path(dst), new Progressable() {//进度条信息
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);//复制
}
}
  • MyWordCount
 1 package com.hadoop.worldcount;
2
3 import java.io.IOException;
4 import java.util.StringTokenizer;
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.fs.Path;
7 import org.apache.hadoop.io.IntWritable;
8 import org.apache.hadoop.io.LongWritable;
9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.hadoop.mapreduce.Reducer;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15
16 public class MyWordCount {
17
18 public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
19 /**
20 * Mapper中的map方法:
21 * void map(K1 key, V1 value, Context context)
22 * 映射一个单个的输入k/v对到一个中间的k/v对
23 * 输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。
24 * Context:收集Mapper输出的<k,v>对。
25 * Context的write(k, v)方法:增加一个(k,v)对到context
26 * 程序员主要编写Map和Reduce函数.这个Map函数使用StringTokenizer函数对字符串进行分隔,通过write方法把单词存入word中
27 * write方法存入(单词,1)这样的二元组到context中
28 */
29 @Override
30 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
31 throws IOException, InterruptedException {
32 StringTokenizer itr = new StringTokenizer(value.toString());
33 while (itr.hasMoreTokens()) {
34 context.write(new Text(itr.nextToken()), new IntWritable(1));
35 }
36 }
37 }
38
39 public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
40 /**
41 * Reducer类中的reduce方法:
42 * void reduce(Text key, Iterable<IntWritable> values, Context context)
43 * 中k/v来自于map函数中的context,可能经过了进一步处理(combiner),同样通过context输出
44 */
45 @Override
46 protected void reduce(Text key, Iterable<IntWritable> values,
47 Context context) throws IOException, InterruptedException {
48 int sum = 0;
49 for (IntWritable val : values) {
50 sum += val.get();
51 }
52 context.write(key, new IntWritable(sum));
53 }
54 }
55
56 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
57 /**
58 * Configuration:map/reduce的j配置类,向hadoop框架描述map-reduce执行的工作
59 */
60 Configuration conf = new Configuration();
61 Job job = Job.getInstance(conf, "myWordCount"); //设置一个用户定义的job名称
62 job.setJarByClass(MyWordCount.class);
63 job.setMapperClass(TokenizerMapper.class); //为job设置Mapper类
64 job.setCombinerClass(IntSumReducer.class); //为job设置Combiner类
65 job.setReducerClass(IntSumReducer.class); //为job设置Reducer类
66 job.setOutputKeyClass(Text.class); //为job的输出数据设置Key类
67 job.setOutputValueClass(IntWritable.class); //为job输出设置value类
68
69 FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/root/input/bashrc.txt"));
70 FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/root/output"));
71
72 System.exit(job.waitForCompletion(true) ?0 : 1); //运行job
73 }
74
75 }
相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:8,910
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,435
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,250
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,061
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,693
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,731