支持dataspace

This commit is contained in:
algotao
2025-05-21 17:06:32 +08:00
parent e7d4aa27f1
commit b8c2a3a7d4
7 changed files with 265 additions and 342 deletions

View File

@@ -7,6 +7,7 @@ import (
"net/http"
"os"
"path"
"strings"
"e.coding.net/rta/public/saasapi"
"e.coding.net/rta/public/saasapi/pkg/saashttp"
@@ -17,6 +18,7 @@ type writeParams struct {
cfg *Config
sourcePath string
appid string
ds string
batchSize uint
clear bool
saasHttp *saashttp.SaasClient
@@ -27,6 +29,7 @@ func RunWrite(args ...string) error {
cfgFile := paramConfig(fs)
sourcePath := paramSourcePath(fs)
appid := paramAppid(fs)
ds := paramDataSpaceId(fs)
batchSize := paramBatchSize(fs)
clear := paramClear(fs)
@@ -35,11 +38,16 @@ func RunWrite(args ...string) error {
return err
}
if fs.NArg() > 0 || len(*sourcePath) == 0 {
if fs.NArg() > 0 || len(*sourcePath) == 0 || len(*ds) == 0 {
fs.PrintDefaults()
return nil
}
if strings.ToLower(*ds) == "openid" && len(*appid) == 0 {
fmt.Fprintln(os.Stderr, "appid must be set when data space is openid")
return nil
}
cfg, err := LoadConfigFile(*cfgFile)
if err != nil {
fmt.Fprintln(os.Stderr, "load config file error", "err", err)
@@ -50,6 +58,7 @@ func RunWrite(args ...string) error {
cfg: cfg,
sourcePath: *sourcePath,
appid: *appid,
ds: *ds,
batchSize: *batchSize,
clear: *clear,
saasHttp: &saashttp.SaasClient{
@@ -107,9 +116,9 @@ func doLoadFileToWrite(writeParams writeParams) error {
saasWriteItems := []*saasapi.WriteItem{}
succ := uint32(0)
succTotal := uint32(0)
total := uint32(0)
errCount := 0
errTotal := 0
total := 0
for scaner.Scan() {
total++
line := scaner.Text()
@@ -125,45 +134,42 @@ func doLoadFileToWrite(writeParams writeParams) error {
saasWriteItems = append(saasWriteItems, saasWriteItem)
if len(saasWriteItems) == int(writeParams.batchSize) {
if succ, _, err = submitWrite(writeParams, saasWriteItems); err != nil {
if errCount, err = submitWrite(writeParams, saasWriteItems); err != nil {
return err
}
succTotal += succ
fmt.Printf("[%v] batch_succ = %v, succ_total = %v, total_processed = %v\n", writeParams.sourcePath, succ, succTotal, total)
errTotal += errCount
fmt.Printf("[%v] err_batch = %v, err_total = %v, total_processed = %v\n", writeParams.sourcePath, errCount, errTotal, total)
saasWriteItems = saasWriteItems[:0]
}
}
if len(saasWriteItems) > 0 {
if succ, _, err = submitWrite(writeParams, saasWriteItems); err != nil {
if errCount, err = submitWrite(writeParams, saasWriteItems); err != nil {
return err
}
succTotal += succ
fmt.Printf("[%v] batch_succ = %v, succ_total = %v, total_processed = %v\n", writeParams.sourcePath, succ, succTotal, total)
errTotal += errCount
fmt.Printf("[%v] err_batch = %v, err_total = %v, total_processed = %v\n", writeParams.sourcePath, errCount, errTotal, total)
}
return nil
}
func submitWrite(writeParams writeParams, saasWriteCmds []*saasapi.WriteItem) (succ, total uint32, err error) {
func submitWrite(writeParams writeParams, saasWriteCmds []*saasapi.WriteItem) (errcount int, err error) {
write := &saasapi.Write{
DataspaceId: writeParams.ds,
Appid: writeParams.appid,
IsClearAllFirst: writeParams.clear,
}
saasReq := &saasapi.SaasReq{
Cmd: &saasapi.SaasReq_Write{
Write: &saasapi.Write{
IsClearAllFirst: writeParams.clear,
},
Write: write,
},
}
if writeParams.appid != "" {
saasReq.UseridType = saasapi.UserIdType_OPENID
saasReq.Appid = writeParams.appid
}
write.WriteItems = saasWriteCmds
saasReq.Cmd.(*saasapi.SaasReq_Write).Write.WriteItems = saasWriteCmds
total = uint32(len(saasWriteCmds))
res, err := writeParams.saasHttp.Write(saasReq)
if err != nil {
@@ -177,5 +183,5 @@ func submitWrite(writeParams writeParams, saasWriteCmds []*saasapi.WriteItem) (s
return
}
return res.GetWriteRes().GetSuccCmdCount(), total, nil
return len(res.GetWriteRes().GetFailedUserid()), nil
}