Java中使用etcd,包括基本的set、get、超時設置,watch監聽等
- 2020 年 1 月 13 日
- 筆記
etcd的使用文章。
etcd來zookeeper類似,常用的主要有set,get,getPrefix:獲取指定前綴的所有數據,grant:key的超時設置,watch:監聽回調事件,watchPrefix:監聽某個前綴的事件,keepAlive:為某個key設置自動續約、自動刷新過期時間。
zk的大部分功能,etcd都有。但有一個,譬如虛擬節點,zk可以做到當客戶端斷開時,立馬監聽到,etcd要靠keepAlive續約機制,超過幾秒不續約了,則認為掉線了。
還有etcd的超時時間,是需要先新建一個lease,再把這個lease在put時,作為參數傳過去。
別的不多講,直接上使用指南。
引入pom.xml,https://github.com/IBM/etcd-java 這是一個etcd的java客戶端。
<dependency> <groupId>com.ibm.etcd</groupId> <artifactId>etcd-java</artifactId> <version>0.0.13</version> </dependency>
自行下載etcd,並啟動它。
定義一個常用的客戶端工具類的接口
import com.ibm.etcd.api.KeyValue; import com.ibm.etcd.client.kv.KvClient; import java.util.List; /** * @author wuweifeng wrote on 2019-12-09 * @version 1.0 */ public interface IConfigCenter { /** * 存入key,value */ void put(String key, String value); /** * 存入key、value,和租約id */ void put(String key, String value, long leaseId); /** * 存入key、value,和過期時間,單位是秒 */ void putAndGrant(String key, String value, long ttl); /** * 根據key,獲取value */ String get(String key); /** * 獲取指定前綴的所有key-value */ List<KeyValue> getPrefix(String key); /** * 監聽key */ KvClient.WatchIterator watch(String key); /** * 監聽前綴為key的 */ KvClient.WatchIterator watchPrefix(String key); /** * 自動續約 * @param frequencySecs 續約頻率,最小是4秒,默認是5秒 * @param minTtl 最小存活時間,最小是2秒,默認是10秒 * @return 返回leaseId */ long keepAlive(String key, String value, int frequencySecs, int minTtl) throws Exception; /** * 判斷剩餘的過期時間 */ long timeToLive(long leaseId); }
實現這個接口
import com.google.protobuf.ByteString; import com.ibm.etcd.api.KeyValue; import com.ibm.etcd.api.LeaseGrantResponse; import com.ibm.etcd.api.RangeResponse; import com.ibm.etcd.client.KvStoreClient; import com.ibm.etcd.client.kv.KvClient; import com.ibm.etcd.client.lease.LeaseClient; import com.ibm.etcd.client.lease.PersistentLease; import com.ibm.etcd.client.lock.LockClient; import org.springframework.util.CollectionUtils; import java.util.List; import java.util.concurrent.ExecutionException; import static java.util.concurrent.TimeUnit.SECONDS; /** * etcd客戶端 * * @author wuweifeng wrote on 2019-12-06 * @version 1.0 */ public class JdEtcdClient implements IConfigCenter { private KvClient kvClient; private LeaseClient leaseClient; private LockClient lockClient; public JdEtcdClient(KvStoreClient kvStoreClient) { this.kvClient = kvStoreClient.getKvClient(); this.leaseClient = kvStoreClient.getLeaseClient(); this.lockClient = kvStoreClient.getLockClient(); } @Override public void put(String key, String value) { kvClient.put(ByteString.copyFromUtf8(key), ByteString.copyFromUtf8(value)).sync(); } @Override public void put(String key, String value, long leaseId) { kvClient.put(ByteString.copyFromUtf8(key), ByteString.copyFromUtf8(value), leaseId).sync(); } @Override public void putAndGrant(String key, String value, long ttl) { LeaseGrantResponse lease = leaseClient.grant(ttl).sync(); put(key, value, lease.getID()); } @Override public String get(String key) { RangeResponse rangeResponse = kvClient.get(ByteString.copyFromUtf8(key)).sync(); List<KeyValue> keyValues = rangeResponse.getKvsList(); if (CollectionUtils.isEmpty(keyValues)) { return null; } return keyValues.get(0).getValue().toStringUtf8(); } @Override public List<KeyValue> getPrefix(String key) { RangeResponse rangeResponse = kvClient.get(ByteString.copyFromUtf8(key)).asPrefix().sync(); return rangeResponse.getKvsList(); } @Override public KvClient.WatchIterator watch(String key) { return kvClient.watch(ByteString.copyFromUtf8(key)).start(); } @Override public KvClient.WatchIterator watchPrefix(String key) { return kvClient.watch(ByteString.copyFromUtf8(key)).asPrefix().start(); } @Override public long keepAlive(String key, String value, int frequencySecs, int minTtl) throws Exception { //minTtl秒租期,每frequencySecs秒續約一下 PersistentLease lease = leaseClient.maintain().leaseId(SystemClock.now()).keepAliveFreq(frequencySecs).minTtl(minTtl).start(); long newId = lease.get(3L, SECONDS); put(key, value, newId); return newId; } @Override public long timeToLive(long leaseId) { try { return leaseClient.ttl(leaseId).get().getTTL(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); return 0L; } } }
其中的SystemClock類,就在網上找一下吧,就是一個獲取系統當前時間戳的工具類。
創建這個Client
/** * @param endPoints 如https://127.0.0.1:2379 有多個時逗號分隔 */ public static JdEtcdClient build(String endPoints) { return new JdEtcdClient(EtcdClient.forEndpoints(endPoints).withPlainText().build()); }
之後就可以使用這個etcd客戶端工具類了。
