123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436 |
- # 主業務邏輯中的視圖和路由的定義
- import os
- import datetime
- from flask import render_template, request, session, Response, url_for, make_response
- # 導入藍圖程序,用於構建路由
- from werkzeug.utils import redirect, secure_filename
- from . import main
- from manage import mqtt
- # 導入db,用於操作數據庫
- from manage import db, app
- # 導入實體類,用於操作數據庫
- from ..models import *
- import json
- from datetime import datetime as dt
- from sqlalchemy import text
- import socket
- import pickle
- import cv2
- import numpy as np
- import math
- import threading
- import time
- from flask_mqtt import Mqtt
- import requests
- import re
- import copy
- from selenium import webdriver
- from selenium.webdriver.support.ui import Select
- from bs4 import BeautifulSoup
- from time import sleep as sl
- from app import ALLOWED_EXTENSIONS
- from flask import jsonify
- from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
- import paramiko
- import git
- import shutil
- from manage import socketio
- # 主頁的訪問路徑
- @main.route('/')
- # @jwt_required
- def main_index():
- # 獲取登入信息
- if 'token' in session and 'uname' in session:
- username = session['uname']
- return redirect('/index')
- else:
- return render_template('sign_in.html')
- # 登入頁面的訪問路徑
- @main.route('/login', methods=['GET', 'POST'])
- def login_views():
- if request.method == 'GET':
- if 'id' in session and 'uname' in session and 'status' in session:
- return redirect('/')
- else:
- return render_template('sign_in.html')
- else:
- # 接收前端傳過來的資料
- username = request.form['username']
- password = request.form['password']
- # 使用接收的用戶和密碼到資料庫中查詢
- user = User.query.filter_by(username=username, password=password).first()
- #檢查原密碼與哈希過的密碼是否吻合
- # if user and user.check_hash_password(password):
- # 如果用戶存在,將信息保存置session並重定向回首頁,否則回登入頁
- if user:
- # resp = redirect('/')
- # resp = make_response('')
- # 判斷是否有記住密碼
- if 'rem' in request.form:
- # sn = str(user.sn)
- # 如果設置了 session.permanent 為 True,那麽過期時間是31天
- session.permanent = True
- #設置cookie
- # max_age = 60 * 60 * 24 * 365
- # resp.set_cookie("username", username, max_age=max_age)
- # resp.set_cookie("sn", sn, max_age=max_age)
- #可設置tokens時效
- expires = datetime.timedelta(minutes=30)
- access_token = create_access_token(identity=user.username, expires_delta=expires)
- # access_token = create_access_token(identity=user.username)
- print(access_token)
- session['token'] = access_token
- session['uname'] = user.username
- # session['id'] = user.sn
- # session['status'] = user.status
- return jsonify(access_token=access_token)
- else:
- errMsg = "Wrong login or password"
- # return render_template('sign_in.html', errMsg=errMsg)
- return jsonify(errMsg=errMsg)
- # 註冊頁面的訪問路徑
- @main.route('/register', methods=['POST', "GET"])
- def register_views():
- if request.method == 'GET':
- return render_template('registration.html')
- else:
- # 獲取文本框的值並賦值給user實體對象
- #哈希加密版本
- # username = request.form['username']
- # password = request.form['password']
- # user = User(username=username, password=password)
- user = User()
- user.firstname = request.form['firstname']
- user.lastname = request.form['lastname']
- user.mail = request.form['email']
- user.phone = request.form['phone']
- user.username = request.form['username']
- user.password = request.form['password']
- user.status = 1
- user.isActive = True
- # 將數據保存進資料庫 - 註冊
- db.session.add(user)
- # 手動提交,目的是為了獲取提交後的user的id
- db.session.commit()
- # 當user成功插入進資料庫之後,程序會自動將所有信息取出來在賦值給user
- # 完成登入的操作
- user = User.query.filter_by(username=user.username).first()
- # 可設置tokens時效
- expires = datetime.timedelta(minutes=30)
- access_token = create_access_token(identity=user.username, expires_delta=expires)
- # access_token = create_access_token(identity=user.username)
- session['token'] = access_token
- # session['id'] = user.sn
- session['uname'] = user.username
- # session['status'] = user.status
- return jsonify(access_token=access_token)
- # return redirect('/')
- #登入重設密碼的頁面
- @main.route('/reset_password', methods=['POST', 'GET'])
- def reset_password_views():
- if request.method == 'GET':
- if 'mail' in session:
- del session['mail']
- return render_template('reset_pwd1.html')
- else:
- #如果有id在session裡,代表從reset_pwd2過來的
- if "mail" in session:
- new_pwd = request.form['new_pwd']
- confirm_pwd = request.form['confirm_pwd']
- #判斷密碼是否一致
- if new_pwd == confirm_pwd:
- mail = session['mail']
- user = User.query.filter_by(mail=mail).first()
- user.password = new_pwd
- db.session.add(user)
- del session['mail']
- #修改完後回登入頁
- return redirect('/login')
- else:
- errMsg = "Passwords does not match"
- return render_template('reset_pwd2.html', errMsg=errMsg)
- email = request.form['email']
- user = User.query.filter_by(mail=email).first()
- if user:
- session['mail'] = user.mail
- return render_template('reset_pwd2.html')
- else:
- errMsg = "Wrong email.Please try again"
- return render_template('reset_pwd1.html', errMsg=errMsg)
- #驗證email訪問路徑
- @main.route('/check_email')
- def check_email_views():
- email = request.args['email']
- user = User.query.filter_by(mail=email).first()
- if user:
- result = {"errMsg":" "}
- else:
- result = {"pass":" "}
- return json.dumps(result)
- #驗證username訪問路徑
- @main.route('/check_username')
- def check_username_views():
- username = request.args['username']
- user = User.query.filter_by(username=username).first()
- if user:
- result = {"errMsg":" "}
- else:
- result = {"pass":" "}
- return json.dumps(result)
- #用戶主介面的訪問路徑
- @main.route('/index', methods=['POST', 'GET'])
- def index_views():
- token = session['token']
- username = session['uname']
- if request.method == 'GET':
- return render_template('index.html', params=locals())
- else:
- pass
- #進貨檢測的訪問路徑
- @main.route('/incoming_check', methods=['POST', 'GET'])
- def incoming_check_views():
- # print("test")
- username = session['uname']
- if request.method == 'GET':
- # print("test2")
- # return jsonify(status=200)
- return render_template('incoming_check.html', params=locals())
- # 方法內可透過get_jwt_identity之前放在token的identity內的內容:使用者名稱
- # identity = get_jwt_identity()
- # return jsonify(identity=identity)
- else:
- pass
- #感測器燒錄的訪問路徑
- @main.route('/burn_program', methods=['POST', 'GET'])
- def burn_program_views():
- username = session['uname']
- if request.method == 'GET':
- sensors = SensorList.query.all()
- sensor_list = []
- for sensor in sensors:
- sensor_list.append(sensor.toDict())
- return render_template('burn_program.html', params=locals())
- else:
- import json
- payload = {}
- if request.form['airTem']:
- payload = {"command":"system_update", "url":"http://60.250.156.230:3000/Benson/TestOTA.git"}
- json = json.dumps(payload)
- mqtt.publish(json)
- res = {"status": 1}
- return json.dumps(res)
- #修改權限的訪問路徑
- @main.route('/modify_permissions', methods=['POST', 'GET'])
- def modify_permissions_views():
- username = session['uname']
- if request.method == 'GET':
- return render_template('modify_permissions.html', params=locals())
- else:
- pass
- #新增感測器的訪問路徑
- @main.route('/add_sensor', methods=['POST', 'GET'])
- def add_sensor_views():
- username = session['uname']
- if request.method == 'GET':
- return render_template('add_sensor.html', params=locals())
- else:
- try:
- ch_name = request.form['ch_name']
- en_name = request.form['en_name']
- version = request.form['version']
- sensor = SensorList()
- sensor.ch_name = ch_name
- sensor.en_name = en_name
- sensor.version = version
- sensor.datetime = dt.now()
- db.session.add(sensor)
- db.session.commit()
- return "新增成功!"
- except Exception as e:
- return 'ERROR:' + e
- # @socketio.on('connect_event')
- # def connected_msg(msg):
- # print("connect success")
- # print(msg)
- # socketio.emit('stdout', 'success')
- # print(socketio)
- #
- # D = {"ssh_content":0}
- #
- # #建置伺服器的訪問路徑
- # @main.route('/add_server', methods=['POST', 'GET'])
- # def add_server_views():
- # username = session['uname']
- # L = []
- # if request.method == 'GET':
- # repo = git.Repo.init(path='.')
- # new_repo = git.Repo.clone_from(url='http://60.250.156.230:3000/benson/TestSSH.git', to_path='../new')
- # with open('/home/benson/Project/new/All_installv3.2.sh') as f:
- # D['ssh_content'] = f.read().split('\n')[0:-1]
- # ssh_content = D['ssh_content']
- # print(D['ssh_content'])
- #
- # #刪除非空資料夾
- # try:
- # shutil.rmtree('/home/benson/Project/new')
- # except OSError as e:
- # print(f"Error:{e.strerror}")
- #
- # for i in range(0, len(D['ssh_content'])):
- # if "#" in D['ssh_content'][i] or D['ssh_content'][i] == '' or D['ssh_content'][i] == ' ':
- # continue
- # if "sudo" in D['ssh_content'][i] or "echo" in D['ssh_content'][i]:
- # L.append(D['ssh_content'][i])
- # return render_template('add_server.html', params=locals())
- # else:
- # print(D['ssh_content'])
- # L = []
- # d = request.form.to_dict()
- # ip = d['ip']
- # print(ip)
- # try:
- # # 建立一個sshclient物件
- # ssh = paramiko.SSHClient()
- # # 允許將信任的主機自動加入到host_allow 列表,此方法必須放在connect方法的前面
- # ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- # # 指定本地的RSA私鑰檔案,如果建立金鑰對時設定的有密碼,password為設定的密碼,如無不用指定password引數
- # # pkey = paramiko.RSAKey.from_private_key_file('/home/super/.ssh/id_rsa', password='12345')
- # # pkey = paramiko.RSAKey.from_private_key_file('/home/ptop/topicjie/scripts/keys/id_rsa')
- # # 建立連線
- # # ssh.connect(hostname=ip,
- # # port=22,
- # # username='gs1801',
- # # pkey=pkey)
- # with open('/home/benson/test11.sh') as f:
- # content = f.read().split('\n')[0:-1]
- # print(content)
- # # L = []
- # # for i in range(0, len(D['ssh_content'])):
- # # if "#" in D['ssh_content'][i] or D['ssh_content'][i] == '' or D['ssh_content'][i] == ' ':
- # # continue
- # # L.append(D['ssh_content'][i])
- # # for i in L:
- # # print(i)
- #
- # ssh.connect(hostname=ip,
- # port=22,
- # username='gs1801',
- # password='g53743001')
- # # os.system("scp /home/benson/test10.sh gs1801@" + ip + ":/home/gs1801/")
- # # 執行命令
- # # stdin, stdout, stderr = ssh.exec_command("ls -l /home/gs1801;touch test7.py")
- # # stdin, stdout, stderr = ssh.exec_command("touch test10.sh;echo '" + content + "' >> test10.sh;bash test10.sh")
- # # print('test1')
- # # print(stdout.read().decode())
- # # print('test2')
- # # print(stderr.read().decode())
- #
- # for command in content:
- # stdin, stdout, stderr = ssh.exec_command(command)
- # #
- # # # print(stdout.read().decode())
- # # # print('----------------------------------------')
- # # # print(stderr.read().decode())
- # #
- # if stderr.read().decode():
- # L.append(0)
- # print('error')
- # socketio.emit('stderr', 'error')
- # else:
- # L.append(1)
- # print('success')
- # print(socketio)
- # socketio.emit('stdout', 'success')
- #
- # print(L)
- # #
- # # item_list = ItemList()
- # # item_list.ip = ip
- # # item_list.pymysql = L[0]
- # # item_list.eventlet = L[1]
- # # datetime = dt.now()
- # # item_list.datetime = datetime
- # # db.session.add(item_list)
- # # db.session.commit()
- #
- # # 關閉連線
- # ssh.close()
- # return "建置結束!"
- # except Exception as e:
- # print(e)
- # return e
- #拿取伺服器建置清單狀態的訪問路徑
- @main.route('/item_list/<ip>', methods=['POST', 'GET'])
- def item_list_views(ip):
- username = session['uname']
- if request.method == 'GET':
- item_list = ItemList.query.filter_by(ip=ip).order_by(text('datetime desc')).first()
- pymysql = item_list.pymysql
- eventlet = item_list.eventlet
- return jsonify(pymysql=pymysql, eventlet=eventlet)
- else:
- pass
- #過濾前端傳過來文件副檔名函數
- def allowed_file(filename):
- return '.' in filename and \
- filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS
- #認證token的訪問路徑
- @main.route('/auth', methods=['POST', 'GET'])
- @jwt_required
- def auth_views():
- if request.method == 'GET':
- identity = get_jwt_identity()
- print(identity)
- return jsonify(identity=identity)
- # 退出的訪問路徑
- @main.route('/logout')
- def logout_views():
- if 'token' in session and 'uname' in session:
- # del session['id']
- del session['uname']
- del session['token']
- # del session['status']
- return redirect('/')
|