首先放一下原先的异步函数代码:
@staticmethod
async def check_subdomain(domain: str):
"""异步子域名枚举"""
resolver = aiodns.DNSResolver()
found = []
try:
with open("app/1knames.txt") as f:
subdomains = [line.strip() for line in f]
except FileNotFoundError:
print("Subdomain dictionary not found")
return False
semaphore = asyncio.Semaphore(500) # 限制并发量
async def _query(sub: str):
async with semaphore:
full_domain = f"{sub}.{domain}"
print(full_domain)
try:
a_records = await resolver.query(full_domain, 'A')
if a_records:
found.append(full_domain)
return
except aiodns.error.DNSError:
pass
try:
cname_records = await resolver.query(full_domain, 'CNAME')
if cname_records:
found.append(full_domain)
except aiodns.error.DNSError:
pass
batch_size = 2000
for i in range(0, len(subdomains), batch_size):
batch = subdomains[i:i+batch_size]
tasks = [_query(sub) for sub in batch]
await asyncio.gather(*tasks)
assets_table = AssetsTable(host=domain,subdomain=json.dumps(found))
assets_info = AssetsInfo(assets_table)
assets_info.update_host_info()
return True
乍一看没啥问题,但是如果在windows下跑起来就会发现啥输出都没有,如果一步步调试就会神奇的发现卡在第一行初始化
aiodns是异步dns模块,运行aiodns相关的函数不仅需要写成异步的形式,在windows下还需要更改asyncio的异步运行模式
在函数前面加一行就可以解决:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
文章评论