tasklist taskinfo

This commit is contained in:
2025-04-12 21:53:56 +08:00
parent 4fc2aeb9c4
commit 8da742fea1
14 changed files with 484 additions and 139 deletions

View File

@@ -20,8 +20,7 @@ Commands:
columnwrite Write columns for 'deviceid / openid' users
convert Convert data to write format
makehash Make file hash for upload task
task Task commands
"help" is the default command.

View File

@@ -29,8 +29,6 @@ func Run(args ...string) error {
return RunColumnWrite(args...)
case "convert":
return RunConvert(args...)
case "makehash":
return RunMakeHash(args...)
case "verify":
return RunVerify(args...)
case "task":

View File

@@ -7,6 +7,9 @@ import (
"strings"
)
// paramConfig 设置并返回配置文件路径的命令行参数。
// 该函数接收一个 flag.FlagSet 指针作为参数,用于注册 "-config" 标志,
// 默认值为 "cfg.toml",返回存储配置文件路径的字符串指针。
func paramConfig(fs *flag.FlagSet) *string {
return fs.String("config", "cfg.toml", "Config file.")
}
@@ -18,10 +21,30 @@ func paramSourcePath(fs *flag.FlagSet) *string {
return fs.String("source", "", "Source path or filename")
}
func paramSourceConvertedPath(fs *flag.FlagSet) *string {
return fs.String("source", "", "Source path of the converted files")
}
func paramTaskDesc(fs *flag.FlagSet) *string {
return fs.String("desc", "", "Task description")
}
func paramDestPath(fs *flag.FlagSet) *string {
return fs.String("dest", "", "Destination path or filename")
}
func paramOutputHashFile(fs *flag.FlagSet) *string {
return fs.String("hashfile", "", "Output hash file")
}
func paramInputHashFile(fs *flag.FlagSet) *string {
return fs.String("hashfile", "", "Input hash file")
}
func paramFilterStatus(fs *flag.FlagSet) *string {
return fs.String("status", "", "Filter status. enums 'all', 'waiting', 'running', 'success', 'fail', 'deleted'")
}
func paramAppid(fs *flag.FlagSet) *string {
return fs.String("appid", "", "Wechat appid")
}
@@ -30,6 +53,10 @@ func paramUserids(fs *flag.FlagSet) *string {
return fs.String("userids", "", "Device ID or Wechat UserID, separated by comma")
}
func paramSha256(fs *flag.FlagSet) *string {
return fs.String("sha256", "", "Task SHA256 hash")
}
func paramBatchSize(fs *flag.FlagSet) *uint {
return fs.Uint("batchsize", 10000, "Batch size to sync")
}
@@ -46,10 +73,6 @@ func paramBlockSize(fs *flag.FlagSet) uint64 {
return num
}
func paramAsync(fs *flag.FlagSet) *bool {
return fs.Bool("async", false, "Async mode")
}
func paramClear(fs *flag.FlagSet) *bool {
return fs.Bool("clear", false, "Clear all data before write")
}

View File

@@ -53,9 +53,9 @@ func RunRead(args ...string) error {
userids: idsSlice,
appid: *appid,
saasHttp: &saashttp.SaasClient{
Client: http.Client{},
ApiUrls: cfg.ApiUrls,
Auth: cfg.Auth,
Client: &http.Client{},
ApiUrls: &cfg.ApiUrls,
Auth: &cfg.Auth,
},
}

View File

@@ -13,6 +13,8 @@ func RunTask(args ...string) error {
switch name {
case "", "help":
return RunTaskHelp(args...)
case "make":
return RunTaskMake(args...)
case "create":
return RunTaskCreate(args...)
case "list":
@@ -37,12 +39,13 @@ const taskUsage = `
Usage: saastoola task COMMAND [OPTIONS]
Commands:
create Create data file to task
make Make file hash for upload task
create Create a task on server
list List tasks on server
run Run task on server
delete Delete task on server
info Get task info on server
upload Upload file block to server
run Run a task on server
delete Delete a task on server
info Get a task info on server
upload Upload task's file block to server
"help" is the default command.

View File

@@ -1,5 +1,96 @@
package main
import (
"flag"
"fmt"
"net/http"
"os"
"e.coding.net/rta/public/saasapi"
"e.coding.net/rta/public/saasapi/pkg/saashttp"
"google.golang.org/protobuf/encoding/protojson"
)
type createTaskParams struct {
hashFile string
appid string
task *saasapi.Task
saasHttp *saashttp.SaasClient
}
func RunTaskCreate(args ...string) error {
fs := flag.NewFlagSet("create", flag.ExitOnError)
cfgFile := paramConfig(fs)
// sourcePath := paramSourceConvertedPath(fs)
hashFile := paramInputHashFile(fs)
appid := paramAppid(fs)
if err := fs.Parse(args); err != nil {
fmt.Println("command line parse error", "err", err)
return err
}
if fs.NArg() > 0 || len(*hashFile) == 0 {
fs.PrintDefaults()
return nil
}
cfg, err := LoadConfigFile(*cfgFile)
if err != nil {
fmt.Println("LoadConfigFile error", "err", err)
return err
}
createTaskParams := createTaskParams{
hashFile: *hashFile,
appid: *appid,
saasHttp: &saashttp.SaasClient{
Client: &http.Client{},
ApiUrls: &cfg.ApiUrls,
Auth: &cfg.Auth,
},
task: &saasapi.Task{},
}
taskBuf, err := os.ReadFile(createTaskParams.hashFile)
if err != nil {
fmt.Println("open task file error", "err", err)
return err
}
if err = protojson.Unmarshal(taskBuf, createTaskParams.task); err != nil {
fmt.Println("parse task file error", "err", err)
}
return doTaskCreate(createTaskParams)
}
func doTaskCreate(createTaskParams createTaskParams) error {
saasReq := &saasapi.SaasReq{
Cmd: &saasapi.SaasReq_TaskCreate{
TaskCreate: createTaskParams.task,
},
}
if createTaskParams.appid != "" {
saasReq.UseridType = saasapi.UserIdType_OPENID
saasReq.Appid = createTaskParams.appid
}
res, err := createTaskParams.saasHttp.TaskCreate(saasReq)
if err != nil {
fmt.Println("submit Create Task error", "err", err)
return err
}
if res.Code != saasapi.ErrorCode_SUCC {
fmt.Println("task create failed", "code", res.Code, "status", res.Status)
return nil
}
taskRes := res.GetTaskCreateRes()
fmt.Printf("task res: %v\n", protojson.Format(taskRes))
return nil
}

View File

@@ -1,5 +1,76 @@
package main
import (
"flag"
"fmt"
"net/http"
"e.coding.net/rta/public/saasapi"
"e.coding.net/rta/public/saasapi/pkg/saashttp"
"google.golang.org/protobuf/encoding/protojson"
)
type infoTaskParams struct {
taskSha256 string
saasHttp *saashttp.SaasClient
}
func RunTaskInfo(args ...string) error {
fs := flag.NewFlagSet("create", flag.ExitOnError)
cfgFile := paramConfig(fs)
sha256 := paramSha256(fs)
if err := fs.Parse(args); err != nil {
fmt.Println("command line parse error", "err", err)
return err
}
if fs.NArg() > 0 || len(*sha256) == 0 {
fs.PrintDefaults()
return nil
}
cfg, err := LoadConfigFile(*cfgFile)
if err != nil {
fmt.Println("LoadConfigFile error", "err", err)
return err
}
infoTaskParams := infoTaskParams{
taskSha256: *sha256,
saasHttp: &saashttp.SaasClient{
Client: &http.Client{},
ApiUrls: &cfg.ApiUrls,
Auth: &cfg.Auth,
},
}
return doTaskInfo(infoTaskParams)
}
func doTaskInfo(infoTaskParams infoTaskParams) error {
saasReq := &saasapi.SaasReq{
Cmd: &saasapi.SaasReq_TaskInfo{
TaskInfo: &saasapi.TaskInfo{
TaskSha256: infoTaskParams.taskSha256,
},
},
}
res, err := infoTaskParams.saasHttp.TaskInfo(saasReq)
if err != nil {
fmt.Println("submit Task info error", "err", err)
return err
}
if res.Code != saasapi.ErrorCode_SUCC {
fmt.Println("task info failed", "code", res.Code, "status", res.Status)
return nil
}
taskRes := res.GetTaskInfoRes()
fmt.Printf("task res: %v\n", protojson.Format(taskRes))
return nil
}

View File

@@ -1,5 +1,88 @@
package main
import (
"flag"
"fmt"
"net/http"
"e.coding.net/rta/public/saasapi"
"e.coding.net/rta/public/saasapi/pkg/saashttp"
"google.golang.org/protobuf/encoding/protojson"
)
type listTaskParams struct {
filterStatus saasapi.TaskStatus
saasHttp *saashttp.SaasClient
}
func RunTaskList(args ...string) error {
fs := flag.NewFlagSet("create", flag.ExitOnError)
cfgFile := paramConfig(fs)
filter := paramFilterStatus(fs)
if err := fs.Parse(args); err != nil {
fmt.Println("command line parse error", "err", err)
return err
}
if fs.NArg() > 0 {
fs.PrintDefaults()
return nil
}
cfg, err := LoadConfigFile(*cfgFile)
if err != nil {
fmt.Println("LoadConfigFile error", "err", err)
return err
}
listTaskParams := listTaskParams{
saasHttp: &saashttp.SaasClient{
Client: &http.Client{},
ApiUrls: &cfg.ApiUrls,
Auth: &cfg.Auth,
},
}
switch *filter {
case "all":
listTaskParams.filterStatus = saasapi.TaskStatus_ALL
case "waiting":
listTaskParams.filterStatus = saasapi.TaskStatus_WAITING
case "running":
listTaskParams.filterStatus = saasapi.TaskStatus_RUNNING
case "success":
listTaskParams.filterStatus = saasapi.TaskStatus_SUCCESS
case "fail":
listTaskParams.filterStatus = saasapi.TaskStatus_FAIL
}
return doTaskList(listTaskParams)
}
func doTaskList(listTaskParams listTaskParams) error {
saasReq := &saasapi.SaasReq{
Cmd: &saasapi.SaasReq_TaskList{
TaskList: &saasapi.TaskList{
StatusFilter: listTaskParams.filterStatus,
},
},
}
res, err := listTaskParams.saasHttp.TaskList(saasReq)
if err != nil {
fmt.Println("submit List Task error", "err", err)
return err
}
if res.Code != saasapi.ErrorCode_SUCC {
fmt.Println("task list failed", "code", res.Code, "status", res.Status)
return nil
}
taskRes := res.GetTaskListRes()
fmt.Printf("task res: %v\n", protojson.Format(taskRes))
return nil
}

View File

@@ -20,9 +20,9 @@ const (
blockSizeMax = 200 * 1024 * 1024
)
type makeHashParams struct {
type makeTaskParams struct {
sourcePath string
destPath string
hashFile string
task *saasapi.Task
}
@@ -39,18 +39,19 @@ type hashResult struct {
index int
}
func RunMakeHash(args ...string) error {
fs := flag.NewFlagSet("tasklocalmake", flag.ExitOnError)
sourcePath := paramSourcePath(fs)
destPath := paramDestPath(fs)
func RunTaskMake(args ...string) error {
fs := flag.NewFlagSet("make", flag.ExitOnError)
sourcePath := paramSourceConvertedPath(fs)
hashFile := paramOutputHashFile(fs)
blockSize := paramBlockSize(fs)
desc := paramTaskDesc(fs)
if err := fs.Parse(args); err != nil {
fmt.Println("command line parse error", "err", err)
return err
}
if fs.NArg() > 0 || len(*sourcePath) == 0 || len(*destPath) == 0 {
if fs.NArg() > 0 || len(*sourcePath) == 0 || len(*hashFile) == 0 {
fs.PrintDefaults()
return nil
}
@@ -60,52 +61,49 @@ func RunMakeHash(args ...string) error {
return nil
}
makeHashParams := makeHashParams{
makeTaskParams := makeTaskParams{
sourcePath: *sourcePath,
destPath: *destPath,
hashFile: *hashFile,
task: &saasapi.Task{
TaskBlockSize: blockSize,
TaskBlockSize: blockSize,
TaskDescription: *desc,
},
}
return doMakeHash(makeHashParams)
return doMakeHash(makeTaskParams)
}
func doMakeHash(makeHashParams makeHashParams) error {
fsInfo, err := os.Stat(makeHashParams.sourcePath)
func doMakeHash(makeTaskParams makeTaskParams) error {
fsInfo, err := os.Stat(makeTaskParams.sourcePath)
if err != nil {
return err
}
if !fsInfo.IsDir() {
// 如果是文件,直接计算
return doFileHash(makeHashParams)
return doTaskMake(makeTaskParams)
}
// 读取目录下信息
dirEntry, err := os.ReadDir(makeHashParams.sourcePath)
dirEntry, err := os.ReadDir(makeTaskParams.sourcePath)
if err != nil {
return err
}
// 遍历目录
for _, dir := range dirEntry {
newParam := makeHashParams
newParam.sourcePath = path.Join(makeHashParams.sourcePath, dir.Name())
if dir.IsDir() {
newParam.destPath = path.Join(makeHashParams.destPath, dir.Name())
}
newParam := makeTaskParams
newParam.sourcePath = path.Join(makeTaskParams.sourcePath, dir.Name())
if err = doMakeHash(newParam); err != nil {
return err
}
}
return saveTaskFile(makeHashParams)
return saveTaskFile(makeTaskParams)
}
func doFileHash(makeHashParams makeHashParams) error {
sourceFile, err := os.Open(makeHashParams.sourcePath)
func doTaskMake(makeTaskParams makeTaskParams) error {
sourceFile, err := os.Open(makeTaskParams.sourcePath)
if err != nil {
return err
}
@@ -129,12 +127,12 @@ func doFileHash(makeHashParams makeHashParams) error {
wg.Add(1)
go func() {
index := 0
buffer := make([]byte, makeHashParams.task.TaskBlockSize)
buffer := make([]byte, makeTaskParams.task.TaskBlockSize)
for {
n, err := sourceFile.Read(buffer)
if n > 0 {
wg.Add(1)
fmt.Printf("\rhashing file [%v], block [%v]", makeHashParams.sourcePath, index)
fmt.Printf("\rhashing file [%v], block [%v]", makeTaskParams.sourcePath, index)
tasks <- hashTask{chunk: buffer[:n], index: index}
index++
}
@@ -164,7 +162,7 @@ func doFileHash(makeHashParams makeHashParams) error {
// 输出结果
fileInfo := &saasapi.FileInfo{
FileName: makeHashParams.sourcePath,
FileName: makeTaskParams.sourcePath,
FileSize: uint64(fi.Size()),
}
for _, r := range allResults {
@@ -173,7 +171,7 @@ func doFileHash(makeHashParams makeHashParams) error {
BlockLength: r.blockSize,
})
}
makeHashParams.task.TaskFileInfos = append(makeHashParams.task.TaskFileInfos, fileInfo)
makeTaskParams.task.TaskFileInfos = append(makeTaskParams.task.TaskFileInfos, fileInfo)
fmt.Println("")
return nil
@@ -189,8 +187,8 @@ func hashWorker(tasks <-chan hashTask, results chan<- hashResult) {
}
}
func saveTaskFile(makeHashParams makeHashParams) error {
taskFile, err := os.Create(makeHashParams.destPath)
func saveTaskFile(makeTaskParams makeTaskParams) error {
taskFile, err := os.Create(makeTaskParams.hashFile)
if err != nil {
return err
}
@@ -198,15 +196,15 @@ func saveTaskFile(makeHashParams makeHashParams) error {
h := sha256.New()
for _, fileInfo := range makeHashParams.task.TaskFileInfos {
for _, fileInfo := range makeTaskParams.task.TaskFileInfos {
for _, fileBlock := range fileInfo.FileBlocks {
h.Write([]byte(fileBlock.BlockSha256))
}
}
makeHashParams.task.TaskSha256 = hex.EncodeToString(h.Sum(nil))
makeTaskParams.task.TaskSha256 = hex.EncodeToString(h.Sum(nil))
_, err = taskFile.WriteString(protojson.Format(makeHashParams.task))
_, err = taskFile.WriteString(protojson.Format(makeTaskParams.task))
if err != nil {
return err
}

View File

@@ -53,9 +53,9 @@ func RunWrite(args ...string) error {
batchSize: *batchSize,
clear: *clear,
saasHttp: &saashttp.SaasClient{
Client: http.Client{},
ApiUrls: cfg.ApiUrls,
Auth: cfg.Auth,
Client: &http.Client{},
ApiUrls: &cfg.ApiUrls,
Auth: &cfg.Auth,
},
}