diff --git a/cmd.pb.go b/cmd.pb.go index cf6791c..c6206ae 100644 --- a/cmd.pb.go +++ b/cmd.pb.go @@ -193,11 +193,14 @@ type SaasReq struct { state protoimpl.MessageState `protogen:"open.v1"` UseridType UserIdType `protobuf:"varint,1,opt,name=userid_type,json=useridType,proto3,enum=saasapi.UserIdType" json:"userid_type,omitempty"` // 用户ID类型 Appid string `protobuf:"bytes,2,opt,name=appid,proto3" json:"appid,omitempty"` // 小程序/小游戏/公众号/视频号的appid - Async bool `protobuf:"varint,4,opt,name=async,proto3" json:"async,omitempty"` // 是否异步执行 // Types that are valid to be assigned to Cmd: // // *SaasReq_Write // *SaasReq_Read + // *SaasReq_ColumnWrite + // *SaasReq_TaskList + // *SaasReq_TaskCancel + // *SaasReq_TaskDetail Cmd isSaasReq_Cmd `protobuf_oneof:"cmd"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -247,13 +250,6 @@ func (x *SaasReq) GetAppid() string { return "" } -func (x *SaasReq) GetAsync() bool { - if x != nil { - return x.Async - } - return false -} - func (x *SaasReq) GetCmd() isSaasReq_Cmd { if x != nil { return x.Cmd @@ -279,6 +275,42 @@ func (x *SaasReq) GetRead() *Read { return nil } +func (x *SaasReq) GetColumnWrite() *ColumnWrite { + if x != nil { + if x, ok := x.Cmd.(*SaasReq_ColumnWrite); ok { + return x.ColumnWrite + } + } + return nil +} + +func (x *SaasReq) GetTaskList() *TaskList { + if x != nil { + if x, ok := x.Cmd.(*SaasReq_TaskList); ok { + return x.TaskList + } + } + return nil +} + +func (x *SaasReq) GetTaskCancel() *TaskCancel { + if x != nil { + if x, ok := x.Cmd.(*SaasReq_TaskCancel); ok { + return x.TaskCancel + } + } + return nil +} + +func (x *SaasReq) GetTaskDetail() *TaskDetail { + if x != nil { + if x, ok := x.Cmd.(*SaasReq_TaskDetail); ok { + return x.TaskDetail + } + } + return nil +} + type isSaasReq_Cmd interface { isSaasReq_Cmd() } @@ -291,16 +323,42 @@ type SaasReq_Read struct { Read *Read `protobuf:"bytes,11,opt,name=read,proto3,oneof"` // 批量读取 } +type SaasReq_ColumnWrite struct { + ColumnWrite *ColumnWrite `protobuf:"bytes,12,opt,name=column_write,json=columnWrite,proto3,oneof"` // 全量列式写入 +} + +type SaasReq_TaskList struct { + TaskList *TaskList `protobuf:"bytes,20,opt,name=task_list,json=taskList,proto3,oneof"` // 任务列表 +} + +type SaasReq_TaskCancel struct { + TaskCancel *TaskCancel `protobuf:"bytes,21,opt,name=task_cancel,json=taskCancel,proto3,oneof"` // 取消任务 +} + +type SaasReq_TaskDetail struct { + TaskDetail *TaskDetail `protobuf:"bytes,22,opt,name=task_detail,json=taskDetail,proto3,oneof"` // 任务详情 +} + func (*SaasReq_Write) isSaasReq_Cmd() {} func (*SaasReq_Read) isSaasReq_Cmd() {} +func (*SaasReq_ColumnWrite) isSaasReq_Cmd() {} + +func (*SaasReq_TaskList) isSaasReq_Cmd() {} + +func (*SaasReq_TaskCancel) isSaasReq_Cmd() {} + +func (*SaasReq_TaskDetail) isSaasReq_Cmd() {} + // Write 批量写入命令 type Write struct { - state protoimpl.MessageState `protogen:"open.v1"` - WriteCmds []*WriteCmd `protobuf:"bytes,1,rep,name=write_cmds,json=writeCmds,proto3" json:"write_cmds,omitempty"` // 批量写入命令 - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Async bool `protobuf:"varint,1,opt,name=async,proto3" json:"async,omitempty"` // 是否异步执行 + IsClearAllFirst bool `protobuf:"varint,2,opt,name=is_clear_all_first,json=isClearAllFirst,proto3" json:"is_clear_all_first,omitempty"` // 是否先执行清空 + WriteCmds []*WriteCmd `protobuf:"bytes,3,rep,name=write_cmds,json=writeCmds,proto3" json:"write_cmds,omitempty"` // 批量写入命令 + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Write) Reset() { @@ -333,6 +391,20 @@ func (*Write) Descriptor() ([]byte, []int) { return file_cmd_proto_rawDescGZIP(), []int{1} } +func (x *Write) GetAsync() bool { + if x != nil { + return x.Async + } + return false +} + +func (x *Write) GetIsClearAllFirst() bool { + if x != nil { + return x.IsClearAllFirst + } + return false +} + func (x *Write) GetWriteCmds() []*WriteCmd { if x != nil { return x.WriteCmds @@ -347,7 +419,6 @@ type WriteCmd struct { WriteBytes *Bytes `protobuf:"bytes,2,opt,name=write_bytes,json=writeBytes,proto3" json:"write_bytes,omitempty"` // byte区域 WriteUint32S *Uint32S `protobuf:"bytes,3,opt,name=write_uint32s,json=writeUint32s,proto3" json:"write_uint32s,omitempty"` // uint32区域 WriteFlagsWithExpire *FlagsWithExpire `protobuf:"bytes,4,opt,name=write_flags_with_expire,json=writeFlagsWithExpire,proto3" json:"write_flags_with_expire,omitempty"` // 标志位区域 - IsFullOverwrite bool `protobuf:"varint,5,opt,name=is_full_overwrite,json=isFullOverwrite,proto3" json:"is_full_overwrite,omitempty"` // 是否全量覆盖 unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -410,13 +481,6 @@ func (x *WriteCmd) GetWriteFlagsWithExpire() *FlagsWithExpire { return nil } -func (x *WriteCmd) GetIsFullOverwrite() bool { - if x != nil { - return x.IsFullOverwrite - } - return false -} - // Bytes 写入byte区域 type Bytes struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -735,19 +799,201 @@ func (x *ReadCmd) GetUserid() string { return "" } +// ColumnWrite 全量列式写入命令 +type ColumnWrite struct { + state protoimpl.MessageState `protogen:"open.v1"` + WriteBytes *Bytes `protobuf:"bytes,2,opt,name=write_bytes,json=writeBytes,proto3" json:"write_bytes,omitempty"` // byte区域 + WriteUint32S *Uint32S `protobuf:"bytes,3,opt,name=write_uint32s,json=writeUint32s,proto3" json:"write_uint32s,omitempty"` // uint32区域 + WriteFlagsWithExpire *FlagsWithExpire `protobuf:"bytes,4,opt,name=write_flags_with_expire,json=writeFlagsWithExpire,proto3" json:"write_flags_with_expire,omitempty"` // 标志位区域 + IsClearAllFirst bool `protobuf:"varint,5,opt,name=is_clear_all_first,json=isClearAllFirst,proto3" json:"is_clear_all_first,omitempty"` // 是否先执行清空 + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ColumnWrite) Reset() { + *x = ColumnWrite{} + mi := &file_cmd_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ColumnWrite) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ColumnWrite) ProtoMessage() {} + +func (x *ColumnWrite) ProtoReflect() protoreflect.Message { + mi := &file_cmd_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ColumnWrite.ProtoReflect.Descriptor instead. +func (*ColumnWrite) Descriptor() ([]byte, []int) { + return file_cmd_proto_rawDescGZIP(), []int{9} +} + +func (x *ColumnWrite) GetWriteBytes() *Bytes { + if x != nil { + return x.WriteBytes + } + return nil +} + +func (x *ColumnWrite) GetWriteUint32S() *Uint32S { + if x != nil { + return x.WriteUint32S + } + return nil +} + +func (x *ColumnWrite) GetWriteFlagsWithExpire() *FlagsWithExpire { + if x != nil { + return x.WriteFlagsWithExpire + } + return nil +} + +func (x *ColumnWrite) GetIsClearAllFirst() bool { + if x != nil { + return x.IsClearAllFirst + } + return false +} + +// TaskList 任务列表 +type TaskList struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TaskList) Reset() { + *x = TaskList{} + mi := &file_cmd_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TaskList) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TaskList) ProtoMessage() {} + +func (x *TaskList) ProtoReflect() protoreflect.Message { + mi := &file_cmd_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TaskList.ProtoReflect.Descriptor instead. +func (*TaskList) Descriptor() ([]byte, []int) { + return file_cmd_proto_rawDescGZIP(), []int{10} +} + +// TaskCancel 取消任务 +type TaskCancel struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TaskCancel) Reset() { + *x = TaskCancel{} + mi := &file_cmd_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TaskCancel) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TaskCancel) ProtoMessage() {} + +func (x *TaskCancel) ProtoReflect() protoreflect.Message { + mi := &file_cmd_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TaskCancel.ProtoReflect.Descriptor instead. +func (*TaskCancel) Descriptor() ([]byte, []int) { + return file_cmd_proto_rawDescGZIP(), []int{11} +} + +// TaskDetail 任务详情 +type TaskDetail struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TaskDetail) Reset() { + *x = TaskDetail{} + mi := &file_cmd_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TaskDetail) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TaskDetail) ProtoMessage() {} + +func (x *TaskDetail) ProtoReflect() protoreflect.Message { + mi := &file_cmd_proto_msgTypes[12] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TaskDetail.ProtoReflect.Descriptor instead. +func (*TaskDetail) Descriptor() ([]byte, []int) { + return file_cmd_proto_rawDescGZIP(), []int{12} +} + // SaasRes 命令返回 type SaasRes struct { state protoimpl.MessageState `protogen:"open.v1"` - Code ErrorCode `protobuf:"varint,1,opt,name=code,proto3,enum=saasapi.ErrorCode" json:"code,omitempty"` // 返回码 - Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` // 返回信息的文本提示 - CmdRes []*CmdsResItem `protobuf:"bytes,3,rep,name=cmd_res,json=cmdRes,proto3" json:"cmd_res,omitempty"` // 返回的命令 + Code ErrorCode `protobuf:"varint,1,opt,name=code,proto3,enum=saasapi.ErrorCode" json:"code,omitempty"` // 返回码 + Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` // 返回信息的文本提示 + SuccCmdCount uint32 `protobuf:"varint,3,opt,name=succ_cmd_count,json=succCmdCount,proto3" json:"succ_cmd_count,omitempty"` // 成功的命令数量 + FailCmdCount uint32 `protobuf:"varint,4,opt,name=fail_cmd_count,json=failCmdCount,proto3" json:"fail_cmd_count,omitempty"` // 失败的命令数量 + CmdRes []*CmdsResItem `protobuf:"bytes,5,rep,name=cmd_res,json=cmdRes,proto3" json:"cmd_res,omitempty"` // 返回的命令 unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *SaasRes) Reset() { *x = SaasRes{} - mi := &file_cmd_proto_msgTypes[9] + mi := &file_cmd_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -759,7 +1005,7 @@ func (x *SaasRes) String() string { func (*SaasRes) ProtoMessage() {} func (x *SaasRes) ProtoReflect() protoreflect.Message { - mi := &file_cmd_proto_msgTypes[9] + mi := &file_cmd_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -772,7 +1018,7 @@ func (x *SaasRes) ProtoReflect() protoreflect.Message { // Deprecated: Use SaasRes.ProtoReflect.Descriptor instead. func (*SaasRes) Descriptor() ([]byte, []int) { - return file_cmd_proto_rawDescGZIP(), []int{9} + return file_cmd_proto_rawDescGZIP(), []int{13} } func (x *SaasRes) GetCode() ErrorCode { @@ -789,6 +1035,20 @@ func (x *SaasRes) GetStatus() string { return "" } +func (x *SaasRes) GetSuccCmdCount() uint32 { + if x != nil { + return x.SuccCmdCount + } + return 0 +} + +func (x *SaasRes) GetFailCmdCount() uint32 { + if x != nil { + return x.FailCmdCount + } + return 0 +} + func (x *SaasRes) GetCmdRes() []*CmdsResItem { if x != nil { return x.CmdRes @@ -804,13 +1064,14 @@ type CmdsResItem struct { Bytes []byte `protobuf:"bytes,3,opt,name=bytes,proto3" json:"bytes,omitempty"` // byte区域 Uint32S []uint32 `protobuf:"varint,4,rep,packed,name=uint32s,proto3" json:"uint32s,omitempty"` // uint32区域 FlagsWithExpire []*FlagWithExpire `protobuf:"bytes,5,rep,name=flags_with_expire,json=flagsWithExpire,proto3" json:"flags_with_expire,omitempty"` // 标志位区域 + LastModifyTime uint32 `protobuf:"varint,6,opt,name=last_modify_time,json=lastModifyTime,proto3" json:"last_modify_time,omitempty"` // 最后修改时间 unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *CmdsResItem) Reset() { *x = CmdsResItem{} - mi := &file_cmd_proto_msgTypes[10] + mi := &file_cmd_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -822,7 +1083,7 @@ func (x *CmdsResItem) String() string { func (*CmdsResItem) ProtoMessage() {} func (x *CmdsResItem) ProtoReflect() protoreflect.Message { - mi := &file_cmd_proto_msgTypes[10] + mi := &file_cmd_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -835,7 +1096,7 @@ func (x *CmdsResItem) ProtoReflect() protoreflect.Message { // Deprecated: Use CmdsResItem.ProtoReflect.Descriptor instead. func (*CmdsResItem) Descriptor() ([]byte, []int) { - return file_cmd_proto_rawDescGZIP(), []int{10} + return file_cmd_proto_rawDescGZIP(), []int{14} } func (x *CmdsResItem) GetCmdIndex() uint32 { @@ -873,30 +1134,43 @@ func (x *CmdsResItem) GetFlagsWithExpire() []*FlagWithExpire { return nil } +func (x *CmdsResItem) GetLastModifyTime() uint32 { + if x != nil { + return x.LastModifyTime + } + return 0 +} + var File_cmd_proto protoreflect.FileDescriptor const file_cmd_proto_rawDesc = "" + "\n" + - "\tcmd.proto\x12\asaasapi\"\xbf\x01\n" + + "\tcmd.proto\x12\asaasapi\"\x86\x03\n" + "\aSaasReq\x124\n" + "\vuserid_type\x18\x01 \x01(\x0e2\x13.saasapi.UserIdTypeR\n" + "useridType\x12\x14\n" + - "\x05appid\x18\x02 \x01(\tR\x05appid\x12\x14\n" + - "\x05async\x18\x04 \x01(\bR\x05async\x12&\n" + + "\x05appid\x18\x02 \x01(\tR\x05appid\x12&\n" + "\x05write\x18\n" + " \x01(\v2\x0e.saasapi.WriteH\x00R\x05write\x12#\n" + - "\x04read\x18\v \x01(\v2\r.saasapi.ReadH\x00R\x04readB\x05\n" + - "\x03cmd\"9\n" + - "\x05Write\x120\n" + + "\x04read\x18\v \x01(\v2\r.saasapi.ReadH\x00R\x04read\x129\n" + + "\fcolumn_write\x18\f \x01(\v2\x14.saasapi.ColumnWriteH\x00R\vcolumnWrite\x120\n" + + "\ttask_list\x18\x14 \x01(\v2\x11.saasapi.TaskListH\x00R\btaskList\x126\n" + + "\vtask_cancel\x18\x15 \x01(\v2\x13.saasapi.TaskCancelH\x00R\n" + + "taskCancel\x126\n" + + "\vtask_detail\x18\x16 \x01(\v2\x13.saasapi.TaskDetailH\x00R\n" + + "taskDetailB\x05\n" + + "\x03cmd\"|\n" + + "\x05Write\x12\x14\n" + + "\x05async\x18\x01 \x01(\bR\x05async\x12+\n" + + "\x12is_clear_all_first\x18\x02 \x01(\bR\x0fisClearAllFirst\x120\n" + "\n" + - "write_cmds\x18\x01 \x03(\v2\x11.saasapi.WriteCmdR\twriteCmds\"\x87\x02\n" + + "write_cmds\x18\x03 \x03(\v2\x11.saasapi.WriteCmdR\twriteCmds\"\xdb\x01\n" + "\bWriteCmd\x12\x16\n" + "\x06userid\x18\x01 \x01(\tR\x06userid\x12/\n" + "\vwrite_bytes\x18\x02 \x01(\v2\x0e.saasapi.BytesR\n" + "writeBytes\x125\n" + "\rwrite_uint32s\x18\x03 \x01(\v2\x10.saasapi.Uint32sR\fwriteUint32s\x12O\n" + - "\x17write_flags_with_expire\x18\x04 \x01(\v2\x18.saasapi.FlagsWithExpireR\x14writeFlagsWithExpire\x12*\n" + - "\x11is_full_overwrite\x18\x05 \x01(\bR\x0fisFullOverwrite\"O\n" + + "\x17write_flags_with_expire\x18\x04 \x01(\v2\x18.saasapi.FlagsWithExpireR\x14writeFlagsWithExpire\"O\n" + "\x05Bytes\x12\x14\n" + "\x05bytes\x18\x01 \x01(\fR\x05bytes\x12\x17\n" + "\aindex_1\x18\x02 \x01(\x04R\x06index1\x12\x17\n" + @@ -914,17 +1188,32 @@ const file_cmd_proto_rawDesc = "" + "\x04Read\x12-\n" + "\tread_cmds\x18\x01 \x03(\v2\x10.saasapi.ReadCmdR\breadCmds\"!\n" + "\aReadCmd\x12\x16\n" + - "\x06userid\x18\x01 \x01(\tR\x06userid\"x\n" + + "\x06userid\x18\x01 \x01(\tR\x06userid\"\xf3\x01\n" + + "\vColumnWrite\x12/\n" + + "\vwrite_bytes\x18\x02 \x01(\v2\x0e.saasapi.BytesR\n" + + "writeBytes\x125\n" + + "\rwrite_uint32s\x18\x03 \x01(\v2\x10.saasapi.Uint32sR\fwriteUint32s\x12O\n" + + "\x17write_flags_with_expire\x18\x04 \x01(\v2\x18.saasapi.FlagsWithExpireR\x14writeFlagsWithExpire\x12+\n" + + "\x12is_clear_all_first\x18\x05 \x01(\bR\x0fisClearAllFirst\"\n" + + "\n" + + "\bTaskList\"\f\n" + + "\n" + + "TaskCancel\"\f\n" + + "\n" + + "TaskDetail\"\xc4\x01\n" + "\aSaasRes\x12&\n" + "\x04code\x18\x01 \x01(\x0e2\x12.saasapi.ErrorCodeR\x04code\x12\x16\n" + - "\x06status\x18\x02 \x01(\tR\x06status\x12-\n" + - "\acmd_res\x18\x03 \x03(\v2\x14.saasapi.CmdsResItemR\x06cmdRes\"\xd1\x01\n" + + "\x06status\x18\x02 \x01(\tR\x06status\x12$\n" + + "\x0esucc_cmd_count\x18\x03 \x01(\rR\fsuccCmdCount\x12$\n" + + "\x0efail_cmd_count\x18\x04 \x01(\rR\ffailCmdCount\x12-\n" + + "\acmd_res\x18\x05 \x03(\v2\x14.saasapi.CmdsResItemR\x06cmdRes\"\xfb\x01\n" + "\vCmdsResItem\x12\x1b\n" + "\tcmd_index\x18\x01 \x01(\rR\bcmdIndex\x120\n" + "\bcmd_code\x18\x02 \x01(\x0e2\x15.saasapi.CmdErrorCodeR\acmdCode\x12\x14\n" + "\x05bytes\x18\x03 \x01(\fR\x05bytes\x12\x18\n" + "\auint32s\x18\x04 \x03(\rR\auint32s\x12C\n" + - "\x11flags_with_expire\x18\x05 \x03(\v2\x17.saasapi.FlagWithExpireR\x0fflagsWithExpire*&\n" + + "\x11flags_with_expire\x18\x05 \x03(\v2\x17.saasapi.FlagWithExpireR\x0fflagsWithExpire\x12(\n" + + "\x10last_modify_time\x18\x06 \x01(\rR\x0elastModifyTime*&\n" + "\n" + "UserIdType\x12\f\n" + "\bDEVICEID\x10\x00\x12\n" + @@ -945,7 +1234,7 @@ const file_cmd_proto_rawDesc = "" + "CMDS_LIMIT\x10r\x12\r\n" + "\tCMDS_NULL\x10s*\x16\n" + "\fCmdErrorCode\x12\x06\n" + - "\x02OK\x10\x00B\vZ\t./saasapib\x06proto3" + "\x02OK\x10\x00B!Z\x1fe.coding.net/rta/public/saasapib\x06proto3" var ( file_cmd_proto_rawDescOnce sync.Once @@ -960,7 +1249,7 @@ func file_cmd_proto_rawDescGZIP() []byte { } var file_cmd_proto_enumTypes = make([]protoimpl.EnumInfo, 3) -var file_cmd_proto_msgTypes = make([]protoimpl.MessageInfo, 11) +var file_cmd_proto_msgTypes = make([]protoimpl.MessageInfo, 15) var file_cmd_proto_goTypes = []any{ (UserIdType)(0), // 0: saasapi.UserIdType (ErrorCode)(0), // 1: saasapi.ErrorCode @@ -974,28 +1263,39 @@ var file_cmd_proto_goTypes = []any{ (*FlagWithExpire)(nil), // 9: saasapi.FlagWithExpire (*Read)(nil), // 10: saasapi.Read (*ReadCmd)(nil), // 11: saasapi.ReadCmd - (*SaasRes)(nil), // 12: saasapi.SaasRes - (*CmdsResItem)(nil), // 13: saasapi.CmdsResItem + (*ColumnWrite)(nil), // 12: saasapi.ColumnWrite + (*TaskList)(nil), // 13: saasapi.TaskList + (*TaskCancel)(nil), // 14: saasapi.TaskCancel + (*TaskDetail)(nil), // 15: saasapi.TaskDetail + (*SaasRes)(nil), // 16: saasapi.SaasRes + (*CmdsResItem)(nil), // 17: saasapi.CmdsResItem } var file_cmd_proto_depIdxs = []int32{ 0, // 0: saasapi.SaasReq.userid_type:type_name -> saasapi.UserIdType 4, // 1: saasapi.SaasReq.write:type_name -> saasapi.Write 10, // 2: saasapi.SaasReq.read:type_name -> saasapi.Read - 5, // 3: saasapi.Write.write_cmds:type_name -> saasapi.WriteCmd - 6, // 4: saasapi.WriteCmd.write_bytes:type_name -> saasapi.Bytes - 7, // 5: saasapi.WriteCmd.write_uint32s:type_name -> saasapi.Uint32s - 8, // 6: saasapi.WriteCmd.write_flags_with_expire:type_name -> saasapi.FlagsWithExpire - 9, // 7: saasapi.FlagsWithExpire.flags_with_expire:type_name -> saasapi.FlagWithExpire - 11, // 8: saasapi.Read.read_cmds:type_name -> saasapi.ReadCmd - 1, // 9: saasapi.SaasRes.code:type_name -> saasapi.ErrorCode - 13, // 10: saasapi.SaasRes.cmd_res:type_name -> saasapi.CmdsResItem - 2, // 11: saasapi.CmdsResItem.cmd_code:type_name -> saasapi.CmdErrorCode - 9, // 12: saasapi.CmdsResItem.flags_with_expire:type_name -> saasapi.FlagWithExpire - 13, // [13:13] is the sub-list for method output_type - 13, // [13:13] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 12, // 3: saasapi.SaasReq.column_write:type_name -> saasapi.ColumnWrite + 13, // 4: saasapi.SaasReq.task_list:type_name -> saasapi.TaskList + 14, // 5: saasapi.SaasReq.task_cancel:type_name -> saasapi.TaskCancel + 15, // 6: saasapi.SaasReq.task_detail:type_name -> saasapi.TaskDetail + 5, // 7: saasapi.Write.write_cmds:type_name -> saasapi.WriteCmd + 6, // 8: saasapi.WriteCmd.write_bytes:type_name -> saasapi.Bytes + 7, // 9: saasapi.WriteCmd.write_uint32s:type_name -> saasapi.Uint32s + 8, // 10: saasapi.WriteCmd.write_flags_with_expire:type_name -> saasapi.FlagsWithExpire + 9, // 11: saasapi.FlagsWithExpire.flags_with_expire:type_name -> saasapi.FlagWithExpire + 11, // 12: saasapi.Read.read_cmds:type_name -> saasapi.ReadCmd + 6, // 13: saasapi.ColumnWrite.write_bytes:type_name -> saasapi.Bytes + 7, // 14: saasapi.ColumnWrite.write_uint32s:type_name -> saasapi.Uint32s + 8, // 15: saasapi.ColumnWrite.write_flags_with_expire:type_name -> saasapi.FlagsWithExpire + 1, // 16: saasapi.SaasRes.code:type_name -> saasapi.ErrorCode + 17, // 17: saasapi.SaasRes.cmd_res:type_name -> saasapi.CmdsResItem + 2, // 18: saasapi.CmdsResItem.cmd_code:type_name -> saasapi.CmdErrorCode + 9, // 19: saasapi.CmdsResItem.flags_with_expire:type_name -> saasapi.FlagWithExpire + 20, // [20:20] is the sub-list for method output_type + 20, // [20:20] is the sub-list for method input_type + 20, // [20:20] is the sub-list for extension type_name + 20, // [20:20] is the sub-list for extension extendee + 0, // [0:20] is the sub-list for field type_name } func init() { file_cmd_proto_init() } @@ -1006,6 +1306,10 @@ func file_cmd_proto_init() { file_cmd_proto_msgTypes[0].OneofWrappers = []any{ (*SaasReq_Write)(nil), (*SaasReq_Read)(nil), + (*SaasReq_ColumnWrite)(nil), + (*SaasReq_TaskList)(nil), + (*SaasReq_TaskCancel)(nil), + (*SaasReq_TaskDetail)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -1013,7 +1317,7 @@ func file_cmd_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_cmd_proto_rawDesc), len(file_cmd_proto_rawDesc)), NumEnums: 3, - NumMessages: 11, + NumMessages: 15, NumExtensions: 0, NumServices: 0, }, diff --git a/cmd.proto b/cmd.proto index adf94a6..fd1ff62 100644 --- a/cmd.proto +++ b/cmd.proto @@ -2,23 +2,29 @@ syntax = "proto3"; package saasapi; -option go_package = "./saasapi"; +option go_package = "e.coding.net/rta/public/saasapi"; // SaasReq 命令请求 message SaasReq { UserIdType userid_type = 1; // 用户ID类型 string appid = 2; // 小程序/小游戏/公众号/视频号的appid - bool async = 4; // 是否异步执行 oneof cmd { Write write = 10; // 批量写入 Read read = 11; // 批量读取 + ColumnWrite column_write = 12; // 全量列式写入 + + TaskList task_list = 20; // 任务列表 + TaskCancel task_cancel = 21; // 取消任务 + TaskDetail task_detail = 22; // 任务详情 } } // Write 批量写入命令 message Write { - repeated WriteCmd write_cmds = 1; // 批量写入命令 + bool async = 1; // 是否异步执行 + bool is_clear_all_first = 2; // 是否先执行清空 + repeated WriteCmd write_cmds = 3; // 批量写入命令 } // WriteCmd 写入命令 @@ -27,7 +33,6 @@ message WriteCmd { Bytes write_bytes = 2; // byte区域 Uint32s write_uint32s = 3; // uint32区域 FlagsWithExpire write_flags_with_expire = 4; // 标志位区域 - bool is_full_overwrite = 5; // 是否全量覆盖 } // Bytes 写入byte区域 @@ -73,11 +78,36 @@ message ReadCmd { string userid = 1; // 用户ID } +// ColumnWrite 全量列式写入命令 +message ColumnWrite { + Bytes write_bytes = 2; // byte区域 + Uint32s write_uint32s = 3; // uint32区域 + FlagsWithExpire write_flags_with_expire = 4; // 标志位区域 + bool is_clear_all_first = 5; // 是否先执行清空 +} + +// TaskList 任务列表 +message TaskList { + +} + +// TaskCancel 取消任务 +message TaskCancel { + +} + +// TaskDetail 任务详情 +message TaskDetail { + +} + // SaasRes 命令返回 message SaasRes { ErrorCode code = 1; // 返回码 string status = 2; // 返回信息的文本提示 - repeated CmdsResItem cmd_res = 3; // 返回的命令 + uint32 succ_cmd_count = 3; // 成功的命令数量 + uint32 fail_cmd_count = 4; // 失败的命令数量 + repeated CmdsResItem cmd_res = 5; // 返回的命令 } // CmdsResItem 读取命令返回内容 @@ -87,6 +117,7 @@ message CmdsResItem { bytes bytes = 3; // byte区域 repeated uint32 uint32s = 4; // uint32区域 repeated FlagWithExpire flags_with_expire = 5; // 标志位区域 + uint32 last_modify_time = 6; // 最后修改时间 } // ErrorCode 返回码 @@ -103,11 +134,10 @@ enum ErrorCode { QPS_LIMIT = 113; // 并发请求量超限 CMDS_LIMIT = 114; // 命令数量超限 CMDS_NULL = 115; // 命令为空 - } enum CmdErrorCode { - OK = 0; // 成功 + OK = 0; // 成功 } diff --git a/cmd/saastool/.gitignore b/cmd/saastool/.gitignore index d39a3ff..0cfca10 100644 --- a/cmd/saastool/.gitignore +++ b/cmd/saastool/.gitignore @@ -1,2 +1,3 @@ debug/ -saastool \ No newline at end of file +saastool +saastool_linux \ No newline at end of file diff --git a/cmd/saastool/column_write.go b/cmd/saastool/column_write.go new file mode 100644 index 0000000..4ac6c39 --- /dev/null +++ b/cmd/saastool/column_write.go @@ -0,0 +1,5 @@ +package main + +func RunColumnWrite(args ...string) error { + return nil +} diff --git a/cmd/saastool/config.go b/cmd/saastool/config.go index dbf0d2d..4de8098 100644 --- a/cmd/saastool/config.go +++ b/cmd/saastool/config.go @@ -1,25 +1,14 @@ package main import ( + "e.coding.net/rta/public/saasapi/pkg/saashttp" "github.com/BurntSushi/toml" ) // Config 配置 type Config struct { - Auth Auth - ApiUrls ApiUrls -} - -// DB 配置 -type Auth struct { - Account string - Token string -} - -type ApiUrls struct { - UrlBase string - Write string - Read string + Auth saashttp.Auth + ApiUrls saashttp.ApiUrls } // LoadConfigFile 加载配置文件 diff --git a/cmd/saastool/convert.go b/cmd/saastool/convert.go new file mode 100644 index 0000000..08bd065 --- /dev/null +++ b/cmd/saastool/convert.go @@ -0,0 +1,189 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "os" + "path" + "strings" + + "e.coding.net/rta/public/saasapi" + "google.golang.org/protobuf/encoding/protojson" +) + +// TODO 转换加速 + +type convertParams struct { + targetCfg *TargetConfig + sourcePath string + destPath string +} + +func RunConvert(args ...string) error { + fs := flag.NewFlagSet("convert", flag.ExitOnError) + targetCfgFile := paramTargets(fs) + sourcePath := paramSourcePath(fs) + destPath := paramDestPath(fs) + + if err := fs.Parse(args); err != nil { + fmt.Println("command line parse error", "err", err) + return err + } + + if fs.NArg() > 0 || *targetCfgFile == "" || len(*sourcePath) == 0 || len(*destPath) == 0 { + fs.PrintDefaults() + return nil + } + + targetCfg, err := LoadTargetFile(*targetCfgFile) + if err != nil { + fmt.Println("LoadConfigFile error", "err", err) + return err + } + + convertParams := convertParams{ + targetCfg: targetCfg, + sourcePath: *sourcePath, + destPath: *destPath, + } + + return doConvert(convertParams) +} + +func doConvert(convertParams convertParams) error { + fsInfo, err := os.Stat(convertParams.sourcePath) + if err != nil { + return err + } + + if !fsInfo.IsDir() { + // 如果是文件,直接写入 + return doFileConvert(convertParams) + } + + // 读取目录下信息 + dirEntry, err := os.ReadDir(convertParams.sourcePath) + if err != nil { + return err + } + + // 遍历目录 + for _, dir := range dirEntry { + newParam := convertParams + newParam.sourcePath = path.Join(convertParams.sourcePath, dir.Name()) + + if dir.IsDir() { + newParam.destPath = path.Join(convertParams.destPath, dir.Name()) + } + + if err = doConvert(newParam); err != nil { + return err + } + } + + return nil +} + +func doFileConvert(convertParams convertParams) error { + // 读取文件并按行遍历,以\t分割为两列,第一列为userid,第二列解析为string数组 + sourceFile, err := os.Open(convertParams.sourcePath) + if err != nil { + return err + } + defer sourceFile.Close() + + if _, err = os.Stat(convertParams.destPath); os.IsNotExist(err) { + os.MkdirAll(convertParams.destPath, os.ModePerm) + } + + destName := path.Join(convertParams.destPath, path.Base(convertParams.sourcePath)+".converted") + destFile, err := os.Create(destName) + if err != nil { + return err + } + defer destFile.Close() + + scaner := bufio.NewScanner(sourceFile) + destWriter := bufio.NewWriter(destFile) + defer destWriter.Flush() + + jasonMarshal := protojson.MarshalOptions{Multiline: false, Indent: ""} + + processedLine := 0 + for scaner.Scan() { + line := scaner.Text() + if line == "" { + continue + } + + // 按\t分割为两列 + parts := strings.Split(line, "\t") + if len(parts) != 2 { + continue + } + + // 读取userid + userid := parts[0] + value := parts[1] + value = strings.ReplaceAll(value, "[", "") + value = strings.ReplaceAll(value, "]", "") + // 第二列解析为string数组 + targets := strings.Split(value, " ") + + saasWriteCmd := &saasapi.WriteCmd{ + Userid: userid, + } + if len(userid) == 0 || len(targets) == 0 { + continue + } + for _, target := range targets { + if targetinfo, ok := convertParams.targetCfg.Targets[target]; ok { + if targetinfo.WriteByte != nil { + if saasWriteCmd.WriteBytes == nil { + saasWriteCmd.WriteBytes = &saasapi.Bytes{} + } + saasWriteCmd.WriteBytes.Bytes = append(saasWriteCmd.WriteBytes.Bytes, *targetinfo.WriteByte) + if targetinfo.WriteBytePos < 64 { + saasWriteCmd.WriteBytes.Index_1 |= 1 << targetinfo.WriteBytePos + } else if targetinfo.WriteBytePos < 128 { + saasWriteCmd.WriteBytes.Index_2 |= 1 << (targetinfo.WriteBytePos - 64) + } + } + + if targetinfo.WriteUint32 != nil { + if saasWriteCmd.WriteUint32S == nil { + saasWriteCmd.WriteUint32S = &saasapi.Uint32S{} + } + saasWriteCmd.WriteUint32S.Uint32S = append(saasWriteCmd.WriteUint32S.Uint32S, *targetinfo.WriteUint32) + saasWriteCmd.WriteUint32S.Index_1 |= 1 << targetinfo.WriteUint32Pos + } + + if targetinfo.WriteFlag != nil && targetinfo.WriteExpire != nil { + if saasWriteCmd.WriteFlagsWithExpire == nil { + saasWriteCmd.WriteFlagsWithExpire = &saasapi.FlagsWithExpire{} + } + saasWriteCmd.WriteFlagsWithExpire.FlagsWithExpire = append( + saasWriteCmd.WriteFlagsWithExpire.FlagsWithExpire, &saasapi.FlagWithExpire{ + Flag: *targetinfo.WriteFlag, + Expire: *targetinfo.WriteExpire, + }) + saasWriteCmd.WriteFlagsWithExpire.Index_1 |= 1 << targetinfo.WriteFlagWithExpirePos + } + } + } + + // 写入文件 + destWriter.WriteString(jasonMarshal.Format(saasWriteCmd)) + destWriter.WriteString("\n") + + processedLine++ + if processedLine%100000 == 0 { + fmt.Printf("\rconverted records: %v [%v]", processedLine, destName) + } + + } + + fmt.Printf("\rconverted records: %v [%v]\n", processedLine, destName) + return nil +} diff --git a/cmd/saastool/help.go b/cmd/saastool/help.go index 5bc800c..77f4638 100644 --- a/cmd/saastool/help.go +++ b/cmd/saastool/help.go @@ -16,8 +16,13 @@ Usage: [[command] [arguments]] The commands are: - write Write user's bytes / uint32s / flags - read Read user's bytes / uint32s / flags + write Write user's 'bytes / uint32s / flags' + read Read user's 'bytes / uint32s / flags' + columnwrite Write columns for 'deviceid / openid' users + + tasklist List tasks + taskcancel Cancel task + taskdetail Show task detail "help" is the default command. diff --git a/cmd/saastool/main.go b/cmd/saastool/main.go index 5c982c8..09edd3a 100644 --- a/cmd/saastool/main.go +++ b/cmd/saastool/main.go @@ -23,10 +23,22 @@ func Run(args ...string) error { return RunHelp(args...) case "write": return RunWrite(args...) - //case "read": - // return RunRead(args...) + case "read": + return RunRead(args...) + case "columnwrite": + return RunColumnWrite(args...) + case "convert": + return RunConvert(args...) + case "tasklist": + return RunTaskList(args...) + case "taskcancel": + return RunTaskCancel(args...) + case "taskdetail": + return RunTaskDetail(args...) + case "verify": + return RunVerify(args...) default: - err := fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'dmptool help' for usage`, name) + err := fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'saastool help' for usage`, name) slog.Warn(err.Error()) return err } diff --git a/cmd/saastool/params.go b/cmd/saastool/params.go index b577997..f799b81 100644 --- a/cmd/saastool/params.go +++ b/cmd/saastool/params.go @@ -11,8 +11,16 @@ func paramTargets(fs *flag.FlagSet) *string { return fs.String("targets", "", "target setting") } -func paramFromPath(fs *flag.FlagSet) *string { - return fs.String("from", "", "Data path source for write command. (*required*)") +func paramSourcePath(fs *flag.FlagSet) *string { + return fs.String("source", "", "Data path source for write command.") +} + +func paramDestPath(fs *flag.FlagSet) *string { + return fs.String("dest", "", "Data path destination for write command.") +} + +func paramAppid(fs *flag.FlagSet) *string { + return fs.String("appid", "", "Wechat appid") } func paramBatchSize(fs *flag.FlagSet) *uint { @@ -22,3 +30,7 @@ func paramBatchSize(fs *flag.FlagSet) *uint { func paramAsync(fs *flag.FlagSet) *bool { return fs.Bool("async", false, "Async mode") } + +func paramClear(fs *flag.FlagSet) *bool { + return fs.Bool("clear", false, "Clear all data before write") +} diff --git a/cmd/saastool/read.go b/cmd/saastool/read.go index 06ab7d0..d3e0474 100644 --- a/cmd/saastool/read.go +++ b/cmd/saastool/read.go @@ -1 +1,5 @@ package main + +func RunRead(args ...string) error { + return nil +} diff --git a/cmd/saastool/task_cancel.go b/cmd/saastool/task_cancel.go new file mode 100644 index 0000000..99d5af9 --- /dev/null +++ b/cmd/saastool/task_cancel.go @@ -0,0 +1,5 @@ +package main + +func RunTaskCancel(args ...string) error { + return nil +} diff --git a/cmd/saastool/task_detail.go b/cmd/saastool/task_detail.go new file mode 100644 index 0000000..be7d123 --- /dev/null +++ b/cmd/saastool/task_detail.go @@ -0,0 +1,5 @@ +package main + +func RunTaskDetail(args ...string) error { + return nil +} diff --git a/cmd/saastool/task_list.go b/cmd/saastool/task_list.go new file mode 100644 index 0000000..e96cd2c --- /dev/null +++ b/cmd/saastool/task_list.go @@ -0,0 +1,5 @@ +package main + +func RunTaskList(args ...string) error { + return nil +} diff --git a/cmd/saastool/term.go b/cmd/saastool/term.go new file mode 100644 index 0000000..48aa7ed --- /dev/null +++ b/cmd/saastool/term.go @@ -0,0 +1,25 @@ +package main + +import ( + "os" + "syscall" + "unsafe" +) + +// https://man7.org/linux/man-pages/man2/TIOCSWINSZ.2const.html + +// winSize console窗口大小 +type winSize struct { + wsRow uint16 + wsCols uint16 + wsXPixels uint16 + wxYPixels uint16 +} + +// getConsoleSize 获取控制台窗口大小 +func getConsoleSize() (cols, rows int) { + var sz winSize + _, _, _ = syscall.Syscall(syscall.SYS_IOCTL, + os.Stdout.Fd(), uintptr(syscall.TIOCGWINSZ), uintptr(unsafe.Pointer(&sz))) + return int(sz.wsCols), int(sz.wsRow) +} diff --git a/cmd/saastool/verify.go b/cmd/saastool/verify.go new file mode 100644 index 0000000..9ade8e4 --- /dev/null +++ b/cmd/saastool/verify.go @@ -0,0 +1,5 @@ +package main + +func RunVerify(args ...string) error { + return nil +} diff --git a/cmd/saastool/write.go b/cmd/saastool/write.go index 01e1412..a1df702 100644 --- a/cmd/saastool/write.go +++ b/cmd/saastool/write.go @@ -5,36 +5,40 @@ import ( "flag" "fmt" "log/slog" + "net/http" "os" "path" - "strings" "e.coding.net/rta/public/saasapi" - "google.golang.org/protobuf/proto" + "e.coding.net/rta/public/saasapi/pkg/saashttp" + "google.golang.org/protobuf/encoding/protojson" ) type writeParams struct { - cfg *Config - targetCfg *TargetConfig - batchSize uint - dataPath string - async bool + cfg *Config + sourcePath string + appid string + batchSize uint + async bool + clear bool + saasHttp *saashttp.SaasClient } func RunWrite(args ...string) error { fs := flag.NewFlagSet("write", flag.ExitOnError) cfgFile := paramConfig(fs) - targetCfgFile := paramTargets(fs) - dataPath := paramFromPath(fs) + sourcePath := paramSourcePath(fs) + appid := paramAppid(fs) batchSize := paramBatchSize(fs) async := paramAsync(fs) + clear := paramClear(fs) if err := fs.Parse(args); err != nil { fmt.Println("command line parse error", "err", err) return err } - if fs.NArg() > 0 || *targetCfgFile == "" || len(*dataPath) == 0 { + if fs.NArg() > 0 || len(*sourcePath) == 0 { fs.PrintDefaults() return nil } @@ -44,26 +48,25 @@ func RunWrite(args ...string) error { slog.Error("LoadConfigFile error", "err", err) return err } - - targetCfg, err := LoadTargetFile(*targetCfgFile) - if err != nil { - fmt.Println("LoadConfigFile error", "err", err) - return err - } - writeParams := writeParams{ - cfg: cfg, - targetCfg: targetCfg, - batchSize: *batchSize, - dataPath: *dataPath, - async: *async, + cfg: cfg, + sourcePath: *sourcePath, + appid: *appid, + batchSize: *batchSize, + async: *async, + clear: *clear, + saasHttp: &saashttp.SaasClient{ + Client: http.Client{}, + ApiUrls: cfg.ApiUrls, + Auth: cfg.Auth, + }, } return doWrite(writeParams) } func doWrite(writeParams writeParams) error { - fsInfo, err := os.Stat(writeParams.dataPath) + fsInfo, err := os.Stat(writeParams.sourcePath) if err != nil { return err } @@ -74,7 +77,7 @@ func doWrite(writeParams writeParams) error { } // 读取目录下信息 - dirEntry, err := os.ReadDir(writeParams.dataPath) + dirEntry, err := os.ReadDir(writeParams.sourcePath) if err != nil { return err } @@ -82,7 +85,7 @@ func doWrite(writeParams writeParams) error { // 遍历目录 for _, dir := range dirEntry { newParam := writeParams - newParam.dataPath = path.Join(writeParams.dataPath, dir.Name()) + newParam.sourcePath = path.Join(writeParams.sourcePath, dir.Name()) if err = doWrite(newParam); err != nil { return err @@ -94,7 +97,7 @@ func doWrite(writeParams writeParams) error { func doLoadFileToWrite(writeParams writeParams) error { // 读取文件并按行遍历,以\t分割为两列,第一列为userid,第二列解析为string数组 - file, err := os.Open(writeParams.dataPath) + file, err := os.Open(writeParams.sourcePath) if err != nil { return err } @@ -103,103 +106,66 @@ func doLoadFileToWrite(writeParams writeParams) error { scaner := bufio.NewScanner(file) saasWriteCmds := []*saasapi.WriteCmd{} - saasReq := &saasapi.SaasReq{ - UseridType: saasapi.UserIdType_DEVICEID, - Cmd: &saasapi.SaasReq_Write{ - Write: &saasapi.Write{}, - }, - } + succ := uint32(0) + succTotal := uint32(0) + total := uint32(0) for scaner.Scan() { line := scaner.Text() if line == "" { continue } - - // 按\t分割为两列 - parts := strings.Split(line, "\t") - if len(parts) != 2 { - continue - } - - // 读取userid - userid := parts[0] - value := parts[1] - value = strings.ReplaceAll(value, "[", "") - value = strings.ReplaceAll(value, "]", "") - // 第二列解析为string数组 - targets := strings.Split(value, " ") - - saasWriteCmd := &saasapi.WriteCmd{ - Userid: userid, - IsFullOverwrite: true, - } - if len(userid) == 0 || len(targets) == 0 { - continue - } - for _, target := range targets { - if targetinfo, ok := writeParams.targetCfg.Targets[target]; ok { - if targetinfo.WriteByte != nil { - if saasWriteCmd.WriteBytes == nil { - saasWriteCmd.WriteBytes = &saasapi.Bytes{} - } - saasWriteCmd.WriteBytes.Bytes = append(saasWriteCmd.WriteBytes.Bytes, *targetinfo.WriteByte) - if targetinfo.WriteBytePos < 64 { - saasWriteCmd.WriteBytes.Index_1 |= 1 << targetinfo.WriteBytePos - } else if targetinfo.WriteBytePos < 128 { - saasWriteCmd.WriteBytes.Index_2 |= 1 << (targetinfo.WriteBytePos - 64) - } - } - - if targetinfo.WriteUint32 != nil { - if saasWriteCmd.WriteUint32S == nil { - saasWriteCmd.WriteUint32S = &saasapi.Uint32S{} - } - saasWriteCmd.WriteUint32S.Uint32S = append(saasWriteCmd.WriteUint32S.Uint32S, *targetinfo.WriteUint32) - saasWriteCmd.WriteUint32S.Index_1 |= 1 << targetinfo.WriteUint32Pos - } - - if targetinfo.WriteFlag != nil && targetinfo.WriteExpire != nil { - if saasWriteCmd.WriteFlagsWithExpire == nil { - saasWriteCmd.WriteFlagsWithExpire = &saasapi.FlagsWithExpire{} - } - saasWriteCmd.WriteFlagsWithExpire.FlagsWithExpire = append( - saasWriteCmd.WriteFlagsWithExpire.FlagsWithExpire, &saasapi.FlagWithExpire{ - Flag: *targetinfo.WriteFlag, - Expire: *targetinfo.WriteExpire, - }) - saasWriteCmd.WriteFlagsWithExpire.Index_1 |= 1 << targetinfo.WriteFlagWithExpirePos - } - } + saasWriteCmd := &saasapi.WriteCmd{} + if err = protojson.Unmarshal([]byte(line), saasWriteCmd); err != nil { + return err } saasWriteCmds = append(saasWriteCmds, saasWriteCmd) + total++ if len(saasWriteCmds) == int(writeParams.batchSize) { - if err = submitWrite(saasReq, saasWriteCmds); err != nil { + if succ, _, err = submitWrite(writeParams, saasWriteCmds); err != nil { return err } + succTotal += succ + fmt.Printf("[%v] batch_succ = %v, succ_total = %v, total_processed = %v\n", writeParams.sourcePath, succ, succTotal, total) + saasWriteCmds = saasWriteCmds[:0] } } if len(saasWriteCmds) > 0 { - return submitWrite(saasReq, saasWriteCmds) + if succ, _, err = submitWrite(writeParams, saasWriteCmds); err != nil { + return err + } + succTotal += succ + fmt.Printf("[%v] batch_succ = %v, succ_total = %v, total_processed = %v\n", writeParams.sourcePath, succ, succTotal, total) } return nil } -func submitWrite(saasReq *saasapi.SaasReq, saasWriteCmds []*saasapi.WriteCmd) error { +func submitWrite(writeParams writeParams, saasWriteCmds []*saasapi.WriteCmd) (succ, total uint32, err error) { + saasReq := &saasapi.SaasReq{ + Cmd: &saasapi.SaasReq_Write{ + Write: &saasapi.Write{ + IsClearAllFirst: writeParams.clear, + Async: writeParams.async, + }, + }, + } + + if writeParams.appid != "" { + saasReq.UseridType = saasapi.UserIdType_OPENID + saasReq.Appid = writeParams.appid + } + saasReq.Cmd.(*saasapi.SaasReq_Write).Write.WriteCmds = saasWriteCmds - postBuf, err := proto.Marshal(saasReq) - if err != nil { - return err - } - fmt.Println(len(postBuf)) + total = uint32(len(saasWriteCmds)) + succ, err = writeParams.saasHttp.Write(saasReq) - return nil + return } diff --git a/go.mod b/go.mod index bbffe6d..2e4d304 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,12 @@ go 1.23.4 require ( github.com/BurntSushi/toml v1.5.0 + github.com/jroimartin/gocui v0.5.0 google.golang.org/protobuf v1.36.5 ) -require github.com/google/go-cmp v0.6.0 // indirect +require ( + github.com/google/go-cmp v0.6.0 // indirect + github.com/mattn/go-runewidth v0.0.9 // indirect + github.com/nsf/termbox-go v1.1.1 // indirect +) diff --git a/go.sum b/go.sum index fd78d25..84adc66 100644 --- a/go.sum +++ b/go.sum @@ -2,5 +2,11 @@ github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/jroimartin/gocui v0.5.0 h1:DCZc97zY9dMnHXJSJLLmx9VqiEnAj0yh0eTNpuEtG/4= +github.com/jroimartin/gocui v0.5.0/go.mod h1:l7Hz8DoYoL6NoYnlnaX6XCNR62G7J5FfSW5jEogzaxE= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/nsf/termbox-go v1.1.1 h1:nksUPLCb73Q++DwbYUBEglYBRPZyoXJdrj5L+TkjyZY= +github.com/nsf/termbox-go v1.1.1/go.mod h1:T0cTdVuOwf7pHQNtfhnEbzHbcNyCEcVU4YPpouCbVxo= google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= diff --git a/pkg/saashttp/cfg.go b/pkg/saashttp/cfg.go new file mode 100644 index 0000000..e62c50f --- /dev/null +++ b/pkg/saashttp/cfg.go @@ -0,0 +1,16 @@ +package saashttp + +type ApiUrls struct { + BaseUrl string + WritePath string + ReadPath string + ColumnWritePath string + TaskListPath string + TaskCancelPath string + TaskDetailPath string +} + +type Auth struct { + Account string + Token string +} diff --git a/pkg/saashttp/httpcli.go b/pkg/saashttp/httpcli.go new file mode 100644 index 0000000..5e3e684 --- /dev/null +++ b/pkg/saashttp/httpcli.go @@ -0,0 +1,140 @@ +package saashttp + +import ( + "bytes" + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "time" + + "e.coding.net/rta/public/saasapi" + "google.golang.org/protobuf/proto" +) + +type ResponseEncoder int + +const ( + RESPONSE_ENCODER_PROTOBUF ResponseEncoder = iota + RESPONSE_ENCODER_JSON = 1 +) + +type SaasClient struct { + Client http.Client + ApiUrls ApiUrls + Auth Auth + ResponseEncoder ResponseEncoder +} + +func (c *SaasClient) Write(saasReq *saasapi.SaasReq) (succ uint32, err error) { + postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.WritePath) + return c.post(postUrl, saasReq) +} + +func (c *SaasClient) Read(saasReq *saasapi.SaasReq) (succ uint32, err error) { + postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.ReadPath) + return c.post(postUrl, saasReq) +} + +func (c *SaasClient) ColumnWrite(saasReq *saasapi.SaasReq) (succ uint32, err error) { + postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.ColumnWritePath) + return c.post(postUrl, saasReq) +} + +func (c *SaasClient) TaskList(saasReq *saasapi.SaasReq) (succ uint32, err error) { + postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskListPath) + return c.post(postUrl, saasReq) +} + +func (c *SaasClient) TaskCancel(saasReq *saasapi.SaasReq) (succ uint32, err error) { + postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskCancelPath) + return c.post(postUrl, saasReq) +} + +func (c *SaasClient) TaskDetail(saasReq *saasapi.SaasReq) (succ uint32, err error) { + + postUrl := c.makeUrl(c.ApiUrls.BaseUrl, c.ApiUrls.TaskDetailPath) + return c.post(postUrl, saasReq) +} + +func (c *SaasClient) makeUrl(baseUrl, path string) string { + url, err := url.Parse(baseUrl) + if err != nil { + panic(err) + } + + url = url.JoinPath(path) + queryValues := url.Query() + switch c.ResponseEncoder { + case RESPONSE_ENCODER_PROTOBUF: + queryValues.Add("resmode", "protobuf") + case RESPONSE_ENCODER_JSON: + queryValues.Add("resmode", "json") + default: + queryValues.Add("resmode", "protobuf") + } + url.RawQuery = queryValues.Encode() + + return url.String() +} + +func (c *SaasClient) post(url string, saasReq *saasapi.SaasReq) (succ uint32, err error) { + postBuf, err := proto.Marshal(saasReq) + if err != nil { + fmt.Println("marshal saas req error", err) + return 0, err + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(postBuf)) + if err != nil { + fmt.Println("http new request error", err) + return 0, err + } + + timeStamp := strconv.FormatInt(time.Now().Unix(), 10) + md5byte := md5.Sum([]byte(c.Auth.Account + c.Auth.Token + timeStamp)) + authorization := hex.EncodeToString(md5byte[:]) + + req.Header.Add("Content-Type", "application/protobuf") + req.Header.Add("Account", c.Auth.Account) + req.Header.Add("Time", timeStamp) + req.Header.Add("Authorization", authorization) + res, err := c.Client.Do(req) + if err != nil { + fmt.Println("http send error", err) + return 0, err + } + + defer res.Body.Close() + + resBody, err := io.ReadAll(res.Body) + if err != nil { + fmt.Println("http read body error", err) + return 0, err + } + + saasRes := &saasapi.SaasRes{} + if c.ResponseEncoder == RESPONSE_ENCODER_PROTOBUF { + err = proto.Unmarshal(resBody, saasRes) + if err != nil { + fmt.Println("unmarshal response body to protobuf error", err) + return 0, err + } + } else { + err = json.Unmarshal(resBody, saasRes) + if err != nil { + fmt.Println("unmarshal response body to json error", err) + return 0, err + } + } + if saasRes.Code != saasapi.ErrorCode(saasapi.CmdErrorCode_OK) { + fmt.Println("saas api error", saasRes.Code, saasRes.Status) + return 0, err + } + return saasRes.GetSuccCmdCount(), nil + +}