diff --git a/cmd.pb.go b/cmd.pb.go index 0627d63..6021913 100644 --- a/cmd.pb.go +++ b/cmd.pb.go @@ -140,6 +140,10 @@ const ( ErrorCode_TASK_BLOCK_SIZE ErrorCode = 123 // 块大小超限 ErrorCode_TASK_TOTAL_SIZE ErrorCode = 124 // 总文件大小超限 ErrorCode_TASK_MARSHAL ErrorCode = 125 // 序列化 + ErrorCode_TASK_IS_WATING ErrorCode = 130 // 任务未上传完毕 + ErrorCode_TASK_IS_RUNNING ErrorCode = 131 // 任务已经在运行 + ErrorCode_TASK_FAILED ErrorCode = 132 // 任务已失败 + ErrorCode_TASK_FINISHED ErrorCode = 133 // 任务已完成 ErrorCode_DATA_ERROR ErrorCode = 201 // 数据错误 ) @@ -164,6 +168,10 @@ var ( 123: "TASK_BLOCK_SIZE", 124: "TASK_TOTAL_SIZE", 125: "TASK_MARSHAL", + 130: "TASK_IS_WATING", + 131: "TASK_IS_RUNNING", + 132: "TASK_FAILED", + 133: "TASK_FINISHED", 201: "DATA_ERROR", } ErrorCode_value = map[string]int32{ @@ -185,6 +193,10 @@ var ( "TASK_BLOCK_SIZE": 123, "TASK_TOTAL_SIZE": 124, "TASK_MARSHAL": 125, + "TASK_IS_WATING": 130, + "TASK_IS_RUNNING": 131, + "TASK_FAILED": 132, + "TASK_FINISHED": 133, "DATA_ERROR": 201, } ) @@ -262,31 +274,34 @@ func (CmdErrorCode) EnumDescriptor() ([]byte, []int) { type TaskStatus int32 const ( - TaskStatus_ALL TaskStatus = 0 // 全部 - TaskStatus_WAITING TaskStatus = 1 // 等待中 - TaskStatus_RUNNING TaskStatus = 2 // 运行中 - TaskStatus_SUCCESS TaskStatus = 3 // 成功 - TaskStatus_FAIL TaskStatus = 4 // 失败 - TaskStatus_DELETED TaskStatus = 5 // 已删除,仅在执行删除成功时返回 + TaskStatus_ALL TaskStatus = 0 // 全部 + TaskStatus_WAITING TaskStatus = 1 // 等待中 + TaskStatus_READY TaskStatus = 2 // 上传完毕 + TaskStatus_RUNNING TaskStatus = 3 // 运行中 + TaskStatus_SUCCESS TaskStatus = 4 // 成功 + TaskStatus_FAIL TaskStatus = 5 // 失败 + TaskStatus_DELETED TaskStatus = 10 // 已删除,仅在执行删除成功时返回 ) // Enum value maps for TaskStatus. var ( TaskStatus_name = map[int32]string{ - 0: "ALL", - 1: "WAITING", - 2: "RUNNING", - 3: "SUCCESS", - 4: "FAIL", - 5: "DELETED", + 0: "ALL", + 1: "WAITING", + 2: "READY", + 3: "RUNNING", + 4: "SUCCESS", + 5: "FAIL", + 10: "DELETED", } TaskStatus_value = map[string]int32{ "ALL": 0, "WAITING": 1, - "RUNNING": 2, - "SUCCESS": 3, - "FAIL": 4, - "DELETED": 5, + "READY": 2, + "RUNNING": 3, + "SUCCESS": 4, + "FAIL": 5, + "DELETED": 10, } ) @@ -1988,7 +2003,7 @@ const file_cmd_proto_rawDesc = "" + "* \n" + "\vNameSpaceId\x12\a\n" + "\x03DID\x10\x00\x12\b\n" + - "\x04WUID\x10\x01*\xef\x02\n" + + "\x04WUID\x10\x01*\xc0\x03\n" + "\tErrorCode\x12\b\n" + "\x04SUCC\x10\x00\x12\x13\n" + "\x0fINVALID_ACCOUNT\x10e\x12\x15\n" + @@ -2008,19 +2023,25 @@ const file_cmd_proto_rawDesc = "" + "\x0eTASK_NUM_LIMIT\x10z\x12\x13\n" + "\x0fTASK_BLOCK_SIZE\x10{\x12\x13\n" + "\x0fTASK_TOTAL_SIZE\x10|\x12\x10\n" + - "\fTASK_MARSHAL\x10}\x12\x0f\n" + + "\fTASK_MARSHAL\x10}\x12\x13\n" + + "\x0eTASK_IS_WATING\x10\x82\x01\x12\x14\n" + + "\x0fTASK_IS_RUNNING\x10\x83\x01\x12\x10\n" + + "\vTASK_FAILED\x10\x84\x01\x12\x12\n" + + "\rTASK_FINISHED\x10\x85\x01\x12\x0f\n" + "\n" + "DATA_ERROR\x10\xc9\x01*\x16\n" + "\fCmdErrorCode\x12\x06\n" + - "\x02OK\x10\x00*S\n" + + "\x02OK\x10\x00*^\n" + "\n" + "TaskStatus\x12\a\n" + "\x03ALL\x10\x00\x12\v\n" + - "\aWAITING\x10\x01\x12\v\n" + - "\aRUNNING\x10\x02\x12\v\n" + - "\aSUCCESS\x10\x03\x12\b\n" + - "\x04FAIL\x10\x04\x12\v\n" + - "\aDELETED\x10\x05B!Z\x1fe.coding.net/rta/public/saasapib\x06proto3" + "\aWAITING\x10\x01\x12\t\n" + + "\x05READY\x10\x02\x12\v\n" + + "\aRUNNING\x10\x03\x12\v\n" + + "\aSUCCESS\x10\x04\x12\b\n" + + "\x04FAIL\x10\x05\x12\v\n" + + "\aDELETED\x10\n" + + "B!Z\x1fe.coding.net/rta/public/saasapib\x06proto3" var ( file_cmd_proto_rawDescOnce sync.Once diff --git a/cmd.proto b/cmd.proto index b315114..8c28460 100644 --- a/cmd.proto +++ b/cmd.proto @@ -205,6 +205,12 @@ enum ErrorCode { TASK_TOTAL_SIZE = 124; // 总文件大小超限 TASK_MARSHAL = 125; // 序列化 + TASK_IS_WATING = 130; // 任务未上传完毕 + TASK_IS_RUNNING = 131; // 任务已经在运行 + TASK_FAILED = 132; // 任务已失败 + TASK_FINISHED = 133; // 任务已完成 + + DATA_ERROR = 201; // 数据错误 } @@ -215,10 +221,11 @@ enum CmdErrorCode { enum TaskStatus { ALL = 0; // 全部 WAITING = 1; // 等待中 - RUNNING = 2; // 运行中 - SUCCESS = 3; // 成功 - FAIL = 4; // 失败 + READY = 2; // 上传完毕 + RUNNING = 3; // 运行中 + SUCCESS = 4; // 成功 + FAIL = 5; // 失败 - DELETED = 5; // 已删除,仅在执行删除成功时返回 + DELETED = 10; // 已删除,仅在执行删除成功时返回 } diff --git a/cmd/saastool/params.go b/cmd/saastool/params.go index 7be92f9..254aa78 100644 --- a/cmd/saastool/params.go +++ b/cmd/saastool/params.go @@ -61,16 +61,8 @@ func paramBatchSize(fs *flag.FlagSet) *uint { return fs.Uint("batchsize", 10000, "Batch size to sync") } -func paramBlockSize(fs *flag.FlagSet) uint64 { - bsize := fs.String("blocksize", "50M", "Block size to make hash. using size mode K, M, G, T") - num, err := ParseByteSize(*bsize) - if err != nil { - fmt.Println("Error parsing block size", "err", err) - fmt.Println("Using default 50M") - num = 50 * 1024 * 1024 - - } - return num +func paramBlockSize(fs *flag.FlagSet) *string { + return fs.String("blocksize", "50M", "Block size to make hash. using size mode K, M, G, T") } func paramClear(fs *flag.FlagSet) *bool { diff --git a/cmd/saastool/task.go b/cmd/saastool/task.go index 4820385..3500802 100644 --- a/cmd/saastool/task.go +++ b/cmd/saastool/task.go @@ -23,6 +23,10 @@ func RunTask(args ...string) error { return RunTaskDelete(args...) case "info": return RunTaskInfo(args...) + case "upload": + return RunTaskUpload(args...) + case "run": + return RunTaskRun(args...) default: err := fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'saastool task help' for usage`, name) slog.Warn(err.Error()) diff --git a/cmd/saastool/task_info.go b/cmd/saastool/task_info.go index cddd074..cfe08d2 100644 --- a/cmd/saastool/task_info.go +++ b/cmd/saastool/task_info.go @@ -45,11 +45,16 @@ func RunTaskInfo(args ...string) error { }, } - return doTaskInfo(infoTaskParams) + taskRes, err := doTaskInfo(infoTaskParams) + + if err != nil { + fmt.Printf("task res: %v\n", protojson.Format(taskRes)) + } + return err } -func doTaskInfo(infoTaskParams infoTaskParams) error { +func doTaskInfo(infoTaskParams infoTaskParams) (*saasapi.Task, error) { saasReq := &saasapi.SaasReq{ Cmd: &saasapi.SaasReq_TaskInfo{ TaskInfo: &saasapi.TaskInfo{ @@ -61,16 +66,16 @@ func doTaskInfo(infoTaskParams infoTaskParams) error { res, err := infoTaskParams.saasHttp.TaskInfo(saasReq) if err != nil { fmt.Println("submit Task info error", "err", err) - return err + return nil, err } if res.Code != saasapi.ErrorCode_SUCC { - fmt.Println("task info failed", "code", res.Code, "status", res.Status) - return nil + err = fmt.Errorf("task info failed. code:%v, status:%v", res.Code, res.Status) + fmt.Println(err) + return nil, err } taskRes := res.GetTaskInfoRes() - - fmt.Printf("task res: %v\n", protojson.Format(taskRes)) - return nil + fmt.Println(protojson.Format(taskRes)) + return taskRes, nil } diff --git a/cmd/saastool/task_make.go b/cmd/saastool/task_make.go index 1bc0063..4346047 100644 --- a/cmd/saastool/task_make.go +++ b/cmd/saastool/task_make.go @@ -28,16 +28,20 @@ type makeTaskParams struct { // 计算任务 type hashTask struct { - chunk []byte - index int + chunk *[]byte + hash string + blockSize uint64 + index int } +/* // 计算结果 type hashResult struct { hash string blockSize uint64 index int } +*/ func RunTaskMake(args ...string) error { fs := flag.NewFlagSet("make", flag.ExitOnError) @@ -56,7 +60,15 @@ func RunTaskMake(args ...string) error { return nil } - if blockSize < blockSizeMin || blockSize > blockSizeMax { + blockSizeNum, err := ParseByteSize(*blockSize) + if err != nil { + fmt.Println("Error parsing block size", "err", err) + fmt.Println("Using default 50M") + blockSizeNum = 50 * 1024 * 1024 + + } + + if blockSizeNum < blockSizeMin || blockSizeNum > blockSizeMax { fmt.Println("block size error", "min", blockSizeMin, "max", blockSizeMax) return nil } @@ -65,7 +77,7 @@ func RunTaskMake(args ...string) error { sourcePath: *sourcePath, hashFile: *hashFile, task: &saasapi.Task{ - TaskBlockSize: blockSize, + TaskBlockSize: blockSizeNum, TaskDescription: *desc, }, } @@ -114,8 +126,15 @@ func doTaskMake(makeTaskParams makeTaskParams) error { return err } - tasks := make(chan hashTask) - results := make(chan hashResult) + totalSize := uint64(fi.Size()) + // 计算读取次数 + readTimes := int(totalSize / makeTaskParams.task.TaskBlockSize) + if totalSize%makeTaskParams.task.TaskBlockSize != 0 { + readTimes++ + } + + tasks := make(chan *hashTask) + results := make(chan *hashTask) // 启动工作协程 hashMaxWorker := runtime.GOMAXPROCS(0) @@ -123,18 +142,27 @@ func doTaskMake(makeTaskParams makeTaskParams) error { go hashWorker(tasks, results) } + // 初始化读缓存 + readBuffers := make([][]byte, hashMaxWorker) + for i := range hashMaxWorker { + readBuffers[i] = make([]byte, makeTaskParams.task.TaskBlockSize) + } + wg := sync.WaitGroup{} wg.Add(1) go func() { - index := 0 - buffer := make([]byte, makeTaskParams.task.TaskBlockSize) - for { - n, err := sourceFile.Read(buffer) + for index := range readTimes { + buffer := &readBuffers[index%hashMaxWorker] + n, err := sourceFile.Read(*buffer) + if n > 0 { wg.Add(1) - fmt.Printf("\rhashing file [%v], block [%v]", makeTaskParams.sourcePath, index) - tasks <- hashTask{chunk: buffer[:n], index: index} - index++ + //fmt.Printf("\rhashing file [%v], block [%v]\n", makeTaskParams.sourcePath, index) + tasks <- &hashTask{ + chunk: buffer, + index: index, + blockSize: uint64(n), + } } if err != nil { break @@ -144,10 +172,12 @@ func doTaskMake(makeTaskParams makeTaskParams) error { wg.Done() }() - var allResults []hashResult + // 接收结果 + var allResults []*hashTask go func() { for r := range results { allResults = append(allResults, r) + fmt.Printf("\rhashed file [%v], block [%v]", makeTaskParams.sourcePath, r.index) wg.Done() } }() @@ -178,12 +208,14 @@ func doTaskMake(makeTaskParams makeTaskParams) error { } // hash计算协程 -func hashWorker(tasks <-chan hashTask, results chan<- hashResult) { +func hashWorker(tasks <-chan *hashTask, results chan<- *hashTask) { + h := sha256.New() for t := range tasks { - h := sha256.New() - h.Write(t.chunk) - hash := hex.EncodeToString(h.Sum(nil)) - results <- hashResult{hash: hash, index: t.index, blockSize: uint64(len(t.chunk))} + h.Write((*t.chunk)[:t.blockSize]) + t.hash = hex.EncodeToString(h.Sum(nil)) + //fmt.Printf("\rhashing block [%v]\n", t.index) + results <- t + h.Reset() } } diff --git a/cmd/saastool/task_run.go b/cmd/saastool/task_run.go index 709f8f9..3605dd8 100644 --- a/cmd/saastool/task_run.go +++ b/cmd/saastool/task_run.go @@ -1,5 +1,76 @@ package main +import ( + "flag" + "fmt" + "net/http" + + "e.coding.net/rta/public/saasapi" + "e.coding.net/rta/public/saasapi/pkg/saashttp" + "google.golang.org/protobuf/encoding/protojson" +) + +type runTaskParams struct { + taskSha256 string + saasHttp *saashttp.SaasClient +} + func RunTaskRun(args ...string) error { + fs := flag.NewFlagSet("create", flag.ExitOnError) + cfgFile := paramConfig(fs) + sha256 := paramSha256(fs) + + if err := fs.Parse(args); err != nil { + fmt.Println("command line parse error", "err", err) + return err + } + + if fs.NArg() > 0 || len(*sha256) == 0 { + fs.PrintDefaults() + return nil + } + + cfg, err := LoadConfigFile(*cfgFile) + if err != nil { + fmt.Println("LoadConfigFile error", "err", err) + return err + } + + runTaskParams := runTaskParams{ + taskSha256: *sha256, + saasHttp: &saashttp.SaasClient{ + Client: &http.Client{}, + ApiUrls: &cfg.ApiUrls, + Auth: &cfg.Auth, + }, + } + + return doTaskRun(runTaskParams) + +} + +func doTaskRun(runTaskParams runTaskParams) error { + saasReq := &saasapi.SaasReq{ + Cmd: &saasapi.SaasReq_TaskRun{ + TaskRun: &saasapi.TaskRun{ + TaskSha256: runTaskParams.taskSha256, + }, + }, + } + + res, err := runTaskParams.saasHttp.TaskRun(saasReq) + if err != nil { + fmt.Println("submit Task run error", "err", err) + return err + } + + if res.Code != saasapi.ErrorCode_SUCC { + err = fmt.Errorf("task run failed. code:%v, status:%v", res.Code, res.Status) + fmt.Println(err) + return err + } + + fmt.Println("task run success", protojson.Format(res)) + return nil } diff --git a/cmd/saastool/task_upload.go b/cmd/saastool/task_upload.go new file mode 100644 index 0000000..d6bb290 --- /dev/null +++ b/cmd/saastool/task_upload.go @@ -0,0 +1,116 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + "os" + + "e.coding.net/rta/public/saasapi" + "e.coding.net/rta/public/saasapi/pkg/saashttp" +) + +type uploadTaskParams struct { + taskSha256 string + saasHttp *saashttp.SaasClient +} + +func RunTaskUpload(args ...string) error { + fs := flag.NewFlagSet("upload", flag.ExitOnError) + cfgFile := paramConfig(fs) + sha256 := paramSha256(fs) + + if err := fs.Parse(args); err != nil { + fmt.Println("command line parse error", "err", err) + return err + } + + if fs.NArg() > 0 || len(*sha256) == 0 { + fs.PrintDefaults() + return nil + } + + cfg, err := LoadConfigFile(*cfgFile) + if err != nil { + fmt.Println("LoadConfigFile error", "err", err) + return err + } + + uploadTaskParams := uploadTaskParams{ + taskSha256: *sha256, + saasHttp: &saashttp.SaasClient{ + Client: &http.Client{}, + ApiUrls: &cfg.ApiUrls, + Auth: &cfg.Auth, + }, + } + + return doTaskUpload(uploadTaskParams) + +} +func doTaskUpload(uploadTaskParams uploadTaskParams) error { + infoTaskParams := infoTaskParams{ + taskSha256: uploadTaskParams.taskSha256, + saasHttp: uploadTaskParams.saasHttp, + } + taskInfo, err := doTaskInfo(infoTaskParams) + if err != nil { + return err + } + + totalFiles := len(taskInfo.GetTaskFileInfos()) + fi := 0 + for _, finfo := range taskInfo.GetTaskFileInfos() { + + fi++ + var f *os.File + offset := int64(0) + totalBlocks := len(finfo.GetFileBlocks()) + bi := 0 + for _, binfo := range finfo.GetFileBlocks() { + bi++ + if !binfo.GetUploaded() { + if f == nil { + f, err = os.Open(finfo.GetFileName()) + if err != nil { + return err + } + } + + blockRes, err := uploadTaskParams.saasHttp.TaskUpload( + binfo.GetBlockSha256(), + f, + offset, + int(binfo.GetBlockLength()), + ) + + if err != nil { + return err + } + + if blockRes.GetCode() != saasapi.ErrorCode_SUCC { + return fmt.Errorf("upload block error, code %d, msg %s", blockRes.GetCode(), blockRes.GetStatus()) + } else { + fmt.Printf("upload block success. file: %v, sha256 %v. block %v/%v, file %v/%v\n", + finfo.GetFileName(), binfo.GetBlockSha256(), + bi, totalBlocks, fi, totalFiles, + ) + } + + } else { + fmt.Printf("uploaded block. file: %v, sha256 %v. block %v/%v, file %v/%v\n", + finfo.GetFileName(), binfo.GetBlockSha256(), + bi, totalBlocks, fi, totalFiles, + ) + } + offset += int64(binfo.GetBlockLength()) + + } + + if f != nil { + f.Close() + } + } + + return nil +} diff --git a/pkg/saashttp/httpcli.go b/pkg/saashttp/httpcli.go index 2df7187..f7b3df5 100644 --- a/pkg/saashttp/httpcli.go +++ b/pkg/saashttp/httpcli.go @@ -2,6 +2,7 @@ package saashttp import ( "bytes" + "compress/gzip" "crypto/md5" "encoding/hex" "encoding/json" @@ -9,6 +10,7 @@ import ( "io" "net/http" "net/url" + "os" "strconv" "time" @@ -70,7 +72,12 @@ func (c *SaasClient) TaskRun(saasReq *saasapi.SaasReq) (saasRes *saasapi.SaasRes return c.post(postUrl, saasReq) } -func (c *SaasClient) makeUrl(baseUrl, path string) string { +func (c *SaasClient) TaskUpload(sha256 string, file *os.File, offset int64, size int) (saasRes *saasapi.SaasRes, err error) { + postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskUploadPath, "sha256", sha256) + return c.upload(postUrl, file, offset, size) +} + +func (c *SaasClient) makeUrl(baseUrl, path string, params ...string) string { url, err := url.Parse(baseUrl) if err != nil { panic(err) @@ -86,6 +93,10 @@ func (c *SaasClient) makeUrl(baseUrl, path string) string { default: queryValues.Add("resmode", "protobuf") } + + for i := 0; i < len(params); i += 2 { + queryValues.Add(params[i], params[i+1]) + } url.RawQuery = queryValues.Encode() return url.String() @@ -143,3 +154,81 @@ func (c *SaasClient) post(url string, saasReq *saasapi.SaasReq) (saasRes *saasap return saasRes, nil } + +func (c *SaasClient) upload(url string, file *os.File, offset int64, size int) (saasRes *saasapi.SaasRes, err error) { + if file == nil { + return nil, fmt.Errorf("file is nil") + } + + if size <= 0 { + return nil, fmt.Errorf("size is invalid") + } + + buf := make([]byte, size) + n, err := file.ReadAt(buf, offset) + if err != nil { + fmt.Println("read file error", err) + return nil, err + } + if n != size { + return nil, fmt.Errorf("read file error") + } + + var gzBuf bytes.Buffer + gz := gzip.NewWriter(&gzBuf) + + if _, err := gz.Write(buf); err != nil { + fmt.Println("gzip write error", err) + return nil, err + } + + if err = gz.Close(); err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(gzBuf.Bytes())) + if err != nil { + fmt.Println("http new request error", err) + return nil, err + } + + timeStamp := strconv.FormatInt(time.Now().Unix(), 10) + md5byte := md5.Sum([]byte(c.Auth.Account + c.Auth.Token + timeStamp)) + authorization := hex.EncodeToString(md5byte[:]) + + req.Header.Add("Account", c.Auth.Account) + req.Header.Add("Time", timeStamp) + req.Header.Add("Authorization", authorization) + req.Header.Add("Content-Type", "application/octet-stream") + req.Header.Add("Content-Encoding", "gzip") + res, err := c.Client.Do(req) + if err != nil { + fmt.Println("http send error", err) + return nil, err + } + + defer res.Body.Close() + + resBody, err := io.ReadAll(res.Body) + if err != nil { + fmt.Println("http read body error", err) + return nil, err + } + + saasRes = &saasapi.SaasRes{} + if c.ResponseEncoder == RESPONSE_ENCODER_PROTOBUF { + err = proto.Unmarshal(resBody, saasRes) + if err != nil { + fmt.Println("unmarshal response body to protobuf error", err) + return nil, err + } + } else { + err = json.Unmarshal(resBody, saasRes) + if err != nil { + fmt.Println("unmarshal response body to json error", err) + return nil, err + } + } + + return saasRes, nil +}