搭建rtmp直播流服務之3:java開發ffmpeg實現rtsp轉rtmp並實現ffmpeg命令的介面化管理架構設計及程式碼實現
- 2019 年 11 月 1 日
- 筆記
版權聲明:本文為部落客原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
本文鏈接:https://blog.csdn.net/eguid_1/article/details/51787646
上一篇文章簡單介紹了java如何調用ffmpeg的命令:http://blog.csdn.net/eguid_1/article/details/51777716
上上一篇介紹了nginx-rtmp伺服器的搭建:http://blog.csdn.net/eguid_1/article/details/51749830
這一篇將進一步深挖java對ffmepg命令的控制並最終實現服務介面化
本篇文章源碼:http://download.csdn.net/detail/eguid_1/9563637
github項目地址:https://github.com/eguid/FFCH4J
通知:由於很多同學反映本章程式碼的命令封裝設計的不是很好,所以對本章程式碼重新進行了實現,新版本推翻了本章原有程式碼內部實現,介面設計更加利於注入自己的實現,並增加可執行原生ffmpeg命令功能
新版本請到這裡查看:java封裝FFmpeg命令,支援原生ffmpeg全部命令,實現FFmpeg多進程處理與多執行緒輸出控制(開啟、關閉、查詢),rtsp/rtmp推流、拉流
(一)、簡單介紹
該服務介面可實現rtsp協議轉換為rtmp協議且可以實現rtmp直播流發布到nginx流媒體伺服器,其中最為重要的是如何實現通過參數生成ffmpeg命令並執行,且可以通過介面進行控制ffmpeg命令的停止
(二)、實現ffmpeg介面化服務架構設計
push端介面化管理
一、介面化調用
1、採用多執行緒方式,每次調用push埠開啟一個主進程及兩個輸出執行緒 2、可以對每個push端(執行緒)進行開啟和關閉的控制 3、統一介面參數,對ffmpeg的命令做到參數可控制
二、架構設計
1、服務介面
PushManager提供push(開啟一個push處理器),closePush(關閉push處理器),viewAppName介面(查看當前已經開啟的應用)
1.1、應用名和push處理器的關係
一個處理器對應一個應用名
1.2、push處理器
一個處理器對應一個push主進程和兩個輸出執行緒
2、主進程式控制制
2.1、主進程開啟
服務介面調用push處理器開啟push主進程,主進程會自動開啟兩個輸出執行緒用於消息輸出, 開啟後會將主進程Process和兩個輸出執行緒OutHandler通過map返回給服務介面。
2.2、主進程關閉
主進程可通過Process的destroy方法進行安全關閉
3、輸出執行緒控制
3.1、輸出執行緒開啟
輸出執行緒從主進程獲取到輸出流進行輸出
3.2、輸出執行緒關閉
輸出執行緒重寫了destory方法,用於安全的關閉輸出執行緒
4、持久層控制
持久層分為兩個:
1、appName(應用名)-pushId(push處理器的ID)對應關係
用於維護應用名和push處理器ID的對應關係,pushId為隨機生成id
2、pushId-主進程-輸出執行緒對應關係
主要用於存放主進程(Process)和兩個輸出執行緒,建立兩者對應關係,方便服務介面管理
(三)、程式碼實現
1、PushManager實現
/** * 實現push管理器的push,delete,view服務 * * @author eguid * @see PushMangerImpl * @since jdk1.7 */ public class PushManagerImpl implements PushManager { /** * 引用push處理器 */ private PushHandler pusher = new PushHandlerImpl(); /** * 管理應用名和push處理器之間的關係 */ private PushId_AppRelshipDao pard=new PushId_AppRelshipDaoImpl(); /** * 管理處理器的主進程Process及兩個輸出執行緒的關係 */ private HandlerDao hd = new HandlerDaoImpl(); public void setPusher(PushHandler pusher) { this.pusher = pusher; } public void setPard(PushId_AppRelshipDao pard) { this.pard = pard; } public void setHd(HandlerDao hd) { this.hd = hd; } @Override public String push(Map<String, Object> map) { if(map==null||map.isEmpty()||!map.containsKey("appName")) { return null; } String appName=null; ConcurrentMap<String, Object> resultMap = null; try { appName=(String)map.get("appName"); if(appName!=null&&"".equals(appName.trim())) { return null; } resultMap = pusher.push(map); // 生成一個標識該命令行執行緒集的key String pushId = UUID.randomUUID().toString(); hd.set(pushId, resultMap); pard.set(appName, pushId); } catch (IOException e) { // 暫時先寫這樣,後期加日誌 System.err.println("發生一個異常" + e.getMessage()); } return appName; } @Override public void closePush(String appName) { String pushId=null; if(pard.isHave(appName)) { pushId= pard.getPushId(appName); } if (pushId!=null&&hd.isHave(pushId)) { ConcurrentMap<String, Object> map = hd.get(pushId); //關閉兩個執行緒 ((OutHandler)map.get("error")).destroy(); ((OutHandler)map.get("info")).destroy(); //暫時先這樣寫,後期加日誌 System.out.println("停止命令-----end commond"); //關閉命令主進程 ((Process)map.get("process")).destroy(); //刪除處理器與執行緒對應關係表 hd.delete(pushId); //刪除應用名對應關係表 pard.delete(appName); } } @Override public List<String> viewAppName() { return pard.getAll(); }
2、pushHandler實現(push處理器)
/** * 提供解析參數生成ffmpeg命令並處理push操作 * @see PushHandlerImpl * @since jdk1.7 */ public class PushHandlerImpl implements PushHandler { /* * "ffmpeg -i "+ "rtsp://admin:[email protected]:37779/cam/realmonitor?channel=1&subtype=0 "+" -f flv -r 25 -s 640x360 -an" + " rtmp://192.168.30.21/live/test" * 推送流格式: name:應用名;input:接收地址;output:推送地址;fmt:影片格式;fps:影片幀率;rs:影片解析度;disableAudio:是否開啟音頻 */ @Override public ConcurrentMap<String, Object> push(Map<String, Object> paramMap) throws IOException { // 從map裡面取數據,組裝成命令 String comm = getComm4Map(paramMap); ConcurrentMap<String, Object> resultMap = null; // 執行命令行 final Process proc = Runtime.getRuntime().exec(comm); System.out.println("執行命令----start commond"); OutHandler errorGobbler = new OutHandler(proc.getErrorStream(), "Error"); OutHandler outputGobbler = new OutHandler(proc.getInputStream(), "Info"); errorGobbler.start(); outputGobbler.start(); // 返回參數 resultMap = new ConcurrentHashMap<String, Object>(); resultMap.put("info", outputGobbler); resultMap.put("error", errorGobbler); resultMap.put("process", proc); return resultMap; } /** * 通過解析參數生成可執行的命令行字元串; * name:應用名;input:接收地址;output:推送地址;fmt:影片格式;fps:影片幀率;rs:影片解析度;disableAudio:是否開啟音頻 * * @param paramMap * @return 命令行字元串 */ protected String getComm4Map(Map<String, Object> paramMap) { // -i:輸入流地址或者文件絕對地址 StringBuilder comm = new StringBuilder("ffmpeg -i "); // 是否有必輸項:輸入地址,輸出地址,應用名 if (paramMap.containsKey("input") && paramMap.containsKey("output") && paramMap.containsKey("appName")) { comm.append(paramMap.get("input")).append(" "); // -f :轉換格式,默認flv comm.append(" -f ").append(paramMap.containsKey("fmt") ? paramMap.get("fmt") : "flv").append(" "); // -r :幀率,默認25 comm.append("-r ").append(paramMap.containsKey("fps") ? paramMap.get("fps") : "30").append(" "); // -s 解析度 默認是原解析度 comm.append("-s ").append(paramMap.containsKey("rs") ? paramMap.get("rs") : "").append(" "); // -an 禁用音頻 comm.append("-an ").append(paramMap.containsKey("disableAudio") && ((Boolean)paramMap.get("disableAudio")) ? "-an" : "").append(" "); // 輸出地址 comm.append(paramMap.get("output")); //發布的應用名 comm.append(paramMap.get("appName")); //一個影片源,可以有多個輸出,第二個輸出為拷貝源影片輸出,不改變影片的各項參數並且命名為應用名+HD comm.append(" ").append(" -vcodec copy -f flv -an ").append(paramMap.get("output")).append(paramMap.get("appName")).append("HD"); System.out.println(comm.toString()); return comm.toString(); } else { throw new RuntimeException("輸入流地址不能為空!"); } } }
3、OutHandler(輸出執行緒)
** * 用於輸出命令行主進程的消息執行緒(必須開啟,否則命令行主進程無法正常執行) 重要:該類重寫了destroy方法,用於安全的關閉該執行緒 * * @author eguid * @see OutHandler * @since jdk1.7 */ public class OutHandler extends Thread { // 控制狀態 volatile boolean status = true; BufferedReader br = null; String type = null; public OutHandler(InputStream is, String type) { br = new BufferedReader(new InputStreamReader(is)); this.type = type; } /** * 重寫執行緒銷毀方法,安全的關閉執行緒 */ @Override public void destroy() { status = false; } /** * 執行輸出執行緒 */ @Override public void run() { String msg = null; try { while (status) { if ((msg = br.readLine()) != null) { System.out.println(type + "消息:" + msg); } } } catch (IOException e) { e.printStackTrace(); } } }
4、兩個dao層介面,方便後期實現該介面並實現持久化
4.1、主進程(Process)和兩個輸出執行緒Dao
/** * 命令行執行處理器快取,方便管理處理器的開啟和關閉 * @author eguid * @see HandlerDao * @since jdk1.7 */ public interface HandlerDao { /** * 獲取某個處理器 * @param pushId * @return */ public ConcurrentMap get(String pushId); /** * 存放一個處理器 * @param handlerMap */ public void set(String key, ConcurrentMap<String, Object> resultMap); /** * 獲取全部處理器的id * @return */ public ConcurrentMap getAll(); /** * 刪除某個處理器 * @param pushId */ public void delete(String pushId); /** * 是否存在key */ public boolean isHave(String pushId); }
4.2、應用名-pushId對應關係Dao
/** * 用於維護管理應用名與pushId的關係對應 * @author eguid * @see PushId_AppRelshipDao * @since jdk1.7 */ public interface PushId_AppRelshipDao { /** * 獲取應用名對應的pushId * @param appName * @return pushId */ public String getPushId(String appName); /** * 插入一個應用名和pushId對應 * @param appName * @param pushId */ public void set(String appName,String pushId); /** * 通過應用名刪除對應關係 * @param appName */ public void delete(String appName); /** * 獲取全部應用 */ public List<String> getAll(); /** * 是否存在應用名 * @param appName * @return true:存在;false:不存在 */ public boolean isHave(String appName); }
下一篇將介紹一些支援rtmp直播的播放器