Java中使用etcd,包括基本的set、get、超时设置,watch监听等
- 2020 年 1 月 13 日
- 筆記
etcd的使用文章。
etcd来zookeeper类似,常用的主要有set,get,getPrefix:获取指定前缀的所有数据,grant:key的超时设置,watch:监听回调事件,watchPrefix:监听某个前缀的事件,keepAlive:为某个key设置自动续约、自动刷新过期时间。
zk的大部分功能,etcd都有。但有一个,譬如虚拟节点,zk可以做到当客户端断开时,立马监听到,etcd要靠keepAlive续约机制,超过几秒不续约了,则认为掉线了。
还有etcd的超时时间,是需要先新建一个lease,再把这个lease在put时,作为参数传过去。
别的不多讲,直接上使用指南。
引入pom.xml,https://github.com/IBM/etcd-java 这是一个etcd的java客户端。
<dependency> <groupId>com.ibm.etcd</groupId> <artifactId>etcd-java</artifactId> <version>0.0.13</version> </dependency>
自行下载etcd,并启动它。
定义一个常用的客户端工具类的接口
import com.ibm.etcd.api.KeyValue; import com.ibm.etcd.client.kv.KvClient; import java.util.List; /** * @author wuweifeng wrote on 2019-12-09 * @version 1.0 */ public interface IConfigCenter { /** * 存入key,value */ void put(String key, String value); /** * 存入key、value,和租约id */ void put(String key, String value, long leaseId); /** * 存入key、value,和过期时间,单位是秒 */ void putAndGrant(String key, String value, long ttl); /** * 根据key,获取value */ String get(String key); /** * 获取指定前缀的所有key-value */ List<KeyValue> getPrefix(String key); /** * 监听key */ KvClient.WatchIterator watch(String key); /** * 监听前缀为key的 */ KvClient.WatchIterator watchPrefix(String key); /** * 自动续约 * @param frequencySecs 续约频率,最小是4秒,默认是5秒 * @param minTtl 最小存活时间,最小是2秒,默认是10秒 * @return 返回leaseId */ long keepAlive(String key, String value, int frequencySecs, int minTtl) throws Exception; /** * 判断剩余的过期时间 */ long timeToLive(long leaseId); }
实现这个接口
import com.google.protobuf.ByteString; import com.ibm.etcd.api.KeyValue; import com.ibm.etcd.api.LeaseGrantResponse; import com.ibm.etcd.api.RangeResponse; import com.ibm.etcd.client.KvStoreClient; import com.ibm.etcd.client.kv.KvClient; import com.ibm.etcd.client.lease.LeaseClient; import com.ibm.etcd.client.lease.PersistentLease; import com.ibm.etcd.client.lock.LockClient; import org.springframework.util.CollectionUtils; import java.util.List; import java.util.concurrent.ExecutionException; import static java.util.concurrent.TimeUnit.SECONDS; /** * etcd客户端 * * @author wuweifeng wrote on 2019-12-06 * @version 1.0 */ public class JdEtcdClient implements IConfigCenter { private KvClient kvClient; private LeaseClient leaseClient; private LockClient lockClient; public JdEtcdClient(KvStoreClient kvStoreClient) { this.kvClient = kvStoreClient.getKvClient(); this.leaseClient = kvStoreClient.getLeaseClient(); this.lockClient = kvStoreClient.getLockClient(); } @Override public void put(String key, String value) { kvClient.put(ByteString.copyFromUtf8(key), ByteString.copyFromUtf8(value)).sync(); } @Override public void put(String key, String value, long leaseId) { kvClient.put(ByteString.copyFromUtf8(key), ByteString.copyFromUtf8(value), leaseId).sync(); } @Override public void putAndGrant(String key, String value, long ttl) { LeaseGrantResponse lease = leaseClient.grant(ttl).sync(); put(key, value, lease.getID()); } @Override public String get(String key) { RangeResponse rangeResponse = kvClient.get(ByteString.copyFromUtf8(key)).sync(); List<KeyValue> keyValues = rangeResponse.getKvsList(); if (CollectionUtils.isEmpty(keyValues)) { return null; } return keyValues.get(0).getValue().toStringUtf8(); } @Override public List<KeyValue> getPrefix(String key) { RangeResponse rangeResponse = kvClient.get(ByteString.copyFromUtf8(key)).asPrefix().sync(); return rangeResponse.getKvsList(); } @Override public KvClient.WatchIterator watch(String key) { return kvClient.watch(ByteString.copyFromUtf8(key)).start(); } @Override public KvClient.WatchIterator watchPrefix(String key) { return kvClient.watch(ByteString.copyFromUtf8(key)).asPrefix().start(); } @Override public long keepAlive(String key, String value, int frequencySecs, int minTtl) throws Exception { //minTtl秒租期,每frequencySecs秒续约一下 PersistentLease lease = leaseClient.maintain().leaseId(SystemClock.now()).keepAliveFreq(frequencySecs).minTtl(minTtl).start(); long newId = lease.get(3L, SECONDS); put(key, value, newId); return newId; } @Override public long timeToLive(long leaseId) { try { return leaseClient.ttl(leaseId).get().getTTL(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); return 0L; } } }
其中的SystemClock类,就在网上找一下吧,就是一个获取系统当前时间戳的工具类。
创建这个Client
/** * @param endPoints 如https://127.0.0.1:2379 有多个时逗号分隔 */ public static JdEtcdClient build(String endPoints) { return new JdEtcdClient(EtcdClient.forEndpoints(endPoints).withPlainText().build()); }
之后就可以使用这个etcd客户端工具类了。
