博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka 接收消息 at most once -- Spring 整合
阅读量:4051 次
发布时间:2019-05-25

本文共 2915 字,大约阅读时间需要 9 分钟。

– Start


废话少说,直接上代码。

package shangbo.kafka.example10;import org.springframework.context.ApplicationContext;import org.springframework.context.annotation.AnnotationConfigApplicationContext;public class App {	@SuppressWarnings({ "resource", "unused" })	public static void main(String[] args) throws InterruptedException {		ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);	}}
package shangbo.kafka.example10;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.listener.KafkaMessageListenerContainer;import org.springframework.kafka.listener.MessageListener;import org.springframework.kafka.listener.config.ContainerProperties;@Configurationpublic class AppConfig {	@Bean	public KafkaMessageListenerContainer
kafkaMessageListenerContainer(ConsumerFactory
consumerFactory, ContainerProperties containerProperties) { return new KafkaMessageListenerContainer
(consumerFactory, containerProperties); } @Bean public ConsumerFactory
consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map
consumerConfigs() { Map
props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } @Bean public ContainerProperties containerProperties(MessageListener
messageListener) { ContainerProperties containerProperties = new ContainerProperties("topic0"); containerProperties.setGroupId("testConsumerGroup1"); containerProperties.setMessageListener(messageListener); return containerProperties; } @Bean public TestMessageListener messageListener() { return new TestMessageListener(); }}
package shangbo.kafka.example10;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.listener.MessageListener;public class TestMessageListener implements MessageListener
{ @Override public void onMessage(ConsumerRecord
record) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }}

– 更多参见:

– 声 明:转载请注明出处
– Last Edited on 2018-06-14
– Written by ShangBo on 2018-06-14
– End

你可能感兴趣的文章
uboot start.s文件分析
查看>>
没有路由器的情况下,开发板,虚拟机Ubuntu,win10主机,三者也可以ping通
查看>>
本地服务方式搭建etcd集群
查看>>
安装k8s Master高可用集群
查看>>
忽略图片透明区域的事件(Flex)
查看>>
忽略图片透明区域的事件(Flex)
查看>>
AS3 Flex基础知识100条
查看>>
Flex动态获取flash资源库文件
查看>>
flex4 中创建自定义弹出窗口
查看>>
01Java基础语法-16. while循环结构
查看>>
01Java基础语法-18. 各种循环语句的区别和应用场景
查看>>
01Java基础语法-19. 循环跳转控制语句
查看>>
Django框架全面讲解 -- Form
查看>>
socket,accept函数解析
查看>>
今日互联网关注(写在清明节后):每天都有值得关注的大变化
查看>>
”舍得“大法:把自己的优点当缺点倒出去
查看>>
[今日关注]鼓吹“互联网泡沫,到底为了什么”
查看>>
[互联网学习]如何提高网站的GooglePR值
查看>>
[关注大学生]求职不可不知——怎样的大学生不受欢迎
查看>>
[关注大学生]读“贫困大学生的自白”
查看>>