package main import ( "bufio" "flag" "fmt" "log/slog" "os" "path" "strings" "e.coding.net/rta/public/saasapi" "google.golang.org/protobuf/proto" ) type writeParams struct { cfg *Config targetCfg *TargetConfig batchSize uint dataPath string async bool } func RunWrite(args ...string) error { fs := flag.NewFlagSet("write", flag.ExitOnError) cfgFile := paramConfig(fs) targetCfgFile := paramTargets(fs) dataPath := paramFromPath(fs) batchSize := paramBatchSize(fs) async := paramAsync(fs) if err := fs.Parse(args); err != nil { fmt.Println("command line parse error", "err", err) return err } if fs.NArg() > 0 || *targetCfgFile == "" || len(*dataPath) == 0 { fs.PrintDefaults() return nil } cfg, err := LoadConfigFile(*cfgFile) if err != nil { slog.Error("LoadConfigFile error", "err", err) return err } targetCfg, err := LoadTargetFile(*targetCfgFile) if err != nil { fmt.Println("LoadConfigFile error", "err", err) return err } writeParams := writeParams{ cfg: cfg, targetCfg: targetCfg, batchSize: *batchSize, dataPath: *dataPath, async: *async, } return doWrite(writeParams) } func doWrite(writeParams writeParams) error { fsInfo, err := os.Stat(writeParams.dataPath) if err != nil { return err } if !fsInfo.IsDir() { // 如果是文件,直接写入 return doLoadFileToWrite(writeParams) } // 读取目录下信息 dirEntry, err := os.ReadDir(writeParams.dataPath) if err != nil { return err } // 遍历目录 for _, dir := range dirEntry { newParam := writeParams newParam.dataPath = path.Join(writeParams.dataPath, dir.Name()) if err = doWrite(newParam); err != nil { return err } } return nil } func doLoadFileToWrite(writeParams writeParams) error { // 读取文件并按行遍历,以\t分割为两列,第一列为userid,第二列解析为string数组 file, err := os.Open(writeParams.dataPath) if err != nil { return err } defer file.Close() scaner := bufio.NewScanner(file) saasWriteCmds := []*saasapi.WriteCmd{} saasReq := &saasapi.SaasReq{ UseridType: saasapi.UserIdType_DEVICEID, Cmd: &saasapi.SaasReq_Write{ Write: &saasapi.Write{}, }, } for scaner.Scan() { line := scaner.Text() if line == "" { continue } // 按\t分割为两列 parts := strings.Split(line, "\t") if len(parts) != 2 { continue } // 读取userid userid := parts[0] value := parts[1] value = strings.ReplaceAll(value, "[", "") value = strings.ReplaceAll(value, "]", "") // 第二列解析为string数组 targets := strings.Split(value, " ") saasWriteCmd := &saasapi.WriteCmd{ Userid: userid, IsFullOverwrite: true, } if len(userid) == 0 || len(targets) == 0 { continue } for _, target := range targets { if targetinfo, ok := writeParams.targetCfg.Targets[target]; ok { if targetinfo.WriteByte != nil { if saasWriteCmd.WriteBytes == nil { saasWriteCmd.WriteBytes = &saasapi.Bytes{} } saasWriteCmd.WriteBytes.Bytes = append(saasWriteCmd.WriteBytes.Bytes, *targetinfo.WriteByte) if targetinfo.WriteBytePos < 64 { saasWriteCmd.WriteBytes.Index_1 |= 1 << targetinfo.WriteBytePos } else if targetinfo.WriteBytePos < 128 { saasWriteCmd.WriteBytes.Index_2 |= 1 << (targetinfo.WriteBytePos - 64) } } if targetinfo.WriteUint32 != nil { if saasWriteCmd.WriteUint32S == nil { saasWriteCmd.WriteUint32S = &saasapi.Uint32S{} } saasWriteCmd.WriteUint32S.Uint32S = append(saasWriteCmd.WriteUint32S.Uint32S, *targetinfo.WriteUint32) saasWriteCmd.WriteUint32S.Index_1 |= 1 << targetinfo.WriteUint32Pos } if targetinfo.WriteFlag != nil && targetinfo.WriteExpire != nil { if saasWriteCmd.WriteFlagsWithExpire == nil { saasWriteCmd.WriteFlagsWithExpire = &saasapi.FlagsWithExpire{} } saasWriteCmd.WriteFlagsWithExpire.FlagsWithExpire = append( saasWriteCmd.WriteFlagsWithExpire.FlagsWithExpire, &saasapi.FlagWithExpire{ Flag: *targetinfo.WriteFlag, Expire: *targetinfo.WriteExpire, }) saasWriteCmd.WriteFlagsWithExpire.Index_1 |= 1 << targetinfo.WriteFlagWithExpirePos } } } saasWriteCmds = append(saasWriteCmds, saasWriteCmd) if len(saasWriteCmds) == int(writeParams.batchSize) { if err = submitWrite(saasReq, saasWriteCmds); err != nil { return err } saasWriteCmds = saasWriteCmds[:0] } } if len(saasWriteCmds) > 0 { return submitWrite(saasReq, saasWriteCmds) } return nil } func submitWrite(saasReq *saasapi.SaasReq, saasWriteCmds []*saasapi.WriteCmd) error { saasReq.Cmd.(*saasapi.SaasReq_Write).Write.WriteCmds = saasWriteCmds postBuf, err := proto.Marshal(saasReq) if err != nil { return err } fmt.Println(len(postBuf)) return nil }