在上一篇文章中,我们使用 kit 命令行工具搭建了一个本地环境。在本文中我们将继续使用这些代码。

让我们首先通过编写原型定义来实现 Notificator 服务,它是一个 gRPC 服务。我们已经生成了 notificator/pkg/grpc/pb/notificator.pb 文件,让我们简单的去实现它。

syntax = "proto3";

package pb;

service Notificator {
    rpc SendEmail (SendEmailRequest) returns (SendEmailReply);
}

message SendEmailRequest {
    string email = 1;
    string content = 2;
}

message SendEmailReply {
    string id = 1;
}

现在我们要生成服务端和客户端的存根,可以使用 kit 工具生成的 compile.sh 脚本,脚本已经包含 protoc 命令。

cd notificator/pkg/grpc/pb
./compile.sh

可以发现,notificator.pb.go 这个文件已经更新了。
现在我们需要实现服务本身了。我们用生成一个 UUID 代替发送真实的电子邮件。首先需要对服务进行调整以匹配我们的请求和响应格式(返回一个新的参数:id)。

notificator/pkg/service/service.go:

import (
    "context"

    uuid "github.com/satori/go.uuid"
)

// NotificatorService describes the service.
type NotificatorService interface {
    // Add your methods here
    SendEmail(ctx context.Context, email string, content string) (string, error)
}

type basicNotificatorService struct{}

func (b *basicNotificatorService) SendEmail(ctx context.Context, email string, content string) (string, error) {
    id, err := uuid.NewV4()
    if err != nil {
        return "", err
    }

    return id.String(), nil
}

notificator/pkg/service/middleware.go:

func (l loggingMiddleware) SendEmail(ctx context.Context, email string, content string) (string, error) {
    defer func() {
        l.logger.Log("method", "SendEmail", "email", email, "content", content)
    }()
    return l.next.SendEmail(ctx, email, content)
}

notificator/pkg/endpoint/endpoint.go

// SendEmailResponse 接受 SendEmail 方法的响应
type SendEmailResponse struct {
    Id string
    E0 error `json:"e0"`
}

// MakeSendEmailEndpoint 返回 SendEmail 调用的端点
func MakeSendEmailEndpoint(s service.NotificatorService) endpoint.Endpoint {
    return func(ctx context.Context, request interface{}) (interface{}, error) {
        req := request.(SendEmailRequest)
        id, e0 := s.SendEmail(ctx, req.Email, req.Content)
        return SendEmailResponse{Id: id, E0: e0}, nil
    }
}

如果我们搜索 TODO grep -R "TODO" notificator ,可以看到还需要实现 gRPC 请求和响应的编码和解码。
notificator/pkg/grpc/handler.go:

func decodeSendEmailRequest(_ context.Context, r interface{}) (interface{}, error) {
    req := r.(*pb.SendEmailRequest)
    return endpoint.SendEmailRequest{Email: req.Email, Content: req.Content}, nil
}

func encodeSendEmailResponse(_ context.Context, r interface{}) (interface{}, error) {
    reply := r.(endpoint.SendEmailResponse)
    return &pb.SendEmailReply{Id: reply.Id}, nil
}

服务发现

SendEmail 接口将由用户服务调用,所以用户服务必须知道通知服务的地址,这是典型的服务发现问题。在本地环境中,我们使用 Docker Compose 部署时知道如何连接服务,但是,在实际的分布式环境中,可能会遇到其他问题。

首先,在 etcd 中注册我们的通知服务。etcd 是一种可靠的分布式键值存储,广泛应用于服务发现。go-kit 支持使用其他的服务发现技术如:eureka、consul 和 zookeeper 等。

把它添加进 Docker Compose 中,这样我们的服务就可以使用它了。CV 工程师上线:

docker-compose.yml:

    etcd:
        image: 'quay.io/coreos/etcd:v3.1.7'
        restart: always
        ports:
            - '23791:2379'
            - '23801:2380'
        environment:
            ETCD_NAME: infra
            ETCD_INITIAL_ADVERTISE_PEER_URLS: 'http://etcd:2380'
            ETCD_INITIAL_CLUSTER: infra=http://etcd:2380
            ETCD_INITIAL_CLUSTER_STATE: new
            ETCD_INITIAL_CLUSTER_TOKEN: secrettoken
            ETCD_LISTEN_CLIENT_URLS: 'http://etcd:2379,http://localhost:2379'
            ETCD_LISTEN_PEER_URLS: 'http://etcd:2380'
            ETCD_ADVERTISE_CLIENT_URLS: 'http://etcd:2379'

在 etcd 中注册通知服务 notificator/cmd/service/service.go:

registrar, err := registerService(logger)
if err != nil {
    logger.Log(err)
    return
}

defer registrar.Deregister()

func registerService(logger log.Logger) (*sdetcd.Registrar, error) {
    var (
        etcdServer = "http://etcd:2379"
        prefix     = "/services/notificator/"
        instance   = "notificator:8082"
        key        = prefix + instance
    )

    client, err := sdetcd.NewClient(context.Background(), []string{etcdServer}, sdetcd.ClientOptions{})
    if err != nil {
        return nil, err
    }

    registrar := sdetcd.NewRegistrar(client, sdetcd.Service{
        Key:   key,
        Value: instance,
    }, logger)

    registrar.Register()

    return registrar, nil
}

当我们的程序停止或者崩溃时,应该要记得取消注册。现在 etcd 已经知道我们的服务了,这个例子中只有一个实例,实际环境中显然会有更多。
现在让我们来测试一下通知服务是否注册到了 etcd 中:

docker-compose up -d etcd
docker-compose up -d notificator

现在我们使用用户服务调用通知服务,当我们创建一个服务后它会发送一个虚构的通知给用户。

由于通知服务是一个 gRPC 服务,在用户服务中,我们需要和客户端共用一个客户端存根。

protobuf 的客户端存根代码在 notificator/pkg/grpc/pb/notificator.pb.go ,我们把这个包引入我们的客户端

users/pkg/service/service.go:

import (
    "github.com/plutov/packagemain/13-go-kit-2/notificator/pkg/grpc/pb"
    "google.golang.org/grpc"
)

type basicUsersService struct {
    notificatorServiceClient pb.NotificatorClient
}

func (b *basicUsersService) Create(ctx context.Context, email string) error {
    reply, err := b.notificatorServiceClient.SendEmail(context.Background(), &pb.SendEmailRequest{
        Email:   email,
        Content: "Hi! Thank you for registration...",
    })

    if reply != nil {
        log.Printf("Email ID: %s", reply.Id)
    }

    return err
}

// NewBasicUsersService 返回一个简单的、无状态的用户服务
func NewBasicUsersService() UsersService {
    conn, err := grpc.Dial("notificator:8082", grpc.WithInsecure())
    if err != nil {
        log.Printf("unable to connect to notificator: %s", err.Error())
        return new(basicUsersService)
    }

    log.Printf("connected to notificator")

    return &basicUsersService{
        notificatorServiceClient: pb.NewNotificatorClient(conn),
    }
}

当通知服务注册到 etcd 中时,我们可以用 etcd 的地址替换它的硬编码地址。

var etcdServer = "http://etcd:2379"

client, err := sdetcd.NewClient(context.Background(), []string{etcdServer}, sdetcd.ClientOptions{})
if err != nil {
    log.Printf("unable to connect to etcd: %s", err.Error())
    return new(basicUsersService)
}

entries, err := client.GetEntries("/services/notificator/")
if err != nil || len(entries) == 0 {
    log.Printf("unable to get prefix entries: %s", err.Error())
    return new(basicUsersService)
}

conn, err := grpc.Dial(entries[0], grpc.WithInsecure())

因为我们只有一个服务,所以只获取到了一个连接,但实际系统中可能有上百个服务,所以我们可以应用一些逻辑来进行实例选择,例如轮询。
现在让我们启动用户服务来测试一下:

docker-compose up users

调用 http 接口来创建一个用户:

curl -XPOST http://localhost:8802/create -d '{"email": "test"}'

结论

这篇文章我们实现了虚构的 Notificator gRPC 服务,将它注册到 etcd 中,并在用户服务中调用它。

————————————————

原文作者:Summer
转自链接:https://learnku.com/go/t/36960
版权声明:著作权归作者所有。商业转载请联系作者获得授权,非商业转载请保留以上作者信息和原文链接。

最后编辑: Simon  文档更新时间: 2021-02-04 15:50   作者:Simon