package main

import (
	"bufio"
	"flag"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"path"

	"e.coding.net/rta/public/saasapi"
	"e.coding.net/rta/public/saasapi/pkg/saashttp"
	"google.golang.org/protobuf/encoding/protojson"
)

// writeParams bundles the settings used by the "write" subcommand.
type writeParams struct {
	// cfg is the configuration loaded from the config file.
	cfg *Config
	// sourcePath is the file, or directory of files, to read commands from.
	sourcePath string
	// appid, when non-empty, is sent as the request Appid and switches the
	// request's user id type to OPENID.
	appid string
	// batchSize is the number of commands sent per request. With 0 the
	// mid-file flush never triggers, so a whole file is sent as one request.
	batchSize uint
	// async is forwarded as the Async flag of every write request.
	async bool
	// clear is forwarded as the IsClearAllFirst flag of every write request.
	// NOTE(review): the flag is sent with every batch, not only the first;
	// confirm the server does not wipe earlier batches of the same run.
	clear bool
	// saasHttp is the client used to submit write requests.
	saasHttp *saashttp.SaasClient
}

// RunWrite implements the "write" subcommand. It parses args, loads the
// configuration file and writes the commands found under the source path.
//
// When unexpected positional arguments are given, or no source path is set,
// it prints the flag defaults and returns nil.
func RunWrite(args ...string) error {
	fs := flag.NewFlagSet("write", flag.ExitOnError)
	cfgFile := paramConfig(fs)
	sourcePath := paramSourcePath(fs)
	appid := paramAppid(fs)
	batchSize := paramBatchSize(fs)
	async := paramAsync(fs)
	clear := paramClear(fs)

	if err := fs.Parse(args); err != nil {
		// Report through slog like the other failure path in this function;
		// the previous fmt.Println call was handed slog-style key/value pairs.
		slog.Error("command line parse error", "err", err)
		return err
	}

	if fs.NArg() > 0 || len(*sourcePath) == 0 {
		fs.PrintDefaults()
		return nil
	}

	cfg, err := LoadConfigFile(*cfgFile)
	if err != nil {
		slog.Error("LoadConfigFile error", "err", err)
		return err
	}

	params := writeParams{
		cfg:        cfg,
		sourcePath: *sourcePath,
		appid:      *appid,
		batchSize:  *batchSize,
		async:      *async,
		clear:      *clear,
		saasHttp: &saashttp.SaasClient{
			// NOTE(review): the zero-value http.Client has no timeout, so a
			// stalled server blocks the command indefinitely; consider
			// setting one once the expected request duration is known.
			Client:  http.Client{},
			ApiUrls: cfg.ApiUrls,
			Auth:    cfg.Auth,
		},
	}

	return doWrite(params)
}

// doWrite writes the commands found at writeParams.sourcePath. A regular file
// is loaded directly; a directory is walked recursively and every entry is
// processed in the order returned by os.ReadDir. The first error aborts the
// walk and is returned.
func doWrite(writeParams writeParams) error {
	fsInfo, err := os.Stat(writeParams.sourcePath)
	if err != nil {
		return err
	}

	if !fsInfo.IsDir() {
		// A plain file: load and write it directly.
		return doLoadFileToWrite(writeParams)
	}

	// A directory: list its entries.
	dirEntry, err := os.ReadDir(writeParams.sourcePath)
	if err != nil {
		return err
	}

	// Recurse into every entry with a copy of the parameters that points at
	// that entry.
	for _, entry := range dirEntry {
		newParam := writeParams
		newParam.sourcePath = path.Join(writeParams.sourcePath, entry.Name())
		if err = doWrite(newParam); err != nil {
			return err
		}
	}

	return nil
}

// doLoadFileToWrite reads the file at writeParams.sourcePath line by line,
// decodes every non-empty line as a protojson-encoded saasapi.WriteCmd and
// submits the commands in batches of writeParams.batchSize. Commands left over
// after the last full batch are submitted once the file has been read.
//
// A progress line is printed to stdout after every submitted batch. The
// function stops at the first decode, submit or read error; commands that
// were collected but not yet submitted at that point are not sent.
func doLoadFileToWrite(writeParams writeParams) error {
	file, err := os.Open(writeParams.sourcePath)
	if err != nil {
		return err
	}
	defer file.Close()

	var (
		pending   []*saasapi.WriteCmd
		succTotal uint32
		total     uint32
	)

	// flush submits the pending commands, reports progress and empties the
	// batch while keeping its backing array for reuse.
	flush := func() error {
		succ, _, err := submitWrite(writeParams, pending)
		if err != nil {
			return fmt.Errorf("%s: submitting batch: %w", writeParams.sourcePath, err)
		}
		succTotal += succ
		fmt.Printf("[%v] batch_succ = %v, succ_total = %v, total_processed = %v\n",
			writeParams.sourcePath, succ, succTotal, total)
		pending = pending[:0]
		return nil
	}

	scanner := bufio.NewScanner(file)
	lineNo := 0
	for scanner.Scan() {
		lineNo++
		line := scanner.Text()
		if line == "" {
			continue
		}

		cmd := &saasapi.WriteCmd{}
		if err := protojson.Unmarshal([]byte(line), cmd); err != nil {
			return fmt.Errorf("%s:%d: parsing write command: %w", writeParams.sourcePath, lineNo, err)
		}
		pending = append(pending, cmd)
		total++

		if len(pending) == int(writeParams.batchSize) {
			if err := flush(); err != nil {
				return err
			}
		}
	}

	// Scan also returns false on a read error or an over-long line
	// (bufio.ErrTooLong); without this check a partially processed file
	// would be reported as a success.
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("reading %s: %w", writeParams.sourcePath, err)
	}

	if len(pending) > 0 {
		return flush()
	}

	return nil
}

// submitWrite sends saasWriteCmds to the service in a single write request
// built from writeParams (clear/async flags and, when set, the appid).
//
// It returns the success count reported by the client, the number of commands
// submitted, and the client's error.
func submitWrite(writeParams writeParams, saasWriteCmds []*saasapi.WriteCmd) (succ, total uint32, err error) {
	saasReq := &saasapi.SaasReq{
		Cmd: &saasapi.SaasReq_Write{
			Write: &saasapi.Write{
				IsClearAllFirst: writeParams.clear,
				Async:           writeParams.async,
				WriteCmds:       saasWriteCmds,
			},
		},
	}

	if writeParams.appid != "" {
		saasReq.UseridType = saasapi.UserIdType_OPENID
		saasReq.Appid = writeParams.appid
	}

	total = uint32(len(saasWriteCmds))
	succ, err = writeParams.saasHttp.Write(saasReq)
	return succ, total, err
}