123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584 |
- import uvicorn
- import warnings
- import os
- from fastapi import FastAPI, UploadFile, File, BackgroundTasks, routing
- from openpyxl import load_workbook
- from openpyxl.utils.cell import coordinate_from_string
- from openpyxl.comments import Comment
- from openpyxl.styles import PatternFill
- from fastapi.middleware.cors import CORSMiddleware
- from datetime import datetime
- from fastapi.responses import FileResponse
- from fastapi.staticfiles import StaticFiles
- import asyncio
- from concurrent.futures.process import ProcessPoolExecutor
- from fastapi.responses import StreamingResponse
- import shutil
- import uuid
- import time
- warnings.filterwarnings("ignore")
- app = FastAPI()
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- shared_dir = "download_cache"
- app.mount(f"/{shared_dir}", StaticFiles(directory=shared_dir), name={shared_dir})
- diff_dir = "diff_cache"
- app.mount(f"/{diff_dir}", StaticFiles(directory=diff_dir), name={diff_dir})
- cur_cache_path = "cur_cache/"
- def get_title_row(sheet):
- title_row_num = -1
- row_range = sheet[1:5]
- for i, r in enumerate(row_range):
- for j, c in enumerate(r):
- print(f"第{i + 1 }行,第{j}列,值:{c.value}")
- if "证件号码" == c.value or "收入(元)" == c.value or "务工月收入" == c.value:
- title_row_num = c.row
- return title_row_num
- def get_all_numbers(sheet, start_row, cow):
- keys = {}
- for i in range(start_row, sheet.max_row * 2):
- id_number = sheet[f"{cow}{i}"].value
- if id_number is None:
- break
- keys[id_number] = i
- return keys
- def deal_diff_data(file: UploadFile = File(...), target_name: str = None):
- print("开始处理")
- def generate_diff_data(start_row, max_row, sheet, title_dict, keys, need_copy_data):
- for i in range(start_row, max_row):
- id_number = sheet[f"{title_dict['证件号码']}{i}"].value
- if id_number is None:
- # print(f"该行身份证为空{i}")
- continue
- if id_number not in keys:
- # print(f"该身份证不在省办列表中{id_number}")
- need_copy_data.append(i)
- new = None
- # 删除所需要的执行时间太久了,暂时废弃删除的分支
- # if sheet.max_row > len(need_copy_data) * 2 and 3 < 2:
- # """如果diff很少,那么创建一个新表,一条一条添加"""
- new = workbook.create_sheet("仅" + sheet.title + "有的数据")
- for i, row in enumerate(need_copy_data):
- print(f"开始写入{i}行")
- for j, c in enumerate(sheet[row]):
- new.cell(i + 1, j + 1, c.value)
- # else:
- # """如果diff数据比较多,直接copy旧表,删除不需要的数据"""
- # start = time.time()
- # new = workbook.copy_worksheet(sheet)
- # print(f" copy执行时间{time.time() - start}")
- # for i in range(max_row, start_row, -1):
- # id_number = sheet[f"{title_dict['证件号码']}{i}"].value
- # if id_number is None:
- # print(f"该行身份证为空{i}")
- # continue
- # if id_number in keys:
- # print(f"该身份证不在省办列表中{id_number}")
- # new.delete_rows(i)
- return need_copy_data
- dir_path = cur_cache_path
- savename = dir_path + file.filename
- contents = file.file.read()
- with open(savename, "wb") as f:
- f.write(contents)
- # 读取excel表
- workbook = load_workbook(savename)
- # 获取指定的sheet
- sheet_names = workbook.sheetnames
- first = None
- second = None
- for index, name in enumerate(sheet_names):
- print(f"表名为:{name}")
- if name == "省办务工":
- first = workbook[name]
- elif name == "国办务工":
- second = workbook[name]
- if first is None or second is None:
- return {
- "code": 202,
- "msg": "没有找到待处理的 省办务工 和 国办务工 两张表格",
- }
- first_title_row_num = get_title_row(first)
- if first_title_row_num == -1:
- return {"code": 202, "msg": "省办务工没有找到数据"}
- first_title_dict = {}
- first_title_rows = first[first_title_row_num]
- for title_cell in first_title_rows:
- x, y = coordinate_from_string(title_cell.coordinate)
- first_title_dict[title_cell.value] = x
- first_keys = get_all_numbers(first, first_title_row_num + 1, first_title_dict["证件号码"])
- second_title_row_num = get_title_row(second)
- if second_title_row_num == -1:
- return {"code": 202, "msg": "国办务工没有找到数据"}
- second_title_dict = {}
- second_title_rows = second[second_title_row_num]
- for title_cell in second_title_rows:
- x, y = coordinate_from_string(title_cell.coordinate)
- second_title_dict[title_cell.value] = x
- second_keys = get_all_numbers(second, second_title_row_num + 1, second_title_dict["证件号码"])
- generate_diff_data(
- first_title_row_num + 1, first.max_row, first, first_title_dict, second_keys, [first_title_row_num]
- )
- generate_diff_data(
- second_title_row_num + 1,
- second.max_row,
- second,
- second_title_dict,
- first_keys,
- [second_title_row_num],
- )
- workbook.save(savename)
- move_file(savename, target_name)
- print(f"处理完成,目标文件夹{diff_dir}, {target_name}")
- def clean_with_path(dir_path):
- for file in os.listdir(dir_path):
- # 遍历output_path文件夹下文件,删除后缀为woff的字体文件
- if file.endswith(".xlsx"):
- os.remove(f"{dir_path}/{file}")
- def move_dir(old_path, new_path):
- filelist = os.listdir(old_path) # 列出该目录下的所有文件,listdir返回的文件列表是不包含路径的。
- print(f"old path is {old_path}, new path is {new_path}")
- for file in filelist:
- src = os.path.join(old_path, file)
- dst = os.path.join(new_path, file)
- print("src:", src)
- print("dst:", dst)
- shutil.move(src, dst)
- def move_file(old_path, new_path):
- shutil.move(old_path, new_path)
- @app.get("/python_api/test")
- def test():
- # move_file(cur_cache_path + "123.xlsx", diff_dir + "/" + uuid.uuid4().hex + ".xlsx")
- print("准备睡眠")
- time.sleep(5)
- print("执行完成")
- return {"code": 200, "message": "成功"}
- @app.get("/python_api/is_exist")
- def is_exist(file_name: str):
- print(f"查询file{file_name}是否存在")
- for dir in [f"{diff_dir}/", f"{shared_dir}/"]:
- file_path = os.path.join(dir, file_name)
- if os.path.exists(file_path):
- return {"code": 200, "exists": True, "filePath": f"{dir}" + file_name}
- else:
- return {"code": 200, "exists": False}
- @app.post("/python_api/upload_diff_file")
- def diff_file(file: UploadFile, background_tasks: BackgroundTasks):
- # clean_with_path("cur_cache")
- # clean_with_path("diff_cache")
- file_name = diff_dir + "/" + uuid.uuid4().hex + ".xlsx"
- background_tasks.add_task(deal_diff_data, file, file_name)
- print(f"开始处理{file_name}")
- return {"code": 200, "msg": "开始处理", "filePath": file_name}
- @app.post("/python_api/uploadfile")
- def create_upload_file(file: UploadFile = File(...)):
- print(f"开始处理{file.filename}")
- # clean_with_path(f"{shared_dir}/")
- contents = file.file.read()
- savename = f"{shared_dir}/" + file.filename
- if file.filename.endswith("xlsx"):
- savename = f"{shared_dir}/" + uuid.uuid4().hex + ".xlsx"
- with open(savename, "wb") as f:
- f.write(contents)
- # 读取excel表
- workbook = load_workbook(savename)
- # 获取指定的sheet
- sheet_names = workbook.sheetnames
- # if "脱贫户信息查询" not in sheet_names:
- # print("读取不到指定的sheet页")
- # return {"code": 500, "msg": "读取不到指定的sheet页--脱贫户信息查询"}
- sheet = workbook[sheet_names[0]]
- title_row_num = 0
- # 读取前5行,正常应该有字段名了
- row_range = sheet[1:5]
- for i, r in enumerate(row_range):
- for j, c in enumerate(r):
- # print(f"第{i + 1 }行,第{j}列,值:{c.value}")
- if "户主编号" == c.value or "户编号" == c.value:
- title_row_num = c.row
- if title_row_num == 0:
- print(f"{file.filename}文件 内容不合格")
- return {"code": 202, "msg": "不是可以解析的格式"}
- # 获取字段名对应的列
- title_dict = {}
- title_rows = sheet[title_row_num]
- # 遍历字段名所在行的所有单元格
- for title_cell in title_rows:
- x, y = coordinate_from_string(title_cell.coordinate)
- title_dict[title_cell.value] = x
- # 开始读取表格内容
- read_data(sheet, title_row_num + 1, sheet.max_row, title_dict)
- # 保存文档
- workbook.save(savename)
- print(f"处理完了{file.filename}文件")
- return {"code": 200, "msg": "分析完成,请点击下载查看分析结果", "filePath": savename}
- def calculate_age_from_id_number(id_number):
- """根据传入的身份证号码,提取出出生年月日,结合代码运行时的当天时间,返回当前年龄(周岁)"""
- age = 0
- if len(id_number) != 18:
- return age
- birthday = id_number[6:14]
- birthday = datetime.strptime(birthday, "%Y%m%d")
- today = datetime.now()
- age = today.year - birthday.year
- if today.month < birthday.month or (today.month == birthday.month and today.day < birthday.day):
- age -= 1
- return age
- def verify_age_region_for_education(education, age):
- """根据传入的学历和年龄判断该年龄是否符合该学历"""
- # 义务教育年龄段
- # if education == "" and (age >= 6 and age <= 15):
- # return False
- education_for_age = {
- "学龄前儿童": (0, 6),
- "学前教育": (3, 6),
- "小学": (6, 12),
- "七年级": (12, 13),
- "八年级": (13, 14),
- "九年级": (14, 15),
- "初中": (12, 15),
- "高中": (15, 18),
- "中专": (15, 18),
- "中职": (15, 18),
- "高职": (18, 22),
- "高专": (18, 22),
- "大专": (18, 20),
- "本科": (18, 22),
- "硕士": (22, 25),
- }
- for education_key in education_for_age.keys():
- if education_key in education:
- age_range = education_for_age[education_key]
- if age < age_range[0] or age > age_range[1]:
- return False
- break
- return True
- def read_data(ws, start_row, end_row, title_dict):
- # 监测对象致(返)贫风险非最新设计的风险类型
- for i in range(start_row, end_row):
- check_poverty_causes(ws, i, title_dict)
- check_identitycard_length(ws, i, title_dict)
- check_deformity(ws, i, title_dict)
- check_education_level(ws, i, title_dict)
- check_risk_type(ws, i, title_dict)
- check_assistance(ws, i, title_dict)
- # 筛查方法
- def check_poverty_causes(ws, row_num, title_dict):
- """筛查主要致贫原因是否合规"""
- main_reason = "主要致贫原因"
- if main_reason not in title_dict:
- return
- poverty_causes = ws[f"{title_dict[main_reason]}{row_num}"].value
- # 致贫原因列表
- imageTypeList = ["因病", "因学", "因残", "因自然灾害", "因意外事故", "因产业项目失败", "因务工就业不稳", "缺劳动力", "因房", "因水", "其他(填写备注)"]
- if poverty_causes not in imageTypeList:
- target = ws[f"{title_dict[main_reason]}{row_num}"]
- comment_and_fill_yellow_for(target, "21.监测对象致(返)贫风险非最新设计的风险类型")
- def check_identitycard_length(ws, row_num, title_dict):
- """筛查身份证号码是否合规"""
- info_number = "户主证件号码"
- if info_number not in title_dict:
- return
- identitycard = ws[f"{title_dict[info_number]}{row_num}"].value
- if len(identitycard) not in [15, 18, 20, 22]:
- target = ws[f"{title_dict[info_number]}{row_num}"]
- comment_and_fill_yellow_for(target, "31.监测对象家庭成员证件号码位数异常(证件号码非15、18、20、22位)")
- def check_deformity(ws, row_num, title_dict):
- """筛查是否不符合残疾劳动标准"""
- condition = "健康状况"
- skill = "劳动技能"
- if condition not in title_dict or skill not in title_dict:
- return
- condition_value = ws[f"{title_dict[condition]}{row_num}"].value
- skill_value = ws[f"{title_dict[skill]}{row_num}"].value
- if "残疾" in condition_value and skill_value == "弱劳动力或半劳动力":
- target = ws[f"{title_dict[skill]}{row_num}"]
- comment_and_fill_yellow_for(target, "35.一、二级(重度)肢体残疾脱贫人口有普通劳动能力")
- def check_education_level(ws, row_num, title_dict):
- """筛查在校生状况填写是否不符合年龄"""
- info_number = "证件号码"
- if info_number not in title_dict:
- return
- identitycard = ws[f"{title_dict[info_number]}{row_num}"].value
- if len(identitycard) != 18:
- return
- education_key = "文化程度"
- is_student_key = "在校生状况"
- if education_key not in title_dict or is_student_key not in title_dict:
- return
- education = ws[f"{title_dict[education_key]}{row_num}"].value
- is_student = ws[f"{title_dict[is_student_key]}{row_num}"].value
- if len(education) == 0 and len(is_student) == 0:
- target = ws[f"{title_dict[education_key]}{row_num}"]
- comment_and_fill_yellow_for(target, "文化程度,在校生状况均未填写")
- target_is_student = ws[f"{title_dict[education_key]}{row_num}"]
- comment_and_fill_yellow_for(target_is_student, "文化程度,在校生状况均未填写")
- return
- if len(is_student) == 0:
- return
- age = calculate_age_from_id_number(identitycard)
- age_is_reliable = verify_age_region_for_education(is_student, age)
- if not age_is_reliable:
- target = ws[f"{title_dict[is_student_key]}{row_num}"]
- comment_and_fill_yellow_for(target, "在校生状况与年龄不符")
- def check_risk_type(ws, row_num, title_dict):
- """筛查风险类型,家庭信息是否合规"""
- def get_item_values_for(items):
- result = []
- for item in items:
- if item not in title_dict:
- continue
- value = ws[f"{title_dict[item]}{row_num}"].value
- if value is not None and len(value) > 0:
- result.append((item, value))
- return result
- def get_key_for(i):
- key = f"致贫风险{i}"
- return key
- risks = []
- # 最新修改,只需要筛查致贫风险1了
- # for i in range(1, 6):
- key = get_key_for(1)
- if key not in title_dict:
- return
- risk = ws[f"{title_dict[key]}{row_num}"].value
- if risk is not None and len(risk) > 0:
- risks.append((risk, 1))
- # 定义:健康帮扶,"综合保障,社会帮扶,义务教育保障, 教育帮扶, 住房安全保障, 搬迁, 饮水安全保障, 产业帮扶, 就业帮扶, 金融帮扶, 公益岗位帮扶等常量
- HEALTH_SUPPORT = "健康帮扶"
- COMPREHENSIVE_GUARANTEE = "综合保障"
- SOCIAL_SUPPORT = "社会帮扶"
- OBLIGATORY_EDUCATION_GUARANTEE = "义务教育保障"
- EDUCATION_SUPPORT = "教育帮扶"
- HOUSING_SECURITY_GUARANTEE = "住房安全保障"
- RELOCATION = "搬迁"
- DRINKING_WATER_SECURITY_GUARANTEE = "饮水安全保障"
- INDUSTRY_SUPPORT = "产业帮扶"
- EMPLOYMENT_SUPPORT = "就业帮扶"
- FINANCIAL_SUPPORT = "金融帮扶"
- PUBLIC_WELFARE_SUPPORT = "公益岗位帮扶"
- must_selected_option = []
- forbinddens_option = []
- for risk, i in risks:
- if risk == "因病":
- must_selected_option = [HEALTH_SUPPORT, COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT]
- forbinddens_option = [
- HOUSING_SECURITY_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- OBLIGATORY_EDUCATION_GUARANTEE,
- EDUCATION_SUPPORT,
- ]
- elif risk == "因学":
- must_selected_option = [OBLIGATORY_EDUCATION_GUARANTEE, EDUCATION_SUPPORT, SOCIAL_SUPPORT]
- forbinddens_option = [
- HOUSING_SECURITY_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- HEALTH_SUPPORT,
- ]
- elif risk == "因残":
- must_selected_option = [COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT]
- forbinddens_option = [
- HOUSING_SECURITY_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- OBLIGATORY_EDUCATION_GUARANTEE,
- EDUCATION_SUPPORT,
- ]
- elif risk == "因产业项目失败" or risk == "因务工就业不稳" or risk == "缺劳动力":
- must_selected_option = []
- forbinddens_option = [
- HOUSING_SECURITY_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- OBLIGATORY_EDUCATION_GUARANTEE,
- EDUCATION_SUPPORT,
- HEALTH_SUPPORT,
- ]
- elif risk == "住房出现安全问题":
- must_selected_option = [HOUSING_SECURITY_GUARANTEE, RELOCATION]
- forbinddens_option = [
- DRINKING_WATER_SECURITY_GUARANTEE,
- HEALTH_SUPPORT,
- OBLIGATORY_EDUCATION_GUARANTEE,
- EDUCATION_SUPPORT,
- COMPREHENSIVE_GUARANTEE,
- ]
- elif risk == "家庭成员中有义务教育阶段适龄儿童少年失学辍学":
- must_selected_option = [OBLIGATORY_EDUCATION_GUARANTEE]
- forbinddens_option = [
- INDUSTRY_SUPPORT,
- EMPLOYMENT_SUPPORT,
- FINANCIAL_SUPPORT,
- PUBLIC_WELFARE_SUPPORT,
- HOUSING_SECURITY_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- HEALTH_SUPPORT,
- EDUCATION_SUPPORT,
- COMPREHENSIVE_GUARANTEE,
- SOCIAL_SUPPORT,
- ]
- elif risk == "饮水安全问题":
- must_selected_option = [DRINKING_WATER_SECURITY_GUARANTEE]
- forbinddens_option = [
- INDUSTRY_SUPPORT,
- EMPLOYMENT_SUPPORT,
- FINANCIAL_SUPPORT,
- PUBLIC_WELFARE_SUPPORT,
- HOUSING_SECURITY_GUARANTEE,
- OBLIGATORY_EDUCATION_GUARANTEE,
- HEALTH_SUPPORT,
- EDUCATION_SUPPORT,
- COMPREHENSIVE_GUARANTEE,
- SOCIAL_SUPPORT,
- ]
- elif risk == "家庭成员中有未参加城乡居民(职工)基本医疗保险":
- must_selected_option = [HEALTH_SUPPORT, SOCIAL_SUPPORT]
- forbinddens_option = [
- INDUSTRY_SUPPORT,
- EMPLOYMENT_SUPPORT,
- FINANCIAL_SUPPORT,
- PUBLIC_WELFARE_SUPPORT,
- HOUSING_SECURITY_GUARANTEE,
- OBLIGATORY_EDUCATION_GUARANTEE,
- EDUCATION_SUPPORT,
- COMPREHENSIVE_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- ]
- must_selected = get_item_values_for(must_selected_option)
- forbiddens = get_item_values_for(forbinddens_option)
- # 如果option为空,代表无要求
- must_selected_ok = len(must_selected) > 0 if len(must_selected_option) > 0 else True
- forbiddens_ok = len(forbiddens) == 0 if len(forbinddens_option) > 0 else True
- # print(f"risk 为{risk}的必选项当前值:{must_selected},禁选项当前值:{forbiddens}")
- if not must_selected_ok or not forbiddens_ok:
- target = ws[f"{title_dict[get_key_for(i)]}{row_num}"]
- if not must_selected_ok:
- comment_and_fill_yellow_for(target, f"{get_key_for(i)} 未选择必选项,必选项为{must_selected_option}")
- else:
- forbiddens_keys = [x for x, y in forbiddens]
- forbiddens_values = [y for x, y in forbiddens]
- comment_and_fill_yellow_for(
- target,
- f"{get_key_for(i)} 填写了禁选项{forbiddens_keys},填写的内容为{forbiddens_values}",
- )
- def check_assistance(ws, row_num, title_dict):
- """检查实施开发式帮扶填写状态是否有问题,如果填写了开发式帮扶,以下四项:产业帮扶、就业帮扶、金融帮扶、公益岗位帮扶有一项填写的是其他或者技能培训就不合规"""
- assistance = "实施开发式帮扶措施情况"
- assistance_value = ws[f"{title_dict[assistance]}{row_num}"].value
- if assistance_value and len(assistance_value) > 0:
- for type in ["产业帮扶", "就业帮扶", "金融帮扶", "公益岗位帮扶"]:
- target = ws[f"{title_dict[type]}{row_num}"].value
- for key in ["其他", "技能培训"]:
- if key in target:
- comment_and_fill_yellow_for(
- ws[f"{title_dict[type]}{row_num}"], f"实施开发式帮扶填写状态下,{type} 不允许选择 {key}"
- )
- return
- def comment_and_fill_yellow_for(target, comment):
- target.comment = Comment(text=comment, author="system")
- yellow_fill = PatternFill(patternType="solid", fgColor="FFFF00")
- target.fill = yellow_fill
- if __name__ == "__main__":
- uvicorn.run("data_verification:app", host="0.0.0.0", port=8500, reload=True)
|