首页 技术 正文
技术 2022年11月24日
0 收藏 828 点赞 2,400 浏览 5943 个字

写一个基类

  1 <?php
2
3 namespace BI\Service\RabbitMQJob;
4
5 use AMQPConnection;
6 use AMQPChannel;
7 use AMQPExchange;
8 use AMQPQueue;
9 use Exception;
10
/**
 * Base class for RabbitMQ job publishers built on the php-amqp extension.
 *
 * On construction it connects to the broker, opens a channel, declares the
 * durable exchange named by the subclass and declares/binds every queue the
 * subclass lists in getQueueNamesAndRoutingKeys().
 */
abstract class Base
{
    /**
     * Historical, misspelled name. Kept because existing subclasses refer to it.
     *
     * @deprecated use EXCHANGE_TYPE_DIRECT
     */
    const EXCHANGE_TYPE_DIRECCT = AMQP_EX_TYPE_DIRECT;
    const EXCHANGE_TYPE_DIRECT = AMQP_EX_TYPE_DIRECT;
    const EXCHANGE_TYPE_FANOUT = AMQP_EX_TYPE_FANOUT;
    const EXCHANGE_TYPE_TOPIC = AMQP_EX_TYPE_TOPIC;

    const JOB_TYPE_MAIL = 'mail';
    const JOB_TYPE_TEST = 'test';
    const JOB_TYPE_STRUCTURE = 'structure';

    /** @var AMQPChannel */
    protected $channel;

    /** @var AMQPExchange */
    protected $exchange;

    /**
     * Connects to the broker and prepares the exchange and queue bindings.
     *
     * @param array  $config Connection settings; the keys host, port, login and password are read.
     *                       A 'vhost' key, when present, overrides the $vhost argument
     *                       (this is what the former extract($config) call did).
     * @param string $vhost  Virtual host to use when $config carries none.
     * @throws Exception When connecting, declaring the exchange or binding a queue fails.
     */
    public function __construct($config, $vhost = '/')
    {
        // Read the settings explicitly instead of extract(), which would import
        // every key of $config as a local variable.
        if (is_array($config) && array_key_exists('vhost', $config)) {
            $vhost = $config['vhost'];
        }
        $connArgs = array(
            'host' => isset($config['host']) ? $config['host'] : null,
            'port' => isset($config['port']) ? $config['port'] : null,
            'login' => isset($config['login']) ? $config['login'] : null,
            'password' => isset($config['password']) ? $config['password'] : null,
            'vhost' => $vhost,
        );

        try {
            $conn = new AMQPConnection($connArgs);
            if (!$conn->connect()) {
                // Never put the broker password into an exception message.
                $reported = $connArgs;
                $reported['password'] = '***';
                throw new Exception('Cannot connect to the broker with parameters: ' . json_encode($reported));
            }
            $this->channel = new AMQPChannel($conn);
            $this->declareExchange();
            $this->bindQueues();
        } catch (Exception $e) {
            // Callers still receive a plain Exception, but the original one
            // (with its code and stack trace) stays reachable via getPrevious().
            throw new Exception($e->getMessage(), (int) $e->getCode(), $e);
        }
    }

    /**
     * Declares the durable exchange described by getExchangeName()/getExchangeType().
     *
     * @throws Exception When the exchange declaration reports failure.
     */
    protected function declareExchange()
    {
        $this->exchange = new AMQPExchange($this->channel);
        $this->exchange->setName($this->getExchangeName());
        $this->exchange->setType($this->getExchangeType());
        $this->exchange->setFlags(AMQP_DURABLE);
        if (!$this->exchange->declareExchange()) {
            throw new Exception('Exchange declare failed.');
        }
    }

    /**
     * Declares every queue from getQueueNamesAndRoutingKeys() as durable and
     * binds it to the exchange with its routing key.
     *
     * @throws Exception When a binding reports failure.
     */
    protected function bindQueues()
    {
        $exchangeName = $this->getExchangeName();
        foreach ($this->getQueueNamesAndRoutingKeys() as $queueName => $routingKey) {
            $queue = new AMQPQueue($this->channel);
            $queue->setName($queueName);
            $queue->setFlags(AMQP_DURABLE);
            $queue->declareQueue();
            if (!$queue->bind($exchangeName, $routingKey)) {
                // The details are concatenated into the message; previously they were
                // passed as the Exception $code argument by mistake.
                throw new Exception('Queue binding failed with parameters: '
                    . json_encode(array('name' => $queueName, 'routingKey' => $routingKey)));
            }
        }
    }

    /**
     * Publishes $content to the exchange, wrapped as {"MType": ..., "Content": ...}.
     *
     * @param mixed  $content    Payload; must be JSON-encodable.
     * @param string $routingKey One of the routing keys from getQueueNamesAndRoutingKeys().
     * @return bool Result of AMQPExchange::publish().
     * @throws Exception When the routing key is unknown, the job type is invalid
     *                   or the message cannot be JSON-encoded.
     */
    protected function send($content, $routingKey = null)
    {
        if (!in_array($routingKey, $this->getQueueNamesAndRoutingKeys(), true)) {
            throw new Exception('RoutingKey: ' . $routingKey
                . ' is not found in the routing key list from the function getQueueNamesAndRoutingKeys');
        }

        $jobType = $this->getRabbitMQJobType();
        if (!$this->validateJobType($jobType)) {
            throw new Exception('Invalid Job Type.');
        }

        $payload = json_encode(array(
            'MType' => $jobType,
            'Content' => $content,
        ));
        if (false === $payload) {
            // Refuse to publish a broken message instead of sending `false`.
            throw new Exception('Message could not be JSON-encoded (json error code '
                . json_last_error() . ').');
        }

        return $this->exchange->publish($payload, $routingKey);
    }

    /**
     * Checks that $rk is a known routing key.
     *
     * NOTE(review): as written this method only validates and returns nothing;
     * it does not fetch a message. Confirm whether a consumer part is missing.
     *
     * @param string $rk
     * @throws Exception When the routing key is not listed by getQueueNamesAndRoutingKeys().
     */
    protected function get($rk)
    {
        if (!in_array($rk, $this->getQueueNamesAndRoutingKeys(), true)) {
            throw new Exception("RoutingKey: $rk is not found");
        }
    }

    /**
     * Tells whether $jobType is one of the JOB_TYPE_* constants.
     *
     * @param string $jobType
     * @return bool
     */
    protected function validateJobType($jobType)
    {
        $knownTypes = array(
            self::JOB_TYPE_MAIL,
            self::JOB_TYPE_TEST,
            self::JOB_TYPE_STRUCTURE,
        );

        return in_array($jobType, $knownTypes, true);
    }

    /**
     * Closes the broker connection that belongs to the channel.
     */
    public function __destruct()
    {
        // The channel is only set by the constructor above; a subclass that
        // bypasses it must not trigger a fatal error on teardown.
        if ($this->channel instanceof AMQPChannel) {
            $this->channel->getConnection()->disconnect();
        }
    }

    /**
     * @return string One of the JOB_TYPE_* values; becomes "MType" in the message.
     */
    abstract protected function getRabbitMQJobType();

    /**
     * @return string
     */
    abstract protected function getExchangeName();

    /**
     * @return string One of the EXCHANGE_TYPE_* values.
     */
    abstract protected function getExchangeType();

    /**
     * @return array queue_name => routing_key
     */
    abstract protected function getQueueNamesAndRoutingKeys();
}

写一个service继承基类

  1 <?php
2
3 namespace Mission\Service;
4 use BI\Service\RabbitMQJob\Base;
5 use Monolog\Handler\StreamHandler;
6 use Monolog\Logger;
7
/**
 * Publishes a message to the "API" direct exchange through the Base RabbitMQ job class.
 *
 * NOTE(review): Base::__construct() declares and binds the queue using the
 * queue/routing-key values in effect at construction time. setQueue() and
 * setRoutingKey() only change the properties; nothing visible here re-binds
 * afterwards, so the target queue presumably has to be bound elsewhere — confirm.
 */
class PublishToMQService extends Base
{
    private $message;
    private $logger;
    private $error;
    protected $queue = 'mission_queue';
    protected $routingKey = 'api_update_mission';

    /**
     * Validates the current message, logs it and publishes it.
     *
     * @return bool true when the message was handed to the exchange; false when
     *              validation failed or the exchange reported a failure
     *              (the reason is then available through getError()).
     * @throws \Exception Propagated from Base::send().
     */
    public function publish()
    {
        // Forget the outcome of an earlier call so getError() never reports a stale failure.
        $this->error = null;

        if (false === $this->_validation()) {
            return false;
        }

        $this->getLogger()->addDebug(__METHOD__, array('MQMessage' => $this->message));

        // Previously the result of the send was discarded and true was returned regardless.
        if (false === $this->sendToQueue($this->message, $this->queue)) {
            $this->error = 'Failed to publish the message to the exchange.';
            return false;
        }

        return true;
    }

    /**
     * @param array $message Payload to publish; must not be empty.
     * @return $this
     */
    public function setMessage($message = array())
    {
        $this->message = $message;

        return $this;
    }

    /**
     * @param string $queue Queue name used as the key into getQueueNamesAndRoutingKeys().
     * @return $this
     */
    public function setQueue($queue)
    {
        $this->queue = $queue;

        return $this;
    }

    /**
     * @param string $routingKey Routing key the message is published with.
     * @return $this
     */
    public function setRoutingKey($routingKey)
    {
        $this->routingKey = $routingKey;

        return $this;
    }

    /**
     * Lazily creates the Monolog logger writing to logs/queue.log.
     *
     * @return Logger
     */
    private function getLogger()
    {
        if (!($this->logger instanceof Logger)) {
            $file = __DIR__ . DIRECTORY_SEPARATOR . '../../../logs/queue.log';
            $this->logger = new Logger('Detection');
            $this->logger->pushHandler(new StreamHandler($file, Logger::DEBUG));
        }

        return $this->logger;
    }

    /**
     * Checks that a non-empty message has been set; records the reason otherwise.
     *
     * @return bool
     */
    private function _validation()
    {
        if (empty($this->message)) {
            $this->error = 'Message cannot be empty.';
            return false;
        }

        return true;
    }

    /**
     * @return string
     */
    protected function getExchangeName()
    {
        return 'API';
    }

    /**
     * @return string
     */
    protected function getRabbitMQJobType()
    {
        return Base::JOB_TYPE_TEST;
    }

    /**
     * @return string
     */
    protected function getExchangeType()
    {
        // The constant name is misspelled in Base; it is the direct exchange type.
        return parent::EXCHANGE_TYPE_DIRECCT;
    }

    /**
     * @return array queue_name => routing_key
     */
    protected function getQueueNamesAndRoutingKeys()
    {
        return array(
            $this->queue => $this->routingKey,
        );
    }

    /**
     * Looks up the routing key configured for $queueName and sends $content with it.
     *
     * @param mixed  $content
     * @param string $queueName
     * @return bool Result of Base::send().
     */
    private function sendToQueue($content, $queueName)
    {
        $routingKeys = $this->getQueueNamesAndRoutingKeys();

        return parent::send($content, $routingKeys[$queueName]);
    }

    /**
     * @return mixed Reason of the last failed publish(), or null when there is none.
     */
    public function getError()
    {
        return $this->error;
    }
}

在业务代码(控制器)中调用 service

 1 class AlarmController
2 {
3 const QUEUE = 'internal_message';
4 const ROUTING_KEY = 'api_internal_message';
5 public function checkTipAlarm()
6 {
7 //在线,写队列通知新站内信
8 /**@var PublishToMQService $publishHandler*/
9 $publishHandler = $this->get( 'mission.publish.RabbitMQ' );
10
11 $message = array(
12 'act' => self::ACT_SYSTEM_NEW_INMAIL,
13 'psid' => (is_numeric($this->request['to']) ? $this->request['to'] : ''),
14 'uuid' => (!is_numeric($this->request['to']) ? $this->request['to'] : ''),
15 'data' => array(
16 'owner' => $this->uuid
17 )
18 );
19
20 $publishHandler->setMessage( $message )->setRoutingKey( self::ROUTING_KEY )->setQueue( self::QUEUE );
21
22 if ( false === $publishHandler->publish() ) {
23 $this->error = array(
24 'errorMsg' => $publishHandler->getError(),
25 'errorCode' => 1
26 );
27 return false;
28 }
29 }
30 }

微信扫一扫

支付宝扫一扫

本文网址:https://www.zhankr.net/141997.html

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:875 阅读:5,090
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:806 阅读:3,508
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:565 阅读:4,320
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:730 阅读:4,314
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:4,920
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:3,105
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:8:00-16:00

客服电话

400-888-8888

客服邮箱

ceotheme@ceo.com

扫描二维码

关注微信公众号

扫描二维码

手机访问本站