270 lines
5.6 KiB
Go
270 lines
5.6 KiB
Go
package main
|
|
|
|
import (
	"crypto/sha256"
	"encoding/hex"
	"flag"
	"fmt"
	"io"
	"os"
	"path"
	"path/filepath"
	"runtime"
	"sort"
	"strings"
	"sync"

	"e.coding.net/rta/public/saasapi"
	"google.golang.org/protobuf/encoding/protojson"
)
|
|
|
|
// Inclusive bounds, in bytes, that RunTaskMake accepts for the task block size.
const (
	blockSizeMin = 50 << 20  // 50 MiB
	blockSizeMax = 200 << 20 // 200 MiB
)
|
|
|
|
// makeTaskParams bundles the state passed between RunTaskMake, doMakeHash,
// doTaskMake and saveTaskFile while a task description is being built.
type makeTaskParams struct {
	// sourcePath is the file or directory currently being processed;
	// doMakeHash rewrites it while it walks a directory tree.
	sourcePath string
	// hashFile is the path of the output file created by saveTaskFile.
	hashFile string
	// task accumulates the per-file block hashes and the overall task metadata.
	task *saasapi.Task
}
|
|
|
|
// hashTask is one unit of hashing work. hashWorker fills in hash and sends the
// same value back on its results channel.
type hashTask struct {
	// chunk points at the buffer holding the block data; only the first
	// blockSize bytes are hashed.
	chunk *[]byte
	// hash is the hex-encoded SHA-256 of the block, set by hashWorker.
	hash string
	// blockSize is the number of valid bytes in chunk.
	blockSize uint64
	// index is the zero-based position of the block within its file.
	index int
}
|
|
|
|
/*
|
|
// Computation result
|
|
type hashResult struct {
|
|
hash string
|
|
blockSize uint64
|
|
index int
|
|
}
|
|
*/
|
|
|
|
func RunTaskMake(args ...string) error {
|
|
fs := flag.NewFlagSet("make", flag.ExitOnError)
|
|
sourcePath := paramSourceConvertedPath(fs)
|
|
hashFile := paramOutputHashFile(fs)
|
|
blockSize := paramBlockSize(fs)
|
|
desc := paramTaskDesc(fs)
|
|
appid := paramAppid(fs)
|
|
ds := paramDataSpaceId(fs)
|
|
|
|
if err := fs.Parse(args); err != nil {
|
|
fmt.Fprintln(os.Stderr, "command line parse error", "err", err)
|
|
return err
|
|
}
|
|
|
|
if fs.NArg() > 0 || len(*sourcePath) == 0 || len(*hashFile) == 0 || len(*ds) == 0 {
|
|
fs.PrintDefaults()
|
|
return nil
|
|
}
|
|
|
|
if strings.ToLower(*ds) == "openid" && len(*appid) == 0 {
|
|
fmt.Fprintln(os.Stderr, "appid must be set when data space is openid")
|
|
return nil
|
|
}
|
|
|
|
blockSizeNum, err := ParseByteSize(*blockSize)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, "Error parsing block size", "err", err)
|
|
fmt.Fprintln(os.Stderr, "Using default 200M")
|
|
blockSizeNum = 200 * 1024 * 1024
|
|
|
|
}
|
|
|
|
if blockSizeNum < blockSizeMin || blockSizeNum > blockSizeMax {
|
|
fmt.Fprintln(os.Stderr, "block size error", "min", blockSizeMin, "max", blockSizeMax)
|
|
return nil
|
|
}
|
|
|
|
makeTaskParams := &makeTaskParams{
|
|
sourcePath: *sourcePath,
|
|
hashFile: *hashFile,
|
|
task: &saasapi.Task{
|
|
TaskBlockSize: blockSizeNum,
|
|
TaskDescription: *desc,
|
|
Appid: *appid,
|
|
DataspaceId: *ds,
|
|
SourcePath: *sourcePath,
|
|
},
|
|
}
|
|
return doMakeHash(makeTaskParams, true)
|
|
}
|
|
|
|
func doMakeHash(makeTaskParams *makeTaskParams, firstLevel bool) error {
|
|
fsInfo, err := os.Stat(makeTaskParams.sourcePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !fsInfo.IsDir() {
|
|
// 如果是文件,直接计算
|
|
if firstLevel {
|
|
makeTaskParams.task.SourcePath = filepath.Dir(makeTaskParams.sourcePath)
|
|
}
|
|
err = doTaskMake(makeTaskParams)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
// 读取目录下信息
|
|
dirEntry, err := os.ReadDir(makeTaskParams.sourcePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 遍历目录
|
|
for _, dir := range dirEntry {
|
|
oldSourcePath := makeTaskParams.sourcePath
|
|
makeTaskParams.sourcePath = path.Join(makeTaskParams.sourcePath, dir.Name())
|
|
|
|
if err = doMakeHash(makeTaskParams, false); err != nil {
|
|
return err
|
|
}
|
|
makeTaskParams.sourcePath = oldSourcePath
|
|
}
|
|
}
|
|
|
|
if firstLevel {
|
|
return saveTaskFile(makeTaskParams)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func doTaskMake(makeTaskParams *makeTaskParams) error {
|
|
sourceFile, err := os.Open(makeTaskParams.sourcePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer sourceFile.Close()
|
|
|
|
fi, err := sourceFile.Stat()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
totalSize := uint64(fi.Size())
|
|
// 计算读取次数
|
|
readTimes := int(totalSize / makeTaskParams.task.TaskBlockSize)
|
|
if totalSize%makeTaskParams.task.TaskBlockSize != 0 {
|
|
readTimes++
|
|
}
|
|
|
|
tasks := make(chan *hashTask)
|
|
results := make(chan *hashTask)
|
|
|
|
// 启动工作协程
|
|
hashMaxWorker := runtime.GOMAXPROCS(0)
|
|
for range hashMaxWorker {
|
|
go hashWorker(tasks, results)
|
|
}
|
|
|
|
// 初始化读缓存
|
|
readBuffers := make([][]byte, hashMaxWorker)
|
|
for i := range hashMaxWorker {
|
|
readBuffers[i] = make([]byte, makeTaskParams.task.TaskBlockSize)
|
|
}
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
go func() {
|
|
for index := range readTimes {
|
|
buffer := &readBuffers[index%hashMaxWorker]
|
|
n, err := sourceFile.Read(*buffer)
|
|
|
|
if n > 0 {
|
|
wg.Add(1)
|
|
tasks <- &hashTask{
|
|
chunk: buffer,
|
|
index: index,
|
|
blockSize: uint64(n),
|
|
}
|
|
}
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
close(tasks)
|
|
wg.Done()
|
|
}()
|
|
|
|
// 接收结果
|
|
var allResults []*hashTask
|
|
go func() {
|
|
for r := range results {
|
|
allResults = append(allResults, r)
|
|
fmt.Printf("\rhashed file [%v], block [%v]", makeTaskParams.sourcePath, r.index)
|
|
wg.Done()
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
close(results)
|
|
|
|
// 按索引排序结果
|
|
sort.Slice(allResults, func(i, j int) bool {
|
|
return allResults[i].index < allResults[j].index
|
|
})
|
|
|
|
// 输出结果
|
|
relPath, _ := filepath.Rel(makeTaskParams.task.GetSourcePath(), makeTaskParams.sourcePath)
|
|
fileInfo := &saasapi.FileInfo{
|
|
FileName: relPath,
|
|
FileSize: uint64(fi.Size()),
|
|
}
|
|
|
|
for _, r := range allResults {
|
|
fileInfo.FileBlocks = append(fileInfo.FileBlocks, &saasapi.FileBlock{
|
|
BlockSha256: r.hash,
|
|
BlockLength: r.blockSize,
|
|
})
|
|
}
|
|
makeTaskParams.task.TaskFileInfos = append(makeTaskParams.task.TaskFileInfos, fileInfo)
|
|
|
|
makeTaskParams.task.TaskSize += uint64(fi.Size())
|
|
|
|
fmt.Println("")
|
|
return nil
|
|
}
|
|
|
|
// hash计算协程
|
|
func hashWorker(tasks <-chan *hashTask, results chan<- *hashTask) {
|
|
h := sha256.New()
|
|
for t := range tasks {
|
|
h.Write((*t.chunk)[:t.blockSize])
|
|
t.hash = hex.EncodeToString(h.Sum(nil))
|
|
results <- t
|
|
h.Reset()
|
|
}
|
|
}
|
|
|
|
func saveTaskFile(makeTaskParams *makeTaskParams) error {
|
|
taskFile, err := os.Create(makeTaskParams.hashFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer taskFile.Close()
|
|
|
|
h := sha256.New()
|
|
|
|
for _, fileInfo := range makeTaskParams.task.TaskFileInfos {
|
|
for _, fileBlock := range fileInfo.FileBlocks {
|
|
h.Write([]byte(fileBlock.BlockSha256))
|
|
}
|
|
}
|
|
|
|
makeTaskParams.task.TaskSha256 = hex.EncodeToString(h.Sum(nil))
|
|
|
|
_, err = taskFile.WriteString(protojson.Format(makeTaskParams.task))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|