data_verification.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. import uvicorn
  2. import warnings
  3. from fastapi import FastAPI, UploadFile, File
  4. from openpyxl import load_workbook
  5. from openpyxl.utils.cell import coordinate_from_string
  6. from openpyxl.comments import Comment
  7. from openpyxl.styles import PatternFill
  8. from fastapi.middleware.cors import CORSMiddleware
  9. from datetime import datetime
  10. from fastapi.responses import FileResponse
  11. warnings.filterwarnings("ignore")
  12. app = FastAPI()
  13. app.add_middleware(
  14. CORSMiddleware,
  15. allow_origins=["*"],
  16. allow_credentials=True,
  17. allow_methods=["*"],
  18. allow_headers=["*"],
  19. )
  20. @app.post("/uploadfile")
  21. async def create_upload_file(file: UploadFile = File(...)):
  22. # print(file.filename)
  23. contents = await file.read()
  24. savename = "download_cache/" + file.filename
  25. # savename = "uploadfile/" + file.filename
  26. with open(savename, "wb") as f:
  27. f.write(contents)
  28. # 读取excel表
  29. workbook = load_workbook(savename)
  30. # 获取指定的sheet
  31. sheet_names = workbook.sheetnames
  32. # if "脱贫户信息查询" not in sheet_names:
  33. # print("读取不到指定的sheet页")
  34. # return {"code": 500, "msg": "读取不到指定的sheet页--脱贫户信息查询"}
  35. sheet = workbook[sheet_names[0]]
  36. title_row_num = 0
  37. # 读取前5行,正常应该有字段名了
  38. row_range = sheet[1:5]
  39. for i, r in enumerate(row_range):
  40. for j, c in enumerate(r):
  41. print(f"第{i + 1 }行,第{j}列,值:{c.value}")
  42. if "户主编号" == c.value or "户编号" == c.value:
  43. title_row_num = c.row
  44. # 获取字段名对应的列
  45. title_dict = {}
  46. title_rows = sheet[title_row_num]
  47. # 遍历字段名所在行的所有单元格
  48. for title_cell in title_rows:
  49. x, y = coordinate_from_string(title_cell.coordinate)
  50. title_dict[title_cell.value] = x
  51. # print(title_dict)
  52. # 开始读取表格内容
  53. # print(sheet.max_row)
  54. read_data(sheet, title_row_num + 1, sheet.max_row, title_dict)
  55. # 保存文档
  56. workbook.save(savename)
  57. return FileResponse(savename)
  58. # return {"code": 200, "msg": "分析完成,请点击下载查看分析结果", "fileName": file.filename}
  59. def calculate_age_from_id_number(id_number):
  60. """根据传入的身份证号码,提取出出生年月日,结合代码运行时的当天时间,返回当前年龄(周岁)"""
  61. age = 0
  62. if len(id_number) != 18:
  63. return age
  64. birthday = id_number[6:14]
  65. birthday = datetime.strptime(birthday, "%Y%m%d")
  66. today = datetime.now()
  67. age = today.year - birthday.year
  68. if today.month < birthday.month or (today.month == birthday.month and today.day < birthday.day):
  69. age -= 1
  70. return age
  71. def verify_age_region_for_education(education, age):
  72. """根据传入的学历和年龄判断该年龄是否符合该学历"""
  73. # 义务教育年龄段
  74. # if education == "" and (age >= 6 and age <= 15):
  75. # return False
  76. education_for_age = {
  77. "学龄前儿童": (0, 6),
  78. "学前教育": (3, 6),
  79. "小学": (6, 12),
  80. "七年级": (12, 13),
  81. "八年级": (13, 14),
  82. "九年级": (14, 15),
  83. "初中": (12, 15),
  84. "高中": (15, 18),
  85. "中专": (15, 18),
  86. "中职": (15, 18),
  87. "高职": (18, 22),
  88. "高专": (18, 22),
  89. "大专": (18, 20),
  90. "本科": (18, 22),
  91. "硕士": (22, 25),
  92. }
  93. for education_key in education_for_age.keys():
  94. if education_key in education:
  95. age_range = education_for_age[education_key]
  96. if age < age_range[0] or age > age_range[1]:
  97. return False
  98. break
  99. return True
  100. def read_data(ws, start_row, end_row, title_dict):
  101. # 监测对象致(返)贫风险非最新设计的风险类型
  102. for i in range(start_row, end_row):
  103. check_poverty_causes(ws, i, title_dict)
  104. check_identitycard_length(ws, i, title_dict)
  105. check_deformity(ws, i, title_dict)
  106. check_education_level(ws, i, title_dict)
  107. check_risk_type(ws, i, title_dict)
  108. check_assistance(ws, i, title_dict)
  109. # 筛查方法
  110. def check_poverty_causes(ws, row_num, title_dict):
  111. """筛查主要致贫原因是否合规"""
  112. main_reason = "主要致贫原因"
  113. if main_reason not in title_dict:
  114. return
  115. poverty_causes = ws[f"{title_dict[main_reason]}{row_num}"].value
  116. # 致贫原因列表
  117. imageTypeList = ["因病", "因学", "因残", "因自然灾害", "因意外事故", "因产业项目失败", "因务工就业不稳", "缺劳动力", "因房", "因水", "其他(填写备注)"]
  118. if poverty_causes not in imageTypeList:
  119. target = ws[f"{title_dict[main_reason]}{row_num}"]
  120. comment_and_fill_yellow_for(target, "21.监测对象致(返)贫风险非最新设计的风险类型")
  121. def check_identitycard_length(ws, row_num, title_dict):
  122. """筛查身份证号码是否合规"""
  123. info_number = "户主证件号码"
  124. if info_number not in title_dict:
  125. return
  126. identitycard = ws[f"{title_dict[info_number]}{row_num}"].value
  127. if len(identitycard) not in [15, 18, 20, 22]:
  128. target = ws[f"{title_dict[info_number]}{row_num}"]
  129. comment_and_fill_yellow_for(target, "31.监测对象家庭成员证件号码位数异常(证件号码非15、18、20、22位)")
  130. def check_deformity(ws, row_num, title_dict):
  131. """筛查是否不符合残疾劳动标准"""
  132. condition = "健康状况"
  133. skill = "劳动技能"
  134. if condition not in title_dict or skill not in title_dict:
  135. return
  136. condition_value = ws[f"{title_dict[condition]}{row_num}"].value
  137. skill_value = ws[f"{title_dict[skill]}{row_num}"].value
  138. if "残疾" in condition_value and skill_value == "弱劳动力或半劳动力":
  139. target = ws[f"{title_dict[skill]}{row_num}"]
  140. comment_and_fill_yellow_for(target, "35.一、二级(重度)肢体残疾脱贫人口有普通劳动能力")
  141. def check_education_level(ws, row_num, title_dict):
  142. """筛查在校生状况填写是否不符合年龄"""
  143. info_number = "证件号码"
  144. if info_number not in title_dict:
  145. return
  146. identitycard = ws[f"{title_dict[info_number]}{row_num}"].value
  147. if len(identitycard) != 18:
  148. return
  149. education_key = "文化程度"
  150. is_student_key = "在校生状况"
  151. if education_key not in title_dict or is_student_key not in title_dict:
  152. return
  153. education = ws[f"{title_dict[education_key]}{row_num}"].value
  154. is_student = ws[f"{title_dict[is_student_key]}{row_num}"].value
  155. if len(education) == 0 and len(is_student) == 0:
  156. target = ws[f"{title_dict[education_key]}{row_num}"]
  157. comment_and_fill_yellow_for(target, "文化程度,在校生状况均未填写")
  158. target_is_student = ws[f"{title_dict[education_key]}{row_num}"]
  159. comment_and_fill_yellow_for(target_is_student, "文化程度,在校生状况均未填写")
  160. return
  161. if len(is_student) == 0:
  162. return
  163. age = calculate_age_from_id_number(identitycard)
  164. age_is_reliable = verify_age_region_for_education(is_student, age)
  165. if not age_is_reliable:
  166. target = ws[f"{title_dict[is_student_key]}{row_num}"]
  167. comment_and_fill_yellow_for(target, "在校生状况与年龄不符")
  168. def check_risk_type(ws, row_num, title_dict):
  169. """筛查风险类型,家庭信息是否合规"""
  170. def get_item_values_for(items):
  171. result = []
  172. for item in items:
  173. if item not in title_dict:
  174. continue
  175. value = ws[f"{title_dict[item]}{row_num}"].value
  176. if value is not None and len(value) > 0:
  177. result.append((item, value))
  178. return result
  179. def get_key_for(i):
  180. key = f"致贫风险{i}"
  181. return key
  182. risks = []
  183. for i in range(1, 6):
  184. key = get_key_for(i)
  185. if key not in title_dict:
  186. return
  187. risk = ws[f"{title_dict[key]}{row_num}"].value
  188. if risk is not None and len(risk) > 0:
  189. risks.append((risk, i))
  190. print(f"risks is {risks}")
  191. # 定义:健康帮扶,"综合保障,社会帮扶,义务教育保障, 教育帮扶, 住房安全保障, 搬迁, 饮水安全保障, 产业帮扶, 就业帮扶, 金融帮扶, 公益岗位帮扶等常量
  192. HEALTH_SUPPORT = "健康帮扶"
  193. COMPREHENSIVE_GUARANTEE = "综合保障"
  194. SOCIAL_SUPPORT = "社会帮扶"
  195. OBLIGATORY_EDUCATION_GUARANTEE = "义务教育保障"
  196. EDUCATION_SUPPORT = "教育帮扶"
  197. HOUSING_SECURITY_GUARANTEE = "住房安全保障"
  198. RELOCATION = "搬迁"
  199. DRINKING_WATER_SECURITY_GUARANTEE = "饮水安全保障"
  200. INDUSTRY_SUPPORT = "产业帮扶"
  201. EMPLOYMENT_SUPPORT = "就业帮扶"
  202. FINANCIAL_SUPPORT = "金融帮扶"
  203. PUBLIC_WELFARE_SUPPORT = "公益岗位帮扶"
  204. must_selected_option = []
  205. forbinddens_option = []
  206. for risk, i in risks:
  207. if risk == "因病":
  208. must_selected_option = [HEALTH_SUPPORT, COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT]
  209. forbinddens_option = [
  210. HOUSING_SECURITY_GUARANTEE,
  211. DRINKING_WATER_SECURITY_GUARANTEE,
  212. OBLIGATORY_EDUCATION_GUARANTEE,
  213. EDUCATION_SUPPORT,
  214. ]
  215. elif risk == "因学":
  216. must_selected_option = [OBLIGATORY_EDUCATION_GUARANTEE, EDUCATION_SUPPORT, SOCIAL_SUPPORT]
  217. forbinddens_option = [
  218. HOUSING_SECURITY_GUARANTEE,
  219. DRINKING_WATER_SECURITY_GUARANTEE,
  220. EDUCATION_SUPPORT,
  221. ]
  222. elif risk == "因残":
  223. must_selected_option = [COMPREHENSIVE_GUARANTEE, SOCIAL_SUPPORT]
  224. forbinddens_option = [
  225. HOUSING_SECURITY_GUARANTEE,
  226. DRINKING_WATER_SECURITY_GUARANTEE,
  227. OBLIGATORY_EDUCATION_GUARANTEE,
  228. EDUCATION_SUPPORT,
  229. ]
  230. elif risk == "因产业项目失败" or risk == "因务工就业不稳" or risk == "缺劳动力":
  231. must_selected_option = []
  232. forbinddens_option = [
  233. HOUSING_SECURITY_GUARANTEE,
  234. DRINKING_WATER_SECURITY_GUARANTEE,
  235. OBLIGATORY_EDUCATION_GUARANTEE,
  236. EDUCATION_SUPPORT,
  237. HEALTH_SUPPORT,
  238. ]
  239. elif risk == "住房出现安全问题":
  240. must_selected_option = [HOUSING_SECURITY_GUARANTEE, RELOCATION]
  241. forbinddens_option = [
  242. DRINKING_WATER_SECURITY_GUARANTEE,
  243. HEALTH_SUPPORT,
  244. OBLIGATORY_EDUCATION_GUARANTEE,
  245. EDUCATION_SUPPORT,
  246. COMPREHENSIVE_GUARANTEE,
  247. ]
  248. elif risk == "家庭成员中有义务教育阶段适龄儿童少年失学辍学":
  249. must_selected_option = [OBLIGATORY_EDUCATION_GUARANTEE]
  250. forbinddens_option = [
  251. INDUSTRY_SUPPORT,
  252. EMPLOYMENT_SUPPORT,
  253. FINANCIAL_SUPPORT,
  254. PUBLIC_WELFARE_SUPPORT,
  255. HOUSING_SECURITY_GUARANTEE,
  256. DRINKING_WATER_SECURITY_GUARANTEE,
  257. HEALTH_SUPPORT,
  258. EDUCATION_SUPPORT,
  259. COMPREHENSIVE_GUARANTEE,
  260. SOCIAL_SUPPORT,
  261. ]
  262. elif risk == "饮水安全问题":
  263. must_selected_option = [DRINKING_WATER_SECURITY_GUARANTEE]
  264. forbinddens_option = [
  265. INDUSTRY_SUPPORT,
  266. EMPLOYMENT_SUPPORT,
  267. FINANCIAL_SUPPORT,
  268. PUBLIC_WELFARE_SUPPORT,
  269. HOUSING_SECURITY_GUARANTEE,
  270. OBLIGATORY_EDUCATION_GUARANTEE,
  271. HEALTH_SUPPORT,
  272. EDUCATION_SUPPORT,
  273. COMPREHENSIVE_GUARANTEE,
  274. SOCIAL_SUPPORT,
  275. ]
  276. elif risk == "家庭成员中有未参加城乡居民(职工)基本医疗保险":
  277. must_selected_option = [HEALTH_SUPPORT, SOCIAL_SUPPORT]
  278. forbinddens_option = [
  279. INDUSTRY_SUPPORT,
  280. EMPLOYMENT_SUPPORT,
  281. FINANCIAL_SUPPORT,
  282. PUBLIC_WELFARE_SUPPORT,
  283. HOUSING_SECURITY_GUARANTEE,
  284. OBLIGATORY_EDUCATION_GUARANTEE,
  285. EDUCATION_SUPPORT,
  286. COMPREHENSIVE_GUARANTEE,
  287. DRINKING_WATER_SECURITY_GUARANTEE,
  288. ]
  289. must_selected = get_item_values_for(must_selected_option)
  290. forbiddens = get_item_values_for(forbinddens_option)
  291. # 如果option为空,代表无要求
  292. must_selected_ok = len(must_selected) > 0 if len(must_selected_option) > 0 else True
  293. forbiddens_ok = len(forbiddens) == 0 if len(forbinddens_option) > 0 else True
  294. print(f"risk 为{risk}的必选项当前值:{must_selected},禁选项当前值:{forbiddens}")
  295. if not must_selected_ok or not forbiddens_ok:
  296. target = ws[f"{title_dict[get_key_for(i)]}{row_num}"]
  297. if not must_selected_ok:
  298. comment_and_fill_yellow_for(target, f"{get_key_for(i)} 未选择必选项,必选项为{must_selected_option}")
  299. else:
  300. forbiddens_keys = [x for x, y in forbiddens]
  301. forbiddens_values = [y for x, y in forbiddens]
  302. comment_and_fill_yellow_for(
  303. target,
  304. f"{get_key_for(i)} 填写了禁选项{forbiddens_keys},填写的内容为{forbiddens_values}",
  305. )
  306. def check_assistance(ws, row_num, title_dict):
  307. """检查实施开发式帮扶填写状态是否有问题,如果填写了开发式帮扶,以下四项:产业帮扶、就业帮扶、金融帮扶、公益岗位帮扶有一项填写的是其他或者技能培训就不合规"""
  308. assistance = "实施开发式帮扶措施情况"
  309. assistance_value = ws[f"{title_dict[assistance]}{row_num}"].value
  310. if assistance_value and len(assistance_value) > 0:
  311. for type in ["产业帮扶", "就业帮扶", "金融帮扶", "公益岗位帮扶"]:
  312. target = ws[f"{title_dict[type]}{row_num}"].value
  313. for key in ["其他", "技能培训"]:
  314. if key in target:
  315. comment_and_fill_yellow_for(target, f"实施开发式帮扶填写状态下,{type} 不允许选择 {key}")
  316. return
  317. # 填写了的话,剩下四项有一个是其他或者技能培训就不行
  318. # info_number = "户主证件号码"
  319. # identitycard = ws[f"{title_dict[info_number]}{row_num}"].value
  320. # if len(identitycard) not in [15, 18, 20, 22]:
  321. # target = ws[f"{title_dict[info_number]}{row_num}"]
  322. # comment_and_fill_yellow_for(target, "31.监测对象家庭成员证件号码位数异常(证件号码非15、18、20、22位)")
  323. # def get_item_values_for(ws, row_num, title_dict, items):
  324. # result = []
  325. # for item in items:
  326. # if item not in title_dict:
  327. # continue
  328. # value = ws[f"{title_dict[item]}{row_num}"].value
  329. # if value is not None:
  330. # result.append(value)
  331. # return result
  332. def comment_and_fill_yellow_for(target, comment):
  333. target.comment = Comment(text=comment, author="system")
  334. yellow_fill = PatternFill(patternType="solid", fgColor="FFFF00")
  335. target.fill = yellow_fill
  336. if __name__ == "__main__":
  337. # result = calculate_age_from_id_number("532801200607144126")
  338. # print(result)
  339. uvicorn.run("data_verification:app", host="localhost", port=8000, reload=True)