package main

import (
	"bufio"
	"bytes"
	"flag"
	"fmt"
	"os"
	"path"
	"runtime"
	"strings"
	"sync"

	"git.algo.com.cn/public/saasapi"
	"google.golang.org/protobuf/encoding/protojson"
)

const (
	// convertBatchSize is the number of input lines handed to a worker at once.
	convertBatchSize = 100000
	// convertedExt is appended to the source file's base name to form the output file name.
	convertedExt = ".converted"
	// convertMaxLineSize caps the length of a single input line. bufio.Scanner's
	// default limit is 64 KiB; a longer line used to stop the scan silently.
	convertMaxLineSize = 16 * 1024 * 1024
)

// convertBracketReplacer strips '[' and ']' from the target column in one pass.
// A strings.Replacer is safe for concurrent use by the worker goroutines.
var convertBracketReplacer = strings.NewReplacer("[", "", "]", "")

// convertParams describes one conversion job.
type convertParams struct {
	mapCfg     *MapConfig // target-name mapping, loaded via LoadMapFile
	sourcePath string     // input file or directory
	destPath   string     // output directory
}

// convertResult is the output of converting one batch of input lines.
type convertResult struct {
	resultBuf      bytes.Buffer // newline-terminated protojson records
	convertedLines int          // number of records written to resultBuf
	skippedLines   int          // lines of the batch that produced no record
}

// RunConvert implements the "convert" sub-command. It parses args, loads the
// map config and converts the source path (a file or a directory tree) into
// protojson saasapi.WriteItem records under the destination path.
//
// When positional arguments are present or a required flag is empty it prints
// the flag defaults and returns nil.
func RunConvert(args ...string) error {
	fs := flag.NewFlagSet("convert", flag.ExitOnError)
	mapCfgFile := paramMap(fs)
	sourcePath := paramSourcePath(fs)
	destPath := paramDestPath(fs)

	if err := fs.Parse(args); err != nil {
		fmt.Fprintln(os.Stderr, "command line parse error", "err", err)
		return err
	}

	if fs.NArg() > 0 || *mapCfgFile == "" || len(*sourcePath) == 0 || len(*destPath) == 0 {
		fs.PrintDefaults()
		return nil
	}

	mapCfg, err := LoadMapFile(*mapCfgFile)
	if err != nil {
		fmt.Fprintln(os.Stderr, "LoadConfigFile error", "err", err)
		return err
	}

	return doConvert(convertParams{
		mapCfg:     mapCfg,
		sourcePath: *sourcePath,
		destPath:   *destPath,
	})
}

// doConvert converts params.sourcePath. A regular file is converted directly
// into params.destPath. A directory is walked recursively: files are converted
// into the current destination directory and each sub-directory is mirrored as
// a sub-directory of params.destPath. The first error aborts the walk.
func doConvert(params convertParams) error {
	info, err := os.Stat(params.sourcePath)
	if err != nil {
		return err
	}
	if !info.IsDir() {
		// A single file: convert it directly.
		return doFileConvert(params)
	}

	entries, err := os.ReadDir(params.sourcePath)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		child := params
		child.sourcePath = path.Join(params.sourcePath, entry.Name())
		if entry.IsDir() {
			child.destPath = path.Join(params.destPath, entry.Name())
		}
		if err := doConvert(child); err != nil {
			return err
		}
	}
	return nil
}

// doFileConvert converts one input file into
// <destPath>/<base name of sourcePath>.converted, creating destPath if needed.
//
// Each non-empty input line is expected to be "userid\t[target target ...]".
// Lines are grouped into batches of convertBatchSize and converted in parallel
// by runtime.GOMAXPROCS(0) workers. Results are written in input order.
// Read, write, flush and close errors are all returned.
func doFileConvert(params convertParams) error {
	sourceFile, err := os.Open(params.sourcePath)
	if err != nil {
		return err
	}
	defer sourceFile.Close()

	// MkdirAll is a no-op when the directory already exists.
	if err := os.MkdirAll(params.destPath, os.ModePerm); err != nil {
		return err
	}

	destName := path.Join(params.destPath, path.Base(params.sourcePath)+convertedExt)
	destFile, err := os.Create(destName)
	if err != nil {
		return err
	}
	// Safety net for early returns. The success path closes explicitly so that
	// a failing Close is reported.
	defer destFile.Close()
	destWriter := bufio.NewWriter(destFile)

	jobs, results, workersDone := startConvertWorkers(runtime.GOMAXPROCS(0), params)

	writeDone := make(chan struct{})
	var (
		skipped  int
		writeErr error
	)
	go func() {
		defer close(writeDone)
		skipped, writeErr = writeConvertResults(results, destWriter, destName)
	}()

	scanErr := feedConvertBatches(sourceFile, jobs)

	// Shut the pipeline down. Closing the job channels lets every worker exit.
	// Each worker then closes its result channel, which ends the writer.
	for _, job := range jobs {
		close(job)
	}
	workersDone.Wait()
	<-writeDone // skipped and writeErr are safe to read after this receive
	fmt.Println("")
	if skipped > 0 {
		fmt.Fprintf(os.Stderr, "skipped records: %v [%v]\n", skipped, params.sourcePath)
	}

	if scanErr != nil {
		return fmt.Errorf("reading %s: %w", params.sourcePath, scanErr)
	}
	if writeErr != nil {
		return fmt.Errorf("writing %s: %w", destName, writeErr)
	}
	if err := destWriter.Flush(); err != nil {
		return fmt.Errorf("writing %s: %w", destName, err)
	}
	return destFile.Close()
}

// startConvertWorkers starts n conversion goroutines.
//
// Worker i receives batches on jobs[i] and sends exactly one convertResult
// per batch, in order, on results[i]. It closes results[i] once jobs[i] has
// been closed and drained. The returned WaitGroup completes when every worker
// has exited.
func startConvertWorkers(n int, params convertParams) ([]chan []string, []chan convertResult, *sync.WaitGroup) {
	jobs := make([]chan []string, n)
	results := make([]chan convertResult, n)
	wg := &sync.WaitGroup{}
	for i := range n {
		jobs[i] = make(chan []string)
		results[i] = make(chan convertResult)
		wg.Add(1)
		go func(in <-chan []string, out chan<- convertResult) {
			defer wg.Done()
			defer close(out) // this worker is the only sender on out
			for lines := range in {
				convertBatch(lines, params, out)
			}
		}(jobs[i], results[i])
	}
	return jobs, results, wg
}

// feedConvertBatches reads src line by line, skips empty lines, and sends
// batches of up to convertBatchSize lines to the workers round-robin. Batch k
// goes to jobs[k%len(jobs)]. Every line read successfully is dispatched. The
// scanner's error, if any, is returned afterwards.
func feedConvertBatches(src *os.File, jobs []chan []string) error {
	scanner := bufio.NewScanner(src)
	// Accept lines longer than the default bufio.MaxScanTokenSize (64 KiB).
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), convertMaxLineSize)

	dispatched := 0
	batch := make([]string, 0, convertBatchSize)
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}
		batch = append(batch, line)
		if len(batch) == convertBatchSize {
			jobs[dispatched%len(jobs)] <- batch
			dispatched++
			batch = make([]string, 0, convertBatchSize)
		}
	}
	if len(batch) > 0 {
		jobs[dispatched%len(jobs)] <- batch
	}
	return scanner.Err()
}

// writeConvertResults drains results round-robin: results[0], results[1], …,
// wrapping around. This reproduces the input batch order because
// feedConvertBatches dispatched batches to the workers in the same
// round-robin order. It returns when it reaches a closed channel, which only
// happens after the last batch.
//
// After the first write error it keeps draining, so workers never block, but
// writes nothing more. It returns the total number of skipped input lines and
// that first write error.
func writeConvertResults(results []chan convertResult, w *bufio.Writer, destName string) (int, error) {
	var (
		writeErr  error
		processed int
		skipped   int
	)
	// Do not turn this into `for range`: the channel read changes every iteration.
	for i := 0; ; i++ {
		result, ok := <-results[i%len(results)]
		if !ok {
			return skipped, writeErr
		}
		skipped += result.skippedLines
		if writeErr != nil {
			continue
		}
		if _, err := w.Write(result.resultBuf.Bytes()); err != nil {
			writeErr = err
			continue
		}
		if err := w.Flush(); err != nil {
			writeErr = err
			continue
		}
		processed += result.convertedLines
		fmt.Printf("\rconverted records: %v [%v]", processed, destName)
	}
}

// convertBatch converts each line into a saasapi.WriteItem serialized as
// single-line protojson followed by '\n'. The whole batch is sent as one
// convertResult on resultChan.
//
// A line is skipped when convertLineToWriteItem rejects it or when
// protojson.Marshal fails on it, for example on invalid UTF-8 in the userid.
// MarshalOptions.Format, used previously, emitted such records silently.
func convertBatch(lines []string, convertParams convertParams, resultChan chan<- convertResult) {
	var buf bytes.Buffer
	buf.Grow(1024 * 1024 * 10)
	// AllowPartial matches what MarshalOptions.Format set internally.
	marshaler := protojson.MarshalOptions{AllowPartial: true}

	converted := 0
	for _, line := range lines {
		item, ok := convertLineToWriteItem(line, convertParams.mapCfg)
		if !ok {
			continue
		}
		record, err := marshaler.Marshal(item)
		if err != nil {
			continue
		}
		buf.Write(record)
		buf.WriteByte('\n')
		converted++
	}
	resultChan <- convertResult{
		resultBuf:      buf,
		convertedLines: converted,
		skippedLines:   len(lines) - converted,
	}
}

// convertLineToWriteItem parses one "userid\t[target target ...]" line.
//
// ok is false when the line does not have exactly two tab-separated columns or
// when the userid is empty. Brackets are removed from the second column, which
// is then split on whitespace. Each target found in mapCfg.Targets contributes
// its byte, uint32 and/or flag-with-expire value and sets the corresponding
// index bit. Unknown targets are ignored.
func convertLineToWriteItem(line string, mapCfg *MapConfig) (*saasapi.WriteItem, bool) {
	parts := strings.Split(line, "\t")
	if len(parts) != 2 || len(parts[0]) == 0 {
		return nil, false
	}
	item := &saasapi.WriteItem{Userid: parts[0]}

	// NOTE(review): a target listed twice appends its value twice but sets the
	// same index bit once. Confirm whether the consumer tolerates that.
	for _, target := range strings.Fields(convertBracketReplacer.Replace(parts[1])) {
		info, ok := mapCfg.Targets[target]
		if !ok {
			continue
		}

		if info.WriteByte != nil {
			if item.WriteBytes == nil {
				item.WriteBytes = &saasapi.Bytes{}
			}
			item.WriteBytes.Bytes = append(item.WriteBytes.Bytes, *info.WriteByte)
			// NOTE(review): a position >= 128 appends the byte without setting any index bit.
			switch {
			case info.WriteBytePos < 64:
				item.WriteBytes.Index_1 |= 1 << info.WriteBytePos
			case info.WriteBytePos < 128:
				item.WriteBytes.Index_2 |= 1 << (info.WriteBytePos - 64)
			}
		}

		if info.WriteUint32 != nil {
			if item.WriteUint32S == nil {
				item.WriteUint32S = &saasapi.Uint32S{}
			}
			item.WriteUint32S.Uint32S = append(item.WriteUint32S.Uint32S, *info.WriteUint32)
			item.WriteUint32S.Index_1 |= 1 << info.WriteUint32Pos
		}

		if info.WriteFlag != nil && info.WriteExpire != nil {
			if item.WriteFlagsWithExpire == nil {
				item.WriteFlagsWithExpire = &saasapi.FlagsWithExpire{}
			}
			item.WriteFlagsWithExpire.FlagsWithExpire = append(
				item.WriteFlagsWithExpire.FlagsWithExpire,
				&saasapi.FlagWithExpire{
					Flag:   *info.WriteFlag,
					Expire: *info.WriteExpire,
				})
			item.WriteFlagsWithExpire.Index_1 |= 1 << info.WriteFlagWithExpirePos
		}
	}
	return item, true
}