kafka生产者和消费者的javaAPI的示例代码

2025-05-29 0 21

写了个kafka的java demo 顺便记录下,仅供参考

1.创建maven项目

目录如下:

kafka生产者和消费者的javaAPI的示例代码

2.pom文件:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"

xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelversion>4.0.0</modelversion>

<groupid>kafka-maven</groupid>

<artifactid>kafka-maven</artifactid>

<version>0.0.1-snapshot</version>

<dependencies>

<dependency>

<groupid>org.apache.kafka</groupid>

<artifactid>kafka_2.11</artifactid>

<version>0.10.1.1</version>

</dependency>

<dependency>

<groupid>org.apache.hadoop</groupid>

<artifactid>hadoop-common</artifactid>

<version>2.2.0</version>

</dependency>

<dependency>

<groupid>org.apache.hadoop</groupid>

<artifactid>hadoop-hdfs</artifactid>

<version>2.2.0</version>

</dependency>

<dependency>

<groupid>org.apache.hadoop</groupid>

<artifactid>hadoop-client</artifactid>

<version>2.2.0</version>

</dependency>

<dependency>

<groupid>org.apache.hbase</groupid>

<artifactid>hbase-client</artifactid>

<version>1.0.3</version>

</dependency>

<dependency>

<groupid>org.apache.hbase</groupid>

<artifactid>hbase-server</artifactid>

<version>1.0.3</version>

</dependency>

<dependency>

<groupid>org.apache.hadoop</groupid>

<artifactid>hadoop-hdfs</artifactid>

<version>2.2.0</version>

</dependency>

<dependency>

<groupid>jdk.tools</groupid>

<artifactid>jdk.tools</artifactid>

<version>1.7</version>

<scope>system</scope>

<systempath>${java_home}/lib/tools.jar</systempath>

</dependency>

<dependency>

<groupid>org.apache.httpcomponents</groupid>

<artifactid>httpclient</artifactid>

<version>4.3.6</version>

</dependency>

</dependencies>

<build>

<plugins>

<plugin>

<groupid>org.apache.maven.plugins</groupid>

<artifactid>maven-compiler-plugin</artifactid>

<configuration>

<source>1.7</source>

<target>1.7</target>

</configuration>

</plugin>

</plugins>

</build>

</project>

3.kafka生产者kafkaproduce:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62
package com.lijie.producer;

import java.io.file;

import java.io.fileinputstream;

import java.util.properties;

import org.apache.kafka.clients.producer.callback;

import org.apache.kafka.clients.producer.kafkaproducer;

import org.apache.kafka.clients.producer.producerrecord;

import org.apache.kafka.clients.producer.recordmetadata;

import org.slf4j.logger;

import org.slf4j.loggerfactory;

public class kafkaproduce {

private static properties properties;

static {

properties = new properties();

string path = kafkaproducer.class.getresource("/").getfile().tostring()

+ "kafka.properties";

try {

fileinputstream fis = new fileinputstream(new file(path));

properties.load(fis);

} catch (exception e) {

e.printstacktrace();

}

}

/**

* 发送消息

*

* @param topic

* @param key

* @param value

*/

public void sendmsg(string topic, byte[] key, byte[] value) {

// 实例化produce

kafkaproducer<byte[], byte[]> kp = new kafkaproducer<byte[], byte[]>(

properties);

// 消息封装

producerrecord<byte[], byte[]> pr = new producerrecord<byte[], byte[]>(

topic, key, value);

// 发送数据

kp.send(pr, new callback() {

// 回调函数

@override

public void oncompletion(recordmetadata metadata,

exception exception) {

if (null != exception) {

system.out.println("记录的offset在:" + metadata.offset());

system.out.println(exception.getmessage() + exception);

}

}

});

// 关闭produce

kp.close();

}

}

4.kafka消费者kafkaconsume:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73
package com.lijie.consumer;

import java.io.file;

import java.io.fileinputstream;

import java.util.hashmap;

import java.util.list;

import java.util.map;

import java.util.properties;

import org.apache.htrace.fasterxml.jackson.databind.objectmapper;

import com.lijie.pojo.user;

import com.lijie.utils.jsonutils;

import kafka.consumer.consumerconfig;

import kafka.consumer.consumeriterator;

import kafka.consumer.kafkastream;

import kafka.javaapi.consumer.consumerconnector;

import kafka.serializer.stringdecoder;

import kafka.utils.verifiableproperties;

public class kafkaconsume {

private final static string topic = "lijietest";

private static properties properties;

static {

properties = new properties();

string path = kafkaconsume.class.getresource("/").getfile().tostring()

+ "kafka.properties";

try {

fileinputstream fis = new fileinputstream(new file(path));

properties.load(fis);

} catch (exception e) {

e.printstacktrace();

}

}

/**

* 获取消息

*

* @throws exception

*/

public void getmsg() throws exception {

consumerconfig config = new consumerconfig(properties);

consumerconnector consumer = kafka.consumer.consumer

.createjavaconsumerconnector(config);

map<string, integer> topiccountmap = new hashmap<string, integer>();

topiccountmap.put(topic, new integer(1));

stringdecoder keydecoder = new stringdecoder(new verifiableproperties());

stringdecoder valuedecoder = new stringdecoder(

new verifiableproperties());

map<string, list<kafkastream<string, string>>> consumermap = consumer

.createmessagestreams(topiccountmap, keydecoder, valuedecoder);

kafkastream<string, string> stream = consumermap.get(topic).get(0);

consumeriterator<string, string> it = stream.iterator();

while (it.hasnext()) {

string json = it.next().message();

user user = (user) jsonutils.jsontoobj(json, user.class);

system.out.println(user);

}

}

}

5.kafka.properties文件

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19
##produce

bootstrap.servers=192.168.80.123:9092

producer.type=sync

request.required.acks=1

serializer.class=kafka.serializer.defaultencoder

key.serializer=org.apache.kafka.common.serialization.bytearrayserializer

value.serializer=org.apache.kafka.common.serialization.bytearrayserializer

bak.partitioner.class=kafka.producer.defaultpartitioner

bak.key.serializer=org.apache.kafka.common.serialization.stringserializer

bak.value.serializer=org.apache.kafka.common.serialization.stringserializer

##consume

zookeeper.connect=192.168.80.123:2181

group.id=lijiegroup

zookeeper.session.timeout.ms=4000

zookeeper.sync.time.ms=200

auto.commit.interval.ms=1000

auto.offset.reset=smallest

serializer.class=kafka.serializer.stringencoder

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持快网idc。

原文链接:https://blog.csdn.net/qq_20641565/article/details/56277537

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 kafka生产者和消费者的javaAPI的示例代码 https://www.kuaiidc.com/111435.html

相关文章

发表评论
暂无评论