diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..861f93f --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.github +.git +.idea +.gitee diff --git a/README.md b/README.md new file mode 100644 index 0000000..6eccac1 --- /dev/null +++ b/README.md @@ -0,0 +1,130 @@ +# kfkqueue 消息队列,用于操作Kafka消息中间件 + + +## 简介 +~~~ +1. kfkqueue包,是基于[sarama]二次封装的Kafka客户端包,通过对[sarama]消费端、生产端进行二次封装,可以使用者快捷开发,无需再去学习了解[sarama] + 源码和实现逻辑,只需通过调用封装好的方法,就可以快捷使用 +2. 快捷使用方法包含:SyncProducer(同步生产消息)、AsyncProducer(异步生产消息)、Consumer(消费者),根据不同的场景进行调用 +3. 支持单机和集群部署的Kafka下使用 +~~~ + + +## 安装 +~~~ +1.可直接使用Go管理包进行安装:go get -u git@github.com/bobo-rs/kfkqueue +~~~ + +## 使用 +### 生产消息使用示例 +```go +package exmaple + +import "kfkqueue" + +type QueueCfg struct { + // 配置Kafka地址 + Addrs []string + + // 应答模式:0无需发送消息后返回应答结果【异步模式】,1.发送消息后只需Leader应答,-1.发送消息后需要Leader和追随者都应答 + RequiredAck int8 + + // 发送消息后把成功的信息通过channel管道返回:false不返回,true返回(适合1和-1模式) + Success bool + + // 发送消息后把失败的信息通过channel管道返回:false不返回,true返回 + Error bool + + // 分区主题,不传默认自动分配 + Topic string + + // 分区主题先每个消息的KEY,不传默认自动分配 + Key string + + // 分区主题组 + Group string +} + +var ( + Addrs = []string{"127.0.0.1:9092"} + Topics = []string{"queue_example_sync_topic", "queue_example_async_topic"} + Group = "queue_example_topic_group" +) + +func Exmaple_ClientQueue(cfg QueueCfg) *kfkqueue.SQueue { + return kfkqueue.New(kfkqueue.SQueue{ + Addrs: Addrs, + RequiredAck: cfg.RequiredAck, + Success: cfg.Success, + Error: cfg.Error, + Topic: cfg.Topic, + Topics: Topics, + Group: Group, + JobsMap: map[string]kfkqueue.Jobs{ + "ExampleJob": &ExampleJobs1{}, + "ExampleJob1": &ExampleJobs1{}, + "ExampleJob2": &ExampleJobs2{}, + }, + }) +} + +func Example_ClientSync(ack int8) *kfkqueue.SQueue { + topic := "queue_example_sync_topic" + return Exmaple_ClientQueue(QueueCfg{ + RequiredAck: ack, + Topic: topic, + Key: topic + "_key", + }) +} + +func Exmaple_ClientAsync() *kfkqueue.SQueue { + topic := "queue_example_async_topic" + return Exmaple_ClientQueue(QueueCfg{ + Topic: topic, + Success: true, + Error: true, + Key: topic + "_key", + }) +} + +func Test_SyncProducer(t *testing.T) { + client := Example_ClientSync(1) + ctx := context.Background() + for i := 0; i < 10000; i++ { + t := i + fmt.Println( + client.SyncProducer(ctx, struct { + JobName string + Value interface{} + }{JobName: "ExampleJob1", Value: struct { + Name string + Value int + Detail struct{ + Content string + Arr []int + } + }{ + Name: "张三" + strconv.Itoa(t), + Value: t, + Detail: struct { + Content string + Arr []int + }{Content: "不可能,绝对不可能" + strconv.Itoa(t), Arr: []int{t}}, + }}), + ) + } + fmt.Println() +} +``` +2. 消费者使用示例 +```go +func Test_Consumer(t *testing.T) { + ctx := context.Background() + fmt.Println( + Exmaple_ClientQueue(QueueCfg{}).Consumer(ctx), + ) +} +``` +## 依赖 +1. Kafka操作包:github.com/IBM/sarama +2. Goframe工具包:github.com/gogf/gf/v2 \ No newline at end of file diff --git a/exmaple/example.go b/exmaple/example.go new file mode 100644 index 0000000..4d25210 --- /dev/null +++ b/exmaple/example.go @@ -0,0 +1,68 @@ +package exmaple + +import "kfkqueue" + +type QueueCfg struct { + // 配置Kafka地址 + Addrs []string + + // 应答模式:0无需发送消息后返回应答结果【异步模式】,1.发送消息后只需Leader应答,-1.发送消息后需要Leader和追随者都应答 + RequiredAck int8 + + // 发送消息后把成功的信息通过channel管道返回:false不返回,true返回(适合1和-1模式) + Success bool + + // 发送消息后把失败的信息通过channel管道返回:false不返回,true返回 + Error bool + + // 分区主题,不传默认自动分配 + Topic string + + // 分区主题先每个消息的KEY,不传默认自动分配 + Key string + + // 分区主题组 + Group string +} + +var ( + Addrs = []string{"127.0.0.1:9092"} + Topics = []string{"queue_example_sync_topic", "queue_example_async_topic"} + Group = "queue_example_topic_group" +) + +func Exmaple_ClientQueue(cfg QueueCfg) *kfkqueue.SQueue { + return kfkqueue.New(kfkqueue.SQueue{ + Addrs: Addrs, + RequiredAck: cfg.RequiredAck, + Success: cfg.Success, + Error: cfg.Error, + Topic: cfg.Topic, + Topics: Topics, + Group: Group, + JobsMap: map[string]kfkqueue.Jobs{ + "ExampleJob": &ExampleJobs1{}, + "ExampleJob1": &ExampleJobs1{}, + "ExampleJob2": &ExampleJobs2{}, + }, + }) +} + +func Example_ClientSync(ack int8) *kfkqueue.SQueue { + topic := "queue_example_sync_topic" + return Exmaple_ClientQueue(QueueCfg{ + RequiredAck: ack, + Topic: topic, + Key: topic + "_key", + }) +} + +func Exmaple_ClientAsync() *kfkqueue.SQueue { + topic := "queue_example_async_topic" + return Exmaple_ClientQueue(QueueCfg{ + Topic: topic, + Success: true, + Error: true, + Key: topic + "_key", + }) +} diff --git a/exmaple/example_jobs.go b/exmaple/example_jobs.go new file mode 100644 index 0000000..525e933 --- /dev/null +++ b/exmaple/example_jobs.go @@ -0,0 +1,49 @@ +package exmaple + +import ( + "context" + "fmt" + "github.com/gogf/gf/v2/util/gconv" +) + +type ( + ExampleJobs1 struct { + } + ExampleJobs2 struct { + } + + Job1ValueItem struct { + Name string + Value int + Detail struct{ + Content string + Arr []int + } + } +) + +type Jobs struct { +} + +func (j *ExampleJobs1) Execute(ctx context.Context, value interface{}) { + fmt.Println("我是Example的Job1任务") + fmt.Println(value) + + // 数据 + //item := value.(map[string]interface{}) + //fmt.Println(item) + + data := Job1ValueItem{} + _ = gconv.Struct(value, &data) + fmt.Println(data) +} + +func (j *ExampleJobs2) Execute(ctx context.Context, value interface{}) { + fmt.Println("我是Example的Job2任务") + fmt.Println(value) + + + data := Job1ValueItem{} + _ = gconv.Struct(value, &data) + fmt.Println(data) +} diff --git a/exmaple/example_test.go b/exmaple/example_test.go new file mode 100644 index 0000000..dc59349 --- /dev/null +++ b/exmaple/example_test.go @@ -0,0 +1,62 @@ +package exmaple + +import ( + "context" + "fmt" + "strconv" + "testing" +) + +func Test_SyncProducer(t *testing.T) { + client := Example_ClientSync(1) + ctx := context.Background() + for i := 0; i < 10000; i++ { + t := i + fmt.Println( + client.SyncProducer(ctx, struct { + JobName string + Value interface{} + }{JobName: "ExampleJob1", Value: struct { + Name string + Value int + Detail struct{ + Content string + Arr []int + } + }{ + Name: "张三" + strconv.Itoa(t), + Value: t, + Detail: struct { + Content string + Arr []int + }{Content: "不可能,绝对不可能" + strconv.Itoa(t), Arr: []int{t}}, + }}), + ) + } + fmt.Println() +} + +func Test_AsyncProducer(t *testing.T) { + client := Exmaple_ClientAsync() + ctx := context.Background() + for i := 0; i < 10000; i++ { + t := i + client.AsyncProducer(ctx, struct { + JobName string + Value interface{} + }{ JobName: "ExampleJob2", Value: struct { + Name string + Value int + }{ + Name: "张三" + strconv.Itoa(t), + Value: t, + }}) + } +} + +func Test_Consumer(t *testing.T) { + ctx := context.Background() + fmt.Println( + Exmaple_ClientQueue(QueueCfg{}).Consumer(ctx), + ) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..5030bf0 --- /dev/null +++ b/go.mod @@ -0,0 +1,52 @@ +module kfkqueue + +go 1.20 + +require ( + github.com/IBM/sarama v1.42.1 + github.com/gogf/gf/v2 v2.6.2 +) + +require ( + github.com/BurntSushi/toml v1.2.0 // indirect + github.com/clbanning/mxj/v2 v2.7.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/fatih/color v1.16.0 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/grokify/html-strip-tags-go v0.0.1 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/magiconair/properties v1.8.6 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/olekukonko/tablewriter v0.0.5 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/rivo/uniseg v0.4.4 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect + go.opentelemetry.io/otel v1.22.0 // indirect + go.opentelemetry.io/otel/metric v1.22.0 // indirect + go.opentelemetry.io/otel/sdk v1.22.0 // indirect + go.opentelemetry.io/otel/trace v1.22.0 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.13.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b9deb34 --- /dev/null +++ b/go.sum @@ -0,0 +1,147 @@ +github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0= +github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= +github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= +github.com/clbanning/mxj/v2 v2.7.0 h1:WA/La7UGCanFe5NpHF0Q3DNtnCsVoxbPKuyBNHWRyME= +github.com/clbanning/mxj/v2 v2.7.0/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= +github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gogf/gf/v2 v2.6.2 h1:TvI1UEH2RDbgFVlJJjkc/6ct6+5zjbOS5MiJ2ESG8qg= +github.com/gogf/gf/v2 v2.6.2/go.mod h1:x2XONYcI4hRQ/4gMNbWHmZrNzSEIg20s2NULbzom5k0= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grokify/html-strip-tags-go v0.0.1 h1:0fThFwLbW7P/kOiTBs03FsJSV9RM2M/Q/MOnCQxKMo0= +github.com/grokify/html-strip-tags-go v0.0.1/go.mod h1:2Su6romC5/1VXOQMaWL2yb618ARB8iVo6/DR99A6d78= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= +github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= +github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/otel v1.22.0 h1:xS7Ku+7yTFvDfDraDIJVpw7XPyuHlB9MCiqqX5mcJ6Y= +go.opentelemetry.io/otel v1.22.0/go.mod h1:eoV4iAi3Ea8LkAEI9+GFT44O6T/D0GWAVFyZVCC6pMI= +go.opentelemetry.io/otel/metric v1.22.0 h1:lypMQnGyJYeuYPhOM/bgjbFM6WE44W1/T45er4d8Hhg= +go.opentelemetry.io/otel/metric v1.22.0/go.mod h1:evJGjVpZv0mQ5QBRJoBF64yMuOf4xCWdXjK8pzFvliY= +go.opentelemetry.io/otel/sdk v1.22.0 h1:6coWHw9xw7EfClIC/+O31R8IY3/+EiRFHevmHafB2Gw= +go.opentelemetry.io/otel/sdk v1.22.0/go.mod h1:iu7luyVGYovrRpe2fmj3CVKouQNdTOkxtLzPvPz1DOc= +go.opentelemetry.io/otel/trace v1.22.0 h1:Hg6pPujv0XG9QaVbGOBVHunyuLcCC3jN7WEhPx83XD0= +go.opentelemetry.io/otel/trace v1.22.0/go.mod h1:RbbHXVqKES9QhzZq/fE5UnOSILqRt40a21sPw2He1xo= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..77d0763 --- /dev/null +++ b/queue.go @@ -0,0 +1,52 @@ +package kfkqueue + +import "context" + +type ( + // SQueue 队列链式属性管理 + SQueue struct { + // 应答模式:0无需消息发送之后应答,1发送消息Leader应答成功即可,-1 发送消息主从都需要应答成功 + RequiredAck int8 + + // 成功交付的消息将在success channel返回:false否,true是 + Success bool + + // 失败交付的消息将在Error channel返回:false否,true是 + Error bool + + // 消息分区主题 + Topic string + + // 消费组:消息消费主题集合 + Topics []string + + // Kafka连接地址 + Addrs []string + + // 消息组 + Group string + + // 分区主题的消息KEY + Key string + + // 消费任务执行Jobs + JobsMap map[string]Jobs + } + + // ConsumerGrp 消费组后置处理操作: 定义结构体实现消费逻辑(ConsumerGroupHandler) + ConsumerGrp struct { + // 消息任务执行Job注册组 + JobsMap map[string]Jobs + } + + // Jobs Jobs任务执行接口,所有任务必须继承实现内部逻辑 + Jobs interface { + Execute(ctx context.Context, value interface{}) + } + + // ClaimItem 发送消息实例结构体 + ClaimItem struct { + JobName string // Job任务名 + Value interface{} // 消息值 + } +) diff --git a/queue_client.go b/queue_client.go new file mode 100644 index 0000000..9c4522e --- /dev/null +++ b/queue_client.go @@ -0,0 +1,27 @@ +package kfkqueue + +import "github.com/IBM/sarama" + +// Client 实例Kafka客户端并完成配置 +func (q *SQueue) Client() (client sarama.Client, err error) { + // 验证配置 + if err = q.Validate(); err != nil { + return nil, err + } + + // 初始化配置 + config := sarama.NewConfig() + config.Producer.Return.Successes = q.Success // 成功交付的消息将在success channel返回:false否,true是 + config.Producer.Return.Errors = q.Error // 失败交付的消息将在error channel返回 :false否,true是 + config.Producer.Partitioner = sarama.NewHashPartitioner // 对Key进行Hash,同样的Key每次都落到一个分区,这样消息是有序的 + // 应答模式:0无需消息发送之后应答,1发送消息Leader应答成功即可,-1 发送消息主从都需要应答成功 + switch q.RequiredAck { + case -1, 1: + config.Producer.RequiredAcks = sarama.RequiredAcks(q.RequiredAck) + default: + config.Producer.RequiredAcks = sarama.NoResponse + } + + // 创建客户端 + return sarama.NewClient(q.Addrs, config) +} diff --git a/queue_consumer.go b/queue_consumer.go new file mode 100644 index 0000000..f70aa30 --- /dev/null +++ b/queue_consumer.go @@ -0,0 +1,45 @@ +package kfkqueue + +import ( + "context" + "fmt" + "github.com/IBM/sarama" + "github.com/gogf/gf/v2/frame/g" +) + +// Consumer 消费数据处理 +func (q *SQueue) Consumer(ctx context.Context) error { + // 客户端 + client, err := q.Client() + if err != nil { + return err + } + + // 获取消费组 + consumerGroup, err := sarama.NewConsumerGroupFromClient(q.Group, client) + if err != nil { + return err + } + + // 关闭消费组 + defer func(consumerGroup sarama.ConsumerGroup) { + _ = consumerGroup.Close() + }(consumerGroup) + + // 定义消费组 + consumerHandler := &ConsumerGrp{ + JobsMap: q.JobsMap, + } + + // 开启协程消费 + go func() { + for { + err = consumerGroup.Consume(ctx, q.Topics, consumerHandler) + if err != nil { + g.Log().Debug(ctx, fmt.Sprintf("消费错误:%v", err)) + } + } + }() + // 无限 + select {} +} diff --git a/queue_consumer_group.go b/queue_consumer_group.go new file mode 100644 index 0000000..a969617 --- /dev/null +++ b/queue_consumer_group.go @@ -0,0 +1,51 @@ +package kfkqueue + +import ( + "fmt" + "github.com/IBM/sarama" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" +) + + +// Setup 回调函数-在消费者启动时执行操作 +func (c *ConsumerGrp) Setup(session sarama.ConsumerGroupSession) error { + fmt.Println("消费者启动") + return nil +} + +// Cleanup 回调函数-在消费者关闭时执行的操作 +func (c *ConsumerGrp) Cleanup(session sarama.ConsumerGroupSession) error { + fmt.Println("消费者关闭") + return nil +} + +// ConsumerClaim 回调函数-当队列中又消失会触发,处理消息逻辑 +func (c *ConsumerGrp) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + g.Log().Info(session.Context(), fmt.Sprintf("接收topic=%s, partition=%d, offset=%d, value=%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)) + // 转换接收消息内容 + item, err := ConvertMessageByClaim(msg.Value) + if err != nil { + g.Log().Error(session.Context(), err) + continue + } + // 以Job任务名,执行对应业务逻辑 + c.JobsMap[item.JobName].Execute(session.Context(), item.Value) + } + return nil +} + +// ConsumeClaimHandler 消费数据返回数据处理 +func (c *ConsumerGrp) ConsumeClaimHandler(messages []byte) (ClaimItem, error) { + item := ClaimItem{} + // 把存储的消息内容装换成结构体 + var ( + values []ClaimItem + ) + if err := gconv.Struct(messages, &values); err != nil { + return item, gerror.Newf("转换消息错误:%+v", err) + } + return values[0], nil +} diff --git a/queue_new.go b/queue_new.go new file mode 100644 index 0000000..9388649 --- /dev/null +++ b/queue_new.go @@ -0,0 +1,77 @@ +package kfkqueue + +// New 实例队列 +// queues:队列配置属性 +func New(queues ...SQueue) *SQueue { + if len(queues) > 0 { + return &queues[0] + } + return &SQueue{} +} + +// SetAddrs 设置Kafka客户端连接地址 +func (q *SQueue) SetAddrs(addrs []string) *SQueue { + q.Addrs = addrs + return q +} + +// SetSuccess 设置发送消息成功channel返回 +func (q *SQueue) SetSuccess(success bool) *SQueue { + q.Success = success + return q +} + +// SetError 设置发送消息失败channel返回 +func (q *SQueue) SetError(errors bool) *SQueue { + q.Error = errors + return q +} + +// SetTopic 设置分区主题 +func (q *SQueue) SetTopic(topic string) *SQueue { + q.Topic = topic + return q +} + +// SetTopics 设置消费分区主题Topic集合 +func (q *SQueue) SetTopics(topics []string) *SQueue { + q.Topics = topics + return q +} + +// SetGroup 设置分区主题分组 +func (q *SQueue) SetGroup(group string) *SQueue { + q.Group = group + return q +} + +// SetRequiredAsk 设置应答模式:0无需消息发送之后应答,1发送消息Leader应答成功即可,-1 发送消息主从都需要应答成功 +func (q *SQueue) SetRequiredAsk(requiredAsk int8) *SQueue { + q.RequiredAck = requiredAsk + return q +} + +// SetKey 设置主题消息KEY +func (q *SQueue) SetKey(key string) *SQueue { + q.Key = key + return q +} + +// SetJobsMap 设置动态分发注册Job任务 +// Example: +// 购物下单,后置处理逻辑人任务 +// 调用生产通道(以同步生产为例): +// JobsMap := map[string]jobs{ +// "OrderJob": &OrderJob{} // 必须注册生成Execute方法 +// } +// item := ClaimItem { +// Key: "order_create_after_jobs", +// JobName:"OrderJob", +// Value: "清除购物数据,扣除抵扣优惠券、抵扣积分、赠送积分、锁定库存等逻辑" +// } +// ctx := content.Background() +// New().SetJobsMap(JobsMap).SyncProducer(ctx, item) +func (q *SQueue) SetJobsMap(jobsMap map[string]Jobs) *SQueue { + q.JobsMap = jobsMap + return q +} diff --git a/queue_producer.go b/queue_producer.go new file mode 100644 index 0000000..a7b90e0 --- /dev/null +++ b/queue_producer.go @@ -0,0 +1,108 @@ +package kfkqueue + +import ( + "context" + "fmt" + "github.com/IBM/sarama" + "github.com/gogf/gf/v2/frame/g" +) + +// SyncProducer 同步模式-生产消息 +func (q *SQueue) SyncProducer(ctx context.Context, in ClaimItem) (pid int32, offset int64, err error) { + // 记录日志 + defer func(err error) { + if err != nil { + g.Log().Debug(ctx, err) + } + }(err) + + // 通道消息返回默认设置为True + q.SetSuccess(true) + q.SetError(true) + + // 获取客户端 + client, err := q.Client() + if err != nil { + return 0, 0, err + } + + // 设置同步客户端 + cli, err := sarama.NewSyncProducerFromClient(client) + defer func() { + _ = cli.Close() // 关闭客户端 + }() + if err != nil { + return 0, 0, err + } + + // 构造发送请求信息 + message, err := q.handlerSendMessage(in) + if err != nil { + return 0, 0, err + } + + // 发送消息 + // pid 分区ID + // offset 偏移量 + pid, offset, err = cli.SendMessage(message) + if err != nil { + return 0, 0, err + } + + // 记录日志 + g.Log().Info(ctx, "produce success, partition:", pid, ",offset:", offset, "value:", in) + return +} + +// AsyncProducer 异步生产消息 +func (q *SQueue) AsyncProducer(ctx context.Context, in ClaimItem) { + + // 实例Kafka客户端 + client, err := q.Client() + if err != nil { + g.Log().Info(ctx, err) + return + } + + // 生成异步生产端 + producer, err := sarama.NewAsyncProducerFromClient(client) + if err != nil { + g.Log().Info(ctx, err) + return + } + + // 关闭端 + defer producer.AsyncClose() + + // 开启协程处理返回通道 + go func() { + for { + select { + // 从通道中监听成功消息 + case msg, ok := <-producer.Successes(): + if !ok { + g.Log().Info(ctx, "发送成功,通道关闭") + continue + } + g.Log().Info(ctx, fmt.Sprintf("发送成功, topic=%s, partition=%d, offset=%d, value:%s \n", msg.Topic, msg.Partition, msg.Offset, msg.Value)) + case err, ok := <-producer.Errors(): + if !ok { + g.Log().Info(ctx, "发送成功,生产通道已提前关闭") + continue + } + g.Log().Error(ctx, fmt.Sprintf("发货消息失败:%+v", err)) + } + } + + }() + + // 构造结构体消息发送 + message, err := q.handlerSendMessage(in) + if err != nil { + g.Log().Info(ctx, err) + return + } + // 发送到通道处理 + producer.Input() <- message +} + diff --git a/queue_util.go b/queue_util.go new file mode 100644 index 0000000..1874d00 --- /dev/null +++ b/queue_util.go @@ -0,0 +1,60 @@ +package kfkqueue + +import ( + "encoding/json" + "github.com/IBM/sarama" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/util/gconv" +) + +// handlerSendMessage 处理并转换生产发送消息体 +func (q *SQueue) handlerSendMessage(item ClaimItem) (*sarama.ProducerMessage, error) { + // 任务是否注册 + if _, ok := q.JobsMap[item.JobName]; !ok { + return nil, gerror.Newf("Queue:队列消息Job[%s]未注册", item.JobName) + } + + // 转JSON处理 + value, err := json.Marshal(item) + if err != nil { + return nil, gerror.Newf("Queue:消息实例转JSON失败%s", err.Error()) + } + + // 初始化 + message := &sarama.ProducerMessage{ + Topic: q.Topic, + Value: sarama.StringEncoder(value), + } + + // 消息KEY为空不传 + if q.Key != "" { + message.Key = sarama.StringEncoder(q.Key) + } + + return message, nil +} + +// Validate 验证必要规则属性 +func (q SQueue) Validate() error { + if len(q.Addrs) == 0 { + return gerror.New("Queue:缺少Kafka客户端代理ICP地址") + } + // 主题 + if q.Topic == "" && len(q.Topics) == 0{ + return gerror.New("Queue:消息主题Topic或Topics必传") + } + // Jobs任务 + if len(q.JobsMap) == 0 { + return gerror.New("Queue:请先注册Jobs任务,再操作") + } + return nil +} + +// ConvertMessageByClaim 转换消息实例到Claim结构体 +func ConvertMessageByClaim(messages []byte) (ClaimItem, error) { + item := ClaimItem{} + if err := gconv.Struct(messages, &item); err != nil { + return item, gerror.Newf("转换[]byte消息ClaimItem失败:%s", err.Error()) + } + return item, nil +}