173 lines
3.7 KiB
Go
173 lines
3.7 KiB
Go
package main
|
||
|
||
import (
|
||
"bufio"
|
||
"flag"
|
||
"fmt"
|
||
"log/slog"
|
||
"net/http"
|
||
"os"
|
||
"path"
|
||
|
||
"e.coding.net/rta/public/saasapi"
|
||
"e.coding.net/rta/public/saasapi/pkg/saashttp"
|
||
"google.golang.org/protobuf/encoding/protojson"
|
||
)
|
||
|
||
type writeParams struct {
|
||
cfg *Config
|
||
sourcePath string
|
||
appid string
|
||
batchSize uint
|
||
clear bool
|
||
saasHttp *saashttp.SaasClient
|
||
}
|
||
|
||
func RunWrite(args ...string) error {
|
||
fs := flag.NewFlagSet("write", flag.ExitOnError)
|
||
cfgFile := paramConfig(fs)
|
||
sourcePath := paramSourcePath(fs)
|
||
appid := paramAppid(fs)
|
||
batchSize := paramBatchSize(fs)
|
||
clear := paramClear(fs)
|
||
|
||
if err := fs.Parse(args); err != nil {
|
||
fmt.Println("command line parse error", "err", err)
|
||
return err
|
||
}
|
||
|
||
if fs.NArg() > 0 || len(*sourcePath) == 0 {
|
||
fs.PrintDefaults()
|
||
return nil
|
||
}
|
||
|
||
cfg, err := LoadConfigFile(*cfgFile)
|
||
if err != nil {
|
||
slog.Error("LoadConfigFile error", "err", err)
|
||
return err
|
||
}
|
||
writeParams := writeParams{
|
||
cfg: cfg,
|
||
sourcePath: *sourcePath,
|
||
appid: *appid,
|
||
batchSize: *batchSize,
|
||
clear: *clear,
|
||
saasHttp: &saashttp.SaasClient{
|
||
Client: http.Client{},
|
||
ApiUrls: cfg.ApiUrls,
|
||
Auth: cfg.Auth,
|
||
},
|
||
}
|
||
|
||
return doWrite(writeParams)
|
||
}
|
||
|
||
func doWrite(writeParams writeParams) error {
|
||
fsInfo, err := os.Stat(writeParams.sourcePath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if !fsInfo.IsDir() {
|
||
// 如果是文件,直接写入
|
||
return doLoadFileToWrite(writeParams)
|
||
}
|
||
|
||
// 读取目录下信息
|
||
dirEntry, err := os.ReadDir(writeParams.sourcePath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 遍历目录
|
||
for _, dir := range dirEntry {
|
||
newParam := writeParams
|
||
newParam.sourcePath = path.Join(writeParams.sourcePath, dir.Name())
|
||
|
||
if err = doWrite(newParam); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func doLoadFileToWrite(writeParams writeParams) error {
|
||
// 读取文件并按行遍历,以\t分割为两列,第一列为userid,第二列解析为string数组
|
||
file, err := os.Open(writeParams.sourcePath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer file.Close()
|
||
|
||
scaner := bufio.NewScanner(file)
|
||
|
||
saasWriteItems := []*saasapi.WriteItem{}
|
||
|
||
succ := uint32(0)
|
||
succTotal := uint32(0)
|
||
total := uint32(0)
|
||
for scaner.Scan() {
|
||
line := scaner.Text()
|
||
if line == "" {
|
||
continue
|
||
}
|
||
saasWriteItem := &saasapi.WriteItem{}
|
||
if err = protojson.Unmarshal([]byte(line), saasWriteItem); err != nil {
|
||
return err
|
||
}
|
||
|
||
saasWriteItems = append(saasWriteItems, saasWriteItem)
|
||
total++
|
||
|
||
if len(saasWriteItems) == int(writeParams.batchSize) {
|
||
if succ, _, err = submitWrite(writeParams, saasWriteItems); err != nil {
|
||
return err
|
||
}
|
||
|
||
succTotal += succ
|
||
fmt.Printf("[%v] batch_succ = %v, succ_total = %v, total_processed = %v\n", writeParams.sourcePath, succ, succTotal, total)
|
||
|
||
saasWriteItems = saasWriteItems[:0]
|
||
}
|
||
|
||
}
|
||
|
||
if len(saasWriteItems) > 0 {
|
||
if succ, _, err = submitWrite(writeParams, saasWriteItems); err != nil {
|
||
return err
|
||
}
|
||
succTotal += succ
|
||
fmt.Printf("[%v] batch_succ = %v, succ_total = %v, total_processed = %v\n", writeParams.sourcePath, succ, succTotal, total)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func submitWrite(writeParams writeParams, saasWriteCmds []*saasapi.WriteItem) (succ, total uint32, err error) {
|
||
saasReq := &saasapi.SaasReq{
|
||
Cmd: &saasapi.SaasReq_Write{
|
||
Write: &saasapi.Write{
|
||
IsClearAllFirst: writeParams.clear,
|
||
},
|
||
},
|
||
}
|
||
|
||
if writeParams.appid != "" {
|
||
saasReq.UseridType = saasapi.UserIdType_OPENID
|
||
saasReq.Appid = writeParams.appid
|
||
}
|
||
|
||
saasReq.Cmd.(*saasapi.SaasReq_Write).Write.WriteItems = saasWriteCmds
|
||
|
||
total = uint32(len(saasWriteCmds))
|
||
res, err := writeParams.saasHttp.Write(saasReq)
|
||
|
||
if err != nil {
|
||
slog.Error("submitWrite error", "err", err)
|
||
return
|
||
}
|
||
|
||
return res.GetWriteRes().GetSuccCmdCount(), total, nil
|
||
}
|