Operating Files on HDFS via the Java API (2.4.1) 2015-02-12 21:00

Prerequisites

First, set up the client environment as described in "Hadoop Client Software Installation (2.4.1)".

Installation

  • Configure environment variables

Add the following jars, plus the current directory, to CLASSPATH:
hadoop-2.4.1/share/hadoop/common/hadoop-common-2.4.1.jar
hadoop-2.4.1/share/hadoop/hdfs/hadoop-hdfs-2.4.1.jar
all jars under the hadoop-2.4.1/share/hadoop/common/lib/ directory

Add the following to /home/cheyo/.bashrc; note that it also adds the current directory (.) to CLASSPATH:

export CLASSPATH=$CLASSPATH:/opt/hadoop/client/hadoop-2.4.1/share/hadoop/common/hadoop-common-2.4.1.jar:/opt/hadoop/client/hadoop-2.4.1/share/hadoop/hdfs/hadoop-hdfs-2.4.1.jar:/opt/hadoop/client/hadoop-2.4.1/share/hadoop/common/lib/*:.
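
To apply the change in the current shell and check the result (a quick sanity check; the paths above are assumed to match your installation):

    source /home/cheyo/.bashrc
    echo $CLASSPATH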
  • log4j.properties

Create a log4j configuration file named log4j.properties with the following content:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

Usage

  • Compile

    javac xxxxx.java
  • Run

    java xxxxx
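
For example, to compile and run the ReadHDFSFile sample shown below (assuming the source file is in the current directory and CLASSPATH is set as above):

    javac ReadHDFSFile.java
    java ReadHDFSFile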
  • Run the WordCount example

First upload the text files to be counted to /tmp/input_dir on HDFS, then run the following command to perform the count:

hadoop jar /opt/hadoop/client/hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar
    wordcount /tmp/input_dir /tmp/output_dir
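
The input directory can be created and populated, and the result inspected, with the HDFS shell. A minimal sketch (the local file name sample.txt and the reducer output name part-r-00000 are illustrative):

    hdfs dfs -mkdir -p /tmp/input_dir
    hdfs dfs -put sample.txt /tmp/input_dir/
    hdfs dfs -cat /tmp/output_dir/part-r-00000

Note that /tmp/output_dir must not already exist when the job is submitted, or the job will fail.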

Sample Code

  • Read a file
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.net.URI;

public class ReadHDFSFile {

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Connect to HDFS using the NameNode URI
            FileSystem fs = FileSystem.get(new URI("hdfs://ctrl:9000"), conf);
            Path file = new Path("/tmp/file1.txt");
            // Open the file and wrap the stream in a reader so it can be read line by line
            FSDataInputStream getIt = fs.open(file);
            BufferedReader d = new BufferedReader(new InputStreamReader(getIt));
            String s = "";
            while ((s = d.readLine()) != null) {
                System.out.println(s);
            }
            d.close();
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • Write a file
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;

public class WriteHDFSFile {
    public static void main(String[] args) throws IOException {
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://ctrl:9000"), conf);
            Path path = new Path("/tmp/newfile2.txt");
            // Create (or overwrite) the file and write a string to it
            FSDataOutputStream out = fs.create(path);
            out.writeUTF("hello, I'm cheyo. I'm rocky. rocky is my new English name");
            // Close the stream before closing the FileSystem so the data is flushed to HDFS
            out.close();
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • Create a directory
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.net.URI;

public class MakeHDFSDir {

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://ctrl:9000"), conf);
            Path path = new Path("/tmp/dir2");
            // mkdirs also creates any missing parent directories
            fs.mkdirs(path);
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • Delete a directory
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.net.URI;

public class DeleteHDFSDir {

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://ctrl:9000"), conf);
            Path path = new Path("/tmp/dir2");
            // Delete recursively; the single-argument delete(Path) is deprecated in Hadoop 2.x
            fs.delete(path, true);
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • Upload a file
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;

public class UploadFile {

    public static void main(String[] args) throws IOException {
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://ctrl:9000"), conf);
            // src is a file on the local filesystem; dst is the destination path on HDFS
            Path src = new Path("/tmp/t1.txt");
            Path dst = new Path("/tmp/t1_new.txt");
            fs.copyFromLocalFile(src, dst);
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Tags: #HDFS #MapReduce