from app import config, log, register, project_cf, speaker_cf, linphone_cf, player import paho.mqtt.client as mqtt import json from app.app_config import * import sys from time import sleep from threading import Lock import os import vlc import threading """ desc: 项目主要功能类 """ class Device(object): def __init__(self, config = config, player = player): self.model = config.model self.hostname = config.hostname self.ipaddr = config.ipaddr self.hard_volume = config.hard_volume self.hard_volume_control = config.hard_volume_control self.exten = config.exten self.exten_password = config.exten_password self.server_ipaddr = config.server_ipaddr self.mac = config.mac self.connect_flags = 1 self.player_current_volume = config.player_current_volume self.init_player_state = config.init_player_state self.current_uri = config.current_uri self.pull_rtsp = 0 self.warning_lock = Lock() self.error_lock = Lock() self.source_error_lock = Lock() #添加服务器播放还是本地播放flag 0:服务器播放 1:本地播放 self.play_action_flag = 0 self.tmp_init_player_state = 0 if not self._check_config(): sys.exit(1) self.player = player self.player.event_manager = self.player.media.event_manager() """ desc: 播放器事件回调注册 Parameters: param1 - 播放器事件名 param2 - 回调函数名 Returns: """ self.player.event_manager.event_attach(vlc.EventType.MediaPlayerEncounteredError, self.MediaPlayerEncounteredError_cb) self.player.event_manager.event_attach(vlc.EventType.VlmMediaInstanceStatusError, self.VlmMediaInstanceStatusError_cb) self.player.event_manager.event_attach(vlc.EventType.MediaPlayerPlaying, self.MediaPlayerPlaying_cb) self.player.event_manager.event_attach(vlc.EventType.MediaPlayerPaused, self.MediaPlayerPaused_cb) self.player.event_manager.event_attach(vlc.EventType.MediaPlayerStopped, self.MediaPlayerStopped_cb) self.vlc_error_count = 0 self.vlc_source_error_count = 0 self.vlc_warning_count = 0 self.mqttClient = mqtt.Client(client_id = self.mac, clean_session = True) self.mqttClient.username_pw_set(username=self.exten, password=self.exten_password) self.mqttClient.on_connect = self.on_connect # 注册监听器 def add_callback(self, event_type, callback): pass # 移除监听器 def remove_callback(self, event_type, callback): pass def MediaPlayerEncounteredError_cb(self, event): # self.set_vlc_source_error_count(self.get_vlc_source_error_count() + 1) # if self.get_vlc_source_error_count() > VLC_SOURCE_ERROR_COUNT: # self.set_vlc_source_error_count(0) # return # else: # log.logger.error("MediaPlayerEncounteredError_cb") # sleep(2) # log.logger.info("Replay uri:%s" % (self.current_uri)) # t = threading.Thread(target = self.player.play, args = (self.current_uri,)) # t.setDaemon(True) # t.start() # log.logger.info("Replay finished!") log.logger.error("Player play error") def VlmMediaInstanceStatusError_cb(self, event): log.logger.error("VlmMediaInstanceStatusError_cb") """ desc: 开始播放事件回调,控制声音渐入 """ def MediaPlayerPlaying_cb(self, event): log.logger.error("MediaPlayerPlaying_cb") t = threading.Thread(target = self.FadeIn, args = (0, self.player_current_volume,)) t.setDaemon(True) t.start() log.logger.info("Set volume FadeIn finished!") def MediaPlayerPaused_cb(self, event): log.logger.error("MediaPlayerPaused_cb") def MediaPlayerStopped_cb(self, event): log.logger.error("MediaPlayerStopped_cb") """ desc: 播放器声音渐入实现函数,从low_volume音量过渡到high_volume Parameters: param1 - 低音量 param2 - 高音量 """ def FadeIn(self, low_volume, high_volume): self.player.set_volume(low_volume) per_volume = (high_volume - low_volume)//10 tmp_volume = low_volume + per_volume i = 0 while i < 10: sleep(0.1) if i == 9: self.player.set_volume(high_volume) else: self.player.set_volume(tmp_volume) tmp_volume = tmp_volume + per_volume i = i + 1 return True """ desc: 播放器声音渐出实现函数,从high_volume音量过渡到low_volume Parameters: param1 - 高音量 param2 - 低音量 """ def FadeOut(self, low_volume, high_volume): per_volume = (high_volume - low_volume)//10 tmp_volume = high_volume - per_volume i = 0 while i < 9: sleep(0.1) if i == 8: self.player.set_volume(low_volume) else: self.player.set_volume(tmp_volume) tmp_volume = tmp_volume - per_volume i = i + 1 return True """ desc: 系统信号处理函数,主要处理键盘输入,如ctrl+c等 Parameters: param1 - 信号数字 param2 - 处理方法 """ def singal_handler(self, singal_num, handler): self.mqttClient.disconnect() self.connect_flags = 0 self.stop_pull_rtsp() log.logger.info("Mqtt disconnect now!") sys.exit(0) """ desc: 设置播放器警告日志数量 Parameters: param1 - 数量 """ def set_vlc_warning_count(self, value): self.warning_lock.acquire() self.vlc_warning_count = value self.warning_lock.release() def get_vlc_warning_count(self): return self.vlc_warning_count def set_vlc_error_count(self, value): self.error_lock.acquire() self.vlc_error_count = value self.error_lock.release() def get_vlc_error_count(self): return self.vlc_error_count def set_vlc_source_error_count(self, value): self.source_error_lock.acquire() self.vlc_source_error_count = value self.source_error_lock.release() def get_vlc_source_error_count(self): return self.vlc_source_error_count """ desc: 连接mqtt服务器 """ def run(self): while self.connect_flags: try: log.logger.info("Mqtt to %s:%d, connectting..." % (self.server_ipaddr, SERVER_MQTT_PORT)) self.mqttClient.connect(self.server_ipaddr, SERVER_MQTT_PORT, MQTT_CONNECT_TIMEOUT) self.connect_flags = 0 except: log.logger.error("Mqtt connect to %s:%d error" % (self.server_ipaddr, SERVER_MQTT_PORT)) log.logger.error("Mqtt reconnect...") sleep(RETRY_SEC) self.mqttClient.loop_start() self.mqttClient.on_message = register.recieve_run self.mqttClient.on_subcribe = self.on_subcribe self.mqttClient.on_disconnect = self.on_disconnect self.mqttClient.on_publish = self.on_publish self.mqttClient.on_socket_close = self.on_socket_close self.mqttClient.subscribe(TOPIC_COMMAND, MQTT_QOS) return True """ desc: mqtt连接事件回调函数 """ def on_connect(self, client, userdata, flags, rc): log.logger.info(mqtt.connack_string(rc)) self._player_reconnect() init_data = self._get_init_data() self.mqttClient.publish(TOPIC_EVENT, json.dumps(init_data).encode("utf-8"), MQTT_QOS) log.logger.info("Publish data: %s" % (init_data)) return True """ desc: 播放器重新连接 """ def _player_reconnect(self): if self.current_uri != "" and self.init_player_state == 1: os.system("/etc/scripts/pa_mute.sh 0 > /dev/null 2>&1 &") self.player.play(self.current_uri) log.logger.info("Player reconnect finished!") sleep(1) else: log.logger.info("Not need reconnect!") return True """ desc: 组装mqtt连接到的服务器时需上报的初始化数据 """ def _get_init_data(self): init_data = {} data = {} data.setdefault(SOLF_VOLUME, self.player_current_volume) data.setdefault(HARD_VOLUME, self.hard_volume) data.setdefault("hard-volume-control", self.hard_volume_control) data.setdefault(EXTEN, self.exten) data.setdefault(DEVICE_MODEL, self.model) status = self.player.get_state() if status == 1: data.setdefault(STATUS, PLAYER_PLAYING) data.setdefault("url", self.current_uri) elif status == 0: data.setdefault(STATUS, PLAYER_PAUSE) elif status == -1: data.setdefault(STATUS, PLAYER_IDLE) else: data.setdefault(STATUS, PLAYER_ERROR) init_data.setdefault(ACTION_NAME, "init") init_data.setdefault(DATA, data) return init_data """ desc: 设置播放器音量 Parameters: param1 - 音量 param2 - 类型 """ def set_soft_volume(self, volume, type = "default"): old_volume = self.player_current_volume self.player_current_volume = volume project_cf.read(PROJECT_CONFIG) project_cf.set("general", "player_init_volume", str(volume)) with open(PROJECT_CONFIG, "w+") as f: project_cf.write(f, space_around_delimiters=False) os.system("sync") if self.player.get_state() == 1: if old_volume > volume: self.FadeOut(volume, old_volume) elif old_volume < volume: self.FadeIn(old_volume, volume) else: pass else: self.player.set_volume(volume) return True """ desc: 设置系统相关数据 Parameters: param1 - 相关数据 """ def set_service_settings(self, data): if "soft-volume" in data: self.player_current_volume = int(data.get("soft-volume")) if "hard-volume" in data: self.set_hard_volume(int(data.get("hard-volume"))) if "hard-volume-control" in data: self.set_hard_volume_control(data.get("hard-volume-control")) return True """ desc: 开始拉取本地摄像头的rtsp流并推送到服务器 Parameters: param1 - 本地摄像头rtsp地址 param2 - 服务器的rtsp地址 """ def start_pull_rtsp(self, src, dst): if not src: log.Logger.error("src not found") return False if not dst: log.Logger.error("dst not found") return False tmp_pull_rtsp = self.pull_rtsp self.stop_pull_rtsp() self.pull_rtsp = tmp_pull_rtsp sleep(1) os.system("%s %s %s > /dev/null 2>&1 &" % (RTSP_PUSHER, src, dst)) log.logger.info("%s %s %s > /dev/null 2>&1 &" % (RTSP_PUSHER, src, dst)) self.pull_rtsp = self.pull_rtsp + 1 log.logger.info("self.pull_rtsp:%d" % (self.pull_rtsp)) return True """ desc: 停止拉取本地摄像头的rtsp流 """ def stop_pull_rtsp(self): os.system("/usr/bin/killall rtsp_pull_push > /dev/null 2>&1 &") self.pull_rtsp = 0 return True """ desc: 设置系统的音量 Parameters: param1 - 系统音量值 """ def set_hard_volume(self, volume): if volume and volume >= 0 and volume <= 10: real_hard_volume = volume * 10 + 9 try: os.system("/usr/bin/amixer -q sset 'Master',0 %d > /dev/null 2>&1 &" % (real_hard_volume)) speaker_cf.read(SPEAKER_CONFIG_FILE) speaker_cf.set("system", "volume_out", str(volume)) with open(SPEAKER_CONFIG_FILE, "w+") as f: speaker_cf.write(f, space_around_delimiters=False) os.system("sync") self.hard_volume = volume except ERROR: log.logger.error("Set hard volume error:\n%s" % (ERROR)) return False else: log.logger.error("Parameter volume not exist") return False return True """ desc: 设置强控,不允许通过硬件按键修改系统音量 Parameters: param1 - 强控开关 """ def set_hard_volume_control(self, value): if value: try: speaker_cf.read(SPEAKER_CONFIG_FILE) speaker_cf.set("volume", "hard_volume_control", value) with open(SPEAKER_CONFIG_FILE, "w+") as f: speaker_cf.write(f, space_around_delimiters=False) os.system("sync") self.hard_volume_control = value except ERROR: log.logger.error("Set hard volume control value:\n%s" % (ERROR)) return False else: log.logger.error("Set hard volume control value not exist") return False return True """ desc: 设置系统分机信息 Parameters: param1 - 服务器地址 param2 - 用户名(分机号) param3 - 分机密码 """ def set_exten(self, host, exten, password): if host and exten and password: try: linphone_cf.read(LINPHONE_CONFIG_FILE) linphone_cf.set("auth_info_0", "username", exten) linphone_cf.set("auth_info_0", "passwd", password) reg_proxy = "" % (host) reg_identity = "sip:%s@%s" % (exten, host) linphone_cf.set("proxy_0", "reg_proxy", reg_proxy) linphone_cf.set("proxy_0", "reg_identity", reg_identity) with open(LINPHONE_CONFIG_FILE, "w+") as f: linphone_cf.write(f, space_around_delimiters=False) os.system("sync") except ERROR: log.logger.error("Set exten error:\n%s" % (ERROR)) return False else: log.logger.error("Set exten value not full") return False return True """ desc: vlc播放指定数据 Parameters: param1 - 播放数据 """ def play(self, data): if "url" in data: url = data.get("url") else: log.logger.warning("Url not found!") return False if "soft-volume" in data: self.set_soft_volume(int(data.get("soft-volume"))) if "hard-volume" in data: self.set_hard_volume(int(data.get("hard-volume"))) if "hard-volume-control" in data: self.set_hard_volume_control(data.get("hard-volume-control")) os.system("/etc/scripts/pa_mute.sh 0 > /dev/null 2>&1 &") try: self.init_player_state = 1 self.current_uri = url self.play_action_flag = 0 try: project_cf.read(PROJECT_CONFIG) project_cf.set("general", "init_player_state", str(self.init_player_state)) project_cf.set("general", "current_uri", str(self.current_uri)) with open(PROJECT_CONFIG, "w+") as f: project_cf.write(f, space_around_delimiters=False) os.system("sync") except ERROR: log.logger.error("Save init player state or current uri error:\n%s" % (ERROR)) return False self.player.stop() self.player.play(url) except: log.logger.error("Player play uri:%s is error" % (url)) return False return True """ desc: 播放器暂停 Parameters: param1 - 暂停模式 """ def pause(self, value): self.init_player_state = 0 if value == "stop": try: self.player.stop() except: log.logger.error("Player pause(stop) is error") else: try: self.player.pause() except: log.logger.error("Player pause is error") try: project_cf.read(PROJECT_CONFIG) project_cf.set("general", "init_player_state", str(self.init_player_state)) with open(PROJECT_CONFIG, "w+") as f: project_cf.write(f, space_around_delimiters=False) os.system("sync") except ERROR: log.logger.error("Save init player state or current uri error:\n%s" % (ERROR)) return True """ desc: 播放器恢复播放 Parameters: param1 - 恢复数据 """ def resume(self, value): self.init_player_state = 1 try: project_cf.read(PROJECT_CONFIG) project_cf.set("general", "init_player_state", str(self.init_player_state)) with open(PROJECT_CONFIG, "w+") as f: project_cf.write(f, space_around_delimiters=False) os.system("sync") except ERROR: log.logger.error("Save init player state or current uri error:\n%s" % (ERROR)) if self.player.get_state() == 1: return True if value == "play": data = {} data.setdefault("url", self.current_uri) try: self.play(data) except: log.logger.error("Player resume(play) is error") else: try: self.player.resume() except: log.logger.error("Player resume is error") return True """ desc: 播放器停止播放 """ def stop(self): self.init_player_state = -1 self.current_uri = "" self.play_action_flag = 0 try: self.FadeOut(0, self.player_current_volume) self.player.stop() try: project_cf.read(PROJECT_CONFIG) project_cf.set("general", "init_player_state", str(self.init_player_state)) project_cf.set("general", "current_uri", "") with open(PROJECT_CONFIG, "w+") as f: project_cf.write(f, space_around_delimiters=False) os.system("sync") except ERROR: log.logger.error("Save init player state or current uri error:\n%s" % (ERROR)) return False except: log.logger.error("Player stop is error") return False os.system("/etc/scripts/pa_mute.sh 1 > /dev/null 2>&1 &") return True def on_subcribe(self, client, userdata, mid, granted_qos): pass def on_publish(self, client, userdata, mid): pass def on_disconnect(self, client, userdata, rc): log.logger.info("Mqtt disconnect!") """ desc: 检测系统的必要配置信息 """ def _check_config(self): if not self.model: log.logger.error("Model not found!") return False elif not self.mac: log.logger.error("Mac not found!") return False elif not self.server_ipaddr: log.logger.error("Server Ipaddr not found!") return False elif not self.exten: log.logger.error("Extension not found!") return False elif self.hard_volume < 0: log.logger.error("Hard Volume error!") return False else: return True """ desc: 监测系统状态变化,如音量,播放状态等 """ def _check_variable(self): old_soft_volume = self.player_current_volume old_hard_volume = self.hard_volume old_exten = self.exten old_player_state = self.player.get_state() old_hard_volume_control = self.hard_volume_control old_pull_rtsp = self.pull_rtsp while True: sleep(CHECK_INTERVAL) data = {} event = {} event.setdefault(ACTION_NAME, TOPIC_UPDATE) event.setdefault(DATA, data) new_soft_volume = self.player_current_volume if old_soft_volume != new_soft_volume: old_soft_volume = new_soft_volume data.setdefault(SOLF_VOLUME, new_soft_volume) project_cf.read(PROJECT_CONFIG) new_hard_volume = int(speaker_cf.get("system", "volume_out")) if old_hard_volume != new_hard_volume: old_hard_volume = new_hard_volume data.setdefault(HARD_VOLUME, new_hard_volume) linphone_cf.read(LINPHONE_CONFIG_FILE) new_exten = linphone_cf.get("auth_info_0", "username") if old_exten != new_exten: old_exten = new_exten data.setdefault(EXTEN, new_exten) if self.init_player_state == 1 and self.player.get_state() != 1: if self.current_uri != "": self._player_reconnect() new_player_state = self.player.get_state() if old_player_state != new_player_state and self.play_action_flag == 0: old_player_state = new_player_state if new_player_state == -1: player_status = PLAYER_IDLE elif new_player_state == 0: player_status = PLAYER_PAUSE elif new_player_state == 1: player_status = PLAYER_PLAYING else: player_status = PLAYER_ERROR data.setdefault(STATUS, player_status) speaker_cf.read(SPEAKER_CONFIG_FILE) new_hard_volume_control = speaker_cf.get("volume", "hard_volume_control") if old_hard_volume_control != new_hard_volume_control: old_hard_volume_control = new_hard_volume_control data.setdefault(HARD_VOLUME_CONTROL, new_hard_volume_control) new_pull_rtsp = self.pull_rtsp if old_pull_rtsp != new_pull_rtsp: old_pull_rtsp = new_pull_rtsp if new_pull_rtsp != 0: data.setdefault("pull-rtsp", 1) else: data.setdefault("pull-rtsp", 0) if event.get(DATA) != {}: try: self.mqttClient.publish(TOPIC_EVENT, json.dumps(event).encode('utf-8'), MQTT_QOS) log.logger.info("Publish event:%s" % (event)) except ERROR: log.logger.error("Publish event:\n%s" % (ERROR)) """ desc: 监测fifo """ def _fifo(self): if not os.path.exists(FIFO_PATH): os.mkfifo(FIFO_PATH) rf = os.open(FIFO_PATH, os.O_RDWR) while True: data = os.read(rf, 64) data = data.decode("utf-8") log.logger.info("received msg from fifo:%s" % (data)) if data == "0": if self.player.get_state() == 1: self.play_action_flag = 1 self.tmp_init_player_state = self.init_player_state self.init_player_state = 0 self.player.stop() elif data == "1": self.play_action_flag = 0 self.init_player_state = self.tmp_init_player_state else: log.logger.error("No support this action!") os.close(rf) """ desc: 启动监测线程 """ def check_start(self): try: t = threading.Thread(target = self._check_variable, args = ()) t.setDaemon(True) t.start() return True except ERROR: log.logger.error(ERROR) """ desc: 启动fifo监测 """ def check_fifo_start(self): try: t = threading.Thread(target = self._fifo, args = ()) t.setDaemon(True) t.start() return True except ERROR: log.logger.error(ERROR) """ desc: socket断线回调 """ def on_socket_close(self, client, userdata, sock): log.logger.error("Socket error(disconnect) occur!")