270 lines
6.2 KiB
Go
270 lines
6.2 KiB
Go
package main
|
||
|
||
import (
|
||
"bufio"
|
||
"bytes"
|
||
"flag"
|
||
"fmt"
|
||
"os"
|
||
"path"
|
||
"runtime"
|
||
"strings"
|
||
"sync"
|
||
|
||
"git.algo.com.cn/public/saasapi"
|
||
"google.golang.org/protobuf/encoding/protojson"
|
||
)
|
||
|
||
const (
	// convertBatchSize is the number of non-empty input lines collected
	// into one batch before the batch is handed to a worker goroutine.
	convertBatchSize = 100_000

	// convertedExt is the suffix appended to the source file's base name
	// to form the name of the converted output file.
	convertedExt = ".converted"
)
|
||
|
||
type convertParams struct {
|
||
mapCfg *MapConfig
|
||
sourcePath string
|
||
destPath string
|
||
}
|
||
|
||
// convertResult is the output a worker produces for one batch of lines.
//
// The field order is significant: values of this type may be built with
// unkeyed composite literals.
type convertResult struct {
	// resultBuf holds the serialized records, one per line.
	resultBuf bytes.Buffer
	// convertedLines is the line count reported for the batch; the writer
	// adds it to the running progress counter.
	convertedLines int
}
|
||
|
||
func RunConvert(args ...string) error {
|
||
fs := flag.NewFlagSet("convert", flag.ExitOnError)
|
||
mapCfgFile := paramMap(fs)
|
||
sourcePath := paramSourcePath(fs)
|
||
destPath := paramDestPath(fs)
|
||
|
||
if err := fs.Parse(args); err != nil {
|
||
return fmt.Errorf("Command line parse error: %w", err)
|
||
}
|
||
|
||
if fs.NArg() > 0 || *mapCfgFile == "" || len(*sourcePath) == 0 || len(*destPath) == 0 {
|
||
fs.PrintDefaults()
|
||
return nil
|
||
}
|
||
|
||
mapCfg, err := LoadMapFile(*mapCfgFile)
|
||
if err != nil {
|
||
return fmt.Errorf("LoadMapFile error: %w", err)
|
||
}
|
||
|
||
convertParams := convertParams{
|
||
mapCfg: mapCfg,
|
||
sourcePath: *sourcePath,
|
||
destPath: *destPath,
|
||
}
|
||
|
||
return doConvert(convertParams)
|
||
}
|
||
|
||
func doConvert(convertParams convertParams) error {
|
||
fsInfo, err := os.Stat(convertParams.sourcePath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if !fsInfo.IsDir() {
|
||
// 如果是文件,直接写入
|
||
return doFileConvert(convertParams)
|
||
}
|
||
|
||
// 读取目录下信息
|
||
dirEntry, err := os.ReadDir(convertParams.sourcePath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 遍历目录
|
||
for _, dir := range dirEntry {
|
||
newParam := convertParams
|
||
newParam.sourcePath = path.Join(convertParams.sourcePath, dir.Name())
|
||
|
||
if dir.IsDir() {
|
||
newParam.destPath = path.Join(convertParams.destPath, dir.Name())
|
||
}
|
||
|
||
if err = doConvert(newParam); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func doFileConvert(convertParams convertParams) error {
|
||
// 读取文件并按行遍历,以\t分割为两列,第一列为userid,第二列解析为string数组
|
||
sourceFile, err := os.Open(convertParams.sourcePath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer sourceFile.Close()
|
||
|
||
if _, err = os.Stat(convertParams.destPath); os.IsNotExist(err) {
|
||
os.MkdirAll(convertParams.destPath, os.ModePerm)
|
||
}
|
||
|
||
destName := path.Join(convertParams.destPath, path.Base(convertParams.sourcePath)+convertedExt)
|
||
destFile, err := os.Create(destName)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer destFile.Close()
|
||
|
||
scaner := bufio.NewScanner(sourceFile)
|
||
destWriter := bufio.NewWriter(destFile)
|
||
defer destWriter.Flush()
|
||
|
||
// 启动处理协程
|
||
workers := []chan []string{}
|
||
results := []chan convertResult{}
|
||
processedLine := 0
|
||
|
||
wg := sync.WaitGroup{}
|
||
convertMaxWorkers := runtime.GOMAXPROCS(0)
|
||
for range convertMaxWorkers {
|
||
workerChan := make(chan []string)
|
||
workers = append(workers, workerChan)
|
||
resultChan := make(chan convertResult)
|
||
results = append(results, resultChan)
|
||
|
||
go func(workerChan <-chan []string, resultChan chan<- convertResult) {
|
||
for lines := range workerChan {
|
||
convertBatch(lines, convertParams, resultChan)
|
||
}
|
||
}(workerChan, resultChan)
|
||
}
|
||
|
||
// 启动写入协程
|
||
go func() {
|
||
i := 0
|
||
|
||
// TIP: 不要改成for range
|
||
for {
|
||
select {
|
||
case result, ok := <-results[i%convertMaxWorkers]:
|
||
if !ok {
|
||
return
|
||
}
|
||
destWriter.Write(result.resultBuf.Bytes())
|
||
destWriter.Flush()
|
||
processedLine += result.convertedLines
|
||
fmt.Printf("\rconverted records: %v [%v]", processedLine, destName)
|
||
i++
|
||
wg.Done()
|
||
}
|
||
|
||
}
|
||
}()
|
||
|
||
// 读取文件并塞给协程处理
|
||
batch := []string{}
|
||
batchCount := 0
|
||
for scaner.Scan() {
|
||
line := scaner.Text()
|
||
if line == "" {
|
||
continue
|
||
}
|
||
|
||
batch = append(batch, line)
|
||
if len(batch) == convertBatchSize {
|
||
// 将batch写入协程
|
||
wg.Add(1)
|
||
workers[batchCount%convertMaxWorkers] <- batch
|
||
batch = nil
|
||
batchCount++
|
||
}
|
||
}
|
||
|
||
if len(batch) > 0 {
|
||
wg.Add(1)
|
||
workers[batchCount%convertMaxWorkers] <- batch
|
||
}
|
||
|
||
wg.Wait()
|
||
// 关闭所有工作协程的通道
|
||
for _, workerChan := range workers {
|
||
close(workerChan)
|
||
}
|
||
for _, resultChan := range results {
|
||
close(resultChan)
|
||
}
|
||
fmt.Println("")
|
||
|
||
return nil
|
||
}
|
||
|
||
func convertBatch(lines []string, convertParams convertParams, resultChan chan<- convertResult) {
|
||
byteBuf := bytes.Buffer{}
|
||
byteBuf.Grow(1024 * 1024 * 10)
|
||
|
||
jasonMarshal := protojson.MarshalOptions{Multiline: false, Indent: ""}
|
||
|
||
for _, line := range lines {
|
||
// 按\t分割为两列
|
||
parts := strings.Split(line, "\t")
|
||
if len(parts) != 2 {
|
||
continue
|
||
}
|
||
|
||
// 读取userid
|
||
userid := parts[0]
|
||
if len(userid) == 0 {
|
||
continue
|
||
}
|
||
|
||
value := parts[1]
|
||
value = strings.ReplaceAll(value, "[", "")
|
||
value = strings.ReplaceAll(value, "]", "")
|
||
// 第二列解析为string数组
|
||
targets := strings.Split(value, " ")
|
||
|
||
saasWriteItem := &saasapi.WriteItem{
|
||
Userid: userid,
|
||
}
|
||
|
||
// 遍历targets,转换成saasapi.WriteCmd
|
||
for _, target := range targets {
|
||
if targetinfo, ok := convertParams.mapCfg.Targets[target]; ok {
|
||
if targetinfo.WriteByte != nil {
|
||
// 转换byte区
|
||
if saasWriteItem.WriteBytes == nil {
|
||
saasWriteItem.WriteBytes = &saasapi.Bytes{}
|
||
}
|
||
saasWriteItem.WriteBytes.Bytes = append(saasWriteItem.WriteBytes.Bytes, *targetinfo.WriteByte)
|
||
saasWriteItem.WriteBytes.Index_1 |= 1 << targetinfo.WriteBytePos
|
||
}
|
||
|
||
if targetinfo.WriteUint32 != nil {
|
||
// 转换uint32区
|
||
if saasWriteItem.WriteUint32S == nil {
|
||
saasWriteItem.WriteUint32S = &saasapi.Uint32S{}
|
||
}
|
||
saasWriteItem.WriteUint32S.Uint32S = append(saasWriteItem.WriteUint32S.Uint32S, *targetinfo.WriteUint32)
|
||
saasWriteItem.WriteUint32S.Index_1 |= 1 << targetinfo.WriteUint32Pos
|
||
}
|
||
|
||
if targetinfo.WriteFlag != nil && targetinfo.WriteExpire != nil {
|
||
// 转换flag区
|
||
if saasWriteItem.WriteFlagsWithExpire == nil {
|
||
saasWriteItem.WriteFlagsWithExpire = &saasapi.FlagsWithExpire{}
|
||
}
|
||
saasWriteItem.WriteFlagsWithExpire.FlagsWithExpire = append(
|
||
saasWriteItem.WriteFlagsWithExpire.FlagsWithExpire, &saasapi.FlagWithExpire{
|
||
Flag: *targetinfo.WriteFlag,
|
||
Expire: *targetinfo.WriteExpire,
|
||
})
|
||
saasWriteItem.WriteFlagsWithExpire.Index_1 |= 1 << targetinfo.WriteFlagWithExpirePos
|
||
}
|
||
}
|
||
}
|
||
|
||
byteBuf.WriteString(jasonMarshal.Format(saasWriteItem))
|
||
byteBuf.WriteString("\n")
|
||
}
|
||
|
||
resultChan <- convertResult{byteBuf, len(lines)}
|
||
}
|