Files
saasapi/cmd/saastool/convert.go
2025-04-22 14:30:30 +08:00

276 lines
6.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"bufio"
"bytes"
"flag"
"fmt"
"os"
"path"
"runtime"
"strings"
"sync"
"e.coding.net/rta/public/saasapi"
"google.golang.org/protobuf/encoding/protojson"
)
const (
// convertBatchSize is the number of non-empty input lines grouped into one
// batch before the batch is handed to a worker goroutine.
convertBatchSize = 100000
// convertedExt is appended to a source file's base name to form the name of
// its output file.
convertedExt = ".converted"
)
// convertParams holds the inputs of one conversion run:
//   - mapCfg: the configuration loaded by LoadMapFile. convertBatch looks up
//     target names in its Targets map.
//   - sourcePath: the file or directory to read.
//   - destPath: the directory the ".converted" output files are written to.
type convertParams struct {
mapCfg *MapConfig
sourcePath string
destPath string
}
// convertResult is what a worker sends back for one batch:
//   - resultBuf: the protojson-encoded saasapi.WriteItem records, one per line.
//   - convertedLines: the number of input lines in the batch, including lines
//     that were skipped as malformed.
type convertResult struct {
resultBuf bytes.Buffer
convertedLines int
}
// RunConvert is the entry point of the "convert" sub-command.
//
// It parses args for the map-file, source-path and destination-path flags,
// loads the map configuration with LoadMapFile and runs the conversion via
// doConvert. If positional arguments are present, or any of the three flags
// is empty, the flag defaults are printed and nil is returned without
// converting anything. Parse and LoadMapFile errors are printed to stderr and
// returned.
func RunConvert(args ...string) error {
	flagSet := flag.NewFlagSet("convert", flag.ExitOnError)
	mapFile := paramMap(flagSet)
	src := paramSourcePath(flagSet)
	dst := paramDestPath(flagSet)

	// With flag.ExitOnError, Parse exits the process on bad flags itself,
	// so this branch is defensive.
	if parseErr := flagSet.Parse(args); parseErr != nil {
		fmt.Fprintln(os.Stderr, "command line parse error", "err", parseErr)
		return parseErr
	}

	usable := flagSet.NArg() == 0 && *mapFile != "" && *src != "" && *dst != ""
	if !usable {
		flagSet.PrintDefaults()
		return nil
	}

	cfg, loadErr := LoadMapFile(*mapFile)
	if loadErr != nil {
		fmt.Fprintln(os.Stderr, "LoadConfigFile error", "err", loadErr)
		return loadErr
	}

	return doConvert(convertParams{
		mapCfg:     cfg,
		sourcePath: *src,
		destPath:   *dst,
	})
}
// doConvert converts the file or directory tree at convertParams.sourcePath.
//
// A non-directory path is handed straight to doFileConvert. For a directory,
// each entry returned by os.ReadDir (sorted by name) is processed recursively:
// entries that are directories get destPath extended with their name, so the
// source layout is mirrored under destPath, while other entries keep the
// current destPath. Processing stops at the first error, which is returned.
func doConvert(convertParams convertParams) error {
	root := convertParams.sourcePath
	stat, statErr := os.Stat(root)
	switch {
	case statErr != nil:
		return statErr
	case !stat.IsDir():
		// A plain file: convert it directly.
		return doFileConvert(convertParams)
	}

	children, readErr := os.ReadDir(root)
	if readErr != nil {
		return readErr
	}
	for _, child := range children {
		name := child.Name()
		next := convertParams
		next.sourcePath = path.Join(root, name)
		if child.IsDir() {
			// Mirror sub-directories under the destination root.
			next.destPath = path.Join(convertParams.destPath, name)
		}
		if walkErr := doConvert(next); walkErr != nil {
			return walkErr
		}
	}
	return nil
}
// doFileConvert converts a single source file into
// <destPath>/<base name of sourcePath>.converted, creating destPath if needed.
//
// Each non-empty input line is expected to hold a userid and a target list
// separated by a tab (parsed by convertBatch). Lines are grouped into batches
// of convertBatchSize and dispatched round-robin to runtime.GOMAXPROCS(0)
// worker goroutines. A single writer goroutine collects the results in the
// same round-robin order, so output records keep the input order. It also
// prints a running progress counter to stdout.
//
// It returns the first error from any of these steps:
//   - opening the source;
//   - creating the destination directory or file;
//   - scanning the source, including lines longer than maxLineSize;
//   - writing, flushing or closing the output.
//
// Batches read before a scan error are still converted and written.
func doFileConvert(convertParams convertParams) (err error) {
	// bufio.Scanner stops with bufio.ErrTooLong on lines longer than its
	// buffer (64 KiB by default). Allow much longer lines.
	const maxLineSize = 64 * 1024 * 1024

	sourceFile, err := os.Open(convertParams.sourcePath)
	if err != nil {
		return err
	}
	defer sourceFile.Close()

	// MkdirAll is a no-op when the directory already exists.
	if err = os.MkdirAll(convertParams.destPath, os.ModePerm); err != nil {
		return err
	}
	destName := path.Join(convertParams.destPath, path.Base(convertParams.sourcePath)+convertedExt)
	destFile, err := os.Create(destName)
	if err != nil {
		return err
	}
	defer func() {
		// A failed final write-back may only be reported by Close.
		if closeErr := destFile.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	scanner := bufio.NewScanner(sourceFile)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineSize)
	destWriter := bufio.NewWriter(destFile)

	// Start the workers. Each worker is the only sender on its result
	// channel, so it closes that channel once its work channel is closed.
	workerCount := runtime.GOMAXPROCS(0)
	workers := make([]chan []string, 0, workerCount)
	results := make([]chan convertResult, 0, workerCount)
	for range workerCount {
		workerChan := make(chan []string)
		resultChan := make(chan convertResult)
		workers = append(workers, workerChan)
		results = append(results, resultChan)
		go func(workerChan <-chan []string, resultChan chan<- convertResult) {
			defer close(resultChan)
			for lines := range workerChan {
				convertBatch(lines, convertParams, resultChan)
			}
		}(workerChan, resultChan)
	}

	// Start the writer.
	// It consumes results strictly round-robin, in the order batches are
	// dispatched, to preserve line order. That is why it cannot be a plain
	// for-range over one channel.
	// After the first write error it keeps draining results without writing,
	// so no worker or wg.Wait blocks forever.
	// writeErr is read outside this goroutine only after writerDone is closed.
	var wg sync.WaitGroup
	var writeErr error
	writerDone := make(chan struct{})
	go func() {
		defer close(writerDone)
		processedLine := 0
		for i := 0; ; i++ {
			result, ok := <-results[i%workerCount]
			if !ok {
				return
			}
			if writeErr == nil {
				_, writeErr = destWriter.Write(result.resultBuf.Bytes())
				if writeErr == nil {
					writeErr = destWriter.Flush()
				}
				if writeErr == nil {
					processedLine += result.convertedLines
					fmt.Printf("\rconverted records: %v [%v]", processedLine, destName)
				}
			}
			wg.Done()
		}
	}()

	// Read the source and hand batches to the workers round-robin.
	var batch []string
	batchCount := 0
	dispatch := func() {
		wg.Add(1)
		workers[batchCount%workerCount] <- batch
		batch = nil
		batchCount++
	}
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}
		batch = append(batch, line)
		if len(batch) == convertBatchSize {
			dispatch()
		}
	}
	if len(batch) > 0 {
		dispatch()
	}

	// Wait until every dispatched batch has been handled, then shut down in
	// sequence:
	//   1. closing the work channels stops the workers;
	//   2. the workers close their result channels;
	//   3. that stops the writer.
	wg.Wait()
	for _, workerChan := range workers {
		close(workerChan)
	}
	<-writerDone
	fmt.Println("")

	if err = scanner.Err(); err != nil {
		return err
	}
	if writeErr != nil {
		return writeErr
	}
	return destWriter.Flush()
}
// convertBatch converts one batch of input lines into newline-delimited,
// protojson-encoded saasapi.WriteItem records. It sends exactly one
// convertResult on resultChan.
//
// Each line must contain exactly one tab, separating a non-empty userid from
// a target list such as "[a b c]". Every '[' and ']' is removed from the list
// and the rest is split on single spaces. Each target found in
// convertParams.mapCfg.Targets contributes to the item's byte, uint32 and
// flag-with-expire sections; unknown targets are ignored.
//
// Two kinds of line are skipped:
//   - malformed lines;
//   - lines whose item protojson cannot encode, e.g. a userid that is not
//     valid UTF-8.
//
// convertedLines in the result is always len(lines), skipped lines included.
func convertBatch(lines []string, convertParams convertParams, resultChan chan<- convertResult) {
	byteBuf := bytes.Buffer{}
	byteBuf.Grow(1024 * 1024 * 10)
	// Single-line output keeps exactly one JSON record per output line.
	jsonMarshal := protojson.MarshalOptions{Multiline: false, Indent: ""}
	// Removes every '[' and ']' in a single pass.
	bracketStripper := strings.NewReplacer("[", "", "]", "")
	for _, line := range lines {
		// Require exactly two tab-separated columns: userid and target list.
		userid, value, found := strings.Cut(line, "\t")
		if !found || strings.Contains(value, "\t") || len(userid) == 0 {
			continue
		}
		// The second column is a space-separated list of target names.
		targets := strings.Split(bracketStripper.Replace(value), " ")
		saasWriteItem := &saasapi.WriteItem{
			Userid: userid,
		}
		// Map every known target onto the WriteItem sections it configures.
		for _, target := range targets {
			targetinfo, ok := convertParams.mapCfg.Targets[target]
			if !ok {
				continue
			}
			if targetinfo.WriteByte != nil {
				// Byte section: positions 0-63 go to Index_1, 64-127 to
				// Index_2; larger positions set no index bit.
				if saasWriteItem.WriteBytes == nil {
					saasWriteItem.WriteBytes = &saasapi.Bytes{}
				}
				saasWriteItem.WriteBytes.Bytes = append(saasWriteItem.WriteBytes.Bytes, *targetinfo.WriteByte)
				if targetinfo.WriteBytePos < 64 {
					saasWriteItem.WriteBytes.Index_1 |= 1 << targetinfo.WriteBytePos
				} else if targetinfo.WriteBytePos < 128 {
					saasWriteItem.WriteBytes.Index_2 |= 1 << (targetinfo.WriteBytePos - 64)
				}
			}
			if targetinfo.WriteUint32 != nil {
				// Uint32 section.
				if saasWriteItem.WriteUint32S == nil {
					saasWriteItem.WriteUint32S = &saasapi.Uint32S{}
				}
				saasWriteItem.WriteUint32S.Uint32S = append(saasWriteItem.WriteUint32S.Uint32S, *targetinfo.WriteUint32)
				saasWriteItem.WriteUint32S.Index_1 |= 1 << targetinfo.WriteUint32Pos
			}
			if targetinfo.WriteFlag != nil && targetinfo.WriteExpire != nil {
				// Flag-with-expire section: both the flag and the expire are
				// required.
				if saasWriteItem.WriteFlagsWithExpire == nil {
					saasWriteItem.WriteFlagsWithExpire = &saasapi.FlagsWithExpire{}
				}
				saasWriteItem.WriteFlagsWithExpire.FlagsWithExpire = append(
					saasWriteItem.WriteFlagsWithExpire.FlagsWithExpire, &saasapi.FlagWithExpire{
						Flag:   *targetinfo.WriteFlag,
						Expire: *targetinfo.WriteExpire,
					})
				saasWriteItem.WriteFlagsWithExpire.Index_1 |= 1 << targetinfo.WriteFlagWithExpirePos
			}
		}
		// Use Marshal, not Format. Format is meant for human consumption:
		// it ignores errors and tolerates invalid UTF-8, so it can emit
		// records that protojson.Unmarshal rejects. Treat unencodable items
		// like other malformed input and skip them.
		encoded, err := jsonMarshal.Marshal(saasWriteItem)
		if err != nil {
			continue
		}
		byteBuf.Write(encoded)
		byteBuf.WriteByte('\n')
	}
	resultChan <- convertResult{byteBuf, len(lines)}
}