다음의 두 코드는 HDFS Java API로 HDFS를 다루는 기초 코드입니다.
HdfsUtils.java
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * HDFS 유틸리티. * * @author Byoung Gon, Kim * @since 0.1 */ public class HdfsUtils { /** * 지정한 경로를 삭제한다. * * @param client DFS Client * @param path 삭제할 경로 * @param recursive Recusive 적용 여부 * @return 성공시 <tt>true</tt> * @throws IOException 파일을 삭제할 수 없는 경우 */ public static boolean remove(DFSClient client, String path, boolean recursive) throws IOException { return client.exists(path) && client.delete(path, recursive); } /** * Input Split의 경로를 반환한다. * * @param inputSplit Input Split * @return 경로 */ public static String getPath(InputSplit inputSplit) { FileSplit fileSplit = (FileSplit) inputSplit; Path path = fileSplit.getPath(); return path.toUri().getPath(); } /** * DFS Client의 출력 스트립을 얻는다. * * @param client DFS Client * @param filename 파일명 * @param overwrite Overwrite 여부 * @return 출력 스트림 * @throws IOException HDFS IO를 처리할 수 없는 경우 */ public static OutputStream getOutputStream(DFSClient client, String filename, boolean overwrite) throws IOException { return client.create(filename, overwrite); } /** * DFS Client의 입력 스트립을 얻는다. * * @param client DFS Client * @param filename 파일 경로 * @return 입력 스트림 * @throws IOException HDFS IO를 처리할 수 없는 경우 */ public static InputStream getInputStream(DFSClient client, String filename) throws IOException { return client.open(filename); } /** * 출력 스트림을 종료한다. * * @param outputStream 출력 스트림 * @throws IOException 출력 스트림을 종료할 수 없는 경우 */ public static void closeOuputStream(OutputStream outputStream) throws IOException { outputStream.close(); } /** * 입력 스트림을 종료한다. * * @param inputStream 입력 스트림 * @throws IOException 입력 스트림을 종료할 수 없는 경우 */ public static void closeInputStream(InputStream inputStream) throws IOException { inputStream.close(); } /** * Input Split의 파일명을 반환한다. * Input Split은 기본적으로 <tt>file + ":" + start + "+" + length</tt> 형식으로 구성되어 있다. * * @param inputSplit Input Split * @return 파일명 */ public static String getFilename(InputSplit inputSplit) { String filename = FileUtils.getFilename(inputSplit.toString()); int start = filename.indexOf(":"); return filename.substring(0, start); } /** * @param hdfsUrl * @return * @throws IOException */ public static FileSystem getFileSystem(String hdfsUrl) throws IOException { Configuration configuration = new Configuration(); configuration.set("fs.default.name", hdfsUrl); configuration.set("fs.defaultFS", hdfsUrl); configuration.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs"); FileSystem fileSystem = FileSystem.get(configuration); return fileSystem; } /** * 지정한 경로가 존재하는지 확인한다. * * @param client DFS Client * @param path 존재 여부를 판단할 경로 * @return 존재하면 <tt>true</tt> * @throws IOException HDFS IO를 처리할 수 없는 경우 */ public static boolean exists(DFSClient client, String path) throws IOException { return client.exists(path); } /** * 지정한 경로가 디렉토리인지 확인한다. * * @param fs {@link org.apache.hadoop.fs.FileSystem} * @param path 경로 * @return 디렉토리인 경우 <tt>true</tt> * @throws IOException HDFS IO를 처리할 수 없는 경우 */ public static boolean isDirectory(FileSystem fs, String path) throws IOException { try { FileStatus status = fs.getFileStatus(new Path(path)); return status.isDir(); } catch (FileNotFoundException ex) { return false; } } /** * HDFS 상에서 지정한 파일을 다른 디렉토리로 파일을 이동시킨다. * * @param conf Hadoop Configuration * @param path 이동할 파일 * @param prefixToAppend 파일을 이동할 때 파일명의 prefix에 추가할 문자열 * @param targetDirectory 목적 디렉토리 * @throws IOException 파일을 이동할 수 없는 경우 */ public static void moveFileToDirectory(Configuration conf, String path, String prefixToAppend, String targetDirectory) throws IOException { FileSystem fileSystem = FileSystem.get(conf); FileStatus[] statuses = fileSystem.listStatus(new Path(path)); for (FileStatus fileStatus : statuses) { String filename = prefixToAppend + "_" + fileStatus.getPath().getName(); if (!isExist(conf, targetDirectory + "/" + filename)) { fileSystem.rename(fileStatus.getPath(), new Path(targetDirectory + "/" + filename)); } else { throw new RuntimeException("\t Warn: '" + fileStatus.getPath() + "' cannot moved. Already exists."); } } } /** * HDFS 상에서 지정한 파일을 다른 디렉토리로 파일을 이동시킨다. * * @param conf Hadoop Configuration * @param delayFiles 이동할 파일 목록 * @param targetDirectory 목적 디렉토리 * @throws IOException 파일을 이동할 수 없는 경우 */ public static void moveFilesToDirectory(Configuration conf, List<String> delayFiles, String targetDirectory) throws IOException { for (String path : delayFiles) { String filename = FileUtils.getFilename(path); String delayedFilePrefix = filename.split("-")[0]; String outputHead = delayedFilePrefix.replaceAll("delay", ""); String outputMiddle = delayedFilePrefix.substring(0, 5); // todo String outputTail = filename.replaceAll(delayedFilePrefix, ""); System.out.println("Acceleration Dir " + targetDirectory + "/" + outputHead + "_" + outputMiddle + outputTail); makeDirectoryIfNotExists(targetDirectory, conf); FileSystem fileSystem = FileSystem.get(conf); fileSystem.rename( new Path(path), new Path(targetDirectory + "/" + outputHead + "_" + outputMiddle + outputTail)); System.out.println("\t Moved: '" + path + "' --> '" + targetDirectory + "'"); } } /** * HDFS 상에서 지정한 파일을 다른 디렉토리로 파일을 이동시킨다. * * @param conf Hadoop Configuration * @param paths 이동할 파일 목록 * @param prefixToAppend 파일을 이동할 때 파일명의 prefix에 추가할 문자열 * @param targetDirectory 목적 디렉토리 * @throws IOException 파일을 이동할 수 없는 경우 */ public static void moveFilesToDirectory(Configuration conf, List<String> paths, String prefixToAppend, String targetDirectory) throws IOException { for (String file : paths) { try { HdfsUtils.moveFileToDirectory(conf, file, prefixToAppend, targetDirectory); System.out.println("\t Moved: '" + file + "' --> '" + targetDirectory + "'"); } catch (Exception ex) { System.err.println(ex.getMessage()); } } } /** * 디렉토리가 존재하지 않는다면 생성한다. * * @param directory 디렉토리 * @param conf Hadoop Configuration * @throws IOException HDFS 작업을 실패한 경우 */ public static void makeDirectoryIfNotExists(String directory, Configuration conf) throws IOException { FileSystem fileSystem = FileSystem.get(conf); if (!isExist(conf, directory) && !isDirectory(fileSystem, directory)) { fileSystem.mkdirs(new Path(directory)); } } /** * HDFS의 해당 경로의 모든 파일에서 prefix로 시작하는 파일 목록을 반환한다. * * @param conf Configuration * @param path 경로 * @param prefix 파일의 Prefix * @param pathFilter 파일을 필터링하는 필터 * @return 지정한 prefix로 파일명이 시작하는 파일 목록 * @throws IOException HDFS 작업을 실패한 경우 */ public static List<String> getPrefixFiles(Configuration conf, String path, String prefix, PathFilter pathFilter) throws IOException { List<String> files = new ArrayList<String>(); FileSystem fs = FileSystem.get(conf); FileStatus[] statuses = fs.listStatus(new Path(path), pathFilter != null ? pathFilter : new BypassPathFilter()); if (statuses != null) { for (FileStatus fileStatus : statuses) { if (!fileStatus.isDir() && fileStatus.getPath().getName().startsWith(prefix)) { files.add(fileStatus.getPath().toUri().getPath()); } } } return files; } /** * HDFS의 해당 경로의 모든 파일에서 가장 최신 파일 하나를 반환한다. * * @param conf Hadoop Configuration * @param path 경로 * @param pathFilter 파일을 필터링하는 필터 * @return 가장 최신 파일 * @throws IOException HDFS 작업을 실패한 경우 */ public static String getLatestFile(Configuration conf, String path, PathFilter pathFilter) throws IOException { List<SortableFileStatus> files = new ArrayList<SortableFileStatus>(); FileSystem fs = FileSystem.get(conf); FileStatus[] statuses = fs.listStatus(new Path(path), pathFilter != null ? pathFilter : new BypassPathFilter()); if (statuses != null) { for (FileStatus fileStatus : statuses) { if (!fileStatus.isDir()) { files.add(new SortableFileStatus(fileStatus)); } } } Collections.sort(files); FileStatus fileStatus = files.get(0).fileStatus; return fileStatus.getPath().toUri().getPath(); } /** * 지정한 경로에 파일이 존재하는지 확인한다. * * @param conf Haodop Job Configuration * @param path 존재 여부를 확인할 절대 경로 * @return 존재한다면 <tt>true</tt> * @throws IOException 파일 존재 여부를 알 수 없거나, HDFS에 접근할 수 없는 경우 */ public static boolean isExist(Configuration conf, String path) throws IOException { FileSystem fs = FileSystem.get(conf); return fs.exists(new Path(path)); } /** * 해당 경로에 있는 파일을 MERGE한다. * * @param conf Hadoop Configuration * @param path HDFS Path * @throws IOException Get Merge할 수 없는 경우 */ public static void merge(Configuration conf, String path) throws IOException { // 입력 경로의 모든 파일을 Get Merge하여 임시 파일에 기록한다. FileSystem fileSystem = FileSystem.get(conf); Path source = new Path(path); if (!fileSystem.getFileStatus(source).isDir()) { // 이미 파일이라면 더이상 Get Merge할 필요없다. return; } Path target = new Path(path + "_temporary"); FileUtil.copyMerge(fileSystem, source, fileSystem, target, true, conf, null); // 원 소스 파일을 삭제한다. fileSystem.delete(source, true); // 임시 파일을 원 소스 파일명으로 대체한다. Path in = new Path(path + "_temporary"); Path out = new Path(path); fileSystem.rename(in, out); // 임시 디렉토리를 삭제한다. fileSystem.delete(new Path(path + "_temporary"), true); } /** * 지정한 경로를 삭제한다. * * @param configuration Hadoop Configuration * @param path 삭제할 경로 * @throws IOException 삭제할 수 없는 경우 */ public static void delete(Configuration configuration, String path) throws IOException { FileSystem fileSystem = FileSystem.get(configuration); Path source = new Path(path); fileSystem.delete(source, true); } }
FileSystemUtils.java
import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.helpers.MessageFormatter; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; /** * File System Utility. * * @author Byoung Gon, Kim * @since 0.2 */ public class FileSystemUtils { /** * 지정한 경로를 보정한다. <tt>hdfs://</tt> 또는 <tt>file:</tt> 으로 시작하지 않는 경우 * 기본으로 로컬 파일 시스템으로 간주하고 <tt>file:</tt>을 붙인다. * * @param path 파일 또는 디렉토리의 경로 * @return 보정한 디렉토리의 경로 */ public static String correctPath(String path) { if (path == null || path.length() < 1) { return null; } Path filePath = new Path(path); String scheme = filePath.toUri().getScheme(); if (scheme == null) { return "file:" + path; } return path; } /** * 지정한 경로를 보정한다. <tt>hdfs://</tt> 또는 <tt>file:</tt> 으로 시작하지 않는 경우 * 기본으로 로컬 파일 시스템으로 간주하고 <tt>file:</tt>을 붙인다. * * @param parent 부모 경로 * @param child 자식 경로 * @return 보정한 디렉토리의 경로 */ public static String correctPath(String parent, String child) { if (isEmpty(parent) || isEmpty(child)) { return null; } String path = parent + "/" + child; return correctPath(path); } /** * 지정한 경로를 보정한다. <tt>hdfs://</tt> 또는 <tt>file:</tt> 으로 시작하지 않는 경우 * 기본으로 로컬 파일 시스템으로 간주하고 <tt>file:</tt>을 붙인다. * * @param parent 부모 경로 * @param prefix 파일명의 prefix * @param filename 파일명 * @return 보정한 디렉토리의 경로 */ public static String correctPath(String parent, String prefix, String filename) { if (isEmpty(parent) || isEmpty(prefix) || isEmpty(filename)) { return null; } String path = parent + "/" + prefix + "_" + filename; return correctPath(path); } /** * 지정한 경로가 파일 시스템의 경로인지 확인한다. * * @param path 경로 * @param scheme 파일 시스템의 scheme(예; hdfs:// file:) */ public static void checkScheme(String path, FileSystemScheme scheme) { if (path == null || path.length() < 1) { return; } if (!path.startsWith(scheme.getScheme())) { throw new ServiceException(ExceptionUtils.getMessage("Invalid file system '{}' => Path '{}'", scheme.getScheme(), path)); } } /** * 지정한 디렉토리가 로컬 경로인 경우 보정을 하고 경로를 생성한다. * * @param path 경로 */ public static void testCorrentAndCreateDir(String path) { String correctedPath = correctPath(path); testCreateDir(new Path(correctedPath)); } /** * 지정한 디렉토리를 테스트하여 존재하지 않으면 생성한다. * 다만 존재하면서 디렉토리가 아닌 파일이라면 예외를 발생시킨다. * * @param path 경로 * @throws ServiceException 디렉토리를 생성할 수 없거나, 존재하지만 디렉토리가 아니라 파일인 경우 */ public static void testCreateDir(Path path) { try { Configuration conf = new Configuration(); FileSystem fs = path.getFileSystem(conf); if (fs.exists(path) && !fs.getFileStatus(path).isDirectory()) { throw new ServiceException(ExceptionUtils.getMessage("'{}' is not directory.", path)); } if (!fs.exists(path)) { if (!fs.mkdirs(path)) { throw new ServiceException(ExceptionUtils.getMessage("Cannot create '{}'", path)); } } } catch (Exception ex) { String message = MessageFormatter.format("Cannot create '{}'", path.toString()).getMessage(); throw new ServiceException(message, ex); } } /** * 지정한 경로의 파일 시스템을 반환한다. * * @param path 파일 시스템을 얻을 경로 * @return 파일 시스템 */ public static FileSystem getFileSystem(Path path) { try { Configuration conf = new Configuration(); return path.getFileSystem(conf); } catch (Exception ex) { throw new ServiceException(ExceptionUtils.getMessage("Cannot get file system of '{}'", path), ex); } } /** * 지정한 경로의 파일 시스템을 반환한다. * * @param path 파일 시스템을 얻을 경로 * @return 파일 시스템 */ public static FileSystem getFileSystem(String path) { return getFileSystem(new Path(path)); } /** * 두 입력 경로의 파일 시스템이 동일한지 확인한다. * * @param path1 경로1 * @param path2 경로2 */ public static void validateSameFileSystem(String path1, String path2) { Path p1 = new Path(correctPath(path1)); Path p2 = new Path(correctPath(path2)); FileSystem fs1 = null; FileSystem fs2 = null; try { fs1 = p1.getFileSystem(new Configuration()); fs2 = p2.getFileSystem(new Configuration()); } catch (Exception ex) { throw new ServiceException(ExceptionUtils.getMessage("Cannot access '{}' or '{}'.", p1, p2), ex); } if (!compareFs(fs1, fs2)) { throw new ServiceException(ExceptionUtils.getMessage( "File system is not same : {}, {}", p1, p2 )); } if (p1.equals(p2)) { throw new ServiceException(ExceptionUtils.getMessage("Same path : {}, {}", p1, p2)); } } /** * 두 파일 시스템에 대해서 URI의 scheme을 확인하여 동일한지 확인한다. * * @param fs1 파일시스템1 * @param fs2 파일시스템2 * @return 동일하다면 <tt>true</tt> */ private static boolean compareFs(FileSystem fs1, FileSystem fs2) { URI uri1 = fs1.getUri(); URI uri2 = fs2.getUri(); if (uri1.getScheme() == null) { return false; } if (!uri1.getScheme().equals(uri2.getScheme())) { return false; } String srcHost = uri1.getHost(); String dstHost = uri2.getHost(); if ((srcHost != null) && (dstHost != null)) { try { srcHost = InetAddress.getByName(srcHost).getCanonicalHostName(); dstHost = InetAddress.getByName(dstHost).getCanonicalHostName(); } catch (UnknownHostException ue) { return false; } if (!srcHost.equals(dstHost)) { return false; } } else if (srcHost == null && dstHost != null) { return false; } else if (srcHost != null) { return false; } // 포트 확인 return uri1.getPort() == uri2.getPort(); } /** * 지정한 경로의 파일을 로딩한다. 지정한 경로에 파일이 존재하는 경우 해당 경로의 파일을 로딩한다. * 다만 경로에 파일명만 전달된 경우 현재 작업 디렉토리에서 파일을 찾는다. * * @param sourceFilePath 로딩할 파일의 경로 * @return 파일의 내용 * @throws ServiceException 파일을 로딩할 수 없는 경우 */ public static String load(String sourceFilePath) { String workingDirectory = System.getProperty("user.dir"); try { if (new File(sourceFilePath).exists()) { String filePath = correctPath(sourceFilePath); return loadFromFile(filePath); } else { String filePath = new File(workingDirectory, sourceFilePath).getAbsolutePath(); return loadFromFile(filePath); } } catch (IOException ex) { throw new ServiceException(ExceptionUtils.getMessage("Cannot load '{}'.", sourceFilePath), ex); } } /** * 지정한 경로의 파일을 로딩한다. 지정한 경로에 파일이 존재하는 경우 해당 경로의 파일을 로딩한다. * 다만 경로에 파일명만 전달된 경우 현재 작업 디렉토리에서 파일을 찾는다. * * @param sourceFilePath 로딩할 파일의 경로 * @return 파일의 내용 * @throws ServiceException 파일을 로딩할 수 없는 경우 */ public static byte[] loadBytes(String sourceFilePath) { String workingDirectory = System.getProperty("user.dir"); try { if (new File(sourceFilePath).exists()) { String filePath = correctPath(sourceFilePath); return loadBytesFromFile(filePath); } else { String filePath = new File(workingDirectory, sourceFilePath).getAbsolutePath(); return loadBytesFromFile(filePath); } } catch (IOException ex) { throw new ServiceException(ExceptionUtils.getMessage("Cannot load '{}'.", sourceFilePath), ex); } } /** * 지정한 경로의 파일을 로딩한다. 지정한 경로에 파일이 존재하는 경우 해당 경로의 파일을 로딩한다. * 다만 경로에 파일명만 전달된 경우 현재 작업 디렉토리에서 파일을 찾는다. * * @param sourceFilePath 로딩할 파일의 경로 * @return 파일의 내용 * @throws ServiceException 파일을 로딩할 수 없는 경우 */ public static String loadFromFile(String sourceFilePath) throws IOException { byte[] bytes = loadBytesFromFile(sourceFilePath); return new String(bytes); } /** * 지정한 경로의 파일을 로딩한다. 지정한 경로에 파일이 존재하는 경우 해당 경로의 파일을 로딩한다. * 다만 경로에 파일명만 전달된 경우 현재 작업 디렉토리에서 파일을 찾는다. * * @param sourceFilePath 로딩할 파일의 경로 * @return 파일의 내용 * @throws ServiceException 파일을 로딩할 수 없는 경우 */ public static byte[] loadBytesFromFile(String sourceFilePath) throws IOException { FileSystem fs = getFileSystem(sourceFilePath); FSDataInputStream is = fs.open(new Path(sourceFilePath)); byte[] bytes = FileCopyUtils.copyToByteArray(is); IOUtils.closeQuietly(is); return bytes; } /** * 지정한 경로의 파일을 로딩한다. 지정한 경로에 파일이 존재하는 경우 해당 경로의 파일을 로딩한다. * 다만 경로에 파일명만 전달된 경우 현재 작업 디렉토리에서 파일을 찾는다. * * @param sourceFileSystem 파일 시스템 * @param sourceFilePath 로딩할 파일의 경로 * @return 파일의 내용 * @throws ServiceException 파일을 로딩할 수 없는 경우 */ public static byte[] loadBytesFromFile(FileSystem sourceFileSystem, String sourceFilePath) throws IOException { FSDataInputStream is = sourceFileSystem.open(new Path(sourceFilePath)); byte[] bytes = FileCopyUtils.copyToByteArray(is); IOUtils.closeQuietly(is); return bytes; } /** * 지정한 경로에 파일을 저장한다. 지정한 경로에 파일이 존재하지 않는 경우 * 저장하는 파일의 경로는 현재 작업 디렉토리로 결정한다. * 예를 들어 <tt>path</tt> 변수에 파일명만 지정한다면 파일이 존재하지 않는다고 가정하므로 현재 작업 디렉토리에 해당 파일을 기록하게 된다. * * @param source 파일의 내용 * @param targetFilePath 저장할 파일의 경로 * @return 정상 저장 여부(<tt>true</tt> 면 정상적으로 저장) * @throws ServiceException 파일을 저장할 수 없는 경우 */ public static boolean save(byte[] source, String targetFilePath) { String workingDirectory = System.getProperty("user.dir"); try { // 지정한 경로에 파일이 존재하지 않으면 현재 작업 디렉토리에 파일을 저장한다. if (!new File(targetFilePath).exists()) { String filePath = new File(workingDirectory, targetFilePath).getAbsolutePath(); return saveToFile(source, filePath); } else { String filePath = new File(targetFilePath).getAbsolutePath(); return saveToFile(source, filePath); } } catch (Exception ex) { throw new ServiceException(ExceptionUtils.getMessage("Cannot save '{}'.", targetFilePath), ex); } } /** * 지정한 경로에 파일을 기록한다. * * @param source 파일의 내용 * @param targetFilePath 저장할 파일의 경로 * @return 정상 저장 여부(<tt>true</tt> 면 정상적으로 저장) */ public static boolean saveToFile(byte[] source, String targetFilePath) { try { FileSystem fs = getFileSystem(targetFilePath); FSDataOutputStream fos = fs.create(new Path(targetFilePath)); FileCopyUtils.copy(source, fos); IOUtils.closeQuietly(fos); return fs.exists(new Path(targetFilePath)); } catch (Exception ex) { throw new ServiceException(ExceptionUtils.getMessage("Unable to save '{}'.", targetFilePath), ex); } } /** * 지정한 경로에 파일을 기록한다. * * @param targetFileSystem 파일을 저장할 경로의 파일 시스템 * @param source 파일의 내용 * @param targetFilePath 저장할 파일의 경로 * @return 정상 저장 여부(<tt>true</tt> 면 정상적으로 저장) * @throws IOException 파일을 저장할 수 없는 경우 */ public static boolean saveToFile(byte[] source, FileSystem targetFileSystem, String targetFilePath) throws IOException { FSDataOutputStream fos = targetFileSystem.create(new Path(targetFilePath)); FileCopyUtils.copy(source, fos); IOUtils.closeQuietly(fos); return targetFileSystem.exists(new Path(targetFilePath)); } }
1 Comment
Edward
Feb 14, 2021파일 목록을 정렬할 수 있도록 별도로 구성한 File Status. 수정 날짜 기준으로 정렬을 지원하도록 작성함