123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- import uvicorn
- import warnings
- import os
- from fastapi import FastAPI, UploadFile, File
- from openpyxl import load_workbook
- from openpyxl.utils.cell import coordinate_from_string
- from openpyxl.comments import Comment
- from openpyxl.styles import PatternFill
- from fastapi.middleware.cors import CORSMiddleware
- from datetime import datetime
- from fastapi.responses import FileResponse
- from fastapi.staticfiles import StaticFiles
- warnings.filterwarnings("ignore")
- app = FastAPI()
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- shared_dir = 'cache'
- app.mount(f"/{shared_dir}", StaticFiles(directory="download_cache"), name={shared_dir})
- @app.post("/uploadfile")
- async def create_upload_file(file: UploadFile = File(...)):
- print(f"开始处理{file.filename}")
- contents = await file.read()
- savename = "download_cache/" + file.filename
- # savename = "uploadfile/" + file.filename
- with open(savename, "wb") as f:
- f.write(contents)
- # 读取excel表
- workbook = load_workbook(savename)
- # 获取指定的sheet
- sheet_names = workbook.sheetnames
- # if "脱贫户信息查询" not in sheet_names:
- # print("读取不到指定的sheet页")
- # return {"code": 500, "msg": "读取不到指定的sheet页--脱贫户信息查询"}
- sheet = workbook[sheet_names[0]]
- title_row_num = 0
- # 读取前5行,正常应该有字段名了
- row_range = sheet[1:5]
- for i, r in enumerate(row_range):
- for j, c in enumerate(r):
- # print(f"第{i + 1 }行,第{j}列,值:{c.value}")
- if "户主编号" == c.value or "户编号" == c.value:
- title_row_num = c.row
- if title_row_num == 0:
- print(f"{file.filename}文件 内容不合格")
- return {"code": 202, "msg": "不是可以解析的格式"}
- # 获取字段名对应的列
- title_dict = {}
- title_rows = sheet[title_row_num]
- # 遍历字段名所在行的所有单元格
- for title_cell in title_rows:
- x, y = coordinate_from_string(title_cell.coordinate)
- title_dict[title_cell.value] = x
- # print(title_dict)
- # 开始读取表格内容
- read_data(sheet, title_row_num + 1, sheet.max_row, title_dict)
- # 保存文档
- workbook.save(savename)
- # return FileResponse(savename, media_type="application/octet-stream", filename="deal.xlsx")
- # return FileResponse(savename)
- # return FileResponse(savename, media_type='application/xlsx', filename="deal.xlsx")
- # return savename
- print(f"处理完了{file.filename}文件")
- return {"code": 200, "msg": "分析完成,请点击下载查看分析结果", "filePath": f"/{shared_dir}/" + file.filename}
- def calculate_age_from_id_number(id_number):
- """根据传入的身份证号码,提取出出生年月日,结合代码运行时的当天时间,返回当前年龄(周岁)"""
- age = 0
- if len(id_number) != 18:
- return age
- birthday = id_number[6:14]
- birthday = datetime.strptime(birthday, "%Y%m%d")
- today = datetime.now()
- age = today.year - birthday.year
- if today.month < birthday.month or (today.month == birthday.month and today.day < birthday.day):
- age -= 1
- return age
- def verify_age_region_for_education(education, age):
- """根据传入的学历和年龄判断该年龄是否符合该学历"""
- # 义务教育年龄段
- # if education == "" and (age >= 6 and age <= 15):
- # return False
- education_for_age = {
- "学龄前儿童": (0, 6),
- "学前教育": (3, 6),
- "小学": (6, 12),
- "七年级": (12, 13),
- "八年级": (13, 14),
- "九年级": (14, 15),
- "初中": (12, 15),
- "高中": (15, 18),
- "中专": (15, 18),
- "中职": (15, 18),
- "高职": (18, 22),
- "高专": (18, 22),
- "大专": (18, 20),
- "本科": (18, 22),
- "硕士": (22, 25),
- }
- for education_key in education_for_age.keys():
- if education_key in education:
- age_range = education_for_age[education_key]
- if age < age_range[0] or age > age_range[1]:
- return False
- break
- return True
- def read_data(ws, start_row, end_row, title_dict):
- # 监测对象致(返)贫风险非最新设计的风险类型
- for i in range(start_row, end_row):
- check_poverty_causes(ws, i, title_dict)
- check_identitycard_length(ws, i, title_dict)
- check_deformity(ws, i, title_dict)
- check_education_level(ws, i, title_dict)
- check_risk_type(ws, i, title_dict)
- check_assistance(ws, i, title_dict)
- # 筛查方法
- def check_poverty_causes(ws, row_num, title_dict):
- """筛查主要致贫原因是否合规"""
- main_reason = "主要致贫原因"
- if main_reason not in title_dict:
- return
- poverty_causes = ws[f"{title_dict[main_reason]}{row_num}"].value
- # 致贫原因列表
- imageTypeList = ["因病", "因学", "因残", "因自然灾害", "因意外事故", "因产业项目失败", "因务工就业不稳", "缺劳动力", "因房", "因水", "其他(填写备注)"]
- if poverty_causes not in imageTypeList:
- target = ws[f"{title_dict[main_reason]}{row_num}"]
- comment_and_fill_yellow_for(target, "21.监测对象致(返)贫风险非最新设计的风险类型")
- def check_identitycard_length(ws, row_num, title_dict):
- """筛查身份证号码是否合规"""
- info_number = "户主证件号码"
- if info_number not in title_dict:
- return
- identitycard = ws[f"{title_dict[info_number]}{row_num}"].value
- if len(identitycard) not in [15, 18, 20, 22]:
- target = ws[f"{title_dict[info_number]}{row_num}"]
- comment_and_fill_yellow_for(target, "31.监测对象家庭成员证件号码位数异常(证件号码非15、18、20、22位)")
- def check_deformity(ws, row_num, title_dict):
- """筛查是否不符合残疾劳动标准"""
- condition = "健康状况"
- skill = "劳动技能"
- if condition not in title_dict or skill not in title_dict:
- return
- condition_value = ws[f"{title_dict[condition]}{row_num}"].value
- skill_value = ws[f"{title_dict[skill]}{row_num}"].value
- if "残疾" in condition_value and skill_value == "弱劳动力或半劳动力":
- target = ws[f"{title_dict[skill]}{row_num}"]
- comment_and_fill_yellow_for(target, "35.一、二级(重度)肢体残疾脱贫人口有普通劳动能力")
- def check_education_level(ws, row_num, title_dict):
- """筛查在校生状况填写是否不符合年龄"""
- info_number = "证件号码"
- if info_number not in title_dict:
- return
- identitycard = ws[f"{title_dict[info_number]}{row_num}"].value
- if len(identitycard) != 18:
- return
- education_key = "文化程度"
- is_student_key = "在校生状况"
- if education_key not in title_dict or is_student_key not in title_dict:
- return
- education = ws[f"{title_dict[education_key]}{row_num}"].value
- is_student = ws[f"{title_dict[is_student_key]}{row_num}"].value
- if len(education) == 0 and len(is_student) == 0:
- target = ws[f"{title_dict[education_key]}{row_num}"]
- comment_and_fill_yellow_for(target, "文化程度,在校生状况均未填写")
- target_is_student = ws[f"{title_dict[education_key]}{row_num}"]
- comment_and_fill_yellow_for(target_is_student, "文化程度,在校生状况均未填写")
- return
- if len(is_student) == 0:
- return
- age = calculate_age_from_id_number(identitycard)
- age_is_reliable = verify_age_region_for_education(is_student, age)
- if not age_is_reliable:
- target = ws[f"{title_dict[is_student_key]}{row_num}"]
- comment_and_fill_yellow_for(target, "在校生状况与年龄不符")
- def check_risk_type(ws, row_num, title_dict):
- """筛查风险类型,家庭信息是否合规"""
- def get_item_values_for(items):
- result = []
- for item in items:
- if item not in title_dict:
- continue
- value = ws[f"{title_dict[item]}{row_num}"].value
- if value is not None and len(value) > 0:
- result.append((item, value))
- return result
- def get_key_for(i):
- key = f"致贫风险{i}"
- return key
- risks = []
- for i in range(1, 6):
- key = get_key_for(i)
- if key not in title_dict:
- return
- risk = ws[f"{title_dict[key]}{row_num}"].value
- if risk is not None and len(risk) > 0:
- risks.append((risk, i))
- # 定义:健康帮扶,"综合保障,社会帮扶,义务教育保障, 教育帮扶, 住房安全保障, 搬迁, 饮水安全保障, 产业帮扶, 就业帮扶, 金融帮扶, 公益岗位帮扶等常量
- HEALTH_SUPPORT = "健康帮扶"
- COMPREHENSIVE_GUARANTEE = "综合保障"
- SOCIAL_SUPPORT = "社会帮扶"
- OBLIGATORY_EDUCATION_GUARANTEE = "义务教育保障"
- EDUCATION_SUPPORT = "教育帮扶"
- HOUSING_SECURITY_GUARANTEE = "住房安全保障"
- RELOCATION = "搬迁"
- DRINKING_WATER_SECURITY_GUARANTEE = "饮水安全保障"
- INDUSTRY_SUPPORT = "产业帮扶"
- EMPLOYMENT_SUPPORT = "就业帮扶"
- FINANCIAL_SUPPORT = "金融帮扶"
- PUBLIC_WELFARE_SUPPORT = "公益岗位帮扶"
- must_selected_option = []
- forbinddens_option = []
- for risk, i in risks:
- if risk == "因病":
- must_selected_option = [HEALTH_SUPPORT, COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT]
- forbinddens_option = [
- HOUSING_SECURITY_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- OBLIGATORY_EDUCATION_GUARANTEE,
- EDUCATION_SUPPORT,
- ]
- elif risk == "因学":
- must_selected_option = [OBLIGATORY_EDUCATION_GUARANTEE, EDUCATION_SUPPORT, SOCIAL_SUPPORT]
- forbinddens_option = [
- HOUSING_SECURITY_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- EDUCATION_SUPPORT,
- ]
- elif risk == "因残":
- must_selected_option = [COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT]
- forbinddens_option = [
- HOUSING_SECURITY_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- OBLIGATORY_EDUCATION_GUARANTEE,
- EDUCATION_SUPPORT,
- ]
- elif risk == "因产业项目失败" or risk == "因务工就业不稳" or risk == "缺劳动力":
- must_selected_option = []
- forbinddens_option = [
- HOUSING_SECURITY_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- OBLIGATORY_EDUCATION_GUARANTEE,
- EDUCATION_SUPPORT,
- HEALTH_SUPPORT,
- ]
- elif risk == "住房出现安全问题":
- must_selected_option = [HOUSING_SECURITY_GUARANTEE, RELOCATION]
- forbinddens_option = [
- DRINKING_WATER_SECURITY_GUARANTEE,
- HEALTH_SUPPORT,
- OBLIGATORY_EDUCATION_GUARANTEE,
- EDUCATION_SUPPORT,
- COMPREHENSIVE_GUARANTEE,
- ]
- elif risk == "家庭成员中有义务教育阶段适龄儿童少年失学辍学":
- must_selected_option = [OBLIGATORY_EDUCATION_GUARANTEE]
- forbinddens_option = [
- INDUSTRY_SUPPORT,
- EMPLOYMENT_SUPPORT,
- FINANCIAL_SUPPORT,
- PUBLIC_WELFARE_SUPPORT,
- HOUSING_SECURITY_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- HEALTH_SUPPORT,
- EDUCATION_SUPPORT,
- COMPREHENSIVE_GUARANTEE,
- SOCIAL_SUPPORT,
- ]
- elif risk == "饮水安全问题":
- must_selected_option = [DRINKING_WATER_SECURITY_GUARANTEE]
- forbinddens_option = [
- INDUSTRY_SUPPORT,
- EMPLOYMENT_SUPPORT,
- FINANCIAL_SUPPORT,
- PUBLIC_WELFARE_SUPPORT,
- HOUSING_SECURITY_GUARANTEE,
- OBLIGATORY_EDUCATION_GUARANTEE,
- HEALTH_SUPPORT,
- EDUCATION_SUPPORT,
- COMPREHENSIVE_GUARANTEE,
- SOCIAL_SUPPORT,
- ]
- elif risk == "家庭成员中有未参加城乡居民(职工)基本医疗保险":
- must_selected_option = [HEALTH_SUPPORT, SOCIAL_SUPPORT]
- forbinddens_option = [
- INDUSTRY_SUPPORT,
- EMPLOYMENT_SUPPORT,
- FINANCIAL_SUPPORT,
- PUBLIC_WELFARE_SUPPORT,
- HOUSING_SECURITY_GUARANTEE,
- OBLIGATORY_EDUCATION_GUARANTEE,
- EDUCATION_SUPPORT,
- COMPREHENSIVE_GUARANTEE,
- DRINKING_WATER_SECURITY_GUARANTEE,
- ]
- must_selected = get_item_values_for(must_selected_option)
- forbiddens = get_item_values_for(forbinddens_option)
- # 如果option为空,代表无要求
- must_selected_ok = len(must_selected) > 0 if len(must_selected_option) > 0 else True
- forbiddens_ok = len(forbiddens) == 0 if len(forbinddens_option) > 0 else True
- # print(f"risk 为{risk}的必选项当前值:{must_selected},禁选项当前值:{forbiddens}")
- if not must_selected_ok or not forbiddens_ok:
- target = ws[f"{title_dict[get_key_for(i)]}{row_num}"]
- if not must_selected_ok:
- comment_and_fill_yellow_for(target, f"{get_key_for(i)} 未选择必选项,必选项为{must_selected_option}")
- else:
- forbiddens_keys = [x for x, y in forbiddens]
- forbiddens_values = [y for x, y in forbiddens]
- comment_and_fill_yellow_for(
- target,
- f"{get_key_for(i)} 填写了禁选项{forbiddens_keys},填写的内容为{forbiddens_values}",
- )
- def check_assistance(ws, row_num, title_dict):
- """检查实施开发式帮扶填写状态是否有问题,如果填写了开发式帮扶,以下四项:产业帮扶、就业帮扶、金融帮扶、公益岗位帮扶有一项填写的是其他或者技能培训就不合规"""
- assistance = "实施开发式帮扶措施情况"
- assistance_value = ws[f"{title_dict[assistance]}{row_num}"].value
- if assistance_value and len(assistance_value) > 0:
- for type in ["产业帮扶", "就业帮扶", "金融帮扶", "公益岗位帮扶"]:
- target = ws[f"{title_dict[type]}{row_num}"].value
- for key in ["其他", "技能培训"]:
- if key in target:
- comment_and_fill_yellow_for(target, f"实施开发式帮扶填写状态下,{type} 不允许选择 {key}")
- return
- # 填写了的话,剩下四项有一个是其他或者技能培训就不行
- # info_number = "户主证件号码"
- # identitycard = ws[f"{title_dict[info_number]}{row_num}"].value
- # if len(identitycard) not in [15, 18, 20, 22]:
- # target = ws[f"{title_dict[info_number]}{row_num}"]
- # comment_and_fill_yellow_for(target, "31.监测对象家庭成员证件号码位数异常(证件号码非15、18、20、22位)")
- # def get_item_values_for(ws, row_num, title_dict, items):
- # result = []
- # for item in items:
- # if item not in title_dict:
- # continue
- # value = ws[f"{title_dict[item]}{row_num}"].value
- # if value is not None:
- # result.append(value)
- # return result
- def comment_and_fill_yellow_for(target, comment):
- target.comment = Comment(text=comment, author="system")
- yellow_fill = PatternFill(patternType="solid", fgColor="FFFF00")
- target.fill = yellow_fill
- if __name__ == "__main__":
- # result = calculate_age_from_id_number("532801200607144126")
- # print(result)
- uvicorn.run("data_verification:app", host="0.0.0.0", port=8500, reload=True)
|