-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
zhouyp
committed
Jan 30, 2024
0 parents
commit ef4aee5
Showing
14 changed files
with
932 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
.github | ||
.git | ||
.idea | ||
.gitee |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
# kfkqueue 消息队列,用于操作Kafka消息中间件 | ||
|
||
|
||
## 简介 | ||
~~~ | ||
1. kfkqueue包,是基于[sarama]二次封装的Kafka客户端包,通过对[sarama]消费端、生产端进行二次封装,可以使用者快捷开发,无需再去学习了解[sarama] | ||
源码和实现逻辑,只需通过调用封装好的方法,就可以快捷使用 | ||
2. 快捷使用方法包含:SyncProducer(同步生产消息)、AsyncProducer(异步生产消息)、Consumer(消费者),根据不同的场景进行调用 | ||
3. 支持单机和集群部署的Kafka下使用 | ||
~~~ | ||
|
||
|
||
## 安装 | ||
~~~ | ||
1.可直接使用Go管理包进行安装:go get -u [email protected]/bobo-rs/kfkqueue | ||
~~~ | ||
|
||
## 使用 | ||
### 生产消息使用示例 | ||
```go | ||
package exmaple | ||
|
||
import "kfkqueue" | ||
|
||
type QueueCfg struct { | ||
// 配置Kafka地址 | ||
Addrs []string | ||
|
||
// 应答模式:0无需发送消息后返回应答结果【异步模式】,1.发送消息后只需Leader应答,-1.发送消息后需要Leader和追随者都应答 | ||
RequiredAck int8 | ||
|
||
// 发送消息后把成功的信息通过channel管道返回:false不返回,true返回(适合1和-1模式) | ||
Success bool | ||
|
||
// 发送消息后把失败的信息通过channel管道返回:false不返回,true返回 | ||
Error bool | ||
|
||
// 分区主题,不传默认自动分配 | ||
Topic string | ||
|
||
// 分区主题先每个消息的KEY,不传默认自动分配 | ||
Key string | ||
|
||
// 分区主题组 | ||
Group string | ||
} | ||
|
||
var ( | ||
Addrs = []string{"127.0.0.1:9092"} | ||
Topics = []string{"queue_example_sync_topic", "queue_example_async_topic"} | ||
Group = "queue_example_topic_group" | ||
) | ||
|
||
func Exmaple_ClientQueue(cfg QueueCfg) *kfkqueue.SQueue { | ||
return kfkqueue.New(kfkqueue.SQueue{ | ||
Addrs: Addrs, | ||
RequiredAck: cfg.RequiredAck, | ||
Success: cfg.Success, | ||
Error: cfg.Error, | ||
Topic: cfg.Topic, | ||
Topics: Topics, | ||
Group: Group, | ||
JobsMap: map[string]kfkqueue.Jobs{ | ||
"ExampleJob": &ExampleJobs1{}, | ||
"ExampleJob1": &ExampleJobs1{}, | ||
"ExampleJob2": &ExampleJobs2{}, | ||
}, | ||
}) | ||
} | ||
|
||
func Example_ClientSync(ack int8) *kfkqueue.SQueue { | ||
topic := "queue_example_sync_topic" | ||
return Exmaple_ClientQueue(QueueCfg{ | ||
RequiredAck: ack, | ||
Topic: topic, | ||
Key: topic + "_key", | ||
}) | ||
} | ||
|
||
func Exmaple_ClientAsync() *kfkqueue.SQueue { | ||
topic := "queue_example_async_topic" | ||
return Exmaple_ClientQueue(QueueCfg{ | ||
Topic: topic, | ||
Success: true, | ||
Error: true, | ||
Key: topic + "_key", | ||
}) | ||
} | ||
|
||
func Test_SyncProducer(t *testing.T) { | ||
client := Example_ClientSync(1) | ||
ctx := context.Background() | ||
for i := 0; i < 10000; i++ { | ||
t := i | ||
fmt.Println( | ||
client.SyncProducer(ctx, struct { | ||
JobName string | ||
Value interface{} | ||
}{JobName: "ExampleJob1", Value: struct { | ||
Name string | ||
Value int | ||
Detail struct{ | ||
Content string | ||
Arr []int | ||
} | ||
}{ | ||
Name: "张三" + strconv.Itoa(t), | ||
Value: t, | ||
Detail: struct { | ||
Content string | ||
Arr []int | ||
}{Content: "不可能,绝对不可能" + strconv.Itoa(t), Arr: []int{t}}, | ||
}}), | ||
) | ||
} | ||
fmt.Println() | ||
} | ||
``` | ||
2. 消费者使用示例 | ||
```go | ||
func Test_Consumer(t *testing.T) { | ||
ctx := context.Background() | ||
fmt.Println( | ||
Exmaple_ClientQueue(QueueCfg{}).Consumer(ctx), | ||
) | ||
} | ||
``` | ||
## 依赖 | ||
1. Kafka操作包:github.com/IBM/sarama | ||
2. Goframe工具包:github.com/gogf/gf/v2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package exmaple | ||
|
||
import "kfkqueue" | ||
|
||
type QueueCfg struct { | ||
// 配置Kafka地址 | ||
Addrs []string | ||
|
||
// 应答模式:0无需发送消息后返回应答结果【异步模式】,1.发送消息后只需Leader应答,-1.发送消息后需要Leader和追随者都应答 | ||
RequiredAck int8 | ||
|
||
// 发送消息后把成功的信息通过channel管道返回:false不返回,true返回(适合1和-1模式) | ||
Success bool | ||
|
||
// 发送消息后把失败的信息通过channel管道返回:false不返回,true返回 | ||
Error bool | ||
|
||
// 分区主题,不传默认自动分配 | ||
Topic string | ||
|
||
// 分区主题先每个消息的KEY,不传默认自动分配 | ||
Key string | ||
|
||
// 分区主题组 | ||
Group string | ||
} | ||
|
||
var ( | ||
Addrs = []string{"127.0.0.1:9092"} | ||
Topics = []string{"queue_example_sync_topic", "queue_example_async_topic"} | ||
Group = "queue_example_topic_group" | ||
) | ||
|
||
func Exmaple_ClientQueue(cfg QueueCfg) *kfkqueue.SQueue { | ||
return kfkqueue.New(kfkqueue.SQueue{ | ||
Addrs: Addrs, | ||
RequiredAck: cfg.RequiredAck, | ||
Success: cfg.Success, | ||
Error: cfg.Error, | ||
Topic: cfg.Topic, | ||
Topics: Topics, | ||
Group: Group, | ||
JobsMap: map[string]kfkqueue.Jobs{ | ||
"ExampleJob": &ExampleJobs1{}, | ||
"ExampleJob1": &ExampleJobs1{}, | ||
"ExampleJob2": &ExampleJobs2{}, | ||
}, | ||
}) | ||
} | ||
|
||
func Example_ClientSync(ack int8) *kfkqueue.SQueue { | ||
topic := "queue_example_sync_topic" | ||
return Exmaple_ClientQueue(QueueCfg{ | ||
RequiredAck: ack, | ||
Topic: topic, | ||
Key: topic + "_key", | ||
}) | ||
} | ||
|
||
func Exmaple_ClientAsync() *kfkqueue.SQueue { | ||
topic := "queue_example_async_topic" | ||
return Exmaple_ClientQueue(QueueCfg{ | ||
Topic: topic, | ||
Success: true, | ||
Error: true, | ||
Key: topic + "_key", | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package exmaple | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"github.com/gogf/gf/v2/util/gconv" | ||
) | ||
|
||
type ( | ||
ExampleJobs1 struct { | ||
} | ||
ExampleJobs2 struct { | ||
} | ||
|
||
Job1ValueItem struct { | ||
Name string | ||
Value int | ||
Detail struct{ | ||
Content string | ||
Arr []int | ||
} | ||
} | ||
) | ||
|
||
type Jobs struct { | ||
} | ||
|
||
func (j *ExampleJobs1) Execute(ctx context.Context, value interface{}) { | ||
fmt.Println("我是Example的Job1任务") | ||
fmt.Println(value) | ||
|
||
// 数据 | ||
//item := value.(map[string]interface{}) | ||
//fmt.Println(item) | ||
|
||
data := Job1ValueItem{} | ||
_ = gconv.Struct(value, &data) | ||
fmt.Println(data) | ||
} | ||
|
||
func (j *ExampleJobs2) Execute(ctx context.Context, value interface{}) { | ||
fmt.Println("我是Example的Job2任务") | ||
fmt.Println(value) | ||
|
||
|
||
data := Job1ValueItem{} | ||
_ = gconv.Struct(value, &data) | ||
fmt.Println(data) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package exmaple | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strconv" | ||
"testing" | ||
) | ||
|
||
func Test_SyncProducer(t *testing.T) { | ||
client := Example_ClientSync(1) | ||
ctx := context.Background() | ||
for i := 0; i < 10000; i++ { | ||
t := i | ||
fmt.Println( | ||
client.SyncProducer(ctx, struct { | ||
JobName string | ||
Value interface{} | ||
}{JobName: "ExampleJob1", Value: struct { | ||
Name string | ||
Value int | ||
Detail struct{ | ||
Content string | ||
Arr []int | ||
} | ||
}{ | ||
Name: "张三" + strconv.Itoa(t), | ||
Value: t, | ||
Detail: struct { | ||
Content string | ||
Arr []int | ||
}{Content: "不可能,绝对不可能" + strconv.Itoa(t), Arr: []int{t}}, | ||
}}), | ||
) | ||
} | ||
fmt.Println() | ||
} | ||
|
||
func Test_AsyncProducer(t *testing.T) { | ||
client := Exmaple_ClientAsync() | ||
ctx := context.Background() | ||
for i := 0; i < 10000; i++ { | ||
t := i | ||
client.AsyncProducer(ctx, struct { | ||
JobName string | ||
Value interface{} | ||
}{ JobName: "ExampleJob2", Value: struct { | ||
Name string | ||
Value int | ||
}{ | ||
Name: "张三" + strconv.Itoa(t), | ||
Value: t, | ||
}}) | ||
} | ||
} | ||
|
||
func Test_Consumer(t *testing.T) { | ||
ctx := context.Background() | ||
fmt.Println( | ||
Exmaple_ClientQueue(QueueCfg{}).Consumer(ctx), | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
module kfkqueue | ||
|
||
go 1.20 | ||
|
||
require ( | ||
github.com/IBM/sarama v1.42.1 | ||
github.com/gogf/gf/v2 v2.6.2 | ||
) | ||
|
||
require ( | ||
github.com/BurntSushi/toml v1.2.0 // indirect | ||
github.com/clbanning/mxj/v2 v2.7.0 // indirect | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/eapache/go-resiliency v1.4.0 // indirect | ||
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect | ||
github.com/eapache/queue v1.1.0 // indirect | ||
github.com/fatih/color v1.16.0 // indirect | ||
github.com/fsnotify/fsnotify v1.7.0 // indirect | ||
github.com/go-logr/logr v1.4.1 // indirect | ||
github.com/go-logr/stdr v1.2.2 // indirect | ||
github.com/golang/snappy v0.0.4 // indirect | ||
github.com/gorilla/websocket v1.5.0 // indirect | ||
github.com/grokify/html-strip-tags-go v0.0.1 // indirect | ||
github.com/hashicorp/errwrap v1.0.0 // indirect | ||
github.com/hashicorp/go-multierror v1.1.1 // indirect | ||
github.com/hashicorp/go-uuid v1.0.3 // indirect | ||
github.com/jcmturner/aescts/v2 v2.0.0 // indirect | ||
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect | ||
github.com/jcmturner/gofork v1.7.6 // indirect | ||
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect | ||
github.com/jcmturner/rpc/v2 v2.0.3 // indirect | ||
github.com/klauspost/compress v1.16.7 // indirect | ||
github.com/kr/text v0.2.0 // indirect | ||
github.com/magiconair/properties v1.8.6 // indirect | ||
github.com/mattn/go-colorable v0.1.13 // indirect | ||
github.com/mattn/go-isatty v0.0.20 // indirect | ||
github.com/mattn/go-runewidth v0.0.15 // indirect | ||
github.com/olekukonko/tablewriter v0.0.5 // indirect | ||
github.com/pierrec/lz4/v4 v4.1.18 // indirect | ||
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect | ||
github.com/rivo/uniseg v0.4.4 // indirect | ||
github.com/rogpeppe/go-internal v1.12.0 // indirect | ||
go.opentelemetry.io/otel v1.22.0 // indirect | ||
go.opentelemetry.io/otel/metric v1.22.0 // indirect | ||
go.opentelemetry.io/otel/sdk v1.22.0 // indirect | ||
go.opentelemetry.io/otel/trace v1.22.0 // indirect | ||
golang.org/x/crypto v0.14.0 // indirect | ||
golang.org/x/net v0.17.0 // indirect | ||
golang.org/x/sys v0.16.0 // indirect | ||
golang.org/x/text v0.13.0 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
) |
Oops, something went wrong.