MongoDB Java API操作很全的整理

  • 2019 年 11 月 4 日
  • 筆記

MongoDB 是一個基於分散式文件存儲的資料庫。由 C++ 語言編寫,一般生產上建議以共享分片的形式來部署。 但是MongoDB官方也提供了其它語言的客戶端操作API。如下圖所示:

提供了C、C++、C#、.net、GO、java、Node.js、PHP、python、scala等各種語言的版本,如下圖所示:

MongoDB的操作分為同步操作和非同步操作以及響應式編程操作
一、同步操作API

官方JAVA API的路徑:https://docs.mongodb.com/ecosystem/drivers/java/  我們這裡以3.11的java 版本為例。各個版本的API對MongoDB服務的支援情況。

使用API時,先引入maven依賴

<!-- https://mvnrepository.com/artifact/org.mongodb/mongo-java-driver -->  <dependency>      <groupId>org.mongodb</groupId>      <artifactId>mongo-java-driver</artifactId>      <version>3.11.1</version>  </dependency>  

  

 1、關於MongoDB Client的初始化和關閉。

從官方介紹來看,一般建議Client只需要一個建立一個長連接實例,然後使用時,都使用這個實例就可以,也就是可以用java的單例模式來創建連接實例。

 

//mongoClient連接  protected static MongoClient mongoClient;   public synchronized static MongodbClient getInstance(String mongodbUrl) {          if (null == mongoClient) {              mongoClient = MongoClients.create(mongodbUrl);              if(null != mongoClient){                  log.info("mongoClient init success!");              }              else{                  log.info("mongoClient init failed!");              }          }          return mongodbClient;      }   

  

直接通過mongodb的host和port來創建client: 

MongoClient mongoClient = MongoClients.create("mongodb://host1:27017");

client連接到一個 Replica Set:

MongoClient mongoClient = MongoClients.create("mongodb://host1:27017,host2:27017,host3:27017");    MongoClient mongoClient = MongoClients.create("mongodb://host1:27017,host2:27017,host3:27017/?replicaSet=myReplicaSet");  

 或者通過MongoClientSettings.builder() 來輔助生成連接字元串來創建client:

MongoClient mongoClient = MongoClients.create( MongoClientSettings.builder() .applyToClusterSettings(builder -> builder.hosts(Arrays.asList( new ServerAddress("host1", 27017), new ServerAddress("host2", 27017), new ServerAddress("host3", 27017)))) .build());  

  連接關閉:

    public void close() {          if(null!=mongoClient){              mongoClient.close();              mongoClient=null;          }      }  

  2、關於MongoDB 的基本操作

//創建Collection

public void createCollection(String dataBaseName,String collectionName){ getDatabase(dataBaseName).createCollection(collectionName); }
//查詢
dataBaseName
public MongoDatabase getDatabase(String dataBaseName){ return mongoClient.getDatabase(dataBaseName); }
//查詢
Collection
public List<String> listCollectionNames(String dataBaseName){
List
<String> stringList = new ArrayList<String>();
mongoClient.getDatabase(dataBaseName).listCollectionNames().forEach((Consumer
<? super String>) t->{ stringList.add(t); });
return stringList; }

public MongoCollection<Document> getCollectionByName(String dataBaseName, String collectionName){ return getDatabase(dataBaseName).getCollection(collectionName); }

 3、關於MongoDB 的查詢操作

    //通過id(objectid)精確查詢  	public FindIterable<Document>  findMongoDbDocById(String dataBaseName, String collectionName, String id){          BasicDBObject searchDoc = new BasicDBObject().append("_id", id);         return getCollectionByName(dataBaseName,collectionName).find(searchDoc);      }      //通過id(objectid)模糊查詢      public FindIterable<Document>  findMongoDbDocByIdRegex(String dataBaseName, String collectionName, String id){          BasicDBObject searchDoc = new BasicDBObject().append("_id", new BasicDBObject("$regex",id));          return getCollectionByName(dataBaseName,collectionName).find(searchDoc);      }  	//通過開始id和結束id 查詢(根據objectId範圍查詢)      public FindIterable<Document>  findMongoDbDocById(String dataBaseName, String collectionName, String startId,String endId){          BasicDBObject searchDoc = new BasicDBObject().append("_id", new BasicDBObject("$gte", startId).append("$lte", endId));          return getCollectionByName(dataBaseName,collectionName).find(searchDoc);      }      public FindIterable<Document> findMongoDbDoc(String dataBaseName, String collectionName,BasicDBObject basicDBObject){          return getCollectionByName(dataBaseName,collectionName).find(basicDBObject);      }  	//限制查詢返回的條數      public FindIterable<Document> findMongoDbDoc(String dataBaseName, String collectionName,BasicDBObject basicDBObject,Integer limitNum){          return findMongoDbDoc(dataBaseName,collectionName,basicDBObject).limit(limitNum) ;      }      public FindIterable<Document>  findMongoDbDocById(String dataBaseName, String collectionName, String startId,String endId,Integer limitNum){          return findMongoDbDocById(dataBaseName,collectionName,startId,endId).limit(limitNum);      }        /**       * 降序查詢(排序)       * @param dataBaseName       * @param collectionName       * @param startId       * @param endId       * @param sortField  排序欄位       * @return       */      public FindIterable<Document>  findMongoDbDocByIdDescSort(String dataBaseName, String collectionName, String startId,String endId,String sortField){        return findMongoDbDocById(dataBaseName,collectionName,startId,endId).sort(new Document().append(sortField, -1));      }      public FindIterable<Document>  findMongoDbDocByIdDescSort(String dataBaseName, String collectionName, String startId,String endId,String sortField,Integer limitNum){          return findMongoDbDocByIdDescSort(dataBaseName,collectionName,startId,endId,sortField).limit(limitNum);      }        /**       * 降序查詢(排序)       * @param dataBaseName       * @param collectionName       * @param startId       * @param endId       * @param sortField  排序欄位       * @return       */      public FindIterable<Document>  findMongoDbDocByIdAscSort(String dataBaseName, String collectionName, String startId,String endId,String sortField){          return findMongoDbDocById(dataBaseName,collectionName,startId,endId).sort(new Document().append(sortField, 1));      }      public FindIterable<Document>  findMongoDbDocByIdAscSort(String dataBaseName, String collectionName, String startId,String endId,String sortField,Integer limitNum){          return findMongoDbDocByIdAscSort(dataBaseName,collectionName,startId,endId,sortField).limit(limitNum);      }  

   4、關於MongoDB 的插入操作

   //插入操作,注意插入時,如果數據已經存在會報錯,插入時必須數據不存在,不會自動進行覆蓋     //插入單條記錄     public void insertDoc(String dataBaseName, String collectionName, Document document){          getCollectionByName(dataBaseName,collectionName).insertOne(document);      }  	//插入多條記錄      public void insertDoc(String dataBaseName, String collectionName,List<? extends Document> listData){          getCollectionByName(dataBaseName,collectionName).insertMany(listData);      }  

   5、關於MongoDB 的更新操作

	//更新單條      public void updateDoc(String dataBaseName, String collectionName, Bson var1, Bson var2){          getCollectionByName(dataBaseName,collectionName).updateOne(var1,var2);      }      public void updateDoc(String dataBaseName, String collectionName, Bson var1, List<? extends Bson> list){          getCollectionByName(dataBaseName,collectionName).updateOne(var1,list);      }  	//批量更新      public void updateDocs(String dataBaseName, String collectionName, Bson var1, Bson var2){          getCollectionByName(dataBaseName,collectionName).updateMany(var1,var2);      }      public void updateDocs(String dataBaseName, String collectionName, Bson var1, List<? extends Bson> list){          getCollectionByName(dataBaseName,collectionName).updateMany(var1,list);      }  

  6、關於MongoDB 的刪除操作 

//單條刪除    public DeleteResult deleteDoc(String dataBaseName, String collectionName, Bson var1){          return getCollectionByName(dataBaseName,collectionName).deleteOne(var1);      }  //批量刪除      public DeleteResult deleteDocs(String dataBaseName, String collectionName,Bson var1){         return getCollectionByName(dataBaseName,collectionName).deleteMany(var1);      }  

   7、關於MongoDB 的替換操作

	//存在就替換,不存在的話就插入     public UpdateResult replaceDoc(String dataBaseName, String collectionName, Bson var1, Document var2){          return getCollectionByName(dataBaseName,collectionName).replaceOne(var1,var2);      }  

 8、關於MongoDB 的bulkWrite操作 (批量寫入)

    public BulkWriteResult bulkWrite(String dataBaseName, String collectionName, List<? extends WriteModel<? extends Document>> listData){         return getCollectionByName(dataBaseName,collectionName).bulkWrite(listData);      }

二、非同步操作API  

 mongodb非同步驅動程式提供了非同步api,可以利用netty或java 7的asynchronoussocketchannel實現快速、無阻塞的i/o,maven依賴

<dependencies>  <dependency>  <groupId>org.mongodb</groupId>  <artifactId>mongodb-driver-async</artifactId>  <version>3.11.1</version>  </dependency>  </dependencies>

官方地址:http://mongodb.github.io/mongo-java-driver/3.11/driver-async/getting-started/installation/

非同步操作必然會涉及到回調,回調時採用ResultCallback<Document>

SingleResultCallback<Document> callbackPrintDocuments = new SingleResultCallback<Document>() {     @Override     public void onResult(final Document document, final Throwable t) {         System.out.println(document.toJson());     }  };    SingleResultCallback<Void> callbackWhenFinished = new SingleResultCallback<Void>() {      @Override      public void onResult(final Void result, final Throwable t) {          System.out.println("Operation Finished!");      }  };  

  非同步insert操作

collection.insertMany(documents, new SingleResultCallback<Void>() {      @Override      public void onResult(final Void result, final Throwable t) {          System.out.println("Documents inserted!");      }  });  

  非同步刪除操作

collection.deleteMany(gte("i", 100), new SingleResultCallback<DeleteResult>() {      @Override      public void onResult(final DeleteResult result, final Throwable t) {          System.out.println(result.getDeletedCount());      }  });  

  非同步更新操作

collection.updateMany(lt("i", 100), inc("i", 100),      new SingleResultCallback<UpdateResult>() {          @Override          public void onResult(final UpdateResult result, final Throwable t) {              System.out.println(result.getModifiedCount());          }      });  

  非同步統計操作

collection.countDocuments(    new SingleResultCallback<Long>() {        @Override        public void onResult(final Long count, final Throwable t) {            System.out.println(count);        }    });  

  

三、MongoDB Reactive Streams 操作API

官方的MongoDB reactive streams Java驅動程式,為MongoDB提供非同步流處理和無阻塞處理。

完全實現reactive streams api,以提供與jvm生態系統中其他reactive streams的互操作,一般適合於大數據的處理,比如spark,flink,storm等。

<dependencies>      <dependency>          <groupId>org.mongodb</groupId>          <artifactId>mongodb-driver-reactivestreams</artifactId>          <version>1.12.0</version>      </dependency>  </dependencies>  

  官方地址:http://mongodb.github.io/mongo-java-driver-reactivestreams/

會包含如下三部分:

  1. Publisher:Publisher 是數據的發布者。Publisher 介面只有一個方法 subscribe,用於添加數據的訂閱者,也就是 Subscriber。
  2. Subscriber: 是數據的訂閱者。Subscriber 介面有4個方法,都是作為不同事件的處理器。在訂閱者成功訂閱到發布者之後,其 onSubscribe(Subscription s) 方法會被調用。
  3. Subscription:表示的是當前的訂閱關係。

API問的地址:http://mongodb.github.io/mongo-java-driver-reactivestreams/1.12/javadoc/

 

 

 

程式碼示例:

//建立連接  MongoClient mongoClient = MongoClients.create(mongodbUrl);  //獲得資料庫對象  MongoDatabase database = client.getDatabase(databaseName);  //獲得集合  MongoCollection collection = database.getCollection(collectionName);    //非同步返回Publisher  FindPublisher publisher = collection.find();    //訂閱實現  publisher.subscribe(new Subscriber() {      @Override      public void onSubscribe(Subscription str) {          System.out.println("start...");          //執行請求          str.request(Integer.MAX_VALUE);      }      @Override      public void onNext(Document document) {          //獲得文檔          System.out.println("Document:" + document.toJson());      }        @Override      public void onError(Throwable t) {          System.out.println("error occurs.");      }        @Override      public void onComplete() {          System.out.println("finished.");      }  });