package main import ( "bufio" "encoding/json" "fmt" "log/slog" "net/http" "net/url" "os" "strconv" "strings" "time" "git.algo.com.cn/public/saasapi" "git.algo.com.cn/public/saasapi/pkg/saashttp" ) type Daemon struct { saasHttp *saashttp.SaasClient currTimeStamp uint32 } type daemonPublicParams struct { ds string appid string } func RunDaemon(args ...string) error { var err error account := os.Getenv("SRTA_ACCOUNT") token := os.Getenv("SRTA_TOKEN") env := os.Getenv("SRTA_ENV") slog.Info("Env", "SRTA_ACCOUNT", account) slog.Info("Env", "SRTA_TOKEN", token) slog.Info("Env", "SRTA_ENV", env) if strings.TrimSpace(account) == "" { err = fmt.Errorf("SRTA_ACCOUNT is empty") slog.Error("Env", "err", err) return err } if strings.TrimSpace(token) == "" { err = fmt.Errorf("SRTA_TOKEN is empty") slog.Error("Env", "err", err) return err } apiurls := &saashttp.ApiUrls{} switch strings.ToLower(env) { case "demo": apiurls.BaseUrl = "https://srta.algo.com.cn" case "prd": apiurls.BaseUrl = "https://api.rta.qq.com" default: err = fmt.Errorf("SRTA_ENV is not demo or prd") slog.Error("Env", "err", err) return err } mux := http.NewServeMux() daemon := &Daemon{ saasHttp: &saashttp.SaasClient{ Client: &http.Client{}, ApiUrls: saashttp.InitAPIUrl(apiurls), Auth: &saashttp.Auth{ Account: account, Token: token, }, }, } mux.HandleFunc("/read", daemon.httpRead) mux.HandleFunc("/write", daemon.httpWrite) daemonPort := ":8080" svrHttp := http.Server{ Addr: daemonPort, Handler: mux, } slog.Info("Start running", "port", daemonPort) err = svrHttp.ListenAndServe() if err != nil { slog.Error("failed", "err", err) return err } return nil } func (d *Daemon) httpRead(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.Header().Set("Allow", http.MethodGet) http.Error(w, fmt.Sprintf("Not support method: %s。Only GET。", r.Method), http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "application/json") w.Header().Set("X-Content-Type-Options", "nosniff") saasRes := &saasapi.SaasRes{} defer func() { json.NewEncoder(w).Encode(saasRes) }() params, err := d.parsePublicParams(r) if err != nil { saasRes.Code = saasapi.ErrorCode_CMD_ERROR saasRes.Status = err.Error() return } q := r.URL.Query() userid := strings.TrimSpace(q.Get("userid")) if len(userid) == 0 { saasRes.Code = saasapi.ErrorCode_CMD_ERROR saasRes.Status = "userid is null" return } saasReadItems := []*saasapi.ReadItem{} saasReadItems = append(saasReadItems, &saasapi.ReadItem{ Userid: userid, }) saasReq := &saasapi.SaasReq{ Cmd: &saasapi.SaasReq_Read{ Read: &saasapi.Read{ DataspaceId: params.ds, Appid: params.appid, ReadItems: saasReadItems, }, }, } saasRes, err = d.saasHttp.Read(saasReq) if err != nil { saasRes.Code = saasapi.ErrorCode_CMD_ERROR saasRes.Status = fmt.Sprintf("submit read error. %v", err) return } } func (d *Daemon) httpWrite(w http.ResponseWriter, r *http.Request) { bPost := false switch r.Method { case http.MethodPost: bPost = true case http.MethodGet: default: w.Header().Set("Allow", http.MethodGet) http.Error(w, fmt.Sprintf("Not support method: %s。Only GET/POST。", r.Method), http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "application/json") w.Header().Set("X-Content-Type-Options", "nosniff") saasRes := &saasapi.SaasRes{} defer func() { json.NewEncoder(w).Encode(saasRes) }() params, err := d.parsePublicParams(r) if err != nil { saasRes.Code = saasapi.ErrorCode_CMD_ERROR saasRes.Status = err.Error() return } q := r.URL.Query() clear := q.Get("clear") bClear := false if len(clear) > 0 { bClear, err = strconv.ParseBool(clear) if err != nil { saasRes.Code = saasapi.ErrorCode_CMD_ERROR saasRes.Status = fmt.Sprintf("clear is error. %v", err) return } } d.currTimeStamp = uint32(time.Now().Unix()) writeItems, err := d.parseWriteParams(r, bPost) if err != nil { saasRes.Code = saasapi.ErrorCode_CMD_ERROR saasRes.Status = fmt.Sprintf("write info error. %v", err) return } write := &saasapi.Write{ DataspaceId: params.ds, Appid: params.appid, IsClearAllFirst: bClear, } saasReq := &saasapi.SaasReq{ Cmd: &saasapi.SaasReq_Write{ Write: write, }, } write.WriteItems = writeItems saasRes, err = d.saasHttp.Write(saasReq) if err != nil { saasRes.Code = saasapi.ErrorCode_CMD_ERROR saasRes.Status = fmt.Sprintf("submit write error. %v", err) return } } func (d *Daemon) parsePublicParams(r *http.Request) (*daemonPublicParams, error) { params := &daemonPublicParams{} q := r.URL.Query() params.ds = strings.ToLower(strings.TrimSpace(q.Get("ds"))) switch params.ds { case "did", "wuid", "geo": default: return nil, fmt.Errorf("ds must use did/wuid/geo. current is %v", params.ds) } params.appid = strings.TrimSpace(q.Get("appid")) if params.ds == "wuid" && len(params.appid) == 0 { return nil, fmt.Errorf("appid must exist when ds=wuid") } return params, nil } func (d *Daemon) parseWriteParams(r *http.Request, post bool) (writeItems []*saasapi.WriteItem, reterr error) { writeItems = nil lineNum := 0 if post { scanner := bufio.NewScanner(r.Body) for scanner.Scan() { line := strings.TrimSpace(scanner.Text()) lineNum++ parsedQuery, err := url.ParseQuery(line) if err != nil { reterr = fmt.Errorf("parse error: %v", err) return } item, err := d.parseQuery(parsedQuery, lineNum) if err != nil { reterr = fmt.Errorf("parse error: %v", err) return } writeItems = append(writeItems, item) } } else { item, err := d.parseQuery(r.URL.Query(), lineNum) if err != nil { reterr = fmt.Errorf("parse error: %v", err) return } writeItems = append(writeItems, item) } return } func (d *Daemon) parseQuery(q url.Values, lineNum int) (writeItem *saasapi.WriteItem, reterr error) { userid := strings.TrimSpace(q.Get("userid")) if len(userid) == 0 { reterr = fmt.Errorf("userid is null. line %v", lineNum) return } writeItem = &saasapi.WriteItem{ Userid: userid, } for key, value := range q { parts := strings.Split(strings.ToLower(key), ".") if len(parts) == 2 { switch parts[0] { case "u8": index, err := strconv.ParseUint(parts[1], 10, 8) if err != nil { reterr = fmt.Errorf("param is error. line %v, param %v", lineNum, key) return } if index == 0 || index > uint64(saasapi.MAX_U8) { reterr = fmt.Errorf("param index is error. line %v, param %v, index %v", lineNum, key, index) return } nValue, err := strconv.ParseUint(value[0], 10, 8) if err != nil { reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value) return } if writeItem.GetWriteBytes() == nil { writeItem.WriteBytes = &saasapi.Bytes{} } if writeItem.GetWriteBytes().GetIndex_1()&(1<<(index-1)) > 0 { // 已经被写入过数据,跳过 continue } writeItem.GetWriteBytes().Bytes = append(writeItem.GetWriteBytes().Bytes, uint8(nValue)) writeItem.GetWriteBytes().Index_1 |= 1 << (index - 1) case "u32": index, err := strconv.ParseUint(parts[1], 10, 8) if err != nil { reterr = fmt.Errorf("param is error. line %v, param %v", lineNum, key) return } if index == 0 || index > uint64(saasapi.MAX_U32) { reterr = fmt.Errorf("param index is error. line %v, param %v, index %v", lineNum, key, index) return } nValue, err := strconv.ParseUint(value[0], 10, 32) if err != nil { reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value) return } if writeItem.GetWriteUint32S() == nil { writeItem.WriteUint32S = &saasapi.Uint32S{} } if writeItem.GetWriteUint32S().GetIndex_1()&(1<<(index-1)) > 0 { // 已经被写入过数据,跳过 continue } writeItem.GetWriteUint32S().Uint32S = append(writeItem.GetWriteUint32S().Uint32S, uint32(nValue)) writeItem.GetWriteUint32S().Index_1 |= 1 << (index - 1) case "flag": index, err := strconv.ParseUint(parts[1], 10, 8) if err != nil { reterr = fmt.Errorf("param is error. line %v, param %v", lineNum, key) return } if index == 0 || index > uint64(saasapi.MAX_FLAG) { reterr = fmt.Errorf("param index is error. line %v, param %v, index %v", lineNum, key, index) return } flag := saasapi.FlagWithExpire{} switch strings.ToLower(value[0]) { case "true": flag.Flag = true case "false": flag.Flag = false default: if strings.HasPrefix(value[0], "!") { // 相对时间戳 nValue, err := strconv.ParseUint(value[0][1:], 10, 32) if err != nil { reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value) return } flag.Flag = true flag.Expire = d.currTimeStamp + uint32(nValue) } else { nValue, err := strconv.ParseUint(value[0], 10, 32) if err != nil || nValue > 30*24*86400 { reterr = fmt.Errorf("param value is error. line %v, param %v, value %v", lineNum, key, value) return } flag.Flag = true flag.Expire = uint32(nValue) } } if writeItem.GetWriteFlagsWithExpire() == nil { writeItem.WriteFlagsWithExpire = &saasapi.FlagsWithExpire{} } if writeItem.GetWriteFlagsWithExpire().GetIndex_1()&(1<<(index-1)) > 0 { // 已经被写入过数据,跳过 continue } writeItem.GetWriteFlagsWithExpire().FlagsWithExpire = append(writeItem.GetWriteFlagsWithExpire().FlagsWithExpire, &flag) writeItem.GetWriteFlagsWithExpire().Index_1 |= 1 << (index - 1) } } } return }