Laravel中Kafka的使用详解

2025-05-29 0 23

本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.

以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78
<?php

namespace App\\Tools;

use Illuminate\\Config\\Repository;

use Illuminate\\Support\\Facades\\DB;

use Monolog\\Logger;

use Monolog\\Handler\\StreamHandler;

use Illuminate\\Http\\Request;

class Kafka

{

public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka

public $topic = 'test';//管道名称

public $partition = 0;

protected $producer = null;

protected $consumer = null;

public function __construct()

{

if (empty($this->broker_list)) {

throw new InvalidConfigException("broker not config");

}

$rk = new \\RdKafka\\Producer();

if (empty($rk)) {

throw new InvalidConfigException("producer error");

}

$rk->setLogLevel(LOG_DEBUG);

if (!$rk->addBrokers($this->broker_list)) {

throw new InvalidConfigException("producer error");

}

$this->producer = $rk;

}

/**

* 生产者

* @param array $messages

* @return mixed

*/

public function send($messages = [],$topic)

{

$topic = $this->producer->newTopic($topic);

return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));

}

/**

* 消费者

*/

public function consumer($object, $callback){

$conf = new \\RdKafka\\Conf();

$conf->set('group.id', 0);

$conf->set('metadata.broker.list', $this->broker_list);

$topicConf = new \\RdKafka\\TopicConf();

$topicConf->set('auto.offset.reset', 'smallest');

$conf->setDefaultTopicConf($topicConf);

$consumer = new \\RdKafka\\KafkaConsumer($conf);

$consumer->subscribe([$this->topic]);

echo "waiting for messages.....\\n";

while(true) {

$message = $consumer->consume(120*1000);

switch ($message->err) {

case RD_KAFKA_RESP_ERR_NO_ERROR:

echo "message payload....";

$object->$callback($message->payload);

break;

}

sleep(1);

}

}

}

?>

在控制器中如何使用:

首先再头部导入这个类:use App\\Tools\\Kafka;

下面是使用生产者实例:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14
public function test(){

$topic = 'tool';//输入使用管道名称

$data['shop_id'] = 58;

$data['bar_code']=586;

$data['goods_num'] = 1;

$data['goods_unit'] = '个';

$Kafka = new Kafka();

$Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json

var_dump($Error_Msg);

}

下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43
<?php

$conf = new RdKafka\\Conf();

$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\\Consumer($conf);

$rk->addBrokers("localhost:9092");

$topicConf = new RdKafka\\TopicConf();

$topicConf->set('auto.commit.interval.ms', 100);

$topicConf->set('offset.store.method', 'file');

$topicConf->set('offset.store.path', sys_get_temp_dir());

$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("tool", $topicConf);//读取的管道

// Start consuming partition 0

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {

$message = $topic->consume(0, 120*10000);

switch ($message->err) {

case RD_KAFKA_RESP_ERR_NO_ERROR:

//没有错误打印信息

$message = json_decode(json_encode($message),true);

$data = json_decode($message['payload'],true);

var_dump($data);

break;

case RD_KAFKA_RESP_ERR__PARTITION_EOF:

echo "等待接收信息\\n";

break;

case RD_KAFKA_RESP_ERR__TIMED_OUT:

echo "超时\\n";

break;

default:

throw new \\Exception($message->errstr(), $message->err);

break;

}

sleep(1);

}

?>

到此这篇关于Laravel中Kafka的使用详解的文章就介绍到这了,更多相关Laravel中Kafka内容请搜索快网idc以前的文章或继续浏览下面的相关文章希望大家以后多多支持快网idc!

原文链接:https://blog.csdn.net/qq_16619361/article/details/90712742

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 Laravel中Kafka的使用详解 https://www.kuaiidc.com/90179.html

相关文章

发表评论
暂无评论