package main import ( "bufio" "flag" "fmt" "net/http" "os" "path" "e.coding.net/rta/public/saasapi" "e.coding.net/rta/public/saasapi/pkg/saashttp" "google.golang.org/protobuf/encoding/protojson" ) type writeParams struct { cfg *Config sourcePath string appid string batchSize uint clear bool saasHttp *saashttp.SaasClient } func RunWrite(args ...string) error { fs := flag.NewFlagSet("write", flag.ExitOnError) cfgFile := paramConfig(fs) sourcePath := paramSourcePath(fs) appid := paramAppid(fs) batchSize := paramBatchSize(fs) clear := paramClear(fs) if err := fs.Parse(args); err != nil { fmt.Fprintln(os.Stderr, "command line parse error", "err", err) return err } if fs.NArg() > 0 || len(*sourcePath) == 0 { fs.PrintDefaults() return nil } cfg, err := LoadConfigFile(*cfgFile) if err != nil { fmt.Fprintln(os.Stderr, "load config file error", "err", err) return err } writeParams := writeParams{ cfg: cfg, sourcePath: *sourcePath, appid: *appid, batchSize: *batchSize, clear: *clear, saasHttp: &saashttp.SaasClient{ Client: &http.Client{}, ApiUrls: &cfg.ApiUrls, Auth: &cfg.Auth, }, } return doWrite(writeParams) } func doWrite(writeParams writeParams) error { fsInfo, err := os.Stat(writeParams.sourcePath) if err != nil { fmt.Fprintln(os.Stderr, "file stat error", "err", err) return err } if !fsInfo.IsDir() { // 如果是文件,直接写入 return doLoadFileToWrite(writeParams) } // 读取目录下信息 dirEntry, err := os.ReadDir(writeParams.sourcePath) if err != nil { fmt.Fprintln(os.Stderr, "read dir error", "err", err) return err } // 遍历目录 for _, dir := range dirEntry { newParam := writeParams newParam.sourcePath = path.Join(writeParams.sourcePath, dir.Name()) if err = doWrite(newParam); err != nil { return err } } return nil } func doLoadFileToWrite(writeParams writeParams) error { // 读取文件并按行遍历,以\t分割为两列,第一列为userid,第二列解析为string数组 file, err := os.Open(writeParams.sourcePath) if err != nil { fmt.Fprintln(os.Stderr, "open file error", "file", writeParams.sourcePath, "err", err) return err } defer file.Close() scaner := bufio.NewScanner(file) saasWriteItems := []*saasapi.WriteItem{} succ := uint32(0) succTotal := uint32(0) total := uint32(0) for scaner.Scan() { total++ line := scaner.Text() if line == "" { continue } saasWriteItem := &saasapi.WriteItem{} if err = protojson.Unmarshal([]byte(line), saasWriteItem); err != nil { fmt.Fprintln(os.Stderr, "protojson unmashal error", "file", writeParams.sourcePath, "line", total, "err", err) return err } saasWriteItems = append(saasWriteItems, saasWriteItem) if len(saasWriteItems) == int(writeParams.batchSize) { if succ, _, err = submitWrite(writeParams, saasWriteItems); err != nil { return err } succTotal += succ fmt.Printf("[%v] batch_succ = %v, succ_total = %v, total_processed = %v\n", writeParams.sourcePath, succ, succTotal, total) saasWriteItems = saasWriteItems[:0] } } if len(saasWriteItems) > 0 { if succ, _, err = submitWrite(writeParams, saasWriteItems); err != nil { return err } succTotal += succ fmt.Printf("[%v] batch_succ = %v, succ_total = %v, total_processed = %v\n", writeParams.sourcePath, succ, succTotal, total) } return nil } func submitWrite(writeParams writeParams, saasWriteCmds []*saasapi.WriteItem) (succ, total uint32, err error) { saasReq := &saasapi.SaasReq{ Cmd: &saasapi.SaasReq_Write{ Write: &saasapi.Write{ IsClearAllFirst: writeParams.clear, }, }, } if writeParams.appid != "" { saasReq.UseridType = saasapi.UserIdType_OPENID saasReq.Appid = writeParams.appid } saasReq.Cmd.(*saasapi.SaasReq_Write).Write.WriteItems = saasWriteCmds total = uint32(len(saasWriteCmds)) res, err := writeParams.saasHttp.Write(saasReq) if err != nil { fmt.Fprintln(os.Stderr, "submit write error", "err", err) return } if res.GetCode() != saasapi.ErrorCode_SUCC { err = fmt.Errorf("write failed. code:%v, status:%v", res.GetCode(), res.GetStatus()) fmt.Fprintln(os.Stderr, protojson.Format(res)) return } return res.GetWriteRes().GetSuccCmdCount(), total, nil }