diff --git a/cmd.pb.go b/cmd.pb.go index 6021913..71fd139 100644 --- a/cmd.pb.go +++ b/cmd.pb.go @@ -145,6 +145,7 @@ const ( ErrorCode_TASK_FAILED ErrorCode = 132 // 任务已失败 ErrorCode_TASK_FINISHED ErrorCode = 133 // 任务已完成 ErrorCode_DATA_ERROR ErrorCode = 201 // 数据错误 + ErrorCode_CMD_ERROR ErrorCode = 202 // 命令行执行错误 ) // Enum value maps for ErrorCode. @@ -173,6 +174,7 @@ var ( 132: "TASK_FAILED", 133: "TASK_FINISHED", 201: "DATA_ERROR", + 202: "CMD_ERROR", } ErrorCode_value = map[string]int32{ "SUCC": 0, @@ -198,6 +200,7 @@ var ( "TASK_FAILED": 132, "TASK_FINISHED": 133, "DATA_ERROR": 201, + "CMD_ERROR": 202, } ) @@ -1049,7 +1052,7 @@ type Task struct { TaskSha256 string `protobuf:"bytes,1,opt,name=task_sha256,json=taskSha256,proto3" json:"task_sha256,omitempty"` // 任务sha256 TaskDescription string `protobuf:"bytes,2,opt,name=task_description,json=taskDescription,proto3" json:"task_description,omitempty"` // 任务描述 TaskFileInfos []*FileInfo `protobuf:"bytes,3,rep,name=task_file_infos,json=taskFileInfos,proto3" json:"task_file_infos,omitempty"` // 文件列表 - TaskBlockSize uint64 `protobuf:"varint,4,opt,name=task_block_size,json=taskBlockSize,proto3" json:"task_block_size,omitempty"` // 文件块字节大小(推荐50M) + TaskBlockSize uint64 `protobuf:"varint,4,opt,name=task_block_size,json=taskBlockSize,proto3" json:"task_block_size,omitempty"` // 文件块字节大小(推荐200M) // 以下字段只在返回时填写,用于提供服务端的任务状态。在请求时填写会被忽略 CreateTime string `protobuf:"bytes,10,opt,name=create_time,json=createTime,proto3" json:"create_time,omitempty"` // 创建时间 RunTime string `protobuf:"bytes,11,opt,name=run_time,json=runTime,proto3" json:"run_time,omitempty"` // 运行时间 @@ -2003,7 +2006,7 @@ const file_cmd_proto_rawDesc = "" + "* \n" + "\vNameSpaceId\x12\a\n" + "\x03DID\x10\x00\x12\b\n" + - "\x04WUID\x10\x01*\xc0\x03\n" + + "\x04WUID\x10\x01*\xd0\x03\n" + "\tErrorCode\x12\b\n" + "\x04SUCC\x10\x00\x12\x13\n" + "\x0fINVALID_ACCOUNT\x10e\x12\x15\n" + @@ -2029,7 +2032,8 @@ const file_cmd_proto_rawDesc = "" + "\vTASK_FAILED\x10\x84\x01\x12\x12\n" + "\rTASK_FINISHED\x10\x85\x01\x12\x0f\n" + "\n" + - "DATA_ERROR\x10\xc9\x01*\x16\n" + + "DATA_ERROR\x10\xc9\x01\x12\x0e\n" + + "\tCMD_ERROR\x10\xca\x01*\x16\n" + "\fCmdErrorCode\x12\x06\n" + "\x02OK\x10\x00*^\n" + "\n" + diff --git a/cmd.proto b/cmd.proto index 8c28460..b1051a5 100644 --- a/cmd.proto +++ b/cmd.proto @@ -99,7 +99,7 @@ message Task { string task_sha256 = 1; // 任务sha256 string task_description = 2; // 任务描述 repeated FileInfo task_file_infos = 3; // 文件列表 - uint64 task_block_size = 4; // 文件块字节大小(推荐50M) + uint64 task_block_size = 4; // 文件块字节大小(推荐200M) // 以下字段只在返回时填写,用于提供服务端的任务状态。在请求时填写会被忽略 string create_time = 10; // 创建时间 @@ -212,6 +212,7 @@ enum ErrorCode { DATA_ERROR = 201; // 数据错误 + CMD_ERROR = 202; // 命令行执行错误 } enum CmdErrorCode { diff --git a/cmd/saastool/.gitignore b/cmd/saastool/.gitignore index 0cfca10..24033ba 100644 --- a/cmd/saastool/.gitignore +++ b/cmd/saastool/.gitignore @@ -1,3 +1,4 @@ debug/ saastool -saastool_linux \ No newline at end of file +saastool_linux +cfg.toml \ No newline at end of file diff --git a/cmd/saastool/convert.go b/cmd/saastool/convert.go index 1084fb7..bcbd8f9 100644 --- a/cmd/saastool/convert.go +++ b/cmd/saastool/convert.go @@ -38,7 +38,7 @@ func RunConvert(args ...string) error { destPath := paramDestPath(fs) if err := fs.Parse(args); err != nil { - fmt.Println("command line parse error", "err", err) + fmt.Fprintln(os.Stderr, "command line parse error", "err", err) return err } @@ -49,7 +49,7 @@ func RunConvert(args ...string) error { mapCfg, err := LoadMapFile(*mapCfgFile) if err != nil { - fmt.Println("LoadConfigFile error", "err", err) + fmt.Fprintln(os.Stderr, "LoadConfigFile error", "err", err) return err } diff --git a/cmd/saastool/main.go b/cmd/saastool/main.go index 48480da..b368098 100644 --- a/cmd/saastool/main.go +++ b/cmd/saastool/main.go @@ -2,7 +2,6 @@ package main import ( "fmt" - "log/slog" "os" ) @@ -10,11 +9,9 @@ func main() { if err := Run(os.Args[1:]...); err != nil { os.Exit(1) } - } func Run(args ...string) error { - name, args := ParseCommandName(args) // 从参数中解析出命令 @@ -33,10 +30,9 @@ func Run(args ...string) error { return RunVerify(args...) case "task": return RunTask(args...) - default: - err := fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'saastool help' for usage`, name) - slog.Warn(err.Error()) + err := fmt.Errorf(`unknown command "%v"`+"\n"+`Run 'saastool help' for usage`, name) + fmt.Fprintln(os.Stderr, err.Error()) return err } } diff --git a/cmd/saastool/params.go b/cmd/saastool/params.go index 254aa78..00f74bd 100644 --- a/cmd/saastool/params.go +++ b/cmd/saastool/params.go @@ -62,7 +62,7 @@ func paramBatchSize(fs *flag.FlagSet) *uint { } func paramBlockSize(fs *flag.FlagSet) *string { - return fs.String("blocksize", "50M", "Block size to make hash. using size mode K, M, G, T") + return fs.String("blocksize", "200M", "Block size to make hash. using size mode K, M, G, T") } func paramClear(fs *flag.FlagSet) *bool { @@ -106,17 +106,3 @@ func ParseByteSize(sizeStr string) (uint64, error) { return 0, fmt.Errorf("unknown unit: %s", unit) } } - -/* -func main() { - sizes := []string{"1K", "2M", "3G", "4T", "5"} - for _, sizeStr := range sizes { - size, err := ParseByteSize(sizeStr) - if err != nil { - fmt.Printf("Error parsing %s: %v\n", sizeStr, err) - } else { - fmt.Printf("%s = %d bytes\n", sizeStr, size) - } - } -} -*/ diff --git a/cmd/saastool/read.go b/cmd/saastool/read.go index 5b178bb..19ea527 100644 --- a/cmd/saastool/read.go +++ b/cmd/saastool/read.go @@ -3,8 +3,8 @@ package main import ( "flag" "fmt" - "log/slog" "net/http" + "os" "strings" "e.coding.net/rta/public/saasapi" @@ -30,7 +30,7 @@ func RunRead(args ...string) error { userids := paramUserids(fs) if err := fs.Parse(args); err != nil { - fmt.Println("command line parse error", "err", err) + fmt.Fprintln(os.Stderr, "command line parse error", "err", err) return err } @@ -44,7 +44,7 @@ func RunRead(args ...string) error { cfg, err := LoadConfigFile(*cfgFile) if err != nil { - slog.Error("LoadConfigFile error", "err", err) + fmt.Fprintln(os.Stderr, "load config file error", "err", err) return err } @@ -85,11 +85,16 @@ func doRead(readParams readParams) error { res, err := readParams.saasHttp.Read(saasReq) if err != nil { - slog.Error("submitRead error", "err", err) + fmt.Fprintln(os.Stderr, "submit read error", "err", err) return err } - fmt.Println(protojson.Format(res)) + if res.GetCode() != saasapi.ErrorCode_SUCC { + fmt.Fprintln(os.Stderr, protojson.Format(res)) + return err + } else { + fmt.Println(protojson.Format(res)) + } return nil } diff --git a/cmd/saastool/task.go b/cmd/saastool/task.go index 3500802..c3581ba 100644 --- a/cmd/saastool/task.go +++ b/cmd/saastool/task.go @@ -2,7 +2,7 @@ package main import ( "fmt" - "log/slog" + "os" "strings" ) @@ -29,7 +29,8 @@ func RunTask(args ...string) error { return RunTaskRun(args...) default: err := fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'saastool task help' for usage`, name) - slog.Warn(err.Error()) + fmt.Fprintln(os.Stderr, err) + return err } } diff --git a/cmd/saastool/task_create.go b/cmd/saastool/task_create.go index f7af06d..b11893a 100644 --- a/cmd/saastool/task_create.go +++ b/cmd/saastool/task_create.go @@ -26,7 +26,7 @@ func RunTaskCreate(args ...string) error { appid := paramAppid(fs) if err := fs.Parse(args); err != nil { - fmt.Println("command line parse error", "err", err) + fmt.Fprintln(os.Stderr, "command line parse error", "err", err) return err } @@ -37,7 +37,7 @@ func RunTaskCreate(args ...string) error { cfg, err := LoadConfigFile(*cfgFile) if err != nil { - fmt.Println("LoadConfigFile error", "err", err) + fmt.Fprintln(os.Stderr, "LoadConfigFile error", "err", err) return err } @@ -54,12 +54,12 @@ func RunTaskCreate(args ...string) error { taskBuf, err := os.ReadFile(createTaskParams.hashFile) if err != nil { - fmt.Println("open task file error", "err", err) + fmt.Fprintln(os.Stderr, "open task file error", "err", err) return err } if err = protojson.Unmarshal(taskBuf, createTaskParams.task); err != nil { - fmt.Println("parse task file error", "err", err) + fmt.Fprintln(os.Stderr, "parse task file error", "err", err) } return doTaskCreate(createTaskParams) @@ -80,12 +80,12 @@ func doTaskCreate(createTaskParams createTaskParams) error { res, err := createTaskParams.saasHttp.TaskCreate(saasReq) if err != nil { - fmt.Println("submit Create Task error", "err", err) + fmt.Fprintln(os.Stderr, "submit Create Task error", "err", err) return err } if res.Code != saasapi.ErrorCode_SUCC { - fmt.Println("task create failed", "code", res.Code, "status", res.Status) + fmt.Fprintln(os.Stderr, "task create failed", "code", res.Code, "status", res.Status) return nil } diff --git a/cmd/saastool/task_delete.go b/cmd/saastool/task_delete.go index 6cbe739..adee3ad 100644 --- a/cmd/saastool/task_delete.go +++ b/cmd/saastool/task_delete.go @@ -1,5 +1,81 @@ package main -func RunTaskDelete(args ...string) error { - return nil +import ( + "flag" + "fmt" + "net/http" + "os" + + "e.coding.net/rta/public/saasapi" + "e.coding.net/rta/public/saasapi/pkg/saashttp" + "google.golang.org/protobuf/encoding/protojson" +) + +type deleteTaskParams struct { + taskSha256 string + saasHttp *saashttp.SaasClient +} + +func RunTaskDelete(args ...string) error { + fs := flag.NewFlagSet("delete", flag.ExitOnError) + cfgFile := paramConfig(fs) + sha256 := paramSha256(fs) + + if err := fs.Parse(args); err != nil { + fmt.Fprintln(os.Stderr, "command line parse error", "err", err) + return err + } + + if fs.NArg() > 0 || len(*sha256) == 0 { + fs.PrintDefaults() + return nil + } + + cfg, err := LoadConfigFile(*cfgFile) + if err != nil { + fmt.Fprintln(os.Stderr, "LoadConfigFile error", "err", err) + return err + } + + deleteTaskParams := deleteTaskParams{ + taskSha256: *sha256, + saasHttp: &saashttp.SaasClient{ + Client: &http.Client{}, + ApiUrls: &cfg.ApiUrls, + Auth: &cfg.Auth, + }, + } + + taskRes, err := doTaskDelete(deleteTaskParams) + + if err != nil { + fmt.Printf("task res: %v\n", protojson.Format(taskRes)) + } + return err +} + +func doTaskDelete(deleteTaskParams deleteTaskParams) (*saasapi.Task, error) { + saasReq := &saasapi.SaasReq{ + Cmd: &saasapi.SaasReq_TaskDelete{ + TaskDelete: &saasapi.TaskDelete{ + TaskSha256: deleteTaskParams.taskSha256, + }, + }, + } + + res, err := deleteTaskParams.saasHttp.TaskDelete(saasReq) + if err != nil { + fmt.Fprintln(os.Stderr, "submit Task delete error", "err", err) + return nil, err + } + + if res.Code != saasapi.ErrorCode_SUCC { + err = fmt.Errorf("task delete failed. code:%v, status:%v", res.Code, res.Status) + fmt.Fprintln(os.Stderr, err) + return nil, err + } + + taskRes := res.GetTaskDeleteRes() + fmt.Println(protojson.Format(taskRes)) + return taskRes, nil } diff --git a/cmd/saastool/task_info.go b/cmd/saastool/task_info.go index cfe08d2..d5e7413 100644 --- a/cmd/saastool/task_info.go +++ b/cmd/saastool/task_info.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "net/http" + "os" "e.coding.net/rta/public/saasapi" "e.coding.net/rta/public/saasapi/pkg/saashttp" @@ -21,7 +22,7 @@ func RunTaskInfo(args ...string) error { sha256 := paramSha256(fs) if err := fs.Parse(args); err != nil { - fmt.Println("command line parse error", "err", err) + fmt.Fprintln(os.Stderr, "command line parse error", "err", err) return err } @@ -32,7 +33,7 @@ func RunTaskInfo(args ...string) error { cfg, err := LoadConfigFile(*cfgFile) if err != nil { - fmt.Println("LoadConfigFile error", "err", err) + fmt.Fprintln(os.Stderr, "LoadConfigFile error", "err", err) return err } @@ -51,7 +52,6 @@ func RunTaskInfo(args ...string) error { fmt.Printf("task res: %v\n", protojson.Format(taskRes)) } return err - } func doTaskInfo(infoTaskParams infoTaskParams) (*saasapi.Task, error) { @@ -65,13 +65,13 @@ func doTaskInfo(infoTaskParams infoTaskParams) (*saasapi.Task, error) { res, err := infoTaskParams.saasHttp.TaskInfo(saasReq) if err != nil { - fmt.Println("submit Task info error", "err", err) + fmt.Fprintln(os.Stderr, "submit Task info error", "err", err) return nil, err } if res.Code != saasapi.ErrorCode_SUCC { err = fmt.Errorf("task info failed. code:%v, status:%v", res.Code, res.Status) - fmt.Println(err) + fmt.Fprintln(os.Stderr, err) return nil, err } diff --git a/cmd/saastool/task_list.go b/cmd/saastool/task_list.go index a961e86..ce91000 100644 --- a/cmd/saastool/task_list.go +++ b/cmd/saastool/task_list.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "net/http" + "os" "e.coding.net/rta/public/saasapi" "e.coding.net/rta/public/saasapi/pkg/saashttp" @@ -21,7 +22,7 @@ func RunTaskList(args ...string) error { filter := paramFilterStatus(fs) if err := fs.Parse(args); err != nil { - fmt.Println("command line parse error", "err", err) + fmt.Fprintln(os.Stderr, "command line parse error", "err", err) return err } @@ -32,7 +33,7 @@ func RunTaskList(args ...string) error { cfg, err := LoadConfigFile(*cfgFile) if err != nil { - fmt.Println("LoadConfigFile error", "err", err) + fmt.Fprintln(os.Stderr, "LoadConfigFile error", "err", err) return err } @@ -72,12 +73,12 @@ func doTaskList(listTaskParams listTaskParams) error { res, err := listTaskParams.saasHttp.TaskList(saasReq) if err != nil { - fmt.Println("submit List Task error", "err", err) + fmt.Fprintln(os.Stderr, "submit List Task error", "err", err) return err } if res.Code != saasapi.ErrorCode_SUCC { - fmt.Println("task list failed", "code", res.Code, "status", res.Status) + fmt.Fprintln(os.Stderr, "task list failed", "code", res.Code, "status", res.Status) return nil } diff --git a/cmd/saastool/task_make.go b/cmd/saastool/task_make.go index 4346047..49c43cf 100644 --- a/cmd/saastool/task_make.go +++ b/cmd/saastool/task_make.go @@ -16,7 +16,7 @@ import ( ) const ( - blockSizeMin = 10 * 1024 * 1024 + blockSizeMin = 50 * 1024 * 1024 blockSizeMax = 200 * 1024 * 1024 ) @@ -51,7 +51,7 @@ func RunTaskMake(args ...string) error { desc := paramTaskDesc(fs) if err := fs.Parse(args); err != nil { - fmt.Println("command line parse error", "err", err) + fmt.Fprintln(os.Stderr, "command line parse error", "err", err) return err } @@ -62,14 +62,14 @@ func RunTaskMake(args ...string) error { blockSizeNum, err := ParseByteSize(*blockSize) if err != nil { - fmt.Println("Error parsing block size", "err", err) - fmt.Println("Using default 50M") - blockSizeNum = 50 * 1024 * 1024 + fmt.Fprintln(os.Stderr, "Error parsing block size", "err", err) + fmt.Fprintln(os.Stderr, "Using default 200M") + blockSizeNum = 200 * 1024 * 1024 } if blockSizeNum < blockSizeMin || blockSizeNum > blockSizeMax { - fmt.Println("block size error", "min", blockSizeMin, "max", blockSizeMax) + fmt.Fprintln(os.Stderr, "block size error", "min", blockSizeMin, "max", blockSizeMax) return nil } @@ -157,7 +157,6 @@ func doTaskMake(makeTaskParams makeTaskParams) error { if n > 0 { wg.Add(1) - //fmt.Printf("\rhashing file [%v], block [%v]\n", makeTaskParams.sourcePath, index) tasks <- &hashTask{ chunk: buffer, index: index, @@ -213,7 +212,6 @@ func hashWorker(tasks <-chan *hashTask, results chan<- *hashTask) { for t := range tasks { h.Write((*t.chunk)[:t.blockSize]) t.hash = hex.EncodeToString(h.Sum(nil)) - //fmt.Printf("\rhashing block [%v]\n", t.index) results <- t h.Reset() } diff --git a/cmd/saastool/task_run.go b/cmd/saastool/task_run.go index 3605dd8..265d4a6 100644 --- a/cmd/saastool/task_run.go +++ b/cmd/saastool/task_run.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "net/http" + "os" "e.coding.net/rta/public/saasapi" "e.coding.net/rta/public/saasapi/pkg/saashttp" @@ -21,7 +22,7 @@ func RunTaskRun(args ...string) error { sha256 := paramSha256(fs) if err := fs.Parse(args); err != nil { - fmt.Println("command line parse error", "err", err) + fmt.Fprintln(os.Stderr, "command line parse error", "err", err) return err } @@ -32,7 +33,7 @@ func RunTaskRun(args ...string) error { cfg, err := LoadConfigFile(*cfgFile) if err != nil { - fmt.Println("LoadConfigFile error", "err", err) + fmt.Fprintln(os.Stderr, "LoadConfigFile error", "err", err) return err } @@ -60,13 +61,13 @@ func doTaskRun(runTaskParams runTaskParams) error { res, err := runTaskParams.saasHttp.TaskRun(saasReq) if err != nil { - fmt.Println("submit Task run error", "err", err) + fmt.Fprintln(os.Stderr, "submit Task run error", "err", err) return err } if res.Code != saasapi.ErrorCode_SUCC { err = fmt.Errorf("task run failed. code:%v, status:%v", res.Code, res.Status) - fmt.Println(err) + fmt.Fprintln(os.Stderr, err) return err } diff --git a/cmd/saastool/task_upload.go b/cmd/saastool/task_upload.go index d6bb290..d733df1 100644 --- a/cmd/saastool/task_upload.go +++ b/cmd/saastool/task_upload.go @@ -21,7 +21,7 @@ func RunTaskUpload(args ...string) error { sha256 := paramSha256(fs) if err := fs.Parse(args); err != nil { - fmt.Println("command line parse error", "err", err) + fmt.Fprintln(os.Stderr, "command line parse error", "err", err) return err } @@ -32,7 +32,7 @@ func RunTaskUpload(args ...string) error { cfg, err := LoadConfigFile(*cfgFile) if err != nil { - fmt.Println("LoadConfigFile error", "err", err) + fmt.Fprintln(os.Stderr, "LoadConfigFile error", "err", err) return err } diff --git a/cmd/saastool/write.go b/cmd/saastool/write.go index ac773c5..3a2d322 100644 --- a/cmd/saastool/write.go +++ b/cmd/saastool/write.go @@ -4,7 +4,6 @@ import ( "bufio" "flag" "fmt" - "log/slog" "net/http" "os" "path" @@ -32,7 +31,7 @@ func RunWrite(args ...string) error { clear := paramClear(fs) if err := fs.Parse(args); err != nil { - fmt.Println("command line parse error", "err", err) + fmt.Fprintln(os.Stderr, "command line parse error", "err", err) return err } @@ -43,9 +42,10 @@ func RunWrite(args ...string) error { cfg, err := LoadConfigFile(*cfgFile) if err != nil { - slog.Error("LoadConfigFile error", "err", err) + fmt.Fprintln(os.Stderr, "load config file error", "err", err) return err } + writeParams := writeParams{ cfg: cfg, sourcePath: *sourcePath, @@ -65,6 +65,7 @@ func RunWrite(args ...string) error { func doWrite(writeParams writeParams) error { fsInfo, err := os.Stat(writeParams.sourcePath) if err != nil { + fmt.Fprintln(os.Stderr, "file stat error", "err", err) return err } @@ -76,6 +77,7 @@ func doWrite(writeParams writeParams) error { // 读取目录下信息 dirEntry, err := os.ReadDir(writeParams.sourcePath) if err != nil { + fmt.Fprintln(os.Stderr, "read dir error", "err", err) return err } @@ -96,6 +98,7 @@ func doLoadFileToWrite(writeParams writeParams) error { // 读取文件并按行遍历,以\t分割为两列,第一列为userid,第二列解析为string数组 file, err := os.Open(writeParams.sourcePath) if err != nil { + fmt.Fprintln(os.Stderr, "open file error", "file", writeParams.sourcePath, "err", err) return err } defer file.Close() @@ -108,17 +111,18 @@ func doLoadFileToWrite(writeParams writeParams) error { succTotal := uint32(0) total := uint32(0) for scaner.Scan() { + total++ line := scaner.Text() if line == "" { continue } saasWriteItem := &saasapi.WriteItem{} if err = protojson.Unmarshal([]byte(line), saasWriteItem); err != nil { + fmt.Fprintln(os.Stderr, "protojson unmashal error", "file", writeParams.sourcePath, "line", total, "err", err) return err } saasWriteItems = append(saasWriteItems, saasWriteItem) - total++ if len(saasWriteItems) == int(writeParams.batchSize) { if succ, _, err = submitWrite(writeParams, saasWriteItems); err != nil { @@ -130,7 +134,6 @@ func doLoadFileToWrite(writeParams writeParams) error { saasWriteItems = saasWriteItems[:0] } - } if len(saasWriteItems) > 0 { @@ -164,7 +167,13 @@ func submitWrite(writeParams writeParams, saasWriteCmds []*saasapi.WriteItem) (s res, err := writeParams.saasHttp.Write(saasReq) if err != nil { - slog.Error("submitWrite error", "err", err) + fmt.Fprintln(os.Stderr, "submit write error", "err", err) + return + } + + if res.GetCode() != saasapi.ErrorCode_SUCC { + err = fmt.Errorf("write failed. code:%v, status:%v", res.GetCode(), res.GetStatus()) + fmt.Fprintln(os.Stderr, protojson.Format(res)) return }