《Kafka筆記》2、環境搭建、Topic管理
一、Kafka環境搭建和Topic管理
1 單機環境搭建
1.1 環境準備
- 安裝JDK1.8+,配置JAVA_HOME(CentOS 6.10+ 64bit)
- 配置主機名和IP映射
- 關閉防火牆&防火牆開機自啟動
- 集群環境下需要配置每台機器,同步時鐘 ntpdate cn.pool.ntp.org | ntp[1-7].aliyun.com
- 安裝&啟動zookeeper
- 安裝&啟動|關閉Kafka
1.1.1 JDK 安裝
- 安裝
~ rpm -qa | grep jdk # 如果沒安裝過,需要安裝
~ rpm -e `rpm -qa | grep jdk` # 卸載老的版本
~ rpm -ivh jdk-8u191-linux-x64.rpm
默認安裝在/usr/java/下
- 配置環境變數
雖然rpm包安裝,默認能識別我們的java和javac命令,但不能證明JAVA_HOME就配置成功了,可以通過jps來驗證。
1、 vi ~/.bashrc
# 底部增加
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
2、source ~/.bashrc
3、echo $JAVA_HOME
1.1.2 配置主機名和ip
1、改主機名cat /etc/sysconfig/network
,把HOSTNAME更改掉後reboot。這裡假設更改為CentOS
2、更改ip
- ipconfig看看eth0的網卡資訊
- vi /etc/hosts/ 末尾添加 192.168.52.129 CentOS。這裡192.168.52.129假設是eth0網卡的ip
3、ping CentOS 來測試是否生效
1.1.3 關閉防火牆和防火牆開機自啟動
為了測試方便,內網環境下,我們把防火牆關閉
1、關閉防火牆
- 終端命令:
service iptables status
看下防火牆服務是否啟動狀態 - 如果沒關閉,終端執行
service iptables stop
- 如果關閉了,要啟動防火牆:
service iptables start
2、關閉防火牆開啟自啟動
- 終端執行:
chkconfig iptables off
,其中iptables是服務名 chkconfig --list
查看所有的開機自啟動的服務
1.1.4 zookeeper下載安裝
zookeeper官網,選擇歷史發行版本下載,選擇我們要下載的版本,scp到我們的安裝機器上
1、解壓
tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
解壓我們的zk
2、進入解壓目錄
cd /usr/zookeeper-3.4.6/
3、修改zk配置文件,先拷貝一份模板配置文件再修改
cp conf/zoo_sample.cfg conf/zoo.cfg
vi conf/zoo.cfg
- 修改zookeeper的數據目錄為:
dataDir=/root/zkdata
,創建我們剛在指定的數據目錄:mkdir /root/zkdata
4、啟動我們的zk服務,先進入zk目錄:cd /usr/zookeeper-3.4.6/
,再啟動服務:./bin/zkServer.sh start zoo.cfg
5、 用jps命令查看zookeeper的服務:jps
後zk的進程名叫:QuoeumPeerMain,用zk自帶命令查看zk的狀態,./bin/zkServer.sh status zoo.cfg
。看到zk的Mode為standalone單機部署版,且狀態ok
1.1.5 kafka下載安裝
kafka官網,左下角download按鈕,選擇我們合適的版本,根據scala版本做選擇,下載之後scp到安裝機器
1、解壓安裝包:tar -zxf kafka_2.11-2.2.0.tag -C /usr/
,進入安裝目錄cd /usr/kafka_2.11-2.2.0
,常規命令目錄ls bin/
,常規配置目錄:ls config/
2、配置server.peoperties: vi config/server.properties
配置解釋:
broler.id=0 # kafka的節點唯一標識
listeners=PLAINTEXT://CentOS:9092 #CentOS為當前節點的主機名
log.dirs=/usr/kafka-logs # kafka的broker節點存儲數據的位置
num.partitions=1 # 設置topic的默認分區數
log.retention.hours=168 # 日誌默認保存的策略,默認7天
zookeeper.connect=CentOS:2181 # 配置kafka依賴的zookeeper的資訊
保存退出
3、kafka的啟動,首先要保證zookeeper是正常的。./bin/kafka-server-start.sh -daemon config/server.properties
,其中-daemon指的是以後台方式啟動
4、查看kafka的啟動狀態,jps
命令,可以看到Kafka進程
5、關閉kafka服務:./bin/kafka-server-stop.sh
等一秒左右
6、創建topic:./bin/kafka-topic.sh --help
可以得知為:
./bin/kafka-topic.sh --bootstrap-server CentOS:9092 --create --topic topic01 --partitions 2 --replication-factor 1
–topic 指定分區名稱
–partitions 指定分區數目
–replication-factor 指定副本因子,副本因子個數不能大於我們broker的個數
7、訂閱topic:
./bin/kafka-console-consumer.sh --bootstrap-server CentOS:9092 --topic topic01 --group group1
–topic 指定訂閱哪個topic
–group 消費者要以組的形式,不指定會默認分配
8、生產者生產消息,打開另外終端充當生產者。
./bin/kafka-console-produser.sh --broker-list CentOS:9092 --topic topic01 # 回車後指定發送的消息
> hello~
第一章可以知道,由於相同組是均分topic資訊,不同組相互負載均衡。
當我們啟動多個消費者時,如果是在一個組裡,每次我們發送消息會被其中一個消費者消費到。
當我們啟動多個消費者,不在一個組裡,每個組都能收到我們發布的資訊,達到廣播的效果。
2 集群環境搭建
準備三台伺服器的虛擬機,假設三台主機的ip分別為,192.168.52.130,192.168.52.131,192.168.52.132
2.1 安裝JDK
在三台機器上分別用rpm包,安裝jdk:
rpm -ivh jdk-8u191-linux-x64.rpm
2.2 配置每台機器的配置
先在a機器上配置,之後拷貝配置到b和c機器即可。跨機器拷貝,做一個機器間sh免密碼驗證。否則scp需要帶上機器密碼資訊
1、在A機器上,vi /etc/hosts
,增加配置:
192.168.52.130 CentOSA
192.168.52.131 CentOSB
192.168.52.132 CentOSC
接著,scp我們的配置到其他兩台B和C機器:
scp /etc/hosts CentOSB:/etc/
scp /etc/hosts CentOSC:/etc/
2、在A機器上配置JDK的環境變數,scp到其他兩台B和C機器:
- 第一步:
vi ~/.bashrc
# 底部增加
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
-
第二步:
source ~/.bashrc
-
第三步:
echo $JAVA_HOME
-
第三步:驗證沒問題,拷貝到B和C機器:
scp ~/.bashrc CentOSB:~/
scp ~/.bashrc CentOSC:~/
- 第四步,在b和c機器上,分別source,讓配置文件生效
2.3 關閉三台機器的防火牆
參考上文單機的關閉方法
2.4 配置集群網路時鐘同步
每台機器安裝ntp工具:
yum install ntp -y
每台機器上都設置時鐘:
ntpdate ntp1.aliyun.com
並執行clock -w
命令同步時鐘
2.5 zookeeper集群安裝和搭建
1、安裝及拷貝配置文件
tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
cd /usr/zookeeper-3.4.6/
cp conf/zoo_sample.cfg conf/zoo.cfg
2、接著更改配置文件:vi zoo.cfg
# 修改原dataDir目錄為
dataDir=/root/zkdata
# 新增集群其他節點資訊,並配置服務埠和主從選舉埠
server.1=CentOSA:2888:3888
server.2=CentOSB:2888:3888
server.3=CentOSC:2888:3888
3、創建我們配置文件中指定的數據目錄
mkdir /root/zkdata
4、把我們在A上做的操作拷貝到其他節點B和C
scp -r /usr/zookeeper-3.4.6 CentOSB:/usr/
scp -r /usr/zookeeper-3.4.6 CentOSC:/usr/
5、在其他集群節點B和C上同樣都創建數據目錄
mkdir /root/zkdata
6、在每台機器的數據目錄中,寫入當前機器的id號。id號參考我們zoo.cfg下的節點配置
A機器: echo 1 > /root/zkdata/myid
B機器: echo 2 > /root/zkdata/myid
C機器: echo 3 > /root/zkdata/myid
7、啟動我們每台節點的zookeeper
啟動:/usr/zookeeper-3.4.6/bin/zkServer.sh start zoo.cfg
查看狀態:/usr/zookeeper-3.4.6/bin/zkServer.sh status zoo.cfg
3.6 集群安裝Kafka
1、安裝及修改配置文件
tar -zxf kafka_2.11-2.2.0.tgz -C /usr/
cd /usr/kafka_2.11-2.2.0/
vi config/server.properties
2、修改配置文件
A節點配置,其他節點scp後單獨改自己的配置:
listeners = PLAINTEXT://CentOSA:9092
log.dirs=/usr/kafka-logs
zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181
3、拷貝到其他節點並且更改配置
scp -r kafka_2.11-2.2.0 CentOSB:/usr/
scp -r kafka_2.11-2.2.0 CentOSC:/usr/
B節點:
# A上,默認是0,這裡要更改其他節點的brokerID
broker.id=1
listeners = PLAINTEXT://CentOSB:9092
log.dirs=/usr/kafka-logs
zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181
C節點:
# A上,默認是0,這裡要更改其他節點的brokerID
broker.id=2
listeners = PLAINTEXT://CentOSC:9092
log.dirs=/usr/kafka-logs
zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181
3 集群環境下topic管理概覽
1、創建topic
[root@CentOSA kafka_2.11-2.2.0]# ./bin/kafka-topics.sh
--bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092
--create
--topic topic01
--partitions 3
--replication-factor 3
在我們配置的kafka日誌目錄中,可以看到我們topic的分區分配情況:ls /usr/kafka-logs/
2、topic列表
[root@CentOSA kafka_2.11-2.2.0]# ./bin/kafka-topics.sh
--bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092
--list
3、topic詳情
[root@CentOSA kafka_2.11-2.2.0]# ./bin/kafka-topics.sh
--bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092
--describe
--topic topic01
Topic:topic01 PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: topic01 Partition: 0 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: topic01 Partition: 1 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: topic01 Partition: 2 Leader: 0 Replicas: 3,0,2 Isr: 0,2,3
4、修改
[root@CentOSA kafka_2.11-2.2.0]# ./bin/kafka-topics.sh
--bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092
--create
--topic topic03
--partitions 1
--replication-factor 1
[root@CentOSA kafka_2.11-2.2.0]# ./bin/kafka-topics.sh
--bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092
--alter
--topic topic03
--partitions 2
注意:kafka的分區數只能由小修改到大,不能由大修改到小
5、刪除
[root@CentOSA kafka_2.11-2.2.0]# ./bin/kafka-topics.sh
--bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092
--delete
--topic topic03
刪除topic之後,我們相應的topic的數據,在我們的日誌目錄下標識為刪除:ls /usr/kafka-logs/
6、訂閱
[root@CentOSA kafka_2.11-2.2.0]# ./bin/kafka-console-consumer.sh
--bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092
--topic topic01
--group g1
--property print.key=true
--property print.value=true
--property key.separator=,
–topic topic01 需要訂閱的topic
–group g1 指定消費者組
–property print.key=true 是否列印key,可選
–property print.value=true 是否列印值,可選
–property key.separator=, key和值用什麼分割,可選
注意:訂閱,用的是./bin/kafka-console-consumer.sh
而不是./bin/kafka-topics.sh
7、生產
[root@CentOSA kafka_2.11-2.2.0]# ./bin/kafka-console-producer.sh
--broker-list CentOSA:9092,CentOSB:9092,CentOSC:9092
--topic topic01
注意:這時用的是--broker-list
而不是--bootstrap-server
注意:發布,用的是./bin/kafka-console-producer.sh
而不是./bin/kafka-topics.sh
8、消費組
[root@CentOSA kafka_2.11-2.2.0]# ./bin/kafka-consumer-groups.sh
--bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092
--list
g1
[root@CentOSA kafka_2.11-2.2.0]# ./bin/kafka-consumer-groups.sh
--bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092
--describe
--group g1
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic01 1 0 0 0 consumer-1-** /192.168.52.130 consumer-1
topic01 0 0 0 0 consumer-1-** /192.168.52.130 consumer-1
topic01 2 1 1 0 consumer-1-** /192.168.52.130 consumer-1
TOPIC : 分區欄
PARTITION:分區欄
CURRENT-OFFSET:消費的偏移量
LOG-END-OFFSET:寫入的偏移量
LAG:間隙,有多少數據沒有被消費。LAG=LOG-END-OFFSET-CURRENT-OFFSET
二、 Kafka基礎API
1 Topic基本操作 DML管理
1.1 新建maven工程
1.2 pom引入依賴
<!-- //mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
<!-- //mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- //mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- //mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<!-- //mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
1.3 配置文件增加日誌配置項
1、resources目錄下,新建log4j.properies
2、添加配置資訊
log4j.rootLogger = info,console
log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern = %p %d{yyyy-MM-dd HH:mm:ss} %c - %m%n
1.4 增加案例demo-DML
1、鏈接Kafka
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVER_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
KafkaAdminClient adminClient = (KafkaAdminClient)KafkaAdminClient.create(props);
// 查看Topic列表
ListTopicResult topicResult = adminClient.listTopics();
Set<String> names = topicResult.names().get;
names.foreach(name - > Printf:out(name));
// 創建topic資訊。默認是非同步的
CreateTopicResult createTopicsResult = adminClient.createTopic(Arrays.asList(new NewTopic("topic02",3,(short)3);
// 改為同步創建需加上下面這句
createTopicsResult.all().get();
// 刪除topic
DeleteTopicsResult deleteTopics = adminClient.deleteTopics(Arrays.asList("topic02","topic03"));
// 改為同步刪除,加上下面程式碼
deleteTopics.all().get();
// 查看topic詳情資訊
DescribeTopicsResult dtr = adminClient.describeTopics(Array.asList("topic01"));
Map<String, TopicDescribe> topicDescriptionMap = dtr.all().get();
for(Map.Entry<String, TopicDescribption> entry : topicDescriptionMap.entrySet()) {
System.out.prientln(entry.getKey() + "\t" + entry.getValue());
}
// 關閉AdminClient
adminClient.close();
1.2 生產者-demo
鏈接資訊的客戶端要換成ProducerConfig,且增加key和value的反序列化配置項SERIALIZER, StringSerizlizer.class
// 鏈接資訊配置
Properties props = new Properties();
// 這裡由原來的AdminClientConfig更改為ProducerConfig
props.put(ProducerConfig.BOOTSTRAP_SERVER_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
// 配置Key的序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerizlizer.class.getName());
// 配置值的序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerizlizer.class.getName());
// 生產者
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// 發送10條消息給broker
for(int i=0; i<10; i++) {
// 構造消息,消息有多種構造,包括帶不帶分區,時間戳資訊等。
// 這裡例子用一個比較簡單的topic+key+value的形式
ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic01","key"+i, "value"+i);
// 發送消息給伺服器
producer.send(record);
}
// 關閉
producer.close();
1.3 消費者-demo sub/assign
鏈接資訊的客戶端要換成ConsumerConfig,且根據生產者的序列化配置SERIALIZER,改為反序列化DESERIALIZER, StringDeserizlizer.class, 切要配置消費者組資訊,這裡配置”g1″
// 鏈接資訊配置
Properties props = new Properties();
// 這裡由原來的AdminClientConfig更改為ConsumerConfig
props.put(ConsumerConfig.BOOTSTRAP_SERVER_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
// 配置Key的反序列化DESERIALIZER
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserizlizer.class.getName());
// 配置值的反序列化DESERIALIZER
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserizlizer.class.getName());
// 消費者必須屬於某一個消費者組
props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
// 通過配置創建消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 訂閱相關的topics.支援正則。這裡訂閱所有以「topic」開頭的topic
consumer.subscribe(Pattern.compile("^topic.*""));
// 遍歷消息隊列
while(true) {
// 每一秒抓取一次
ConsumeRecords<String,String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 從隊列中取到數據
if(!consumerRecords.isEmpty()) {
Iterator<ConsumerRecord<String,String>> recordIterator = consumerRecords.iterator();
while(recordIterator.hasNext()) {
// 獲取一個消費消息
ConsumerRecord<String, String> record = recordIterator.next();
// 該消息所屬topic
String topic = record.topic();
// 該消息所屬分區
int partition = record.partition();
// 該消息在隊列中的偏移量
long offset = record.offset();
// 該消息的key
String key = record.key();
// 該消息的value
String value = record.value();
// 該消息的時間戳
long timestamp = record.timestamp();
// 列印
System.out.println(topic);
System.out.println(partition);
System.out.println(offset);
System.out.println(key);
System.out.println(value);
System.out.println(timestamp);
}
}
}
不指定消費者組的消費,這個時候我們可以啟動n個消費者,消費者之前不會互相均分。因為沒有了組管理資訊。消費實例之間不會互相影響
// 鏈接資訊配置
Properties props = new Properties();
// 這裡由原來的AdminClientConfig更改為ConsumerConfig
props.put(ConsumerConfig.BOOTSTRAP_SERVER_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
// 配置Key的反序列化DESERIALIZER
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserizlizer.class.getName());
// 配置值的反序列化DESERIALIZER
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserizlizer.class.getName());
// 通過配置創建消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 訂閱相關的topics,手動指定消費分區,失去組管理特性
// 消費分區資訊
List<TopicPartion> partions = Arrays.asList(new TopicPartion("topic01",0);
// 指定消費者的消費分區
consumer.assign(partions);
// 指定消費分區的位置,根據上文,可以知道我們從topic01的0分區從頭消費
// consumer.seekToBeginning(partitions);
// 從某Topic的某分區的,某位置消費。例如從topic01的0分區的1位置開始消費
consumer.seek(new TopicPartition("topic01",0), 1);
// 遍歷消息隊列
while(true) {
// 每一秒抓取一次
ConsumeRecords<String,String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 從隊列中取到數據
if(!consumerRecords.isEmpty()) {
Iterator<ConsumerRecord<String,String>> recordIterator = consumerRecords.iterator();
while(recordIterator.hasNext()) {
// 獲取一個消費消息
ConsumerRecord<String, String> record = recordIterator.next();
// 該消息所屬topic
String topic = record.topic();
// 該消息所屬分區
int partition = record.partition();
// 該消息在隊列中的偏移量
long offset = record.offset();
// 該消息的key
String key = record.key();
// 該消息的value
String value = record.value();
// 該消息的時間戳
long timestamp = record.timestamp();
// 列印
System.out.println(topic);
System.out.println(partition);
System.out.println(offset);
System.out.println(key);
System.out.println(value);
System.out.println(timestamp);
}
}
}
1.4 自定義分區
當我們生產者指定key,我們的負載時根據key的hash來分配,當我們不指定key直接發送消息,我們通過輪詢來保證負載均衡
如果指定了key,我們要自定義分區策略,不採用默認的key的hash。那麼:
1、指定分區
for(int i=0; i<10; i++) {
// "topic01", 1, "key"+i, "value"+i,其中第二個參數表示,我們都只發送到1分區,不在使用默認的輪詢
ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic01", 1, "key"+i, "value"+i);
producer.send(record);
}
2、重寫分區策略
Kafka的默認分區名稱為DefaultPartitioner,策略實現了Partitioner。
看DefaultPartitioner的源碼,可以看到,如果key為空則輪詢,不為空則用kay的hash
public class UserDefinePartitioner implements Partitioner {
// 重寫策略,返回分區號
// keyBytes:key的位元組數組
// valueBytes:value的位元組數組
// cluster : 集群資訊
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if(keyBytes == null) {
// 參考默認的分區策略,也用輪詢 TODO
return 0;
} else {
// 我們自己定義我們想要的分區策略,大參考DefaultPartitioner
// TODO
return 0;
}
return 0;
}
@Override
public void close() {
Systen.out.println("close");
}
@Override
public void configure(Map<String, ?> configs) {
Systen.out.println("configure");
}
}
1.5 序列化
在我們簡單的生產者中,需要制定序列化配置。
// 配置Key的序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerizlizer.class.getName());
// 配置值的序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerizlizer.class.getName());
默認的StringSerizlizer 序列化,實現了Serizlizer