Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

다음의 두 코드는 HDFS Java API로 HDFS를 다루는 기초 코드입니다.

코드 블럭
languagejava
titleHdfsUtils.java
linenumberstrue
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * HDFS 유틸리티.
 *
 * @author Byoung Gon, Kim
 * @since 0.1
 */
public class HdfsUtils {

    /**
     * 지정한 경로를 삭제한다.
     *
     * @param client    DFS Client
     * @param path      삭제할 경로
     * @param recursive Recusive 적용 여부
     * @return 성공시 <tt>true</tt>
     * @throws IOException 파일을 삭제할 수 없는 경우
     */
    public static boolean remove(DFSClient client, String path, boolean recursive) throws IOException {
        return client.exists(path) && client.delete(path, recursive);
    }

    /**
     * Input Split의 경로를 반환한다.
     *
     * @param inputSplit Input Split
     * @return 경로
     */
    public static String getPath(InputSplit inputSplit) {
        FileSplit fileSplit = (FileSplit) inputSplit;
        Path path = fileSplit.getPath();
        return path.toUri().getPath();
    }

    /**
     * DFS Client의 출력 스트립을 얻는다.
     *
     * @param client    DFS Client
     * @param filename  파일명
     * @param overwrite Overwrite 여부
     * @return 출력 스트림
     * @throws IOException HDFS IO를 처리할 수 없는 경우
     */
    public static OutputStream getOutputStream(DFSClient client, String filename, boolean overwrite) throws IOException {
        return client.create(filename, overwrite);
    }

    /**
     * DFS Client의 입력 스트립을 얻는다.
     *
     * @param client   DFS Client
     * @param filename 파일 경로
     * @return 입력 스트림
     * @throws IOException HDFS IO를 처리할 수 없는 경우
     */
    public static InputStream getInputStream(DFSClient client, String filename) throws IOException {
        return client.open(filename);
    }

    /**
     * 출력 스트림을 종료한다.
     *
     * @param outputStream 출력 스트림
     * @throws IOException 출력 스트림을 종료할 수 없는 경우
     */
    public static void closeOuputStream(OutputStream outputStream) throws IOException {
        outputStream.close();
    }

    /**
     * 입력 스트림을 종료한다.
     *
     * @param inputStream 입력 스트림
     * @throws IOException 입력 스트림을 종료할 수 없는 경우
     */
    public static void closeInputStream(InputStream inputStream) throws IOException {
        inputStream.close();
    }

    /**
     * Input Split의 파일명을 반환한다.
     * Input Split은 기본적으로 <tt>file + ":" + start + "+" + length</tt> 형식으로 구성되어 있다.
     *
     * @param inputSplit Input Split
     * @return 파일명
     */
    public static String getFilename(InputSplit inputSplit) {
        String filename = FileUtils.getFilename(inputSplit.toString());
        int start = filename.indexOf(":");
        return filename.substring(0, start);
    }

    /**
     * @param hdfsUrl
     * @return
     * @throws IOException
     */
    public static FileSystem getFileSystem(String hdfsUrl) throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.default.name", hdfsUrl);
        configuration.set("fs.defaultFS", hdfsUrl);
        configuration.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
        FileSystem fileSystem = FileSystem.get(configuration);
        return fileSystem;
    }

    /**
     * 지정한 경로가 존재하는지 확인한다.
     *
     * @param client DFS Client
     * @param path   존재 여부를 판단할 경로
     * @return 존재하면 <tt>true</tt>
     * @throws IOException HDFS IO를 처리할 수 없는 경우
     */
    public static boolean exists(DFSClient client, String path) throws IOException {
        return client.exists(path);
    }

    /**
     * 지정한 경로가 디렉토리인지 확인한다.
     *
     * @param fs   {@link org.apache.hadoop.fs.FileSystem}
     * @param path 경로
     * @return 디렉토리인 경우 <tt>true</tt>
     * @throws IOException HDFS IO를 처리할 수 없는 경우
     */
    public static boolean isDirectory(FileSystem fs, String path) throws IOException {
        try {
            FileStatus status = fs.getFileStatus(new Path(path));
            return status.isDir();
        } catch (FileNotFoundException ex) {
            return false;
        }
    }

    /**
     * HDFS 상에서 지정한 파일을 다른 디렉토리로 파일을 이동시킨다.
     *
     * @param conf            Hadoop Configuration
     * @param path            이동할 파일
     * @param prefixToAppend  파일을 이동할 때 파일명의 prefix에 추가할 문자열
     * @param targetDirectory 목적 디렉토리
     * @throws IOException 파일을 이동할 수 없는 경우
     */
    public static void moveFileToDirectory(Configuration conf, String path, String prefixToAppend, String targetDirectory) throws IOException {
        FileSystem fileSystem = FileSystem.get(conf);
        FileStatus[] statuses = fileSystem.listStatus(new Path(path));
        for (FileStatus fileStatus : statuses) {
            String filename = prefixToAppend + "_" + fileStatus.getPath().getName();
            if (!isExist(conf, targetDirectory + "/" + filename)) {
                fileSystem.rename(fileStatus.getPath(), new Path(targetDirectory + "/" + filename));
            } else {
                throw new RuntimeException("\t  Warn: '" + fileStatus.getPath() + "' cannot moved. Already exists.");
            }
        }
    }

    /**
     * HDFS 상에서 지정한 파일을 다른 디렉토리로 파일을 이동시킨다.
     *
     * @param conf            Hadoop Configuration
     * @param delayFiles      이동할 파일 목록
     * @param targetDirectory 목적 디렉토리
     * @throws IOException 파일을 이동할 수 없는 경우
     */
    public static void moveFilesToDirectory(Configuration conf, List<String> delayFiles, String targetDirectory) throws IOException {
        for (String path : delayFiles) {
            String filename = FileUtils.getFilename(path);
            String delayedFilePrefix = filename.split("-")[0];
            String outputHead = delayedFilePrefix.replaceAll("delay", "");
            String outputMiddle = delayedFilePrefix.substring(0, 5);    // todo
            String outputTail = filename.replaceAll(delayedFilePrefix, "");

            System.out.println("Acceleration Dir " + targetDirectory + "/" + outputHead + "_" + outputMiddle + outputTail);
            makeDirectoryIfNotExists(targetDirectory, conf);

            FileSystem fileSystem = FileSystem.get(conf);
            fileSystem.rename(
                    new Path(path),
                    new Path(targetDirectory + "/" + outputHead + "_" + outputMiddle + outputTail));

            System.out.println("\t Moved: '" + path + "' --> '" + targetDirectory + "'");
        }
    }

    /**
     * HDFS 상에서 지정한 파일을 다른 디렉토리로 파일을 이동시킨다.
     *
     * @param conf            Hadoop Configuration
     * @param paths           이동할 파일 목록
     * @param prefixToAppend  파일을 이동할 때 파일명의 prefix에 추가할 문자열
     * @param targetDirectory 목적 디렉토리
     * @throws IOException 파일을 이동할 수 없는 경우
     */
    public static void moveFilesToDirectory(Configuration conf, List<String> paths, String prefixToAppend, String targetDirectory) throws IOException {
        for (String file : paths) {
            try {
                HdfsUtils.moveFileToDirectory(conf, file, prefixToAppend, targetDirectory);
                System.out.println("\t Moved: '" + file + "' --> '" + targetDirectory + "'");
            } catch (Exception ex) {
                System.err.println(ex.getMessage());
            }
        }
    }

    /**
     * 디렉토리가 존재하지 않는다면 생성한다.
     *
     * @param directory 디렉토리
     * @param conf      Hadoop Configuration
     * @throws IOException HDFS 작업을 실패한 경우
     */
    public static void makeDirectoryIfNotExists(String directory, Configuration conf) throws IOException {
        FileSystem fileSystem = FileSystem.get(conf);
        if (!isExist(conf, directory) && !isDirectory(fileSystem, directory)) {
            fileSystem.mkdirs(new Path(directory));
        }
    }

    /**
     * HDFS의 해당 경로의 모든 파일에서 prefix로 시작하는 파일 목록을 반환한다.
     *
     * @param conf       Configuration
     * @param path       경로
     * @param prefix     파일의 Prefix
     * @param pathFilter 파일을 필터링하는 필터
     * @return 지정한 prefix로 파일명이 시작하는 파일 목록
     * @throws IOException HDFS 작업을 실패한 경우
     */
    public static List<String> getPrefixFiles(Configuration conf, String path, String prefix, PathFilter pathFilter) throws IOException {
        List<String> files = new ArrayList<String>();
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] statuses = fs.listStatus(new Path(path), pathFilter != null ? pathFilter : new BypassPathFilter());
        if (statuses != null) {
            for (FileStatus fileStatus : statuses) {
                if (!fileStatus.isDir() && fileStatus.getPath().getName().startsWith(prefix)) {
                    files.add(fileStatus.getPath().toUri().getPath());
                }
            }
        }
        return files;
    }

    /**
     * HDFS의 해당 경로의 모든 파일에서 가장 최신 파일 하나를 반환한다.
     *
     * @param conf       Hadoop Configuration
     * @param path       경로
     * @param pathFilter 파일을 필터링하는 필터
     * @return 가장 최신 파일
     * @throws IOException HDFS 작업을 실패한 경우
     */
    public static String getLatestFile(Configuration conf, String path, PathFilter pathFilter) throws IOException {
        List<SortableFileStatus> files = new ArrayList<SortableFileStatus>();
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] statuses = fs.listStatus(new Path(path), pathFilter != null ? pathFilter : new BypassPathFilter());
        if (statuses != null) {
            for (FileStatus fileStatus : statuses) {
                if (!fileStatus.isDir()) {
                    files.add(new SortableFileStatus(fileStatus));
                }
            }
        }
        Collections.sort(files);
        FileStatus fileStatus = files.get(0).fileStatus;
        return fileStatus.getPath().toUri().getPath();
    }

    /**
     * 지정한 경로에 파일이 존재하는지 확인한다.
     *
     * @param conf Haodop Job Configuration
     * @param path 존재 여부를 확인할 절대 경로
     * @return 존재한다면 <tt>true</tt>
     * @throws IOException 파일 존재 여부를 알 수 없거나, HDFS에 접근할 수 없는 경우
     */
    public static boolean isExist(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /**
     * 해당 경로에 있는 파일을 MERGE한다.
     *
     * @param conf Hadoop Configuration
     * @param path HDFS Path
     * @throws IOException Get Merge할 수 없는 경우
     */
    public static void merge(Configuration conf, String path) throws IOException {
        // 입력 경로의 모든 파일을 Get Merge하여 임시 파일에 기록한다.
        FileSystem fileSystem = FileSystem.get(conf);
        Path source = new Path(path);
        if (!fileSystem.getFileStatus(source).isDir()) {
            // 이미 파일이라면 더이상 Get Merge할 필요없다.
            return;
        }
        Path target = new Path(path + "_temporary");
        FileUtil.copyMerge(fileSystem, source, fileSystem, target, true, conf, null);

        // 원 소스 파일을 삭제한다.
        fileSystem.delete(source, true);

        // 임시 파일을 원 소스 파일명으로 대체한다.
        Path in = new Path(path + "_temporary");
        Path out = new Path(path);
        fileSystem.rename(in, out);

        // 임시 디렉토리를 삭제한다.
        fileSystem.delete(new Path(path + "_temporary"), true);
    }

    /**
     * 지정한 경로를 삭제한다.
     *
     * @param configuration Hadoop Configuration
     * @param path          삭제할 경로
     * @throws IOException 삭제할 수 없는 경우
     */
    public static void delete(Configuration configuration, String path) throws IOException {
        FileSystem fileSystem = FileSystem.get(configuration);
        Path source = new Path(path);
        fileSystem.delete(source, true);
    }
}

...