本文共 2915 字,大约阅读时间需要 9 分钟。
– Start
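废话少说,直接上代码。下面依次是三个类:App(程序入口,负责创建 Spring 上下文)、AppConfig(Spring 配置类,声明 Kafka 消费者工厂、监听容器及其属性)、TestMessageListener(消息监听器,把收到的每条消息打印到控制台)。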
package shangbo.kafka.example10;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/**
 * Entry point of the example: boots a Spring context from {@link AppConfig}.
 */
public class App {

    /**
     * Creates the annotation-based application context and registers a JVM
     * shutdown hook for it.
     *
     * <p>The context is deliberately not closed in this method. Nothing here
     * starts the listener container explicitly, so it is presumably started by
     * the context itself and must outlive {@code main} (NOTE(review): confirm
     * against the spring-kafka version in use). The shutdown hook closes the
     * context when the JVM exits, instead of leaving it open forever.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException never thrown by the current body; the clause
     *         is kept so the method signature stays unchanged
     */
    @SuppressWarnings("resource") // closed by the shutdown hook, not by this method
    public static void main(String[] args) throws InterruptedException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
        context.registerShutdownHook();
    }
}
package shangbo.kafka.example10;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.listener.KafkaMessageListenerContainer;import org.springframework.kafka.listener.MessageListener;import org.springframework.kafka.listener.config.ContainerProperties;@Configurationpublic class AppConfig { @Bean public KafkaMessageListenerContainerkafkaMessageListenerContainer(ConsumerFactory consumerFactory, ContainerProperties containerProperties) { return new KafkaMessageListenerContainer (consumerFactory, containerProperties); } @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } @Bean public ContainerProperties containerProperties(MessageListener messageListener) { ContainerProperties containerProperties = new ContainerProperties("topic0"); containerProperties.setGroupId("testConsumerGroup1"); containerProperties.setMessageListener(messageListener); return containerProperties; } @Bean public TestMessageListener messageListener() { return new TestMessageListener(); }}
package shangbo.kafka.example10;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

/**
 * Message listener that prints every record it receives to standard output.
 *
 * <p>Parameterized as {@code <String, String>} because {@code AppConfig}
 * configures the String deserializer for both keys and values.
 */
public class TestMessageListener implements MessageListener<String, String> {

    /**
     * Prints the offset, key and value of the given record on one line.
     *
     * @param record the consumed record
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
– 更多参见:
– 声 明:转载请注明出处 – Last Edited on 2018-06-14 – Written by ShangBo on 2018-06-14 – End