改造:分布式调度
协议
protobuf3
syntax = "proto3";
import "google/api/annotations.proto";
message JobName {
string name = 1;
}
message Ignore {
}
message Job {
string name = 1;
string command = 2;
string cronExpr = 3;
}
message Rsp {
int32 retCode = 1;
string retMsg = 2;
}
message RspWithJob {
Job job = 1;
Rsp rsp = 2;
}
message RspWithJobList {
repeated Job jobList = 1;
Rsp rsp = 2;
}
message JobLog {
string jobName = 1;
string command = 2;
string err = 3;
string output = 4;
int64 planTime = 5;
int64 scheduleTime = 6;
int64 startTime = 7;
int64 endTime = 8;
}
message ListLogReq {
string name = 1;
int32 skip = 2;
int32 limit = 3;
}
message RspWithLogList {
repeated JobLog logList = 1;
Rsp rsp = 2;
}
message RspWithWorkerList {
repeated string workerList = 1;
Rsp rsp = 2;
}
service CronTab {
rpc Save (Job) returns (RspWithJob) {
option (google.api.http) = {
post: "/job/save"
body: "*"
};
};
rpc Delete (JobName) returns (RspWithJob) {
option (google.api.http) = {
post: "/job/delete"
body: "*"
};
};
rpc KillJob (JobName) returns (Rsp) {
option (google.api.http) = {
post: "/job/kill"
body: "*"
};
};
rpc ListJobs (Ignore) returns (RspWithJobList) {
option (google.api.http) = {
post: "/job/list"
body: "*"
};
};
rpc ListLogs (ListLogReq) returns (RspWithLogList) {
option (google.api.http) = {
post: "/job/log"
body: "*"
};
};
rpc ListWorks (Ignore) returns (RspWithWorkerList) {
option (google.api.http) = {
post: "/worker/list"
body: "*"
};
};
}
使用gofast编译
go get github.com/gogo/protobuf/protoc-gen-gofast
protoc --gofast_out=plugins=grpc:. word_games.proto
使用 gogo/protobuf 编译protoc --gofast_out=plugins=grpc:. helloworld.proto
编译报错
[crontab]pprotoc --gofast_out=plugins=grpc:. crontab.proto 10:42:56 ☁ pb ☂ ⚡ ✭
protoc-gen-gofast: program not found or is not executable
Please specify a program using absolute path or make sure the program is available in your PATH system variable
--gofast_out: protoc-gen-gofast: Plugin failed with status code 1.
解决方案:配置 Go 环境变量(当然在这之前你要确保是否安装了插件)
export PATH=$PATH:$GOPATH/bin
gRPC:实现接口
Main
package main
import (
"github.com/1005281342/crontab/config"
"log"
"net"
"github.com/1005281342/crontab/internel/logic"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
pb "github.com/1005281342/crontab"
)
const (
port = ":50051"
)
func main() {
var err error
if err = config.InitConfig("./master.json"); err != nil {
panic(err)
}
var imp pb.CronTabServer
imp, err = logic.NewImpCronTab()
if err != nil {
panic("NewImpWordGames Failed")
}
var lis net.Listener
lis, err = net.Listen("tcp", port)
if err != nil {
log.Panicf("failed to listen: %v", err)
}
var s = grpc.NewServer() //起一个服务
pb.RegisterCronTabServer(s, imp)
// 注册反射服务 这个服务是CLI使用的 跟服务本身没有关系
reflection.Register(s)
if err = s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
认证:签名
提供Http接口
双向认证
Gateway:同时提供RPC和Http接口
https://segmentfault.com/a/1190000013339403
https://segmentfault.com/a/1190000013408485
https://segmentfault.com/a/1190000013513469
为了方便代码编写,我们将/go/pkg/mod/github.com/grpc-ecosystem/grpc-gateway@v1.14.6/third_party/googleapis/google 拷贝到当前项目下(得到crontab/google)
安装
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
http
package main
import (
"context"
"flag"
"net/http"
"github.com/golang/glog"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
gw "github.com/1005281342/crontab" // Update
)
var (
// command-line options:
// gRPC server endpoint
grpcServerEndpoint = flag.String("grpc-server-endpoint", "localhost:50051", "gRPC server endpoint")
)
func run() error {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Register gRPC server endpoint
// Note: Make sure the gRPC server is running properly and accessible
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithInsecure()}
err := gw.RegisterCronTabHandlerFromEndpoint(ctx, mux, *grpcServerEndpoint, opts)
if err != nil {
return err
}
// Start HTTP server (and proxy calls to gRPC server endpoint)
return http.ListenAndServe(":8081", mux)
}
func main() {
flag.Parse()
defer glog.Flush()
if err := run(); err != nil {
glog.Fatal(err)
}
}
文档:基于Swagger
安装
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
生成
protoc -I/usr/local/include -I. -I$GOPATH/src/github.com/1005281342/crontab/google/api --swagger_out=logtostderr=true:. ./crontab.proto
执行成功会在当前目录下生成crontab.swagger.json文件(在本项目中需要将该文件生成到cmd目录下)
下载Swagger UI
文件
Swagger UI
文件Swagger
提供可视化的API
管理平台,就是Swagger UI
我们将其源码下载下来,并将其dist
目录下的所有文件拷贝到我们项目中的$GOPATH/src/grpc-hello-world/third_party/swagger-ui
去
将Swagger UI
转换为Go
源代码
Swagger UI
转换为Go
源代码go get -u github.com/jteeuwen/go-bindata/...
转换
在项目下新建pkg/ui/data/swagger
目录(不创建也可以,执行命令会自动创建),回到$GOPATH/src/github.com/1005281342/crontab/
下,执行命令
go-bindata --nocompress -pkg swagger -o pkg/ui/data/swagger/datafile.go third_party/swagger-ui/...
检查
回到pkg/ui/data/swagger
目录,检查是否存在datafile.go
文件
Swagger UI
文件服务器(对外提供服务)
Swagger UI
文件服务器(对外提供服务)在这一步,我们需要使用与其配套的go-bindata-assetfs
它能够使用go-bindata
所生成Swagger UI
的Go
代码,结合net/http
对外提供服务
安装
go get github.com/elazarl/go-bindata-assetfs/...
编写main
package main
import (
"context"
"google.golang.org/grpc/reflection"
"log"
"net"
"net/http"
"path"
"strings"
"time"
"github.com/1005281342/crontab/config"
"github.com/1005281342/crontab/internel/logic"
"github.com/1005281342/crontab/pkg/ui/data/swagger"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
gw "github.com/1005281342/crontab"
pb "github.com/1005281342/crontab"
assetfs "github.com/elazarl/go-bindata-assetfs"
)
const (
gRPCPort = ":50051"
httpPoint = ":8081"
)
func startGRPC() {
var err error
if err = config.InitConfig("./master.json"); err != nil {
panic(err)
}
var imp pb.CronTabServer
imp, err = logic.NewImpCronTab()
if err != nil {
panic("NewImpWordGames Failed")
}
var lis net.Listener
lis, err = net.Listen("tcp", gRPCPort)
if err != nil {
log.Panicf("failed to listen: %v", err)
}
var s = grpc.NewServer() //起一个服务
pb.RegisterCronTabServer(s, imp)
// 注册反射服务 这个服务是CLI使用的 跟服务本身没有关系
reflection.Register(s)
if err = s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
func startGateway() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Register gRPC server endpoint
// Note: Make sure the gRPC server is running properly and accessible
gwmux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithInsecure()}
err := gw.RegisterCronTabHandlerFromEndpoint(ctx, gwmux, gRPCPort, opts)
if err != nil {
panic(err)
}
mux := http.NewServeMux()
mux.Handle("/", gwmux)
mux.HandleFunc("/swagger/", serveSwaggerFile)
serveSwaggerUI(mux)
// Start HTTP server (and proxy calls to gRPC server endpoint)
if err = http.ListenAndServe(httpPoint, mux); err != nil {
panic(err)
}
}
func serveSwaggerFile(w http.ResponseWriter, r *http.Request) {
if !strings.HasSuffix(r.URL.Path, "swagger.json") {
log.Printf("Not Found: %s", r.URL.Path)
http.NotFound(w, r)
return
}
p := strings.TrimPrefix(r.URL.Path, "/swagger/")
p = path.Join("", p)
log.Printf("Serving swagger-file: %s", p)
http.ServeFile(w, r, p)
}
func serveSwaggerUI(mux *http.ServeMux) {
fileServer := http.FileServer(&assetfs.AssetFS{
Asset: swagger.Asset,
AssetDir: swagger.AssetDir,
Prefix: "third_party/swagger-ui",
})
prefix := "/swagger-ui/"
mux.Handle(prefix, http.StripPrefix(prefix, fileServer))
}
func main() {
go startGRPC()
time.Sleep(time.Second)
startGateway()
}
测试
查看数据 http://127.0.0.1:8081/swagger/crontab.swagger.json
测试地址 http://127.0.0.1:8081/swagger-ui/
(注意默认填写有地址,需要替换成 http://127.0.0.1:8081/swagger/crontab.swagger.json)
最后更新于
这有帮助吗?