import email as email_reader
import requests
import imaplib
from email.header import decode_header
from bs4 import BeautifulSoup # 添加HTML解析库
def get_access_token_from_refresh_token(refresh_token, client_id):
    """Exchange an OAuth2 refresh token for a fresh access token.

    Sends a ``refresh_token`` grant to the Microsoft identity platform v2.0
    token endpoint and converts the outcome into a result dictionary; network,
    HTTP and parsing failures are reported through that dictionary rather than
    raised.

    Args:
        refresh_token: Refresh token previously issued for ``client_id``.
        client_id: Application (client) id the refresh token belongs to.

    Returns:
        dict: ``{"code": 0, "access_token": ..., "refresh_token": ...}`` on
        success, where ``refresh_token`` falls back to the one passed in when
        the response does not carry a new one; otherwise
        ``{"code": 1, "message": ...}`` describing the failure.
    """
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
    }
    data = {
        "client_id": client_id,
        "refresh_token": refresh_token,
        "grant_type": "refresh_token",
    }

    try:
        rr = requests.post(
            "https://login.microsoftonline.com/common/oauth2/v2.0/token",
            headers=headers,
            data=data,
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        return {"code": 1, "message": f"网络请求错误: {str(e)}"}

    # Parse the body BEFORE checking the HTTP status: OAuth2 token endpoints
    # report grant errors as a non-2xx response whose JSON body carries
    # "error"/"error_description". Calling raise_for_status() first would hide
    # those details behind a generic network error.
    try:
        response_data = rr.json()
    except ValueError:
        # Body is not JSON (e.g. a proxy error page); rely on the status below.
        response_data = None

    if isinstance(response_data, dict) and "error" in response_data:
        description = response_data.get("error_description")
        if isinstance(description, str) and \
                "User account is found to be in service abuse mode" in description:
            return {"code": 1, "message": "账户被封锁或凭证错误"}
        return {
            "code": 1,
            "message": f"获取访问令牌失败: {response_data.get('error_description', '未知错误')}",
        }

    # No structured OAuth error: any remaining non-2xx status is a transport
    # level failure.
    try:
        rr.raise_for_status()
    except requests.exceptions.RequestException as e:
        return {"code": 1, "message": f"网络请求错误: {str(e)}"}

    # A 2xx answer without a usable token is still a failure for the caller.
    if not isinstance(response_data, dict) or "access_token" not in response_data:
        return {"code": 1, "message": "获取访问令牌失败: 未知错误"}

    return {
        "code": 0,
        "access_token": response_data["access_token"],
        "refresh_token": response_data.get("refresh_token", refresh_token),
    }
def imap_authenticate_with_oauth2(username, access_token):
    """Open an IMAP-over-SSL connection and authenticate it with XOAUTH2.

    Args:
        username: Mailbox address to log in as.
        access_token: OAuth2 bearer token for that mailbox.

    Returns:
        imaplib.IMAP4_SSL: The authenticated connection. The caller is
        responsible for logging out.

    Raises:
        Exception: If connecting or authenticating fails. The message is
            prefixed with ``IMAP认证失败`` and the original error is chained
            as ``__cause__``.
    """
    # XOAUTH2 initial response: fields are separated by the 0x01 control
    # character and the string ends with two of them.
    auth_string = f"user={username}\x01auth=Bearer {access_token}\x01\x01"
    mail = None
    try:
        mail = imaplib.IMAP4_SSL("outlook.office365.com")
        mail.authenticate("XOAUTH2", lambda _challenge: auth_string)
        return mail
    except Exception as e:
        # Do not leak the socket when the connection was opened but the
        # authentication step failed.
        if mail is not None:
            try:
                mail.logout()
            except (imaplib.IMAP4.error, OSError):
                pass  # connection already unusable; nothing more to release
        raise Exception(f"IMAP认证失败: {str(e)}") from e
def decode_str(s):
    """Decode an RFC 2047 encoded header value into a plain string.

    All fragments returned by ``decode_header`` are decoded and concatenated,
    so headers that mix plain text with encoded words, or that use several
    encoded words, are returned in full.

    Args:
        s: Raw header value, or ``None`` when the header is absent.

    Returns:
        str: The decoded header text; an empty string when ``s`` is ``None``.
    """
    if s is None:
        return ""

    fragments = []
    for value, charset in decode_header(s):
        if isinstance(value, bytes):
            # Plain (unencoded) fragments come back as bytes with no charset
            # when the header also contains encoded words.
            try:
                value = value.decode(charset or "utf-8", errors="replace")
            except LookupError:
                # Unknown or bogus charset label: best-effort UTF-8.
                value = value.decode("utf-8", errors="replace")
        fragments.append(value)
    return "".join(fragments)
def html_to_text(html_content):
    """Convert HTML content to plain text while keeping line breaks.

    ``<pre>`` blocks are replaced by their text surrounded by newlines and
    ``<br>`` tags by a newline. Whitespace is then collapsed within each line
    only, so line breaks survive; lines that end up empty are dropped.

    Args:
        html_content: HTML markup, or a falsy value.

    Returns:
        str: The extracted text, one entry per non-empty line joined with
        ``"\\n"``; an empty string when ``html_content`` is falsy.
    """
    if not html_content:
        return ""

    soup = BeautifulSoup(html_content, 'html.parser')

    # Turn <pre> blocks into their text, separated from neighbours by newlines.
    for pre in soup.find_all('pre'):
        pre.replace_with(f"\n{pre.get_text()}\n")

    # Turn <br> tags into newlines.
    for br in soup.find_all('br'):
        br.replace_with("\n")

    text = soup.get_text()

    # Collapse whitespace per line. Collapsing over the whole text would also
    # swallow every newline and flatten the result into a single line.
    lines = (' '.join(line.split()) for line in text.splitlines())
    return '\n'.join(line for line in lines if line)
def _decode_payload(payload, charset):
    """Decode payload bytes to text, using UTF-8 when the charset is missing or unknown."""
    try:
        return payload.decode(charset or 'utf-8', errors='replace')
    except LookupError:
        return payload.decode('utf-8', errors='replace')


def _extract_message_body(msg):
    """Return ``(content, content_type)`` for the preferred text body of ``msg``.

    For multipart messages the first non-attachment ``text/plain`` part wins;
    otherwise the first non-attachment ``text/html`` part is used. Both values
    are empty strings when no usable body is found.
    """
    if not msg.is_multipart():
        body = msg.get_payload(decode=True)
        if not body:
            return "", ""
        return _decode_payload(body, msg.get_content_charset()), msg.get_content_type()

    content = ""
    content_type = ""
    for part in msg.walk():
        part_content_type = part.get_content_type()
        if part_content_type not in ("text/plain", "text/html"):
            continue
        if "attachment" in str(part.get("Content-Disposition")):
            continue
        try:
            part_body = part.get_payload(decode=True)
            if not part_body:
                continue
            decoded_body = _decode_payload(part_body, part.get_content_charset())
        except Exception:
            # Best effort: a malformed part must not prevent reading the others.
            continue
        if part_content_type == "text/plain":
            # Plain text is preferred; stop at the first one.
            return decoded_body, "text/plain"
        if not content:
            # Remember the first HTML part as a fallback.
            content = decoded_body
            content_type = "text/html"
    return content, content_type


def get_latest_email(email, access_token):
    """Fetch the subject and text body of the newest message in the inbox.

    Args:
        email: Mailbox address used for the IMAP login.
        access_token: OAuth2 access token for that mailbox.

    Returns:
        dict: ``{"code": 0, "subject": ..., "content": ...}`` on success, with
        HTML bodies converted to plain text; otherwise
        ``{"code": 1, "message": ...}``. Authentication, protocol and parsing
        failures are all reported through the result dictionary.
    """
    try:
        mail = imap_authenticate_with_oauth2(email, access_token)
    except Exception as e:
        # Report authentication problems the same way as every other failure.
        return {"code": 1, "message": str(e)}

    selected = False
    try:
        status, _ = mail.select("inbox")
        if status != "OK":
            return {"code": 1, "message": "读取邮件失败: 无法打开收件箱"}
        selected = True

        status, messages = mail.search(None, 'ALL')
        if status != "OK":
            return {"code": 1, "message": "读取邮件失败: 搜索邮件失败"}
        mail_ids = messages[0].split() if messages and messages[0] else []
        if not mail_ids:
            return {"code": 1, "message": "收件箱为空"}

        # The last id returned by the search is treated as the newest message.
        latest_mail_id = mail_ids[-1]
        status, msg_data = mail.fetch(latest_mail_id, '(RFC822)')
        if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
            return {"code": 1, "message": "读取邮件失败: 无法获取邮件内容"}

        msg = email_reader.message_from_bytes(msg_data[0][1])

        # A message without a Subject header is valid; use an empty subject.
        raw_subject = msg["Subject"]
        subject = decode_str(raw_subject) if raw_subject is not None else ""

        content, content_type = _extract_message_body(msg)
        # Convert HTML bodies to plain text.
        if content_type == "text/html":
            content = html_to_text(content)

        return {
            "code": 0,
            "subject": subject,
            "content": content
        }
    except Exception as e:
        return {"code": 1, "message": f"读取邮件失败: {str(e)}"}
    finally:
        # CLOSE is only legal while a mailbox is selected; issuing it in any
        # other state raises and would both mask the result and skip logout.
        if selected:
            try:
                mail.close()
            except (imaplib.IMAP4.error, OSError):
                pass
        try:
            mail.logout()
        except (imaplib.IMAP4.error, OSError):
            pass
def example():
    """Demonstrate the flow: refresh the token, then print the newest email.

    Uses placeholder credentials. Prints a failure message and stops as soon
    as either step reports a non-zero ``code``.
    """
    mailbox = ''
    app_id = 'your_client_id'
    stored_refresh_token = "your_refresh_token"

    auth = get_access_token_from_refresh_token(stored_refresh_token, app_id)
    if auth["code"] != 0:
        print(f"获取令牌失败: {auth['message']}")
        return

    fetched = get_latest_email(mailbox, auth["access_token"])
    if fetched["code"] != 0:
        print(f"获取邮件失败: {fetched['message']}")
        return

    print(f"主题: {fetched['subject']}")
    # Only the first 2000 characters of the body are shown.
    preview = fetched['content'][:2000]
    print(f"内容: {preview}...")
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    example()