# 主業務邏輯中的視圖和路由的定義 import os import datetime from flask import render_template, request, session, Response, url_for, make_response # 導入藍圖程序,用於構建路由 from werkzeug.utils import redirect, secure_filename from . import main from manage import mqtt # 導入db,用於操作數據庫 from manage import db, app # 導入實體類,用於操作數據庫 from ..models import * import json from datetime import datetime as dt from sqlalchemy import text import socket import pickle import cv2 import numpy as np import math import threading import time from flask_mqtt import Mqtt import requests import re import copy from selenium import webdriver from selenium.webdriver.support.ui import Select from bs4 import BeautifulSoup from time import sleep as sl from app import ALLOWED_EXTENSIONS from flask import jsonify from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity import paramiko import git import shutil from manage import socketio # 主頁的訪問路徑 @main.route('/') # @jwt_required def main_index(): # 獲取登入信息 if 'token' in session and 'uname' in session: username = session['uname'] return redirect('/index') else: return render_template('sign_in.html') # 登入頁面的訪問路徑 @main.route('/login', methods=['GET', 'POST']) def login_views(): if request.method == 'GET': if 'id' in session and 'uname' in session and 'status' in session: return redirect('/') else: return render_template('sign_in.html') else: # 接收前端傳過來的資料 username = request.form['username'] password = request.form['password'] # 使用接收的用戶和密碼到資料庫中查詢 user = User.query.filter_by(username=username, password=password).first() #檢查原密碼與哈希過的密碼是否吻合 # if user and user.check_hash_password(password): # 如果用戶存在,將信息保存置session並重定向回首頁,否則回登入頁 if user: # resp = redirect('/') # resp = make_response('') # 判斷是否有記住密碼 if 'rem' in request.form: # sn = str(user.sn) # 如果設置了 session.permanent 為 True,那麽過期時間是31天 session.permanent = True #設置cookie # max_age = 60 * 60 * 24 * 365 # resp.set_cookie("username", username, max_age=max_age) # resp.set_cookie("sn", sn, max_age=max_age) #可設置tokens時效 expires = datetime.timedelta(minutes=30) access_token = create_access_token(identity=user.username, expires_delta=expires) # access_token = create_access_token(identity=user.username) print(access_token) session['token'] = access_token session['uname'] = user.username # session['id'] = user.sn # session['status'] = user.status return jsonify(access_token=access_token) else: errMsg = "Wrong login or password" # return render_template('sign_in.html', errMsg=errMsg) return jsonify(errMsg=errMsg) # 註冊頁面的訪問路徑 @main.route('/register', methods=['POST', "GET"]) def register_views(): if request.method == 'GET': return render_template('registration.html') else: # 獲取文本框的值並賦值給user實體對象 #哈希加密版本 # username = request.form['username'] # password = request.form['password'] # user = User(username=username, password=password) user = User() user.firstname = request.form['firstname'] user.lastname = request.form['lastname'] user.mail = request.form['email'] user.phone = request.form['phone'] user.username = request.form['username'] user.password = request.form['password'] user.status = 1 user.isActive = True # 將數據保存進資料庫 - 註冊 db.session.add(user) # 手動提交,目的是為了獲取提交後的user的id db.session.commit() # 當user成功插入進資料庫之後,程序會自動將所有信息取出來在賦值給user # 完成登入的操作 user = User.query.filter_by(username=user.username).first() # 可設置tokens時效 expires = datetime.timedelta(minutes=30) access_token = create_access_token(identity=user.username, expires_delta=expires) # access_token = create_access_token(identity=user.username) session['token'] = access_token # session['id'] = user.sn session['uname'] = user.username # session['status'] = user.status return jsonify(access_token=access_token) # return redirect('/') #登入重設密碼的頁面 @main.route('/reset_password', methods=['POST', 'GET']) def reset_password_views(): if request.method == 'GET': if 'mail' in session: del session['mail'] return render_template('reset_pwd1.html') else: #如果有id在session裡,代表從reset_pwd2過來的 if "mail" in session: new_pwd = request.form['new_pwd'] confirm_pwd = request.form['confirm_pwd'] #判斷密碼是否一致 if new_pwd == confirm_pwd: mail = session['mail'] user = User.query.filter_by(mail=mail).first() user.password = new_pwd db.session.add(user) del session['mail'] #修改完後回登入頁 return redirect('/login') else: errMsg = "Passwords does not match" return render_template('reset_pwd2.html', errMsg=errMsg) email = request.form['email'] user = User.query.filter_by(mail=email).first() if user: session['mail'] = user.mail return render_template('reset_pwd2.html') else: errMsg = "Wrong email.Please try again" return render_template('reset_pwd1.html', errMsg=errMsg) #驗證email訪問路徑 @main.route('/check_email') def check_email_views(): email = request.args['email'] user = User.query.filter_by(mail=email).first() if user: result = {"errMsg":" "} else: result = {"pass":" "} return json.dumps(result) #驗證username訪問路徑 @main.route('/check_username') def check_username_views(): username = request.args['username'] user = User.query.filter_by(username=username).first() if user: result = {"errMsg":" "} else: result = {"pass":" "} return json.dumps(result) #用戶主介面的訪問路徑 @main.route('/index', methods=['POST', 'GET']) def index_views(): token = session['token'] username = session['uname'] if request.method == 'GET': return render_template('index.html', params=locals()) else: pass #進貨檢測的訪問路徑 @main.route('/incoming_check', methods=['POST', 'GET']) def incoming_check_views(): # print("test") username = session['uname'] if request.method == 'GET': # print("test2") # return jsonify(status=200) return render_template('incoming_check.html', params=locals()) # 方法內可透過get_jwt_identity之前放在token的identity內的內容:使用者名稱 # identity = get_jwt_identity() # return jsonify(identity=identity) else: pass #感測器燒錄的訪問路徑 @main.route('/burn_program', methods=['POST', 'GET']) def burn_program_views(): username = session['uname'] if request.method == 'GET': sensors = SensorList.query.all() sensor_list = [] for sensor in sensors: sensor_list.append(sensor.toDict()) return render_template('burn_program.html', params=locals()) else: import json payload = {} if request.form['airTem']: payload = {"command":"system_update", "url":"http://60.250.156.230:3000/Benson/TestOTA.git"} json = json.dumps(payload) mqtt.publish(json) res = {"status": 1} return json.dumps(res) #修改權限的訪問路徑 @main.route('/modify_permissions', methods=['POST', 'GET']) def modify_permissions_views(): username = session['uname'] if request.method == 'GET': return render_template('modify_permissions.html', params=locals()) else: pass #新增感測器的訪問路徑 @main.route('/add_sensor', methods=['POST', 'GET']) def add_sensor_views(): username = session['uname'] if request.method == 'GET': return render_template('add_sensor.html', params=locals()) else: try: ch_name = request.form['ch_name'] en_name = request.form['en_name'] version = request.form['version'] sensor = SensorList() sensor.ch_name = ch_name sensor.en_name = en_name sensor.version = version sensor.datetime = dt.now() db.session.add(sensor) db.session.commit() return "新增成功!" except Exception as e: return 'ERROR:' + e # @socketio.on('connect_event') # def connected_msg(msg): # print("connect success") # print(msg) # socketio.emit('stdout', 'success') # print(socketio) # # D = {"ssh_content":0} # # #建置伺服器的訪問路徑 # @main.route('/add_server', methods=['POST', 'GET']) # def add_server_views(): # username = session['uname'] # L = [] # if request.method == 'GET': # repo = git.Repo.init(path='.') # new_repo = git.Repo.clone_from(url='http://60.250.156.230:3000/benson/TestSSH.git', to_path='../new') # with open('/home/benson/Project/new/All_installv3.2.sh') as f: # D['ssh_content'] = f.read().split('\n')[0:-1] # ssh_content = D['ssh_content'] # print(D['ssh_content']) # # #刪除非空資料夾 # try: # shutil.rmtree('/home/benson/Project/new') # except OSError as e: # print(f"Error:{e.strerror}") # # for i in range(0, len(D['ssh_content'])): # if "#" in D['ssh_content'][i] or D['ssh_content'][i] == '' or D['ssh_content'][i] == ' ': # continue # if "sudo" in D['ssh_content'][i] or "echo" in D['ssh_content'][i]: # L.append(D['ssh_content'][i]) # return render_template('add_server.html', params=locals()) # else: # print(D['ssh_content']) # L = [] # d = request.form.to_dict() # ip = d['ip'] # print(ip) # try: # # 建立一個sshclient物件 # ssh = paramiko.SSHClient() # # 允許將信任的主機自動加入到host_allow 列表,此方法必須放在connect方法的前面 # ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # # 指定本地的RSA私鑰檔案,如果建立金鑰對時設定的有密碼,password為設定的密碼,如無不用指定password引數 # # pkey = paramiko.RSAKey.from_private_key_file('/home/super/.ssh/id_rsa', password='12345') # # pkey = paramiko.RSAKey.from_private_key_file('/home/ptop/topicjie/scripts/keys/id_rsa') # # 建立連線 # # ssh.connect(hostname=ip, # # port=22, # # username='gs1801', # # pkey=pkey) # with open('/home/benson/test11.sh') as f: # content = f.read().split('\n')[0:-1] # print(content) # # L = [] # # for i in range(0, len(D['ssh_content'])): # # if "#" in D['ssh_content'][i] or D['ssh_content'][i] == '' or D['ssh_content'][i] == ' ': # # continue # # L.append(D['ssh_content'][i]) # # for i in L: # # print(i) # # ssh.connect(hostname=ip, # port=22, # username='gs1801', # password='g53743001') # # os.system("scp /home/benson/test10.sh gs1801@" + ip + ":/home/gs1801/") # # 執行命令 # # stdin, stdout, stderr = ssh.exec_command("ls -l /home/gs1801;touch test7.py") # # stdin, stdout, stderr = ssh.exec_command("touch test10.sh;echo '" + content + "' >> test10.sh;bash test10.sh") # # print('test1') # # print(stdout.read().decode()) # # print('test2') # # print(stderr.read().decode()) # # for command in content: # stdin, stdout, stderr = ssh.exec_command(command) # # # # # print(stdout.read().decode()) # # # print('----------------------------------------') # # # print(stderr.read().decode()) # # # if stderr.read().decode(): # L.append(0) # print('error') # socketio.emit('stderr', 'error') # else: # L.append(1) # print('success') # print(socketio) # socketio.emit('stdout', 'success') # # print(L) # # # # item_list = ItemList() # # item_list.ip = ip # # item_list.pymysql = L[0] # # item_list.eventlet = L[1] # # datetime = dt.now() # # item_list.datetime = datetime # # db.session.add(item_list) # # db.session.commit() # # # 關閉連線 # ssh.close() # return "建置結束!" # except Exception as e: # print(e) # return e #拿取伺服器建置清單狀態的訪問路徑 @main.route('/item_list/', methods=['POST', 'GET']) def item_list_views(ip): username = session['uname'] if request.method == 'GET': item_list = ItemList.query.filter_by(ip=ip).order_by(text('datetime desc')).first() pymysql = item_list.pymysql eventlet = item_list.eventlet return jsonify(pymysql=pymysql, eventlet=eventlet) else: pass #過濾前端傳過來文件副檔名函數 def allowed_file(filename): return '.' in filename and \ filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS #認證token的訪問路徑 @main.route('/auth', methods=['POST', 'GET']) @jwt_required def auth_views(): if request.method == 'GET': identity = get_jwt_identity() print(identity) return jsonify(identity=identity) # 退出的訪問路徑 @main.route('/logout') def logout_views(): if 'token' in session and 'uname' in session: # del session['id'] del session['uname'] del session['token'] # del session['status'] return redirect('/')