import uvicorn import warnings import os from fastapi import FastAPI, UploadFile, File from openpyxl import load_workbook from openpyxl.utils.cell import coordinate_from_string from openpyxl.comments import Comment from openpyxl.styles import PatternFill from fastapi.middleware.cors import CORSMiddleware from datetime import datetime from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles warnings.filterwarnings("ignore") app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) shared_dir = 'cache' app.mount(f"/{shared_dir}", StaticFiles(directory="download_cache"), name={shared_dir}) @app.post("/uploadfile") async def create_upload_file(file: UploadFile = File(...)): print(f"开始处理{file.filename}") contents = await file.read() savename = "download_cache/" + file.filename # savename = "uploadfile/" + file.filename with open(savename, "wb") as f: f.write(contents) # 读取excel表 workbook = load_workbook(savename) # 获取指定的sheet sheet_names = workbook.sheetnames # if "脱贫户信息查询" not in sheet_names: # print("读取不到指定的sheet页") # return {"code": 500, "msg": "读取不到指定的sheet页--脱贫户信息查询"} sheet = workbook[sheet_names[0]] title_row_num = 0 # 读取前5行,正常应该有字段名了 row_range = sheet[1:5] for i, r in enumerate(row_range): for j, c in enumerate(r): # print(f"第{i + 1 }行,第{j}列,值:{c.value}") if "户主编号" == c.value or "户编号" == c.value: title_row_num = c.row if title_row_num == 0: print(f"{file.filename}文件 内容不合格") return {"code": 202, "msg": "不是可以解析的格式"} # 获取字段名对应的列 title_dict = {} title_rows = sheet[title_row_num] # 遍历字段名所在行的所有单元格 for title_cell in title_rows: x, y = coordinate_from_string(title_cell.coordinate) title_dict[title_cell.value] = x # print(title_dict) # 开始读取表格内容 read_data(sheet, title_row_num + 1, sheet.max_row, title_dict) # 保存文档 workbook.save(savename) # return FileResponse(savename, media_type="application/octet-stream", filename="deal.xlsx") # return FileResponse(savename) # return FileResponse(savename, media_type='application/xlsx', filename="deal.xlsx") # return savename print(f"处理完了{file.filename}文件") return {"code": 200, "msg": "分析完成,请点击下载查看分析结果", "filePath": f"/{shared_dir}/" + file.filename} def calculate_age_from_id_number(id_number): """根据传入的身份证号码,提取出出生年月日,结合代码运行时的当天时间,返回当前年龄(周岁)""" age = 0 if len(id_number) != 18: return age birthday = id_number[6:14] birthday = datetime.strptime(birthday, "%Y%m%d") today = datetime.now() age = today.year - birthday.year if today.month < birthday.month or (today.month == birthday.month and today.day < birthday.day): age -= 1 return age def verify_age_region_for_education(education, age): """根据传入的学历和年龄判断该年龄是否符合该学历""" # 义务教育年龄段 # if education == "" and (age >= 6 and age <= 15): # return False education_for_age = { "学龄前儿童": (0, 6), "学前教育": (3, 6), "小学": (6, 12), "七年级": (12, 13), "八年级": (13, 14), "九年级": (14, 15), "初中": (12, 15), "高中": (15, 18), "中专": (15, 18), "中职": (15, 18), "高职": (18, 22), "高专": (18, 22), "大专": (18, 20), "本科": (18, 22), "硕士": (22, 25), } for education_key in education_for_age.keys(): if education_key in education: age_range = education_for_age[education_key] if age < age_range[0] or age > age_range[1]: return False break return True def read_data(ws, start_row, end_row, title_dict): # 监测对象致(返)贫风险非最新设计的风险类型 for i in range(start_row, end_row): check_poverty_causes(ws, i, title_dict) check_identitycard_length(ws, i, title_dict) check_deformity(ws, i, title_dict) check_education_level(ws, i, title_dict) check_risk_type(ws, i, title_dict) check_assistance(ws, i, title_dict) # 筛查方法 def check_poverty_causes(ws, row_num, title_dict): """筛查主要致贫原因是否合规""" main_reason = "主要致贫原因" if main_reason not in title_dict: return poverty_causes = ws[f"{title_dict[main_reason]}{row_num}"].value # 致贫原因列表 imageTypeList = ["因病", "因学", "因残", "因自然灾害", "因意外事故", "因产业项目失败", "因务工就业不稳", "缺劳动力", "因房", "因水", "其他(填写备注)"] if poverty_causes not in imageTypeList: target = ws[f"{title_dict[main_reason]}{row_num}"] comment_and_fill_yellow_for(target, "21.监测对象致(返)贫风险非最新设计的风险类型") def check_identitycard_length(ws, row_num, title_dict): """筛查身份证号码是否合规""" info_number = "户主证件号码" if info_number not in title_dict: return identitycard = ws[f"{title_dict[info_number]}{row_num}"].value if len(identitycard) not in [15, 18, 20, 22]: target = ws[f"{title_dict[info_number]}{row_num}"] comment_and_fill_yellow_for(target, "31.监测对象家庭成员证件号码位数异常(证件号码非15、18、20、22位)") def check_deformity(ws, row_num, title_dict): """筛查是否不符合残疾劳动标准""" condition = "健康状况" skill = "劳动技能" if condition not in title_dict or skill not in title_dict: return condition_value = ws[f"{title_dict[condition]}{row_num}"].value skill_value = ws[f"{title_dict[skill]}{row_num}"].value if "残疾" in condition_value and skill_value == "弱劳动力或半劳动力": target = ws[f"{title_dict[skill]}{row_num}"] comment_and_fill_yellow_for(target, "35.一、二级(重度)肢体残疾脱贫人口有普通劳动能力") def check_education_level(ws, row_num, title_dict): """筛查在校生状况填写是否不符合年龄""" info_number = "证件号码" if info_number not in title_dict: return identitycard = ws[f"{title_dict[info_number]}{row_num}"].value if len(identitycard) != 18: return education_key = "文化程度" is_student_key = "在校生状况" if education_key not in title_dict or is_student_key not in title_dict: return education = ws[f"{title_dict[education_key]}{row_num}"].value is_student = ws[f"{title_dict[is_student_key]}{row_num}"].value if len(education) == 0 and len(is_student) == 0: target = ws[f"{title_dict[education_key]}{row_num}"] comment_and_fill_yellow_for(target, "文化程度,在校生状况均未填写") target_is_student = ws[f"{title_dict[education_key]}{row_num}"] comment_and_fill_yellow_for(target_is_student, "文化程度,在校生状况均未填写") return if len(is_student) == 0: return age = calculate_age_from_id_number(identitycard) age_is_reliable = verify_age_region_for_education(is_student, age) if not age_is_reliable: target = ws[f"{title_dict[is_student_key]}{row_num}"] comment_and_fill_yellow_for(target, "在校生状况与年龄不符") def check_risk_type(ws, row_num, title_dict): """筛查风险类型,家庭信息是否合规""" def get_item_values_for(items): result = [] for item in items: if item not in title_dict: continue value = ws[f"{title_dict[item]}{row_num}"].value if value is not None and len(value) > 0: result.append((item, value)) return result def get_key_for(i): key = f"致贫风险{i}" return key risks = [] for i in range(1, 6): key = get_key_for(i) if key not in title_dict: return risk = ws[f"{title_dict[key]}{row_num}"].value if risk is not None and len(risk) > 0: risks.append((risk, i)) # 定义:健康帮扶,"综合保障,社会帮扶,义务教育保障, 教育帮扶, 住房安全保障, 搬迁, 饮水安全保障, 产业帮扶, 就业帮扶, 金融帮扶, 公益岗位帮扶等常量 HEALTH_SUPPORT = "健康帮扶" COMPREHENSIVE_GUARANTEE = "综合保障" SOCIAL_SUPPORT = "社会帮扶" OBLIGATORY_EDUCATION_GUARANTEE = "义务教育保障" EDUCATION_SUPPORT = "教育帮扶" HOUSING_SECURITY_GUARANTEE = "住房安全保障" RELOCATION = "搬迁" DRINKING_WATER_SECURITY_GUARANTEE = "饮水安全保障" INDUSTRY_SUPPORT = "产业帮扶" EMPLOYMENT_SUPPORT = "就业帮扶" FINANCIAL_SUPPORT = "金融帮扶" PUBLIC_WELFARE_SUPPORT = "公益岗位帮扶" must_selected_option = [] forbinddens_option = [] for risk, i in risks: if risk == "因病": must_selected_option = [HEALTH_SUPPORT, COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT] forbinddens_option = [ HOUSING_SECURITY_GUARANTEE, DRINKING_WATER_SECURITY_GUARANTEE, OBLIGATORY_EDUCATION_GUARANTEE, EDUCATION_SUPPORT, ] elif risk == "因学": must_selected_option = [OBLIGATORY_EDUCATION_GUARANTEE, EDUCATION_SUPPORT, SOCIAL_SUPPORT] forbinddens_option = [ HOUSING_SECURITY_GUARANTEE, DRINKING_WATER_SECURITY_GUARANTEE, EDUCATION_SUPPORT, ] elif risk == "因残": must_selected_option = [COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT] forbinddens_option = [ HOUSING_SECURITY_GUARANTEE, DRINKING_WATER_SECURITY_GUARANTEE, OBLIGATORY_EDUCATION_GUARANTEE, EDUCATION_SUPPORT, ] elif risk == "因产业项目失败" or risk == "因务工就业不稳" or risk == "缺劳动力": must_selected_option = [] forbinddens_option = [ HOUSING_SECURITY_GUARANTEE, DRINKING_WATER_SECURITY_GUARANTEE, OBLIGATORY_EDUCATION_GUARANTEE, EDUCATION_SUPPORT, HEALTH_SUPPORT, ] elif risk == "住房出现安全问题": must_selected_option = [HOUSING_SECURITY_GUARANTEE, RELOCATION] forbinddens_option = [ DRINKING_WATER_SECURITY_GUARANTEE, HEALTH_SUPPORT, OBLIGATORY_EDUCATION_GUARANTEE, EDUCATION_SUPPORT, COMPREHENSIVE_GUARANTEE, ] elif risk == "家庭成员中有义务教育阶段适龄儿童少年失学辍学": must_selected_option = [OBLIGATORY_EDUCATION_GUARANTEE] forbinddens_option = [ INDUSTRY_SUPPORT, EMPLOYMENT_SUPPORT, FINANCIAL_SUPPORT, PUBLIC_WELFARE_SUPPORT, HOUSING_SECURITY_GUARANTEE, DRINKING_WATER_SECURITY_GUARANTEE, HEALTH_SUPPORT, EDUCATION_SUPPORT, COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT, ] elif risk == "饮水安全问题": must_selected_option = [DRINKING_WATER_SECURITY_GUARANTEE] forbinddens_option = [ INDUSTRY_SUPPORT, EMPLOYMENT_SUPPORT, FINANCIAL_SUPPORT, PUBLIC_WELFARE_SUPPORT, HOUSING_SECURITY_GUARANTEE, OBLIGATORY_EDUCATION_GUARANTEE, HEALTH_SUPPORT, EDUCATION_SUPPORT, COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT, ] elif risk == "家庭成员中有未参加城乡居民(职工)基本医疗保险": must_selected_option = [HEALTH_SUPPORT, SOCIAL_SUPPORT] forbinddens_option = [ INDUSTRY_SUPPORT, EMPLOYMENT_SUPPORT, FINANCIAL_SUPPORT, PUBLIC_WELFARE_SUPPORT, HOUSING_SECURITY_GUARANTEE, OBLIGATORY_EDUCATION_GUARANTEE, EDUCATION_SUPPORT, COMPREHENSIVE_GUARANTEE, DRINKING_WATER_SECURITY_GUARANTEE, ] must_selected = get_item_values_for(must_selected_option) forbiddens = get_item_values_for(forbinddens_option) # 如果option为空,代表无要求 must_selected_ok = len(must_selected) > 0 if len(must_selected_option) > 0 else True forbiddens_ok = len(forbiddens) == 0 if len(forbinddens_option) > 0 else True # print(f"risk 为{risk}的必选项当前值:{must_selected},禁选项当前值:{forbiddens}") if not must_selected_ok or not forbiddens_ok: target = ws[f"{title_dict[get_key_for(i)]}{row_num}"] if not must_selected_ok: comment_and_fill_yellow_for(target, f"{get_key_for(i)} 未选择必选项,必选项为{must_selected_option}") else: forbiddens_keys = [x for x, y in forbiddens] forbiddens_values = [y for x, y in forbiddens] comment_and_fill_yellow_for( target, f"{get_key_for(i)} 填写了禁选项{forbiddens_keys},填写的内容为{forbiddens_values}", ) def check_assistance(ws, row_num, title_dict): """检查实施开发式帮扶填写状态是否有问题,如果填写了开发式帮扶,以下四项:产业帮扶、就业帮扶、金融帮扶、公益岗位帮扶有一项填写的是其他或者技能培训就不合规""" assistance = "实施开发式帮扶措施情况" assistance_value = ws[f"{title_dict[assistance]}{row_num}"].value if assistance_value and len(assistance_value) > 0: for type in ["产业帮扶", "就业帮扶", "金融帮扶", "公益岗位帮扶"]: target = ws[f"{title_dict[type]}{row_num}"].value for key in ["其他", "技能培训"]: if key in target: comment_and_fill_yellow_for(target, f"实施开发式帮扶填写状态下,{type} 不允许选择 {key}") return # 填写了的话,剩下四项有一个是其他或者技能培训就不行 # info_number = "户主证件号码" # identitycard = ws[f"{title_dict[info_number]}{row_num}"].value # if len(identitycard) not in [15, 18, 20, 22]: # target = ws[f"{title_dict[info_number]}{row_num}"] # comment_and_fill_yellow_for(target, "31.监测对象家庭成员证件号码位数异常(证件号码非15、18、20、22位)") # def get_item_values_for(ws, row_num, title_dict, items): # result = [] # for item in items: # if item not in title_dict: # continue # value = ws[f"{title_dict[item]}{row_num}"].value # if value is not None: # result.append(value) # return result def comment_and_fill_yellow_for(target, comment): target.comment = Comment(text=comment, author="system") yellow_fill = PatternFill(patternType="solid", fgColor="FFFF00") target.fill = yellow_fill if __name__ == "__main__": # result = calculate_age_from_id_number("532801200607144126") # print(result) uvicorn.run("data_verification:app", host="0.0.0.0", port=8500, reload=True)