Spring Boot |如何使用 Apache Kafka 消费字符串消息
原文:https://www . geesforgeks . org/spring-boot-how-consume-string-messages-use-Apache-Kafka/
Apache Kafka 是一个用于实时数据流的发布-订阅消息队列。消息队列允许您在进程、应用程序和服务器之间发送消息。在本文中,我们将看到如何将字符串消息从 apache kafka 发送到 spring boot 应用程序的控制台。
进场:
步骤 1: 转到 spring initializr 并创建一个具有以下依赖关系的初始项目:
- 阿帕奇卡夫卡的春天
注意:我们还可以创建一个 maven 项目,并将以下代码添加到 pom.xml 文件中。
可扩展标记语言
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
第 2 步:在 IDE 中打开项目,同步依赖项。现在创建一个新的类配置并添加注释@配置和@启用卡夫卡。
第三步:现在用 String 对象创建 bean消费工厂和并发类。
Java 语言(一种计算机语言,尤用于创建网站)
// Java program to consume string
// messages using spring kafka
@EnableKafka
@Configuration
public class Config {
// Function to establish
// connection between spring
// application and kafka server
@Bean
public ConsumerFactory<String, String>
consumerFactory()
{
// HashMap to store the configurations
Map<String, Object> map = new HashMap<>();
// put the host IP inn the map
map.put(ConsumerConfig
.BOOTSTRAP_SERVERS_CONFIG,
"127.0.0.1:9092");
// put the group ID in the map
map.put(ConsumerConfig
.GROUP_ID_CONFIG,
"id");
map.put(ConsumerConfig
.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
map.put(ConsumerConfig
.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(map);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,
String>
kafkaListner()
{
ConcurrentKafkaListenerContainerFactory<String,
String>
obj
= new ConcurrentKafkaListenerContainerFactory<>();
obj.setConsumerFactory(consumerFactory());
return obj;
}
}
第四步:用 @Service 标注创建类 KafkaService 。此类将包含在控制台上发布消息的侦听器方法。
Java 语言(一种计算机语言,尤用于创建网站)
@Service
public class KafkaService {
// Annotation required to listen the
// message from kafka server
@KafkaListener(topics = "StringProducer",
groupId = "id")
public void
publish(String message)
{
System.out.println(
"You have a new message: "
+ message);
}
}
版权属于:月萌API www.moonapi.com,转载请注明出处