搭建rtmp直播流服務之3:java開發ffmpeg實現rtsp轉rtmp並實現ffmpeg命令的介面化管理架構設計及程式碼實現

  • 2019 年 11 月 1 日
  • 筆記

版權聲明:本文為部落客原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。

本文鏈接:https://blog.csdn.net/eguid_1/article/details/51787646

上一篇文章簡單介紹了java如何調用ffmpeg的命令:http://blog.csdn.net/eguid_1/article/details/51777716

上上一篇介紹了nginx-rtmp伺服器的搭建:http://blog.csdn.net/eguid_1/article/details/51749830

這一篇將進一步深挖java對ffmepg命令的控制並最終實現服務介面化

本篇文章源碼:http://download.csdn.net/detail/eguid_1/9563637

github項目地址:https://github.com/eguid/FFCH4J

通知:由於很多同學反映本章程式碼的命令封裝設計的不是很好,所以對本章程式碼重新進行了實現,新版本推翻了本章原有程式碼內部實現,介面設計更加利於注入自己的實現,並增加可執行原生ffmpeg命令功能

新版本請到這裡查看:java封裝FFmpeg命令,支援原生ffmpeg全部命令,實現FFmpeg多進程處理與多執行緒輸出控制(開啟、關閉、查詢),rtsp/rtmp推流、拉流

(一)、簡單介紹

該服務介面可實現rtsp協議轉換為rtmp協議且可以實現rtmp直播流發布到nginx流媒體伺服器,其中最為重要的是如何實現通過參數生成ffmpeg命令並執行,且可以通過介面進行控制ffmpeg命令的停止

(二)、實現ffmpeg介面化服務架構設計

push端介面化管理

一、介面化調用

1、採用多執行緒方式,每次調用push埠開啟一個主進程及兩個輸出執行緒 2、可以對每個push端(執行緒)進行開啟和關閉的控制 3、統一介面參數,對ffmpeg的命令做到參數可控制

二、架構設計

1、服務介面

PushManager提供push(開啟一個push處理器),closePush(關閉push處理器),viewAppName介面(查看當前已經開啟的應用)

1.1、應用名和push處理器的關係

一個處理器對應一個應用名

1.2、push處理器

一個處理器對應一個push主進程和兩個輸出執行緒

2、主進程式控制制

2.1、主進程開啟

服務介面調用push處理器開啟push主進程,主進程會自動開啟兩個輸出執行緒用於消息輸出, 開啟後會將主進程Process和兩個輸出執行緒OutHandler通過map返回給服務介面。

2.2、主進程關閉

主進程可通過Process的destroy方法進行安全關閉

3、輸出執行緒控制

3.1、輸出執行緒開啟

輸出執行緒從主進程獲取到輸出流進行輸出

3.2、輸出執行緒關閉

輸出執行緒重寫了destory方法,用於安全的關閉輸出執行緒

4、持久層控制

持久層分為兩個:

1、appName(應用名)-pushId(push處理器的ID)對應關係

用於維護應用名和push處理器ID的對應關係,pushId為隨機生成id

2、pushId-主進程-輸出執行緒對應關係

主要用於存放主進程(Process)和兩個輸出執行緒,建立兩者對應關係,方便服務介面管理

(三)、程式碼實現

1、PushManager實現

/**   * 實現push管理器的push,delete,view服務   *   * @author eguid   * @see PushMangerImpl   * @since jdk1.7   */    public class PushManagerImpl implements PushManager  {      /**       * 引用push處理器       */      private PushHandler pusher = new PushHandlerImpl();      /**       * 管理應用名和push處理器之間的關係       */      private PushId_AppRelshipDao pard=new PushId_AppRelshipDaoImpl();      /**       * 管理處理器的主進程Process及兩個輸出執行緒的關係       */      private HandlerDao hd = new HandlerDaoImpl();        public void setPusher(PushHandler pusher)      {          this.pusher = pusher;      }        public void setPard(PushId_AppRelshipDao pard)      {          this.pard = pard;      }        public void setHd(HandlerDao hd)      {          this.hd = hd;      }        @Override      public String push(Map<String, Object> map)      {          if(map==null||map.isEmpty()||!map.containsKey("appName"))          {              return null;          }          String appName=null;          ConcurrentMap<String, Object> resultMap = null;          try          {              appName=(String)map.get("appName");              if(appName!=null&&"".equals(appName.trim()))              {                  return null;              }              resultMap = pusher.push(map);              // 生成一個標識該命令行執行緒集的key              String pushId = UUID.randomUUID().toString();              hd.set(pushId, resultMap);              pard.set(appName, pushId);          }          catch (IOException e)          {              // 暫時先寫這樣,後期加日誌              System.err.println("發生一個異常" + e.getMessage());          }          return appName;      }        @Override      public void closePush(String appName)      {          String pushId=null;          if(pard.isHave(appName))          {              pushId= pard.getPushId(appName);          }          if (pushId!=null&&hd.isHave(pushId))          {              ConcurrentMap<String, Object> map = hd.get(pushId);              //關閉兩個執行緒              ((OutHandler)map.get("error")).destroy();              ((OutHandler)map.get("info")).destroy();              //暫時先這樣寫,後期加日誌              System.out.println("停止命令-----end commond");              //關閉命令主進程              ((Process)map.get("process")).destroy();              //刪除處理器與執行緒對應關係表              hd.delete(pushId);              //刪除應用名對應關係表              pard.delete(appName);          }      }        @Override      public List<String> viewAppName()      {          return pard.getAll();      }

2、pushHandler實現(push處理器)

/**   * 提供解析參數生成ffmpeg命令並處理push操作   * @see PushHandlerImpl   * @since jdk1.7   */    public class PushHandlerImpl implements PushHandler  {      /*       * "ffmpeg -i "+ "rtsp://admin:[email protected]:37779/cam/realmonitor?channel=1&subtype=0 "+" -f flv -r 25 -s 640x360 -an" + " rtmp://192.168.30.21/live/test"       * 推送流格式: name:應用名;input:接收地址;output:推送地址;fmt:影片格式;fps:影片幀率;rs:影片解析度;disableAudio:是否開啟音頻       */      @Override      public ConcurrentMap<String, Object> push(Map<String, Object> paramMap)          throws IOException      {          // 從map裡面取數據,組裝成命令          String comm = getComm4Map(paramMap);          ConcurrentMap<String, Object> resultMap = null;          // 執行命令行            final Process proc = Runtime.getRuntime().exec(comm);          System.out.println("執行命令----start commond");          OutHandler errorGobbler = new OutHandler(proc.getErrorStream(), "Error");          OutHandler outputGobbler = new OutHandler(proc.getInputStream(), "Info");            errorGobbler.start();          outputGobbler.start();          // 返回參數          resultMap = new ConcurrentHashMap<String, Object>();          resultMap.put("info", outputGobbler);          resultMap.put("error", errorGobbler);          resultMap.put("process", proc);          return resultMap;      }        /**       * 通過解析參數生成可執行的命令行字元串;       * name:應用名;input:接收地址;output:推送地址;fmt:影片格式;fps:影片幀率;rs:影片解析度;disableAudio:是否開啟音頻       *       * @param paramMap       * @return 命令行字元串       */      protected String getComm4Map(Map<String, Object> paramMap)      {          // -i:輸入流地址或者文件絕對地址          StringBuilder comm = new StringBuilder("ffmpeg -i ");          // 是否有必輸項:輸入地址,輸出地址,應用名          if (paramMap.containsKey("input") && paramMap.containsKey("output")              && paramMap.containsKey("appName"))          {              comm.append(paramMap.get("input")).append(" ");              // -f :轉換格式,默認flv              comm.append(" -f ").append(paramMap.containsKey("fmt") ? paramMap.get("fmt") : "flv").append(" ");              // -r :幀率,默認25              comm.append("-r ").append(paramMap.containsKey("fps") ? paramMap.get("fps") : "30").append(" ");              // -s 解析度 默認是原解析度              comm.append("-s ").append(paramMap.containsKey("rs") ? paramMap.get("rs") : "").append(" ");              // -an 禁用音頻              comm.append("-an ").append(paramMap.containsKey("disableAudio") && ((Boolean)paramMap.get("disableAudio")) ? "-an" : "").append(" ");              // 輸出地址              comm.append(paramMap.get("output"));              //發布的應用名              comm.append(paramMap.get("appName"));              //一個影片源,可以有多個輸出,第二個輸出為拷貝源影片輸出,不改變影片的各項參數並且命名為應用名+HD              comm.append(" ").append(" -vcodec copy -f flv -an ").append(paramMap.get("output")).append(paramMap.get("appName")).append("HD");              System.out.println(comm.toString());              return comm.toString();          }          else          {              throw new RuntimeException("輸入流地址不能為空!");          }        }  }

3、OutHandler(輸出執行緒)

**   * 用於輸出命令行主進程的消息執行緒(必須開啟,否則命令行主進程無法正常執行) 重要:該類重寫了destroy方法,用於安全的關閉該執行緒   *   * @author eguid   * @see OutHandler   * @since jdk1.7   */    public class OutHandler extends Thread  {      // 控制狀態      volatile boolean status = true;        BufferedReader br = null;        String type = null;        public OutHandler(InputStream is, String type)      {          br = new BufferedReader(new InputStreamReader(is));          this.type = type;      }        /**       * 重寫執行緒銷毀方法,安全的關閉執行緒       */      @Override      public void destroy()      {          status = false;      }        /**       * 執行輸出執行緒       */      @Override      public void run()      {          String msg = null;          try          {              while (status)              {                    if ((msg = br.readLine()) != null)                  {                      System.out.println(type + "消息:" + msg);                  }              }          }          catch (IOException e)          {              e.printStackTrace();          }      }    }

4、兩個dao層介面,方便後期實現該介面並實現持久化

4.1、主進程(Process)和兩個輸出執行緒Dao

/**   * 命令行執行處理器快取,方便管理處理器的開啟和關閉   * @author eguid   * @see HandlerDao   * @since  jdk1.7   */    public interface HandlerDao  {      /**       *  獲取某個處理器       * @param pushId       * @return       */      public ConcurrentMap get(String pushId);      /**       * 存放一個處理器       * @param handlerMap       */      public void set(String key, ConcurrentMap<String, Object> resultMap);      /**       * 獲取全部處理器的id       * @return       */      public ConcurrentMap getAll();      /**       * 刪除某個處理器       * @param pushId       */      public void delete(String pushId);      /**       * 是否存在key       */      public boolean isHave(String pushId);  }

4.2、應用名-pushId對應關係Dao

/**   * 用於維護管理應用名與pushId的關係對應   * @author eguid   * @see PushId_AppRelshipDao   * @since  jdk1.7   */  public interface PushId_AppRelshipDao  {  /**   *  獲取應用名對應的pushId   * @param appName   * @return pushId   */  public String getPushId(String appName);  /**   *  插入一個應用名和pushId對應   * @param appName   * @param pushId   */  public void set(String appName,String pushId);  /**   * 通過應用名刪除對應關係   * @param appName   */  public void delete(String appName);  /**   * 獲取全部應用   */  public List<String> getAll();  /**   * 是否存在應用名   * @param appName   * @return true:存在;false:不存在   */  public boolean isHave(String appName);  }

下一篇將介紹一些支援rtmp直播的播放器