import os
import urllib.parse
import requests
from bs4 import BeautifulSoup, NavigableString
from markdownify import markdownify as md
from urllib.parse import unquote, urlparse, parse_qs
from tqdm import tqdm
import json
from util import insert_new_line, get_article_date, process_markdown_content, download_image, download_video, get_valid_filename
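# This module implements two Zhihu-to-Markdown exporters:
#   - ZhihuParserLocal: downloads images and videos into local folders and
#     rewrites image links to point at the local copies.
#   - ZhihuParser: keeps remote image URLs, converting <figure> blocks into
#     inline Markdown images, and post-processes the result with
#     process_markdown_content() before saving.
# Both take the cookie string of a logged-in Zhihu session and dispatch on
# URL type (column / answer / zvideo / article) via judge_type().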
class ZhihuParserLocal:
    def __init__(self, cookies, hexo_uploader=False):
        self.hexo_uploader = hexo_uploader  # whether output is for a Hexo blog upload
        self.cookies = cookies  # cookies from a logged-in Zhihu session
        self.session = requests.Session()  # create a session
        self.user_agents = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"  # user agent
        self.headers = {  # request headers
            'User-Agent': self.user_agents,
            'Accept-Language': 'en,zh-CN;q=0.9,zh;q=0.8',
            'Cookie': self.cookies
        }
        self.session.headers.update(self.headers)  # apply the headers to the session
        self.soup = None  # BeautifulSoup object of the current page
    def check_connect_error(self, target_link):
        """
        Fetch the page and check for connection errors.
        """
        try:
            response = self.session.get(target_link)
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(f"HTTP error occurred: {err}")
            raise  # re-raise: without a response there is nothing to parse
        except requests.exceptions.RequestException as err:
            print(f"Error occurred: {err}")
            raise
        self.soup = BeautifulSoup(response.content, "html.parser")
        if self.soup.text.find("有问题,就会有答案打开知乎App在「我的页」右上角打开扫一扫其他扫码方式") != -1:
            print("Cookies are required to access the article.")
        if self.soup.text.find("你似乎来到了没有知识存在的荒原") != -1:
            print("The page does not exist.")
    def judge_type(self, target_link):
        """
        Dispatch on the URL type.
        """
        if target_link.find("column") != -1:
            # column
            title = self.parse_zhihu_column(target_link)
        elif target_link.find("answer") != -1:
            # answer
            title = self.parse_zhihu_answer(target_link)
        elif target_link.find("zvideo") != -1:
            # video
            title = self.parse_zhihu_zvideo(target_link)
        else:
            # standalone article
            title = self.parse_zhihu_article(target_link)
        return title
    def save_and_transform(self, title_element, content_element, author, target_link, date=None):
        """
        Convert the page content to Markdown and save it to a file.
        """
        # Extract the title
        if title_element is not None:
            title = title_element.text.strip()
        else:
            title = "Untitled"
        # To keep file names short enough for image paths to resolve, you could
        # truncate: markdown_title = get_valid_filename(title[-20:-1])
        # Using the full name here for easier file management:
        markdown_title = get_valid_filename(title)
        if date:
            markdown_title = f"({date}){markdown_title}_{author}"
        else:
            markdown_title = f"{markdown_title}_{author}"
        if content_element is not None:
            # Remove CSS style tags
            for style_tag in content_element.find_all("style"):
                style_tag.decompose()
            # Remove lazy-loading placeholder images (and images with no class attribute)
            for img_lazy in content_element.find_all("img", class_=lambda x: 'lazy' in x if x else True):
                img_lazy.decompose()
            # Convert headers to Markdown
            for header in content_element.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
                header_level = int(header.name[1])  # header level from the tag name (e.g. 'h2' -> 2)
                header_text = header.get_text(strip=True)  # header text
                markdown_header = f"{'#' * header_level} {header_text}"
                insert_new_line(self.soup, header, 1)
                header.replace_with(markdown_header)
            # Download images and point their src at the local copies
            for img in content_element.find_all("img"):
                if 'src' in img.attrs:
                    img_url = img.attrs['src']
                else:
                    continue
                img_name = urllib.parse.quote(os.path.basename(img_url))
                img_path = f"{markdown_title}/{img_name}"
                extensions = ['.jpg', '.png', '.gif']  # add more image formats here if needed
                # If the URL continues past the image extension, cut it off there
                for ext in extensions:
                    index = img_path.find(ext)
                    if index != -1:
                        img_path = img_path[:index + len(ext)]
                        break  # stop at the first matching extension
                img["src"] = img_path
                # Download the image into the local folder
                os.makedirs(os.path.dirname(img_path), exist_ok=True)
                download_image(img_url, img_path, self.session)
                # Insert a line break after the image
                insert_new_line(self.soup, img, 1)
            # Insert line breaks after figure captions
            for figcaption in content_element.find_all("figcaption"):
                insert_new_line(self.soup, figcaption, 2)
            # Convert links to Markdown
            for link in content_element.find_all("a"):
                if 'href' in link.attrs:
                    original_url = link.attrs['href']
                    # Zhihu wraps external links in a redirect; recover the real target
                    parsed_url = urlparse(original_url)
                    query_params = parse_qs(parsed_url.query)
                    target_url = query_params.get('target', [original_url])[0]  # fall back to the original URL
                    article_url = unquote(target_url)  # decode the URL
                    # Without a data-text attribute, use the URL itself as the link text
                    if 'data-text' not in link.attrs:
                        article_title = article_url
                    else:
                        article_title = link.attrs['data-text']
                    markdown_link = f"[{article_title}]({article_url})"
                    link.replace_with(markdown_link)
            # Extract math formulas, marking their positions with placeholders
            math_formulas = []
            math_tags = []
            for math_span in content_element.select("span.ztext-math"):
                latex_formula = math_span['data-tex']
                if latex_formula.find("\\tag") != -1:
                    # formulas with \tag become display math
                    math_tags.append(latex_formula)
                    insert_new_line(self.soup, math_span, 1)
                    math_span.replace_with("@@MATH_FORMULA@@")
                else:
                    math_formulas.append(latex_formula)
                    math_span.replace_with("@@MATH@@")
            # Get the text content
            content = content_element.decode_contents().strip()
            # Convert to Markdown
            content = md(content)
            # Substitute the LaTeX formulas back in for the placeholders
            for formula in math_formulas:
                if self.hexo_uploader:
                    content = content.replace(
                        "@@MATH@@", "$" + "{% raw %}" + formula + "{% endraw %}" + "$", 1)
                else:
                    # If the formula already contains $, do not add more $ signs
                    if formula.find('$') != -1:
                        content = content.replace("@@MATH@@", f"{formula}", 1)
                    else:
                        content = content.replace(
                            "@@MATH@@", f"${formula}$", 1)
            # markdownify escapes underscores, so match the escaped placeholder here
            for formula in math_tags:
                if self.hexo_uploader:
                    content = content.replace(
                        "@@MATH\\_FORMULA@@",
                        "$$" + "{% raw %}" + formula + "{% endraw %}" + "$$",
                        1,
                    )
                else:
                    # If the formula already contains $, do not add more $ signs
                    if formula.find("$") != -1:
                        content = content.replace(
                            "@@MATH\\_FORMULA@@", f"{formula}", 1)
                    else:
                        content = content.replace(
                            "@@MATH\\_FORMULA@@", f"$${formula}$$", 1)
        else:
            content = ""
        # Assemble the Markdown document
        if content:
            markdown = f"# {title}\n\n **Author:** [{author}]\n\n **Link:** [{target_link}]\n\n{content}"
        else:
            markdown = f"# {title}\n\n Content is empty."
        # Save the Markdown file
        with open(f"{markdown_title}.md", "w", encoding="utf-8") as f:
            f.write(markdown)
        return markdown_title
    def parse_zhihu_zvideo(self, target_link):
        """
        Parse a Zhihu video page and download the video.
        """
        self.check_connect_error(target_link)
        data = json.loads(self.soup.select_one(
            "div.ZVideo-video")['data-zop'])  # video metadata
        date = get_article_date(self.soup, "div.ZVideo-meta")
        markdown_title = f"({date}){data['authorName']}_{data['title']}/{data['authorName']}_{data['title']}.mp4"
        video_url = None
        script = self.soup.find('script', id='js-initialData')
        if script:
            data = json.loads(script.text)
            try:
                videos = data['initialState']['entities']['zvideos']
                for video_id, video_info in videos.items():
                    if 'playlist' in video_info['video']:
                        # keeps the URL of the last playlist entry
                        for quality, details in video_info['video']['playlist'].items():
                            video_url = details['playUrl']
            except KeyError as e:
                print("Key error in parsing JSON data:", e)
                return None
        else:
            print("No suitable script tag found for video data")
            return None
        os.makedirs(os.path.dirname(markdown_title), exist_ok=True)
        download_video(video_url, markdown_title, self.session)
        return markdown_title
    def parse_zhihu_article(self, target_link):
        """
        Parse a Zhihu article and save it as a Markdown file.
        """
        self.check_connect_error(target_link)
        title_element = self.soup.select_one("h1.Post-Title")
        content_element = self.soup.select_one("div.Post-RichTextContainer")
        date = get_article_date(self.soup, "div.ContentItem-time")
        author = self.soup.select_one('div.AuthorInfo').find(
            'meta', {'itemprop': 'name'}).get('content')
        markdown_title = self.save_and_transform(
            title_element, content_element, author, target_link, date)
        return markdown_title
    def parse_zhihu_answer(self, target_link):
        """
        Parse a Zhihu answer and save it as a Markdown file.
        """
        self.check_connect_error(target_link)
        # Locate the answer title, content, and author elements
        title_element = self.soup.select_one("h1.QuestionHeader-title")
        content_element = self.soup.select_one("div.RichContent-inner")
        date = get_article_date(self.soup, "div.ContentItem-time")
        author = self.soup.select_one('div.AuthorInfo').find(
            'meta', {'itemprop': 'name'}).get('content')
        markdown_title = self.save_and_transform(
            title_element, content_element, author, target_link, date)
        return markdown_title
    def load_processed_articles(self, filename):
        """
        Load the IDs of already-processed articles from a file.
        """
        if not os.path.exists(filename):
            return set()
        with open(filename, 'r', encoding='utf-8') as file:
            return set(file.read().splitlines())
    def save_processed_article(self, filename, article_id):
        """
        Append a processed article ID to the file.
        """
        with open(filename, 'a', encoding='utf-8') as file:
            file.write(article_id + '\n')
    def parse_zhihu_column(self, target_link):
        """
        Parse a Zhihu column and save every item as a Markdown file.
        """
        self.check_connect_error(target_link)
        # Put all articles in a folder named after the column title
        title = self.soup.text.split('-')[0].strip()
        total_articles = int(self.soup.text.split(
            '篇内容')[0].split('·')[-1].strip())  # total number of items
        folder_name = get_valid_filename(title)
        os.makedirs(folder_name, exist_ok=True)
        cwd = os.getcwd()
        os.chdir(folder_name)
        processed_filename = "zhihu_processed_articles.txt"
        processed_articles = self.load_processed_articles(processed_filename)
        # Page through the column's items
        offset = 0
        # Count items already processed in a previous run
        already_processed = len(processed_articles)
        # Initialize the progress bar, starting from the already-processed count
        progress_bar = tqdm(total=total_articles,
                            initial=already_processed, desc="Parsing articles")
        while True:
            api_url = f"/api/v4/columns/{target_link.split('/')[-1]}/items?limit=10&offset={offset}"
            response = self.session.get(f"https://www.zhihu.com{api_url}")
            data = response.json()
            for item in data["data"]:
                if item["type"] == "zvideo":
                    video_id = str(item["id"])
                    if video_id in processed_articles:
                        continue
                    video_link = f"https://www.zhihu.com/zvideo/{video_id}"
                    self.parse_zhihu_zvideo(video_link)
                    self.save_processed_article(processed_filename, video_id)
                    progress_bar.update(1)  # advance the progress bar
                elif item["type"] == "article":
                    article_id = str(item["id"])
                    if article_id in processed_articles:
                        continue
                    article_link = f"https://zhuanlan.zhihu.com/p/{article_id}"
                    self.parse_zhihu_article(article_link)
                    self.save_processed_article(processed_filename, article_id)
                    progress_bar.update(1)  # advance the progress bar
                elif item["type"] == "answer":
                    answer_id = str(item["id"])
                    if answer_id in processed_articles:
                        continue
                    answer_link = f"https://www.zhihu.com/question/{item['question']['id']}/answer/{answer_id}"
                    self.judge_type(answer_link)
                    self.save_processed_article(processed_filename, answer_id)
                    progress_bar.update(1)
            if data["paging"]["is_end"]:
                break
            offset += 10
        progress_bar.close()  # close the progress bar when done
        os.remove(processed_filename)  # all items done; drop the checkpoint file
        os.chdir(cwd)  # restore the working directory so repeated calls do not nest
        return folder_name
class ZhihuParser:
    def __init__(self, cookies, hexo_uploader=False):
        self.hexo_uploader = hexo_uploader  # whether output is for a Hexo blog upload
        self.cookies = cookies  # cookies from a logged-in Zhihu session
        self.session = requests.Session()  # create a session
        self.user_agents = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"  # user agent
        self.headers = {  # request headers
            'User-Agent': self.user_agents,
            'Accept-Language': 'en,zh-CN;q=0.9,zh;q=0.8',
            'Cookie': self.cookies
        }
        self.session.headers.update(self.headers)  # apply the headers to the session
        self.soup = None  # BeautifulSoup object of the current page
    def check_connect_error(self, target_link):
        """
        Fetch the page and check for connection errors.
        """
        try:
            response = self.session.get(target_link)
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(f"HTTP error occurred: {err}")
            raise  # re-raise: without a response there is nothing to parse
        except requests.exceptions.RequestException as err:
            print(f"Error occurred: {err}")
            raise
        self.soup = BeautifulSoup(response.content, "html.parser")
        if self.soup.text.find("有问题,就会有答案打开知乎App在「我的页」右上角打开扫一扫其他扫码方式") != -1:
            print("Cookies are required to access the article.")
        if self.soup.text.find("你似乎来到了没有知识存在的荒原") != -1:
            print("The page does not exist.")
    def judge_type(self, target_link):
        """
        Dispatch on the URL type.
        """
        if target_link.find("column") != -1:
            # column
            title = self.parse_zhihu_column(target_link)
        elif target_link.find("answer") != -1:
            # answer
            title = self.parse_zhihu_answer(target_link)
        elif target_link.find("zvideo") != -1:
            # video
            title = self.parse_zhihu_zvideo(target_link)
        else:
            # standalone article
            title = self.parse_zhihu_article(target_link)
        return title
    def save_and_transform(self, title_element, content_element, author, target_link, date=None):
        """
        Convert the page content to Markdown and save it to a file.
        """
        # Extract the title and build the file name
        title = title_element.text.strip() if title_element is not None else "Untitled"
        markdown_title = f"{get_valid_filename(title)}_{author}" if date is None else f"({date}){get_valid_filename(title)}_{author}"
        if content_element is not None:
            # Remove CSS style tags
            for style_tag in content_element.find_all("style"):
                style_tag.decompose()
            # Remove lazy-loading placeholder images (and images with no class attribute)
            for img_lazy in content_element.find_all("img", class_=lambda x: 'lazy' in x if x else True):
                img_lazy.decompose()
            # Convert headers to Markdown
            for header in content_element.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
                header_level = int(header.name[1])
                header_text = header.get_text(strip=True)
                markdown_header = f"{'#' * header_level} {header_text}"
                insert_new_line(self.soup, header, 1)
                header.replace_with(markdown_header)
            # Convert <figure> blocks to inline Markdown images (no download)
            for figure in content_element.find_all("figure"):
                img = figure.find("img")
                if img and 'src' in img.attrs:
                    img_url = img.attrs['src']
                    # Use the figcaption text, if any, as the image description
                    figcaption = figure.find("figcaption")
                    figcaption_text = figcaption.get_text(strip=True) if figcaption else "Image"
                    # Replace the img tag with a Markdown image string
                    markdown_img = NavigableString(f"![{figcaption_text}]({img_url})")
                    img.replace_with(markdown_img)
                    # If a figcaption exists, remove it
                    if figcaption:
                        figcaption.extract()
                    # Insert a line break after the Markdown image
                    insert_new_line(self.soup, markdown_img, 1)
            # Convert links to Markdown
            for link in content_element.find_all("a"):
                if 'href' in link.attrs:
                    original_url = link.attrs['href']
                    # Zhihu wraps external links in a redirect; recover the real target
                    parsed_url = urlparse(original_url)
                    query_params = parse_qs(parsed_url.query)
                    target_url = query_params.get('target', [original_url])[0]
                    article_url = unquote(target_url)
                    article_title = link.attrs['data-text'] if 'data-text' in link.attrs else article_url
                    markdown_link = f"[{article_title}]({article_url})"
                    link.replace_with(markdown_link)
            # Extract math formulas, marking their positions with placeholders
            math_formulas = []
            math_tags = []
            for math_span in content_element.select("span.ztext-math"):
                latex_formula = math_span['data-tex']
                if latex_formula.find("\\tag") != -1:
                    math_tags.append(latex_formula)
                    insert_new_line(self.soup, math_span, 1)
                    math_span.replace_with("@@MATH_FORMULA@@")
                else:
                    math_formulas.append(latex_formula)
                    math_span.replace_with("@@MATH@@")
            # Get the text content and convert to Markdown
            content = content_element.decode_contents().strip()
            content = md(content)
            # Substitute the LaTeX formulas back in for the placeholders
            for formula in math_formulas:
                if self.hexo_uploader:
                    content = content.replace("@@MATH@@", "$" + "{% raw %}" + formula + "{% endraw %}" + "$", 1)
                else:
                    # If the formula already contains $, do not add more $ signs
                    if formula.find('$') != -1:
                        content = content.replace("@@MATH@@", f"{formula}", 1)
                    else:
                        content = content.replace("@@MATH@@", f"${formula}$", 1)
            # markdownify escapes underscores, so match the escaped placeholder here
            for formula in math_tags:
                if self.hexo_uploader:
                    content = content.replace("@@MATH\\_FORMULA@@",
                                              "$$" + "{% raw %}" + formula + "{% endraw %}" + "$$", 1)
                else:
                    if formula.find("$") != -1:
                        content = content.replace("@@MATH\\_FORMULA@@", f"{formula}", 1)
                    else:
                        content = content.replace("@@MATH\\_FORMULA@@", f"$${formula}$$", 1)
        else:
            content = ""
        if content:
            markdown = f"# {title}\n\n **Author:** [{author}]\n\n **Link:** [{target_link}]\n\n{content}"
        else:
            markdown = f"# {title}\n\n Content is empty."
        # Post-process the content, then save the Markdown file
        markdown = process_markdown_content(markdown)
        with open(f"{markdown_title}.md", "w", encoding="utf-8") as f:
            f.write(markdown)
        return markdown_title
    def parse_zhihu_zvideo(self, target_link):
        """
        Parse a Zhihu video page and download the video.
        """
        self.check_connect_error(target_link)
        data = json.loads(self.soup.select_one(
            "div.ZVideo-video")['data-zop'])  # video metadata
        date = get_article_date(self.soup, "div.ZVideo-meta")
        markdown_title = f"({date}){data['authorName']}_{data['title']}/{data['authorName']}_{data['title']}.mp4"
        video_url = None
        script = self.soup.find('script', id='js-initialData')
        if script:
            data = json.loads(script.text)
            try:
                videos = data['initialState']['entities']['zvideos']
                for video_id, video_info in videos.items():
                    if 'playlist' in video_info['video']:
                        # keeps the URL of the last playlist entry
                        for quality, details in video_info['video']['playlist'].items():
                            video_url = details['playUrl']
            except KeyError as e:
                print("Key error in parsing JSON data:", e)
                return None
        else:
            print("No suitable script tag found for video data")
            return None
        os.makedirs(os.path.dirname(markdown_title), exist_ok=True)
        download_video(video_url, markdown_title, self.session)
        return markdown_title
    def parse_zhihu_article(self, target_link):
        """
        Parse a Zhihu article and save it as a Markdown file.
        """
        self.check_connect_error(target_link)
        title_element = self.soup.select_one("h1.Post-Title")
        content_element = self.soup.select_one("div.Post-RichTextContainer")
        date = get_article_date(self.soup, "div.ContentItem-time")
        author = self.soup.select_one('div.AuthorInfo').find(
            'meta', {'itemprop': 'name'}).get('content')
        markdown_title = self.save_and_transform(
            title_element, content_element, author, target_link, date)
        return markdown_title
    def parse_zhihu_answer(self, target_link):
        """
        Parse a Zhihu answer and save it as a Markdown file.
        """
        self.check_connect_error(target_link)
        # Locate the answer title, content, and author elements
        title_element = self.soup.select_one("h1.QuestionHeader-title")
        content_element = self.soup.select_one("div.RichContent-inner")
        date = get_article_date(self.soup, "div.ContentItem-time")
        author = self.soup.select_one('div.AuthorInfo').find(
            'meta', {'itemprop': 'name'}).get('content')
        markdown_title = self.save_and_transform(
            title_element, content_element, author, target_link, date)
        return markdown_title
    def load_processed_articles(self, filename):
        """
        Load the IDs of already-processed articles from a file.
        """
        if not os.path.exists(filename):
            return set()
        with open(filename, 'r', encoding='utf-8') as file:
            return set(file.read().splitlines())
    def save_processed_article(self, filename, article_id):
        """
        Append a processed article ID to the file.
        """
        with open(filename, 'a', encoding='utf-8') as file:
            file.write(article_id + '\n')
    def parse_zhihu_column(self, target_link):
        """
        Parse a Zhihu column and save every item as a Markdown file.
        """
        self.check_connect_error(target_link)
        # Put all articles in a folder named after the column title
        title = self.soup.text.split('-')[0].strip()
        total_articles = int(self.soup.text.split(
            '篇内容')[0].split('·')[-1].strip())  # total number of items
        folder_name = get_valid_filename(title)
        os.makedirs(folder_name, exist_ok=True)
        cwd = os.getcwd()
        os.chdir(folder_name)
        processed_filename = "zhihu_processed_articles.txt"
        processed_articles = self.load_processed_articles(processed_filename)
        # Page through the column's items
        offset = 0
        # Count items already processed in a previous run
        already_processed = len(processed_articles)
        # Initialize the progress bar, starting from the already-processed count
        progress_bar = tqdm(total=total_articles,
                            initial=already_processed, desc="Parsing articles")
        while True:
            api_url = f"/api/v4/columns/{target_link.split('/')[-1]}/items?limit=10&offset={offset}"
            response = self.session.get(f"https://www.zhihu.com{api_url}")
            data = response.json()
            for item in data["data"]:
                if item["type"] == "zvideo":
                    video_id = str(item["id"])
                    if video_id in processed_articles:
                        continue
                    video_link = f"https://www.zhihu.com/zvideo/{video_id}"
                    self.parse_zhihu_zvideo(video_link)
                    self.save_processed_article(processed_filename, video_id)
                    progress_bar.update(1)  # advance the progress bar
                elif item["type"] == "article":
                    article_id = str(item["id"])
                    if article_id in processed_articles:
                        continue
                    article_link = f"https://zhuanlan.zhihu.com/p/{article_id}"
                    self.parse_zhihu_article(article_link)
                    self.save_processed_article(processed_filename, article_id)
                    progress_bar.update(1)  # advance the progress bar
                elif item["type"] == "answer":
                    answer_id = str(item["id"])
                    if answer_id in processed_articles:
                        continue
                    answer_link = f"https://www.zhihu.com/question/{item['question']['id']}/answer/{answer_id}"
                    self.judge_type(answer_link)
                    self.save_processed_article(processed_filename, answer_id)
                    progress_bar.update(1)
            if data["paging"]["is_end"]:
                break
            offset += 10
        progress_bar.close()  # close the progress bar when done
        os.remove(processed_filename)  # all items done; drop the checkpoint file
        os.chdir(cwd)  # restore the working directory so repeated calls do not nest
        return folder_name
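# --- Usage sketch ---
# A minimal example of driving either parser. The cookie string below is a
# hypothetical placeholder (supply the cookies of your own logged-in Zhihu
# session), and the URL is illustrative; any supported URL shape
# (article / answer / column / zvideo) is dispatched by judge_type().
if __name__ == "__main__":
    cookies = "z_c0=YOUR_ZHIHU_COOKIE_HERE"  # placeholder, not a real cookie
    parser = ZhihuParser(cookies, hexo_uploader=False)  # or ZhihuParserLocal to download images
    saved = parser.judge_type("https://zhuanlan.zhihu.com/p/123456789")  # example URL
    print(f"Saved: {saved}")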