data_verification.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. import uvicorn
  2. import warnings
  3. import os
  4. from fastapi import FastAPI, UploadFile, File
  5. from openpyxl import load_workbook
  6. from openpyxl.utils.cell import coordinate_from_string
  7. from openpyxl.comments import Comment
  8. from openpyxl.styles import PatternFill
  9. from fastapi.middleware.cors import CORSMiddleware
  10. from datetime import datetime
  11. from fastapi.responses import FileResponse
  12. from fastapi.staticfiles import StaticFiles
  13. warnings.filterwarnings("ignore")
  14. app = FastAPI()
  15. app.add_middleware(
  16. CORSMiddleware,
  17. allow_origins=["*"],
  18. allow_credentials=True,
  19. allow_methods=["*"],
  20. allow_headers=["*"],
  21. )
  22. shared_dir = 'cache'
  23. app.mount(f"/{shared_dir}", StaticFiles(directory="download_cache"), name={shared_dir})
  24. @app.post("/uploadfile")
  25. async def create_upload_file(file: UploadFile = File(...)):
  26. print(f"开始处理{file.filename}")
  27. contents = await file.read()
  28. savename = "download_cache/" + file.filename
  29. # savename = "uploadfile/" + file.filename
  30. with open(savename, "wb") as f:
  31. f.write(contents)
  32. # 读取excel表
  33. workbook = load_workbook(savename)
  34. # 获取指定的sheet
  35. sheet_names = workbook.sheetnames
  36. # if "脱贫户信息查询" not in sheet_names:
  37. # print("读取不到指定的sheet页")
  38. # return {"code": 500, "msg": "读取不到指定的sheet页--脱贫户信息查询"}
  39. sheet = workbook[sheet_names[0]]
  40. title_row_num = 0
  41. # 读取前5行,正常应该有字段名了
  42. row_range = sheet[1:5]
  43. for i, r in enumerate(row_range):
  44. for j, c in enumerate(r):
  45. # print(f"第{i + 1 }行,第{j}列,值:{c.value}")
  46. if "户主编号" == c.value or "户编号" == c.value:
  47. title_row_num = c.row
  48. if title_row_num == 0:
  49. print(f"{file.filename}文件 内容不合格")
  50. return {"code": 202, "msg": "不是可以解析的格式"}
  51. # 获取字段名对应的列
  52. title_dict = {}
  53. title_rows = sheet[title_row_num]
  54. # 遍历字段名所在行的所有单元格
  55. for title_cell in title_rows:
  56. x, y = coordinate_from_string(title_cell.coordinate)
  57. title_dict[title_cell.value] = x
  58. # print(title_dict)
  59. # 开始读取表格内容
  60. read_data(sheet, title_row_num + 1, sheet.max_row, title_dict)
  61. # 保存文档
  62. workbook.save(savename)
  63. # return FileResponse(savename, media_type="application/octet-stream", filename="deal.xlsx")
  64. # return FileResponse(savename)
  65. # return FileResponse(savename, media_type='application/xlsx', filename="deal.xlsx")
  66. # return savename
  67. print(f"处理完了{file.filename}文件")
  68. return {"code": 200, "msg": "分析完成,请点击下载查看分析结果", "filePath": f"/{shared_dir}/" + file.filename}
  69. def calculate_age_from_id_number(id_number):
  70. """根据传入的身份证号码,提取出出生年月日,结合代码运行时的当天时间,返回当前年龄(周岁)"""
  71. age = 0
  72. if len(id_number) != 18:
  73. return age
  74. birthday = id_number[6:14]
  75. birthday = datetime.strptime(birthday, "%Y%m%d")
  76. today = datetime.now()
  77. age = today.year - birthday.year
  78. if today.month < birthday.month or (today.month == birthday.month and today.day < birthday.day):
  79. age -= 1
  80. return age
  81. def verify_age_region_for_education(education, age):
  82. """根据传入的学历和年龄判断该年龄是否符合该学历"""
  83. # 义务教育年龄段
  84. # if education == "" and (age >= 6 and age <= 15):
  85. # return False
  86. education_for_age = {
  87. "学龄前儿童": (0, 6),
  88. "学前教育": (3, 6),
  89. "小学": (6, 12),
  90. "七年级": (12, 13),
  91. "八年级": (13, 14),
  92. "九年级": (14, 15),
  93. "初中": (12, 15),
  94. "高中": (15, 18),
  95. "中专": (15, 18),
  96. "中职": (15, 18),
  97. "高职": (18, 22),
  98. "高专": (18, 22),
  99. "大专": (18, 20),
  100. "本科": (18, 22),
  101. "硕士": (22, 25),
  102. }
  103. for education_key in education_for_age.keys():
  104. if education_key in education:
  105. age_range = education_for_age[education_key]
  106. if age < age_range[0] or age > age_range[1]:
  107. return False
  108. break
  109. return True
  110. def read_data(ws, start_row, end_row, title_dict):
  111. # 监测对象致(返)贫风险非最新设计的风险类型
  112. for i in range(start_row, end_row):
  113. check_poverty_causes(ws, i, title_dict)
  114. check_identitycard_length(ws, i, title_dict)
  115. check_deformity(ws, i, title_dict)
  116. check_education_level(ws, i, title_dict)
  117. check_risk_type(ws, i, title_dict)
  118. check_assistance(ws, i, title_dict)
  119. # 筛查方法
  120. def check_poverty_causes(ws, row_num, title_dict):
  121. """筛查主要致贫原因是否合规"""
  122. main_reason = "主要致贫原因"
  123. if main_reason not in title_dict:
  124. return
  125. poverty_causes = ws[f"{title_dict[main_reason]}{row_num}"].value
  126. # 致贫原因列表
  127. imageTypeList = ["因病", "因学", "因残", "因自然灾害", "因意外事故", "因产业项目失败", "因务工就业不稳", "缺劳动力", "因房", "因水", "其他(填写备注)"]
  128. if poverty_causes not in imageTypeList:
  129. target = ws[f"{title_dict[main_reason]}{row_num}"]
  130. comment_and_fill_yellow_for(target, "21.监测对象致(返)贫风险非最新设计的风险类型")
  131. def check_identitycard_length(ws, row_num, title_dict):
  132. """筛查身份证号码是否合规"""
  133. info_number = "户主证件号码"
  134. if info_number not in title_dict:
  135. return
  136. identitycard = ws[f"{title_dict[info_number]}{row_num}"].value
  137. if len(identitycard) not in [15, 18, 20, 22]:
  138. target = ws[f"{title_dict[info_number]}{row_num}"]
  139. comment_and_fill_yellow_for(target, "31.监测对象家庭成员证件号码位数异常(证件号码非15、18、20、22位)")
  140. def check_deformity(ws, row_num, title_dict):
  141. """筛查是否不符合残疾劳动标准"""
  142. condition = "健康状况"
  143. skill = "劳动技能"
  144. if condition not in title_dict or skill not in title_dict:
  145. return
  146. condition_value = ws[f"{title_dict[condition]}{row_num}"].value
  147. skill_value = ws[f"{title_dict[skill]}{row_num}"].value
  148. if "残疾" in condition_value and skill_value == "弱劳动力或半劳动力":
  149. target = ws[f"{title_dict[skill]}{row_num}"]
  150. comment_and_fill_yellow_for(target, "35.一、二级(重度)肢体残疾脱贫人口有普通劳动能力")
  151. def check_education_level(ws, row_num, title_dict):
  152. """筛查在校生状况填写是否不符合年龄"""
  153. info_number = "证件号码"
  154. if info_number not in title_dict:
  155. return
  156. identitycard = ws[f"{title_dict[info_number]}{row_num}"].value
  157. if len(identitycard) != 18:
  158. return
  159. education_key = "文化程度"
  160. is_student_key = "在校生状况"
  161. if education_key not in title_dict or is_student_key not in title_dict:
  162. return
  163. education = ws[f"{title_dict[education_key]}{row_num}"].value
  164. is_student = ws[f"{title_dict[is_student_key]}{row_num}"].value
  165. if len(education) == 0 and len(is_student) == 0:
  166. target = ws[f"{title_dict[education_key]}{row_num}"]
  167. comment_and_fill_yellow_for(target, "文化程度,在校生状况均未填写")
  168. target_is_student = ws[f"{title_dict[education_key]}{row_num}"]
  169. comment_and_fill_yellow_for(target_is_student, "文化程度,在校生状况均未填写")
  170. return
  171. if len(is_student) == 0:
  172. return
  173. age = calculate_age_from_id_number(identitycard)
  174. age_is_reliable = verify_age_region_for_education(is_student, age)
  175. if not age_is_reliable:
  176. target = ws[f"{title_dict[is_student_key]}{row_num}"]
  177. comment_and_fill_yellow_for(target, "在校生状况与年龄不符")
  178. def check_risk_type(ws, row_num, title_dict):
  179. """筛查风险类型,家庭信息是否合规"""
  180. def get_item_values_for(items):
  181. result = []
  182. for item in items:
  183. if item not in title_dict:
  184. continue
  185. value = ws[f"{title_dict[item]}{row_num}"].value
  186. if value is not None and len(value) > 0:
  187. result.append((item, value))
  188. return result
  189. def get_key_for(i):
  190. key = f"致贫风险{i}"
  191. return key
  192. risks = []
  193. for i in range(1, 6):
  194. key = get_key_for(i)
  195. if key not in title_dict:
  196. return
  197. risk = ws[f"{title_dict[key]}{row_num}"].value
  198. if risk is not None and len(risk) > 0:
  199. risks.append((risk, i))
  200. # 定义:健康帮扶,"综合保障,社会帮扶,义务教育保障, 教育帮扶, 住房安全保障, 搬迁, 饮水安全保障, 产业帮扶, 就业帮扶, 金融帮扶, 公益岗位帮扶等常量
  201. HEALTH_SUPPORT = "健康帮扶"
  202. COMPREHENSIVE_GUARANTEE = "综合保障"
  203. SOCIAL_SUPPORT = "社会帮扶"
  204. OBLIGATORY_EDUCATION_GUARANTEE = "义务教育保障"
  205. EDUCATION_SUPPORT = "教育帮扶"
  206. HOUSING_SECURITY_GUARANTEE = "住房安全保障"
  207. RELOCATION = "搬迁"
  208. DRINKING_WATER_SECURITY_GUARANTEE = "饮水安全保障"
  209. INDUSTRY_SUPPORT = "产业帮扶"
  210. EMPLOYMENT_SUPPORT = "就业帮扶"
  211. FINANCIAL_SUPPORT = "金融帮扶"
  212. PUBLIC_WELFARE_SUPPORT = "公益岗位帮扶"
  213. must_selected_option = []
  214. forbinddens_option = []
  215. for risk, i in risks:
  216. if risk == "因病":
  217. must_selected_option = [HEALTH_SUPPORT, COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT]
  218. forbinddens_option = [
  219. HOUSING_SECURITY_GUARANTEE,
  220. DRINKING_WATER_SECURITY_GUARANTEE,
  221. OBLIGATORY_EDUCATION_GUARANTEE,
  222. EDUCATION_SUPPORT,
  223. ]
  224. elif risk == "因学":
  225. must_selected_option = [OBLIGATORY_EDUCATION_GUARANTEE, EDUCATION_SUPPORT, SOCIAL_SUPPORT]
  226. forbinddens_option = [
  227. HOUSING_SECURITY_GUARANTEE,
  228. DRINKING_WATER_SECURITY_GUARANTEE,
  229. EDUCATION_SUPPORT,
  230. ]
  231. elif risk == "因残":
  232. must_selected_option = [COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT]
  233. forbinddens_option = [
  234. HOUSING_SECURITY_GUARANTEE,
  235. DRINKING_WATER_SECURITY_GUARANTEE,
  236. OBLIGATORY_EDUCATION_GUARANTEE,
  237. EDUCATION_SUPPORT,
  238. ]
  239. elif risk == "因产业项目失败" or risk == "因务工就业不稳" or risk == "缺劳动力":
  240. must_selected_option = []
  241. forbinddens_option = [
  242. HOUSING_SECURITY_GUARANTEE,
  243. DRINKING_WATER_SECURITY_GUARANTEE,
  244. OBLIGATORY_EDUCATION_GUARANTEE,
  245. EDUCATION_SUPPORT,
  246. HEALTH_SUPPORT,
  247. ]
  248. elif risk == "住房出现安全问题":
  249. must_selected_option = [HOUSING_SECURITY_GUARANTEE, RELOCATION]
  250. forbinddens_option = [
  251. DRINKING_WATER_SECURITY_GUARANTEE,
  252. HEALTH_SUPPORT,
  253. OBLIGATORY_EDUCATION_GUARANTEE,
  254. EDUCATION_SUPPORT,
  255. COMPREHENSIVE_GUARANTEE,
  256. ]
  257. elif risk == "家庭成员中有义务教育阶段适龄儿童少年失学辍学":
  258. must_selected_option = [OBLIGATORY_EDUCATION_GUARANTEE]
  259. forbinddens_option = [
  260. INDUSTRY_SUPPORT,
  261. EMPLOYMENT_SUPPORT,
  262. FINANCIAL_SUPPORT,
  263. PUBLIC_WELFARE_SUPPORT,
  264. HOUSING_SECURITY_GUARANTEE,
  265. DRINKING_WATER_SECURITY_GUARANTEE,
  266. HEALTH_SUPPORT,
  267. EDUCATION_SUPPORT,
  268. COMPREHENSIVE_GUARANTEE,
  269. SOCIAL_SUPPORT,
  270. ]
  271. elif risk == "饮水安全问题":
  272. must_selected_option = [DRINKING_WATER_SECURITY_GUARANTEE]
  273. forbinddens_option = [
  274. INDUSTRY_SUPPORT,
  275. EMPLOYMENT_SUPPORT,
  276. FINANCIAL_SUPPORT,
  277. PUBLIC_WELFARE_SUPPORT,
  278. HOUSING_SECURITY_GUARANTEE,
  279. OBLIGATORY_EDUCATION_GUARANTEE,
  280. HEALTH_SUPPORT,
  281. EDUCATION_SUPPORT,
  282. COMPREHENSIVE_GUARANTEE,
  283. SOCIAL_SUPPORT,
  284. ]
  285. elif risk == "家庭成员中有未参加城乡居民(职工)基本医疗保险":
  286. must_selected_option = [HEALTH_SUPPORT, SOCIAL_SUPPORT]
  287. forbinddens_option = [
  288. INDUSTRY_SUPPORT,
  289. EMPLOYMENT_SUPPORT,
  290. FINANCIAL_SUPPORT,
  291. PUBLIC_WELFARE_SUPPORT,
  292. HOUSING_SECURITY_GUARANTEE,
  293. OBLIGATORY_EDUCATION_GUARANTEE,
  294. EDUCATION_SUPPORT,
  295. COMPREHENSIVE_GUARANTEE,
  296. DRINKING_WATER_SECURITY_GUARANTEE,
  297. ]
  298. must_selected = get_item_values_for(must_selected_option)
  299. forbiddens = get_item_values_for(forbinddens_option)
  300. # 如果option为空,代表无要求
  301. must_selected_ok = len(must_selected) > 0 if len(must_selected_option) > 0 else True
  302. forbiddens_ok = len(forbiddens) == 0 if len(forbinddens_option) > 0 else True
  303. # print(f"risk 为{risk}的必选项当前值:{must_selected},禁选项当前值:{forbiddens}")
  304. if not must_selected_ok or not forbiddens_ok:
  305. target = ws[f"{title_dict[get_key_for(i)]}{row_num}"]
  306. if not must_selected_ok:
  307. comment_and_fill_yellow_for(target, f"{get_key_for(i)} 未选择必选项,必选项为{must_selected_option}")
  308. else:
  309. forbiddens_keys = [x for x, y in forbiddens]
  310. forbiddens_values = [y for x, y in forbiddens]
  311. comment_and_fill_yellow_for(
  312. target,
  313. f"{get_key_for(i)} 填写了禁选项{forbiddens_keys},填写的内容为{forbiddens_values}",
  314. )
  315. def check_assistance(ws, row_num, title_dict):
  316. """检查实施开发式帮扶填写状态是否有问题,如果填写了开发式帮扶,以下四项:产业帮扶、就业帮扶、金融帮扶、公益岗位帮扶有一项填写的是其他或者技能培训就不合规"""
  317. assistance = "实施开发式帮扶措施情况"
  318. assistance_value = ws[f"{title_dict[assistance]}{row_num}"].value
  319. if assistance_value and len(assistance_value) > 0:
  320. for type in ["产业帮扶", "就业帮扶", "金融帮扶", "公益岗位帮扶"]:
  321. target = ws[f"{title_dict[type]}{row_num}"].value
  322. for key in ["其他", "技能培训"]:
  323. if key in target:
  324. comment_and_fill_yellow_for(target, f"实施开发式帮扶填写状态下,{type} 不允许选择 {key}")
  325. return
  326. # 填写了的话,剩下四项有一个是其他或者技能培训就不行
  327. # info_number = "户主证件号码"
  328. # identitycard = ws[f"{title_dict[info_number]}{row_num}"].value
  329. # if len(identitycard) not in [15, 18, 20, 22]:
  330. # target = ws[f"{title_dict[info_number]}{row_num}"]
  331. # comment_and_fill_yellow_for(target, "31.监测对象家庭成员证件号码位数异常(证件号码非15、18、20、22位)")
  332. # def get_item_values_for(ws, row_num, title_dict, items):
  333. # result = []
  334. # for item in items:
  335. # if item not in title_dict:
  336. # continue
  337. # value = ws[f"{title_dict[item]}{row_num}"].value
  338. # if value is not None:
  339. # result.append(value)
  340. # return result
  341. def comment_and_fill_yellow_for(target, comment):
  342. target.comment = Comment(text=comment, author="system")
  343. yellow_fill = PatternFill(patternType="solid", fgColor="FFFF00")
  344. target.fill = yellow_fill
  345. if __name__ == "__main__":
  346. # result = calculate_age_from_id_number("532801200607144126")
  347. # print(result)
  348. uvicorn.run("data_verification:app", host="0.0.0.0", port=8500, reload=True)