发布时间:2024-12-22 21:54:41
```go go get github.com/Shopify/sarama ```
```go config := sarama.NewConfig() producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer producer.AsyncClose() message := &sarama.ProducerMessage{Topic: "my-topic", Value: sarama.StringEncoder("Hello Kafka!")} producer.Input() <- message select { case <-producer.Successes(): fmt.Println("Message sent successfully") case err := <-producer.Errors(): fmt.Println("Failed to send message:", err.Err) } ```
以上代码首先创建了一个Kafka配置对象,并创建了一个异步生产者实例。然后,我们定义了一个要发送的消息,并使用`producer.Input()`方法将消息发送到Kafka集群。最后,使用`select`语句来监听消息发送的结果。```go config := sarama.NewConfig() consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetNewest) if err != nil { panic(err) } defer partitionConsumer.Close() for message := range partitionConsumer.Messages() { fmt.Println("Received message:", string(message.Value)) } ```
以上代码首先创建一个Kafka配置对象,并创建一个消费者实例。然后,通过调用`consumer.ConsumePartition()`方法来订阅指定主题和分区的消息。最后,通过使用`range`关键字来遍历消费者接收到的消息。- 可以通过设置Kafka配置对象的属性来配置生产者和消费者的行为,例如超时时间、重新连接策略等。 - 可以使用消息过滤器来过滤特定的消息。 - 可以使用分区器来决定消息发送到哪个分区,从而实现消息的负载均衡。 - 可以通过使用拦截器来在消息生产或消费的过程中添加自定义的逻辑。
(完)