import json
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
import logging
from urllib.parse import urljoin
from os import makedirs
from os.path import exists
# Logging configuration: INFO level, each record prefixed with time and level.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')
INDEX_URL = 'https://spa2.scrape.center/page/{page}'  # URL template of the movie list pages
TIME_OUT = 10  # maximum wait handed to WebDriverWait below
TOTAL_PAGE = 10  # number of list pages to crawl
browser = webdriver.Chrome()  # start the browser (module-level, so this runs on import)
wait = WebDriverWait(browser, TIME_OUT)  # shared wait object bound to that browser
def scrape_page(url, condition, locator):
    """Open `url` in the shared browser and wait for a condition on `locator`.

    Args:
        url: address to load.
        condition: callable that takes `locator` and returns the object
            passed to `wait.until` (the callers pass expected_conditions
            factories).
        locator: value handed to `condition` (the callers pass
            `(By.<strategy>, value)` tuples).

    A TimeoutException is logged with its traceback and not re-raised, so
    the function returns None in either case.
    """
    logging.info('scraping %s', url)
    try:
        browser.get(url)
        ready = condition(locator)
        wait.until(ready)
    except TimeoutException:
        # logging.exception logs at ERROR level and attaches the traceback.
        logging.exception('error occurred while scraping %s', url)
def scrapge_index(page):
    """Load list page number `page` and wait until its items are visible.

    Args:
        page: value substituted for `{page}` in INDEX_URL.
    """
    # CSS locator for the items inside the #index container.
    item_locator = (By.CSS_SELECTOR, '#index .item')
    scrape_page(
        INDEX_URL.format(page=page),
        condition=EC.visibility_of_all_elements_located,
        locator=item_locator,
    )
def scrape_detail(url):
    """Load the detail page at `url` and wait until an <h2> element is present.

    Args:
        url: address of the detail page to load.
    """
    title_locator = (By.TAG_NAME, 'h2')
    scrape_page(
        url,
        condition=EC.presence_of_element_located,
        locator=title_locator,
    )
def parse_detail():
    """Extract the movie fields from the page currently loaded in the browser.

    Uses `find_element(By.…)` / `find_elements(By.…)` instead of the
    `find_element_by_*` helpers, which were removed in Selenium 4.3.

    Returns:
        dict with the keys 'url', 'name', 'categories', 'cover', 'score'
        and 'drama'. 'categories' is a list of strings; 'cover' is the
        `src` attribute of the `.cover` element; the remaining values are
        the text of the matched elements (and the current URL).

    Raises:
        NoSuchElementException: raised by Selenium's `find_element` when one
            of the single-element lookups matches nothing.
    """
    url = browser.current_url
    name = browser.find_element(By.TAG_NAME, 'h2').text
    category_spans = browser.find_elements(By.CSS_SELECTOR, '.categories button span')
    categories = [span.text for span in category_spans]
    cover = browser.find_element(By.CSS_SELECTOR, '.cover').get_attribute('src')
    score = browser.find_element(By.CLASS_NAME, 'score').text
    drama = browser.find_element(By.CSS_SELECTOR, '.drama p').text
    return {
        'url': url,
        'name': name,
        'categories': categories,
        'cover': cover,
        'score': score,
        'drama': drama,
    }
def parse_index():
    """Yield the detail-page URL of each entry on the currently loaded list page.

    Uses `find_elements(By.CSS_SELECTOR, …)` instead of
    `find_elements_by_css_selector`, which was removed in Selenium 4.3.

    This is a generator: the `href` attributes are read from the live page
    as it is iterated, so consume it before navigating the browser elsewhere.

    Yields:
        str: each element's `href` resolved against INDEX_URL with urljoin.
    """
    for element in browser.find_elements(By.CSS_SELECTOR, '#index .item .name'):
        href = element.get_attribute('href')
        yield urljoin(INDEX_URL, href)
RESULTS_DIR = 'results'  # output directory for the scraped JSON files
# exist_ok=True creates the directory only when it is missing, without a
# separate exists() check that could race with another process.
makedirs(RESULTS_DIR, exist_ok=True)
def save_data(data):
    """Write one record to `<RESULTS_DIR>/<name>.json` as indented UTF-8 JSON.

    Args:
        data: mapping to serialise; its 'name' value is used as the file
            name (a missing key yields the file name 'None.json').

    Non-ASCII characters are written as-is (`ensure_ascii=False`).
    """
    name = data.get('name')
    data_path = f'{RESULTS_DIR}/{name}.json'
    # The context manager closes the file even if json.dump raises; the
    # previous bare open() never closed the handle.
    with open(data_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=2)
def main():
    """Crawl every list page, then scrape, save and log each linked detail page.

    Pages 1..TOTAL_PAGE are visited in order. A detail page whose expected
    elements cannot be found is logged and skipped instead of aborting the
    crawl. The browser is shut down when the crawl finishes or fails.
    """
    # Local import keeps this function self-contained; the module's import
    # block only brings in TimeoutException.
    from selenium.common.exceptions import NoSuchElementException

    try:
        for page in range(1, TOTAL_PAGE + 1):
            scrapge_index(page)
            # parse_index() is a lazy generator reading from the live list
            # page, so collect all URLs before the browser navigates away.
            detail_urls = list(parse_index())
            for detail_url in detail_urls:
                logging.info('get detail url %s', detail_url)
                scrape_detail(detail_url)
                try:
                    detail_data = parse_detail()
                except NoSuchElementException:
                    # scrape_page() only logs a timeout, so the page may be
                    # incomplete here; skip it rather than end the crawl.
                    logging.error('error occurred while parsing %s',
                                  detail_url, exc_info=True)
                    continue
                save_data(detail_data)
                logging.info('detail data %s', detail_data)
    finally:
        # quit() ends the whole WebDriver session; close() would only close
        # the current window and leave the driver process running.
        browser.quit()
# Run the crawl only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()