支持ds重置(当前限geo)

This commit is contained in:
algotao
2026-01-19 14:54:18 +08:00
parent 369023b81d
commit 5cf860acfb
9 changed files with 583 additions and 350 deletions

816
cmd.pb.go

File diff suppressed because it is too large Load Diff

View File

@@ -12,6 +12,7 @@ message SaasReq {
Read read = 10; // 批量读取 Read read = 10; // 批量读取
Write write = 11; // 批量写入 Write write = 11; // 批量写入
ColumnWrite column_write = 12; // 全量列式写入 ColumnWrite column_write = 12; // 全量列式写入
ResetDs reset_ds = 13; // 清空数据区
Task task_create = 20; // 任务创建 Task task_create = 20; // 任务创建
TaskList task_list = 21; // 列出任务 TaskList task_list = 21; // 列出任务
@@ -76,15 +77,15 @@ message WriteItem {
Bytes write_bytes = 2 [deprecated = true]; // byte区域。!!!弃用请使用bytes_kv Bytes write_bytes = 2 [deprecated = true]; // byte区域。!!!弃用请使用bytes_kv
Uint32s write_uint32s = 3 [deprecated = true]; // uint32区域。!!!弃用请使用uint32s_kv Uint32s write_uint32s = 3 [deprecated = true]; // uint32区域。!!!弃用请使用uint32s_kv
FlagsWithExpire write_flags_with_expire = 4 [deprecated = true]; // 标志位区域。!!!弃用请使用flags_with_expire_kv FlagsWithExpire write_flags_with_expire = 4 [deprecated = true]; // 标志位区域。!!!弃用请使用flags_with_expire_kv
map<uint32, uint32> bytes_kv = 5; // 写入bytekey为1-64索引值value为0-255数值。index/value超限会丢弃 map<uint32, uint32> bytes_kv = 5; // 写入uint8key为1-64索引值value为0-255数值。index/value超限会丢弃
map<uint32, uint32> uint32s_kv = 6; // 写入uint32key为1-8索引值value为uint32数值。index超限会丢弃 map<uint32, uint32> uint32s_kv = 6; // 写入uint32key为1-8索引值value为uint32数值。index超限会丢弃
map<uint32, FlagWithExpire> flags_with_expire_kv = 7; // 写入标志位key为1-4索引值index超限会丢弃 map<uint32, FlagWithExpire> flags_with_expire_kv = 7; // 写入标志位key为1-4索引值index超限会丢弃
} }
// Bytes 写入byte区域 // Bytes 写入uint8区域
message Bytes { message Bytes {
bytes bytes = 1; // 写入的byte bytes bytes = 1; // 写入的uint8
uint64 index_1 = 2; // 写入byte的索引值(0..63) uint64 index_1 = 2; // 写入uint8的索引值(0..63)
} }
// Uint32s 写入uint32区域 // Uint32s 写入uint32区域
@@ -115,6 +116,11 @@ message ColumnWrite {
FlagsWithExpire write_flags_with_expire = 5; // 标志位区域 FlagsWithExpire write_flags_with_expire = 5; // 标志位区域
} }
// ResetDs 清空数据区命令
message ResetDs {
string dataspace_id = 1; // 数据空间ID
}
message Task { message Task {
string dataspace_id = 1; // 数据空间ID string dataspace_id = 1; // 数据空间ID
string appid = 2; // 小程序/小游戏/公众号/视频号的appid string appid = 2; // 小程序/小游戏/公众号/视频号的appid
@@ -283,6 +289,7 @@ message SaasRes {
ReadRes read_res = 10; // 读取命令返回 ReadRes read_res = 10; // 读取命令返回
WriteRes write_res = 11; // 写入命令返回 WriteRes write_res = 11; // 写入命令返回
ResetDsRes reset_ds_res = 13; // 清空数据区命令返回
Task task_create_res = 20; // 创建任务返回状态 Task task_create_res = 20; // 创建任务返回状态
TaskListRes task_list_res = 21; // 任务列表返回状态 TaskListRes task_list_res = 21; // 任务列表返回状态
@@ -359,6 +366,11 @@ message ValueItem {
map<uint32, FlagWithExpire> flags_with_expire_kv = 10; // 标志位区域 map<uint32, FlagWithExpire> flags_with_expire_kv = 10; // 标志位区域
} }
// ResetDsRes 清空数据区返回
message ResetDsRes {
string dataspace_id = 1; // 数据空间ID
}
// TaskListRes 任务列表返回 // TaskListRes 任务列表返回
message TaskListRes { message TaskListRes {
repeated Task tasks = 1; // 任务列表 repeated Task tasks = 1; // 任务列表

View File

@@ -94,7 +94,7 @@ func RunExpGet(args ...string) error {
groupBySlice = strings.Split(*groupBy, ",") groupBySlice = strings.Split(*groupBy, ",")
for _, group := range groupBySlice { for _, group := range groupBySlice {
if !validGroupBy[group] { if !validGroupBy[group] {
return fmt.Errorf("Group by error. group:", group) return fmt.Errorf("Group by error. group: %v", group)
} }
} }
} }

View File

@@ -19,6 +19,7 @@ Commands:
write Write user's 'bytes / uint32s / flags' write Write user's 'bytes / uint32s / flags'
read Read user's 'bytes / uint32s / flags' read Read user's 'bytes / uint32s / flags'
columnwrite Write columns for 'deviceid / openid' users columnwrite Write columns for 'deviceid / openid' users
resetds Reset data space
convert Convert data to write format convert Convert data to write format

View File

@@ -27,6 +27,8 @@ func Run(args ...string) error {
return RunRead(args...) return RunRead(args...)
case "columnwrite": case "columnwrite":
return RunColumnWrite(args...) return RunColumnWrite(args...)
case "resetds":
return RunResetDs(args...)
case "convert": case "convert":
return RunConvert(args...) return RunConvert(args...)
case "verify": case "verify":

71
cmd/saastool/resetds.go Normal file
View File

@@ -0,0 +1,71 @@
package main
import (
"flag"
"fmt"
"net/http"
"git.algo.com.cn/public/saasapi"
"git.algo.com.cn/public/saasapi/pkg/saashttp"
"google.golang.org/protobuf/encoding/protojson"
)
type resetDsParams struct {
ds string
saasHttp *saashttp.SaasClient
}
func RunResetDs(args ...string) error {
fs := flag.NewFlagSet("resetds", flag.ExitOnError)
cfgFile := paramConfig(fs)
ds := paramDataSpaceId(fs)
if err := fs.Parse(args); err != nil {
return fmt.Errorf("Command line parse error: %w", err)
}
if fs.NArg() > 0 || len(*ds) == 0 {
fs.PrintDefaults()
return nil
}
cfg, err := LoadConfigFile(*cfgFile)
if err != nil {
return fmt.Errorf("LoadConfigFile error: %w", err)
}
resetDsParams := resetDsParams{
ds: *ds,
saasHttp: &saashttp.SaasClient{
Client: &http.Client{},
ApiUrls: saashttp.InitAPIUrl(&cfg.ApiUrls),
Auth: &cfg.Auth,
},
}
return doResetDs(resetDsParams)
}
func doResetDs(resetDsParams resetDsParams) error {
saasReq := &saasapi.SaasReq{
Cmd: &saasapi.SaasReq_ResetDs{
ResetDs: &saasapi.ResetDs{
DataspaceId: resetDsParams.ds,
},
},
}
res, err := resetDsParams.saasHttp.ResetDS(saasReq)
if err != nil {
return fmt.Errorf("Submit Command error: %w", err)
}
if res.Code != saasapi.ErrorCode_SUCC {
return fmt.Errorf("Command failed. code:%v, status:%v", res.Code, res.Status)
}
resetRes := res.GetResetDsRes()
fmt.Printf("ResetDS res: %v\n", protojson.Format(resetRes))
return nil
}

View File

@@ -3,8 +3,7 @@
set -e set -e
docker buildx build --platform linux/amd64 \ docker buildx build --platform linux/amd64 \
-t rta-docker.pkg.coding.net/public/docker/saastool:latest \ -t rta-docker.pkg.coding.net/public/docker/saastool:2026011117 \
-t rta-docker.pkg.coding.net/public/docker/saastool:2025121617 \
-o type=registry \ -o type=registry \
-f saastool.Dockerfile \ -f saastool.Dockerfile \
../ ; ../ ;

View File

@@ -6,6 +6,7 @@ const (
writePath = "/saas/write" writePath = "/saas/write"
readPath = "/saas/read" readPath = "/saas/read"
columnWritePath = "/saas/column_write" columnWritePath = "/saas/column_write"
resetDSPath = "/saas/resetds"
taskCreatePath = "/saas/task/create" taskCreatePath = "/saas/task/create"
taskListPath = "/saas/task/list" taskListPath = "/saas/task/list"
taskInfoPath = "/saas/task/info" taskInfoPath = "/saas/task/info"
@@ -45,6 +46,7 @@ type ApiUrls struct {
WritePath string WritePath string
ReadPath string ReadPath string
ColumnWritePath string ColumnWritePath string
ResetDSPath string
TaskCreatePath string TaskCreatePath string
TaskListPath string TaskListPath string
TaskInfoPath string TaskInfoPath string
@@ -100,6 +102,11 @@ func InitAPIUrl(c *ApiUrls) *ApiUrls {
} else { } else {
r.ColumnWritePath = columnWritePath r.ColumnWritePath = columnWritePath
} }
if c.ResetDSPath != "" {
r.ResetDSPath = c.ResetDSPath
} else {
r.ResetDSPath = resetDSPath
}
if c.TaskCreatePath != "" { if c.TaskCreatePath != "" {
r.TaskCreatePath = c.TaskCreatePath r.TaskCreatePath = c.TaskCreatePath
} else { } else {

View File

@@ -53,6 +53,11 @@ func (c *SaasClient) ColumnWrite(saasReq *saasapi.SaasReq) (saasRes *saasapi.Saa
return c.post(postUrl, saasReq) return c.post(postUrl, saasReq)
} }
func (c *SaasClient) ResetDS(saasReq *saasapi.SaasReq) (saasRes *saasapi.SaasRes, err error) {
postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.ResetDSPath)
return c.post(postUrl, saasReq)
}
func (c *SaasClient) TaskCreate(saasReq *saasapi.SaasReq) (saasRes *saasapi.SaasRes, err error) { func (c *SaasClient) TaskCreate(saasReq *saasapi.SaasReq) (saasRes *saasapi.SaasRes, err error) {
postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskCreatePath) postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskCreatePath)
return c.post(postUrl, saasReq) return c.post(postUrl, saasReq)