saastool增加docker/daemon模式
This commit is contained in:
397
cmd/saastool/daemon.go
Normal file
397
cmd/saastool/daemon.go
Normal file
@@ -0,0 +1,397 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.algo.com.cn/public/saasapi"
|
||||
"git.algo.com.cn/public/saasapi/pkg/saashttp"
|
||||
)
|
||||
|
||||
type Daemon struct {
|
||||
saasHttp *saashttp.SaasClient
|
||||
currTimeStamp uint32
|
||||
}
|
||||
|
||||
type daemonPublicParams struct {
|
||||
ds string
|
||||
appid string
|
||||
}
|
||||
|
||||
func RunDaemon(args ...string) error {
|
||||
var err error
|
||||
account := os.Getenv("SRTA_ACCOUNT")
|
||||
token := os.Getenv("SRTA_TOKEN")
|
||||
env := os.Getenv("SRTA_ENV")
|
||||
|
||||
slog.Info("Env", "SRTA_ACCOUNT", account)
|
||||
slog.Info("Env", "SRTA_TOKEN", token)
|
||||
slog.Info("Env", "SRTA_ENV", env)
|
||||
|
||||
if strings.TrimSpace(account) == "" {
|
||||
err = fmt.Errorf("SRTA_ACCOUNT is empty")
|
||||
slog.Error("Env", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if strings.TrimSpace(token) == "" {
|
||||
err = fmt.Errorf("SRTA_TOKEN is empty")
|
||||
slog.Error("Env", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
apiurls := &saashttp.ApiUrls{}
|
||||
|
||||
switch strings.ToLower(env) {
|
||||
case "demo":
|
||||
apiurls.BaseUrl = "https://srta.algo.com.cn"
|
||||
case "prd":
|
||||
apiurls.BaseUrl = "https://api.rta.qq.com"
|
||||
default:
|
||||
err = fmt.Errorf("SRTA_ENV is not demo or prd")
|
||||
slog.Error("Env", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
|
||||
daemon := &Daemon{
|
||||
saasHttp: &saashttp.SaasClient{
|
||||
Client: &http.Client{},
|
||||
ApiUrls: saashttp.InitAPIUrl(apiurls),
|
||||
Auth: &saashttp.Auth{
|
||||
Account: account,
|
||||
Token: token,
|
||||
},
|
||||
},
|
||||
}
|
||||
mux.HandleFunc("/read", daemon.httpRead)
|
||||
mux.HandleFunc("/write", daemon.httpWrite)
|
||||
|
||||
daemonPort := ":8080"
|
||||
svrHttp := http.Server{
|
||||
Addr: daemonPort,
|
||||
Handler: mux,
|
||||
}
|
||||
|
||||
slog.Info("Start running", "port", daemonPort)
|
||||
err = svrHttp.ListenAndServe()
|
||||
if err != nil {
|
||||
slog.Error("failed", "err", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Daemon) httpRead(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
w.Header().Set("Allow", http.MethodGet)
|
||||
http.Error(w, fmt.Sprintf("Not support method: %s。Only GET。", r.Method), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
|
||||
saasRes := &saasapi.SaasRes{}
|
||||
|
||||
defer func() {
|
||||
json.NewEncoder(w).Encode(saasRes)
|
||||
}()
|
||||
|
||||
params, err := d.parsePublicParams(r)
|
||||
if err != nil {
|
||||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||||
saasRes.Status = err.Error()
|
||||
return
|
||||
}
|
||||
|
||||
q := r.URL.Query()
|
||||
userid := strings.TrimSpace(q.Get("userid"))
|
||||
if len(userid) == 0 {
|
||||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||||
saasRes.Status = "userid is null"
|
||||
return
|
||||
}
|
||||
|
||||
saasReadItems := []*saasapi.ReadItem{}
|
||||
saasReadItems = append(saasReadItems, &saasapi.ReadItem{
|
||||
Userid: userid,
|
||||
})
|
||||
saasReq := &saasapi.SaasReq{
|
||||
Cmd: &saasapi.SaasReq_Read{
|
||||
Read: &saasapi.Read{
|
||||
DataspaceId: params.ds,
|
||||
Appid: params.appid,
|
||||
ReadItems: saasReadItems,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
saasRes, err = d.saasHttp.Read(saasReq)
|
||||
|
||||
if err != nil {
|
||||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||||
saasRes.Status = fmt.Sprintf("submit read error. %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) httpWrite(w http.ResponseWriter, r *http.Request) {
|
||||
bPost := false
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
bPost = true
|
||||
case http.MethodGet:
|
||||
default:
|
||||
w.Header().Set("Allow", http.MethodGet)
|
||||
http.Error(w, fmt.Sprintf("Not support method: %s。Only GET/POST。", r.Method), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
|
||||
saasRes := &saasapi.SaasRes{}
|
||||
|
||||
defer func() {
|
||||
json.NewEncoder(w).Encode(saasRes)
|
||||
}()
|
||||
|
||||
params, err := d.parsePublicParams(r)
|
||||
if err != nil {
|
||||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||||
saasRes.Status = err.Error()
|
||||
return
|
||||
}
|
||||
|
||||
q := r.URL.Query()
|
||||
|
||||
clear := q.Get("clear")
|
||||
bClear := false
|
||||
if len(clear) > 0 {
|
||||
bClear, err = strconv.ParseBool(clear)
|
||||
if err != nil {
|
||||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||||
saasRes.Status = fmt.Sprintf("clear is error. %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
d.currTimeStamp = uint32(time.Now().Unix())
|
||||
writeItems, err := d.parseWriteParams(r, bPost)
|
||||
if err != nil {
|
||||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||||
saasRes.Status = fmt.Sprintf("write info error. %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
write := &saasapi.Write{
|
||||
DataspaceId: params.ds,
|
||||
Appid: params.appid,
|
||||
IsClearAllFirst: bClear,
|
||||
}
|
||||
saasReq := &saasapi.SaasReq{
|
||||
Cmd: &saasapi.SaasReq_Write{
|
||||
Write: write,
|
||||
},
|
||||
}
|
||||
|
||||
write.WriteItems = writeItems
|
||||
|
||||
saasRes, err = d.saasHttp.Write(saasReq)
|
||||
if err != nil {
|
||||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||||
saasRes.Status = fmt.Sprintf("submit write error. %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) parsePublicParams(r *http.Request) (*daemonPublicParams, error) {
|
||||
params := &daemonPublicParams{}
|
||||
q := r.URL.Query()
|
||||
|
||||
params.ds = strings.ToLower(strings.TrimSpace(q.Get("ds")))
|
||||
switch params.ds {
|
||||
case "did", "wuid":
|
||||
default:
|
||||
return nil, fmt.Errorf("ds must use did/wuid. current is %v", params.ds)
|
||||
}
|
||||
|
||||
params.appid = strings.TrimSpace(q.Get("appid"))
|
||||
if params.ds == "wuid" && len(params.appid) == 0 {
|
||||
return nil, fmt.Errorf("appid must exist when ds=wuid")
|
||||
}
|
||||
return params, nil
|
||||
}
|
||||
|
||||
func (d *Daemon) parseWriteParams(r *http.Request, post bool) (writeItems []*saasapi.WriteItem, reterr error) {
|
||||
writeItems = nil
|
||||
lineNum := 0
|
||||
if post {
|
||||
scanner := bufio.NewScanner(r.Body)
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
lineNum++
|
||||
parsedQuery, err := url.ParseQuery(line)
|
||||
if err != nil {
|
||||
reterr = fmt.Errorf("parse error: %v", err)
|
||||
return
|
||||
}
|
||||
item, err := d.parseQuery(parsedQuery, lineNum)
|
||||
if err != nil {
|
||||
reterr = fmt.Errorf("parse error: %v", err)
|
||||
return
|
||||
}
|
||||
writeItems = append(writeItems, item)
|
||||
}
|
||||
} else {
|
||||
item, err := d.parseQuery(r.URL.Query(), lineNum)
|
||||
if err != nil {
|
||||
reterr = fmt.Errorf("parse error: %v", err)
|
||||
return
|
||||
}
|
||||
writeItems = append(writeItems, item)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (d *Daemon) parseQuery(q url.Values, lineNum int) (writeItem *saasapi.WriteItem, reterr error) {
|
||||
userid := strings.TrimSpace(q.Get("userid"))
|
||||
if len(userid) == 0 {
|
||||
reterr = fmt.Errorf("userid is null. line %v", lineNum)
|
||||
return
|
||||
}
|
||||
|
||||
writeItem = &saasapi.WriteItem{
|
||||
Userid: userid,
|
||||
}
|
||||
for key, value := range q {
|
||||
parts := strings.Split(strings.ToLower(key), ".")
|
||||
if len(parts) == 2 {
|
||||
switch parts[0] {
|
||||
case "u8":
|
||||
index, err := strconv.ParseUint(parts[1], 10, 8)
|
||||
if err != nil {
|
||||
reterr = fmt.Errorf("param is error. line %v, param %v", lineNum, key)
|
||||
return
|
||||
}
|
||||
|
||||
if index == 0 || index > uint64(saasapi.MAX_U8) {
|
||||
reterr = fmt.Errorf("param index is error. line %v, param %v, index %v", lineNum, key, index)
|
||||
return
|
||||
}
|
||||
|
||||
nValue, err := strconv.ParseUint(value[0], 10, 8)
|
||||
if err != nil {
|
||||
reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value)
|
||||
return
|
||||
}
|
||||
|
||||
if writeItem.GetWriteBytes() == nil {
|
||||
writeItem.WriteBytes = &saasapi.Bytes{}
|
||||
}
|
||||
|
||||
if writeItem.GetWriteBytes().GetIndex_1()&(1<<(index-1)) > 0 {
|
||||
// 已经被写入过数据,跳过
|
||||
continue
|
||||
}
|
||||
|
||||
writeItem.GetWriteBytes().Bytes = append(writeItem.GetWriteBytes().Bytes, uint8(nValue))
|
||||
writeItem.GetWriteBytes().Index_1 |= 1 << (index - 1)
|
||||
|
||||
case "u32":
|
||||
index, err := strconv.ParseUint(parts[1], 10, 8)
|
||||
if err != nil {
|
||||
reterr = fmt.Errorf("param is error. line %v, param %v", lineNum, key)
|
||||
return
|
||||
}
|
||||
if index == 0 || index > uint64(saasapi.MAX_U32) {
|
||||
reterr = fmt.Errorf("param index is error. line %v, param %v, index %v", lineNum, key, index)
|
||||
return
|
||||
}
|
||||
|
||||
nValue, err := strconv.ParseUint(value[0], 10, 32)
|
||||
if err != nil {
|
||||
reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value)
|
||||
return
|
||||
}
|
||||
|
||||
if writeItem.GetWriteUint32S() == nil {
|
||||
writeItem.WriteUint32S = &saasapi.Uint32S{}
|
||||
}
|
||||
|
||||
if writeItem.GetWriteUint32S().GetIndex_1()&(1<<(index-1)) > 0 {
|
||||
// 已经被写入过数据,跳过
|
||||
continue
|
||||
}
|
||||
|
||||
writeItem.GetWriteUint32S().Uint32S = append(writeItem.GetWriteUint32S().Uint32S, uint32(nValue))
|
||||
writeItem.GetWriteUint32S().Index_1 |= 1 << (index - 1)
|
||||
case "flag":
|
||||
index, err := strconv.ParseUint(parts[1], 10, 8)
|
||||
if err != nil {
|
||||
reterr = fmt.Errorf("param is error. line %v, param %v", lineNum, key)
|
||||
return
|
||||
}
|
||||
if index == 0 || index > uint64(saasapi.MAX_FLAG) {
|
||||
reterr = fmt.Errorf("param index is error. line %v, param %v, index %v", lineNum, key, index)
|
||||
return
|
||||
}
|
||||
|
||||
flag := saasapi.FlagWithExpire{}
|
||||
switch strings.ToLower(value[0]) {
|
||||
case "true":
|
||||
flag.Flag = true
|
||||
case "false":
|
||||
flag.Flag = false
|
||||
default:
|
||||
if strings.HasPrefix(value[0], "!") {
|
||||
// 相对时间戳
|
||||
nValue, err := strconv.ParseUint(value[0][1:], 10, 32)
|
||||
if err != nil {
|
||||
reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value)
|
||||
return
|
||||
}
|
||||
|
||||
flag.Flag = true
|
||||
flag.Expire = d.currTimeStamp + uint32(nValue)
|
||||
|
||||
} else {
|
||||
nValue, err := strconv.ParseUint(value[0], 10, 32)
|
||||
if err != nil || nValue > 30*24*86400 {
|
||||
reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value)
|
||||
return
|
||||
}
|
||||
|
||||
flag.Flag = true
|
||||
flag.Expire = uint32(nValue)
|
||||
}
|
||||
}
|
||||
|
||||
if writeItem.GetWriteFlagsWithExpire() == nil {
|
||||
writeItem.WriteFlagsWithExpire = &saasapi.FlagsWithExpire{}
|
||||
}
|
||||
|
||||
if writeItem.GetWriteFlagsWithExpire().GetIndex_1()&(1<<(index-1)) > 0 {
|
||||
// 已经被写入过数据,跳过
|
||||
continue
|
||||
}
|
||||
|
||||
writeItem.GetWriteFlagsWithExpire().FlagsWithExpire = append(writeItem.GetWriteFlagsWithExpire().FlagsWithExpire, &flag)
|
||||
writeItem.GetWriteFlagsWithExpire().Index_1 |= 1 << (index - 1)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return
|
||||
}
|
||||
Reference in New Issue
Block a user