|
- from app import config, log, register, project_cf, speaker_cf, linphone_cf, player
- import paho.mqtt.client as mqtt
- import json
- from app.app_config import *
- import sys
- from time import sleep
- from threading import Lock
- import os
- import vlc
- import threading
- """
- desc:
- 项目主要功能类
- """
class Device(object):
    """
    desc:
        Main functional class of the project: mirrors the device settings,
        hooks the player events and owns the MQTT client.
    """

    def __init__(self, config=config, player=player):
        """
        desc:
            Copy the settings from config, validate them, register the
            player event callbacks and prepare the MQTT client.

        Parameters:
            config - object exposing the device settings as attributes
            player - player wrapper; its media.event_manager() supplies the
                     event manager the callbacks are attached to

        Exits the process with status 1 when _check_config() rejects the
        settings.
        """
        # Settings mirrored one-to-one from the configuration object.
        mirrored = (
            "model", "hostname", "ipaddr", "hard_volume",
            "hard_volume_control", "exten", "exten_password",
            "server_ipaddr", "mac", "player_current_volume",
            "init_player_state", "current_uri",
        )
        for attr in mirrored:
            setattr(self, attr, getattr(config, attr))

        self.connect_flags = 1
        self.pull_rtsp = 0
        self.warning_lock = Lock()
        self.error_lock = Lock()
        self.source_error_lock = Lock()
        # Playback origin flag: 0 = played by the server, 1 = played locally.
        self.play_action_flag = 0
        self.tmp_init_player_state = 0

        if not self._check_config():
            sys.exit(1)

        self.player = player
        self.player.event_manager = self.player.media.event_manager()
        # Player event callback registration: (event type, callback) pairs.
        event_types = vlc.EventType
        bindings = (
            (event_types.MediaPlayerEncounteredError, self.MediaPlayerEncounteredError_cb),
            (event_types.VlmMediaInstanceStatusError, self.VlmMediaInstanceStatusError_cb),
            (event_types.MediaPlayerPlaying, self.MediaPlayerPlaying_cb),
            (event_types.MediaPlayerPaused, self.MediaPlayerPaused_cb),
            (event_types.MediaPlayerStopped, self.MediaPlayerStopped_cb),
        )
        for event_type, callback in bindings:
            self.player.event_manager.event_attach(event_type, callback)

        self.vlc_error_count = 0
        self.vlc_source_error_count = 0
        self.vlc_warning_count = 0

        # The MAC address is the MQTT client id; the extension and its
        # password are the MQTT credentials.
        client = mqtt.Client(client_id=self.mac, clean_session=True)
        client.username_pw_set(username=self.exten, password=self.exten_password)
        client.on_connect = self.on_connect
        self.mqttClient = client
-
- # 注册监听器
def add_callback(self, event_type, callback):
    """
    desc:
        Register a listener. Placeholder: the arguments are ignored and
        nothing is registered.

    Parameters:
        event_type - event identifier (unused)
        callback - listener to register (unused)
    """
    return None
- # 移除监听器
def remove_callback(self, event_type, callback):
    """
    desc:
        Remove a listener. Placeholder: the arguments are ignored and
        nothing is removed.

    Parameters:
        event_type - event identifier (unused)
        callback - listener to remove (unused)
    """
    return None
def MediaPlayerEncounteredError_cb(self, event):
    """
    desc:
        Player event callback for MediaPlayerEncounteredError: only logs
        the failure.

    Parameters:
        event - event object passed by the player (unused)
    """
    # NOTE: an automatic "replay current_uri in a daemon thread, bounded by
    # VLC_SOURCE_ERROR_COUNT" retry used to live here but is disabled.
    logger = log.logger
    logger.error("Player play error")
def VlmMediaInstanceStatusError_cb(self, event):
    """
    desc:
        Player event callback for VlmMediaInstanceStatusError: only logs
        that the event fired.

    Parameters:
        event - event object passed by the player (unused)
    """
    logger = log.logger
    logger.error("VlmMediaInstanceStatusError_cb")
- """
- desc:
- 开始播放事件回调,控制声音渐入
- """
def MediaPlayerPlaying_cb(self, event):
    """
    desc:
        Playback-started event callback: fades the volume in from 0 to
        player_current_volume on a daemon thread.

    Parameters:
        event - event object passed by the player (unused)
    """
    log.logger.error("MediaPlayerPlaying_cb")
    fade_thread = threading.Thread(
        target=self.FadeIn,
        args=(0, self.player_current_volume),
        daemon=True,
    )
    fade_thread.start()
    # Logged right after the thread is started; the fade itself may still
    # be running at this point.
    log.logger.info("Set volume FadeIn finished!")
def MediaPlayerPaused_cb(self, event):
    """
    desc:
        Player event callback for MediaPlayerPaused: only logs that the
        event fired.

    Parameters:
        event - event object passed by the player (unused)
    """
    logger = log.logger
    logger.error("MediaPlayerPaused_cb")
def MediaPlayerStopped_cb(self, event):
    """
    desc:
        Player event callback for MediaPlayerStopped: only logs that the
        event fired.

    Parameters:
        event - event object passed by the player (unused)
    """
    logger = log.logger
    logger.error("MediaPlayerStopped_cb")
- """
- desc:
- 播放器声音渐入实现函数,从low_volume音量过渡到high_volume
-
- Parameters:
- param1 - 低音量
- param2 - 高音量
- """
def FadeIn(self, low_volume, high_volume, steps=10, interval=0.1):
    """
    desc:
        Fade the player volume in, from low_volume up to high_volume.

    Parameters:
        low_volume - volume set immediately, before the ramp starts
        high_volume - volume reached by the last step
        steps - number of ramp steps (default 10; values below 1 count as 1)
        interval - seconds slept before each step (default 0.1)

    Returns:
        True
    """
    steps = max(int(steps), 1)
    span = high_volume - low_volume
    self.player.set_volume(low_volume)
    for step in range(1, steps + 1):
        sleep(interval)
        if step == steps:
            # Land exactly on the target regardless of rounding.
            level = high_volume
        else:
            # Interpolate from the total span instead of accumulating a
            # truncated per-step increment, so ranges smaller than `steps`
            # still ramp instead of jumping at the end.
            level = low_volume + span * step // steps
        self.player.set_volume(level)
    return True
- """
- desc:
- 播放器声音渐出实现函数,从high_volume音量过渡到low_volume
-
- Parameters:
- param1 - low volume (low_volume)
- param2 - high volume (high_volume)
- """
def FadeOut(self, low_volume, high_volume, steps=10, interval=0.1):
    """
    desc:
        Fade the player volume out, from high_volume down to low_volume.

    Parameters:
        low_volume - volume reached by the last step
        high_volume - volume the ramp starts from (not set explicitly; the
                      first step is already one increment below it)
        steps - number of ramp steps (default 10; values below 1 count as 1)
        interval - seconds slept before each step (default 0.1)

    Returns:
        True
    """
    steps = max(int(steps), 1)
    span = high_volume - low_volume
    for step in range(1, steps + 1):
        sleep(interval)
        if step == steps:
            # Land exactly on the target regardless of rounding.
            level = low_volume
        else:
            # Interpolate from the total span so small ranges still ramp
            # and no intermediate level is skipped before the final one.
            level = high_volume - span * step // steps
        self.player.set_volume(level)
    return True
- """
- desc:
- 系统信号处理函数,主要处理键盘输入,如ctrl+c等
-
- Parameters:
- param1 - 信号数字
- param2 - 处理方法
- """
def singal_handler(self, singal_num, handler):
    """
    desc:
        System signal handler (e.g. ctrl+c): disconnects MQTT, stops the
        rtsp pusher and exits the process with status 0.

    Parameters:
        singal_num - signal number (unused)
        handler - second argument supplied with the signal (unused)
    """
    client = self.mqttClient
    client.disconnect()
    # Clearing the flag also ends the connect-retry loop in run().
    self.connect_flags = 0
    self.stop_pull_rtsp()
    log.logger.info("Mqtt disconnect now!")
    raise SystemExit(0)
-
- """
- desc:
- 设置播放器警告日志数量
-
- Parameters:
- param1 - 数量
- """
def set_vlc_warning_count(self, value):
    """
    desc:
        Set the player warning-log counter while holding warning_lock.

    Parameters:
        value - new counter value
    """
    with self.warning_lock:
        self.vlc_warning_count = value
-
def get_vlc_warning_count(self):
    """
    desc:
        Return the player warning-log counter (read without taking the
        lock).
    """
    current = self.vlc_warning_count
    return current
def set_vlc_error_count(self, value):
    """
    desc:
        Set the player error-log counter while holding error_lock.

    Parameters:
        value - new counter value
    """
    with self.error_lock:
        self.vlc_error_count = value
-
def get_vlc_error_count(self):
    """
    desc:
        Return the player error-log counter (read without taking the lock).
    """
    current = self.vlc_error_count
    return current
-
def set_vlc_source_error_count(self, value):
    """
    desc:
        Set the player source-error counter while holding
        source_error_lock.

    Parameters:
        value - new counter value
    """
    with self.source_error_lock:
        self.vlc_source_error_count = value
-
def get_vlc_source_error_count(self):
    """
    desc:
        Return the player source-error counter (read without taking the
        lock).
    """
    current = self.vlc_source_error_count
    return current
- """
- desc:
- 连接mqtt服务器
- """
def run(self):
    """
    desc:
        Connect to the MQTT server (retrying every RETRY_SEC seconds until
        it succeeds or connect_flags is cleared), start the network loop
        and subscribe to the command topic.

    Returns:
        True
    """
    client = self.mqttClient
    # Install the callbacks before connecting so no early event is missed
    # once the network loop runs.
    client.on_message = register.recieve_run
    # paho's attribute is spelled on_subscribe; the local method keeps its
    # historical name.
    client.on_subscribe = self.on_subcribe
    client.on_disconnect = self.on_disconnect
    client.on_publish = self.on_publish
    client.on_socket_close = self.on_socket_close

    while self.connect_flags:
        try:
            log.logger.info("Mqtt to %s:%d, connectting..." % (self.server_ipaddr, SERVER_MQTT_PORT))
            client.connect(self.server_ipaddr, SERVER_MQTT_PORT, MQTT_CONNECT_TIMEOUT)
        except Exception as err:
            # Exception (not a bare except) so that SystemExit raised by the
            # signal handler and KeyboardInterrupt are not swallowed here.
            log.logger.error("Mqtt connect to %s:%d error: %s" % (self.server_ipaddr, SERVER_MQTT_PORT, err))
            log.logger.error("Mqtt reconnect...")
            sleep(RETRY_SEC)
        else:
            self.connect_flags = 0

    client.loop_start()
    client.subscribe(TOPIC_COMMAND, MQTT_QOS)
    return True
-
- """
- desc:
- mqtt连接事件回调函数
- """
def on_connect(self, client, userdata, flags, rc):
    """
    desc:
        MQTT connect callback: resumes playback if needed, (re)subscribes
        to the command topic and publishes the init data.

    Parameters:
        client - MQTT client instance that triggered the callback
        userdata - user data set on the client (unused)
        flags - response flags sent by the broker (unused)
        rc - connection result code

    Returns:
        True
    """
    log.logger.info(mqtt.connack_string(rc))
    self._player_reconnect()
    if rc == 0:
        # The client is created with clean_session=True, so the broker
        # forgets subscriptions across reconnects; renew it on every
        # successful connect.
        self.mqttClient.subscribe(TOPIC_COMMAND, MQTT_QOS)
    init_data = self._get_init_data()
    self.mqttClient.publish(TOPIC_EVENT, json.dumps(init_data).encode("utf-8"), MQTT_QOS)
    log.logger.info("Publish data: %s" % (init_data))
    return True
- """
- desc:
- 播放器重新连接
- """
def _player_reconnect(self):
    """
    desc:
        Restart playback of current_uri when the saved state says the
        player should be playing (init_player_state == 1) and a uri is set.

    Returns:
        True
    """
    should_replay = self.init_player_state == 1 and self.current_uri != ""
    if not should_replay:
        log.logger.info("Not need reconnect!")
        return True

    # Same pa_mute.sh call that play() issues before starting playback.
    os.system("/etc/scripts/pa_mute.sh 0 > /dev/null 2>&1 &")
    self.player.play(self.current_uri)
    log.logger.info("Player reconnect finished!")
    sleep(1)
    return True
-
- """
- desc:
- 组装mqtt连接到的服务器时需上报的初始化数据
- """
def _get_init_data(self):
    """
    desc:
        Assemble the init message reported to the server once the MQTT
        connection is established.

    Returns:
        dict with the action name "init" and a data dict holding volumes,
        extension, model, player status and, while playing, the url
    """
    payload = {}
    payload.setdefault(SOLF_VOLUME, self.player_current_volume)
    payload.setdefault(HARD_VOLUME, self.hard_volume)
    payload.setdefault("hard-volume-control", self.hard_volume_control)
    payload.setdefault(EXTEN, self.exten)
    payload.setdefault(DEVICE_MODEL, self.model)

    # Player state codes: 1 playing, 0 paused, -1 idle, anything else error.
    state = self.player.get_state()
    if state == 1:
        label = PLAYER_PLAYING
    elif state == 0:
        label = PLAYER_PAUSE
    elif state == -1:
        label = PLAYER_IDLE
    else:
        label = PLAYER_ERROR
    payload.setdefault(STATUS, label)
    if state == 1:
        payload.setdefault("url", self.current_uri)

    message = {}
    message.setdefault(ACTION_NAME, "init")
    message.setdefault(DATA, payload)
    return message
-
- """
- desc:
- 设置播放器音量
-
- Parameters:
- param1 - 音量
- param2 - 类型
- """
def set_soft_volume(self, volume, type = "default"):
    """
    desc:
        Set the player volume, persist it to the project config and apply
        it (with a fade when the player is currently playing).

    Parameters:
        volume - new player volume
        type - unused

    Returns:
        True
    """
    previous = self.player_current_volume
    self.player_current_volume = volume

    # Persist the new value as the initial player volume.
    project_cf.read(PROJECT_CONFIG)
    project_cf.set("general", "player_init_volume", str(volume))
    with open(PROJECT_CONFIG, "w+") as cfg_file:
        project_cf.write(cfg_file, space_around_delimiters=False)
    os.system("sync")

    if self.player.get_state() != 1:
        # Not playing: apply the volume directly, no fade.
        self.player.set_volume(volume)
        return True

    if previous > volume:
        self.FadeOut(volume, previous)
    elif previous < volume:
        self.FadeIn(previous, volume)
    return True
- """
- desc:
- 设置系统相关数据
-
- Parameters:
- param1 - 相关数据
- """
def set_service_settings(self, data):
    """
    desc:
        Apply the system settings present in data.

    Parameters:
        data - mapping that may contain "soft-volume", "hard-volume" and
               "hard-volume-control"

    Returns:
        True
    """
    soft_key = "soft-volume"
    hard_key = "hard-volume"
    control_key = "hard-volume-control"

    if soft_key in data:
        # Only the stored value is updated here; the player volume itself
        # is not touched.
        self.player_current_volume = int(data.get(soft_key))
    if hard_key in data:
        self.set_hard_volume(int(data.get(hard_key)))
    if control_key in data:
        self.set_hard_volume_control(data.get(control_key))
    return True
- """
- desc:
- 开始拉取本地摄像头的rtsp流并推送到服务器
-
- Parameters:
- param1 - 本地摄像头rtsp地址
- param2 - 服务器的rtsp地址
- """
def start_pull_rtsp(self, src, dst):
    """
    desc:
        Start pulling the rtsp stream of the local camera and pushing it
        to the server, after stopping any running pusher.

    Parameters:
        src - rtsp address of the local camera
        dst - rtsp address on the server

    Returns:
        True when the pusher was launched, False when src or dst is missing
    """
    # Function-scope import keeps this block self-contained.
    import shlex

    if not src:
        log.logger.error("src not found")
        return False
    if not dst:
        log.logger.error("dst not found")
        return False

    # stop_pull_rtsp() resets the counter, so carry it across the restart.
    previous_count = self.pull_rtsp
    self.stop_pull_rtsp()
    self.pull_rtsp = previous_count
    sleep(1)

    # Quote the addresses: URLs may contain '&', '?' or ';', which the shell
    # would otherwise interpret and break (or hijack) the command.
    command = "%s %s %s > /dev/null 2>&1 &" % (
        RTSP_PUSHER,
        shlex.quote(str(src)),
        shlex.quote(str(dst)),
    )
    os.system(command)
    log.logger.info(command)
    self.pull_rtsp = self.pull_rtsp + 1
    log.logger.info("self.pull_rtsp:%d" % (self.pull_rtsp))
    return True
- """
- desc:
- 停止拉取本地摄像头的rtsp流
- """
def stop_pull_rtsp(self):
    """
    desc:
        Stop pulling the local camera rtsp stream: kills every
        rtsp_pull_push process and resets the pull counter.

    Returns:
        True
    """
    kill_command = "/usr/bin/killall rtsp_pull_push > /dev/null 2>&1 &"
    os.system(kill_command)
    self.pull_rtsp = 0
    return True
- """
- desc:
- 设置系统的音量
-
- Parameters:
- param1 - 系统音量值
- """
def set_hard_volume(self, volume):
    """
    desc:
        Set the system (hardware) volume and persist it to the speaker
        config.

    Parameters:
        volume - system volume level, 0..10 inclusive

    Returns:
        True on success, False when volume is missing/out of range or the
        update failed
    """
    # Explicit None test: a plain truthiness test would reject level 0.
    if volume is None or not (0 <= volume <= 10):
        log.logger.error("Parameter volume not exist")
        return False

    # Level-to-mixer mapping used by this device: level * 10 + 9.
    real_hard_volume = volume * 10 + 9
    try:
        os.system("/usr/bin/amixer -q sset 'Master',0 %d > /dev/null 2>&1 &" % (real_hard_volume))
        speaker_cf.read(SPEAKER_CONFIG_FILE)
        speaker_cf.set("system", "volume_out", str(volume))
        with open(SPEAKER_CONFIG_FILE, "w+") as f:
            speaker_cf.write(f, space_around_delimiters=False)
        os.system("sync")
        self.hard_volume = volume
    except ERROR as err:
        # ERROR comes from the app_config star import (presumably an
        # exception class); log the caught instance, not the class.
        log.logger.error("Set hard volume error:\n%s" % (err,))
        return False
    return True
-
- """
- desc:
- 设置强控,不允许通过硬件按键修改系统音量
-
- Parameters:
- param1 - 强控开关
- """
def set_hard_volume_control(self, value):
    """
    desc:
        Set the forced-control switch (when enabled, hardware keys may not
        change the system volume) and persist it to the speaker config.

    Parameters:
        value - forced-control switch value; must be truthy

    Returns:
        True on success, False when value is empty or the update failed
    """
    if not value:
        log.logger.error("Set hard volume control value not exist")
        return False

    try:
        speaker_cf.read(SPEAKER_CONFIG_FILE)
        speaker_cf.set("volume", "hard_volume_control", value)
        with open(SPEAKER_CONFIG_FILE, "w+") as f:
            speaker_cf.write(f, space_around_delimiters=False)
        os.system("sync")
        self.hard_volume_control = value
    except ERROR as err:
        # Log the caught exception rather than the ERROR name itself.
        log.logger.error("Set hard volume control value:\n%s" % (err,))
        return False
    return True
- """
- desc:
- 设置系统分机信息
-
- Parameters:
- param1 - 服务器地址
- param2 - 用户名(分机号)
- param3 - 分机密码
- """
def set_exten(self, host, exten, password):
    """
    desc:
        Write the extension (SIP account) settings to the linphone config.

    Parameters:
        host - server address
        exten - user name (extension number)
        password - extension password

    Returns:
        True on success, False when an argument is empty or the update
        failed
    """
    if not (host and exten and password):
        log.logger.error("Set exten value not full")
        return False

    try:
        linphone_cf.read(LINPHONE_CONFIG_FILE)
        linphone_cf.set("auth_info_0", "username", exten)
        linphone_cf.set("auth_info_0", "passwd", password)
        reg_proxy = "<sip:%s>" % (host)
        reg_identity = "sip:%s@%s" % (exten, host)
        linphone_cf.set("proxy_0", "reg_proxy", reg_proxy)
        linphone_cf.set("proxy_0", "reg_identity", reg_identity)
        with open(LINPHONE_CONFIG_FILE, "w+") as f:
            linphone_cf.write(f, space_around_delimiters=False)
        os.system("sync")
    except ERROR as err:
        # Log the caught exception rather than the ERROR name itself.
        log.logger.error("Set exten error:\n%s" % (err,))
        return False
    return True
-
- """
- desc:
- vlc播放指定数据
-
- Parameters:
- param1 - 播放数据
- """
def play(self, data):
    """
    desc:
        Play the given data with the player: applies the optional volume
        settings, persists the playing state and uri, then (re)starts
        playback.

    Parameters:
        data - mapping with a mandatory "url" and optional "soft-volume",
               "hard-volume" and "hard-volume-control"

    Returns:
        True when playback was started, False when the url is missing,
        the state could not be saved or the player failed
    """
    if "url" not in data:
        log.logger.warning("Url not found!")
        return False
    url = data.get("url")

    if "soft-volume" in data:
        self.set_soft_volume(int(data.get("soft-volume")))
    if "hard-volume" in data:
        self.set_hard_volume(int(data.get("hard-volume")))
    if "hard-volume-control" in data:
        self.set_hard_volume_control(data.get("hard-volume-control"))

    os.system("/etc/scripts/pa_mute.sh 0 > /dev/null 2>&1 &")
    try:
        self.init_player_state = 1
        self.current_uri = url
        # 0 marks the playback as server-initiated.
        self.play_action_flag = 0
        try:
            project_cf.read(PROJECT_CONFIG)
            project_cf.set("general", "init_player_state", str(self.init_player_state))
            project_cf.set("general", "current_uri", str(self.current_uri))
            with open(PROJECT_CONFIG, "w+") as f:
                project_cf.write(f, space_around_delimiters=False)
            os.system("sync")
        except ERROR as err:
            # Log the caught exception rather than the ERROR name itself.
            log.logger.error("Save init player state or current uri error:\n%s" % (err,))
            return False
        self.player.stop()
        self.player.play(url)
    except Exception as err:
        # Exception instead of a bare except: SystemExit and
        # KeyboardInterrupt must not be swallowed here.
        log.logger.error("Player play uri:%s is error: %s" % (url, err))
        return False
    return True
-
- """
- desc:
- 播放器暂停
-
- Parameters:
- param1 - 暂停模式
- """
def pause(self, value):
    """
    desc:
        Pause the player and persist the paused state.

    Parameters:
        value - pause mode; "stop" stops the player, anything else pauses it

    Returns:
        True (player and save failures are logged, not raised)
    """
    self.init_player_state = 0
    if value == "stop":
        try:
            self.player.stop()
        except Exception as err:
            # Exception instead of a bare except so process-exit signals
            # propagate; the cause is now part of the log line.
            log.logger.error("Player pause(stop) is error: %s" % (err,))
    else:
        try:
            self.player.pause()
        except Exception as err:
            log.logger.error("Player pause is error: %s" % (err,))

    try:
        project_cf.read(PROJECT_CONFIG)
        project_cf.set("general", "init_player_state", str(self.init_player_state))
        with open(PROJECT_CONFIG, "w+") as f:
            project_cf.write(f, space_around_delimiters=False)
        os.system("sync")
    except ERROR as err:
        # Log the caught exception rather than the ERROR name itself.
        log.logger.error("Save init player state or current uri error:\n%s" % (err,))

    return True
-
- """
- desc:
- 播放器恢复播放
-
- Parameters:
- param1 - 恢复数据
- """
def resume(self, value):
    """
    desc:
        Resume playback and persist the playing state.

    Parameters:
        value - resume mode; "play" replays current_uri through play(),
                anything else calls the player's resume()

    Returns:
        True (player and save failures are logged, not raised)
    """
    self.init_player_state = 1
    try:
        project_cf.read(PROJECT_CONFIG)
        project_cf.set("general", "init_player_state", str(self.init_player_state))
        with open(PROJECT_CONFIG, "w+") as f:
            project_cf.write(f, space_around_delimiters=False)
        os.system("sync")
    except ERROR as err:
        # Log the caught exception rather than the ERROR name itself.
        log.logger.error("Save init player state or current uri error:\n%s" % (err,))

    # Already playing: nothing to resume.
    if self.player.get_state() == 1:
        return True

    if value == "play":
        data = {}
        data.setdefault("url", self.current_uri)
        try:
            self.play(data)
        except Exception as err:
            # Exception instead of a bare except so process-exit signals
            # propagate; the cause is now part of the log line.
            log.logger.error("Player resume(play) is error: %s" % (err,))
    else:
        try:
            self.player.resume()
        except Exception as err:
            log.logger.error("Player resume is error: %s" % (err,))

    return True
-
- """
- desc:
- 播放器停止播放
- """
def stop(self):
    """
    desc:
        Stop playback: fades the volume out, stops the player, clears the
        saved uri and persists the stopped state.

    Returns:
        True on success, False when the player or the save step failed
    """
    self.init_player_state = -1
    self.current_uri = ""
    self.play_action_flag = 0
    try:
        self.FadeOut(0, self.player_current_volume)
        self.player.stop()
        try:
            project_cf.read(PROJECT_CONFIG)
            project_cf.set("general", "init_player_state", str(self.init_player_state))
            project_cf.set("general", "current_uri", "")
            with open(PROJECT_CONFIG, "w+") as f:
                project_cf.write(f, space_around_delimiters=False)
            os.system("sync")
        except ERROR as err:
            # Log the caught exception rather than the ERROR name itself.
            log.logger.error("Save init player state or current uri error:\n%s" % (err,))
            return False
    except Exception as err:
        # Exception instead of a bare except so process-exit signals
        # propagate; the cause is now part of the log line.
        log.logger.error("Player stop is error: %s" % (err,))
        return False
    os.system("/etc/scripts/pa_mute.sh 1 > /dev/null 2>&1 &")
    return True
-
def on_subcribe(self, client, userdata, mid, granted_qos):
    """
    desc:
        MQTT subscribe callback. Placeholder: does nothing.

    Parameters:
        client - MQTT client instance (unused)
        userdata - user data set on the client (unused)
        mid - message id of the subscribe request (unused)
        granted_qos - QoS levels granted by the broker (unused)
    """
    return None
def on_publish(self, client, userdata, mid):
    """
    desc:
        MQTT publish callback. Placeholder: does nothing.

    Parameters:
        client - MQTT client instance (unused)
        userdata - user data set on the client (unused)
        mid - message id of the published message (unused)
    """
    return None
def on_disconnect(self, client, userdata, rc):
    """
    desc:
        MQTT disconnect callback: only logs the disconnection.

    Parameters:
        client - MQTT client instance (unused)
        userdata - user data set on the client (unused)
        rc - disconnect result code (unused)
    """
    logger = log.logger
    logger.info("Mqtt disconnect!")
- """
- desc:
- 检测系统的必要配置信息
- """
- def _check_config(self):
- if not self.model:
- log.logger.error("Model not found!")
- return False
- elif not self.mac:
- log.logger.error("Mac not found!")
- return False
- elif not self.server_ipaddr:
- log.logger.error("Server Ipaddr not found!")
- return False
- elif not self.exten:
- log.logger.error("Extension not found!")
- return False
- elif self.hard_volume < 0:
- log.logger.error("Hard Volume error!")
- return False
- else:
- return True
-
- """
- desc:
- 监测系统状态变化,如音量,播放状态等
- """
def _check_variable(self):
    """
    desc:
        Monitor loop: every CHECK_INTERVAL seconds compare volumes,
        extension, player state, forced-control switch and rtsp pulling
        against the last reported values and publish the changes as one
        update event. Also restarts playback when the player should be
        playing but is not. Never returns.
    """
    old_soft_volume = self.player_current_volume
    old_hard_volume = self.hard_volume
    old_exten = self.exten
    old_player_state = self.player.get_state()
    old_hard_volume_control = self.hard_volume_control
    old_pull_rtsp = self.pull_rtsp
    while True:
        sleep(CHECK_INTERVAL)
        data = {}
        event = {}
        event.setdefault(ACTION_NAME, TOPIC_UPDATE)
        event.setdefault(DATA, data)

        new_soft_volume = self.player_current_volume
        if old_soft_volume != new_soft_volume:
            old_soft_volume = new_soft_volume
            data.setdefault(SOLF_VOLUME, new_soft_volume)

        # Refresh the speaker config (not the project config) before
        # reading the hard volume from it, so an external change is seen
        # in this cycle instead of the next one.
        speaker_cf.read(SPEAKER_CONFIG_FILE)
        new_hard_volume = int(speaker_cf.get("system", "volume_out"))
        if old_hard_volume != new_hard_volume:
            old_hard_volume = new_hard_volume
            data.setdefault(HARD_VOLUME, new_hard_volume)

        linphone_cf.read(LINPHONE_CONFIG_FILE)
        new_exten = linphone_cf.get("auth_info_0", "username")
        if old_exten != new_exten:
            old_exten = new_exten
            data.setdefault(EXTEN, new_exten)

        # The player should be playing but is not: try to restart it.
        if self.init_player_state == 1 and self.player.get_state() != 1:
            if self.current_uri != "":
                self._player_reconnect()

        new_player_state = self.player.get_state()
        # State changes are only reported while play_action_flag is 0
        # (it is set to 1 by the fifo-triggered stop).
        if old_player_state != new_player_state and self.play_action_flag == 0:
            old_player_state = new_player_state
            if new_player_state == -1:
                player_status = PLAYER_IDLE
            elif new_player_state == 0:
                player_status = PLAYER_PAUSE
            elif new_player_state == 1:
                player_status = PLAYER_PLAYING
            else:
                player_status = PLAYER_ERROR
            data.setdefault(STATUS, player_status)

        speaker_cf.read(SPEAKER_CONFIG_FILE)
        new_hard_volume_control = speaker_cf.get("volume", "hard_volume_control")
        if old_hard_volume_control != new_hard_volume_control:
            old_hard_volume_control = new_hard_volume_control
            data.setdefault(HARD_VOLUME_CONTROL, new_hard_volume_control)

        new_pull_rtsp = self.pull_rtsp
        if old_pull_rtsp != new_pull_rtsp:
            old_pull_rtsp = new_pull_rtsp
            # Reported as a 0/1 switch, not as the raw counter.
            if new_pull_rtsp != 0:
                data.setdefault("pull-rtsp", 1)
            else:
                data.setdefault("pull-rtsp", 0)

        if event.get(DATA) != {}:
            try:
                self.mqttClient.publish(TOPIC_EVENT, json.dumps(event).encode('utf-8'), MQTT_QOS)
                log.logger.info("Publish event:%s" % (event))
            except ERROR as err:
                # Log the caught exception rather than the ERROR name itself.
                log.logger.error("Publish event:\n%s" % (err,))
- """
- desc:
- 监测fifo
- """
def _fifo(self):
    """
    desc:
        Monitor the fifo at FIFO_PATH (created when missing) and react to
        the messages read from it:
          "0" - if playing, stop the player and remember the saved state
          "1" - restore the remembered state
        Anything else is logged as unsupported. Loops forever; the
        descriptor is closed if the loop is left by an exception.
    """
    if not os.path.exists(FIFO_PATH):
        os.mkfifo(FIFO_PATH)

    # Opened read/write; presumably so that read() blocks instead of
    # returning EOF while no writer is connected - TODO confirm.
    rf = os.open(FIFO_PATH, os.O_RDWR)
    try:
        while True:
            raw = os.read(rf, 64)
            # Undecodable bytes must not kill the monitor thread, and a
            # trailing newline (e.g. from `echo 0 > fifo`) must not defeat
            # the comparisons below.
            data = raw.decode("utf-8", errors="replace").strip()
            log.logger.info("received msg from fifo:%s" % (data))
            if data == "0":
                if self.player.get_state() == 1:
                    self.play_action_flag = 1
                    self.tmp_init_player_state = self.init_player_state
                    self.init_player_state = 0
                    self.player.stop()
            elif data == "1":
                self.play_action_flag = 0
                self.init_player_state = self.tmp_init_player_state
            else:
                log.logger.error("No support this action!")
    finally:
        os.close(rf)
-
- """
- desc:
- 启动监测线程
- """
def check_start(self):
    """
    desc:
        Start the state-monitor loop (_check_variable) on a daemon thread.

    Returns:
        True when the thread was started, False when starting it failed
    """
    try:
        worker = threading.Thread(target=self._check_variable, args=(), daemon=True)
        worker.start()
    except ERROR as err:
        # Log the caught exception rather than the ERROR name itself, and
        # report the failure explicitly instead of falling through.
        log.logger.error(err)
        return False
    return True
- """
- desc:
- 启动fifo监测
- """
def check_fifo_start(self):
    """
    desc:
        Start the fifo monitor loop (_fifo) on a daemon thread.

    Returns:
        True when the thread was started, False when starting it failed
    """
    try:
        worker = threading.Thread(target=self._fifo, args=(), daemon=True)
        worker.start()
    except ERROR as err:
        # Log the caught exception rather than the ERROR name itself, and
        # report the failure explicitly instead of falling through.
        log.logger.error(err)
        return False
    return True
- """
- desc:
- socket断线回调
- """
def on_socket_close(self, client, userdata, sock):
    """
    desc:
        Socket-closed callback of the MQTT client: only logs the event.

    Parameters:
        client - MQTT client instance (unused)
        userdata - user data set on the client (unused)
        sock - socket that was closed (unused)
    """
    logger = log.logger
    logger.error("Socket error(disconnect) occur!")
|