支持run
This commit is contained in:
@@ -61,16 +61,8 @@ func paramBatchSize(fs *flag.FlagSet) *uint {
|
||||
return fs.Uint("batchsize", 10000, "Batch size to sync")
|
||||
}
|
||||
|
||||
func paramBlockSize(fs *flag.FlagSet) uint64 {
|
||||
bsize := fs.String("blocksize", "50M", "Block size to make hash. using size mode K, M, G, T")
|
||||
num, err := ParseByteSize(*bsize)
|
||||
if err != nil {
|
||||
fmt.Println("Error parsing block size", "err", err)
|
||||
fmt.Println("Using default 50M")
|
||||
num = 50 * 1024 * 1024
|
||||
|
||||
}
|
||||
return num
|
||||
func paramBlockSize(fs *flag.FlagSet) *string {
|
||||
return fs.String("blocksize", "50M", "Block size to make hash. using size mode K, M, G, T")
|
||||
}
|
||||
|
||||
func paramClear(fs *flag.FlagSet) *bool {
|
||||
|
||||
@@ -23,6 +23,10 @@ func RunTask(args ...string) error {
|
||||
return RunTaskDelete(args...)
|
||||
case "info":
|
||||
return RunTaskInfo(args...)
|
||||
case "upload":
|
||||
return RunTaskUpload(args...)
|
||||
case "run":
|
||||
return RunTaskRun(args...)
|
||||
default:
|
||||
err := fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'saastool task help' for usage`, name)
|
||||
slog.Warn(err.Error())
|
||||
|
||||
@@ -45,11 +45,16 @@ func RunTaskInfo(args ...string) error {
|
||||
},
|
||||
}
|
||||
|
||||
return doTaskInfo(infoTaskParams)
|
||||
taskRes, err := doTaskInfo(infoTaskParams)
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf("task res: %v\n", protojson.Format(taskRes))
|
||||
}
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
func doTaskInfo(infoTaskParams infoTaskParams) error {
|
||||
func doTaskInfo(infoTaskParams infoTaskParams) (*saasapi.Task, error) {
|
||||
saasReq := &saasapi.SaasReq{
|
||||
Cmd: &saasapi.SaasReq_TaskInfo{
|
||||
TaskInfo: &saasapi.TaskInfo{
|
||||
@@ -61,16 +66,16 @@ func doTaskInfo(infoTaskParams infoTaskParams) error {
|
||||
res, err := infoTaskParams.saasHttp.TaskInfo(saasReq)
|
||||
if err != nil {
|
||||
fmt.Println("submit Task info error", "err", err)
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if res.Code != saasapi.ErrorCode_SUCC {
|
||||
fmt.Println("task info failed", "code", res.Code, "status", res.Status)
|
||||
return nil
|
||||
err = fmt.Errorf("task info failed. code:%v, status:%v", res.Code, res.Status)
|
||||
fmt.Println(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
taskRes := res.GetTaskInfoRes()
|
||||
|
||||
fmt.Printf("task res: %v\n", protojson.Format(taskRes))
|
||||
return nil
|
||||
fmt.Println(protojson.Format(taskRes))
|
||||
return taskRes, nil
|
||||
}
|
||||
|
||||
@@ -28,16 +28,20 @@ type makeTaskParams struct {
|
||||
|
||||
// 计算任务
|
||||
type hashTask struct {
|
||||
chunk []byte
|
||||
index int
|
||||
chunk *[]byte
|
||||
hash string
|
||||
blockSize uint64
|
||||
index int
|
||||
}
|
||||
|
||||
/*
|
||||
// 计算结果
|
||||
type hashResult struct {
|
||||
hash string
|
||||
blockSize uint64
|
||||
index int
|
||||
}
|
||||
*/
|
||||
|
||||
func RunTaskMake(args ...string) error {
|
||||
fs := flag.NewFlagSet("make", flag.ExitOnError)
|
||||
@@ -56,7 +60,15 @@ func RunTaskMake(args ...string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if blockSize < blockSizeMin || blockSize > blockSizeMax {
|
||||
blockSizeNum, err := ParseByteSize(*blockSize)
|
||||
if err != nil {
|
||||
fmt.Println("Error parsing block size", "err", err)
|
||||
fmt.Println("Using default 50M")
|
||||
blockSizeNum = 50 * 1024 * 1024
|
||||
|
||||
}
|
||||
|
||||
if blockSizeNum < blockSizeMin || blockSizeNum > blockSizeMax {
|
||||
fmt.Println("block size error", "min", blockSizeMin, "max", blockSizeMax)
|
||||
return nil
|
||||
}
|
||||
@@ -65,7 +77,7 @@ func RunTaskMake(args ...string) error {
|
||||
sourcePath: *sourcePath,
|
||||
hashFile: *hashFile,
|
||||
task: &saasapi.Task{
|
||||
TaskBlockSize: blockSize,
|
||||
TaskBlockSize: blockSizeNum,
|
||||
TaskDescription: *desc,
|
||||
},
|
||||
}
|
||||
@@ -114,8 +126,15 @@ func doTaskMake(makeTaskParams makeTaskParams) error {
|
||||
return err
|
||||
}
|
||||
|
||||
tasks := make(chan hashTask)
|
||||
results := make(chan hashResult)
|
||||
totalSize := uint64(fi.Size())
|
||||
// 计算读取次数
|
||||
readTimes := int(totalSize / makeTaskParams.task.TaskBlockSize)
|
||||
if totalSize%makeTaskParams.task.TaskBlockSize != 0 {
|
||||
readTimes++
|
||||
}
|
||||
|
||||
tasks := make(chan *hashTask)
|
||||
results := make(chan *hashTask)
|
||||
|
||||
// 启动工作协程
|
||||
hashMaxWorker := runtime.GOMAXPROCS(0)
|
||||
@@ -123,18 +142,27 @@ func doTaskMake(makeTaskParams makeTaskParams) error {
|
||||
go hashWorker(tasks, results)
|
||||
}
|
||||
|
||||
// 初始化读缓存
|
||||
readBuffers := make([][]byte, hashMaxWorker)
|
||||
for i := range hashMaxWorker {
|
||||
readBuffers[i] = make([]byte, makeTaskParams.task.TaskBlockSize)
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
index := 0
|
||||
buffer := make([]byte, makeTaskParams.task.TaskBlockSize)
|
||||
for {
|
||||
n, err := sourceFile.Read(buffer)
|
||||
for index := range readTimes {
|
||||
buffer := &readBuffers[index%hashMaxWorker]
|
||||
n, err := sourceFile.Read(*buffer)
|
||||
|
||||
if n > 0 {
|
||||
wg.Add(1)
|
||||
fmt.Printf("\rhashing file [%v], block [%v]", makeTaskParams.sourcePath, index)
|
||||
tasks <- hashTask{chunk: buffer[:n], index: index}
|
||||
index++
|
||||
//fmt.Printf("\rhashing file [%v], block [%v]\n", makeTaskParams.sourcePath, index)
|
||||
tasks <- &hashTask{
|
||||
chunk: buffer,
|
||||
index: index,
|
||||
blockSize: uint64(n),
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
@@ -144,10 +172,12 @@ func doTaskMake(makeTaskParams makeTaskParams) error {
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
var allResults []hashResult
|
||||
// 接收结果
|
||||
var allResults []*hashTask
|
||||
go func() {
|
||||
for r := range results {
|
||||
allResults = append(allResults, r)
|
||||
fmt.Printf("\rhashed file [%v], block [%v]", makeTaskParams.sourcePath, r.index)
|
||||
wg.Done()
|
||||
}
|
||||
}()
|
||||
@@ -178,12 +208,14 @@ func doTaskMake(makeTaskParams makeTaskParams) error {
|
||||
}
|
||||
|
||||
// hash计算协程
|
||||
func hashWorker(tasks <-chan hashTask, results chan<- hashResult) {
|
||||
func hashWorker(tasks <-chan *hashTask, results chan<- *hashTask) {
|
||||
h := sha256.New()
|
||||
for t := range tasks {
|
||||
h := sha256.New()
|
||||
h.Write(t.chunk)
|
||||
hash := hex.EncodeToString(h.Sum(nil))
|
||||
results <- hashResult{hash: hash, index: t.index, blockSize: uint64(len(t.chunk))}
|
||||
h.Write((*t.chunk)[:t.blockSize])
|
||||
t.hash = hex.EncodeToString(h.Sum(nil))
|
||||
//fmt.Printf("\rhashing block [%v]\n", t.index)
|
||||
results <- t
|
||||
h.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,76 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"e.coding.net/rta/public/saasapi"
|
||||
"e.coding.net/rta/public/saasapi/pkg/saashttp"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
)
|
||||
|
||||
type runTaskParams struct {
|
||||
taskSha256 string
|
||||
saasHttp *saashttp.SaasClient
|
||||
}
|
||||
|
||||
func RunTaskRun(args ...string) error {
|
||||
fs := flag.NewFlagSet("create", flag.ExitOnError)
|
||||
cfgFile := paramConfig(fs)
|
||||
sha256 := paramSha256(fs)
|
||||
|
||||
if err := fs.Parse(args); err != nil {
|
||||
fmt.Println("command line parse error", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if fs.NArg() > 0 || len(*sha256) == 0 {
|
||||
fs.PrintDefaults()
|
||||
return nil
|
||||
}
|
||||
|
||||
cfg, err := LoadConfigFile(*cfgFile)
|
||||
if err != nil {
|
||||
fmt.Println("LoadConfigFile error", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
runTaskParams := runTaskParams{
|
||||
taskSha256: *sha256,
|
||||
saasHttp: &saashttp.SaasClient{
|
||||
Client: &http.Client{},
|
||||
ApiUrls: &cfg.ApiUrls,
|
||||
Auth: &cfg.Auth,
|
||||
},
|
||||
}
|
||||
|
||||
return doTaskRun(runTaskParams)
|
||||
|
||||
}
|
||||
|
||||
func doTaskRun(runTaskParams runTaskParams) error {
|
||||
saasReq := &saasapi.SaasReq{
|
||||
Cmd: &saasapi.SaasReq_TaskRun{
|
||||
TaskRun: &saasapi.TaskRun{
|
||||
TaskSha256: runTaskParams.taskSha256,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
res, err := runTaskParams.saasHttp.TaskRun(saasReq)
|
||||
if err != nil {
|
||||
fmt.Println("submit Task run error", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if res.Code != saasapi.ErrorCode_SUCC {
|
||||
err = fmt.Errorf("task run failed. code:%v, status:%v", res.Code, res.Status)
|
||||
fmt.Println(err)
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println("task run success", protojson.Format(res))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
116
cmd/saastool/task_upload.go
Normal file
116
cmd/saastool/task_upload.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"e.coding.net/rta/public/saasapi"
|
||||
"e.coding.net/rta/public/saasapi/pkg/saashttp"
|
||||
)
|
||||
|
||||
type uploadTaskParams struct {
|
||||
taskSha256 string
|
||||
saasHttp *saashttp.SaasClient
|
||||
}
|
||||
|
||||
func RunTaskUpload(args ...string) error {
|
||||
fs := flag.NewFlagSet("upload", flag.ExitOnError)
|
||||
cfgFile := paramConfig(fs)
|
||||
sha256 := paramSha256(fs)
|
||||
|
||||
if err := fs.Parse(args); err != nil {
|
||||
fmt.Println("command line parse error", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if fs.NArg() > 0 || len(*sha256) == 0 {
|
||||
fs.PrintDefaults()
|
||||
return nil
|
||||
}
|
||||
|
||||
cfg, err := LoadConfigFile(*cfgFile)
|
||||
if err != nil {
|
||||
fmt.Println("LoadConfigFile error", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
uploadTaskParams := uploadTaskParams{
|
||||
taskSha256: *sha256,
|
||||
saasHttp: &saashttp.SaasClient{
|
||||
Client: &http.Client{},
|
||||
ApiUrls: &cfg.ApiUrls,
|
||||
Auth: &cfg.Auth,
|
||||
},
|
||||
}
|
||||
|
||||
return doTaskUpload(uploadTaskParams)
|
||||
|
||||
}
|
||||
func doTaskUpload(uploadTaskParams uploadTaskParams) error {
|
||||
infoTaskParams := infoTaskParams{
|
||||
taskSha256: uploadTaskParams.taskSha256,
|
||||
saasHttp: uploadTaskParams.saasHttp,
|
||||
}
|
||||
taskInfo, err := doTaskInfo(infoTaskParams)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
totalFiles := len(taskInfo.GetTaskFileInfos())
|
||||
fi := 0
|
||||
for _, finfo := range taskInfo.GetTaskFileInfos() {
|
||||
|
||||
fi++
|
||||
var f *os.File
|
||||
offset := int64(0)
|
||||
totalBlocks := len(finfo.GetFileBlocks())
|
||||
bi := 0
|
||||
for _, binfo := range finfo.GetFileBlocks() {
|
||||
bi++
|
||||
if !binfo.GetUploaded() {
|
||||
if f == nil {
|
||||
f, err = os.Open(finfo.GetFileName())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
blockRes, err := uploadTaskParams.saasHttp.TaskUpload(
|
||||
binfo.GetBlockSha256(),
|
||||
f,
|
||||
offset,
|
||||
int(binfo.GetBlockLength()),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if blockRes.GetCode() != saasapi.ErrorCode_SUCC {
|
||||
return fmt.Errorf("upload block error, code %d, msg %s", blockRes.GetCode(), blockRes.GetStatus())
|
||||
} else {
|
||||
fmt.Printf("upload block success. file: %v, sha256 %v. block %v/%v, file %v/%v\n",
|
||||
finfo.GetFileName(), binfo.GetBlockSha256(),
|
||||
bi, totalBlocks, fi, totalFiles,
|
||||
)
|
||||
}
|
||||
|
||||
} else {
|
||||
fmt.Printf("uploaded block. file: %v, sha256 %v. block %v/%v, file %v/%v\n",
|
||||
finfo.GetFileName(), binfo.GetBlockSha256(),
|
||||
bi, totalBlocks, fi, totalFiles,
|
||||
)
|
||||
}
|
||||
offset += int64(binfo.GetBlockLength())
|
||||
|
||||
}
|
||||
|
||||
if f != nil {
|
||||
f.Close()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user