改造:分布式调度

协议

protobuf3

syntax = "proto3";

import "google/api/annotations.proto";

message JobName {
    string name = 1;
}

message Ignore {
}

message Job {
    string name = 1;
    string command = 2;
    string cronExpr = 3;
}

message Rsp {
    int32 retCode = 1;
    string retMsg = 2;
}

message RspWithJob {
    Job job = 1;
    Rsp rsp = 2;
}

message RspWithJobList {
    repeated Job jobList = 1;
    Rsp rsp = 2;
}

message JobLog {
    string jobName = 1;
    string command = 2;
    string err = 3;
    string output = 4;
    int64 planTime = 5;
    int64 scheduleTime = 6;
    int64 startTime = 7;
    int64 endTime = 8;
}

message ListLogReq {
    string name = 1;
    int32 skip = 2;
    int32 limit = 3;
}

message RspWithLogList {
    repeated JobLog logList = 1;
    Rsp rsp = 2;
}

message RspWithWorkerList {
    repeated string workerList = 1;
    Rsp rsp = 2;
}

service CronTab {
    rpc Save (Job) returns (RspWithJob) {
        option (google.api.http) = {
            post: "/job/save"
            body: "*"
        };
    };
    rpc Delete (JobName) returns (RspWithJob) {
        option (google.api.http) = {
            post: "/job/delete"
            body: "*"
        };
    };
    rpc KillJob (JobName) returns (Rsp) {
        option (google.api.http) = {
            post: "/job/kill"
            body: "*"
        };
    };
    rpc ListJobs (Ignore) returns (RspWithJobList) {
        option (google.api.http) = {
            post: "/job/list"
            body: "*"
        };
    };
    rpc ListLogs (ListLogReq) returns (RspWithLogList) {
        option (google.api.http) = {
            post: "/job/log"
            body: "*"
        };
    };
    rpc ListWorks (Ignore) returns (RspWithWorkerList) {
        option (google.api.http) = {
            post: "/worker/list"
            body: "*"
        };
    };
}

使用gofast编译

go get github.com/gogo/protobuf/protoc-gen-gofast
protoc --gofast_out=plugins=grpc:. word_games.proto

使用 gogo/protobuf 编译protoc --gofast_out=plugins=grpc:. helloworld.proto

编译报错

[crontab]pprotoc --gofast_out=plugins=grpc:.   crontab.proto                                                    10:42:56  ☁  pb ☂ ⚡ ✭
protoc-gen-gofast: program not found or is not executable
Please specify a program using absolute path or make sure the program is available in your PATH system variable
--gofast_out: protoc-gen-gofast: Plugin failed with status code 1.

解决方案:配置 Go 环境变量(当然在这之前你要确保是否安装了插件)

export PATH=$PATH:$GOPATH/bin

gRPC:实现接口

Main

package main

import (
    "github.com/1005281342/crontab/config"
    "log"
    "net"

    "github.com/1005281342/crontab/internel/logic"
    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"

    pb "github.com/1005281342/crontab"
)

const (
    port = ":50051"
)

func main() {

    var err error
    if err = config.InitConfig("./master.json"); err != nil {
        panic(err)
    }
    var imp pb.CronTabServer
    imp, err = logic.NewImpCronTab()
    if err != nil {
        panic("NewImpWordGames Failed")
    }

    var lis net.Listener
    lis, err = net.Listen("tcp", port)
    if err != nil {
        log.Panicf("failed to listen: %v", err)
    }

    var s = grpc.NewServer() //起一个服务
    pb.RegisterCronTabServer(s, imp)
    // 注册反射服务 这个服务是CLI使用的 跟服务本身没有关系
    reflection.Register(s)
    if err = s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

认证:签名

提供Http接口

双向认证

Gateway:同时提供RPC和Http接口

https://segmentfault.com/a/1190000013339403

https://segmentfault.com/a/1190000013408485

https://segmentfault.com/a/1190000013513469

为了方便代码编写,我们将/go/pkg/mod/github.com/grpc-ecosystem/grpc-gateway@v1.14.6/third_party/googleapis/google 拷贝到当前项目下(得到crontab/google)

安装

go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway

http

package main

import (
    "context"
    "flag"
    "net/http"

    "github.com/golang/glog"
    "github.com/grpc-ecosystem/grpc-gateway/runtime"
    "google.golang.org/grpc"

    gw "github.com/1005281342/crontab" // Update
)

var (
    // command-line options:
    // gRPC server endpoint
    grpcServerEndpoint = flag.String("grpc-server-endpoint", "localhost:50051", "gRPC server endpoint")
)

func run() error {
    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // Register gRPC server endpoint
    // Note: Make sure the gRPC server is running properly and accessible
    mux := runtime.NewServeMux()
    opts := []grpc.DialOption{grpc.WithInsecure()}
    err := gw.RegisterCronTabHandlerFromEndpoint(ctx, mux, *grpcServerEndpoint, opts)
    if err != nil {
        return err
    }

    // Start HTTP server (and proxy calls to gRPC server endpoint)
    return http.ListenAndServe(":8081", mux)
}

func main() {
    flag.Parse()
    defer glog.Flush()

    if err := run(); err != nil {
        glog.Fatal(err)
    }
}

文档:基于Swagger

安装

go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger

生成

protoc -I/usr/local/include -I. -I$GOPATH/src/github.com/1005281342/crontab/google/api --swagger_out=logtostderr=true:. ./crontab.proto

执行成功会在当前目录下生成crontab.swagger.json文件(在本项目中需要将该文件生成到cmd目录下)

下载Swagger UI文件

Swagger提供可视化的API管理平台,就是Swagger UI

我们将其源码下载下来,并将其dist目录下的所有文件拷贝到我们项目中的$GOPATH/src/grpc-hello-world/third_party/swagger-ui

Swagger UI转换为Go源代码

go get -u github.com/jteeuwen/go-bindata/...

转换

在项目下新建pkg/ui/data/swagger目录(不创建也可以,执行命令会自动创建),回到$GOPATH/src/github.com/1005281342/crontab/下,执行命令

go-bindata --nocompress -pkg swagger -o pkg/ui/data/swagger/datafile.go third_party/swagger-ui/...

检查

回到pkg/ui/data/swagger目录,检查是否存在datafile.go文件

Swagger UI文件服务器(对外提供服务)

在这一步,我们需要使用与其配套的go-bindata-assetfs

它能够使用go-bindata所生成Swagger UIGo代码,结合net/http对外提供服务

安装

go get github.com/elazarl/go-bindata-assetfs/...

编写main

package main

import (
    "context"
    "google.golang.org/grpc/reflection"
    "log"
    "net"
    "net/http"
    "path"
    "strings"
    "time"

    "github.com/1005281342/crontab/config"
    "github.com/1005281342/crontab/internel/logic"
    "github.com/1005281342/crontab/pkg/ui/data/swagger"
    "github.com/grpc-ecosystem/grpc-gateway/runtime"
    "google.golang.org/grpc"

    gw "github.com/1005281342/crontab"
    pb "github.com/1005281342/crontab"
    assetfs "github.com/elazarl/go-bindata-assetfs"
)

const (
    gRPCPort  = ":50051"
    httpPoint = ":8081"
)

func startGRPC() {
    var err error
    if err = config.InitConfig("./master.json"); err != nil {
        panic(err)
    }
    var imp pb.CronTabServer
    imp, err = logic.NewImpCronTab()
    if err != nil {
        panic("NewImpWordGames Failed")
    }

    var lis net.Listener
    lis, err = net.Listen("tcp", gRPCPort)
    if err != nil {
        log.Panicf("failed to listen: %v", err)
    }

    var s = grpc.NewServer() //起一个服务
    pb.RegisterCronTabServer(s, imp)
    // 注册反射服务 这个服务是CLI使用的 跟服务本身没有关系
    reflection.Register(s)
    if err = s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

func startGateway() {
    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // Register gRPC server endpoint
    // Note: Make sure the gRPC server is running properly and accessible
    gwmux := runtime.NewServeMux()
    opts := []grpc.DialOption{grpc.WithInsecure()}
    err := gw.RegisterCronTabHandlerFromEndpoint(ctx, gwmux, gRPCPort, opts)
    if err != nil {
        panic(err)
    }

    mux := http.NewServeMux()
    mux.Handle("/", gwmux)
    mux.HandleFunc("/swagger/", serveSwaggerFile)
    serveSwaggerUI(mux)

    // Start HTTP server (and proxy calls to gRPC server endpoint)
    if err = http.ListenAndServe(httpPoint, mux); err != nil {
        panic(err)
    }
}

func serveSwaggerFile(w http.ResponseWriter, r *http.Request) {
    if !strings.HasSuffix(r.URL.Path, "swagger.json") {
        log.Printf("Not Found: %s", r.URL.Path)
        http.NotFound(w, r)
        return
    }

    p := strings.TrimPrefix(r.URL.Path, "/swagger/")
    p = path.Join("", p)

    log.Printf("Serving swagger-file: %s", p)

    http.ServeFile(w, r, p)
}

func serveSwaggerUI(mux *http.ServeMux) {
    fileServer := http.FileServer(&assetfs.AssetFS{
        Asset:    swagger.Asset,
        AssetDir: swagger.AssetDir,
        Prefix:   "third_party/swagger-ui",
    })
    prefix := "/swagger-ui/"
    mux.Handle(prefix, http.StripPrefix(prefix, fileServer))
}

func main() {

    go startGRPC()
    time.Sleep(time.Second)
    startGateway()
}

测试

查看数据 http://127.0.0.1:8081/swagger/crontab.swagger.json

测试地址 http://127.0.0.1:8081/swagger-ui/

(注意默认填写有地址,需要替换成 http://127.0.0.1:8081/swagger/crontab.swagger.json)

最后更新于

这有帮助吗?