ddranimtool: Rewrote animation renderer code from chart tool

This commit is contained in:
987123879113 2023-02-27 10:57:57 +09:00
parent 59392dac52
commit c0b0790643
13 changed files with 1247 additions and 0 deletions

103
other/ddranimtool/README.md Normal file
View File

@ -0,0 +1,103 @@
# ddranimtool
This tool renders the videos from Sys573 DDR games (MAX and later). It's not perfect but it gets the job done.
## Prerequisites
- Python 3 (tested on 3.9.6)
- Java Runtime Environment (tested with openjdk 17.0.6, other recent versions should work too)
- [jPSXdec](https://github.com/m35/jpsxdec/releases)
- [sys573tool](https://github.com/987123879113/gobbletools/tree/master/sys573/sys573tool)
- [py573a](https://github.com/987123879113/gobbletools/tree/master/sys573/py573a)
## Setup
Install python3 requirements:
```sh
python3 -m pip install -r requirements.txt
```
Extract the jPSXdec binary zip downloaded from the [official releases](https://github.com/m35/jpsxdec/releases) page into the `tools/jpsxdec` folder. Your path should look like `tools/jpsxdec/jpsxdec.jar` if done correctly.
Additionally, follow the build steps for sys573tool and py573a so they're ready for the steps below.
## How to prepare data
1. (Only needed if starting from a MAME CHD) Extract the CHD to CUE/BIN using chdman
```sh
chdman extractcd -i game.chd -o game.cue
```
2. Extract the contents of the CUE/BIN (or your CD image or physical CD) to a separate folder.
3. Use [sys573tool](https://github.com/987123879113/gobbletools/tree/master/sys573/sys573tool) to extract the GAME.DAT and CARD.DAT
```sh
python3 sys573tool.py --mode dump --input game_cd_contents --output game_data_extracted
```
This gives you the mdb folder, located at `game_data_extracted/0/mdb`, and the common movies, located at `game_data_extracted/0/movies/common`.
4. Grab required data from game_data_extracted:
- Copy the files from `game_data_extracted/0/movies/common` into `game_cd_contents/MOV`.
- (DDR Extreme only) Copy the files from `game_data_extracted/0/mp3/enc` into `game_cd_contents/DAT`.
5. Decrypt all of the MP3 .DATs using [py573a](https://github.com/987123879113/gobbletools/tree/master/sys573/py573a)
```sh
# Linux/macOS
find game_cd_contents/DAT -type f -iname "*.dat" -exec sh -c 'python3 py573a.py --input "$0" --output "$(echo "$0" | sed "s/\(.*\)\..*/\1/").mp3"' {} \;

# Windows (cmd)
for %s in (game_cd_contents/DAT/*.dat) do python3 py573a.py --input "%s" --output "%~ns.mp3"
```
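The one-liners above can also be expressed as a small cross-platform Python sketch. This is a hypothetical helper, not part of the tool; it assumes `py573a.py` sits in the current directory (adjust `PY573A` to taste):

```python
import pathlib
import subprocess
import sys

PY573A = "py573a.py"

def decrypt_commands(dat_dir):
    """Build one py573a invocation per .DAT file, mapping FOO.DAT -> FOO.mp3."""
    for dat in sorted(pathlib.Path(dat_dir).glob("*.[Dd][Aa][Tt]")):
        out = dat.with_suffix(".mp3")
        yield [sys.executable, PY573A, "--input", str(dat), "--output", str(out)]

def decrypt_all(dat_dir):
    # Run each decryption, stopping on the first failure
    for cmd in decrypt_commands(dat_dir):
        subprocess.run(cmd, check=True)
```

Call `decrypt_all("game_cd_contents/DAT")` once py573a is set up.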
6. (Optional) Prepare the video cache. This step may take a significant amount of time (potentially an hour or more). Alternatively, the animation renderer will cache any videos it needs on demand if they aren't in the cache already; letting the tool cache only what's needed is recommended if you don't plan on rendering every song.
```sh
python3 video_frame_cacher.py -i game_cd_contents/MOV
```
Expect a full frame cache to be around 2-3 GB per game.
Creating a new cache folder for every individual game you want to render is recommended, to avoid issues where a video changed in some way between game releases. You can use the `-o frame_cache_folder_name` parameter to specify the output cache folder.
```sh
python3 video_frame_cacher.py -i game_cd_contents/MOV -o frame_cache_folder_name
```
### How to render a video using anim_renderer.py
```sh
python3 anim_renderer.py -m game_data_extracted/0/mdb -s game_cd_contents/DAT -c frame_cache_folder_name -i song_id
```
Replace the `song_id` value at the end with the 4- or 5-letter song ID of the song you wish to render. You can reference [this list](https://zenius-i-vanisher.com/ddrmasterlist.txt) to find the song ID for a specific song.
## anim_renderer.py usage
```
usage: anim_renderer.py [-h] [-v] [-l LOG_OUTPUT] -m INPUT_MDB_PATH [-s INPUT_MP3_PATH] -i SONG_ID [-o OUTPUT] [-z] [-f] [-c CACHE_PATH] [-r VIDEO_PATH] [-t TOOLS_PATH]
optional arguments:
-h, --help show this help message and exit
-v, --verbose Print lots of debugging statements
-l LOG_OUTPUT, --log-output LOG_OUTPUT
Save log to specified output file
-m INPUT_MDB_PATH, --input-mdb-path INPUT_MDB_PATH
Input mdb folder containing song data
-s INPUT_MP3_PATH, --input-mp3-path INPUT_MP3_PATH
Input MP3 folder containing decrypted MP3s
-i SONG_ID, --song-id SONG_ID
Song ID (4 or 5 letter name found in mdb folder)
-o OUTPUT, --output OUTPUT
Output filename
-z, --render-background-image
Include background image in rendered video
-f, --force-overwrite
Force overwrite
-c CACHE_PATH, --cache-path CACHE_PATH
Frame cache path
-r VIDEO_PATH, --video-path VIDEO_PATH
Raw video path
-t TOOLS_PATH, --tools-path TOOLS_PATH
Tools path
```
## video_frame_cacher.py usage
```
usage: video_frame_cacher.py [-h] -i INPUT [-o OUTPUT] [-t TOOLS_PATH]
optional arguments:
-h, --help show this help message and exit
-i INPUT, --input INPUT
Input path containing raw video files
-o OUTPUT, --output OUTPUT
Output path to store cached video frames
-t TOOLS_PATH, --tools-path TOOLS_PATH
Tools path
```

View File

@ -0,0 +1,138 @@
# Master ID reference: https://zenius-i-vanisher.com/ddrmasterlist.txt
import argparse
import logging
import os
import re
from PIL import Image
import tim2png
from formats.csq import *
logger = logging.getLogger("ddranimtool")
def get_re_file_insensitive(f, path):
results = [os.path.join(path, filename) for filename in os.listdir(path) if re.search(f, filename, re.IGNORECASE)]
assert (len(results) <= 1)
return results[0] if results else None
def get_sys573_encoded_mp3_name(title):
# 800a7714 in DDR Extreme AC
title = bytearray(title.upper().encode('ascii'))
# Pad name until it's 5 bytes
title_sum = sum(title)
while len(title) < 5:
title.append((title_sum + (title_sum // 0x1a) * -0x1a + 0x41) & 0xff)
# Shuffle
title = bytearray([title[-1]]) + title[:-1]
title[1], title[3] = title[3], title[1]
# 800a91e4 in DDR Extreme AC
for i, c in enumerate(title):
if c >= 0x30 and c <= 0x39:
c = ((c - 0x30) * 2) + 0x41
else:
t = c - 0x41
if t < 0x1a:
if t < 10:
c = (t * 2) + 0x42
elif t < 0x14:
c -= 0x1b
title[i] = c
return title.decode('ascii')
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', help="Print lots of debugging statements",
action="store_const", dest="loglevel", const=logging.DEBUG, default=logging.INFO)
parser.add_argument('-l', '--log-output', help="Save log to specified output file", default=None)
parser.add_argument('-m', '--input-mdb-path', help='Input mdb folder containing song data', required=True)
parser.add_argument('-s', '--input-mp3-path',
help='Input MP3 folder containing decrypted MP3s', default=None)
parser.add_argument(
'-i', '--song-id', help='Song ID (4 or 5 letter name found in mdb folder)', required=True)
parser.add_argument('-o', '--output', help='Output filename', default=None)
parser.add_argument('-z', '--render-background-image',
help='Include background image in rendered video', default=True, action="store_false")
parser.add_argument('-f', '--force-overwrite', help='Force overwrite', default=False, action="store_true")
parser.add_argument('-c', '--cache-path', help='Frame cache path', default="frame_cache")
parser.add_argument('-r', '--video-path', help='Raw video path', default=None)
parser.add_argument('-t', '--tools-path', help='Tools path', default="tools")
args = parser.parse_args()
log_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
logger.setLevel(args.loglevel)
stream_logger = logging.StreamHandler()
stream_logger.setFormatter(log_formatter)
logger.addHandler(stream_logger)
if args.log_output is not None:
file_logger = logging.FileHandler(args.log_output)
file_logger.setFormatter(log_formatter)
logger.addHandler(file_logger)
output_filename = args.output if args.output else os.path.join("output", f"{args.song_id}.mp4")
if not args.force_overwrite and os.path.exists(output_filename):
logger.info("File already exists, skipping... %s" % output_filename)
exit(0)
os.makedirs(os.path.dirname(output_filename), exist_ok=True)
logger.info(f"Rendering {args.song_id}...")
mdb_song_path = get_re_file_insensitive(re.escape(args.song_id) + "$", args.input_mdb_path)
song_bg_filename = get_re_file_insensitive(
r'.*_bk\.cmt', mdb_song_path) if args.render_background_image else None
song_mp3_filename = get_re_file_insensitive(
r'M..' + re.escape(get_sys573_encoded_mp3_name(args.song_id)) + r'.*\..*MP3', args.input_mp3_path)
song_chart_filename = get_re_file_insensitive(r'all\..*sq', mdb_song_path)
if song_mp3_filename is None:
logger.warning("Could not find MP3 for %s, video will be silent" % args.song_id)
assert (song_chart_filename is not None)
if song_bg_filename is not None:
# Cropped to match what the actual AC game does.
# I noticed the PS2 versions seem to use a different crop when looking at YouTube videos for reference.
bg_image = tim2png.readTimImage(open(song_bg_filename, "rb"), disable_transparency=True)[0]
top_crop = 25
bottom_crop = 39
left_crop = 8
right_crop = 8
bg_image = bg_image.crop((left_crop, top_crop, bg_image.width - right_crop, bg_image.height - bottom_crop))
else:
bg_image = Image.new('RGB', (304, 176))
raw_video_render_only = False
data = bytearray(open(song_chart_filename, "rb").read())
reader = CsqReader(data)
bpm_list = reader.get_tempo_events()
anim_events = reader.get_anim_events()
timekeeper = reader.timekeeper
jpsxdec_path = os.path.join(args.tools_path, "jpsxdec", "jpsxdec.jar")
if not os.path.exists(jpsxdec_path):
logger.error("ERROR: Could not find jPSXdec! %s" % jpsxdec_path)
assert os.path.exists(jpsxdec_path)
frame_manager = FrameManager(args.cache_path, args.video_path, jpsxdec_path)
renderer = CsqAnimationRenderer(anim_events, frame_manager, timekeeper)
renderer.export(output_filename, song_mp3_filename, bg_image, raw_video_render_only)
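
The `get_sys573_encoded_mp3_name` routine above can be exercised standalone; this is a condensed copy of the same logic (from the file above, not new behavior), handy for checking which `M??xxxxx.DAT` a song ID maps to:

```python
def encode_mp3_name(title):
    """Condensed copy of get_sys573_encoded_mp3_name (800a7714 in DDR Extreme AC)."""
    title = bytearray(title.upper().encode("ascii"))
    title_sum = sum(title)
    while len(title) < 5:  # pad short IDs to 5 bytes
        title.append((title_sum - (title_sum // 0x1A) * 0x1A + 0x41) & 0xFF)
    title = bytearray([title[-1]]) + title[:-1]  # rotate right by one
    title[1], title[3] = title[3], title[1]      # swap two positions
    for i, c in enumerate(title):                # per-character substitution
        if 0x30 <= c <= 0x39:
            c = (c - 0x30) * 2 + 0x41
        else:
            t = c - 0x41
            if t < 0x1A:
                if t < 10:
                    c = t * 2 + 0x42
                elif t < 0x14:
                    c -= 0x1B
        title[i] = c
    return title.decode("ascii")
```

For example, a 4-letter ID like `"AAAA"` is padded, rotated, and substituted into a 5-character name.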

View File

View File

@ -0,0 +1,4 @@
from .csqanimationrenderer import CsqAnimationRenderer
from .csqreader import CsqReader
from .timekeeper import TimeKeeper
from .frame_manager import FrameManager

View File

@ -0,0 +1,37 @@
from enum import IntEnum
class PlaybackMethod(IntEnum):
Unknown = 0
Normal = 1
PingPong = 2
Freeze = 3
class PlaybackDirection(IntEnum):
Freeze = 0
Forward = 1
Reverse = -1
class AnimationFlags(IntEnum):
PlaybackMethodNormal = 1
PlaybackMethodPingPong = 2
PlaybackMethodFreeze = 3
PlaybackDirectionFreeze = 0
PlaybackDirectionForward = 1
PlaybackDirectionReverse = 2
class AnimationCommands(IntEnum):
Play2 = 1
Play3 = 2
Play4 = 3
PlayStretch = 4
AppendLoopAll = 5
FreezeFrame = 6
AppendLoopLast = 7
Clear = 8 # Are these two clear commands any different? I see more special cases for the 9 command but not for the 8 command
Clear2 = 9
Play1 = 10 # Only on PS2

View File

@ -0,0 +1,234 @@
import logging
from moviepy.video.io.ImageSequenceClip import ImageSequenceClip
from moviepy.editor import concatenate_videoclips, AudioFileClip, CompositeVideoClip
import numpy as np
from .constants import *
from .timekeeper import TimeKeeper
logger = logging.getLogger("ddranimtool." + __name__)
TARGET_FRAME_RATE = 60
class CsqAnimationRenderer:
def __init__(self, events, frame_manager, timekeeper=None):
self.events = events
self.frame_manager = frame_manager
self.timekeeper = timekeeper if timekeeper else TimeKeeper()
def get_clip(self, frames, fps):
return concatenate_videoclips([ImageSequenceClip(frames, fps=fps)])
def get_frames(self, event):
clip_frames = []
frames = []
frame_start = int(event['frame_start'])
for clip in event['clips']:
frames = self.frame_manager.get_raw_frames(clip['filename'] + ".sbs")
if event['direction'] == PlaybackDirection.Freeze:
frames = [frames[frame_start]]
elif event['direction'] == PlaybackDirection.Reverse:
if frame_start > 0:
frames = frames[:frame_start+1]
else:
frames = frames[frame_start:]
clip_frames.append(frames)
frame_start = 0
if event['direction'] == PlaybackDirection.Reverse:
event['frame_start'] = len(frames) - 1
else:
event['frame_start'] = 0
if event['frame_start'] < 0:
event['frame_start'] = 0
return clip_frames
def get_output_frames(self):
output_clips = []
last_event_is_clear = self.events[-1].get('clear', False)
if not last_event_is_clear:
logger.error("ERROR: Last animation event is not a clear!")
assert last_event_is_clear
for event in self.events:
is_valid_clear = True
if event.get('clear', False) and event['timestamp'] != self.events[-1]['timestamp']:
is_valid_clear = False
logger.error("ERROR: Found clear command that isn't at the end of the animation!")
assert (is_valid_clear == True)
for idx, event in enumerate(self.events[:-1]):
if event.get('clear', False):
logger.error("ERROR: Handle clear event mid-song!")
exit(1)
clip_frames = self.get_frames(event)
output_frames = []
clip_idx = 0
frame_idx = int(event['frame_start'])
cur_dir = event['direction']
# DDR Extreme useful breakpoints for debugging
# bpset 80068850,1,{ printf "timer[%08x]",a2; g }
# bpset 80071624,1,{ printf "new_offset[%08x]",v0; g }
# bpset 80069168,1,{ printf "non-stretch frame[%02x] offset[%08x]",s0,s2; g }
# bpset 80068eac,1,{ printf "stretch frame[%02x] offset[%08x]",s0,s2; s }
event['offset'] -= 0x100
clip_idx = 0
start_offset = event['offset']
end_offset = self.events[idx+1]['offset']
t1 = self.timekeeper.calculate_timestamp_from_offset(start_offset)
t2 = self.timekeeper.calculate_timestamp_from_offset(end_offset)
tcur = t1
tstep = (1 / 60) * 1000
cnt = 0
next_clip_wrap = len(clip_frames[0])
while tcur < t2:
if event['method'] == PlaybackMethod.Freeze or cur_dir == PlaybackDirection.Freeze:
event['frame_length'] = 1
frame_idx = int(event['frame_start'])
clip_idx = 0
elif event.get('stretch', False):
x = (self.timekeeper.calculate_offset_from_timestamp(tcur) - start_offset)
frame_idx = int((x * len(clip_frames[clip_idx])) / (event.get('frame_speed', 2) * 1024))
if frame_idx >= next_clip_wrap:
clip_idx = (clip_idx + 1) % len(clip_frames)
next_clip_wrap += len(clip_frames[clip_idx])
if event['method'] == PlaybackMethod.PingPong:
cur_dir = PlaybackDirection.Reverse if cur_dir == PlaybackDirection.Forward else PlaybackDirection.Forward
frame_idx %= len(clip_frames[clip_idx])
if cur_dir == PlaybackDirection.Reverse:
frame_idx = len(clip_frames[clip_idx]) - (frame_idx + 1)
if frame_idx < 0:
frame_idx = 0
if len(clip_frames[clip_idx]) <= frame_idx:
frame_idx = len(clip_frames[clip_idx]) - 1
else:
if (cnt % event['frame_length']) == 0:
if cur_dir == PlaybackDirection.Reverse:
frame_idx = frame_idx - 1
elif cur_dir != PlaybackDirection.Freeze:
frame_idx = frame_idx + 1
if frame_idx < 0 or frame_idx >= len(clip_frames[clip_idx]):
if cur_dir == PlaybackDirection.Reverse:
clip_idx = clip_idx - 1
if clip_idx < 0:
clip_idx = len(clip_frames) - 1
else:
clip_idx = (clip_idx + 1) % len(clip_frames)
if event['method'] == PlaybackMethod.PingPong:
cur_dir = PlaybackDirection.Reverse if cur_dir == PlaybackDirection.Forward else PlaybackDirection.Forward
if cur_dir == PlaybackDirection.Reverse:
frame_idx = len(clip_frames[clip_idx]) - 1  # index by clip, not by direction
elif cur_dir != PlaybackDirection.Freeze:
frame_idx = 0
# Don't play the last frame that was already played
if event['method'] == PlaybackMethod.PingPong:
if cur_dir == PlaybackDirection.Reverse:
frame_idx -= 1
elif cur_dir != PlaybackDirection.Freeze:
frame_idx += 1
output_frames.append(clip_frames[clip_idx][int(frame_idx)])
cnt += 1
tcur += tstep
expected_duration = (self.timekeeper.calculate_timestamp_from_offset(self.events[idx+1]['offset'] - 0x100) - self.timekeeper.calculate_timestamp_from_offset(event['offset'])) / 1000
expected_frame_count = round(expected_duration * 60)
if len(output_frames) > expected_frame_count:
output_frames = output_frames[:expected_frame_count]
assert(len(output_frames) == expected_frame_count)
clip = self.get_clip(output_frames, len(output_frames) / ((self.events[idx+1]['timestamp'] - event['timestamp'])/1000))
assert(round(clip.duration * clip.fps) == expected_frame_count)
output_clips.append({
'timestamp_start': event['timestamp'],
'timestamp_end': self.events[idx+1]['timestamp'],
'clip': clip
})
return output_clips
def export(self, output_filename, mp3_filename, background_image, raw_video_render_only=False):
output_clips = self.get_output_frames()
clear_events = [x['timestamp'] for x in self.events if x.get('clear', False)]
if len(clear_events) > 1:
logger.error("ERROR: Found multiple clear events!")
assert (len(clear_events) <= 1)
bgm_audio = AudioFileClip(mp3_filename) if mp3_filename else None
# Combine all clips into one composite clip
earliest_timestamp = min([c['timestamp_start'] for c in output_clips])
clip = concatenate_videoclips([c['clip'] for c in output_clips])
if clear_events:
clear_event_timestamp = (clear_events[0] - earliest_timestamp) / 1000
if clear_event_timestamp < clip.duration:
clip = clip.subclip(0, clear_event_timestamp)
if not raw_video_render_only:
video_timestamp_end = output_clips[-1]['timestamp_end'] + 1000
if bgm_audio is not None and bgm_audio.duration > video_timestamp_end / 1000:
video_timestamp_end = bgm_audio.duration * 1000
crossfade_time = 0.5
# Write the background image from the very beginning to the very end of the video
clip = CompositeVideoClip([
self.get_clip([np.asarray(background_image)] * int((video_timestamp_end/1000)*60), 60),
clip.set_start(earliest_timestamp / 1000).crossfadein(crossfade_time)
])
# Just some quick checks to make sure there are no unexpected gaps in the video frames
# If this ever asserts then there's probably an issue with the parser somewhere
timestamps = sorted([(c['timestamp_start'] - earliest_timestamp, c['timestamp_end'] -
earliest_timestamp) for c in output_clips], key=lambda x: x[0])
for i, timestamp in enumerate(timestamps[2:]):
if timestamp[0] > timestamps[i+1][1]:
logger.error("ERROR: Found gap in video clips! %f to %f" % (timestamps[i+1][1], timestamp[0]))
assert (timestamp[0] <= timestamps[i+1][1])
if not raw_video_render_only and bgm_audio is not None:
clip = clip.set_audio(bgm_audio)
clip.write_videofile(output_filename, audio_codec="aac", preset="ultrafast",
fps=TARGET_FRAME_RATE, bitrate="50000k")
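
The ping-pong direction handling above is interleaved with clip wrapping and stretch timing; stripped of those concerns, the core index pattern it aims for looks like this (a minimal sketch, not the shipping logic):

```python
def pingpong_indices(n_frames, steps):
    """Yield frame indices bouncing 0..n-1..0 without repeating the endpoints."""
    idx, direction = 0, 1
    for _ in range(steps):
        yield idx
        idx += direction
        if idx < 0 or idx >= n_frames:
            direction = -direction
            idx += 2 * direction  # step back inside, skipping the frame just shown
```

For a 4-frame clip this produces 0, 1, 2, 3, 2, 1, 0, 1, ... which matches the "don't play the last frame that was already played" comment above.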

View File

@ -0,0 +1,268 @@
import logging
from .constants import *
from .timekeeper import TimeKeeper
logger = logging.getLogger("ddranimtool." + __name__)
class CsqReader:
def __init__(self, data):
self.timekeeper = TimeKeeper()
self.raw_frames = {}
self.frame_cache = {}
self.chunks = self.parse_chunks(data)
self.timekeeper.bpm_list = self.get_tempo_events()
def parse_chunks(self, data):
chunks = {}
data_idx = 0
while data_idx < len(data):
chunk_len = int.from_bytes(data[data_idx:data_idx+4], 'little')
if data_idx + 4 >= len(data):
break
chunk_type = int.from_bytes(data[data_idx+4:data_idx+6], 'little')
chunk_raw = data[data_idx+6:data_idx+chunk_len]
data_idx += chunk_len
chunk_type = {
0x01: 'tempo',
0x02: 'events',
0x03: 'notes',
0x04: 'lamps',
0x05: 'anim',
}[chunk_type]
chunks[chunk_type] = chunk_raw
return chunks
def get_tempo_events(self):
assert ('tempo' in self.chunks)
data = self.chunks['tempo']
self.timekeeper.tick_rate = int.from_bytes(data[:2], 'little')
count = int.from_bytes(data[2:4], 'little')
assert (int.from_bytes(data[4:6], 'little') == 0)
time_offsets = [int.from_bytes(data[6+x*4:6+(x+1)*4], 'little', signed=True) for x in range(count)]
time_data = [int.from_bytes(data[6+x*4:6+(x+1)*4], 'little', signed=True) for x in range(count, count * 2)]
bpm_changes = []
for i in range(1, count):
timestamp_start = time_data[i-1] / self.timekeeper.tick_rate
timestamp_end = time_data[i] / self.timekeeper.tick_rate
time_delta = (timestamp_end - timestamp_start) * 1000
offset_delta = (time_offsets[i] - time_offsets[i-1])
bpm = 60000 / (time_delta / (offset_delta / 1024)) if offset_delta != 0 else 0
bpm_changes.append({
'beat_start': time_offsets[i-1],
'beat_end': time_offsets[i],
'music_start': time_data[i-1],
'music_end': time_data[i],
'timestamp_start': timestamp_start,
'timestamp_end': timestamp_end,
'bpm': bpm
})
return bpm_changes
def get_anim_events(self):
assert ('anim' in self.chunks)
data = self.chunks['anim']
# Ref: 80068224 in DDR Extreme AC
assert (int.from_bytes(data[:2], 'little') == 0)
count = int.from_bytes(data[2:4], 'little')
assert (int.from_bytes(data[4:6], 'little') == 0)
event_offsets = [int.from_bytes(data[6+x*4:6+(x+1)*4], 'little', signed=True) for x in range(count)]
event_data = [data[6+(count*4)+x*4:6+(count*4)+(x+1)*4] for x in range(count)]
filename_chunk_count = int.from_bytes(data[6+(count*8):6+(count*8)+4], 'little')
filename_chunks = [int.from_bytes(data[6+(count*8)+4+x*4:6+(count*8)+4+(x+1)*4], 'little')
for x in range(filename_chunk_count)]
clip_filenames = []
for chunk in filename_chunks:
# Around 80067e90 in DDR Extreme AC
clip_filename = ""
for i in range(6):
c = (chunk >> (5 * i)) & 0x1f
if c < 0x1b:
clip_filename += chr(c + 0x61)
clip_filenames.append(clip_filename)
events = []
for i in range(count):
import hexdump
logger.debug(hexdump.dump(event_data[i]))
cmd = event_data[i][0]
cmd_upper = (cmd >> 4) & 0x0f
clip_idx = event_data[i][1]
clip_offset = event_data[i][2]
if event_data[i][3] != 0:
logger.error("ERROR: event_data[i][3] was %02x" % event_data[i][3])
exit(1)
assert (event_data[i][3] == 0)
# TODO: There's a special clip ID, 0x28. What is it?
# TODO: There's also a case when the clip ID is >= 0x64. What does that do?
common_clip_filenames = {
0x14: "ccclca",
0x15: "ccclma",
0x16: "cccuba",
0x17: "ccddra",
0x18: "ccdrga",
0x19: "ccheaa",
0x1a: "ccitaa",
0x1b: "ccltaa",
0x1c: "ccrgca",
0x1d: "ccsaca",
}
clip_filename = common_clip_filenames[clip_idx] if clip_idx in common_clip_filenames else clip_filenames[clip_idx]
event = {
'offset': event_offsets[i],
'timestamp': self.timekeeper.calculate_timestamp_from_offset(event_offsets[i]),
'method': PlaybackMethod.Normal,
'direction': PlaybackDirection.Freeze,
'frame_length': 2,
'frame_start': 0,
'clips': [],
}
clip = {
'filename': clip_filename,
'loop': True,
}
# Defaults to 1 (normal) if cmd & 3 is not 1, 2, or 3
event['method'] = {
AnimationFlags.PlaybackMethodNormal: PlaybackMethod.Normal,
AnimationFlags.PlaybackMethodPingPong: PlaybackMethod.PingPong,
AnimationFlags.PlaybackMethodFreeze: PlaybackMethod.Freeze, # TODO: Verify
}.get(cmd & 3, PlaybackMethod.Normal)
# Defaults to 1 (forward) if (cmd >> 2) & 3 is not 0, 1, or 2
event['direction'] = {
AnimationFlags.PlaybackDirectionFreeze: PlaybackDirection.Freeze,
AnimationFlags.PlaybackDirectionForward: PlaybackDirection.Forward,
AnimationFlags.PlaybackDirectionReverse: PlaybackDirection.Reverse,
}.get((cmd >> 2) & 3, PlaybackDirection.Forward)
# Set other params
max_frames = 80
if cmd_upper in [AnimationCommands.Play1, AnimationCommands.Play2, AnimationCommands.Play3, AnimationCommands.Play4]:
# 0x12e4c8 in PS2 DDR Extreme JP
event['frame_start'] = clip_offset
event['frame_length'] = {
AnimationCommands.Play1: 1,
AnimationCommands.Play2: 2,
AnimationCommands.Play3: 3,
AnimationCommands.Play4: 4,
}[cmd_upper]
if event['frame_start'] == 0 and event['direction'] == PlaybackDirection.Reverse:
event['frame_start'] = max_frames - 1
elif cmd_upper == AnimationCommands.PlayStretch:
event['frame_start'] = 0
event['frame_length'] = 2
event['stretch'] = True
event['frame_speed'] = 4 if clip_offset == 0 else clip_offset
if event['direction'] == PlaybackDirection.Reverse:
event['frame_start'] = max_frames - \
(event['frame_start'] + 1)
elif cmd_upper == AnimationCommands.AppendLoopAll:
last_val = events[-1].get('frame_speed', 2)
if clip_offset != 0 and clip_offset != last_val:
logger.error("ERROR: append loop all has non-zero parameter!")
assert (clip_offset == 0 or clip_offset == last_val)
event['frame_start'] = 0
elif cmd_upper == AnimationCommands.FreezeFrame:
# Freeze frame
event['frame_start'] = clip_offset
event['frame_length'] = 0
elif cmd_upper == AnimationCommands.AppendLoopLast:
if clip_offset != 0:
logger.error("ERROR: append loop last has non-zero parameter!")
assert (clip_offset == 0)
# Freeze just turns into -0 which is still freeze
event['direction'] = PlaybackDirection.Reverse if events[-1]['direction'] == PlaybackDirection.Forward else PlaybackDirection.Forward
# Append, only loop last clip
# Plays the first clip normally, then repeats the 2nd clip for the remainder of the time
# An offset can be specified?
# 0x speed, from frame 0
events[-1]['clips'][-1]['loop'] = False
events[-1]['clips'].append(clip)
elif cmd_upper in [AnimationCommands.Clear, AnimationCommands.Clear2]:
# Display nothing
event['clear'] = True
assert (clip_offset == 0)
else:
logger.error("ERROR: Unknown upper command: %d", cmd_upper)
exit(1)
event['frame_length'] = event.get('frame_length', 2)
assert (event['frame_length'] >= 0)
is_anim_wrapped = event['frame_start'] >= max_frames
if event['frame_start'] < 0:
event['frame_start'] = 0
is_anim_wrapped = False
if is_anim_wrapped:
event['frame_start'] = max_frames - 1
if event['direction'] == PlaybackDirection.Freeze:
event['frame_length'] = 1
if cmd_upper == AnimationCommands.AppendLoopAll:
# Append to previous event, the two will loop continuous as one
# Make sure the two events are the same before trying to merge 1
if events[-1]['offset'] != event['offset']:
logger.error("ERROR: The append command data is not the same offset!")
exit(1)
if events[-1]['direction'] == PlaybackDirection.Reverse:
event['frame_start'] = max_frames - 1
events[-1]['clips'].append(clip)
event = events[-1]
else:
event['clips'].append(clip)
events.append(event)
logger.debug("Playing %8s from frame %2d @ %8f... %02x speed[%s]" % (
clip_filename, clip_offset, event['timestamp'] / 1000, cmd, str(event['frame_length'])))
logger.debug(event)
logger.debug("")
if cmd_upper in [AnimationCommands.AppendLoopLast]:
logger.error(
"ERROR: Found command that needs to be tested! Check if this actually loops just the last clip or not")
exit(1)
return events
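
The packed clip filename format (six 5-bit characters per 32-bit word, decoded around 80067e90 as shown above) round-trips easily. The decoder below mirrors the file's logic; the encoder is an assumption (0x1f as the unused-slot marker), since the game only ships a decoder and anything >= 0x1b is skipped:

```python
def decode_clip_filename(chunk):
    """Unpack up to six 5-bit characters from a 32-bit word, as the game does."""
    name = ""
    for i in range(6):
        c = (chunk >> (5 * i)) & 0x1F
        if c < 0x1B:  # values >= 0x1b are treated as empty slots
            name += chr(c + 0x61)
    return name

def encode_clip_filename(name):
    """Hypothetical inverse: pack a short lowercase name, padding with 0x1f."""
    chunk = 0
    for i in range(6):
        c = (ord(name[i]) - 0x61) if i < len(name) else 0x1F
        chunk |= (c & 0x1F) << (5 * i)
    return chunk
```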

View File

@ -0,0 +1,113 @@
import logging
import os
import shutil
import tempfile
from PIL import Image
import numpy as np
logger = logging.getLogger("ddranimtool." + __name__)
class FrameManager:
def __init__(self, cache_folder, raw_video_folder="", jpsxdec_jar_path=None):
self.video_cache = {}
self.frame_cache = {}
self.cache_folder = os.path.abspath(cache_folder)
self.raw_video_folder = os.path.abspath(raw_video_folder) if raw_video_folder else ""
self.jpsxdec_jar_path = os.path.abspath(jpsxdec_jar_path) if jpsxdec_jar_path is not None else None
def dump_raw_frame(self, chunk, output_filename):
JPSXDEC_COMMAND = "java -jar \"%s\" -f \"{0}\" -static bs -dim {1}x{2} -fmt png -quality psx" % self.jpsxdec_jar_path
# This is stupid but jPSXdec doesn't actually have a way to save to a specific directory from command line,
# so change directories to the temporary folder until the end of the function and then restore the old directory
cwd = os.getcwd()
with tempfile.NamedTemporaryFile(mode="wb", suffix=".bin") as raw_frame_file:
os.chdir(os.path.dirname(raw_frame_file.name))
raw_frame_file.write(chunk)
converted_frame_path = os.path.splitext(raw_frame_file.name)[0] + ".png"
cmd = JPSXDEC_COMMAND.format(raw_frame_file.name, 304, 176)
os.system(cmd)
shutil.move(converted_frame_path, output_filename)
os.chdir(cwd)
def get_cached_frames(self, filename):
self.video_cache[filename] = []
basename = os.path.basename(os.path.splitext(filename)[0])
frame_idx = 0
while True:
output_filename = os.path.join(self.cache_folder, "%s_%04d.png" % (basename, frame_idx))
if not os.path.exists(output_filename):
break
with Image.open(output_filename) as inframe:
self.frame_cache[output_filename] = (inframe.tobytes(), inframe.size, inframe.mode)
self.video_cache[filename].append(
np.asarray(Image.frombytes(
mode=self.frame_cache[output_filename][2],
size=self.frame_cache[output_filename][1],
data=self.frame_cache[output_filename][0]
))
)
frame_idx += 1
def get_raw_frames(self, filename):
req_frames = []
os.makedirs(self.cache_folder, exist_ok=True)
if filename not in self.video_cache:
self.get_cached_frames(filename)
if not self.video_cache.get(filename, []):
# Only deal with jPSXdec if we need to dump a video
assert (self.jpsxdec_jar_path is not None)
self.video_cache[filename] = []
input_filename = os.path.join(self.raw_video_folder, filename)
logger.debug("Loading frames for %s" % input_filename)
if not os.path.exists(input_filename):
logger.error("Could not find %s" % input_filename)
assert os.path.exists(input_filename)
with open(input_filename, "rb") as infile:
data = bytearray(infile.read())
chunks = [data[i:i+0x2000] for i in range(0, len(data), 0x2000)]
for frame_idx in range(len(chunks)):
output_filename = os.path.join(self.cache_folder, "%s_%04d.png" % (
os.path.basename(os.path.splitext(filename)[0]), frame_idx))
if output_filename not in self.frame_cache:
if not os.path.exists(output_filename):
self.dump_raw_frame(chunks[frame_idx], output_filename)
with Image.open(output_filename) as inframe:
self.frame_cache[output_filename] = (inframe.tobytes(), inframe.size, inframe.mode)
self.video_cache[filename].append(
np.asarray(Image.frombytes(
mode=self.frame_cache[output_filename][2],
size=self.frame_cache[output_filename][1],
data=self.frame_cache[output_filename][0]
))
)
req_frames += self.video_cache[filename]
return req_frames
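
`get_raw_frames` above splits the raw `.sbs` file into fixed 0x2000-byte bitstream chunks, one per frame, before handing each to jPSXdec. As a standalone sketch of that step:

```python
def split_sbs_chunks(data, chunk_size=0x2000):
    """Split a raw .sbs video into fixed-size bitstream frames (0x2000 bytes each)."""
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
```

The final chunk may be shorter than 0x2000 bytes if the file size isn't an exact multiple.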

View File

@ -0,0 +1,66 @@
import logging
logger = logging.getLogger("ddranimtool." + __name__)
class TimeKeeper:
def __init__(self, bpm_list=None):
# Avoid a shared mutable default argument
self.bpm_list = bpm_list if bpm_list is not None else []
self.tick_rate = 150
def _get_bpm_info(self, value, k1='beat_start', k2='beat_end'):
assert (self.bpm_list is not None)
found_bpm = None
for test_bpm in self.bpm_list:
if value == test_bpm[k1] and value == test_bpm[k2]:
found_bpm = test_bpm
break
if found_bpm is None:
for test_bpm in self.bpm_list:
# BPMs with a matching start offset should take precedence
if value >= test_bpm[k1] and value < test_bpm[k2]:
found_bpm = test_bpm
break
if found_bpm is None:
# But just in case none of them are within range, check the last
# to see if it's a match (used by virt) or beyond (used by summ)
if value >= self.bpm_list[-1][k2]:
found_bpm = self.bpm_list[-1]
if found_bpm is None:
logger.error("ERROR: Couldn't find BPM!")
assert (found_bpm is not None)
return found_bpm
def calculate_timestamp_from_offset(self, value):
bpm_info = self._get_bpm_info(value, k1='beat_start', k2='beat_end')
timestamp = bpm_info['timestamp_start']
t = (int(value) - bpm_info['beat_start']) / 1024
if bpm_info['bpm'] != 0:
timestamp += (t / bpm_info['bpm']) * 60
else:
assert (t == 0)
return timestamp * 1000
def calculate_offset_from_timestamp(self, value):
value = (value / 1000) * self.tick_rate
bpm_info = self._get_bpm_info(value, k1='music_start', k2='music_end')
offset = bpm_info['beat_start'] + (bpm_info['beat_end'] - bpm_info['beat_start']) * (
(value - bpm_info['music_start']) / (bpm_info['music_end'] - bpm_info['music_start']))
return int(offset)
def get_bpm_from_offset(self, value):
return self._get_bpm_info(value, k1='beat_start', k2='beat_end')['bpm']
def get_bpm_from_timestamp(self, value):
value = (value / 1000) * self.tick_rate
return self._get_bpm_info(value, k1='music_start', k2='music_end')['bpm']
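
Within a single constant-BPM segment, `calculate_timestamp_from_offset` above reduces to a one-liner: 1024 offset units are one beat, and a beat lasts 60/BPM seconds. A hypothetical helper restating that math:

```python
def offset_to_ms(offset, bpm, beat_start=0, start_ms=0.0):
    """Timestamp in milliseconds for an offset inside one constant-BPM segment."""
    beats = (offset - beat_start) / 1024  # 1024 offset units per beat
    return start_ms + (beats / bpm) * 60 * 1000
```

For example, at 150 BPM one beat (1024 units) lands at 400 ms, matching the tick-rate bookkeeping in the class above.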

View File

@ -0,0 +1,4 @@
hexdump==3.3
moviepy==1.0.3
numpy==1.24.2
Pillow==9.4.0

View File

@ -0,0 +1,248 @@
#!/usr/bin/python
#
# tim2png - Convert PlayStation TIM image to PNG format
#
# Copyright (C) 2014 Christian Bauer <www.cebix.net>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
__version__ = "1.0"
import sys
import os
import struct
from PIL import Image
from PIL import ImagePalette
# Convert 16-bit little-endian ABGR format to ARGB (PIL's "BGR;15" format).
def convertABGR(data, first_alpha=False):
    output = bytearray()
    output2 = []
    has_transparency = False

    for i in range(0, len(data), 2):
        pixel = struct.unpack_from("<H", data, i)[0]

        # Repack ABGR1555 as ARGB1555
        r = pixel & 0x1f
        g = (pixel >> 5) & 0x1f
        b = (pixel >> 10) & 0x1f
        a = pixel & 0x8000
        pixel = a | (r << 10) | (g << 5) | b

        # first_alpha is a hacky parameter to detect 4bpp stuff used for name plates
        if (r, g, b) == (0, 0, 0) or (first_alpha and pixel in [0xffff, 0x8000]):
            a = 0
            has_transparency = True
        else:
            a = 255

        output.extend(struct.pack("<H", pixel))

        # Scale the 5-bit channels to 8-bit for the RGBA copy
        r = int((255 / 31) * r)
        g = int((255 / 31) * g)
        b = int((255 / 31) * b)
        output2.append((r, g, b, a))

    return output, output2, has_transparency
# Read TIM image from file
def readTimImage(f, clut_idx=0, disable_transparency=False):
    # Check header
    header = f.read(8)
    if header[:4] != b"\x10\x00\x00\x00":
        raise SyntaxError("Not a TIM file")

    flags = struct.unpack_from("<I", header, 4)[0]
    if flags & 0xfffffff0:
        raise SyntaxError("Not a TIM file")

    pMode = flags & 7
    if pMode > 4:
        raise SyntaxError("Not a TIM file")
    elif pMode == 4:
        raise ValueError("Mixed mode images not yet supported")

    # Read CLUT, if present
    palette = None
    haveClut = flags & 8
    transparency_idx = None
    has_transparency = False
    clut_count = 0

    if haveClut:
        # Check CLUT header
        clutSize = struct.unpack("<I", f.read(4))[0]
        if clutSize < 12:
            raise ValueError("Size of CLUT data too small")

        numEntries = (clutSize - 12) // 2
        f.read(8)  # skip DX/DY/H/W (frame buffer location and size)

        # Read CLUT data and convert to BGR;15
        clut = f.read(numEntries * 2)

        # 16 entries per palette in 4bpp mode, 256 in 8bpp mode
        palette_size = 0x10
        if pMode == 1:
            palette_size = 0x100

        if clut_idx < numEntries // palette_size:
            clut = clut[clut_idx*(palette_size*2):]

        clut_count = numEntries // palette_size

        if pMode == 0:
            clut += b'\xff' * 32 * 16  # extend to 256 entries

        clut = clut[:0x200]
        clut, clut2, has_transparency = convertABGR(clut, pMode == 0)

        # Find the first fully transparent entry, if any
        for i in range(0, len(clut), 2):
            if clut[i] == 0 and clut[i+1] == 0:
                transparency_idx = i // 2
                break

        palette = ImagePalette.raw("BGR;15", bytes(clut))

    has_transparency = False if disable_transparency else has_transparency
    # Read pixel data
    dataSize = struct.unpack("<I", f.read(4))[0]
    if dataSize < 12:
        raise ValueError("Size of pixel data too small")

    f.read(4)  # skip DX/DY (frame buffer location)

    width, height = struct.unpack("<HH", f.read(4))
    expectedSize = width * height * 2  # width is in 16-bit units
    pixelData = f.read(expectedSize)

    # Create image, converting pixel data if necessary
    if pMode in [0, 1]:
        # Indexed modes: 2 pixels (8bpp) or 4 pixels (4bpp) in each 16-bit unit
        width *= 2
        if pMode == 0:
            width *= 2

            # Expand 4-bit pixel data to 8-bit
            output = bytearray()
            for x in pixelData:
                pix0 = x & 0x0f
                pix1 = x >> 4
                output.append(pix0)
                output.append(pix1)
        else:
            output = pixelData
        if has_transparency:
            # Build an RGBA image directly from the expanded palette values
            image = Image.new("RGBA", (width, height), (0, 0, 0, 0))
            pixels = image.load()

            i = 0
            for y in range(height):
                for x in range(width):
                    pixels[x, y] = clut2[output[i]]
                    i += 1
        else:
            image = Image.frombytes("P", (width, height), bytes(output), "raw", "P", 0, 1)
            image.palette = palette

            if transparency_idx is not None:
                # Round-trip through an in-memory PNG so the transparent
                # palette index survives the later RGBA conversion
                import io
                image_data = io.BytesIO()
                image.save(image_data, "PNG", transparency=transparency_idx)
                image.close()
                del image
                image = Image.open(image_data)

    elif pMode == 2:
        # 16-bit direct mode, convert from ABGR to ARGB
        # (convertABGR returns a 3-tuple; only the packed pixels are needed here)
        output, _, _ = convertABGR(pixelData)
        image = Image.frombytes("RGB", (width, height), bytes(output), "raw", "BGR;15", 0, 1)

    elif pMode == 3:
        # 24-bit direct mode, 2 pixels in three 16-bit units
        width = width * 2 // 3
        image = Image.frombytes("RGB", (width, height), bytes(pixelData), "raw", "RGB", 0, 1)

    return image.convert("RGBA"), clut_count
if __name__ == "__main__":
    # Print usage information and exit.
    def usage(exitcode, error=None):
        print("Usage: %s [OPTION...] <input.tim> [<output.png>]" % os.path.basename(sys.argv[0]))
        print("  -V, --version                   Display version information and exit")
        print("  -?, --help                      Show this help message")
        if error is not None:
            print("\nError:", error, file=sys.stderr)
        sys.exit(exitcode)

    # Parse command line arguments
    inputFileName = None
    outputFileName = None

    for arg in sys.argv[1:]:
        if arg == "--version" or arg == "-V":
            print("tim2png", __version__)
            sys.exit(0)
        elif arg == "--help" or arg == "-?":
            usage(0)
        elif arg[0] == "-":
            usage(64, "Invalid option '%s'" % arg)
        else:
            if inputFileName is None:
                inputFileName = arg
            elif outputFileName is None:
                outputFileName = arg
            else:
                usage(64, "Unexpected extra argument '%s'" % arg)

    if inputFileName is None:
        usage(64, "No input file specified")
    if outputFileName is None:
        outputFileName = os.path.splitext(inputFileName)[0] + ".png"

    # Read input image
    try:
        f = open(inputFileName, "rb")
    except IOError as e:
        print("Error opening file '%s': %s" % (inputFileName, e.strerror), file=sys.stderr)
        sys.exit(1)

    try:
        image, clut_count = readTimImage(f)
    except Exception as e:
        print("Error reading TIM image '%s': %s" % (inputFileName, str(e)), file=sys.stderr)
        sys.exit(1)

    # Write output image
    if image:
        image.save(outputFileName, "PNG")
        print("Written '%s'" % outputFileName)

@ -0,0 +1,32 @@
import argparse
import glob
import multiprocessing
import os

from formats.csq import FrameManager

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--input', help='Input path containing raw video files', required=True)
    parser.add_argument('-o', '--output', help='Output path to store cached video frames', default="frame_cache")
    parser.add_argument('-t', '--tools-path', help='Tools path', default="tools")
    args = parser.parse_args()

    jpsxdec_path = os.path.join(args.tools_path, "jpsxdec", "jpsxdec.jar")
    if not os.path.exists(jpsxdec_path):
        raise SystemExit("ERROR: Could not find jPSXdec! %s" % jpsxdec_path)

    filenames = [os.path.basename(filename) for filename in glob.glob(os.path.join(args.input, "*"))]

    os.makedirs(args.output, exist_ok=True)

    # Fan the per-file frame extraction out over a worker pool
    pool = multiprocessing.Pool()
    for filename in filenames:
        frame_manager = FrameManager(args.output, args.input, jpsxdec_path)
        pool.apply_async(frame_manager.get_raw_frames, args=(filename,))
    pool.close()
    pool.join()
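The queue-then-close/join fan-out used by the script can be sketched in isolation. Everything below is illustrative only: `extract_one` and the filenames are placeholders standing in for `FrameManager.get_raw_frames` and the real movie files.

```python
import multiprocessing

def extract_one(filename):
    # Stand-in for FrameManager.get_raw_frames: pretend to
    # produce a cache entry name for the given movie file.
    return filename + ".cached"

if __name__ == "__main__":
    filenames = ["mov01.sbs", "mov02.sbs", "mov03.sbs"]
    with multiprocessing.Pool() as pool:
        # apply_async queues one job per file without blocking;
        # get() then waits for and collects each result.
        results = [pool.apply_async(extract_one, args=(f,)) for f in filenames]
        print([r.get() for r in results])
```

Since the real workers write frames to disk rather than return values, the script can simply `close()` and `join()` instead of collecting results as done here.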