血管撮影の動画が含まれているDICOMから動画を取り出し、汎用の動画ファイルに変換するPythonを紹介します。
フォルダ内にある.dcmファイルを動画ファイル(AVI形式)に一括変換します。
import pydicom
import numpy as np
from PIL import Image
import cv2
import os
import argparse
from pathlib import Path
import logging
# ログ設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class DICOMToVideoConverter:
def __init__(self, output_dir=None):
"""
DICOM動画変換クラス
Args:
output_dir (str): 出力ディレクトリ(Noneの場合は入力ファイルと同じ場所)
"""
self.output_dir = Path(output_dir) if output_dir else None
if self.output_dir:
self.output_dir.mkdir(exist_ok=True)
def convert_file_to_video(self, dicom_path):
"""
単一のDICOMファイルを動画に変換
Args:
dicom_path (str): DICOMファイルのパス
"""
try:
dicom_path = Path(dicom_path)
ds = pydicom.dcmread(dicom_path)
logger.info(f"変換開始: {dicom_path}")
logger.info(f"Modality: {getattr(ds, 'Modality', 'Unknown')}")
logger.info(f"Series Description: {getattr(ds, 'SeriesDescription', 'Unknown')}")
# ピクセルデータが存在するかチェック
if not hasattr(ds, 'pixel_array'):
logger.warning(f"ピクセルデータが見つかりません: {dicom_path}")
return False
pixel_array = ds.pixel_array
logger.info(f"画像サイズ: {pixel_array.shape}")
# 出力ファイル名を決定
output_filename = f"{dicom_path.stem}.avi"
if self.output_dir:
output_path = self.output_dir / output_filename
else:
output_path = dicom_path.parent / output_filename
# データタイプに応じて処理
if len(pixel_array.shape) == 2:
# 2D画像(単一フレーム)- 1フレームの動画として保存
logger.info("2D画像を1フレーム動画として変換")
self._save_as_video(pixel_array[np.newaxis, :, :], output_path, ds)
elif len(pixel_array.shape) == 3:
# 3D画像(マルチフレーム/ボリューム)
if pixel_array.shape[0] > 1:
# マルチフレーム - 動画として保存
logger.info(f"マルチフレーム動画として変換: {pixel_array.shape[0]}フレーム")
self._save_as_video(pixel_array, output_path, ds)
else:
# 単一フレーム - 1フレームの動画として保存
logger.info("単一フレームを1フレーム動画として変換")
self._save_as_video(pixel_array, output_path, ds)
elif len(pixel_array.shape) == 4:
# 4D画像(時系列ボリューム)- 最初の次元をフレームとして扱う
logger.info(f"4D画像を動画として変換: {pixel_array.shape[0]}フレーム")
# 4D -> 3D に変換(時間軸を保持、空間軸を統合)
if pixel_array.shape[1] == 1:
# (time, 1, height, width) -> (time, height, width)
reshaped_array = pixel_array.squeeze(1)
else:
# (time, slices, height, width) -> 最初のスライスのみ使用
reshaped_array = pixel_array[:, 0, :, :]
self._save_as_video(reshaped_array, output_path, ds)
else:
logger.error(f"サポートされていない配列次元: {pixel_array.shape}")
return False
return True
except Exception as e:
logger.error(f"変換エラー {dicom_path}: {str(e)}")
return False
def _normalize_pixel_array(self, pixel_array, ds):
"""
ピクセル配列を0-255の範囲に正規化
Args:
pixel_array: ピクセル配列
ds: DICOMデータセット
Returns:
正規化されたピクセル配列
"""
# ウィンドウレベルとウィンドウ幅を取得
window_center = getattr(ds, 'WindowCenter', None)
window_width = getattr(ds, 'WindowWidth', None)
if window_center is not None and window_width is not None:
# 複数の値がある場合は最初の値を使用
if hasattr(window_center, '__iter__'):
window_center = window_center[0]
if hasattr(window_width, '__iter__'):
window_width = window_width[0]
# ウィンドウレベル処理
min_val = window_center - window_width / 2
max_val = window_center + window_width / 2
pixel_array = np.clip(pixel_array, min_val, max_val)
pixel_array = ((pixel_array - min_val) / (max_val - min_val) * 255).astype(np.uint8)
else:
# ウィンドウ情報がない場合は単純な正規化
pixel_array = pixel_array.astype(np.float64)
pixel_array = (pixel_array - pixel_array.min()) / (pixel_array.max() - pixel_array.min()) * 255
pixel_array = pixel_array.astype(np.uint8)
return pixel_array
def _save_as_video(self, pixel_array, output_path, ds):
"""
ピクセル配列を動画として保存
Args:
pixel_array: 3Dピクセル配列 (frames, height, width)
output_path: 出力パス
ds: DICOMデータセット
"""
try:
# フレームレートを決定(デフォルト: 10fps)
frame_rate = 10
if hasattr(ds, 'RecommendedDisplayFrameRate'):
try:
frame_rate = float(ds.RecommendedDisplayFrameRate)
except:
pass
elif hasattr(ds, 'FrameTime'):
try:
frame_rate = 1000.0 / float(ds.FrameTime) # FrameTimeはミリ秒
except:
pass
elif hasattr(ds, 'CineRate'):
try:
frame_rate = float(ds.CineRate)
except:
pass
# フレームレートの妥当性チェック
frame_rate = max(1, min(60, frame_rate))
height, width = pixel_array.shape[1], pixel_array.shape[2]
logger.info(f"動画設定: {pixel_array.shape[0]}フレーム, {width}x{height}, {frame_rate:.1f}fps")
# AVI形式で保存
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter(str(output_path), fourcc, frame_rate, (width, height), False)
if not out.isOpened():
logger.error(f"VideoWriter の初期化に失敗しました: {output_path}")
return
for i in range(pixel_array.shape[0]):
frame = pixel_array[i]
normalized_frame = self._normalize_pixel_array(frame, ds)
out.write(normalized_frame)
out.release()
# ファイルが正常に作成されたかチェック
if output_path.exists() and output_path.stat().st_size > 0:
logger.info(f"✓ 動画を保存しました: {output_path}")
else:
logger.error(f"✗ 動画ファイルが正常に作成されませんでした: {output_path}")
except Exception as e:
logger.error(f"動画保存エラー: {str(e)}")
def convert_directory(self, dicom_dir):
"""
ディレクトリ内のすべての.dcmファイルを動画に変換
Args:
dicom_dir (str): DICOMファイルが含まれるディレクトリ
"""
dicom_dir = Path(dicom_dir)
# .dcmファイルを探索
dcm_files = []
# 再帰的に.dcmファイルを検索
for pattern in ['**/*.dcm', '**/*.DCM']:
dcm_files.extend(dicom_dir.glob(pattern))
# 重複を除去してソート
dcm_files = sorted(list(set(dcm_files)))
if not dcm_files:
logger.warning(f".dcmファイルが見つかりません: {dicom_dir}")
return
logger.info(f"{len(dcm_files)}個の.dcmファイルを発見しました")
# 変換統計
success_count = 0
failed_count = 0
for dcm_file in dcm_files:
logger.info(f"\n[{success_count + failed_count + 1}/{len(dcm_files)}] 変換中: {dcm_file.name}")
if self.convert_file_to_video(dcm_file):
success_count += 1
else:
failed_count += 1
# 結果サマリー
logger.info(f"\n{'='*60}")
logger.info(f"変換完了サマリー")
logger.info(f"{'='*60}")
logger.info(f"総ファイル数: {len(dcm_files)}")
logger.info(f"成功: {success_count}")
logger.info(f"失敗: {failed_count}")
logger.info(f"成功率: {success_count/len(dcm_files)*100:.1f}%")
if self.output_dir:
logger.info(f"出力先: {self.output_dir}")
else:
logger.info(f"出力先: 各ファイルと同じディレクトリ")
def main():
parser = argparse.ArgumentParser(description='DICOMファイルを一括でAVI動画に変換')
parser.add_argument('input_dir', help='DICOMファイルが含まれるディレクトリのパス')
parser.add_argument('-o', '--output', help='出力ディレクトリ(省略時は入力ファイルと同じ場所)')
parser.add_argument('-v', '--verbose', action='store_true', help='詳細ログを表示')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
input_dir = Path(args.input_dir)
if not input_dir.exists():
logger.error(f"指定されたディレクトリが存在しません: {input_dir}")
return
if not input_dir.is_dir():
logger.error(f"指定されたパスはディレクトリではありません: {input_dir}")
return
converter = DICOMToVideoConverter(args.output)
converter.convert_directory(input_dir)
if __name__ == "__main__":
# 使用例の表示
print("=== DICOM一括動画変換プログラム ===")
print("フォルダ内のすべての.dcmファイルを[元ファイル名].aviに変換します")
print()
print("使用方法:")
print("python dicom_converter.py <DICOMディレクトリ> [-o 出力ディレクトリ] [-v]")
print()
print("例:")
print("python dicom_converter.py ./dicom_folder/")
print("python dicom_converter.py ./dicom_folder/ -o ./output/ -v")
print()
print("特徴:")
print("- フォルダ内の.dcmファイルを再帰的に検索")
print("- 各ファイルを個別のAVI動画に変換")
print("- 2D画像も1フレームの動画として変換")
print("- マルチフレームDICOMは適切なフレームレートで動画化")
print("- 出力先未指定時は元ファイルと同じ場所に保存")
print()
# コマンドライン引数がある場合は実行
import sys
if len(sys.argv) > 1:
main()
else:
print("対話式使用例:")
print("converter = DICOMToVideoConverter('./output/')")
print("converter.convert_directory('./dicom_folder/')")
使用方法については以下の通り。
=== DICOM一括動画変換プログラム ===
フォルダ内のすべての.dcmファイルを[元ファイル名].aviに変換します
使用方法:
python dicom_converter.py <DICOMディレクトリ> [-o 出力ディレクトリ] [-v]
例:
python dicom_converter.py ./dicom_folder/
python dicom_converter.py ./dicom_folder/ -o ./output/ -v
特徴:
- フォルダ内の.dcmファイルを再帰的に検索
- 各ファイルを個別のAVI動画に変換
- 2D画像も1フレームの動画として変換
- マルチフレームDICOMは適切なフレームレートで動画化
- 出力先未指定時は元ファイルと同じ場所に保存
コメント