rocketmq golang客户端

发布时间:2024-11-05 18:59:17

Go语言(Golang)是一门开发高性能、并发和可扩展应用的语言。RocketMQ是由Apache基金会开发的一款分布式消息传递系统,支持高吞吐量、低延迟的消息发布/订阅。本文将介绍如何使用RocketMQ的Golang客户端。

首先,我们需要在项目中导入RocketMQ的Golang客户端。可以通过在项目的go.mod文件中添加以下内容来获取最新的版本:

1. 安装Golang客户端

要安装RocketMQ的Golang客户端,我们可以使用以下命令:

go get -u github.com/apache/rocketmq-client-go/v2

这将自动下载并安装最新版本的RocketMQ Golang客户端。

2. 生产者示例

在使用RocketMQ进行消息生产之前,我们首先需要创建一个生产者示例。以下是一个简单的生产者示例:

package main

import (
    "fmt"

    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
    p, err := rocketmq.NewProducer(
        rocketmq.WithNameServer([]string{"127.0.0.1:9876"}),
    )
    if err != nil {
        fmt.Printf("Failed to create producer: %s\n", err.Error())
        return
    }

    err = p.Start()
    if err != nil {
        fmt.Printf("Failed to start producer: %s\n", err.Error())
        return
    }

    msg := primitive.NewMessage("TopicTest", []byte("Hello, RocketMQ!"))

    result, err := p.SendSync(context.Background(), msg)
    if err != nil {
        fmt.Printf("Failed to send message: %s\n", err.Error())
        return
    }

    fmt.Printf("Send message success. result=%s\n", result.String())

    p.Shutdown()
}

在代码中,我们首先通过rocketmq.NewProducer()创建了一个生产者示例,并在WithNamesrv()参数中指定了NameServer的地址。然后,我们通过调用p.Start()方法来启动生产者。

接下来,我们创建了一条消息,并通过调用p.SendSync()方法将其同步发送到RocketMQ。

最后,我们通过调用p.Shutdown()方法来关闭生产者示例。

3. 消费者示例

与生产者类似,我们使用RocketMQ的Golang客户端还可以创建消费者示例来接收和处理消息。以下是一个消费者示例:

package main

import (
    "context"
    "fmt"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

type MyMessageListener struct{}

func (l *MyMessageListener) ConsumeMessage(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    for i := range ext {
        fmt.Printf("Received message: %s\n", string(ext[i].Body))
    }

    return consumer.ConsumeSuccess, nil
}

func main() {
    c, err := rocketmq.NewPushConsumer(
        rocketmq.WithNameServer([]string{"127.0.0.1:9876"}),
    )
    if err != nil {
        fmt.Printf("Failed to create consumer: %s\n", err.Error())
        return
    }

    err = c.Subscribe("TopicTest", consumer.MessageSelector{}, &MyMessageListener{})
    if err != nil {
        fmt.Printf("Failed to subscribe: %s\n", err.Error())
        return
    }

    err = c.Start()
    if err != nil {
        fmt.Printf("Failed to start consumer: %s\n", err.Error())
        return
    }

    // Block until the consumer is closed.
    select {}
}

在代码中,我们首先通过rocketmq.NewPushConsumer()创建了一个消费者示例,并在WithNameServer()参数中指定了NameServer的地址。

然后,我们使用c.Subscribe()方法来订阅名为"TopicTest"的主题,并将自定义的消息监听器MyMessageListener{}传递给它。

最后,我们通过调用c.Start()方法来启动消费者。需要注意的是,我们使用了select{}语句来阻塞主线程,以保持消费者的运行。

总之,RocketMQ的Golang客户端提供了强大的功能和易于使用的API,可以帮助我们构建高性能、可靠的消息传递应用。通过本文的介绍,您现在应该对如何使用RocketMQ的Golang客户端有了更深入的理解。

相关推荐