It's our wits that make us men.

grpc基础教程

Posted on By eatMelon-Masses

定义服务

我们需要指定路径,用来存放通过proto脚本生成的java代码

option java_package = "io.grpc.examples.routeguide";

如果要定义一个服务我们在.proto使用service关键字

service RouteGuide {
    ...
}

定义rpc方法,定义传输的request 和 reponse对象。

几种服务方法

  • 一个简单的rpc调用,客户端发送一个消息到服务端,服务端收到请求后,包装响应信息,返回结果。
    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 一个服务端流处理rpc调用,客户端发送一个消息到服务端,服务端收到消息后,得到一个流,向客户端连续发送一个或者多个响应,客户端读取所有服务端的响应后进行逻辑处理。
// Obtains the Features available within the given Rectangle.  Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
  • 一个客户端流处理 rpc调用,客户端通过流发送一连串有序的消息到服务端,如果客户端发送完所有消息后,便等待服务端去读取他们和返回多个响应。当服务端连续读取信息,直到没有更多的消息后,进行逻辑处理,返回响应信息。
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
  • 一个客户端和服务端都是用流传输的rpc调用,如果客户端完成消息写入,它等待服务器读取所有消息,然后返回他们的响应。
    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

生成客户端和服务端代码

我们需要生成grpc客户端和服务端接口通过.proto 服务定义。我们使用协议缓冲编译器protoc 和指定的jrpc java plugin。 当使用gradle or maven, the protoc 构建 插件能够生成需要的代码在构建阶段。

以下这些class是自动生成的

  • Feature.java, Point.java, Rectangle.java, 和其他包括所有协议缓冲代码,序列化、请求和响应消息类型。
  • RouteGuideGrpc.java 它包括以下代码
    1. 一个基础的类作为RouteGuide 服务的实现类RouteGuideGrpc.RouteGuideImplBase,它的所有方法都在RouteGuide 接口类定义。
    2. stub classes 部分客户端可以使用他们与RouteGuide服务交互

      如何用maven 编译 .proto 脚本生成代码

      创建服务

      需要做两步工作

  • 覆盖 service base class 用我们自己的服务定义:做一些实际的工作。
  • 运行 grpc server 用来监听来自客户端的请求和返回服务响应 可以参考如下代码java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java

    实现RouteGuide

    我们的RouteGuideService 继承 RouteGuideGrpc.RouteGuideImplBase 抽象类

    private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
    ...
    }
    

simple rpc

RouteGuideService实现所有服务方法。

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  responseObserver.onNext(checkFeature(request));
  responseObserver.onCompleted();
}

...

private Feature checkFeature(Point location) {
  for (Feature feature : features) {
    if (feature.getLocation().getLatitude() == location.getLatitude()
        && feature.getLocation().getLongitude() == location.getLongitude()) {
      return feature;
    }
  }

  // No feature was found, return an unnamed feature.
  return Feature.newBuilder().setName("").setLocation(location).build();
}

这个 getFeature()方法有两个参数

  • Point: the request
  • StreamObserver: 一个响应观察者, 它是一个特殊的接口,用来让服务返回它的响应。 给客户端返回响应和完成调用
    1. 我们构建一个Feature 响应对象作为客户端返回对象,在我们的服务定义里指定。
    2. 我们使用响应观察者的onNext()方法来返回Feature对象
    3. 我们使用响应观察者的onCompleted()方法来指定我们已经完成了rpc调用的处理。

      服务端流rpc

      ListFeatures 是一个服务端流rpc,我们需要发送多个Feature给客户端。

private final Collection<Feature> features;

...

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

Like the simple RPC, this method gets a request object (the Rectangle in which our client wants to find Features) and a StreamObserver response observer.

This time, we get as many Feature objects as we need to return to the client (in this case, we select them from the service’s feature collection based on whether they’re inside our request Rectangle), and write them each in turn to the response observer using its onNext() method. Finally, as in our simple RPC, we use the response observer’s onCompleted() method to tell gRPC that we’ve finished writing responses.

优势

  • 从服务端的角度来说用流能够减小传输封包的数据量,非流传输每个消息都需要完整的封装,同时处理数据和发送数据不能同步进行。
  • 从客户端的角度来说能够减少拆包逻辑处理的次数。

    客户端流rpc

我们将从客户端流中获取Points 然后发送一个RouteSummary

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

服务端方法需要返回一个StreamObserver<Point>接口的对象,用来在客户端流消息每次到达后快速处理和返回一个总体处理结果。
onCompleted()方法(当客户端发送完消息后被调用)
注意:必须等到客户端发完所有消息以后服务端才调用onCompleted()返回处理结果

## 双向 流rpc

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

即使每一端将总是按照另一端写入顺序获取到消息,客户端和服务端能够按照顺序读取和写入,他们的流操作完全独立,服务端的onNext()和onCompeted()无需写在一个函数里。

开启服务端

如果我们实现了所有方法,我们需要在客户端使用grpc服务

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
...
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
 ...
}
  1. 指定一个ip和port用来监听客户端请求,使用ServerBuilder.forPort()
  2. 创建一个实现类RouteGuideService,调用serverBuilder.addService()添加服务
  3. 调用builder的build() 和 start()用来创建和启动rpc服务。

创建客户端

接下来我们将要为RouteGuide服务创建一个客户端,以下连接可以看到完整的example代码grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java.

  • 一个 阻塞/同步stub: 意味着rpc程序调用需要等待服务响应,将会收到一个响应或者是错误信息。
  • 一个 非阻塞/异步stub,意味着非阻塞调用,可能响应已经异步返回。你需要确定的是,流类型的rpc调用只能使用异步stub
public RouteGuideClient(String host, int port) {
  this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
}
/** Construct client for accessing RouteGuide server using the existing channel. */
public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
  channel = channelBuilder.build();
  blockingStub = RouteGuideGrpc.newBlockingStub(channel);
  asyncStub = RouteGuideGrpc.newStub(channel);
}
我们使用ManagedChannelBuilder创建管道。
现在我们能够使用管道来创建stub,newStub() 或者是newBlockingStub()是RouteGuideGrpc类提供的,我们通过.proto自动生成的类。 ```
blockingStub = RouteGuideGrpc.newBlockingStub(channel); asyncStub = RouteGuideGrpc.newStub(channel); ```

调用服务方法