* @since 2.0 */
class MqttController extends Controller
{
    /**
     * @var string Redis list key intended as the queue whose entries get persisted
     *             to the database. Not referenced by the active code in this class.
     */
    public $redis_name_mqtt_sub_list = "console:cxaibc:mqtt:sub:list";

    /**
     * @var string Redis key prefix for golf-cart data. Only the currently disabled
     *             handler code in subMqttData() uses it (suffixing the first topic segment).
     */
    public $redis_name_mqtt_qc_data = "api:cxaibc:mqtt:data:";

    /**
     * @var MqttClient|null Connected MQTT client, or null when there is no usable connection.
     */
    public $mqtt;

    /**
     * Console entry point: starts the worker selected by the mode argument.
     *
     * The only mode currently handled is `sub_mqtt`, which runs the subscribe
     * loop of subMqttData().
     *
     * @param string|null $mode Worker mode. When omitted, the third CLI token
     *                          ($argv[2]) is used, which is where this command
     *                          originally read it from.
     * @return int ExitCode::OK once the selected worker returns;
     *             ExitCode::UNSPECIFIED_ERROR when the mode is missing or not supported.
     */
    public function actionIndex($mode = null)
    {
        if ($mode === null) {
            $cliArgs = isset($_SERVER['argv']) ? $_SERVER['argv'] : [];
            $mode = empty($cliArgs[2]) ? null : $cliArgs[2];
        }

        if ($mode === 'sub_mqtt') {
            $this->subMqttData();
            return ExitCode::OK;
        }

        /*if($argv[2] == 'pub_mqtt'){
            $this->pubMqttData();
            return ExitCode::OK;
        }*/

        // Missing or unsupported mode: report it and exit non-zero so that
        // process supervisors can tell a failed start from a clean run.
        var_dump("启动失败,参数错误");
        return ExitCode::UNSPECIFIED_ERROR;
    }

    /**
     * Ensures $this->mqtt holds a connected client.
     *
     * When a client is already present, a retained ping is published on
     * `test/ping`; if that throws, the client is discarded. Otherwise a new
     * client is created and connected.
     *
     * @ Author : Lw
     * @ CreateTime : 2022-10-31
     *
     * @return bool true when a client is available afterwards, false when
     *              connecting (or the ping) failed and $this->mqtt was reset to null.
     */
    public function connectMqtt()
    {
        try {
            if (!empty($this->mqtt)) {
                $this->mqtt->publish('test/ping', json_encode(['msg' => 'test', 'time' => time()]), 0, true);
                return true;
            }

            var_dump("连接mqtt");

            // NOTE(review): broker endpoint and credentials are hard-coded here;
            // consider moving them to application configuration.
            $server = 'k676f641.cn-shenzhen.emqx.cloud';
            $port = 11732;
            $username = '123456';
            $password = '123456';

            // Random 32-hex-character id; avoids collisions between workers that
            // start at (almost) the same time.
            $clientId = bin2hex(random_bytes(16));

            $client = new MqttClient($server, $port, $clientId);
            $client->connect($username, $password);
            $this->mqtt = $client;

            var_dump("连接mqtt成功");
            return true;
        } catch (\Throwable $e) {
            $this->mqtt = null;
            $this->dumpThrowable($e);
            return false;
        }
    }

    /**
     * Subscribes to `+/Detail` and dumps every received message, forever.
     *
     * Each iteration (re)connects, subscribes and enters the client loop. Any
     * exception or error drops the client so that the next iteration, one
     * second later, reconnects. This method never returns.
     *
     * @ Author : Lw
     * @ CreateTime : 2022-10-31
     *
     * @return void
     */
    public function subMqttData()
    {
        var_dump("订阅mqtt消息");

        while (true) {
            try {
                var_dump("订阅mqtt消息");

                // Only subscribe when a client is actually available; a failed
                // connect has already been reported by connectMqtt().
                if ($this->connectMqtt()) {
                    $this->mqtt->subscribe("+/Detail", function ($topic, $message) {
                        var_dump($topic, $message, $this->messageBytes($message));

                        // NOTE(review): the disabled code below refers to
                        // $this->redis_name_mqtt_list, which is not declared in this
                        // class (the declared property is $redis_name_mqtt_sub_list).
                        /*$explode = explode('/',$topic);
                        if(!in_array($explode[1],['Detail','Warn'])){
                            return false;
                        }
                        var_dump($explode,$message);
                        \Yii::$app->redis->lpush($this->redis_name_mqtt_list,json_encode([
                            'id' => $explode[0],
                            'data' => $message,
                        ]));
                        \Yii::$app->redis->set($this->redis_name_mqtt_qc_data.$explode[0],json_encode([
                            'data' => $message,
                            'time' => time(),
                        ]));*/
                    }, 0);

                    $this->mqtt->loop(true);
                }
            } catch (\Throwable $e) {
                $this->mqtt = null;
                $this->dumpThrowable($e);
            }

            // Back off briefly before the next (re)connect attempt.
            sleep(1);
        }
    }

    /**
     * Returns the byte values of a message payload for debugging output.
     *
     * Uses getBytes() when this object provides it (it is not declared in this
     * class, so it would have to be inherited); otherwise unpacks the payload
     * into a list of unsigned byte values.
     *
     * @param string $message Raw message payload.
     * @return mixed Result of getBytes() when available, else a list of ints (0-255).
     */
    private function messageBytes($message)
    {
        if (method_exists($this, 'getBytes')) {
            return $this->getBytes($message);
        }

        $payload = (string) $message;
        if ($payload === '') {
            return [];
        }

        return array_values(unpack('C*', $payload));
    }

    /**
     * Dumps message, file and line of a caught exception or error.
     *
     * @param \Throwable $e The caught throwable.
     * @return void
     */
    private function dumpThrowable(\Throwable $e)
    {
        var_dump($e->getMessage());
        var_dump($e->getFile());
        var_dump($e->getLine());
    }
}