Thrift不同服务类型的使用探索

  • 2019 年 10 月 5 日
  • 筆記

Thrift是一个软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 等等编程语言间无缝结合的、高效的服务。

Thrift最初由facebook开发,07年四月开放源码,08年5月进入apache孵化器。thrift允许你定义一个简单的定义文件中的数据类型和服务接口。以作为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通信的无缝跨编程语言。

本篇博文编写的目的是对Thrfit不同的服务类型进行整理,并结合代码示例进行说明。

一、目标

本篇博文编写的目的是对Thrfit不同的服务类型进行整理,并结合代码示例进行说明。博文主要包含如下几个部分:

1. 实例代码准备 2. 对不同的服务类型进行介绍说明,并给出示例 3. 异步客户端调用实例 4. Nifty库的使用,包含服务端和客户端代码示例

二、实例

2.1 说明

在这个示例中,我们主要在用户接口中定义三个接口:保存用户,根据name获取用户列表以及删除用户

如:

/**       * 保存用户       *       * @param user       */      public boolean save(com.xxx.tutorial.thrift.entity.User user) throws org.apache.thrift.TException;        /**       * 根据name获取用户列表       *       * @param name       */      public java.util.List<com.xxx.tutorial.thrift.entity.User> findUsersByName(java.lang.String name) throws org.apache.thrift.TException;        /**       * 删除用户       *       * @param userId       */      public void deleteByUserId(int userId) throws com.xxx.tutorial.thrift.exception.UserNotFoundException, org.apache.thrift.TException;

2.2 编写thrift文件

  • user.thrift用于定义用户类
namespace java com.xxx.tutorial.thrift.entity    /**   * 用户类   */  struct  User {    1:i32 userId,    2:string name  }
  • exception.thrift用于自定义异常
include "user.thrift"  include "exception.thrift"    namespace java com.xxx.tutorial.thrift.service    /**   * 用户服务   */  service  UserService {      /**保存用户*/    bool save(1:user.User user),      /**根据name获取用户列表*/    list<user.User> findUsersByName(1:string name),      /**删除用户*/    void deleteByUserId(1:i32 userId) throws (1: exception.UserNotFoundException e)  }

2.3 产生代码

根据thrift文件生成Java代码,这里就不再描述,请参考以前的文章【一步步完成thrift rpc示例】。

2.4 服务接口代码

将生成的Java代码放入thrift-demo-interface模块。如:

2.6 服务实现代码

thrift-demo-service模块增加UserService的实现类。

UserServiceImpl.java的内容如下:

package com.xxx.tutorial.thrift.service.impl;    import java.util.Arrays;  import java.util.List;  import java.util.logging.Logger;    import org.apache.thrift.TException;    import com.xxx.tutorial.thrift.entity.User;  import com.xxx.tutorial.thrift.exception.UserNotFoundException;  import com.xxx.tutorial.thrift.service.UserService;    /**   * @author wangmengjun   *   */  public class UserServiceImpl implements UserService.Iface {      private static final Logger logger = Logger.getLogger(UserServiceImpl.class.getName());      public boolean save(User user) throws TException {      logger.info("方法save的参数user的内容==>" + user.toString());      return true;    }      public List<User> findUsersByName(String name) throws TException {      logger.info("方法findUsersByName的参数name的内容==>" + name);      return Arrays.asList(new User(1, "Wang"), new User(2, "Mengjun"));    }      public void deleteByUserId(int userId) throws UserNotFoundException, TException {      /**       * 直接模拟抛出异常,用于测试       */      logger.info("方法deleteByUserId的参数userId的内容==>" + userId);      throw new UserNotFoundException("1001", String.format("userId=%d的用户不存在", userId));    }  }

经过上述几个操作,用户服务接口以及实现类的编写就完毕了。

接下来,就能够创建Thrift服务~

三、Thrift不同服务端类型

3.1 服务端类型

查看Thrift的TServer层次结构,我们可以看出,Thrift的服务端类型有如下几种。服务端类型的描述如下:

  • TSimpleServer —— 单线程服务器端使用标准的阻塞式 I/O
/**   * Simple singlethreaded server for testing.   *   */  public class TSimpleServer extends TServer {  ... ...  }
  • TThreadPoolServer —— 多线程服务器端使用标准的阻塞式 I/O
/**   * Server which uses Java's built in ThreadPool management to spawn off   * a worker pool that   *   */  public class TThreadPoolServer extends TServer {    ... ...    }
  • TNonblockingServer —— 多线程服务器端使用非阻塞式 I/O
  /**   * A nonblocking TServer implementation. This allows for fairness amongst all   * connected clients in terms of invocations.   *   * This server is inherently single-threaded. If you want a limited thread pool   * coupled with invocation-fairness, see THsHaServer.   *   * To use this server, you MUST use a TFramedTransport at the outermost   * transport, otherwise this server will be unable to determine when a whole   * method call has been read off the wire. Clients must also use TFramedTransport.   */  public class TNonblockingServer extends AbstractNonblockingServer {    ... ...    }
  • THsHaSercver —— 是TNonblockingServer的扩展, 多线程服务器端使用非阻塞式 I/O
/**   * An extension of the TNonblockingServer to a Half-Sync/Half-Async server.   * Like TNonblockingServer, it relies on the use of TFramedTransport.   */  public class THsHaServer extends TNonblockingServer {    ... ...  }
  • TThreadedSelectorServer —— 多线程服务器端使用非阻塞式 I/O
TThreadedSelectorServer是对以上NonblockingServer的扩充, 其分离了Accept和Read/Write的Selector线程,   同时引入Worker工作线程池. 它也是种Half-sync/Half-async的服务模型。
/**   * A Half-Sync/Half-Async server with a separate pool of threads to handle   * non-blocking I/O. Accepts are handled on a single thread, and a configurable   * number of nonblocking selector threads manage reading and writing of client   * connections. A synchronous worker thread pool handles processing of requests.   *   * Performs better than TNonblockingServer/THsHaServer in multi-core   * environments when the the bottleneck is CPU on the single selector thread   * handling I/O. In addition, because the accept handling is decoupled from   * reads/writes and invocation, the server has better ability to handle back-   * pressure from new connections (e.g. stop accepting when busy).   *   * Like TNonblockingServer, it relies on the use of TFramedTransport.   */  public class TThreadedSelectorServer extends AbstractNonblockingServer {  ... ...  }

另外,Facebook还开源了Nifty — 一种基于netty的thrift服务端和客户端实现。Nifty是facebook公司开源的,基于netty的thrift服务端和客户端实现。然后使用此包就可以快速发布出基于netty的高效的服务端和客户端代码。【https://github.com/facebook/nifty】

3.2 服务端创建步骤

(1) 创建一个transport对象 (2) 为transport对象创建输入输出protocol (3) 基于输入输出protocol创建processor (4) 等待连接请求并将之交给processor处理

如:

    try {          /**         * 1. 创建Transport         */        TServerSocket serverTransport = new TServerSocket(SERVER_PORT);        TServer.Args tArgs = new TServer.Args(serverTransport);          /**         * 2. 为Transport创建Protocol         */        tArgs.protocolFactory(new TBinaryProtocol.Factory());        // tArgs.protocolFactory(new TCompactProtocol.Factory());        // tArgs.protocolFactory(new TJSONProtocol.Factory());          /**         * 3. 为Protocol创建Processor         */        TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());        tArgs.processor(tprocessor);            /**         * 4. 创建Server并启动         *         * org.apache.thrift.server.TSimpleServer - 简单的单线程服务模型,一般用于测试         */        TServer server = new TSimpleServer(tArgs);        logger.info("UserService TSimpleServer start ....");        server.serve();          } catch (Exception e) {        logger.severe("Server start error!!!" + e.getLocalizedMessage());        e.printStackTrace();      }

接下来,我们就一起来完成不同服务端类型的代码示例以及客户端调用实例~

四、TSimpleServer服务类型

4.1 服务端

package com.xxx.tutorial.thrift.server;    import java.util.logging.Logger;    import org.apache.thrift.TProcessor;  import org.apache.thrift.protocol.TBinaryProtocol;  import org.apache.thrift.server.TServer;  import org.apache.thrift.server.TSimpleServer;  import org.apache.thrift.transport.TServerSocket;    import com.xxx.tutorial.thrift.service.UserService;  import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;    /**   * @author wangmengjun   *   */  public class TSimpleServerExample {      private static final Logger logger = Logger.getLogger(TSimpleServerExample.class.getName());      private static final int SERVER_PORT = 9123;      public static void main(String[] args) {        try {          /**         * 1. 创建Transport         */        TServerSocket serverTransport = new TServerSocket(SERVER_PORT);        TServer.Args tArgs = new TServer.Args(serverTransport);          /**         * 2. 为Transport创建Protocol         */        tArgs.protocolFactory(new TBinaryProtocol.Factory());        // tArgs.protocolFactory(new TCompactProtocol.Factory());        // tArgs.protocolFactory(new TJSONProtocol.Factory());          /**         * 3. 为Protocol创建Processor         */        TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());        tArgs.processor(tprocessor);            /**         * 4. 创建Server并启动         *         * org.apache.thrift.server.TSimpleServer - 简单的单线程服务模型,一般用于测试         */        TServer server = new TSimpleServer(tArgs);        logger.info("UserService TSimpleServer start ....");        server.serve();          } catch (Exception e) {        logger.severe("Server start error!!!" + e.getLocalizedMessage());        e.printStackTrace();      }    }  }

4.2 客户端

package com.xxx.tutorial.thrift.client;    import java.util.List;  import java.util.logging.Logger;    import org.apache.thrift.TException;  import org.apache.thrift.protocol.TBinaryProtocol;  import org.apache.thrift.protocol.TProtocol;  import org.apache.thrift.transport.TSocket;  import org.apache.thrift.transport.TTransport;  import org.apache.thrift.transport.TTransportException;    import com.xxx.tutorial.thrift.entity.User;  import com.xxx.tutorial.thrift.exception.UserNotFoundException;  import com.xxx.tutorial.thrift.service.UserService;    /**   *   * @author wangmengjun   *   */  public class UserClient {      private static final Logger logger = Logger.getLogger(UserClient.class.getName());      public static void main(String[] args) {        try {          TTransport transport = new TSocket("127.0.0.1", 9123);        TProtocol protocol = new TBinaryProtocol(transport);          UserService.Client client = new UserService.Client(protocol);        transport.open();          /**         * 查询User列表         */        List<User> users = client.findUsersByName("wang");        logger.info("client.findUsersByName()方法結果 == >" + users);          /**         * 保存User         */        boolean isUserSaved = client.save(new User(101, "WMJ"));        logger.info("user saved result == > " + isUserSaved);          /**         * 删除用户         */        client.deleteByUserId(1002);          transport.close();        } catch (TTransportException e) {        logger.severe("TTransportException==>" + e.getLocalizedMessage());      } catch (UserNotFoundException e) {        logger.severe("UserNotFoundException==>" + e.getMessage());      } catch (TException e) {        logger.severe("TException==>" + e.getLocalizedMessage());      }    }  }

4.3 测试

  • 服务端控制台输出
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  六月 09, 2017 8:29:14 下午 com.xxx.tutorial.thrift.server.TSimpleServerExample main  信息: UserService TSimpleServer start ....  六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName  信息: 方法findUsersByName的参数name的内容==>wang  六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save  信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)  六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId  信息: 方法deleteByUserId的参数userId的内容==>1002
  • 客户端控制台输出
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  Received 1  六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.client.UserClient main  信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]  Received 2  六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.client.UserClient main  信息: user saved result == > true  Received 3  六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.client.UserClient main  严重: UserNotFoundException==>userId=1002的用户不存在

五、TThreadPoolServer 服务类型

5.1 服务端

package com.xxx.tutorial.thrift.server;    import java.util.logging.Logger;    import org.apache.thrift.TProcessor;  import org.apache.thrift.protocol.TBinaryProtocol;  import org.apache.thrift.server.TServer;  import org.apache.thrift.server.TThreadPoolServer;  import org.apache.thrift.transport.TServerSocket;    import com.xxx.tutorial.thrift.service.UserService;  import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;    /**   *   * @author wangmengjun   *   */  public class TThreadPoolServerExample {      private static final Logger logger = Logger.getLogger(TThreadPoolServerExample.class.getName());      private static final int SERVER_PORT = 9123;      public static void main(String[] args) {        try {          /**         * 1. 创建Transport         */        TServerSocket serverTransport = new TServerSocket(SERVER_PORT);        TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(serverTransport);          /**         * 2. 为Transport创建Protocol         */        tArgs.protocolFactory(new TBinaryProtocol.Factory());        // tArgs.protocolFactory(new TCompactProtocol.Factory());        // tArgs.protocolFactory(new TJSONProtocol.Factory());          /**         * 3. 为Protocol创建Processor         */        TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());        tArgs.processor(tprocessor);            /**         * 4. 创建Server并启动         *         * org.apache.thrift.server.TThreadPoolServer         */        TServer server = new TThreadPoolServer(tArgs);        logger.info("UserService TThreadPoolServer start ....");        server.serve();          } catch (Exception e) {        logger.severe("Server start error!!!" + e.getLocalizedMessage());        e.printStackTrace();      }    }  }

5.2 客户端

客户端的代码可以和TSimpleServer中使用的Client代码一致,

package com.xxx.tutorial.thrift.client;    import java.util.List;  import java.util.logging.Logger;    import org.apache.thrift.TException;  import org.apache.thrift.protocol.TBinaryProtocol;  import org.apache.thrift.protocol.TProtocol;  import org.apache.thrift.transport.TSocket;  import org.apache.thrift.transport.TTransport;  import org.apache.thrift.transport.TTransportException;    import com.xxx.tutorial.thrift.entity.User;  import com.xxx.tutorial.thrift.exception.UserNotFoundException;  import com.xxx.tutorial.thrift.service.UserService;    /**   *   * @author wangmengjun   *   */  public class UserClient {      private static final Logger logger = Logger.getLogger(UserClient.class.getName());      public static void main(String[] args) {        try {          TTransport transport = new TSocket("127.0.0.1", 9123);        TProtocol protocol = new TBinaryProtocol(transport);          UserService.Client client = new UserService.Client(protocol);        transport.open();          /**         * 查询User列表         */        List<User> users = client.findUsersByName("wang");        logger.info("client.findUsersByName()方法結果 == >" + users);          /**         * 保存User         */        boolean isUserSaved = client.save(new User(101, "WMJ"));        logger.info("user saved result == > " + isUserSaved);          /**         * 删除用户         */        client.deleteByUserId(1002);          transport.close();        } catch (TTransportException e) {        logger.severe("TTransportException==>" + e.getLocalizedMessage());      } catch (UserNotFoundException e) {        logger.severe("UserNotFoundException==>" + e.getMessage());      } catch (TException e) {        logger.severe("TException==>" + e.getLocalizedMessage());      }    }  }

5.3 测试

  • 服务端控制台输出
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  六月 09, 2017 8:31:44 下午 com.xxx.tutorial.thrift.server.TThreadPoolServerExample main  信息: UserService TThreadPoolServer start ....  六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName  信息: 方法findUsersByName的参数name的内容==>wang  六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save  信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)  六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId  信息: 方法deleteByUserId的参数userId的内容==>1002
  • 客户端控制台输出
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  Received 1  六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.client.UserClient main  信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]  Received 2  六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.client.UserClient main  信息: user saved result == > true  Received 3  六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.client.UserClient main  严重: UserNotFoundException==>userId=1002的用户不存在

六、TNonblockingServer 服务类型

6.1 服务端

package com.xxx.tutorial.thrift.server;    import java.util.logging.Logger;    import org.apache.thrift.TProcessor;  import org.apache.thrift.protocol.TBinaryProtocol;  import org.apache.thrift.server.TNonblockingServer;  import org.apache.thrift.server.TServer;  import org.apache.thrift.transport.TFramedTransport;  import org.apache.thrift.transport.TNonblockingServerSocket;  import org.apache.thrift.transport.TNonblockingServerTransport;    import com.xxx.tutorial.thrift.service.UserService;  import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;    /**   *   * @author wangmengjun   *   */  public class TNonblockingServerExample {      private static final Logger logger = Logger.getLogger(TNonblockingServerExample.class.getName());      private static final int SERVER_PORT = 9123;      public static void main(String[] args) {        try {          /**         * 1. 创建Transport         */        TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(SERVER_PORT);        TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverTransport);          /**         * 2. 为Transport创建Protocol         */        tArgs.transportFactory(new TFramedTransport.Factory());        tArgs.protocolFactory(new TBinaryProtocol.Factory());        // tArgs.protocolFactory(new TCompactProtocol.Factory());        // tArgs.protocolFactory(new TJSONProtocol.Factory());          /**         * 3. 为Protocol创建Processor         */        TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());        tArgs.processor(tprocessor);          /**         * 4. 创建Server并启动         */        TServer server = new TNonblockingServer(tArgs);        logger.info("UserService TNonblockingServer start ....");        server.serve();        } catch (Exception e) {        logger.severe("Server start error!!!" + e.getLocalizedMessage());        e.printStackTrace();      }    }  }

6.2 客户端

package com.xxx.tutorial.thrift.client;    import java.util.List;  import java.util.logging.Logger;    import org.apache.thrift.TException;  import org.apache.thrift.protocol.TBinaryProtocol;  import org.apache.thrift.protocol.TProtocol;  import org.apache.thrift.transport.TFramedTransport;  import org.apache.thrift.transport.TSocket;  import org.apache.thrift.transport.TTransport;  import org.apache.thrift.transport.TTransportException;    import com.xxx.tutorial.thrift.entity.User;  import com.xxx.tutorial.thrift.exception.UserNotFoundException;  import com.xxx.tutorial.thrift.service.UserService;    public class UserClient2 {      private static final Logger logger = Logger.getLogger(UserClient.class.getName());      public static void main(String[] args) {        try {          TTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 9123, 3000));        TProtocol protocol = new TBinaryProtocol(transport);          UserService.Client client = new UserService.Client(protocol);        transport.open();          /**         * 查询User列表         */        List<User> users = client.findUsersByName("wang");        logger.info("client.findUsersByName()方法結果 == >" + users);          /**         * 保存User         */        boolean isUserSaved = client.save(new User(101, "WMJ"));        logger.info("user saved result == > " + isUserSaved);          /**         * 删除用户         */        client.deleteByUserId(1002);          transport.close();        } catch (TTransportException e) {        logger.severe("TTransportException==>" + e.getLocalizedMessage());      } catch (UserNotFoundException e) {        logger.severe("UserNotFoundException==>" + e.getLocalizedMessage());      } catch (TException e) {        logger.severe("TException==>" + e.getLocalizedMessage());      }    }  }

6.3 测试

  • 服务端控制台输出
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  六月 09, 2017 8:34:38 下午 com.xxx.tutorial.thrift.server.TNonblockingServerExample main  信息: UserService TNonblockingServer start ....  六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName  信息: 方法findUsersByName的参数name的内容==>wang  六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save  信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)  六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId  信息: 方法deleteByUserId的参数userId的内容==>1002
  • 客户端控制台输出
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  Received 1  六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.client.UserClient2 main  信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]  Received 2  六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.client.UserClient2 main  信息: user saved result == > true  Received 3  六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.client.UserClient2 main  严重: UserNotFoundException==>userId=1002的用户不存在

七、THsHaServer服务类型

7.1 服务端

package com.xxx.tutorial.thrift.server;    import java.util.logging.Logger;    import org.apache.thrift.TProcessor;  import org.apache.thrift.protocol.TBinaryProtocol;  import org.apache.thrift.server.THsHaServer;  import org.apache.thrift.server.TServer;  import org.apache.thrift.transport.TFramedTransport;  import org.apache.thrift.transport.TNonblockingServerSocket;    import com.xxx.tutorial.thrift.service.UserService;  import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;    /**   * @author wangmengjun   *   */  public class THsHaServerExample {      private static final Logger logger = Logger.getLogger(THsHaServerExample.class.getName());      private static final int SERVER_PORT = 9123;      public static void main(String[] args) {        try {          /**         * 1. 创建Transport         */        //TServerSocket serverTransport = new TServerSocket(SERVER_PORT);        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(SERVER_PORT);        THsHaServer.Args tArgs = new THsHaServer.Args(serverTransport);          /**         * 2. 为Transport创建Protocol         */        tArgs.transportFactory(new TFramedTransport.Factory());        tArgs.protocolFactory(new TBinaryProtocol.Factory());        // tArgs.protocolFactory(new TCompactProtocol.Factory());        // tArgs.protocolFactory(new TJSONProtocol.Factory());          /**         * 3. 为Protocol创建Processor         */        TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());        tArgs.processor(tprocessor);            /**         * 4. 创建Server并启动         *         */        //半同步半异步的服务模型        TServer server = new THsHaServer(tArgs);        logger.info("UserService TSimpleServer start ....");        server.serve();          } catch (Exception e) {        logger.severe("Server start error!!!" + e.getLocalizedMessage());        e.printStackTrace();      }    }  }

7.2 客户端

package com.xxx.tutorial.thrift.client;    import java.util.List;  import java.util.logging.Logger;    import org.apache.thrift.TException;  import org.apache.thrift.protocol.TBinaryProtocol;  import org.apache.thrift.protocol.TProtocol;  import org.apache.thrift.transport.TFramedTransport;  import org.apache.thrift.transport.TSocket;  import org.apache.thrift.transport.TTransport;  import org.apache.thrift.transport.TTransportException;    import com.xxx.tutorial.thrift.entity.User;  import com.xxx.tutorial.thrift.exception.UserNotFoundException;  import com.xxx.tutorial.thrift.service.UserService;    public class UserClient2 {      private static final Logger logger = Logger.getLogger(UserClient.class.getName());      public static void main(String[] args) {        try {          TTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 9123, 3000));        TProtocol protocol = new TBinaryProtocol(transport);          UserService.Client client = new UserService.Client(protocol);        transport.open();          /**         * 查询User列表         */        List<User> users = client.findUsersByName("wang");        logger.info("client.findUsersByName()方法結果 == >" + users);          /**         * 保存User         */        boolean isUserSaved = client.save(new User(101, "WMJ"));        logger.info("user saved result == > " + isUserSaved);          /**         * 删除用户         */        client.deleteByUserId(1002);          transport.close();        } catch (TTransportException e) {        logger.severe("TTransportException==>" + e.getLocalizedMessage());      } catch (UserNotFoundException e) {        logger.severe("UserNotFoundException==>" + e.getLocalizedMessage());      } catch (TException e) {        logger.severe("TException==>" + e.getLocalizedMessage());      }    }  }

7.3 测试

  • 服务端控制台输出
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  六月 09, 2017 8:57:26 下午 com.xxx.tutorial.thrift.server.THsHaServerExample main  信息: UserService TSimpleServer start ....  六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName  信息: 方法findUsersByName的参数name的内容==>wang  六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save  信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)  六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId  信息: 方法deleteByUserId的参数userId的内容==>1002
  • 客户端控制台输出
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  Received 1  六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.client.UserClient2 main  信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]  Received 2  六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.client.UserClient2 main  信息: user saved result == > true  Received 3  六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.client.UserClient2 main  严重: UserNotFoundException==>userId=1002的用户不存在

八、Nifty服务类型Facebook还开源了Nifty — 一种基于netty的thrift服务端和客户端实现。

Nifty是facebook公司开源的,基于netty的thrift服务端和客户端实现。

使用Nifty,我们只要只要导入Nifty的jar包即可。

    <dependency>        <groupId>com.facebook.nifty</groupId>        <artifactId>nifty-core</artifactId>        <version>0.10.0</version>      </dependency>

8.1 服务端

package com.xxx.tutorial.thrift.server;    import org.apache.thrift.TProcessor;    import com.facebook.nifty.core.NettyServerTransport;  import com.facebook.nifty.core.ThriftServerDef;  import com.facebook.nifty.core.ThriftServerDefBuilder;  import com.xxx.tutorial.thrift.service.UserService;  import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;    public class NiftyServer {      public static void main(String[] args) {        // Create the handler      UserService.Iface userServiceImpl = new UserServiceImpl();        // Create the processor      TProcessor processor = new UserService.Processor<>(userServiceImpl);        // Build the server definition      ThriftServerDef serverDef = new ThriftServerDefBuilder().withProcessor(processor).listen(9123).build();        // Create the server transport      final NettyServerTransport server = new NettyServerTransport(serverDef);        // Create netty boss and executor thread pools  /*    ExecutorService bossExecutor = Executors.newCachedThreadPool();      ExecutorService workerExecutor = Executors.newCachedThreadPool();        // Start the server      server.start(bossExecutor, workerExecutor);*/      System.out.println("启动~~~");      server.start();        // Arrange to stop the server at shutdown      Runtime.getRuntime().addShutdownHook(new Thread() {        @Override        public void run() {          try {            server.stop();          } catch (InterruptedException e) {            Thread.currentThread().interrupt();          }        }      });    }  }

8.2 客户端

如下示例给出NiftyClient来完成调用,使用NiftyClient需要导入相关jar包。

    <dependency>        <groupId>com.facebook.nifty</groupId>        <artifactId>nifty-client</artifactId>        <version>0.10.0</version>      </dependency>
package com.xxx.tutorial.thrift.client;    import java.net.InetSocketAddress;  import java.util.List;  import java.util.logging.Logger;    import org.apache.thrift.TException;  import org.apache.thrift.protocol.TBinaryProtocol;  import org.apache.thrift.transport.TTransport;  import org.apache.thrift.transport.TTransportException;    import com.facebook.nifty.client.NiftyClient;  import com.xxx.tutorial.thrift.entity.User;  import com.xxx.tutorial.thrift.service.UserService;    public class ThriftNiftyClient {      private static final Logger logger = Logger.getLogger(ThriftNiftyClient.class.getName());      @SuppressWarnings({ "resource" })    public static void main(String[] args) {        NiftyClient niftyClient = new NiftyClient();      InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9123);      try {        TTransport transport = niftyClient.connectSync(address);        TBinaryProtocol tp = new TBinaryProtocol(transport);        UserService.Client userService = new UserService.Client(tp);        logger.info("userService.findUsersByName 方法调用~");        List<User> users = userService.findUsersByName("wang");        logger.info("userService.findUsersByName 调用结果~");        logger.info("users ==> " + users);        } catch (TTransportException e) {        logger.severe("TTransportException ==> " + e.getMessage());      } catch (InterruptedException e) {        logger.severe("InterruptedException ==> " + e.getMessage());      } catch (TException e) {        logger.severe("TException ==> " + e.getMessage());      }    }  }

8.3 测试

  • 服务端控制台输出
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  启动~~~  六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName  信息: 方法findUsersByName的参数name的内容==>wang
  • 客户端控制台输出
六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.client.ThriftNiftyClient main  信息: userService.findUsersByName 方法调用~  Received 1  六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.client.ThriftNiftyClient main  信息: userService.findUsersByName 调用结果~  六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.client.ThriftNiftyClient main  信息: users ==> [User(userId:1, name:Wang), User(userId:2, name:Mengjun)]

至此,Thrift多种服务端的编写已经完成,但是上述Client端的编写基本上采用同步来实现~

下面给出一个异步客户端的示例。

九、异步客户端

package com.xxx.tutorial.thrift.client;    import java.util.List;  import java.util.concurrent.CountDownLatch;  import java.util.concurrent.TimeUnit;  import java.util.logging.Logger;    import org.apache.thrift.async.AsyncMethodCallback;  import org.apache.thrift.async.TAsyncClientManager;  import org.apache.thrift.protocol.TBinaryProtocol;  import org.apache.thrift.protocol.TProtocolFactory;  import org.apache.thrift.transport.TNonblockingSocket;  import org.apache.thrift.transport.TNonblockingTransport;    import com.xxx.tutorial.thrift.entity.User;  import com.xxx.tutorial.thrift.service.UserService;    public class UserAsynClient {      private static final Logger logger = Logger.getLogger(UserAsynClient.class.getName());      public static void main(String[] args) {      try {        TAsyncClientManager clientManager = new TAsyncClientManager();        TNonblockingTransport transport = new TNonblockingSocket("127.0.0.1", 9123, 3000);          TProtocolFactory tprotocol = new TBinaryProtocol.Factory();          UserService.AsyncClient asyncClient = new UserService.AsyncClient(tprotocol, clientManager, transport);          System.out.println("Client start .....");          CountDownLatch latch = new CountDownLatch(1);        /*AsynCallback_FindUsersByName callBack = new AsynCallback_FindUsersByName(latch);        asyncClient.findUsersByName("wang", callBack);*/          AsynCallback_SaveUser saveUserCallBack = new AsynCallback_SaveUser(latch);        asyncClient.save(new User(10001,"A name"), saveUserCallBack);          System.out.println("call method findUsersByName_call .... end");          boolean wait = latch.await(30, TimeUnit.SECONDS);          System.out.println("latch.await =:" + wait);        } catch (Exception e) {        e.printStackTrace();      }      System.out.println("startClient end.");    }      public static class AsynCallback_FindUsersByName implements AsyncMethodCallback<List<User>> {        private CountDownLatch latch;        public AsynCallback_FindUsersByName(CountDownLatch latch) {        this.latch = latch;      }        public void onComplete(List<User> response) {          logger.info("onComplete ==> findUsersByName_call");        try {          logger.info("findUsersByName_call response ==> " + response.toString());        } finally {          latch.countDown();        }        }        public void onError(Exception exception) {        logger.severe("onError ==> " + exception.getMessage());        latch.countDown();      }    }      public static class AsynCallback_SaveUser implements AsyncMethodCallback<Boolean> {        private CountDownLatch latch;        public AsynCallback_SaveUser(CountDownLatch latch) {        this.latch = latch;      }        public void onComplete(Boolean response) {        logger.info("onComplete ==> save_call");        try {          logger.info("save_call response ==> " + response.toString());        } finally {          latch.countDown();        }      }        public void onError(Exception exception) {        logger.severe("onError ==> " + exception.getMessage());        latch.countDown();      }      }  }

上述采用AsyncClient 来完成调用,使用AsyncClient 的时候,需要编写一个用于回调的线程类

UserService.AsyncClient asyncClient = new UserService.AsyncClient(tprotocol, clientManager, transport);

使用上述给出的NiftyServer类,启动服务,然后运行UserAsynClient类,就完成一个异步客户端示例。

运行结果~

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  Client start .....  call method findUsersByName_call .... end  Received 0  六月 12, 2017 10:52:44 上午 com.xxx.tutorial.thrift.client.UserAsynClient$AsynCallback_SaveUser onComplete  信息: onComplete ==> save_call  六月 12, 2017 10:52:44 上午 com.xxx.tutorial.thrift.client.UserAsynClient$AsynCallback_SaveUser onComplete  信息: save_call response ==> true  latch.await =:true  startClient end.

至此,本篇博文的目标内容就完成了。