422 lines
10 KiB
Go
422 lines
10 KiB
Go
package main
|
||
|
||
import (
|
||
"bufio"
|
||
"encoding/json"
|
||
"fmt"
|
||
"log/slog"
|
||
"net/http"
|
||
"net/url"
|
||
"os"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"git.algo.com.cn/public/saasapi"
|
||
"git.algo.com.cn/public/saasapi/pkg/saashttp"
|
||
)
|
||
|
||
type Daemon struct {
|
||
saasHttp *saashttp.SaasClient
|
||
currTimeStamp uint32
|
||
}
|
||
|
||
type daemonPublicParams struct {
|
||
ds string
|
||
appid string
|
||
}
|
||
|
||
func RunDaemon(args ...string) error {
|
||
var err error
|
||
account := os.Getenv("SRTA_ACCOUNT")
|
||
token := os.Getenv("SRTA_TOKEN")
|
||
env := os.Getenv("SRTA_ENV")
|
||
|
||
slog.Info("Env", "SRTA_ACCOUNT", account)
|
||
slog.Info("Env", "SRTA_TOKEN", token)
|
||
slog.Info("Env", "SRTA_ENV", env)
|
||
|
||
if strings.TrimSpace(account) == "" {
|
||
err = fmt.Errorf("SRTA_ACCOUNT is empty")
|
||
slog.Error("Env", "err", err)
|
||
return err
|
||
}
|
||
|
||
if strings.TrimSpace(token) == "" {
|
||
err = fmt.Errorf("SRTA_TOKEN is empty")
|
||
slog.Error("Env", "err", err)
|
||
return err
|
||
}
|
||
|
||
apiurls := &saashttp.ApiUrls{}
|
||
|
||
switch strings.ToLower(env) {
|
||
case "demo":
|
||
apiurls.BaseUrl = "https://srta.algo.com.cn"
|
||
case "prd":
|
||
apiurls.BaseUrl = "https://api.rta.qq.com"
|
||
case "dev":
|
||
apiurls.BaseUrl = "http://localhost:8080"
|
||
default:
|
||
err = fmt.Errorf("SRTA_ENV is not demo or prd")
|
||
slog.Error("Env", "err", err)
|
||
return err
|
||
}
|
||
|
||
mux := http.NewServeMux()
|
||
|
||
daemon := &Daemon{
|
||
saasHttp: &saashttp.SaasClient{
|
||
Client: &http.Client{},
|
||
ApiUrls: saashttp.InitAPIUrl(apiurls),
|
||
Auth: &saashttp.Auth{
|
||
Account: account,
|
||
Token: token,
|
||
},
|
||
},
|
||
}
|
||
mux.HandleFunc("/read", daemon.httpRead)
|
||
mux.HandleFunc("/write", daemon.httpWrite)
|
||
|
||
daemonPort := os.Getenv("SRTA_PORT")
|
||
if strings.TrimSpace(daemonPort) == "" {
|
||
daemonPort = "8080"
|
||
}
|
||
daemonPort = ":" + daemonPort
|
||
svrHttp := http.Server{
|
||
Addr: daemonPort,
|
||
Handler: mux,
|
||
}
|
||
|
||
slog.Info("Start running", "port", daemonPort)
|
||
err = svrHttp.ListenAndServe()
|
||
if err != nil {
|
||
slog.Error("failed", "err", err)
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (d *Daemon) httpRead(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodGet {
|
||
w.Header().Set("Allow", http.MethodGet)
|
||
http.Error(w, fmt.Sprintf("Not support method: %s。Only GET。", r.Method), http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
|
||
w.Header().Set("Content-Type", "application/json")
|
||
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||
|
||
saasRes := &saasapi.SaasRes{}
|
||
|
||
defer func() {
|
||
json.NewEncoder(w).Encode(saasRes)
|
||
}()
|
||
|
||
params, err := d.parsePublicParams(r)
|
||
if err != nil {
|
||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||
saasRes.Status = err.Error()
|
||
return
|
||
}
|
||
|
||
q := r.URL.Query()
|
||
userid := strings.TrimSpace(q.Get("userid"))
|
||
if len(userid) == 0 {
|
||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||
saasRes.Status = "userid is null"
|
||
return
|
||
}
|
||
|
||
saasReadItems := []*saasapi.ReadItem{}
|
||
saasReadItems = append(saasReadItems, &saasapi.ReadItem{
|
||
Userid: userid,
|
||
})
|
||
saasReq := &saasapi.SaasReq{
|
||
Cmd: &saasapi.SaasReq_Read{
|
||
Read: &saasapi.Read{
|
||
DataspaceId: params.ds,
|
||
Appid: params.appid,
|
||
ReadItems: saasReadItems,
|
||
},
|
||
},
|
||
}
|
||
|
||
saasRes, err = d.saasHttp.Read(saasReq)
|
||
|
||
if err != nil {
|
||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||
saasRes.Status = fmt.Sprintf("submit read error. %v", err)
|
||
return
|
||
}
|
||
}
|
||
|
||
func (d *Daemon) httpWrite(w http.ResponseWriter, r *http.Request) {
|
||
bPost := false
|
||
switch r.Method {
|
||
case http.MethodPost:
|
||
bPost = true
|
||
case http.MethodGet:
|
||
default:
|
||
w.Header().Set("Allow", http.MethodGet)
|
||
http.Error(w, fmt.Sprintf("Not support method: %s。Only GET/POST。", r.Method), http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
|
||
w.Header().Set("Content-Type", "application/json")
|
||
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||
|
||
saasRes := &saasapi.SaasRes{}
|
||
|
||
defer func() {
|
||
json.NewEncoder(w).Encode(saasRes)
|
||
}()
|
||
|
||
params, err := d.parsePublicParams(r)
|
||
if err != nil {
|
||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||
saasRes.Status = err.Error()
|
||
return
|
||
}
|
||
|
||
q := r.URL.Query()
|
||
|
||
clear := q.Get("clear")
|
||
bClear := false
|
||
if len(clear) > 0 {
|
||
bClear, err = strconv.ParseBool(clear)
|
||
if err != nil {
|
||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||
saasRes.Status = fmt.Sprintf("clear is error. %v", err)
|
||
return
|
||
}
|
||
}
|
||
|
||
d.currTimeStamp = uint32(time.Now().Unix())
|
||
writeItems, err := d.parseWriteParams(r, bPost)
|
||
if err != nil {
|
||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||
saasRes.Status = fmt.Sprintf("write info error. %v", err)
|
||
return
|
||
}
|
||
|
||
write := &saasapi.Write{
|
||
DataspaceId: params.ds,
|
||
Appid: params.appid,
|
||
IsClearAllFirst: bClear,
|
||
}
|
||
saasReq := &saasapi.SaasReq{
|
||
Cmd: &saasapi.SaasReq_Write{
|
||
Write: write,
|
||
},
|
||
}
|
||
|
||
write.WriteItems = writeItems
|
||
|
||
saasRes, err = d.saasHttp.Write(saasReq)
|
||
if err != nil {
|
||
saasRes.Code = saasapi.ErrorCode_CMD_ERROR
|
||
saasRes.Status = fmt.Sprintf("submit write error. %v", err)
|
||
return
|
||
}
|
||
}
|
||
|
||
func (d *Daemon) parsePublicParams(r *http.Request) (*daemonPublicParams, error) {
|
||
params := &daemonPublicParams{}
|
||
q := r.URL.Query()
|
||
|
||
params.ds = strings.ToLower(strings.TrimSpace(q.Get("ds")))
|
||
switch params.ds {
|
||
case "did", "wuid", "geo":
|
||
default:
|
||
return nil, fmt.Errorf("ds must use did/wuid/geo. current is %v", params.ds)
|
||
}
|
||
|
||
params.appid = strings.TrimSpace(q.Get("appid"))
|
||
if params.ds == "wuid" && len(params.appid) == 0 {
|
||
return nil, fmt.Errorf("appid must exist when ds=wuid")
|
||
}
|
||
return params, nil
|
||
}
|
||
|
||
func (d *Daemon) parseWriteParams(r *http.Request, post bool) (writeItems []*saasapi.WriteItem, reterr error) {
|
||
writeItems = nil
|
||
lineNum := 0
|
||
if post {
|
||
scanner := bufio.NewScanner(r.Body)
|
||
for scanner.Scan() {
|
||
line := strings.TrimSpace(scanner.Text())
|
||
lineNum++
|
||
parsedQuery, err := url.ParseQuery(line)
|
||
if err != nil {
|
||
reterr = fmt.Errorf("parse error: %v", err)
|
||
return
|
||
}
|
||
item, err := d.parseQuery(parsedQuery, lineNum)
|
||
if err != nil {
|
||
reterr = fmt.Errorf("parse error: %v", err)
|
||
return
|
||
}
|
||
writeItems = append(writeItems, item)
|
||
}
|
||
} else {
|
||
item, err := d.parseQuery(r.URL.Query(), lineNum)
|
||
if err != nil {
|
||
reterr = fmt.Errorf("parse error: %v", err)
|
||
return
|
||
}
|
||
writeItems = append(writeItems, item)
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
func (d *Daemon) parseQuery(q url.Values, lineNum int) (writeItem *saasapi.WriteItem, reterr error) {
|
||
userid := strings.TrimSpace(q.Get("userid"))
|
||
if len(userid) == 0 {
|
||
reterr = fmt.Errorf("userid is null. line %v", lineNum)
|
||
return
|
||
}
|
||
|
||
writeItem = &saasapi.WriteItem{
|
||
Userid: userid,
|
||
}
|
||
|
||
// 临时存储,按index排序后再写入
|
||
u8Map := make(map[uint64]uint8)
|
||
u32Map := make(map[uint64]uint32)
|
||
flagMap := make(map[uint64]*saasapi.FlagWithExpire)
|
||
|
||
for key, value := range q {
|
||
parts := strings.Split(strings.ToLower(key), ".")
|
||
if len(parts) == 2 {
|
||
switch parts[0] {
|
||
case "u8":
|
||
index, err := strconv.ParseUint(parts[1], 10, 8)
|
||
if err != nil {
|
||
reterr = fmt.Errorf("param is error. line %v, param %v", lineNum, key)
|
||
return
|
||
}
|
||
|
||
if index == 0 || index > uint64(saasapi.MAX_U8) {
|
||
reterr = fmt.Errorf("param index is error. line %v, param %v, index %v", lineNum, key, index)
|
||
return
|
||
}
|
||
|
||
nValue, err := strconv.ParseUint(value[0], 10, 8)
|
||
if err != nil {
|
||
reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value)
|
||
return
|
||
}
|
||
|
||
if _, exists := u8Map[index]; !exists {
|
||
u8Map[index] = uint8(nValue)
|
||
}
|
||
|
||
case "u32":
|
||
index, err := strconv.ParseUint(parts[1], 10, 8)
|
||
if err != nil {
|
||
reterr = fmt.Errorf("param is error. line %v, param %v", lineNum, key)
|
||
return
|
||
}
|
||
if index == 0 || index > uint64(saasapi.MAX_U32) {
|
||
reterr = fmt.Errorf("param index is error. line %v, param %v, index %v", lineNum, key, index)
|
||
return
|
||
}
|
||
|
||
nValue, err := strconv.ParseUint(value[0], 10, 32)
|
||
if err != nil {
|
||
reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value)
|
||
return
|
||
}
|
||
|
||
if _, exists := u32Map[index]; !exists {
|
||
u32Map[index] = uint32(nValue)
|
||
}
|
||
|
||
case "flag":
|
||
index, err := strconv.ParseUint(parts[1], 10, 8)
|
||
if err != nil {
|
||
reterr = fmt.Errorf("param is error. line %v, param %v", lineNum, key)
|
||
return
|
||
}
|
||
if index == 0 || index > uint64(saasapi.MAX_FLAG) {
|
||
reterr = fmt.Errorf("param index is error. line %v, param %v, index %v", lineNum, key, index)
|
||
return
|
||
}
|
||
|
||
if _, exists := flagMap[index]; exists {
|
||
continue
|
||
}
|
||
|
||
flag := &saasapi.FlagWithExpire{}
|
||
switch strings.ToLower(value[0]) {
|
||
case "true":
|
||
flag.Flag = true
|
||
case "false":
|
||
flag.Flag = false
|
||
default:
|
||
if strings.HasPrefix(value[0], "!") {
|
||
// 相对时间戳
|
||
nValue, err := strconv.ParseUint(value[0][1:], 10, 32)
|
||
if err != nil {
|
||
reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value)
|
||
return
|
||
}
|
||
|
||
flag.Flag = true
|
||
flag.Expire = d.currTimeStamp + uint32(nValue)
|
||
|
||
} else {
|
||
nValue, err := strconv.ParseUint(value[0], 10, 32)
|
||
if err != nil || nValue > 30*24*86400 {
|
||
reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value)
|
||
return
|
||
}
|
||
|
||
flag.Flag = true
|
||
flag.Expire = uint32(nValue)
|
||
}
|
||
}
|
||
|
||
flagMap[index] = flag
|
||
}
|
||
}
|
||
}
|
||
|
||
// 按index顺序写入u8
|
||
if len(u8Map) > 0 {
|
||
writeItem.WriteBytes = &saasapi.Bytes{}
|
||
for i := uint64(1); i <= uint64(saasapi.MAX_U8); i++ {
|
||
if v, exists := u8Map[i]; exists {
|
||
writeItem.WriteBytes.Bytes = append(writeItem.WriteBytes.Bytes, v)
|
||
writeItem.WriteBytes.Index_1 |= 1 << (i - 1)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 按index顺序写入u32
|
||
if len(u32Map) > 0 {
|
||
writeItem.WriteUint32S = &saasapi.Uint32S{}
|
||
for i := uint64(1); i <= uint64(saasapi.MAX_U32); i++ {
|
||
if v, exists := u32Map[i]; exists {
|
||
writeItem.WriteUint32S.Uint32S = append(writeItem.WriteUint32S.Uint32S, v)
|
||
writeItem.WriteUint32S.Index_1 |= 1 << (i - 1)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 按index顺序写入flag
|
||
if len(flagMap) > 0 {
|
||
writeItem.WriteFlagsWithExpire = &saasapi.FlagsWithExpire{}
|
||
for i := uint64(1); i <= uint64(saasapi.MAX_FLAG); i++ {
|
||
if v, exists := flagMap[i]; exists {
|
||
writeItem.WriteFlagsWithExpire.FlagsWithExpire = append(writeItem.WriteFlagsWithExpire.FlagsWithExpire, v)
|
||
writeItem.WriteFlagsWithExpire.Index_1 |= 1 << (i - 1)
|
||
}
|
||
}
|
||
}
|
||
|
||
return
|
||
}
|