From 8da742fea121da5f960af6348639d3611afb8f7c Mon Sep 17 00:00:00 2001 From: algotao Date: Sat, 12 Apr 2025 21:53:56 +0800 Subject: [PATCH] tasklist taskinfo --- cmd.pb.go | 214 +++++++++++++------- cmd.proto | 9 +- cmd/saastool/help.go | 3 +- cmd/saastool/main.go | 2 - cmd/saastool/params.go | 31 ++- cmd/saastool/read.go | 6 +- cmd/saastool/task.go | 13 +- cmd/saastool/task_create.go | 91 +++++++++ cmd/saastool/task_info.go | 71 +++++++ cmd/saastool/task_list.go | 83 ++++++++ cmd/saastool/{make_hash.go => task_make.go} | 64 +++--- cmd/saastool/write.go | 6 +- pkg/saashttp/cfg.go | 7 +- pkg/saashttp/httpcli.go | 23 ++- 14 files changed, 484 insertions(+), 139 deletions(-) rename cmd/saastool/{make_hash.go => task_make.go} (66%) diff --git a/cmd.pb.go b/cmd.pb.go index 013aa34..0627d63 100644 --- a/cmd.pb.go +++ b/cmd.pb.go @@ -71,6 +71,53 @@ func (UserIdType) EnumDescriptor() ([]byte, []int) { return file_cmd_proto_rawDescGZIP(), []int{0} } +// NameSpaceId 用户 ID 类型 +type NameSpaceId int32 + +const ( + NameSpaceId_DID NameSpaceId = 0 // 默认设备号命名空间 + NameSpaceId_WUID NameSpaceId = 1 // 默认WUID命名空间 +) + +// Enum value maps for NameSpaceId. +var ( + NameSpaceId_name = map[int32]string{ + 0: "DID", + 1: "WUID", + } + NameSpaceId_value = map[string]int32{ + "DID": 0, + "WUID": 1, + } +) + +func (x NameSpaceId) Enum() *NameSpaceId { + p := new(NameSpaceId) + *p = x + return p +} + +func (x NameSpaceId) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (NameSpaceId) Descriptor() protoreflect.EnumDescriptor { + return file_cmd_proto_enumTypes[1].Descriptor() +} + +func (NameSpaceId) Type() protoreflect.EnumType { + return &file_cmd_proto_enumTypes[1] +} + +func (x NameSpaceId) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use NameSpaceId.Descriptor instead. +func (NameSpaceId) EnumDescriptor() ([]byte, []int) { + return file_cmd_proto_rawDescGZIP(), []int{1} +} + // ErrorCode 返回码 type ErrorCode int32 @@ -153,11 +200,11 @@ func (x ErrorCode) String() string { } func (ErrorCode) Descriptor() protoreflect.EnumDescriptor { - return file_cmd_proto_enumTypes[1].Descriptor() + return file_cmd_proto_enumTypes[2].Descriptor() } func (ErrorCode) Type() protoreflect.EnumType { - return &file_cmd_proto_enumTypes[1] + return &file_cmd_proto_enumTypes[2] } func (x ErrorCode) Number() protoreflect.EnumNumber { @@ -166,7 +213,7 @@ func (x ErrorCode) Number() protoreflect.EnumNumber { // Deprecated: Use ErrorCode.Descriptor instead. func (ErrorCode) EnumDescriptor() ([]byte, []int) { - return file_cmd_proto_rawDescGZIP(), []int{1} + return file_cmd_proto_rawDescGZIP(), []int{2} } type CmdErrorCode int32 @@ -196,11 +243,11 @@ func (x CmdErrorCode) String() string { } func (CmdErrorCode) Descriptor() protoreflect.EnumDescriptor { - return file_cmd_proto_enumTypes[2].Descriptor() + return file_cmd_proto_enumTypes[3].Descriptor() } func (CmdErrorCode) Type() protoreflect.EnumType { - return &file_cmd_proto_enumTypes[2] + return &file_cmd_proto_enumTypes[3] } func (x CmdErrorCode) Number() protoreflect.EnumNumber { @@ -209,7 +256,7 @@ func (x CmdErrorCode) Number() protoreflect.EnumNumber { // Deprecated: Use CmdErrorCode.Descriptor instead. func (CmdErrorCode) EnumDescriptor() ([]byte, []int) { - return file_cmd_proto_rawDescGZIP(), []int{2} + return file_cmd_proto_rawDescGZIP(), []int{3} } type TaskStatus int32 @@ -254,11 +301,11 @@ func (x TaskStatus) String() string { } func (TaskStatus) Descriptor() protoreflect.EnumDescriptor { - return file_cmd_proto_enumTypes[3].Descriptor() + return file_cmd_proto_enumTypes[4].Descriptor() } func (TaskStatus) Type() protoreflect.EnumType { - return &file_cmd_proto_enumTypes[3] + return &file_cmd_proto_enumTypes[4] } func (x TaskStatus) Number() protoreflect.EnumNumber { @@ -267,14 +314,15 @@ func (x TaskStatus) Number() protoreflect.EnumNumber { // Deprecated: Use TaskStatus.Descriptor instead. func (TaskStatus) EnumDescriptor() ([]byte, []int) { - return file_cmd_proto_rawDescGZIP(), []int{3} + return file_cmd_proto_rawDescGZIP(), []int{4} } // SaasReq 命令请求 type SaasReq struct { - state protoimpl.MessageState `protogen:"open.v1"` - UseridType UserIdType `protobuf:"varint,1,opt,name=userid_type,json=useridType,proto3,enum=saasapi.UserIdType" json:"userid_type,omitempty"` // 用户ID类型 - Appid string `protobuf:"bytes,2,opt,name=appid,proto3" json:"appid,omitempty"` // 小程序/小游戏/公众号/视频号的appid + state protoimpl.MessageState `protogen:"open.v1"` + UseridType UserIdType `protobuf:"varint,1,opt,name=userid_type,json=useridType,proto3,enum=saasapi.UserIdType" json:"userid_type,omitempty"` // 用户ID类型 + Appid string `protobuf:"bytes,2,opt,name=appid,proto3" json:"appid,omitempty"` // 小程序/小游戏/公众号/视频号的appid + NamespaceId uint32 `protobuf:"varint,3,opt,name=namespace_id,json=namespaceId,proto3" json:"namespace_id,omitempty"` // 命名空间ID // Types that are valid to be assigned to Cmd: // // *SaasReq_Read @@ -334,6 +382,13 @@ func (x *SaasReq) GetAppid() string { return "" } +func (x *SaasReq) GetNamespaceId() uint32 { + if x != nil { + return x.NamespaceId + } + return 0 +} + func (x *SaasReq) GetCmd() isSaasReq_Cmd { if x != nil { return x.Cmd @@ -1319,7 +1374,7 @@ type FileBlock struct { state protoimpl.MessageState `protogen:"open.v1"` BlockSha256 string `protobuf:"bytes,1,opt,name=block_sha256,json=blockSha256,proto3" json:"block_sha256,omitempty"` // 块的sha256 BlockLength uint64 `protobuf:"varint,2,opt,name=block_length,json=blockLength,proto3" json:"block_length,omitempty"` // 块的字节长度 - Uploaded bool `protobuf:"varint,3,opt,name=uploaded,proto3" json:"uploaded,omitempty"` // 是否已上传(在TaskInfo请求返回) + Uploaded bool `protobuf:"varint,3,opt,name=uploaded,proto3" json:"uploaded,omitempty"` // 是否已上传(在TaskCreate/TaskInfo请求返回) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1807,11 +1862,12 @@ var File_cmd_proto protoreflect.FileDescriptor const file_cmd_proto_rawDesc = "" + "\n" + - "\tcmd.proto\x12\asaasapi\"\xe1\x03\n" + + "\tcmd.proto\x12\asaasapi\"\x84\x04\n" + "\aSaasReq\x124\n" + "\vuserid_type\x18\x01 \x01(\x0e2\x13.saasapi.UserIdTypeR\n" + "useridType\x12\x14\n" + - "\x05appid\x18\x02 \x01(\tR\x05appid\x12#\n" + + "\x05appid\x18\x02 \x01(\tR\x05appid\x12!\n" + + "\fnamespace_id\x18\x03 \x01(\rR\vnamespaceId\x12#\n" + "\x04read\x18\n" + " \x01(\v2\r.saasapi.ReadH\x00R\x04read\x12&\n" + "\x05write\x18\v \x01(\v2\x0e.saasapi.WriteH\x00R\x05write\x129\n" + @@ -1929,7 +1985,10 @@ const file_cmd_proto_rawDesc = "" + "\n" + "\x06OPENID\x10\x01\x12\f\n" + "\bINNERID1\x10\n" + - "*\xef\x02\n" + + "* \n" + + "\vNameSpaceId\x12\a\n" + + "\x03DID\x10\x00\x12\b\n" + + "\x04WUID\x10\x01*\xef\x02\n" + "\tErrorCode\x12\b\n" + "\x04SUCC\x10\x00\x12\x13\n" + "\x0fINVALID_ACCOUNT\x10e\x12\x15\n" + @@ -1975,72 +2034,73 @@ func file_cmd_proto_rawDescGZIP() []byte { return file_cmd_proto_rawDescData } -var file_cmd_proto_enumTypes = make([]protoimpl.EnumInfo, 4) +var file_cmd_proto_enumTypes = make([]protoimpl.EnumInfo, 5) var file_cmd_proto_msgTypes = make([]protoimpl.MessageInfo, 22) var file_cmd_proto_goTypes = []any{ (UserIdType)(0), // 0: saasapi.UserIdType - (ErrorCode)(0), // 1: saasapi.ErrorCode - (CmdErrorCode)(0), // 2: saasapi.CmdErrorCode - (TaskStatus)(0), // 3: saasapi.TaskStatus - (*SaasReq)(nil), // 4: saasapi.SaasReq - (*Read)(nil), // 5: saasapi.Read - (*ReadItem)(nil), // 6: saasapi.ReadItem - (*Write)(nil), // 7: saasapi.Write - (*WriteItem)(nil), // 8: saasapi.WriteItem - (*Bytes)(nil), // 9: saasapi.Bytes - (*Uint32S)(nil), // 10: saasapi.Uint32s - (*FlagsWithExpire)(nil), // 11: saasapi.FlagsWithExpire - (*FlagWithExpire)(nil), // 12: saasapi.FlagWithExpire - (*ColumnWrite)(nil), // 13: saasapi.ColumnWrite - (*Task)(nil), // 14: saasapi.Task - (*TaskList)(nil), // 15: saasapi.TaskList - (*TaskRun)(nil), // 16: saasapi.TaskRun - (*TaskDelete)(nil), // 17: saasapi.TaskDelete - (*TaskInfo)(nil), // 18: saasapi.TaskInfo - (*FileInfo)(nil), // 19: saasapi.FileInfo - (*FileBlock)(nil), // 20: saasapi.FileBlock - (*SaasRes)(nil), // 21: saasapi.SaasRes - (*ReadRes)(nil), // 22: saasapi.ReadRes - (*WriteRes)(nil), // 23: saasapi.WriteRes - (*ValueItem)(nil), // 24: saasapi.ValueItem - (*TaskListRes)(nil), // 25: saasapi.TaskListRes + (NameSpaceId)(0), // 1: saasapi.NameSpaceId + (ErrorCode)(0), // 2: saasapi.ErrorCode + (CmdErrorCode)(0), // 3: saasapi.CmdErrorCode + (TaskStatus)(0), // 4: saasapi.TaskStatus + (*SaasReq)(nil), // 5: saasapi.SaasReq + (*Read)(nil), // 6: saasapi.Read + (*ReadItem)(nil), // 7: saasapi.ReadItem + (*Write)(nil), // 8: saasapi.Write + (*WriteItem)(nil), // 9: saasapi.WriteItem + (*Bytes)(nil), // 10: saasapi.Bytes + (*Uint32S)(nil), // 11: saasapi.Uint32s + (*FlagsWithExpire)(nil), // 12: saasapi.FlagsWithExpire + (*FlagWithExpire)(nil), // 13: saasapi.FlagWithExpire + (*ColumnWrite)(nil), // 14: saasapi.ColumnWrite + (*Task)(nil), // 15: saasapi.Task + (*TaskList)(nil), // 16: saasapi.TaskList + (*TaskRun)(nil), // 17: saasapi.TaskRun + (*TaskDelete)(nil), // 18: saasapi.TaskDelete + (*TaskInfo)(nil), // 19: saasapi.TaskInfo + (*FileInfo)(nil), // 20: saasapi.FileInfo + (*FileBlock)(nil), // 21: saasapi.FileBlock + (*SaasRes)(nil), // 22: saasapi.SaasRes + (*ReadRes)(nil), // 23: saasapi.ReadRes + (*WriteRes)(nil), // 24: saasapi.WriteRes + (*ValueItem)(nil), // 25: saasapi.ValueItem + (*TaskListRes)(nil), // 26: saasapi.TaskListRes } var file_cmd_proto_depIdxs = []int32{ 0, // 0: saasapi.SaasReq.userid_type:type_name -> saasapi.UserIdType - 5, // 1: saasapi.SaasReq.read:type_name -> saasapi.Read - 7, // 2: saasapi.SaasReq.write:type_name -> saasapi.Write - 13, // 3: saasapi.SaasReq.column_write:type_name -> saasapi.ColumnWrite - 14, // 4: saasapi.SaasReq.task_create:type_name -> saasapi.Task - 15, // 5: saasapi.SaasReq.task_list:type_name -> saasapi.TaskList - 16, // 6: saasapi.SaasReq.task_run:type_name -> saasapi.TaskRun - 17, // 7: saasapi.SaasReq.task_delete:type_name -> saasapi.TaskDelete - 18, // 8: saasapi.SaasReq.task_info:type_name -> saasapi.TaskInfo - 6, // 9: saasapi.Read.read_items:type_name -> saasapi.ReadItem - 8, // 10: saasapi.Write.write_items:type_name -> saasapi.WriteItem - 9, // 11: saasapi.WriteItem.write_bytes:type_name -> saasapi.Bytes - 10, // 12: saasapi.WriteItem.write_uint32s:type_name -> saasapi.Uint32s - 11, // 13: saasapi.WriteItem.write_flags_with_expire:type_name -> saasapi.FlagsWithExpire - 12, // 14: saasapi.FlagsWithExpire.flags_with_expire:type_name -> saasapi.FlagWithExpire - 9, // 15: saasapi.ColumnWrite.write_bytes:type_name -> saasapi.Bytes - 10, // 16: saasapi.ColumnWrite.write_uint32s:type_name -> saasapi.Uint32s - 11, // 17: saasapi.ColumnWrite.write_flags_with_expire:type_name -> saasapi.FlagsWithExpire - 19, // 18: saasapi.Task.task_file_infos:type_name -> saasapi.FileInfo - 3, // 19: saasapi.Task.status:type_name -> saasapi.TaskStatus - 3, // 20: saasapi.TaskList.status_filter:type_name -> saasapi.TaskStatus - 20, // 21: saasapi.FileInfo.file_blocks:type_name -> saasapi.FileBlock - 1, // 22: saasapi.SaasRes.code:type_name -> saasapi.ErrorCode - 22, // 23: saasapi.SaasRes.read_res:type_name -> saasapi.ReadRes - 23, // 24: saasapi.SaasRes.write_res:type_name -> saasapi.WriteRes - 14, // 25: saasapi.SaasRes.task_create_res:type_name -> saasapi.Task - 25, // 26: saasapi.SaasRes.task_list_res:type_name -> saasapi.TaskListRes - 14, // 27: saasapi.SaasRes.task_run_res:type_name -> saasapi.Task - 14, // 28: saasapi.SaasRes.task_delete_res:type_name -> saasapi.Task - 14, // 29: saasapi.SaasRes.task_info_res:type_name -> saasapi.Task - 24, // 30: saasapi.ReadRes.cmd_res:type_name -> saasapi.ValueItem - 24, // 31: saasapi.WriteRes.cmd_res:type_name -> saasapi.ValueItem - 2, // 32: saasapi.ValueItem.cmd_code:type_name -> saasapi.CmdErrorCode - 12, // 33: saasapi.ValueItem.flags_with_expire:type_name -> saasapi.FlagWithExpire - 14, // 34: saasapi.TaskListRes.tasks:type_name -> saasapi.Task + 6, // 1: saasapi.SaasReq.read:type_name -> saasapi.Read + 8, // 2: saasapi.SaasReq.write:type_name -> saasapi.Write + 14, // 3: saasapi.SaasReq.column_write:type_name -> saasapi.ColumnWrite + 15, // 4: saasapi.SaasReq.task_create:type_name -> saasapi.Task + 16, // 5: saasapi.SaasReq.task_list:type_name -> saasapi.TaskList + 17, // 6: saasapi.SaasReq.task_run:type_name -> saasapi.TaskRun + 18, // 7: saasapi.SaasReq.task_delete:type_name -> saasapi.TaskDelete + 19, // 8: saasapi.SaasReq.task_info:type_name -> saasapi.TaskInfo + 7, // 9: saasapi.Read.read_items:type_name -> saasapi.ReadItem + 9, // 10: saasapi.Write.write_items:type_name -> saasapi.WriteItem + 10, // 11: saasapi.WriteItem.write_bytes:type_name -> saasapi.Bytes + 11, // 12: saasapi.WriteItem.write_uint32s:type_name -> saasapi.Uint32s + 12, // 13: saasapi.WriteItem.write_flags_with_expire:type_name -> saasapi.FlagsWithExpire + 13, // 14: saasapi.FlagsWithExpire.flags_with_expire:type_name -> saasapi.FlagWithExpire + 10, // 15: saasapi.ColumnWrite.write_bytes:type_name -> saasapi.Bytes + 11, // 16: saasapi.ColumnWrite.write_uint32s:type_name -> saasapi.Uint32s + 12, // 17: saasapi.ColumnWrite.write_flags_with_expire:type_name -> saasapi.FlagsWithExpire + 20, // 18: saasapi.Task.task_file_infos:type_name -> saasapi.FileInfo + 4, // 19: saasapi.Task.status:type_name -> saasapi.TaskStatus + 4, // 20: saasapi.TaskList.status_filter:type_name -> saasapi.TaskStatus + 21, // 21: saasapi.FileInfo.file_blocks:type_name -> saasapi.FileBlock + 2, // 22: saasapi.SaasRes.code:type_name -> saasapi.ErrorCode + 23, // 23: saasapi.SaasRes.read_res:type_name -> saasapi.ReadRes + 24, // 24: saasapi.SaasRes.write_res:type_name -> saasapi.WriteRes + 15, // 25: saasapi.SaasRes.task_create_res:type_name -> saasapi.Task + 26, // 26: saasapi.SaasRes.task_list_res:type_name -> saasapi.TaskListRes + 15, // 27: saasapi.SaasRes.task_run_res:type_name -> saasapi.Task + 15, // 28: saasapi.SaasRes.task_delete_res:type_name -> saasapi.Task + 15, // 29: saasapi.SaasRes.task_info_res:type_name -> saasapi.Task + 25, // 30: saasapi.ReadRes.cmd_res:type_name -> saasapi.ValueItem + 25, // 31: saasapi.WriteRes.cmd_res:type_name -> saasapi.ValueItem + 3, // 32: saasapi.ValueItem.cmd_code:type_name -> saasapi.CmdErrorCode + 13, // 33: saasapi.ValueItem.flags_with_expire:type_name -> saasapi.FlagWithExpire + 15, // 34: saasapi.TaskListRes.tasks:type_name -> saasapi.Task 35, // [35:35] is the sub-list for method output_type 35, // [35:35] is the sub-list for method input_type 35, // [35:35] is the sub-list for extension type_name @@ -2077,7 +2137,7 @@ func file_cmd_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_cmd_proto_rawDesc), len(file_cmd_proto_rawDesc)), - NumEnums: 4, + NumEnums: 5, NumMessages: 22, NumExtensions: 0, NumServices: 0, diff --git a/cmd.proto b/cmd.proto index efcf26c..b315114 100644 --- a/cmd.proto +++ b/cmd.proto @@ -8,7 +8,8 @@ option go_package = "e.coding.net/rta/public/saasapi"; message SaasReq { UserIdType userid_type = 1; // 用户ID类型 string appid = 2; // 小程序/小游戏/公众号/视频号的appid - + uint32 namespace_id = 3; // 命名空间ID + oneof cmd { Read read = 10; // 批量读取 Write write = 11; // 批量写入 @@ -80,6 +81,12 @@ enum UserIdType { INNERID1 = 10; // 内部ID1 } +// NameSpaceId 用户 ID 类型 +enum NameSpaceId { + DID = 0; // 默认设备号命名空间 + WUID = 1; // 默认WUID命名空间 +} + // ColumnWrite 全量列式写入命令 message ColumnWrite { bool is_clear_all_first = 1; // 是否先执行清空 diff --git a/cmd/saastool/help.go b/cmd/saastool/help.go index 4c19c2f..c65c8dd 100644 --- a/cmd/saastool/help.go +++ b/cmd/saastool/help.go @@ -20,8 +20,7 @@ Commands: columnwrite Write columns for 'deviceid / openid' users convert Convert data to write format - makehash Make file hash for upload task - + task Task commands "help" is the default command. diff --git a/cmd/saastool/main.go b/cmd/saastool/main.go index 01f830c..48480da 100644 --- a/cmd/saastool/main.go +++ b/cmd/saastool/main.go @@ -29,8 +29,6 @@ func Run(args ...string) error { return RunColumnWrite(args...) case "convert": return RunConvert(args...) - case "makehash": - return RunMakeHash(args...) case "verify": return RunVerify(args...) case "task": diff --git a/cmd/saastool/params.go b/cmd/saastool/params.go index 30c95ee..7be92f9 100644 --- a/cmd/saastool/params.go +++ b/cmd/saastool/params.go @@ -7,6 +7,9 @@ import ( "strings" ) +// paramConfig 设置并返回配置文件路径的命令行参数。 +// 该函数接收一个 flag.FlagSet 指针作为参数,用于注册 "-config" 标志, +// 默认值为 "cfg.toml",返回存储配置文件路径的字符串指针。 func paramConfig(fs *flag.FlagSet) *string { return fs.String("config", "cfg.toml", "Config file.") } @@ -18,10 +21,30 @@ func paramSourcePath(fs *flag.FlagSet) *string { return fs.String("source", "", "Source path or filename") } +func paramSourceConvertedPath(fs *flag.FlagSet) *string { + return fs.String("source", "", "Source path of the converted files") +} + +func paramTaskDesc(fs *flag.FlagSet) *string { + return fs.String("desc", "", "Task description") +} + func paramDestPath(fs *flag.FlagSet) *string { return fs.String("dest", "", "Destination path or filename") } +func paramOutputHashFile(fs *flag.FlagSet) *string { + return fs.String("hashfile", "", "Output hash file") +} + +func paramInputHashFile(fs *flag.FlagSet) *string { + return fs.String("hashfile", "", "Input hash file") +} + +func paramFilterStatus(fs *flag.FlagSet) *string { + return fs.String("status", "", "Filter status. enums 'all', 'waiting', 'running', 'success', 'fail', 'deleted'") +} + func paramAppid(fs *flag.FlagSet) *string { return fs.String("appid", "", "Wechat appid") } @@ -30,6 +53,10 @@ func paramUserids(fs *flag.FlagSet) *string { return fs.String("userids", "", "Device ID or Wechat UserID, separated by comma") } +func paramSha256(fs *flag.FlagSet) *string { + return fs.String("sha256", "", "Task SHA256 hash") +} + func paramBatchSize(fs *flag.FlagSet) *uint { return fs.Uint("batchsize", 10000, "Batch size to sync") } @@ -46,10 +73,6 @@ func paramBlockSize(fs *flag.FlagSet) uint64 { return num } -func paramAsync(fs *flag.FlagSet) *bool { - return fs.Bool("async", false, "Async mode") -} - func paramClear(fs *flag.FlagSet) *bool { return fs.Bool("clear", false, "Clear all data before write") } diff --git a/cmd/saastool/read.go b/cmd/saastool/read.go index f69ff96..5b178bb 100644 --- a/cmd/saastool/read.go +++ b/cmd/saastool/read.go @@ -53,9 +53,9 @@ func RunRead(args ...string) error { userids: idsSlice, appid: *appid, saasHttp: &saashttp.SaasClient{ - Client: http.Client{}, - ApiUrls: cfg.ApiUrls, - Auth: cfg.Auth, + Client: &http.Client{}, + ApiUrls: &cfg.ApiUrls, + Auth: &cfg.Auth, }, } diff --git a/cmd/saastool/task.go b/cmd/saastool/task.go index ee7561d..4820385 100644 --- a/cmd/saastool/task.go +++ b/cmd/saastool/task.go @@ -13,6 +13,8 @@ func RunTask(args ...string) error { switch name { case "", "help": return RunTaskHelp(args...) + case "make": + return RunTaskMake(args...) case "create": return RunTaskCreate(args...) case "list": @@ -37,12 +39,13 @@ const taskUsage = ` Usage: saastoola task COMMAND [OPTIONS] Commands: - create Create data file to task + make Make file hash for upload task + create Create a task on server list List tasks on server - run Run task on server - delete Delete task on server - info Get task info on server - upload Upload file block to server + run Run a task on server + delete Delete a task on server + info Get a task info on server + upload Upload task's file block to server "help" is the default command. diff --git a/cmd/saastool/task_create.go b/cmd/saastool/task_create.go index 8a3790a..f7af06d 100644 --- a/cmd/saastool/task_create.go +++ b/cmd/saastool/task_create.go @@ -1,5 +1,96 @@ package main +import ( + "flag" + "fmt" + "net/http" + "os" + + "e.coding.net/rta/public/saasapi" + "e.coding.net/rta/public/saasapi/pkg/saashttp" + "google.golang.org/protobuf/encoding/protojson" +) + +type createTaskParams struct { + hashFile string + appid string + task *saasapi.Task + saasHttp *saashttp.SaasClient +} + func RunTaskCreate(args ...string) error { + fs := flag.NewFlagSet("create", flag.ExitOnError) + cfgFile := paramConfig(fs) + // sourcePath := paramSourceConvertedPath(fs) + hashFile := paramInputHashFile(fs) + appid := paramAppid(fs) + + if err := fs.Parse(args); err != nil { + fmt.Println("command line parse error", "err", err) + return err + } + + if fs.NArg() > 0 || len(*hashFile) == 0 { + fs.PrintDefaults() + return nil + } + + cfg, err := LoadConfigFile(*cfgFile) + if err != nil { + fmt.Println("LoadConfigFile error", "err", err) + return err + } + + createTaskParams := createTaskParams{ + hashFile: *hashFile, + appid: *appid, + saasHttp: &saashttp.SaasClient{ + Client: &http.Client{}, + ApiUrls: &cfg.ApiUrls, + Auth: &cfg.Auth, + }, + task: &saasapi.Task{}, + } + + taskBuf, err := os.ReadFile(createTaskParams.hashFile) + if err != nil { + fmt.Println("open task file error", "err", err) + return err + } + + if err = protojson.Unmarshal(taskBuf, createTaskParams.task); err != nil { + fmt.Println("parse task file error", "err", err) + } + + return doTaskCreate(createTaskParams) +} + +func doTaskCreate(createTaskParams createTaskParams) error { + saasReq := &saasapi.SaasReq{ + Cmd: &saasapi.SaasReq_TaskCreate{ + TaskCreate: createTaskParams.task, + }, + } + + if createTaskParams.appid != "" { + saasReq.UseridType = saasapi.UserIdType_OPENID + saasReq.Appid = createTaskParams.appid + } + + res, err := createTaskParams.saasHttp.TaskCreate(saasReq) + + if err != nil { + fmt.Println("submit Create Task error", "err", err) + return err + } + + if res.Code != saasapi.ErrorCode_SUCC { + fmt.Println("task create failed", "code", res.Code, "status", res.Status) + return nil + } + + taskRes := res.GetTaskCreateRes() + + fmt.Printf("task res: %v\n", protojson.Format(taskRes)) return nil } diff --git a/cmd/saastool/task_info.go b/cmd/saastool/task_info.go index 1f58b5f..cddd074 100644 --- a/cmd/saastool/task_info.go +++ b/cmd/saastool/task_info.go @@ -1,5 +1,76 @@ package main +import ( + "flag" + "fmt" + "net/http" + + "e.coding.net/rta/public/saasapi" + "e.coding.net/rta/public/saasapi/pkg/saashttp" + "google.golang.org/protobuf/encoding/protojson" +) + +type infoTaskParams struct { + taskSha256 string + saasHttp *saashttp.SaasClient +} + func RunTaskInfo(args ...string) error { + fs := flag.NewFlagSet("create", flag.ExitOnError) + cfgFile := paramConfig(fs) + sha256 := paramSha256(fs) + + if err := fs.Parse(args); err != nil { + fmt.Println("command line parse error", "err", err) + return err + } + + if fs.NArg() > 0 || len(*sha256) == 0 { + fs.PrintDefaults() + return nil + } + + cfg, err := LoadConfigFile(*cfgFile) + if err != nil { + fmt.Println("LoadConfigFile error", "err", err) + return err + } + + infoTaskParams := infoTaskParams{ + taskSha256: *sha256, + saasHttp: &saashttp.SaasClient{ + Client: &http.Client{}, + ApiUrls: &cfg.ApiUrls, + Auth: &cfg.Auth, + }, + } + + return doTaskInfo(infoTaskParams) + +} + +func doTaskInfo(infoTaskParams infoTaskParams) error { + saasReq := &saasapi.SaasReq{ + Cmd: &saasapi.SaasReq_TaskInfo{ + TaskInfo: &saasapi.TaskInfo{ + TaskSha256: infoTaskParams.taskSha256, + }, + }, + } + + res, err := infoTaskParams.saasHttp.TaskInfo(saasReq) + if err != nil { + fmt.Println("submit Task info error", "err", err) + return err + } + + if res.Code != saasapi.ErrorCode_SUCC { + fmt.Println("task info failed", "code", res.Code, "status", res.Status) + return nil + } + + taskRes := res.GetTaskInfoRes() + + fmt.Printf("task res: %v\n", protojson.Format(taskRes)) return nil } diff --git a/cmd/saastool/task_list.go b/cmd/saastool/task_list.go index e96cd2c..a961e86 100644 --- a/cmd/saastool/task_list.go +++ b/cmd/saastool/task_list.go @@ -1,5 +1,88 @@ package main +import ( + "flag" + "fmt" + "net/http" + + "e.coding.net/rta/public/saasapi" + "e.coding.net/rta/public/saasapi/pkg/saashttp" + "google.golang.org/protobuf/encoding/protojson" +) + +type listTaskParams struct { + filterStatus saasapi.TaskStatus + saasHttp *saashttp.SaasClient +} + func RunTaskList(args ...string) error { + fs := flag.NewFlagSet("create", flag.ExitOnError) + cfgFile := paramConfig(fs) + filter := paramFilterStatus(fs) + + if err := fs.Parse(args); err != nil { + fmt.Println("command line parse error", "err", err) + return err + } + + if fs.NArg() > 0 { + fs.PrintDefaults() + return nil + } + + cfg, err := LoadConfigFile(*cfgFile) + if err != nil { + fmt.Println("LoadConfigFile error", "err", err) + return err + } + + listTaskParams := listTaskParams{ + saasHttp: &saashttp.SaasClient{ + Client: &http.Client{}, + ApiUrls: &cfg.ApiUrls, + Auth: &cfg.Auth, + }, + } + + switch *filter { + case "all": + listTaskParams.filterStatus = saasapi.TaskStatus_ALL + case "waiting": + listTaskParams.filterStatus = saasapi.TaskStatus_WAITING + case "running": + listTaskParams.filterStatus = saasapi.TaskStatus_RUNNING + case "success": + listTaskParams.filterStatus = saasapi.TaskStatus_SUCCESS + case "fail": + listTaskParams.filterStatus = saasapi.TaskStatus_FAIL + } + + return doTaskList(listTaskParams) +} + +func doTaskList(listTaskParams listTaskParams) error { + saasReq := &saasapi.SaasReq{ + Cmd: &saasapi.SaasReq_TaskList{ + TaskList: &saasapi.TaskList{ + StatusFilter: listTaskParams.filterStatus, + }, + }, + } + + res, err := listTaskParams.saasHttp.TaskList(saasReq) + + if err != nil { + fmt.Println("submit List Task error", "err", err) + return err + } + + if res.Code != saasapi.ErrorCode_SUCC { + fmt.Println("task list failed", "code", res.Code, "status", res.Status) + return nil + } + + taskRes := res.GetTaskListRes() + + fmt.Printf("task res: %v\n", protojson.Format(taskRes)) return nil } diff --git a/cmd/saastool/make_hash.go b/cmd/saastool/task_make.go similarity index 66% rename from cmd/saastool/make_hash.go rename to cmd/saastool/task_make.go index 08e605a..1bc0063 100644 --- a/cmd/saastool/make_hash.go +++ b/cmd/saastool/task_make.go @@ -20,9 +20,9 @@ const ( blockSizeMax = 200 * 1024 * 1024 ) -type makeHashParams struct { +type makeTaskParams struct { sourcePath string - destPath string + hashFile string task *saasapi.Task } @@ -39,18 +39,19 @@ type hashResult struct { index int } -func RunMakeHash(args ...string) error { - fs := flag.NewFlagSet("tasklocalmake", flag.ExitOnError) - sourcePath := paramSourcePath(fs) - destPath := paramDestPath(fs) +func RunTaskMake(args ...string) error { + fs := flag.NewFlagSet("make", flag.ExitOnError) + sourcePath := paramSourceConvertedPath(fs) + hashFile := paramOutputHashFile(fs) blockSize := paramBlockSize(fs) + desc := paramTaskDesc(fs) if err := fs.Parse(args); err != nil { fmt.Println("command line parse error", "err", err) return err } - if fs.NArg() > 0 || len(*sourcePath) == 0 || len(*destPath) == 0 { + if fs.NArg() > 0 || len(*sourcePath) == 0 || len(*hashFile) == 0 { fs.PrintDefaults() return nil } @@ -60,52 +61,49 @@ func RunMakeHash(args ...string) error { return nil } - makeHashParams := makeHashParams{ + makeTaskParams := makeTaskParams{ sourcePath: *sourcePath, - destPath: *destPath, + hashFile: *hashFile, task: &saasapi.Task{ - TaskBlockSize: blockSize, + TaskBlockSize: blockSize, + TaskDescription: *desc, }, } - return doMakeHash(makeHashParams) + return doMakeHash(makeTaskParams) } -func doMakeHash(makeHashParams makeHashParams) error { - fsInfo, err := os.Stat(makeHashParams.sourcePath) +func doMakeHash(makeTaskParams makeTaskParams) error { + fsInfo, err := os.Stat(makeTaskParams.sourcePath) if err != nil { return err } if !fsInfo.IsDir() { // 如果是文件,直接计算 - return doFileHash(makeHashParams) + return doTaskMake(makeTaskParams) } // 读取目录下信息 - dirEntry, err := os.ReadDir(makeHashParams.sourcePath) + dirEntry, err := os.ReadDir(makeTaskParams.sourcePath) if err != nil { return err } // 遍历目录 for _, dir := range dirEntry { - newParam := makeHashParams - newParam.sourcePath = path.Join(makeHashParams.sourcePath, dir.Name()) - - if dir.IsDir() { - newParam.destPath = path.Join(makeHashParams.destPath, dir.Name()) - } + newParam := makeTaskParams + newParam.sourcePath = path.Join(makeTaskParams.sourcePath, dir.Name()) if err = doMakeHash(newParam); err != nil { return err } } - return saveTaskFile(makeHashParams) + return saveTaskFile(makeTaskParams) } -func doFileHash(makeHashParams makeHashParams) error { - sourceFile, err := os.Open(makeHashParams.sourcePath) +func doTaskMake(makeTaskParams makeTaskParams) error { + sourceFile, err := os.Open(makeTaskParams.sourcePath) if err != nil { return err } @@ -129,12 +127,12 @@ func doFileHash(makeHashParams makeHashParams) error { wg.Add(1) go func() { index := 0 - buffer := make([]byte, makeHashParams.task.TaskBlockSize) + buffer := make([]byte, makeTaskParams.task.TaskBlockSize) for { n, err := sourceFile.Read(buffer) if n > 0 { wg.Add(1) - fmt.Printf("\rhashing file [%v], block [%v]", makeHashParams.sourcePath, index) + fmt.Printf("\rhashing file [%v], block [%v]", makeTaskParams.sourcePath, index) tasks <- hashTask{chunk: buffer[:n], index: index} index++ } @@ -164,7 +162,7 @@ func doFileHash(makeHashParams makeHashParams) error { // 输出结果 fileInfo := &saasapi.FileInfo{ - FileName: makeHashParams.sourcePath, + FileName: makeTaskParams.sourcePath, FileSize: uint64(fi.Size()), } for _, r := range allResults { @@ -173,7 +171,7 @@ func doFileHash(makeHashParams makeHashParams) error { BlockLength: r.blockSize, }) } - makeHashParams.task.TaskFileInfos = append(makeHashParams.task.TaskFileInfos, fileInfo) + makeTaskParams.task.TaskFileInfos = append(makeTaskParams.task.TaskFileInfos, fileInfo) fmt.Println("") return nil @@ -189,8 +187,8 @@ func hashWorker(tasks <-chan hashTask, results chan<- hashResult) { } } -func saveTaskFile(makeHashParams makeHashParams) error { - taskFile, err := os.Create(makeHashParams.destPath) +func saveTaskFile(makeTaskParams makeTaskParams) error { + taskFile, err := os.Create(makeTaskParams.hashFile) if err != nil { return err } @@ -198,15 +196,15 @@ func saveTaskFile(makeHashParams makeHashParams) error { h := sha256.New() - for _, fileInfo := range makeHashParams.task.TaskFileInfos { + for _, fileInfo := range makeTaskParams.task.TaskFileInfos { for _, fileBlock := range fileInfo.FileBlocks { h.Write([]byte(fileBlock.BlockSha256)) } } - makeHashParams.task.TaskSha256 = hex.EncodeToString(h.Sum(nil)) + makeTaskParams.task.TaskSha256 = hex.EncodeToString(h.Sum(nil)) - _, err = taskFile.WriteString(protojson.Format(makeHashParams.task)) + _, err = taskFile.WriteString(protojson.Format(makeTaskParams.task)) if err != nil { return err } diff --git a/cmd/saastool/write.go b/cmd/saastool/write.go index 84849a6..ac773c5 100644 --- a/cmd/saastool/write.go +++ b/cmd/saastool/write.go @@ -53,9 +53,9 @@ func RunWrite(args ...string) error { batchSize: *batchSize, clear: *clear, saasHttp: &saashttp.SaasClient{ - Client: http.Client{}, - ApiUrls: cfg.ApiUrls, - Auth: cfg.Auth, + Client: &http.Client{}, + ApiUrls: &cfg.ApiUrls, + Auth: &cfg.Auth, }, } diff --git a/pkg/saashttp/cfg.go b/pkg/saashttp/cfg.go index e62c50f..8b48656 100644 --- a/pkg/saashttp/cfg.go +++ b/pkg/saashttp/cfg.go @@ -5,9 +5,12 @@ type ApiUrls struct { WritePath string ReadPath string ColumnWritePath string + TaskCreatePath string TaskListPath string - TaskCancelPath string - TaskDetailPath string + TaskInfoPath string + TaskDeletePath string + TaskRunPath string + TaskUploadPath string } type Auth struct { diff --git a/pkg/saashttp/httpcli.go b/pkg/saashttp/httpcli.go index aefef79..2df7187 100644 --- a/pkg/saashttp/httpcli.go +++ b/pkg/saashttp/httpcli.go @@ -24,9 +24,9 @@ const ( ) type SaasClient struct { - Client http.Client - ApiUrls ApiUrls - Auth Auth + Client *http.Client + ApiUrls *ApiUrls + Auth *Auth ResponseEncoder ResponseEncoder } @@ -45,19 +45,28 @@ func (c *SaasClient) ColumnWrite(saasReq *saasapi.SaasReq) (saasRes *saasapi.Saa return c.post(postUrl, saasReq) } +func (c *SaasClient) TaskCreate(saasReq *saasapi.SaasReq) (saasRes *saasapi.SaasRes, err error) { + postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskCreatePath) + return c.post(postUrl, saasReq) +} + func (c *SaasClient) TaskList(saasReq *saasapi.SaasReq) (saasRes *saasapi.SaasRes, err error) { postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskListPath) return c.post(postUrl, saasReq) } -func (c *SaasClient) TaskCancel(saasReq *saasapi.SaasReq) (saasRes *saasapi.SaasRes, err error) { - postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskCancelPath) +func (c *SaasClient) TaskDelete(saasReq *saasapi.SaasReq) (saasRes *saasapi.SaasRes, err error) { + postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskDeletePath) return c.post(postUrl, saasReq) } -func (c *SaasClient) TaskDetail(saasReq *saasapi.SaasReq) (saasRes *saasapi.SaasRes, err error) { +func (c *SaasClient) TaskInfo(saasReq *saasapi.SaasReq) (saasRes *saasapi.SaasRes, err error) { + postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskInfoPath) + return c.post(postUrl, saasReq) +} - postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskDetailPath) +func (c *SaasClient) TaskRun(saasReq *saasapi.SaasReq) (saasRes *saasapi.SaasRes, err error) { + postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskRunPath) return c.post(postUrl, saasReq) }