增加任务取回功能

This commit is contained in:
algotao
2025-07-27 18:16:08 +08:00
parent a78c16d301
commit 6ca9fe7a02
5 changed files with 237 additions and 12 deletions

View File

@@ -25,6 +25,8 @@ func RunTask(args ...string) error {
return RunTaskInfo(args...)
case "upload":
return RunTaskUpload(args...)
case "download":
return RunTaskDownload(args...)
case "run":
return RunTaskRun(args...)
default:
@@ -51,6 +53,7 @@ Commands:
delete Delete a task on server
info Get a task info on server
upload Upload task's file block to server
download Download task's file block to local
"help" is the default command.

View File

@@ -0,0 +1,134 @@
package main
import (
"flag"
"fmt"
"net/http"
"os"
"path/filepath"
"e.coding.net/rta/public/saasapi"
"e.coding.net/rta/public/saasapi/pkg/saashttp"
)
type downloadTaskParams struct {
taskSha256 string
destPath string
saasHttp *saashttp.SaasClient
}
func RunTaskDownload(args ...string) error {
fs := flag.NewFlagSet("download", flag.ExitOnError)
cfgFile := paramConfig(fs)
sha256 := paramSha256(fs)
destPath := paramDestPath(fs)
if err := fs.Parse(args); err != nil {
fmt.Fprintln(os.Stderr, "command line parse error", "err", err)
return err
}
if fs.NArg() > 0 || len(*sha256) == 0 || len(*destPath) == 0 {
fs.PrintDefaults()
return nil
}
cfg, err := LoadConfigFile(*cfgFile)
if err != nil {
fmt.Fprintln(os.Stderr, "LoadConfigFile error", "err", err)
return err
}
downloadTaskParams := downloadTaskParams{
taskSha256: *sha256,
destPath: *destPath,
saasHttp: &saashttp.SaasClient{
Client: &http.Client{},
ApiUrls: &cfg.ApiUrls,
Auth: &cfg.Auth,
},
}
return doTaskDownload(downloadTaskParams)
}
func doTaskDownload(downloadTaskParams downloadTaskParams) error {
infoTaskParams := infoTaskParams{
taskSha256: downloadTaskParams.taskSha256,
saasHttp: downloadTaskParams.saasHttp,
}
taskInfo, err := doTaskInfo(infoTaskParams)
if err != nil {
return err
}
if len(taskInfo.GetSourcePath()) == 0 {
err = fmt.Errorf("task download failed. source path is empty")
fmt.Fprintln(os.Stderr, err)
return err
}
totalFiles := len(taskInfo.GetTaskFileInfos())
fi := 0
for _, finfo := range taskInfo.GetTaskFileInfos() {
fi++
var f *os.File
offset := int64(0)
totalBlocks := len(finfo.GetFileBlocks())
bi := 0
for _, binfo := range finfo.GetFileBlocks() {
bi++
if binfo.GetUploaded() {
if f == nil {
fname := finfo.GetFileName()
if len(downloadTaskParams.destPath) > 0 {
fname = filepath.Join(downloadTaskParams.destPath, fname)
}
os.MkdirAll(filepath.Dir(fname), os.ModePerm)
f, err = os.Create(fname)
if err != nil {
return err
}
}
blockRes, err := downloadTaskParams.saasHttp.TaskDownload(
binfo.GetBlockSha256(),
f,
offset,
int(binfo.GetBlockLength()),
)
if err != nil {
return err
}
if blockRes.GetCode() != saasapi.ErrorCode_SUCC {
err = fmt.Errorf("download block error, code %d, msg %s", blockRes.GetCode(), blockRes.GetStatus())
fmt.Fprintln(os.Stderr, err)
return err
} else {
fmt.Printf("download block success. file: %v, sha256 %v. block %v/%v, file %v/%v\n",
finfo.GetFileName(), binfo.GetBlockSha256(),
bi, totalBlocks, fi, totalFiles,
)
}
} else {
fmt.Printf("download block. file: %v, sha256 %v. block %v/%v, file %v/%v\n",
finfo.GetFileName(), binfo.GetBlockSha256(),
bi, totalBlocks, fi, totalFiles,
)
}
offset += int64(binfo.GetBlockLength())
}
if f != nil {
f.Close()
}
}
return nil
}

View File

@@ -1,7 +1,5 @@
FROM rta-docker.pkg.coding.net/public/docker/entre_dev:latest AS builder
ARG TARGETOS TARGETARCH APPNAME
COPY . /tmp/saasapi/
WORKDIR /tmp/saasapi/

View File

@@ -1,16 +1,17 @@
package saashttp
type ApiUrls struct {
BaseUrl string
WritePath string
ReadPath string
ColumnWritePath string
TaskCreatePath string
TaskListPath string
TaskInfoPath string
TaskDeletePath string
TaskRunPath string
TaskUploadPath string
BaseUrl string
WritePath string
ReadPath string
ColumnWritePath string
TaskCreatePath string
TaskListPath string
TaskInfoPath string
TaskDeletePath string
TaskRunPath string
TaskUploadPath string
TaskDownloadPath string
}
type Auth struct {

View File

@@ -12,6 +12,7 @@ import (
"net/url"
"os"
"strconv"
"strings"
"time"
"e.coding.net/rta/public/saasapi"
@@ -77,6 +78,11 @@ func (c *SaasClient) TaskUpload(sha256 string, file *os.File, offset int64, size
return c.upload(postUrl, file, offset, size)
}
func (c *SaasClient) TaskDownload(sha256 string, file *os.File, offset int64, size int) (saasRes *saasapi.SaasRes, err error) {
postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskDownloadPath, "sha256", sha256)
return c.download(postUrl, file, offset, size)
}
func (c *SaasClient) makeUrl(baseUrl, path string, params ...string) string {
url, err := url.Parse(baseUrl)
if err != nil {
@@ -232,3 +238,86 @@ func (c *SaasClient) upload(url string, file *os.File, offset int64, size int) (
return saasRes, nil
}
func (c *SaasClient) download(url string, file *os.File, offset int64, size int) (saasRes *saasapi.SaasRes, err error) {
if file == nil {
return nil, fmt.Errorf("file is nil")
}
if size <= 0 {
return nil, fmt.Errorf("size is invalid")
}
req, err := http.NewRequest("GET", url, bytes.NewBuffer([]byte{}))
if err != nil {
fmt.Println("http new request error", err)
return nil, err
}
timeStamp := strconv.FormatInt(time.Now().Unix(), 10)
md5byte := md5.Sum([]byte(c.Auth.Account + c.Auth.Token + timeStamp))
authorization := hex.EncodeToString(md5byte[:])
req.Header.Add("Account", c.Auth.Account)
req.Header.Add("Time", timeStamp)
req.Header.Add("Authorization", authorization)
req.Header.Add("Accept-Encoding", "gzip")
res, err := c.Client.Do(req)
if err != nil {
fmt.Println("http send error", err)
return nil, err
}
defer res.Body.Close()
if res.StatusCode != 200 {
err = fmt.Errorf("NOT 200. %v", res.StatusCode)
fmt.Println("http state error", err)
return nil, err
}
bodyReader := res.Body
if strings.Contains(res.Header.Get("Content-Encoding"), "gzip") {
gz, err := gzip.NewReader(res.Body)
if err != nil {
fmt.Println("gzip newreader error", err)
return nil, err
}
defer gz.Close()
bodyReader = gz
}
resBody, err := io.ReadAll(bodyReader)
if err != nil {
fmt.Println("read body error", err)
return nil, err
}
saasRes = &saasapi.SaasRes{}
if strings.Contains(res.Header.Get("Content-Type"), "application/octet-stream") {
if len(resBody) == size {
file.WriteAt(resBody, offset)
} else {
err = fmt.Errorf("body size error. body:%v, want:%v", len(resBody), size)
fmt.Println("http read body error", err)
return nil, err
}
} else {
if c.ResponseEncoder == RESPONSE_ENCODER_PROTOBUF {
err = proto.Unmarshal(resBody, saasRes)
if err != nil {
fmt.Println("unmarshal response body to protobuf error", err)
return nil, err
}
} else {
err = json.Unmarshal(resBody, saasRes)
if err != nil {
fmt.Println("unmarshal response body to json error", err)
return nil, err
}
}
}
return saasRes, nil
}