springboot+kafka+邮件发送(最佳实践)

  • 2019 年 10 月 3 日
  • 笔记
  1. ??
    1. ??spring-kafka????????message????????
    2. ?????????????
    3. ????????
    4. ???????????????
  2. ????
    1. ????
   <!-- Versions for dependencies that are not managed by the Spring Boot parent. -->
   <properties>
       <java.version>1.8</java.version>
       <mysql.version>5.1.38</mysql.version>
       <mapper.version>2.1.5</mapper.version>
       <mybatis.version>1.3.2</mybatis.version>
       <gson.version>2.8.2</gson.version>
       <lang3.version>3.4</lang3.version>
   </properties>

   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter</artifactId>
       </dependency>

       <!-- Spring Test -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>

       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <optional>true</optional>
       </dependency>

       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-jdbc</artifactId>
       </dependency>

       <dependency>
           <groupId>org.mybatis.spring.boot</groupId>
           <artifactId>mybatis-spring-boot-starter</artifactId>
           <version>${mybatis.version}</version>
       </dependency>

       <!-- MySQL JDBC driver -->
       <dependency>
           <groupId>mysql</groupId>
           <artifactId>mysql-connector-java</artifactId>
           <version>${mysql.version}</version>
       </dependency>

       <!-- tk.mybatis Mapper starter -->
       <dependency>
           <groupId>tk.mybatis</groupId>
           <artifactId>mapper-spring-boot-starter</artifactId>
           <version>${mapper.version}</version>
       </dependency>

       <dependency>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
       </dependency>

       <!-- Spring Boot configuration processor -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-configuration-processor</artifactId>
           <optional>true</optional>
       </dependency>

       <!-- SLF4J + Logback logging -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-logging</artifactId>
       </dependency>

       <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-mail -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-mail</artifactId>
       </dependency>

       <dependency>
           <groupId>com.google.code.gson</groupId>
           <artifactId>gson</artifactId>
           <version>${gson.version}</version>
       </dependency>

       <dependency>
           <groupId>org.apache.commons</groupId>
           <artifactId>commons-lang3</artifactId>
           <version>${lang3.version}</version>
       </dependency>
   </dependencies>

2. application.yml: configure Kafka, the datasource, and mail sending

If you are not used to the yml format, you can write the settings as properties and convert them with https://www.toyaml.com/index.html.

?????????163???qq?????????????????????

??????Hikari??????java??????

# NOTE(review): the nesting below was reconstructed from a copy whose indentation
# had been flattened; it follows the Spring Boot property hierarchy. Values such as
# `*` and the bootstrap-servers text are placeholders and must be replaced.

# Kafka broker address(es); separate multiple host:port pairs with commas
spring:
  kafka:
    bootstrap-servers: ??kafkaIP:??? # placeholder: <kafka-host>:<port>
    producer:
      retries: 2 # number of retries when a send fails
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value serializer
      compression-type: gzip # compression type
      batch-size: 16384 # batch size in bytes
      buffer-memory: 33554432 # 32M producer buffer
    consumer:
      auto-offset-reset: earliest # where to start when no committed offset exists
      enable-auto-commit: false # offsets are not committed automatically
      #auto-commit-interval: 1000 # auto-commit interval (unused while auto-commit is off)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # key deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # value deserializer
      group-id: kafka.consumer.group.id.1
      max-poll-records: 50
      properties:
        session-timeout-ms: 20000 # session timeout
        # Maximum delay between poll() calls; tune it together with max.poll.records
        # so that one batch can be processed within this interval.
        max-poll-interval-ms: 15000
        max-partition-fetch-bytes: 15728640 # max bytes fetched per partition: 15M
      # NOTE(review): placed under `consumer` because it follows the consumer settings;
      # `spring.kafka.client-id` would also be valid - confirm against the original file.
      client-id: kafkacli
    listener:
      ack-mode: manual_immediate

  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.jdbc.Driver
    url: * # placeholder: JDBC URL
    username: * # placeholder: database user
    password: * # placeholder: database password
    hikari:
      minimum-idle: 5
      # max time a connection may stay idle; default 600000 (10 minutes)
      idle-timeout: 180000
      # maximum pool size; default 10
      maximum-pool-size: 10
      # auto-commit behavior of connections returned from the pool; default true
      auto-commit: true
      # connection pool name
      pool-name: MyHikariCP
      # maximum lifetime of a pooled connection; 0 means unlimited; default 1800000 (30 minutes)
      max-lifetime: 1800000
      # connection timeout; default 30 seconds (30000 ms)
      connection-timeout: 30000
      connection-test-query: SELECT 1

  # mail settings (a 163 mailbox is used as the example)
  mail:
    host: smtp.163.com # SMTP server host
    port: 25 # port
    username: * # placeholder: mail account
    password: * # placeholder: mail password
    default-encoding: UTF-8
    properties:
      from: * # placeholder: sender address
      mail:
        smtp:
          connectiontimeout: 5000
          timeout: 3000
          writetimeout: 5000

  # Thymeleaf templates
  thymeleaf:
    cache: false
    prefix: classpath:/views/
  # file upload
  servlet:
    multipart:
      max-file-size: 10MB # max size of a single file
      max-request-size: 50MB # max size of a whole request


logging:
  level:
    com.example: debug
  pattern:
    # console: %d{yyyy/MM/dd-HH:mm:ss} [%thread] %-5level %logger- %msg%n
    # file: %d{yyyy/MM/dd-HH:mm} [%thread] %-5level %logger- %msg%n
  # NOTE(review): placed directly under `logging` (logging.path); confirm against the original file.
  path: C:log

# custom application settings
com:
  example:
    mail:
      sendNumber: 3 # presumably the maximum number of send attempts - confirm against the code
      threadKillTime: 60 # presumably a thread timeout; unit not shown here - confirm

mybatis:
  type-aliases-package: com.example.mail.entity
  configuration:
    map-underscore-to-camel-case: true
  mapper-locations: mappers/*Mapper.xml


# async thread-pool settings
async:
  executor:
    thread:
      core_pool_size: 15 # core pool size
      max_pool_size: 15 # maximum pool size
      queue_capacity: 99999 # task queue capacity
      keep_alive_seconds: 60 # keep-alive time for idle threads, in seconds
      await_termination_seconds: 30 # seconds to wait for running tasks on shutdown
      name:
        prefix: async-service-
        prefixson: async-service-son

3. Message body format

{    "mailUid": "??????",    "fromName": "?????",    "fromMail": "?????",    "toMail": "??????????????","???",    "ccMail": "??????????????","???",    "bccMail": "??????????????","???",    "planSendTime": "??????",    "mailSubject": "????",    "mailContent": "????",    "sendNum": ????,    "serverFlag": "???????(???????)"  }

4. Kafka producer and consumer, and the mail-sending code

// Producer
/**
 * Serializes the given mail to JSON and publishes it to {@code topicName}.
 *
 * <p>A producer is created for this call and closed before the method returns;
 * closing waits for the pending send, so the callback has run by then.
 *
 * <p>NOTE(review): {@code KafkaProducer} is documented as thread-safe and cheaper
 * to share than to re-create; consider a single long-lived producer instead of one per message.
 *
 * @param mailDTO the mail to publish; its uid and send count are logged on completion
 */
public void sendToKafkaStandardMessageAsync(MailDTO mailDTO) {
    // Work on a local reference so a concurrent caller re-assigning the shared field
    // cannot make this call send on, or close, another call's producer.
    try (KafkaProducer<String, Object> localProducer =
                 new KafkaProducer<String, Object>(kafkaConfig.producerConfigs())) {
        // Kept for backward compatibility with code that reads the field.
        producer = localProducer;

        localProducer.send(new ProducerRecord<String, Object>(topicName, gson.toJson(mailDTO)), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (metadata != null) {
                    log.info("??????{},????{},checksum?{},offset:{},partition:{},topic:{}", mailDTO.getMailUid(),mailDTO.getSendNum(),metadata.checksum(), metadata.offset(), metadata.partition(), metadata.topic());
                }
                if (exception != null) {
                    // Failures are logged at ERROR with the throwable so the stack trace is kept.
                    log.error("??????{}", exception.getMessage(), exception);
                }
            }
        });
    }
    // try-with-resources closes the producer even when serialization or send() throws.
}
// Consumer
/**
 * Kafka listener for mail messages.
 *
 * <p>Deserializes the record value into a {@code MailDTO}, hands it to
 * {@code sendMessageService}, and then acknowledges the record manually.
 * Records with a null value, or whose JSON deserializes to null, are
 * acknowledged without being processed.
 *
 * @param record the consumed record; its value is expected to be a JSON string
 * @param ack    handle used to acknowledge the record manually
 **/
@KafkaListener(topics = MQConstants.Topic.ITEM_EXCHANGE_NAME)
public void receiveMessageFromKafka(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    Object payload = record.value();
    if (payload != null) {
        // Parse once, and only after the null check: previously the log statement
        // dereferenced the parsed DTO before any check and could throw an NPE.
        MailDTO mailDTO = gson.fromJson(String.valueOf(payload), MailDTO.class);
        if (mailDTO != null) {
            log.info("????,MailUid:{}", mailDTO.getMailUid());
            sendMessageService.sendMessages(mailDTO);
        }
    }
    // Manual acknowledgment; not reached if parsing or sending throws.
    ack.acknowledge();
}

//?????????      public void sendMimeMail(MailVo mailVo) {          try {              MimeMessageHelper messageHelper = new MimeMessageHelper(mailSender.createMimeMessage(), true);//true????????              mailVo.setFrom("???????????from??");//???????????              messageHelper.setFrom(mailVo.getFrom());//?????              messageHelper.setSentDate(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2019-07-18 12:45:48"));              messageHelper.setTo(mailVo.getTo().split(","));//?????              messageHelper.setSubject(mailVo.getSubject());//????              messageHelper.setText(mailVo.getText());//????              if (!StringUtils.isEmpty(mailVo.getCc())) {//??                  messageHelper.setCc(mailVo.getCc().split(","));              }              if (!StringUtils.isEmpty(mailVo.getBcc())) {//??                  messageHelper.setCc(mailVo.getBcc().split(","));              }              if (mailVo.getMultipartFiles() != null) {//??????                  for (MultipartFile multipartFile : mailVo.getMultipartFiles()) {                      messageHelper.addAttachment(multipartFile.getOriginalFilename(), multipartFile);                  }              }              if (StringUtils.isEmpty((CharSequence) mailVo.getSentDate())) {//????                  mailVo.setSentDate(new Date());                  messageHelper.setSentDate(mailVo.getSentDate());              }              mailSender.send(messageHelper.getMimeMessage());//??????              mailVo.setStatus("ok");              log.info("???????{}->{}", mailVo.getFrom(), mailVo.getTo());          } catch (Exception e) {              throw new RuntimeException(e);//????          }      }

 5.??????????????

 

Source code: https://github.com/wwt729/mail.git