hdfs學習(三)

HDFS 的 API 操作

使用url方式訪問數據(了解)

@Test
    public void urlHdfs() throws IOException {
        //1.註冊url
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        //2.獲取hdfs文件的輸入流
        InputStream inputStream=new URL("hdfs://hadoop101:8020/a.txt").openStream();
        //3.獲取本地文件的輸出流
        OutputStream outputStream= new FileOutputStream(new File("E:\\hello.txt"));
        //4.實現文件的拷貝
        IOUtils.copy(inputStream,outputStream);
        //5.關流
        IOUtils.closeQuietly(inputStream);
        IOUtils.closeQuietly(outputStream);
    }

使用文件系統方式訪問數據(掌握)

獲取 FileSystem 的幾種方式

/*
    獲取FileSystem,方式1
     */
    @Test
    public void getFileSystem1() throws IOException{
        //1.創建Configuration對象
        Configuration configuration=new Configuration();
        //2.設置文件系統類型
        configuration.set("fs.defaultFS","hdfs://hadoop101:8020");
        //3.獲取指定的文件系統
        FileSystem fileSystem= FileSystem.get(configuration);
        //4.輸出
        System.out.println(fileSystem);
    }

②用的次數比較多

/*
    獲取FileSystem,方式2
    ctrl+alt+v自動補全返回值
     */
    @Test
    public void getFileSystem2() throws URISyntaxException, IOException, InterruptedException {
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop101:8020"), new Configuration(),"root");
        System.out.println(fileSystem);
    }

/*
    獲取FileSystem,方式3
    ctrl+alt+v自動補全返回值
     */
    @Test
    public void getFileSystem3() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://hadoop101:8020","root");
        //3.獲取指定的文件系統
        FileSystem fileSystem= FileSystem.newInstance(configuration);
        //4.輸出
        System.out.println(fileSystem.toString());
    }

/*
    獲取FileSystem,方式4
    ctrl+alt+v自動補全返回值
     */
    @Test
    public void getFileSystem4() throws URISyntaxException, IOException, InterruptedException {
        FileSystem fileSystem= FileSystem.newInstance(new URI("hdfs://hadoop101:8020"), new Configuration(),"root");
        System.out.println(fileSystem);
    }

遍歷 HDFS中所有文件

/*
    hdfs文件的遍歷
     */
    @Test
    public void listFiles() throws URISyntaxException, IOException, InterruptedException {
        //1.獲取FileSystem實例
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop101:8020"), new Configuration(),"root");
        //2.調用方法listFiles獲取/目錄下的所有文件資訊
        RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path("/"), true);
        //3.遍歷迭代器
        while (iterator.hasNext()){
            LocatedFileStatus fileStatus = iterator.next();

            //獲取文件的絕對路徑:hdfs://hadoop101:/xxx
            System.out.println(fileStatus.getPath()+"----"+fileStatus.getPath().getName());
            //文件的block資訊
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            System.out.println("block數目:"+blockLocations.length);
        }
    }

輸出:

 

 

 HDFS 上創建文件夾

 /*
    hdfs創建文件夾
     */
    @Test
    public void mkdirsTest() throws URISyntaxException, IOException, InterruptedException {
        //1.獲取FileSystem實例
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop101:8020"), new Configuration(),"root");
        //2.獲取文件夾
        boolean bl=fileSystem.mkdirs(new Path("/aaa/bbb/ccc/a.txt"));
        //fileSystem.create(new Path("/aaa/bbb/ccc/a.txt"));
        System.out.println(bl);
        //3.關閉FileSystem
        fileSystem.close();
    }

 

 

 下載文件

/*
   文件下載2:使用方法copyToLocalFile下載到本地E盤下的bbb.txt
    */
    @Test
    public void downloadFile2() throws URISyntaxException, IOException, InterruptedException {
        //1.獲取FileSystem
        FileSystem fileSystem=FileSystem.get(new URI("hdfs://hadoop101:8020"),new Configuration(),"root");

        fileSystem.copyToLocalFile(new Path("/a.txt"),new Path("E://bbb.txt"));

        fileSystem.close();

    }
    /*
   文件下載
    */
    @Test
    public void downloadFile() throws URISyntaxException, IOException {
        //1.獲取FileSystem
        FileSystem fileSystem=FileSystem.get(new URI("hdfs://hadoop101:8020"),new Configuration());
        //2.獲取hdfs的輸入流
        FSDataInputStream inputStream = fileSystem.open(new Path("/a.txt"));
        //3.獲取本地路徑的輸出流
        FileOutputStream outputStream = new FileOutputStream("E://a.txt");
        //4.文件的拷貝
        IOUtils.copy(inputStream,outputStream);
        //5.關閉流
        IOUtils.closeQuietly(inputStream);
        IOUtils.closeQuietly(outputStream);
        fileSystem.close();
    }

 

 

 HDFS 文件上傳

 /*
  文件的上傳
   */
    @Test
    public void uploadFile() throws URISyntaxException, IOException, InterruptedException {
        FileSystem fileSystem=FileSystem.get(new URI("hdfs://hadoop101:8020"),new Configuration(),"root");
        fileSystem.copyFromLocalFile(new Path("E://hello.txt"),new Path("/"));
        fileSystem.close();
    }

 

 

小文件合併

由於 Hadoop 擅長存儲大文件,因為大文件的元數據資訊比較少,如果 Hadoop 集群當中有大
量的小文件,那麼每個小文件都需要維護一份元數據資訊,會大大的增加集群管理元數據的
記憶體壓力,所以在實際工作當中,如果有必要一定要將小文件合併成大文件進行一起處理
在我們的 HDFS 的 Shell 命令模式下,可以通過命令行將很多的 hdfs 文件合併成一個大文件下
載到本地

cd /export/servers
hdfs dfs -getmerge /*.xml ./hello.xml

既然可以在下載的時候將這些小文件合併成一個大文件一起下載,那麼肯定就可以在上傳的
時候將小文件合併到一個大文件裡面去

/*
    小文件的合併
     */
    @Test
    public void mergeFile() throws URISyntaxException, IOException, InterruptedException {
        //1.獲取FileSystem
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop101:8020"), new Configuration(), "root");
        //2.獲取hdfs大文件的輸出流
        FSDataOutputStream outputStream = fileSystem.create(new Path("/big_txt.txt"));

        //3.獲取一個本地文件系統
        LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration());
        //4.獲取本地文件夾下的所有文件的詳情
        FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("E:\\input"));
        //5.遍歷每個文件,獲取每個文件的輸入流
        for (FileStatus fileStatus:fileStatuses){
            FSDataInputStream inputStream=localFileSystem.open(fileStatus.getPath());
            //6.將小文件的數據複製到大文件
            IOUtils.copy(inputStream,outputStream);
            IOUtils.closeQuietly(inputStream);
        }
        IOUtils.closeQuietly(outputStream);
        localFileSystem.close();
        fileSystem.close();
    }

輸出:

 

 裡面的內容是:在E盤下input文件夾下的5個txt文件的總和

 

完成本地小文件的合併,上傳

 

 

 HDFS的高可用機制

在Hadoop 中,NameNode 所處的位置是非常重要的,整個HDFS文件系統的元數據資訊都由
NameNode 來管理,NameNode的可用性直接決定了Hadoop 的可用性,一旦NameNode進程
不能工作了,就會影響整個集群的正常使用。
在典型的HA集群中,兩台獨立的機器被配置為NameNode。在工作集群中,NameNode機器中
的一個處於Active狀態,另一個處於Standby狀態。Active NameNode負責群集中的所有客戶端
操作,而Standby充當從伺服器。Standby機器保持足夠的狀態以提供快速故障切換(如果需
要)。

 

 Hadoop的聯邦機制(Federation)

HDFS Federation是解決namenode記憶體瓶頸問題的水平橫向擴展方案。
Federation意味著在集群中將會有多個namenode/namespace。這些namenode之間是聯合的,
也就是說,他們之間相互獨立且不需要互相協調,各自分工,管理自己的區域。分散式的
datanode被用作通用的數據塊存儲存儲設備。每個datanode要向集群中所有的namenode注
冊,且周期性地向所有namenode發送心跳和塊報告,並執行來自所有namenode的命令。

 

Federation一個典型的例子就是上面提到的NameNode記憶體過高問題,我們完全可以將上面部分
大的文件目錄移到另外一個NameNode上做管理.更重要的一點在於,這些NameNode是共享集
群中所有的DataNode的,它們還是在同一個集群內的**。**
這時候在DataNode上就不僅僅存儲一個Block Pool下的數據了,而是多個(在DataNode的datadir
所在目錄裡面查看BP-xx.xx.xx.xx打頭的目錄)。

概括起來:
多個NN共用一個集群里的存儲資源,每個NN都可以單獨對外提供服務。
每個NN都會定義一個存儲池,有單獨的id,每個DN都為所有存儲池提供存儲。
DN會按照存儲池id向其對應的NN彙報塊資訊,同時,DN會向所有NN彙報本地存儲可用資源
情況。

 

HDFS Federation不足:

HDFS Federation並沒有完全解決單點故障問題。雖然namenode/namespace存在多個,但是從
單個namenode/namespace看,仍然存在單點故障:如果某個namenode掛掉了,其管理的相
應的文件便不可以訪問。Federation中每個namenode仍然像之前HDFS上實現一樣,配有一個
secondary namenode,以便主namenode掛掉一下,用於還原元數據資訊。
所以一般集群規模真的很大的時候,會採用HA+Federation的部署方案。也就是每個聯合的
namenodes都是ha的。

 

 

Tags: