Windows平台Python aiodns模块异步代码无法执行
首先放一下原先的异步函数代码:
@staticmethod async def check_subdomain(domain: str): """异步子域名枚举""" resolver = aiodns.DNSResolver() found = [] try: with open("app/1knames.txt") as f: subdomains = [line.strip() for line in f] except FileNotFoundError: print("Subdomain dictionary not found") return False semaphore = asyncio.Semaphore(500) # 限制并发量 async def _query(sub: str): async with semaphore: full_domain = f"{sub}.{domain}" print(full_domain) try: a_records = await resolver.query(full_domain, 'A') if a_records: found.append(full_domain) return except aiodns.error.DNSError: pass try: cname_records = await resolver.query(full_domain, 'CNAME') if cname_records: found.append(full_domain) except aiodns.error.DNSError: pass batch_size = 2000 for i in range(0, len(subdomains), batch_size): batch = subdomains[i:i+batch_size] tasks = [_query(sub) for sub in batch] await asyncio.gather(*tasks) assets_table = AssetsTable(host=domain,subdomain=json.dumps(found)) assets_info = AssetsInfo(assets_table) assets_info.update_host_info() return True
乍一看没啥问题,但是如果在windows下跑起来就会发现啥输出都没有,如果一步步调试就会神奇的发现卡在第一行初始化
aiodns是异步dns模块,运行aiodns相关的函数不仅需要写成异步的形式,在windows下还需要更改asyncio的异步运行模式
在函数前面加一行就可以解决:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())