|
- import os
- import datetime
- from flask import render_template, request, session, Response, url_for, make_response
- from werkzeug.utils import redirect, secure_filename
- from . import main
- from manage import mqtt
- from manage import db, app
- from ..models import *
- import json
- from datetime import datetime as dt
- from sqlalchemy import text
- import socket
- import pickle
- import cv2
- import numpy as np
- import math
- import threading
- import time
- from flask_mqtt import Mqtt
- import requests
- import re
- import copy
- from selenium import webdriver
- from selenium.webdriver.support.ui import Select
- from bs4 import BeautifulSoup
- from time import sleep as sl
- from app import ALLOWED_EXTENSIONS
- from flask import jsonify
- from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
- import paramiko
- import git
- import shutil
- @main.route('/')
- def main_index():
-
- if 'token' in session and 'uname' in session:
- username = session['uname']
- return redirect('/index')
- else:
- return render_template('sign_in.html')
- @main.route('/login', methods=['GET', 'POST'])
- def login_views():
- if request.method == 'GET':
- if 'id' in session and 'uname' in session and 'status' in session:
- return redirect('/')
- else:
- return render_template('sign_in.html')
- else:
-
- username = request.form['username']
- password = request.form['password']
-
- user = User.query.filter_by(username=username, password=password).first()
-
-
-
- if user:
-
-
-
- if 'rem' in request.form:
-
-
- session.permanent = True
-
-
-
-
-
- expires = datetime.timedelta(minutes=30)
- access_token = create_access_token(identity=user.username, expires_delta=expires)
-
- print(access_token)
- session['token'] = access_token
- session['uname'] = user.username
-
-
- return jsonify(access_token=access_token)
- else:
- errMsg = "Wrong login or password"
-
- return jsonify(errMsg=errMsg)
- @main.route('/register', methods=['POST', "GET"])
- def register_views():
- if request.method == 'GET':
- return render_template('registration.html')
- else:
-
-
-
-
-
- user = User()
- user.firstname = request.form['firstname']
- user.lastname = request.form['lastname']
- user.mail = request.form['email']
- user.phone = request.form['phone']
- user.username = request.form['username']
- user.password = request.form['password']
- user.status = 1
- user.isActive = True
-
- db.session.add(user)
-
- db.session.commit()
-
-
- user = User.query.filter_by(username=user.username).first()
-
- expires = datetime.timedelta(minutes=30)
- access_token = create_access_token(identity=user.username, expires_delta=expires)
-
- session['token'] = access_token
-
- session['uname'] = user.username
-
- return jsonify(access_token=access_token)
-
- @main.route('/reset_password', methods=['POST', 'GET'])
- def reset_password_views():
- if request.method == 'GET':
- if 'mail' in session:
- del session['mail']
- return render_template('reset_pwd1.html')
- else:
-
- if "mail" in session:
- new_pwd = request.form['new_pwd']
- confirm_pwd = request.form['confirm_pwd']
-
- if new_pwd == confirm_pwd:
- mail = session['mail']
- user = User.query.filter_by(mail=mail).first()
- user.password = new_pwd
- db.session.add(user)
- del session['mail']
-
- return redirect('/login')
- else:
- errMsg = "Passwords does not match"
- return render_template('reset_pwd2.html', errMsg=errMsg)
- email = request.form['email']
- user = User.query.filter_by(mail=email).first()
- if user:
- session['mail'] = user.mail
- return render_template('reset_pwd2.html')
- else:
- errMsg = "Wrong email.Please try again"
- return render_template('reset_pwd1.html', errMsg=errMsg)
- @main.route('/check_email')
- def check_email_views():
- email = request.args['email']
- user = User.query.filter_by(mail=email).first()
- if user:
- result = {"errMsg":" "}
- else:
- result = {"pass":" "}
- return json.dumps(result)
- @main.route('/check_username')
- def check_username_views():
- username = request.args['username']
- user = User.query.filter_by(username=username).first()
- if user:
- result = {"errMsg":" "}
- else:
- result = {"pass":" "}
- return json.dumps(result)
- @main.route('/index', methods=['POST', 'GET'])
- def index_views():
- token = session['token']
- username = session['uname']
- if request.method == 'GET':
- return render_template('index.html', params=locals())
- else:
- pass
- @main.route('/incoming_check', methods=['POST', 'GET'])
- def incoming_check_views():
-
- username = session['uname']
- if request.method == 'GET':
-
-
- return render_template('incoming_check.html', params=locals())
-
-
-
- else:
- pass
- @main.route('/burn_program', methods=['POST', 'GET'])
- def burn_program_views():
- username = session['uname']
- if request.method == 'GET':
- sensors = SensorList.query.all()
- sensor_list = []
- for sensor in sensors:
- sensor_list.append(sensor.toDict())
- return render_template('burn_program.html', params=locals())
- else:
- import json
- payload = {}
-
- if request.form['airTem']:
- payload = {"command":"system_update", "url":"http://60.250.156.230:3000/Benson/TestOTA.git"}
- json = json.dumps(payload)
- mqtt.publish(json)
- res = {"status": 1}
- return json.dumps(res)
- @main.route('/modify_permissions', methods=['POST', 'GET'])
- def modify_permissions_views():
- username = session['uname']
- if request.method == 'GET':
- return render_template('modify_permissions.html', params=locals())
- else:
- pass
- @main.route('/add_sensor', methods=['POST', 'GET'])
- def add_sensor_views():
- username = session['uname']
- if request.method == 'GET':
- return render_template('add_sensor.html', params=locals())
- else:
- try:
- ch_name = request.form['ch_name']
- en_name = request.form['en_name']
- version = request.form['version']
- sensor = SensorList()
- sensor.ch_name = ch_name
- sensor.en_name = en_name
- sensor.version = version
- sensor.datetime = dt.now()
- db.session.add(sensor)
- db.session.commit()
- return "新增成功!"
- except Exception as e:
- return 'ERROR:' + e
- @main.route('/item_list/<ip>', methods=['POST', 'GET'])
- def item_list_views(ip):
- username = session['uname']
- if request.method == 'GET':
- item_list = ItemList.query.filter_by(ip=ip).order_by(text('datetime desc')).first()
- pymysql = item_list.pymysql
- eventlet = item_list.eventlet
- return jsonify(pymysql=pymysql, eventlet=eventlet)
- else:
- pass
- def allowed_file(filename):
- return '.' in filename and \
- filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS
- @main.route('/auth', methods=['POST', 'GET'])
- @jwt_required()
- def auth_views():
- if request.method == 'GET':
- identity = get_jwt_identity()
- print(identity)
- return jsonify(identity=identity)
- @main.route('/logout')
- def logout_views():
- if 'token' in session and 'uname' in session:
-
- del session['uname']
- del session['token']
-
- return redirect('/')
|