123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- # TODO 資料表是否會重複開啟? -> def?
- from logging import info
- from datetime import datetime
- from flask import Flask, render_template, request, jsonify
- import pymysql
- # from flask_sqlalchemy import SQLAlchemy
- # from sqlalchemy.orm import query, session
- from flask_migrate import Migrate
- import time
- from app import create_app
- #app, db, mqtt = create_app()
- app = Flask(__name__)
- # # app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://root:g53743001@localhost:3306/coffeetest"
- # app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
- # db = SQLAlchemy(app)
- # # db.init_app(app)
- # Migrate(app, db)
- # 首頁測試
- @app.route('/')
- def index():
- return render_template('index.html')
- # 使用者名稱測試
- @app.route('/user/<name>')
- def user(name):
- return render_template('hello.html', name=name)
- # 新增零件清單項目
- @app.route('/create_component_table_item', methods=['GET', 'POST'])
- def create_component_table_item():
- # 開啟本機 erp 資料庫
- mydb = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='g53743001', database='erp', charset='utf8')
- # mydb = pymysql.connect(host='52.69.200.169', port=3306, user='coffee', password='skyeye', database='Coffee', charset='utf8')
- mycursor = mydb.cursor()
- # 獲取會計科目欄位
- AccountingSubjects = "SELECT * FROM 會計科目表"
- mycursor.execute(AccountingSubjects)
- AccountingSubjects_data = mycursor.fetchall()
- # print("AccountingSubjects_data", AccountingSubjects_data)
- # # (('01', '成品'), ('02', '系統成品'), ('03', '模組半成品'), ('04', '零件'), ('05', '工具'))
- # 獲取類別欄位
- Category = "SELECT * FROM 類別表"
- mycursor.execute(Category)
- Category_data = mycursor.fetchall()
- # 獲取供應商欄位
- Supplier = "SELECT * FROM 供應商"
- mycursor.execute(Supplier)
- Supplier_data = mycursor.fetchall()
- # 獲取零件名稱
- Component = "SELECT 名稱 FROM 零件表"
- mycursor.execute(Component)
- Component_data = mycursor.fetchall()
- return render_template('create_component_table_item.html', title='新增零件', **locals())
- # ERP 測試 四層下拉式選單關聯
- @app.route('/drop_down_list', methods=['GET', 'POST'])
- def drop_down_list():
- conn = pymysql.connect(
- host='52.69.200.169',
- port=3306,
- user='coffee',
- password='skyeye',
- database='Coffee',
- charset='utf8'
- )
- cur = conn.cursor()
- #獲取欄位資料
- sql = "select * from product_info"
- cur.execute(sql)
- content = cur.fetchall()
- #獲取欄位名稱
- sql = "SHOW FIELDS FROM product_info"
- cur.execute(sql)
- labels = cur.fetchall()
- # print("labels: ", labels) # labels: (('產品', 'varchar(4)', 'YES', '', None, ''), ('系統', 'varchar(5)', 'YES', '', None, ''),
- labels = [g[0] for g in labels]
- # print("labels: ", labels) # labels: ['產品', '系統', '系統圖號', '狀態', '進貨狀態', '序號', '組序號',
- return render_template('drop_down_list.html', labels=labels, content=content)
- # ERP 測試
- @app.route('/search', methods=['GET', 'POST'])
- def search():
- # 開啟本機 erp 資料庫
- mydb = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='g53743001', database='erp', charset='utf8')
- # mydb = pymysql.connect(host='52.69.200.169', port=3306, user='coffee', password='skyeye', database='Coffee', charset='utf8')
- mycursor = mydb.cursor()
- # 獲取欄位資料
- # sql = '''SELECT * FROM table_component_name'''
- sql = '''SELECT 內部料號, 會計科目表.會計科目_名稱, 類別表.類別_名稱, 流水號, 供應商.公司, 名稱
- FROM (
- (零件表
- INNER JOIN 供應商
- ON 供應商.供應商 = 零件表.供應商
- )
- INNER JOIN 類別表
- ON 類別表.類別_編號 = 零件表.類別
- )
- INNER JOIN 會計科目表
- ON 會計科目表.會計科目_編號 = 零件表.會計科目
- '''
- mycursor.execute(sql)
- content = mycursor.fetchall()
- #獲取欄位名稱
- sql = "SHOW FIELDS FROM 零件表"
- mycursor.execute(sql)
- labels = mycursor.fetchall()
- # print("labels: ", labels)
- # # labels: (('產品', 'varchar(4)', 'YES', '', None, ''), ('系統', 'varchar(5)', 'YES', '', None, ''),
- labels = [g[0] for g in labels]
- # print("labels: ", labels)
- # # labels: ['產品', '系統', '系統圖號', '狀態', '進貨狀態', '序號', '組序號',
- # 獲取會計科目欄位
- AccountingSubjects = "SELECT * FROM 會計科目表"
- mycursor.execute(AccountingSubjects)
- AccountingSubjects_data = mycursor.fetchall()
- # print("AccountingSubjects_data", AccountingSubjects_data)
- # # (('01', '成品'), ('02', '系統成品'), ('03', '模組半成品'), ('04', '零件'), ('05', '工具'))
- # 獲取類別欄位
- Category = "SELECT * FROM 類別表"
- mycursor.execute(Category)
- Category_data = mycursor.fetchall()
- # 獲取供應商欄位
- Supplier = "SELECT * FROM 供應商"
- mycursor.execute(Supplier)
- Supplier_data = mycursor.fetchall()
- # 獲取零件名稱
- Component = "SELECT 名稱 FROM 零件表"
- mycursor.execute(Component)
- Component_data = mycursor.fetchall()
-
- return render_template('search.html', title = 'ERP Search', **locals())
- # 找到 serial number 最大值後 +1
- @app.route('/sn_get/<AccountSubject>/<Category>', methods=['GET', 'POST'])
- def zn_get(AccountSubject, Category):
- # 開啟本機 erp 資料庫
- mydb = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='g53743001', database='erp', charset='utf8')
- mycursor = mydb.cursor()
- # 取得流水號最大值
- sql = 'SELECT 流水號 FROM 零件表 '
- sql += 'WHERE 會計科目 = "' + AccountSubject + '" AND 類別 = "' + Category + '" ORDER BY 流水號 DESC LIMIT 1'
- print("sql: ", sql)
- mycursor.execute(sql)
- content = mycursor.fetchall()
- try:
- print("content: ", content[0][0], type(content[0][0]))
- new_sn = int(content[0][0])+1
- new_sn = '{0:04d}'.format(new_sn)
- # new_sn = content[0][0].zfill(4)
- except IndexError:
- new_sn = '0001'
-
- return jsonify({"new_sn":new_sn})
- # 從資料表找出符合篩選條件之資料
- @app.route('/sql_get', methods=['GET'])
- def sql_get():
- mydb = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='g53743001', database='erp', charset='utf8')
- mycursor = mydb.cursor()
- # 取得網頁傳回來的 SQL 指令
- # sql = "SELECT * FROM 零件表 WHERE 會計科目 = 04"
- info = request.args.to_dict('sql')
- sql = info['sql']
- print("sql: ", sql)
- mycursor.execute(sql)
- sql_data = mycursor.fetchall()
- # 獲取欄位名稱
- # 拆解取得資料表資料
- # print("sql_data: ", sql_data)
- # # sql_data: (('04030001T01', '04', '03', '0001', 'T01', '電阻-18Ω'), ... )
- # Rita 這裡在測試日期格式修改, 優化後調整
- # if 'INNER JOIN 庫存表' in sql:
- # labels = []
- # for g in sql_data:
- # # timeArray = time.strptime(a, "%Y-%m-%d %H:%M:%S")
- # print("g[0]: ", g[0], type(g[0]), g[0].strftime("%Y-%m-%d %H:%M:%S"))
-
- # timeString = g[0].strftime("%Y-%m-%d %H:%M:%S")
- # print("timeString: ", timeString, type(timeString))
- # labels.append(timeString)
- # # labels.append(time.strftime("%Y-%m-%d %H:%M:%S"))
- # labels.append(g[1])
- # labels.append(g[2])
- # else:
- # labels = [g for g in sql_data]
- labels = [g for g in sql_data]
- # # labels g[1]~g[5]: [('04030001T01', '04', '03', '0001', 'T01', '電阻-18Ω'), ...]
- print("labels: ", labels)
- # 取得欄位資料
- if 'INNER JOIN 規格表' in sql:
- sql_field = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '規格表'"
- elif 'INNER JOIN 庫存表' in sql:
- sql_field = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '庫存表' LIMIT 3"
- elif 'INNER JOIN 銷貨表' in sql:
- sql_field = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '銷貨表'"
- elif 'INNER JOIN 進貨表' in sql:
- sql_field = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '進貨表'"
- else:
- sql_field = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '零件表'"
- mycursor.execute(sql_field)
- sql_field = mycursor.fetchall()
- mydb.commit()
-
- return jsonify({"sql_data":sql_data,
- "labels":labels,
- "sql_field":sql_field})
- #自動關閉所有未使用、掛著的連接
- @app.teardown_appcontext
- def shutdown_session(exception=None):
- # db.session.remove()
- pass
- if __name__ == '__main__':
- # Rita 測試 Benson 網頁 http://192.168.50.65:5006/login
- app.run(debug=False, threaded=True, port=5010)
- #app.run(debug=False, threaded=True, port=5006, host='0.0.0.0')
- #使用WSGI開關,避免出現WARNING: This is a development server. d not use it in a production deployment.Use a production WSGI server instead.
- # server = pywsgi.WSGIServer(('0.0.0.0', 5006), app)
- # server.serve_forever()
|