四川中小学智慧平台视频下载器

文章正文
发布时间:2025-10-15 09:03

[Python] 纯文本查看 复制代码

"""PyQt5 GUI tool that lists the videos of a course on www.cdssxy.com and
downloads the selected ones, several videos at a time and several byte-range
chunks per video."""

import concurrent.futures
import json
import os
import re
import threading
import time
import traceback
import webbrowser
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from queue import Empty, Queue

import requests
from PyQt5.QtCore import QThread, pyqtSignal, Qt, QSize, QEvent
from PyQt5.QtGui import QBrush, QColor, QMouseEvent
from PyQt5.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QWidget,
                             QLabel, QListWidgetItem, QCheckBox, QLineEdit,
                             QPushButton, QListWidget, QProgressBar, QComboBox,
                             QTreeWidget, QHBoxLayout, QMessageBox,
                             QFileDialog, QFormLayout, QTreeWidgetItem)


def format_size(size_bytes):
    """Return *size_bytes* as a short human-readable string.

    Whole bytes are printed without decimals ("512B"); larger units use one
    decimal ("1.5MB").  Values of 1024 GB and above are reported in TB.
    """
    for unit in ('B', 'KB', 'MB', 'GB'):
        if size_bytes < 1024.0:
            if unit == 'B':
                return f"{size_bytes:.0f}{unit}"
            return f"{size_bytes:.1f}{unit}"
        size_bytes /= 1024.0
    # The value has been divided four times at this point, so it is in TB.
    return f"{size_bytes:.1f}TB"


def _flat_button_style(background, hover):
    """Build the stylesheet shared by the flat, coloured push buttons."""
    return f"""
        QPushButton {{
            background-color: {background};
            color: white;
            border: none;
            padding: 5px;
            text-align: center;
            font-weight: bold;
        }}
        QPushButton:hover {{
            background-color: {hover};
        }}
    """


class CustomTreeWidget(QTreeWidget):
    """Tree view with custom mouse handling for folder rows.

    A single left click on a folder row is deferred with a timer and then
    forwarded to ``main_window.handle_folder_click``; a double click only
    expands/collapses the folder.  Non-folder rows keep the default
    multi-selection behaviour.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self.main_window = parent
        self.setSelectionMode(QTreeWidget.MultiSelection)
        self.setDragDropMode(QTreeWidget.NoDragDrop)
        self.setUniformRowHeights(True)
        self.setAnimated(False)
        self.setIndentation(15)
        self._drag_start_pos = None
        self._pending_click = None
        self._click_timer = None
        self._last_clicked_item = None
        # Initialised here because mousePressEvent reads it when comparing
        # two consecutive clicks.
        self._last_click_time = 0.0
        self._double_click_interval = 200  # custom double-click interval (ms)

    @staticmethod
    def _is_folder(item):
        """Return a truthy value when the item's node data marks a folder."""
        node_data = item.data(0, Qt.UserRole)
        return node_data and node_data['is_folder']

    def _hits_expander(self, item, pos):
        """Return True when *pos* lies in the expand/collapse area of *item*."""
        rect = self.visualItemRect(item)
        return pos.x() < rect.left() + self.indentation()

    def _cancel_click_timer(self):
        """Stop the pending single-click timer, if one is running."""
        if self._click_timer:
            self.killTimer(self._click_timer)
        self._click_timer = None

    def mousePressEvent(self, event):
        """Defer folder clicks; pass every other press to the base class."""
        try:
            item = self.itemAt(event.pos())
            if item is None:
                super().mousePressEvent(event)
                return

            # Clicks on the expander arrow keep their default behaviour.
            if self._hits_expander(item, event.pos()):
                super().mousePressEvent(event)
                return

            if self._is_folder(item) and event.button() == Qt.LeftButton:
                now = time.time()
                # Double click: same item again within the custom interval.
                is_double_click = (
                    self._last_clicked_item == item
                    and now - self._last_click_time
                    < self._double_click_interval / 1000
                )
                if is_double_click:
                    # Drop the pending single click and toggle expansion.
                    self._cancel_click_timer()
                    item.setExpanded(not item.isExpanded())
                else:
                    # Arm a fresh single-click timer for this item.
                    self._last_clicked_item = item
                    self._last_click_time = now
                    self._cancel_click_timer()
                    self._click_timer = self.startTimer(
                        self._double_click_interval)
                return

            # Non-folder rows: remember where a potential drag started.
            if event.button() == Qt.LeftButton:
                self._drag_start_pos = event.pos()
            super().mousePressEvent(event)
        except Exception as e:
            print(f"mousePressEvent error: {str(e)}")
            traceback.print_exc()

    def timerEvent(self, event):
        """Deliver the deferred single click on a folder row.

        Timer events that do not belong to the click timer are forwarded to
        the base class so the view's own timers keep working.
        """
        if not self._click_timer or event.timerId() != self._click_timer:
            super().timerEvent(event)
            return

        self._cancel_click_timer()
        item, self._last_clicked_item = self._last_clicked_item, None
        if item is not None and hasattr(self.main_window,
                                        'handle_folder_click'):
            self.main_window.handle_folder_click(item)

    def mouseDoubleClickEvent(self, event):
        """Expand/collapse a folder on double click without selecting it."""
        try:
            item = self.itemAt(event.pos())
            if item is None:
                super().mouseDoubleClickEvent(event)
                return

            is_expander_click = self._hits_expander(item, event.pos())
            is_folder = self._is_folder(item)

            # Cancel the pending single click.
            self._cancel_click_timer()
            self._pending_click = None

            if is_folder and not is_expander_click:
                # Only toggle expansion; the selection state is untouched.
                item.setExpanded(not item.isExpanded())
                return

            super().mouseDoubleClickEvent(event)
        except Exception as e:
            print(f"mouseDoubleClickEvent error: {str(e)}")
            traceback.print_exc()

    def mouseMoveEvent(self, event):
        """Allow drag selection over video rows only."""
        try:
            item = self.itemAt(event.pos())
            if item is not None and self._is_folder(item):
                # Folder rows never take part in drag selection.
                return

            # Forward the move only once the drag distance is large enough.
            if self._drag_start_pos is not None:
                distance = (event.pos()
                            - self._drag_start_pos).manhattanLength()
                if distance > QApplication.startDragDistance():
                    super().mouseMoveEvent(event)
        except Exception as e:
            print(f"mouseMoveEvent error: {str(e)}")
            traceback.print_exc()

    def mouseReleaseEvent(self, event):
        """Finish a click/drag and ask the main window to refresh its state."""
        try:
            self._drag_start_pos = None
            super().mouseReleaseEvent(event)
            if hasattr(self.main_window, 'update_selection_state'):
                self.main_window.update_selection_state()
        except Exception as e:
            print(f"mouseReleaseEvent error: {str(e)}")
            traceback.print_exc()


class DownloadThread(QThread):
    """Background thread that pulls download tasks from a queue.

    Each task is a ``(name, url, save_path, parent_names)`` tuple.  Up to
    ``max_workers`` tasks run at the same time on a thread pool.

    Signals:
        update_progress(name, downloaded_bytes, total_bytes)
        download_complete(name, success, message) -- emitted exactly once
            per task by ``download_video``.
    """

    update_progress = pyqtSignal(str, int, int)
    download_complete = pyqtSignal(str, bool, str)

    # Size of the underlying pool.  Concurrency is throttled in run() via
    # self.max_workers, so the pool only has to be at least that large.
    _POOL_CAPACITY = 16

    def __init__(self, queue, max_workers=3, chunk_threads=3):
        super().__init__()
        self.queue = queue
        self.chunk_threads = chunk_threads
        self.max_workers = max_workers
        self.running = True
        self.active_threads = 0
        self.lock = threading.Lock()
        self.progress_dict = {}
        self.executor = ThreadPoolExecutor(
            max_workers=max(self.max_workers, self._POOL_CAPACITY))

    def run(self):
        """Dispatch queued tasks until stop() is called.

        After stop() no further tasks are taken from the queue; the loop
        ends as soon as the tasks already in flight have finished, so
        QThread.wait() cannot block on tasks that were never started.
        """
        futures = []
        try:
            while self.running or futures:
                while self.running and len(futures) < self.max_workers:
                    try:
                        task = self.queue.get_nowait()
                    except Empty:
                        break
                    futures.append(self.executor.submit(
                        self.safe_download_video, *task))

                if futures:
                    _, pending = concurrent.futures.wait(
                        futures, timeout=0.1,
                        return_when=concurrent.futures.FIRST_COMPLETED)
                    futures = list(pending)
                else:
                    time.sleep(0.1)
        except Exception as e:
            print(f"DownloadThread error: {str(e)}")
            traceback.print_exc()
        finally:
            self.executor.shutdown(wait=True)

    def safe_download_video(self, name, url, save_path, parent_names):
        """Run download_video with active-task bookkeeping around it."""
        with self.lock:
            self.active_threads += 1
            self.progress_dict[name] = {'downloaded': 0, 'total': 0}
        try:
            self.download_video(name, url, save_path, parent_names)
        except Exception as e:
            print(f"下载 {name} 出错: {str(e)}")
        finally:
            with self.lock:
                self.progress_dict.pop(name, None)
                self.active_threads -= 1

    def set_max_workers(self, max_workers):
        """Change how many videos run() keeps in flight at once."""
        self.max_workers = max_workers

    def set_chunk_threads(self, chunk_threads):
        """Change the number of chunks used for subsequent downloads."""
        self.chunk_threads = chunk_threads

    @staticmethod
    def _safe_component(text):
        """Replace everything but alphanumerics, space, '-' and '_' by '_'."""
        return "".join(
            c if c.isalnum() or c in (" ", "-", "_") else "_" for c in text)

    def download_video(self, name, url, save_path, parent_names):
        """Download one video into ``save_path/<parent_names...>/<name>.mp4``.

        An existing target file is treated as success and skipped before any
        network request is made.  The outcome is reported through exactly
        one ``download_complete`` emission; exceptions are not propagated.
        """
        success = False
        message = "未知错误"
        downloaded = 0
        total = 0
        temp_path = None
        try:
            target_dir = Path(save_path)
            for parent_name in parent_names:
                target_dir = target_dir / self._safe_component(parent_name)
            target_dir.mkdir(parents=True, exist_ok=True)

            full_path = target_dir / f"{self._safe_component(name)}.mp4"
            temp_path = full_path.with_suffix('.tmp')

            if full_path.exists():
                downloaded = total = full_path.stat().st_size
                success = True
                message = "文件已存在,跳过下载"
                return  # the finally clause still reports the result

            # Exchange the stored URL for a downloadable one.
            response = requests.post(
                'https://www.cdssxy.com/cloud-wlxy-customer/ossts/'
                'getPrivateDownloadUrl.action',
                json={"url": url}, timeout=10)
            response.raise_for_status()
            mp4_url = response.json().get("data")
            if not mp4_url:
                raise ValueError("无法获取视频下载URL")

            headers = requests.head(mp4_url, timeout=10).headers
            total = int(headers.get('Content-Length', 0))
            if total <= 0:
                raise ValueError("无法获取文件大小")

            self.download_with_chunks(
                name, mp4_url, str(full_path), str(temp_path))
            success = True
            message = "下载完成"
            downloaded = total
        except Exception as e:
            message = f"下载失败: {str(e)}"
            print(f"下载 {name} 出错: {message}")
            # Best-effort removal of the partial file.
            if temp_path is not None and os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError:
                    pass
        finally:
            if downloaded > 0 and total > 0:
                self.update_progress.emit(name, downloaded, total)
            self.download_complete.emit(name, success, message)

    def download_with_chunks(self, name, url, full_path, temp_path):
        """Download *url* to *full_path* using parallel byte-range requests.

        Falls back to ``single_thread_download`` when the server does not
        advertise ``Accept-Ranges: bytes`` or reports no size.

        Raises:
            Exception: when the download fails.  The caller is responsible
                for reporting the failure and removing *temp_path*.
        """
        headers = requests.head(url, timeout=10).headers
        if headers.get('Accept-Ranges', '').lower() != 'bytes':
            self.single_thread_download(name, url, full_path, temp_path)
            return

        total_size = int(headers.get('Content-Length', 0))
        if total_size <= 0:
            self.single_thread_download(name, url, full_path, temp_path)
            return

        with self.lock:
            self.progress_dict[name] = {'downloaded': 0, 'total': total_size}

        # At least 5 MiB per chunk so small files are not over-split.
        chunk_size = max(5 * 1024 * 1024,
                         total_size // max(1, self.chunk_threads))
        chunks = [(start, min(start + chunk_size - 1, total_size - 1))
                  for start in range(0, total_size, chunk_size)]

        # Pre-allocate the file so every chunk can seek to its own offset.
        with open(temp_path, 'wb') as f:
            f.truncate(total_size)

        if not self._download_chunks_with_retry(name, url, temp_path, chunks):
            raise Exception("分块下载失败")
        os.replace(temp_path, full_path)

    def _download_chunks_with_retry(self, name, url, filepath, chunks):
        """Fetch all *chunks* in parallel; return True when every one succeeded."""
        with ThreadPoolExecutor(max_workers=len(chunks)) as executor:
            futures = [
                executor.submit(self._download_chunk, name, url, filepath,
                                start, end, i)
                for i, (start, end) in enumerate(chunks)
            ]
            return all(future.result() for future in as_completed(futures))

    def _add_progress(self, name, delta, fallback_total):
        """Add *delta* bytes to the progress of *name* and emit the new value."""
        with self.lock:
            entry = self.progress_dict.setdefault(
                name, {'downloaded': 0, 'total': fallback_total})
            entry['downloaded'] += delta
            # Emitted while holding the lock so values are sent in order.
            self.update_progress.emit(
                name, entry['downloaded'], entry['total'])

    def _download_chunk(self, name, url, filepath, start, end, chunk_id,
                        max_retries=3):
        """Download bytes *start*..*end* of *url* into *filepath*.

        Each attempt must return a matching ``Content-Range`` and exactly
        ``end - start + 1`` bytes.  Bytes counted by a failed attempt are
        subtracted again so a retry does not inflate the progress.

        Returns:
            True on success, False after *max_retries* failed attempts or
            when the thread has been stopped.
        """
        expected = end - start + 1
        for attempt in range(max_retries):
            if not self.running:
                return False

            written = 0
            try:
                headers = {'Range': f'bytes={start}-{end}'}
                with requests.get(url, headers=headers, stream=True,
                                  timeout=30) as response:
                    response.raise_for_status()
                    content_range = response.headers.get('Content-Range', '')
                    if f'bytes {start}-{end}' in content_range:
                        with open(filepath, 'r+b') as f:
                            f.seek(start)
                            for chunk in response.iter_content(
                                    chunk_size=1024 * 1024):
                                if not self.running:
                                    break
                                if chunk:
                                    f.write(chunk)
                                    written += len(chunk)
                                    self._add_progress(
                                        name, len(chunk), end + 1)
                if written == expected:
                    return True
            except Exception as e:
                print(f"分块 {chunk_id} 第 {attempt + 1} 次尝试失败: {str(e)}")

            # Failed attempt: undo the bytes it reported.
            if written:
                self._add_progress(name, -written, end + 1)
            if attempt < max_retries - 1 and self.running:
                time.sleep(2)
        return False

    def single_thread_download(self, name, url, full_path, temp_path):
        """Stream *url* to *temp_path* in one request, then move it in place.

        Raises:
            Exception: on any network/file error or when the thread has
                been stopped; the caller reports the failure.
        """
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0
            with open(temp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if not self.running:
                        raise RuntimeError("下载已取消")
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)
                        self.update_progress.emit(
                            name, downloaded, total_size)
        os.replace(temp_path, full_path)

    def stop(self):
        """Ask run() and the in-flight downloads to finish."""
        self.running = False


class VideoItemWidget(QWidget):
    """Row widget shown in the tree: a bold name label plus a progress bar."""

    def __init__(self, name, parent=None):
        super().__init__(parent)
        self.name = name
        self.layout = QHBoxLayout()
        self.setLayout(self.layout)

        self.indent_label = QLabel()
        self.indent_label.setFixedWidth(0)
        self.layout.addWidget(self.indent_label)

        self.name_label = QLabel(name)
        self.name_label.setStyleSheet("font-weight: bold;")

        self.progress_bar = QProgressBar()
        self.progress_bar.setMaximumHeight(20)
        self.progress_bar.setMinimumWidth(200)
        self.progress_bar.setAlignment(Qt.AlignCenter)
        self.progress_bar.setFormat("等待开始...")

        self.layout.addWidget(self.name_label, 2)
        self.layout.addWidget(self.progress_bar, 3)

    def set_indent_level(self, level):
        """Set the extra indent; the per-level width is 0, so this is a no-op."""
        self.indent_label.setFixedWidth(0 * level)

    def update_progress(self, downloaded, total):
        """Show *downloaded* of *total* bytes and return the integer percent.

        The bar runs on a 0-100 scale rather than on raw byte counts, because
        QProgressBar stores its range as 32-bit ints and byte counts of large
        files do not fit.  With an unknown total (<= 0) the range is 0-0.
        """
        if total > 0:
            percent = max(0, min(100, int(downloaded / total * 100)))
            self.progress_bar.setRange(0, 100)
            self.progress_bar.setValue(percent)
            self.progress_bar.setFormat(
                f"{format_size(downloaded)}/{format_size(total)} ({percent}%)")
            return percent

        self.progress_bar.setRange(0, 0)
        self.progress_bar.setValue(0)
        self.progress_bar.setFormat("获取大小信息...")
        return 0


class VideoDownloaderApp(QMainWindow):
    """Main window: course URL input, video tree, and download controls."""

    def __init__(self):
        super().__init__()
        self.setWindowTitle("四川中小学智慧教育平台视频下载工具")
        # self.setFixedSize(700, 600)
        self.setMinimumSize(700, 600)
        self.center_window()
        self.setWindowFlags(self.windowFlags() | Qt.WindowStaysOnTopHint)

        self.download_queue = Queue()
        self.chunk_threads = 20
        self.max_workers = 3
        self.downloading_items = {}
        self.total_progress = 0
        self.total_size = 0
        self.total_selected_size = 0
        self.total_downloaded = 0
        self.video_sizes = {}
        self.video_progress = {}
        self.last_downloaded = {}
        self.video_sizes_added = set()
        self.pending_downloads = 0  # queued tasks without a result yet
        self.folder_progress = {}
        self.selected_videos = set()
        self.last_clicked_item = None
        self.worker = None

        self.download_thread = DownloadThread(
            self.download_queue, max_workers=self.max_workers,
            chunk_threads=self.chunk_threads)
        self.download_thread.update_progress.connect(self.update_progress)
        self.download_thread.download_complete.connect(self.download_finished)
        self.download_thread.start()

        self.init_ui()
        self.init_tree_selection_logic()

    def center_window(self):
        """Move the window to the centre of the available desktop area."""
        frame_geometry = self.frameGeometry()
        center_point = QApplication.desktop().availableGeometry().center()
        frame_geometry.moveCenter(center_point)
        self.move(frame_geometry.topLeft())

    def init_tree_selection_logic(self):
        """Wire the selection signals (each exactly once) and sync the checkbox."""
        self.tree_widget.itemClicked.connect(self.on_item_clicked)
        self.select_all_checkbox.stateChanged.connect(
            self.on_select_all_changed)
        self._sync_select_all_checkbox()

    def init_ui(self):
        """Build all widgets and layouts of the main window."""
        main_widget = QWidget()
        layout = QVBoxLayout()
        settings_layout = QFormLayout()

        # Course URL row
        self.url_input = QLineEdit()
        self.url_input.setPlaceholderText(
            "例如: https://www.cdssxy.com/ssxyzy/#/CoursesDetail?id=254655")
        paste_btn = QPushButton("课程链接")
        paste_btn.clicked.connect(self.paste_from_clipboard)
        paste_btn.setStyleSheet(_flat_button_style("#4CAF50", "#45a049"))
        paste_btn.setCursor(Qt.PointingHandCursor)
        paste_btn.setToolTip('点击粘贴课程链接')
        url_layout = QHBoxLayout()
        url_layout.addWidget(paste_btn)
        url_layout.addWidget(self.url_input)
        settings_layout.addRow(url_layout)

        # Save path row
        self.path_input = QLineEdit()
        self.path_input.setText(str(Path.cwd()))
        browse_path_btn = QPushButton("保存路径")
        browse_path_btn.clicked.connect(self.browse_path)
        browse_path_btn.setStyleSheet(_flat_button_style("#2196F3", "#0b7dda"))
        browse_path_btn.setCursor(Qt.PointingHandCursor)
        browse_path_btn.setToolTip('点击选择保存路径')
        path_layout = QHBoxLayout()
        path_layout.addWidget(browse_path_btn)
        path_layout.addWidget(self.path_input)
        settings_layout.addRow(path_layout)
        layout.addLayout(settings_layout)

        # Action buttons
        btn_layout = QHBoxLayout()
        self.fetch_btn = QPushButton("获取视频列表")
        self.fetch_btn.clicked.connect(self.safe_fetch_video_list)
        self.fetch_btn.setStyleSheet(_flat_button_style("#FF9800", "#e68a00"))
        self.fetch_btn.setCursor(Qt.PointingHandCursor)
        self.fetch_btn.setToolTip('点击根据课程链接获取视频列表')
        btn_layout.addWidget(self.fetch_btn)

        self.download_btn = QPushButton("下载选中视频")
        self.download_btn.clicked.connect(self.start_download)
        self.download_btn.setEnabled(False)
        self.download_btn.setStyleSheet("""
            QPushButton {
                background-color: #F9F9F9;
                color: #787878;
                border: none;
                padding: 5px;
                text-align: center;
                font-weight: bold;
            }
            QPushButton:enabled {
                background-color: #008CBA;
                color: white;
            }
            QPushButton:enabled:hover {
                background-color: #007a99;
            }
        """)
        self.download_btn.setCursor(Qt.PointingHandCursor)
        btn_layout.addWidget(self.download_btn)
        self.download_btn.setToolTip('点击开始下载选中的视频')

        open_dir_btn = QPushButton("打开保存目录")
        open_dir_btn.clicked.connect(self.open_save_directory)
        open_dir_btn.setStyleSheet(_flat_button_style("#FF5722", "#e64a19"))
        open_dir_btn.setCursor(Qt.PointingHandCursor)
        open_dir_btn.setToolTip('点击打开保存目录')
        btn_layout.addWidget(open_dir_btn)
        layout.addLayout(btn_layout)

        # Progress bars
        self.setStyleSheet(
            'QProgressBar{text-align:center}'
            'QProgressBar::chunk{background-color:#05B8CC}')
        self.total_progress_bar = QProgressBar()
        self.total_progress_bar.setAlignment(Qt.AlignCenter)
        self.total_progress_bar.setFormat("总进度: %p%")
        layout.addWidget(self.total_progress_bar)

        # Video list header
        video_list_header = QHBoxLayout()
        video_list_header.addWidget(QLabel("视频列表:"))
        self.status_label = QLabel("准备就绪")
        video_list_header.addWidget(self.status_label)
        video_list_header.addStretch()

        # Parallel downloads
        parallel_layout = QHBoxLayout()
        parallel_layout.addWidget(QLabel("同时下载:"))
        self.parallel_combo = QComboBox()
        self.parallel_combo.addItems([str(i) for i in range(1, 6)])
        self.parallel_combo.setCurrentIndex(2)
        self.parallel_combo.currentTextChanged.connect(self.update_max_workers)
        parallel_layout.addWidget(self.parallel_combo)
        self.parallel_combo.setToolTip('同时下载视频的最大数量')
        video_list_header.addLayout(parallel_layout)

        # Chunk threads
        chunk_layout = QHBoxLayout()
        chunk_layout.addWidget(QLabel("视频分块:"))
        self.chunk_thread_combo = QComboBox()
        self.chunk_thread_combo.addItems([str(i) for i in range(5, 51)])
        self.chunk_thread_combo.setCurrentIndex(15)
        self.chunk_thread_combo.currentTextChanged.connect(
            self.update_chunk_thread_count)
        self.chunk_thread_combo.setToolTip('同一视频下载的分块数量')
        chunk_layout.addWidget(self.chunk_thread_combo)
        self.chunk_thread_combo.view().setFixedHeight(250)
        video_list_header.addLayout(chunk_layout)

        # Select-all checkbox; its signal is connected in
        # init_tree_selection_logic so the handler is registered only once.
        self.select_all_checkbox = QCheckBox("全选")
        self.select_all_checkbox.setFixedWidth(45)
        video_list_header.addWidget(self.select_all_checkbox)
        self.select_all_checkbox.setToolTip('全选/取消全选视频')
        layout.addLayout(video_list_header)

        # Tree widget; itemClicked is connected in init_tree_selection_logic.
        self.tree_widget = CustomTreeWidget(self)
        # self.tree_widget = CustomTreeWidget()
        self.tree_widget.setHeaderHidden(True)
        self.tree_widget.setSelectionMode(QTreeWidget.MultiSelection)
        self.tree_widget.setStyleSheet(
            'QTreeWidget::item{height:40px}'
            'QTreeWidget::item:selected{background-color:#d4e6ff}')
        layout.addWidget(self.tree_widget)

        main_widget.setLayout(layout)
        self.setCentralWidget(main_widget)

    def paste_from_clipboard(self):
        """Copy the clipboard text into the course URL field."""
        clipboard = QApplication.clipboard()
        self.url_input.setText(clipboard.text())

    def browse_path(self):
        """Let the user pick the save directory."""
        try:
            path = QFileDialog.getExistingDirectory(
                self, "选择保存路径", self.path_input.text())
            if path:
                self.path_input.setText(path)
        except Exception as e:
            QMessageBox.critical(self, "错误", f"选择路径时出错: {str(e)}")

    def open_save_directory(self):
        """Open the save directory with the system's default handler."""
        try:
            path = self.path_input.text().strip()
            if not path:
                QMessageBox.warning(self, "警告", "请先选择保存路径")
                return
            # as_uri() yields a well-formed file:// URL on every platform.
            webbrowser.open(Path(path).resolve().as_uri())
        except Exception as e:
            QMessageBox.critical(self, "错误", f"打开保存目录时出错: {str(e)}")

    def update_max_workers(self, text):
        """Apply the 'parallel downloads' combo box value."""
        try:
            self.max_workers = int(text)
            self.download_thread.set_max_workers(self.max_workers)
        except ValueError:
            pass

    def update_chunk_thread_count(self, text):
        """Apply the 'chunks per video' combo box value."""
        try:
            self.chunk_threads = int(text)
            self.download_thread.set_chunk_threads(self.chunk_threads)
        except ValueError:
            pass

    def handle_folder_click(self, item):
        """Handle a click on a folder row body (not its expander area)."""
        try:
            self.tree_widget.blockSignals(True)
            node_data = item.data(0, Qt.UserRole)
            if node_data['is_folder']:
                # Toggle the folder and everything below it.
                new_state = not item.isSelected()
                item.setSelected(new_state)
                self._set_children_selected(item, new_state)
            self._update_parent_states(item)
            self._sync_select_all_checkbox()
            self.update_folder_progress()
        except Exception as e:
            print(f"handle_folder_click error: {str(e)}")
            traceback.print_exc()
        finally:
            self.tree_widget.blockSignals(False)

    def update_selection_state(self):
        """Refresh parent rows, the select-all box and folder counters."""
        try:
            self.tree_widget.blockSignals(True)
            for item in self.tree_widget.selectedItems():
                self._update_parent_states(item)
            self._sync_select_all_checkbox()
            self.update_folder_progress()
        except Exception as e:
            print(f"update_selection_state error: {str(e)}")
            traceback.print_exc()
        finally:
            self.tree_widget.blockSignals(False)

    def on_item_clicked(self, item, column):
        """Propagate a row's new selection state to children and parents."""
        try:
            self.tree_widget.blockSignals(True)
            node_data = item.data(0, Qt.UserRole)
            selected = item.isSelected()
            if node_data['is_folder']:
                self._set_children_selected(item, selected)
            if not selected:
                self._update_sibling_state(item)
            self._update_parent_states(item)
            self._sync_select_all_checkbox()
            self.update_folder_progress()
        except Exception as e:
            print(f"选择错误: {str(e)}")
        finally:
            self.tree_widget.blockSignals(False)

    def on_select_all_changed(self, state):
        """Select or deselect every row when the select-all box is toggled."""
        if state == Qt.PartiallyChecked:
            return
        self.tree_widget.blockSignals(True)
        try:
            select_all = (state == Qt.Checked)
            root = self.tree_widget.invisibleRootItem()
            self._set_children_selected(root, select_all)
            self._sync_select_all_checkbox()
            self.update_folder_progress()
        finally:
            self.tree_widget.blockSignals(False)

    def _set_children_selected(self, parent_item, selected):
        """Set the selection state of every descendant of *parent_item*."""
        stack = [parent_item]
        while stack:
            current = stack.pop()
            for i in range(current.childCount()):
                child = current.child(i)
                child.setSelected(selected)
                if child.childCount() > 0:
                    stack.append(child)

    def _update_parent_states(self, item):
        """Re-derive each ancestor folder's state from its direct children.

        A folder is selected when all of its children are; its label is
        italic when only some of them are.
        """
        try:
            parent = item.parent()
            while parent:
                node_data = parent.data(0, Qt.UserRole)
                if node_data['is_folder']:
                    states = [parent.child(i).isSelected()
                              for i in range(parent.childCount())]
                    all_selected = all(states)
                    any_selected = any(states)
                    # Only the selection flag is updated; no recursion.
                    parent.setSelected(all_selected)
                    font = parent.font(0)
                    font.setItalic(any_selected and not all_selected)
                    parent.setFont(0, font)
                parent = parent.parent()
        except Exception as e:
            print(f"_update_parent_states error: {str(e)}")
            traceback.print_exc()

    def _update_sibling_state(self, item):
        """Deselect the parent of a deselected row and refresh its ancestors."""
        parent = item.parent()
        if parent and parent.isSelected():
            parent.setSelected(False)
            self._update_parent_states(parent)

    def _sync_select_all_checkbox(self):
        """Set the select-all box to unchecked / partial / checked."""
        root = self.tree_widget.invisibleRootItem()
        total, selected = 0, 0
        stack = [root]
        while stack:
            current = stack.pop()
            for i in range(current.childCount()):
                child = current.child(i)
                node_data = child.data(0, Qt.UserRole)
                if not node_data['is_folder']:
                    total += 1
                    if child.isSelected():
                        selected += 1
                if child.childCount() > 0:
                    stack.append(child)

        # Block the box's own signal so this does not re-enter the handler.
        self.select_all_checkbox.blockSignals(True)
        if selected == 0:
            self.select_all_checkbox.setCheckState(Qt.Unchecked)
        elif selected == total:
            self.select_all_checkbox.setCheckState(Qt.Checked)
        else:
            self.select_all_checkbox.setCheckState(Qt.PartiallyChecked)
        self.select_all_checkbox.blockSignals(False)

    def update_folder_progress(self):
        """Recount selected/total videos per folder and redraw folder bars."""
        self._reset_folder_counts()
        root = self.tree_widget.invisibleRootItem()
        self._calculate_folder_progress(root)
        self._update_folder_progress_bars()

    def _reset_folder_counts(self):
        """Zero the per-folder counters."""
        for folder_data in self.folder_progress.values():
            folder_data['total_videos'] = 0
            folder_data['selected_videos'] = 0

    def _calculate_folder_progress(self, parent_item):
        """Return (total, selected) video counts below *parent_item*.

        Also stores the counts of every folder in ``self.folder_progress``.
        """
        total_videos = 0
        selected_videos = 0
        for i in range(parent_item.childCount()):
            child = parent_item.child(i)
            node_data = child.data(0, Qt.UserRole)
            if node_data['is_folder']:
                child_total, child_selected = \
                    self._calculate_folder_progress(child)
                total_videos += child_total
                selected_videos += child_selected
                folder_path = self._get_folder_path(child)
                if folder_path in self.folder_progress:
                    entry = self.folder_progress[folder_path]
                    entry['total_videos'] = child_total
                    entry['selected_videos'] = child_selected
            else:
                total_videos += 1
                if child.isSelected():
                    selected_videos += 1
        return total_videos, selected_videos

    def _get_folder_path(self, item):
        """Return the '/'-joined folder names from the root down to *item*."""
        path_parts = []
        current = item
        while current:
            node_data = current.data(0, Qt.UserRole)
            if node_data['is_folder']:
                path_parts.insert(0, node_data['name'])
            current = current.parent()
        return '/'.join(path_parts)

    def _update_folder_progress_bars(self):
        """Show 'selected/total (percent)' on every folder row."""
        for data in self.folder_progress.values():
            total = data['total_videos']
            selected = data['selected_videos']
            widget = self.tree_widget.itemWidget(data['item'], 0)
            if not widget:
                continue
            widget.progress_bar.setMaximum(total)
            widget.progress_bar.setValue(selected)
            if total > 0:
                percent = int((selected / total) * 100)
                widget.progress_bar.setFormat(
                    f"{selected}/{total} ({percent}%)")
            else:
                widget.progress_bar.setFormat("0/0")

    def safe_fetch_video_list(self):
        """Validate the course URL and start a FetchWorker for it."""
        try:
            if self.worker is not None and self.worker.isRunning():
                self.worker.quit()
                self.worker.wait()

            self.fetch_btn.setEnabled(False)
            course_url = self.url_input.text().strip()
            if not course_url:
                QMessageBox.warning(self, "警告", "请输入课程链接")
                self.fetch_btn.setEnabled(True)
                return

            course_id = self.extract_course_id(course_url)
            if not course_id:
                QMessageBox.warning(self, "警告", "无法从链接中提取课程ID")
                self.fetch_btn.setEnabled(True)
                return

            self.worker = FetchWorker(course_id)
            self.worker.finished.connect(self.display_video_tree)
            self.worker.error.connect(self.show_fetch_error)
            self.worker.start()
        except Exception as e:
            self.fetch_btn.setEnabled(True)
            QMessageBox.critical(
                self, "错误", f"获取视频列表时发生异常:\n{str(e)}")
            print(traceback.format_exc())

    def extract_course_id(self, url):
        """Return the digits following ``id=`` in *url*, or None."""
        try:
            match = re.search(r'id=(\d+)', url)
            return match.group(1) if match else None
        except Exception as e:
            print(f"Extract course id error: {str(e)}")
            return None

    def _add_tree_items(self, parent_item, node_data, level=0):
        """Create the tree row for *node_data* and, recursively, its children."""
        item = QTreeWidgetItem(parent_item)
        item.setData(0, Qt.UserRole, node_data)
        widget = VideoItemWidget(node_data['name'])
        widget.set_indent_level(level)

        if node_data['is_folder']:
            widget.name_label.setStyleSheet(
                "font-weight: bold; color: #0066cc;")
            self.folder_progress[self._get_folder_path(item)] = {
                'total_videos': 0,
                'selected_videos': 0,
                'item': item,
            }
        item.setToolTip(0, node_data['tooltip'])
        self.tree_widget.setItemWidget(item, 0, widget)

        if node_data['is_folder'] and node_data['children']:
            item.setChildIndicatorPolicy(QTreeWidgetItem.ShowIndicator)
            for child_data in node_data['children']:
                self._add_tree_items(item, child_data, level + 1)
        return item

    def display_video_tree(self, tree_data):
        """Rebuild the tree from the data delivered by FetchWorker."""
        try:
            self.tree_widget.clear()
            self.folder_progress = {}

            root_item = self._add_tree_items(self.tree_widget, tree_data)
            self.tree_widget.expandItem(root_item)

            self.fetch_btn.setEnabled(True)
            self.download_btn.setEnabled(True)
            self.status_label.setText(
                f"找到 {self.count_videos(tree_data)} 个视频")
            # Folder counters are computed here from the finished tree.
            self.update_folder_progress()
        except Exception as e:
            QMessageBox.critical(self, "错误", f"显示视频列表时出错: {str(e)}")
            self.fetch_btn.setEnabled(True)

    def count_videos(self, node):
        """Return the number of non-folder nodes in *node*'s subtree."""
        count = 0 if node['is_folder'] else 1
        for child in node['children']:
            count += self.count_videos(child)
        return count

    def show_fetch_error(self, error_msg):
        """Report a failed fetch and re-enable the fetch button."""
        QMessageBox.critical(self, "错误", f"获取视频列表失败:\n{error_msg}")
        self.fetch_btn.setEnabled(True)
        self.status_label.setText("获取视频列表失败")

    def start_download(self):
        """Queue every selected video and reset the progress bookkeeping."""
        try:
            selected_items = self.get_selected_items()
            if not selected_items:
                QMessageBox.warning(self, "警告", "请至少选择一个视频")
                return

            self.total_downloaded = 0
            self.video_progress = {}
            self.last_downloaded = {}
            self.video_sizes_added = set()
            self.total_selected_size = 0

            video_count = len(selected_items)
            self.total_progress_bar.setRange(0, video_count * 100)
            self.total_progress_bar.setValue(0)
            self.total_progress_bar.setFormat("总进度: %p%")
            self.update_total_progress()

            save_path = self.path_input.text().strip()
            for item in selected_items:
                node_data = item.data(0, Qt.UserRole)
                name = node_data['name']
                url = node_data['url']
                self.video_progress[name] = 0
                self.last_downloaded[name] = 0

                # Folder names from the root down become sub-directories.
                parent_names = []
                parent = item.parent()
                while parent:
                    parent_data = parent.data(0, Qt.UserRole)
                    if parent_data['is_folder']:
                        parent_names.insert(0, parent_data['name'])
                    parent = parent.parent()

                self.pending_downloads += 1
                self.download_queue.put((name, url, save_path, parent_names))

            self.status_label.setText(
                f"开始下载 {video_count} 个视频 (同时下载 {self.max_workers} 个)")
        except Exception as e:
            QMessageBox.critical(self, "错误", f"开始下载时出错: {str(e)}")
            self.status_label.setText("下载启动失败")

    def update_total_progress(self):
        """Show the overall byte progress in the total bar's caption."""
        if self.total_selected_size <= 0:
            return
        percent = (self.total_downloaded / self.total_selected_size) * 100
        downloaded_str = format_size(self.total_downloaded)
        total_str = format_size(self.total_selected_size)
        self.total_progress_bar.setFormat(
            f"总进度: {percent:.1f}% ({downloaded_str}/{total_str})")

    def get_selected_items(self):
        """Return every selected video row, searching folders recursively."""
        selected_items = []

        def collect_selected(parent_item):
            for i in range(parent_item.childCount()):
                child = parent_item.child(i)
                node_data = child.data(0, Qt.UserRole)
                if node_data['is_folder']:
                    collect_selected(child)
                elif child.isSelected():
                    selected_items.append(child)

        collect_selected(self.tree_widget.invisibleRootItem())
        return selected_items

    def update_progress(self, name, downloaded, total):
        """Slot for DownloadThread.update_progress.

        Updates the row of *name*, the running byte totals and the overall
        bar.  The overall bar is the sum of the per-video percentages, so it
        stays correct when a video's progress is reduced after a failed
        chunk attempt.
        """
        try:
            item = self._find_video_item(name)
            if not item:
                return
            widget = self.tree_widget.itemWidget(item, 0)
            node_data = item.data(0, Qt.UserRole)

            # Count each video's size once, as soon as it is known.
            if total > 0 and name not in self.video_sizes_added:
                self.total_selected_size += total
                self.video_sizes_added.add(name)
                node_data['size'] = total

            previous = self.last_downloaded.get(name, 0)
            self.total_downloaded += downloaded - previous
            self.last_downloaded[name] = downloaded

            if widget:
                widget.update_progress(downloaded, total)
            node_data['downloaded'] = downloaded

            current_percent = int((downloaded / total) * 100) if total > 0 else 0
            if name in self.video_progress:
                self.video_progress[name] = current_percent
                self.total_progress_bar.setValue(
                    sum(self.video_progress.values()))

            self.update_total_progress()
        except Exception as e:
            print(f"更新进度出错: {str(e)}")

    def _find_video_item(self, name):
        """Return the first video row whose name equals *name*, or None."""
        def find_item(parent):
            for i in range(parent.childCount()):
                child = parent.child(i)
                node_data = child.data(0, Qt.UserRole)
                if not node_data['is_folder'] and node_data['name'] == name:
                    return child
                found = find_item(child)
                if found:
                    return found
            return None

        return find_item(self.tree_widget.invisibleRootItem())

    def download_finished(self, name, success, message):
        """Slot for DownloadThread.download_complete.

        Marks the row as finished or failed.  When the last pending task has
        reported, the status label and overall bar are set to "done".  The
        pending counter lives in the GUI thread, so no worker state is read.
        """
        try:
            self.pending_downloads = max(0, self.pending_downloads - 1)

            item = self._find_video_item(name)
            if item:
                widget = self.tree_widget.itemWidget(item, 0)
                node_data = item.data(0, Qt.UserRole)
                if widget:
                    if success:
                        widget.name_label.setText(f"{name} ✓ 完成")
                        widget.name_label.setStyleSheet(
                            "font-weight: bold; color: green;")
                    else:
                        widget.name_label.setText(f"{name} ✗ 失败")
                        widget.name_label.setStyleSheet(
                            "font-weight: bold; color: red;")
                    widget.name_label.setToolTip(message)
                if success:
                    node_data['completed'] = True

            if self.pending_downloads == 0 and self.download_queue.empty():
                self.status_label.setText("所有下载完成")
                self.total_progress_bar.setValue(
                    self.total_progress_bar.maximum())
        except Exception as e:
            print(f"下载完成处理出错: {str(e)}")

    def closeEvent(self, event):
        """Stop the download thread before the window closes."""
        try:
            self.download_thread.stop()
            self.download_thread.wait()
        except Exception as e:
            print(f"Close event error: {str(e)}")
        super().closeEvent(event)


class FetchWorker(QThread):
    """Thread that fetches a course's directory tree.

    Signals:
        finished(dict) -- the parsed tree (see parse_tree_data).
        error(str)     -- text of the exception that aborted the fetch.
    """

    finished = pyqtSignal(dict)
    error = pyqtSignal(str)

    def __init__(self, course_id):
        super().__init__()
        self.course_id = course_id

    def run(self):
        """Request the directory tree and emit ``finished`` or ``error``."""
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; "
                              "rv:140.0) Gecko/20100101 Firefox/140.0"
            }
            url = ("https://www.cdssxy.com/cloud-wlxy-external/course/"
                   "findCourseDirTree.action")
            payload = json.dumps({"rootDirTreeId": self.course_id},
                                 separators=(',', ':'))

            response = requests.post(url, headers=headers, data=payload,
                                     timeout=10)
            response.raise_for_status()

            data = response.json().get('data')
            if not data:
                raise ValueError("无法获取视频列表数据")

            self.finished.emit(self.parse_tree_data(data))
        except Exception as e:
            self.error.emit(str(e))
            print(traceback.format_exc())

    def parse_tree_data(self, node):
        """Recursively convert a server tree node into the GUI's node dict.

        A node whose ``browserUrl`` ends in ``.mp4`` becomes a video node;
        every other node is a folder.  A missing or null ``browserUrl`` /
        ``childSysDirtrees`` is treated as absent.

        Returns:
            dict with keys ``name``, ``children``, ``is_folder``, ``url``
            and ``tooltip``.
        """
        item = {
            'name': node['dirname'],
            'children': [],
            'is_folder': True,
            'url': None,
            'tooltip': "单击切换子节点选择",
        }

        browser_url = node.get('browserUrl')
        if isinstance(browser_url, str) and browser_url.endswith('.mp4'):
            item.update({
                'is_folder': False,
                'url': browser_url,
                'tooltip': "单击/拖动切换选择",
            })

        for child in node.get('childSysDirtrees') or []:
            item['children'].append(self.parse_tree_data(child))

        return item


def main():
    """Create the application, show the main window and run the event loop."""
    app = QApplication([])
    # Global tooltip style.
    app.setStyleSheet("""
        QToolTip {
            background-color: #333333; /* 背景颜色 */
            color: white; /* 字体颜色 */
            border: 1px solid white; /* 边框 */
            padding: 2px; /* 内边距 */
            font-size: 12px; /* 字体大小 */
            font-family: "宋体"; /* 字体分类 */
        }
    """)
    window = VideoDownloaderApp()
    window.show()
    app.exec_()


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"Application error: {str(e)}")
        traceback.print_exc()