DICOM形式を動画ファイルに変換するには?(血管撮影画像用)

血管撮影の動画が含まれているDICOMから動画を取り出し、汎用の動画ファイルに変換するPythonを紹介します。

フォルダ内にある.dcmファイルを動画ファイル(AVI形式)に一括変換します。

import pydicom
import numpy as np
from PIL import Image
import cv2
import os
import argparse
from pathlib import Path
import logging

# ログ設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class DICOMToVideoConverter:
    def __init__(self, output_dir=None):
        """
        DICOM動画変換クラス
        
        Args:
            output_dir (str): 出力ディレクトリ(Noneの場合は入力ファイルと同じ場所)
        """
        self.output_dir = Path(output_dir) if output_dir else None
        if self.output_dir:
            self.output_dir.mkdir(exist_ok=True)
        
    def convert_file_to_video(self, dicom_path):
        """
        単一のDICOMファイルを動画に変換
        
        Args:
            dicom_path (str): DICOMファイルのパス
        """
        try:
            dicom_path = Path(dicom_path)
            ds = pydicom.dcmread(dicom_path)
            
            logger.info(f"変換開始: {dicom_path}")
            logger.info(f"Modality: {getattr(ds, 'Modality', 'Unknown')}")
            logger.info(f"Series Description: {getattr(ds, 'SeriesDescription', 'Unknown')}")
            
            # ピクセルデータが存在するかチェック
            if not hasattr(ds, 'pixel_array'):
                logger.warning(f"ピクセルデータが見つかりません: {dicom_path}")
                return False
                
            pixel_array = ds.pixel_array
            logger.info(f"画像サイズ: {pixel_array.shape}")
            
            # 出力ファイル名を決定
            output_filename = f"{dicom_path.stem}.avi"
            if self.output_dir:
                output_path = self.output_dir / output_filename
            else:
                output_path = dicom_path.parent / output_filename
            
            # データタイプに応じて処理
            if len(pixel_array.shape) == 2:
                # 2D画像(単一フレーム)- 1フレームの動画として保存
                logger.info("2D画像を1フレーム動画として変換")
                self._save_as_video(pixel_array[np.newaxis, :, :], output_path, ds)
                
            elif len(pixel_array.shape) == 3:
                # 3D画像(マルチフレーム/ボリューム)
                if pixel_array.shape[0] > 1:
                    # マルチフレーム - 動画として保存
                    logger.info(f"マルチフレーム動画として変換: {pixel_array.shape[0]}フレーム")
                    self._save_as_video(pixel_array, output_path, ds)
                else:
                    # 単一フレーム - 1フレームの動画として保存
                    logger.info("単一フレームを1フレーム動画として変換")
                    self._save_as_video(pixel_array, output_path, ds)
                    
            elif len(pixel_array.shape) == 4:
                # 4D画像(時系列ボリューム)- 最初の次元をフレームとして扱う
                logger.info(f"4D画像を動画として変換: {pixel_array.shape[0]}フレーム")
                # 4D -> 3D に変換(時間軸を保持、空間軸を統合)
                if pixel_array.shape[1] == 1:
                    # (time, 1, height, width) -> (time, height, width)
                    reshaped_array = pixel_array.squeeze(1)
                else:
                    # (time, slices, height, width) -> 最初のスライスのみ使用
                    reshaped_array = pixel_array[:, 0, :, :]
                self._save_as_video(reshaped_array, output_path, ds)
                
            else:
                logger.error(f"サポートされていない配列次元: {pixel_array.shape}")
                return False
            
            return True
            
        except Exception as e:
            logger.error(f"変換エラー {dicom_path}: {str(e)}")
            return False
    
    def _normalize_pixel_array(self, pixel_array, ds):
        """
        ピクセル配列を0-255の範囲に正規化
        
        Args:
            pixel_array: ピクセル配列
            ds: DICOMデータセット
            
        Returns:
            正規化されたピクセル配列
        """
        # ウィンドウレベルとウィンドウ幅を取得
        window_center = getattr(ds, 'WindowCenter', None)
        window_width = getattr(ds, 'WindowWidth', None)
        
        if window_center is not None and window_width is not None:
            # 複数の値がある場合は最初の値を使用
            if hasattr(window_center, '__iter__'):
                window_center = window_center[0]
            if hasattr(window_width, '__iter__'):
                window_width = window_width[0]
                
            # ウィンドウレベル処理
            min_val = window_center - window_width / 2
            max_val = window_center + window_width / 2
            pixel_array = np.clip(pixel_array, min_val, max_val)
            pixel_array = ((pixel_array - min_val) / (max_val - min_val) * 255).astype(np.uint8)
        else:
            # ウィンドウ情報がない場合は単純な正規化
            pixel_array = pixel_array.astype(np.float64)
            pixel_array = (pixel_array - pixel_array.min()) / (pixel_array.max() - pixel_array.min()) * 255
            pixel_array = pixel_array.astype(np.uint8)
            
        return pixel_array
    
    def _save_as_video(self, pixel_array, output_path, ds):
        """
        ピクセル配列を動画として保存
        
        Args:
            pixel_array: 3Dピクセル配列 (frames, height, width)
            output_path: 出力パス
            ds: DICOMデータセット
        """
        try:
            # フレームレートを決定(デフォルト: 10fps)
            frame_rate = 10
            if hasattr(ds, 'RecommendedDisplayFrameRate'):
                try:
                    frame_rate = float(ds.RecommendedDisplayFrameRate)
                except:
                    pass
            elif hasattr(ds, 'FrameTime'):
                try:
                    frame_rate = 1000.0 / float(ds.FrameTime)  # FrameTimeはミリ秒
                except:
                    pass
            elif hasattr(ds, 'CineRate'):
                try:
                    frame_rate = float(ds.CineRate)
                except:
                    pass
            
            # フレームレートの妥当性チェック
            frame_rate = max(1, min(60, frame_rate))
            
            height, width = pixel_array.shape[1], pixel_array.shape[2]
            
            logger.info(f"動画設定: {pixel_array.shape[0]}フレーム, {width}x{height}, {frame_rate:.1f}fps")
            
            # AVI形式で保存
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            out = cv2.VideoWriter(str(output_path), fourcc, frame_rate, (width, height), False)
            
            if not out.isOpened():
                logger.error(f"VideoWriter の初期化に失敗しました: {output_path}")
                return
            
            for i in range(pixel_array.shape[0]):
                frame = pixel_array[i]
                normalized_frame = self._normalize_pixel_array(frame, ds)
                out.write(normalized_frame)
            
            out.release()
            
            # ファイルが正常に作成されたかチェック
            if output_path.exists() and output_path.stat().st_size > 0:
                logger.info(f"✓ 動画を保存しました: {output_path}")
            else:
                logger.error(f"✗ 動画ファイルが正常に作成されませんでした: {output_path}")
                
        except Exception as e:
            logger.error(f"動画保存エラー: {str(e)}")
    
    def convert_directory(self, dicom_dir):
        """
        ディレクトリ内のすべての.dcmファイルを動画に変換
        
        Args:
            dicom_dir (str): DICOMファイルが含まれるディレクトリ
        """
        dicom_dir = Path(dicom_dir)
        
        # .dcmファイルを探索
        dcm_files = []
        
        # 再帰的に.dcmファイルを検索
        for pattern in ['**/*.dcm', '**/*.DCM']:
            dcm_files.extend(dicom_dir.glob(pattern))
        
        # 重複を除去してソート
        dcm_files = sorted(list(set(dcm_files)))
        
        if not dcm_files:
            logger.warning(f".dcmファイルが見つかりません: {dicom_dir}")
            return
        
        logger.info(f"{len(dcm_files)}個の.dcmファイルを発見しました")
        
        # 変換統計
        success_count = 0
        failed_count = 0
        
        for dcm_file in dcm_files:
            logger.info(f"\n[{success_count + failed_count + 1}/{len(dcm_files)}] 変換中: {dcm_file.name}")
            
            if self.convert_file_to_video(dcm_file):
                success_count += 1
            else:
                failed_count += 1
        
        # 結果サマリー
        logger.info(f"\n{'='*60}")
        logger.info(f"変換完了サマリー")
        logger.info(f"{'='*60}")
        logger.info(f"総ファイル数: {len(dcm_files)}")
        logger.info(f"成功: {success_count}")
        logger.info(f"失敗: {failed_count}")
        logger.info(f"成功率: {success_count/len(dcm_files)*100:.1f}%")
        
        if self.output_dir:
            logger.info(f"出力先: {self.output_dir}")
        else:
            logger.info(f"出力先: 各ファイルと同じディレクトリ")

def main():
    parser = argparse.ArgumentParser(description='DICOMファイルを一括でAVI動画に変換')
    parser.add_argument('input_dir', help='DICOMファイルが含まれるディレクトリのパス')
    parser.add_argument('-o', '--output', help='出力ディレクトリ(省略時は入力ファイルと同じ場所)')
    parser.add_argument('-v', '--verbose', action='store_true', help='詳細ログを表示')
    
    args = parser.parse_args()
    
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    
    input_dir = Path(args.input_dir)
    
    if not input_dir.exists():
        logger.error(f"指定されたディレクトリが存在しません: {input_dir}")
        return
    
    if not input_dir.is_dir():
        logger.error(f"指定されたパスはディレクトリではありません: {input_dir}")
        return
    
    converter = DICOMToVideoConverter(args.output)
    converter.convert_directory(input_dir)

if __name__ == "__main__":
    # 使用例の表示
    print("=== DICOM一括動画変換プログラム ===")
    print("フォルダ内のすべての.dcmファイルを[元ファイル名].aviに変換します")
    print()
    print("使用方法:")
    print("python dicom_converter.py <DICOMディレクトリ> [-o 出力ディレクトリ] [-v]")
    print()
    print("例:")
    print("python dicom_converter.py ./dicom_folder/")
    print("python dicom_converter.py ./dicom_folder/ -o ./output/ -v")
    print()
    print("特徴:")
    print("- フォルダ内の.dcmファイルを再帰的に検索")
    print("- 各ファイルを個別のAVI動画に変換")
    print("- 2D画像も1フレームの動画として変換")
    print("- マルチフレームDICOMは適切なフレームレートで動画化")
    print("- 出力先未指定時は元ファイルと同じ場所に保存")
    print()
    
    # コマンドライン引数がある場合は実行
    import sys
    if len(sys.argv) > 1:
        main()
    else:
        print("対話式使用例:")
        print("converter = DICOMToVideoConverter('./output/')")
        print("converter.convert_directory('./dicom_folder/')")

使用方法については以下の通り。

=== DICOM一括動画変換プログラム ===
フォルダ内のすべての.dcmファイルを[元ファイル名].aviに変換します        

使用方法:
python dicom_converter.py <DICOMディレクトリ> [-o 出力ディレクトリ] [-v]

例:
python dicom_converter.py ./dicom_folder/
python dicom_converter.py ./dicom_folder/ -o ./output/ -v

特徴:
- フォルダ内の.dcmファイルを再帰的に検索
- 各ファイルを個別のAVI動画に変換
- 2D画像も1フレームの動画として変換
- マルチフレームDICOMは適切なフレームレートで動画化
- 出力先未指定時は元ファイルと同じ場所に保存

コメント

タイトルとURLをコピーしました