在上一篇文章中,我们使用 kit 命令行工具搭建了一个本地环境。在本文中我们将继续使用这些代码。
让我们首先通过编写原型定义来实现 Notificator 服务,它是一个 gRPC 服务。我们已经生成了 notificator/pkg/grpc/pb/notificator.pb
文件,让我们简单的去实现它。
syntax = "proto3";
package pb;
service Notificator {
rpc SendEmail (SendEmailRequest) returns (SendEmailReply);
}
message SendEmailRequest {
string email = 1;
string content = 2;
}
message SendEmailReply {
string id = 1;
}
现在我们要生成服务端和客户端的存根,可以使用 kit 工具生成的 compile.sh
脚本,脚本已经包含 protoc
命令。
cd notificator/pkg/grpc/pb
./compile.sh
可以发现,notificator.pb.go 这个文件已经更新了。
现在我们需要实现服务本身了。我们用生成一个 UUID 代替发送真实的电子邮件。首先需要对服务进行调整以匹配我们的请求和响应格式(返回一个新的参数:id)。
notificator/pkg/service/service.go:
import (
"context"
uuid "github.com/satori/go.uuid"
)
// NotificatorService describes the service.
type NotificatorService interface {
// Add your methods here
SendEmail(ctx context.Context, email string, content string) (string, error)
}
type basicNotificatorService struct{}
func (b *basicNotificatorService) SendEmail(ctx context.Context, email string, content string) (string, error) {
id, err := uuid.NewV4()
if err != nil {
return "", err
}
return id.String(), nil
}
notificator/pkg/service/middleware.go:
func (l loggingMiddleware) SendEmail(ctx context.Context, email string, content string) (string, error) {
defer func() {
l.logger.Log("method", "SendEmail", "email", email, "content", content)
}()
return l.next.SendEmail(ctx, email, content)
}
notificator/pkg/endpoint/endpoint.go
// SendEmailResponse 接受 SendEmail 方法的响应
type SendEmailResponse struct {
Id string
E0 error `json:"e0"`
}
// MakeSendEmailEndpoint 返回 SendEmail 调用的端点
func MakeSendEmailEndpoint(s service.NotificatorService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(SendEmailRequest)
id, e0 := s.SendEmail(ctx, req.Email, req.Content)
return SendEmailResponse{Id: id, E0: e0}, nil
}
}
如果我们搜索 TODO grep -R "TODO" notificator
,可以看到还需要实现 gRPC 请求和响应的编码和解码。
notificator/pkg/grpc/handler.go:
func decodeSendEmailRequest(_ context.Context, r interface{}) (interface{}, error) {
req := r.(*pb.SendEmailRequest)
return endpoint.SendEmailRequest{Email: req.Email, Content: req.Content}, nil
}
func encodeSendEmailResponse(_ context.Context, r interface{}) (interface{}, error) {
reply := r.(endpoint.SendEmailResponse)
return &pb.SendEmailReply{Id: reply.Id}, nil
}
服务发现
SendEmail 接口将由用户服务调用,所以用户服务必须知道通知服务的地址,这是典型的服务发现问题。在本地环境中,我们使用 Docker Compose 部署时知道如何连接服务,但是,在实际的分布式环境中,可能会遇到其他问题。
首先,在 etcd 中注册我们的通知服务。etcd 是一种可靠的分布式键值存储,广泛应用于服务发现。go-kit 支持使用其他的服务发现技术如:eureka、consul 和 zookeeper 等。
把它添加进 Docker Compose 中,这样我们的服务就可以使用它了。CV 工程师上线:
docker-compose.yml:
etcd:
image: 'quay.io/coreos/etcd:v3.1.7'
restart: always
ports:
- '23791:2379'
- '23801:2380'
environment:
ETCD_NAME: infra
ETCD_INITIAL_ADVERTISE_PEER_URLS: 'http://etcd:2380'
ETCD_INITIAL_CLUSTER: infra=http://etcd:2380
ETCD_INITIAL_CLUSTER_STATE: new
ETCD_INITIAL_CLUSTER_TOKEN: secrettoken
ETCD_LISTEN_CLIENT_URLS: 'http://etcd:2379,http://localhost:2379'
ETCD_LISTEN_PEER_URLS: 'http://etcd:2380'
ETCD_ADVERTISE_CLIENT_URLS: 'http://etcd:2379'
在 etcd 中注册通知服务 notificator/cmd/service/service.go:
registrar, err := registerService(logger)
if err != nil {
logger.Log(err)
return
}
defer registrar.Deregister()
func registerService(logger log.Logger) (*sdetcd.Registrar, error) {
var (
etcdServer = "http://etcd:2379"
prefix = "/services/notificator/"
instance = "notificator:8082"
key = prefix + instance
)
client, err := sdetcd.NewClient(context.Background(), []string{etcdServer}, sdetcd.ClientOptions{})
if err != nil {
return nil, err
}
registrar := sdetcd.NewRegistrar(client, sdetcd.Service{
Key: key,
Value: instance,
}, logger)
registrar.Register()
return registrar, nil
}
当我们的程序停止或者崩溃时,应该要记得取消注册。现在 etcd 已经知道我们的服务了,这个例子中只有一个实例,实际环境中显然会有更多。
现在让我们来测试一下通知服务是否注册到了 etcd 中:
docker-compose up -d etcd
docker-compose up -d notificator
现在我们使用用户服务调用通知服务,当我们创建一个服务后它会发送一个虚构的通知给用户。
由于通知服务是一个 gRPC 服务,在用户服务中,我们需要和客户端共用一个客户端存根。
protobuf 的客户端存根代码在 notificator/pkg/grpc/pb/notificator.pb.go ,我们把这个包引入我们的客户端
users/pkg/service/service.go:
import (
"github.com/plutov/packagemain/13-go-kit-2/notificator/pkg/grpc/pb"
"google.golang.org/grpc"
)
type basicUsersService struct {
notificatorServiceClient pb.NotificatorClient
}
func (b *basicUsersService) Create(ctx context.Context, email string) error {
reply, err := b.notificatorServiceClient.SendEmail(context.Background(), &pb.SendEmailRequest{
Email: email,
Content: "Hi! Thank you for registration...",
})
if reply != nil {
log.Printf("Email ID: %s", reply.Id)
}
return err
}
// NewBasicUsersService 返回一个简单的、无状态的用户服务
func NewBasicUsersService() UsersService {
conn, err := grpc.Dial("notificator:8082", grpc.WithInsecure())
if err != nil {
log.Printf("unable to connect to notificator: %s", err.Error())
return new(basicUsersService)
}
log.Printf("connected to notificator")
return &basicUsersService{
notificatorServiceClient: pb.NewNotificatorClient(conn),
}
}
当通知服务注册到 etcd 中时,我们可以用 etcd 的地址替换它的硬编码地址。
var etcdServer = "http://etcd:2379"
client, err := sdetcd.NewClient(context.Background(), []string{etcdServer}, sdetcd.ClientOptions{})
if err != nil {
log.Printf("unable to connect to etcd: %s", err.Error())
return new(basicUsersService)
}
entries, err := client.GetEntries("/services/notificator/")
if err != nil || len(entries) == 0 {
log.Printf("unable to get prefix entries: %s", err.Error())
return new(basicUsersService)
}
conn, err := grpc.Dial(entries[0], grpc.WithInsecure())
因为我们只有一个服务,所以只获取到了一个连接,但实际系统中可能有上百个服务,所以我们可以应用一些逻辑来进行实例选择,例如轮询。
现在让我们启动用户服务来测试一下:
docker-compose up users
调用 http 接口来创建一个用户:
curl -XPOST http://localhost:8802/create -d '{"email": "test"}'
结论
这篇文章我们实现了虚构的 Notificator gRPC 服务,将它注册到 etcd 中,并在用户服务中调用它。
————————————————
原文作者:Summer
转自链接:https://learnku.com/go/t/36960
版权声明:著作权归作者所有。商业转载请联系作者获得授权,非商业转载请保留以上作者信息和原文链接。