#!/usr/bin/env node
// NOTE(review): reconstructed from a collapsed git patch —
// new file bin/update-software-hashes.mjs, mode 100755, index 0000000..788ebfa.
//
// Compute MD5, CRC32 and size for the inner tape file inside each download zip.
// Populates the `software_hashes` table and exports a JSON snapshot to
// data/zxdb/software_hashes.json for reimport after DB wipes.
//
// Usage:
//   node bin/update-software-hashes.mjs [flags]
//
// Flags:
//   --rebuild-all       Ignore state and reprocess every download
//   --start-from-id=N   Start processing from download id N
//   --export-only       Skip processing, just export current table to JSON
//   --quiet             Reduce log output
//   --verbose           Force verbose output (default)

import dotenv from "dotenv";
import dotenvExpand from "dotenv-expand";
dotenvExpand.expand(dotenv.config());

import { z } from "zod";
import mysql from "mysql2/promise";
import fs from "fs/promises";
import path from "path";
import { createReadStream } from "fs";
import { createHash } from "crypto";
import { pipeline } from "stream/promises";
import { Transform } from "stream";
import { fileURLToPath } from "url";

// Resolve the project root relative to this script (bin/..).
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const PROJECT_ROOT = path.resolve(__dirname, "..");

// ---- CLI flags ----
const ARGV = new Set(process.argv.slice(2));
const QUIET = ARGV.has("--quiet");
const VERBOSE = !QUIET || ARGV.has("--verbose"); // verbose unless --quiet
const REBUILD_ALL = ARGV.has("--rebuild-all");
const EXPORT_ONLY = ARGV.has("--export-only");

// Parse --start-from-id=N; when repeated, the last occurrence wins.
let CLI_START_FROM = 0;
for (const token of process.argv.slice(2)) {
  const match = /^--start-from-id=(\d+)$/.exec(token);
  if (match) CLI_START_FROM = Number.parseInt(match[1], 10);
}

// Thin logging wrappers; logInfo is silenced by --quiet.
function logInfo(msg) {
  if (VERBOSE) console.log(msg);
}
function logWarn(msg) {
  console.warn(msg);
}
function logError(msg) {
  console.error(msg);
}

// ---- Environment ----
const envSchema = z.object({
  ZXDB_URL: z
    .string()
    .url()
    .refine((value) => value.startsWith("mysql://"), {
      message: "ZXDB_URL must be a valid mysql:// URL",
    }),
  CDN_CACHE: z.string().min(1, "CDN_CACHE must be set to the local CDN mirror root"),
});

const parsedEnv = envSchema.safeParse(process.env);
if (!parsedEnv.success) {
  logError(`Invalid environment variables:\n${JSON.stringify(parsedEnv.error.format(), null, 2)}`);
  process.exit(1);
}
const { ZXDB_URL, CDN_CACHE } = parsedEnv.data;

const SNAPSHOT_PATH = path.join(PROJECT_ROOT, "data", "zxdb", "software_hashes.json");
const STATE_FILE = path.join(CDN_CACHE, ".update-software-hashes.state.json");

// Filetype IDs for tape images
const TAPE_FILETYPE_IDS = [8, 22];

// Tape file extensions in priority order (most common first)
const TAPE_EXTENSIONS = [".tap", ".tzx", ".pzx", ".csw"];

// ---- DB ----
const pool = mysql.createPool({
  uri: ZXDB_URL,
  connectionLimit: 10,
  maxPreparedStatements: 256,
});

// ---- Path mapping (mirrors sync-downloads.mjs) ----
// Each ZXDB file_link prefix maps onto a sub-directory of the local CDN mirror.
const MIRROR_ROOTS = [
  { prefix: "/zxdb/sinclair", dir: "SC" },
  { prefix: "/pub/sinclair", dir: "WoS" },
];

function toLocalPath(fileLink) {
  for (const { prefix, dir } of MIRROR_ROOTS) {
    if (fileLink.startsWith(`${prefix}/`)) {
      // Keep the leading slash of the remainder; path.join normalizes it.
      return path.join(CDN_CACHE, dir, fileLink.slice(prefix.length));
    }
  }
  return null; // unknown prefix — caller decides how to report it
}

// ---- State management ----

// Read the resume-state file; a missing or corrupt file simply means "no prior state".
async function loadState() {
  try {
    return JSON.parse(await fs.readFile(STATE_FILE, "utf8"));
  } catch {
    return null;
  }
}

// Persist state via temp-file + rename so a crash never leaves a torn file.
async function saveStateAtomic(state) {
  const tmpPath = `${STATE_FILE}.tmp`;
  await fs.writeFile(tmpPath, JSON.stringify(state, null, 2), "utf8");
  await fs.rename(tmpPath, STATE_FILE);
}

// ---- Zip extraction ----
// NOTE(review): extraction shells out to the system `unzip` binary (see
// extractZipContents below); ZXDB zips are simple (no encryption).
// ---- Zip extraction ----

import { execFile } from "child_process";
import { promisify } from "util";

// FIX: promisify once at module load — the original dynamically re-imported
// child_process/util on every extractZipContents call.
const execFileAsync = promisify(execFile);

/**
 * Extract a download zip into `contentsDir` using the system `unzip` binary.
 * execFile passes the zip path as a single argv entry, so brackets/spaces in
 * filenames need no shell quoting.
 *
 * @param {string} zipPath     Absolute path of the zip to extract.
 * @param {string} contentsDir Target directory (created if absent).
 * @throws {Error} When unzip fails with an exit status other than 1.
 */
async function extractZipContents(zipPath, contentsDir) {
  await fs.mkdir(contentsDir, { recursive: true });

  try {
    await execFileAsync("unzip", ["-o", "-d", contentsDir, zipPath], {
      maxBuffer: 50 * 1024 * 1024,
    });
  } catch (err) {
    // unzip returns exit code 1 for warnings (e.g. "appears to use backslashes")
    // which is non-fatal — only fail on actual extraction errors. A spawn
    // failure (unzip not installed) carries a string code such as "ENOENT"
    // and is therefore also rethrown here.
    if (err.code !== 1) {
      throw new Error(`unzip failed for ${zipPath}: ${err.message}`);
    }
  }
}

// ---- Find tape file inside _CONTENTS ----

/**
 * Locate the best tape-image file under an extracted _CONTENTS directory.
 * Candidates are ranked by TAPE_EXTENSIONS priority, then alphabetically.
 *
 * @param {string} contentsDir Extraction directory to scan recursively.
 * @returns {Promise<{path: string, ext: string, priority: number, name: string}|null>}
 *   The best candidate, or null when the dir is unreadable or has no tape file.
 */
async function findTapeFile(contentsDir) {
  let entries;
  try {
    entries = await fs.readdir(contentsDir, { recursive: true, withFileTypes: true });
  } catch {
    return null;
  }

  // Collect all tape files together with their extension priority.
  const candidates = [];
  for (const entry of entries) {
    if (!entry.isFile()) continue;
    const ext = path.extname(entry.name).toLowerCase();
    const priority = TAPE_EXTENSIONS.indexOf(ext);
    if (priority === -1) continue;

    // entry.parentPath is the modern name; entry.path covers older Node releases.
    const fullPath = path.join(entry.parentPath ?? entry.path, entry.name);
    candidates.push({ path: fullPath, ext, priority, name: entry.name });
  }

  if (candidates.length === 0) return null;

  // Sort by priority (lowest index = highest priority), then alphabetically.
  candidates.sort((a, b) => a.priority - b.priority || a.name.localeCompare(b.name));

  return candidates[0];
}

// ---- Hash computation ----

// FIX: the CRC-32 lookup table was rebuilt inside computeHashes for every
// file; build it once per process and reuse it.
let crc32Table = null;

/** Build (lazily, once) the standard CRC-32 table (polynomial 0xEDB88320). */
function getCrc32Table() {
  if (crc32Table === null) {
    crc32Table = new Uint32Array(256);
    for (let i = 0; i < 256; i++) {
      let c = i;
      for (let j = 0; j < 8; j++) {
        c = (c & 1) ? (0xEDB88320 ^ (c >>> 1)) : (c >>> 1);
      }
      crc32Table[i] = c;
    }
  }
  return crc32Table;
}

/**
 * Stream a file once and compute its MD5, CRC-32 and byte size.
 *
 * @param {string} filePath File to hash.
 * @returns {Promise<{md5: string, crc32: string, sizeBytes: number}>}
 *   md5/crc32 as lowercase hex (crc32 zero-padded to 8 digits).
 */
async function computeHashes(filePath) {
  const md5 = createHash("md5");
  const crcTable = getCrc32Table();
  let crc = 0xFFFFFFFF;
  let size = 0;

  // Transform updates all three accumulators as chunks flow through.
  const hashTap = new Transform({
    transform(chunk, encoding, callback) {
      md5.update(chunk);
      size += chunk.length;
      for (let i = 0; i < chunk.length; i++) {
        crc = crcTable[(crc ^ chunk[i]) & 0xFF] ^ (crc >>> 8);
      }
      callback(null, chunk);
    },
  });

  // Pipe through the hashing transform and discard the output.
  await pipeline(createReadStream(filePath), hashTap, async function* (source) {
    for await (const _ of source) { /* drain */ }
  });

  // Final XOR, forced unsigned, rendered as 8 hex digits.
  const crc32Final = ((crc ^ 0xFFFFFFFF) >>> 0).toString(16).padStart(8, "0");
  return {
    md5: md5.digest("hex"),
    crc32: crc32Final,
    sizeBytes: size,
  };
}

// ---- Ensure software_hashes table exists ----

/** Create the software_hashes table (keyed by download_id) if missing. */
async function ensureTable() {
  await pool.query(`
    CREATE TABLE IF NOT EXISTS software_hashes (
      download_id INT NOT NULL PRIMARY KEY,
      md5 VARCHAR(32) NOT NULL,
      crc32 VARCHAR(8) NOT NULL,
      size_bytes BIGINT NOT NULL,
      inner_path VARCHAR(500) NOT NULL,
      updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      INDEX idx_sh_md5 (md5),
      INDEX idx_sh_crc32 (crc32)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
  `);
}

// ---- JSON export ----

/**
 * Dump the whole software_hashes table to SNAPSHOT_PATH as pretty JSON.
 * Written atomically (temp file + rename).
 *
 * @returns {Promise<number>} Number of rows exported.
 */
async function exportSnapshot() {
  const [rows] = await pool.query(
    "SELECT download_id, md5, crc32, size_bytes, inner_path, updated_at FROM software_hashes ORDER BY download_id"
  );

  const snapshot = {
    exportedAt: new Date().toISOString(),
    count: rows.length,
    rows: rows.map((r) => ({
      download_id: r.download_id,
      md5: r.md5,
      crc32: r.crc32,
      // BIGINT may surface as a string depending on driver config; normalize.
      size_bytes: Number(r.size_bytes),
      inner_path: r.inner_path,
      updated_at: r.updated_at instanceof Date ? r.updated_at.toISOString() : r.updated_at,
    })),
  };

  // Ensure directory exists
  await fs.mkdir(path.dirname(SNAPSHOT_PATH), { recursive: true });

  // Atomic write
  const tmp = SNAPSHOT_PATH + ".tmp";
  await fs.writeFile(tmp, JSON.stringify(snapshot, null, 2), "utf8");
  await fs.rename(tmp, SNAPSHOT_PATH);

  logInfo(`Exported ${rows.length} rows to ${SNAPSHOT_PATH}`);
  return rows.length;
}

// ---- Main processing loop ----

// Shared with the SIGINT / fatal-error handlers below so an interrupted run
// can still persist its progress.
let currentState = null;

/**
 * Entry point: hash every tape-image download, upsert into software_hashes,
 * checkpoint resume state periodically, then export the JSON snapshot.
 */
async function main() {
  await ensureTable();

  if (EXPORT_ONLY) {
    const count = await exportSnapshot();
    logInfo(`Export complete: ${count} rows.`);
    await pool.end();
    return;
  }

  // Determine start point: CLI flag wins; otherwise resume one past the last
  // processed id from the prior run (unless --rebuild-all).
  const prior = await loadState();
  let resumeFrom = CLI_START_FROM;
  if (!REBUILD_ALL && !CLI_START_FROM && prior?.lastProcessedId) {
    resumeFrom = prior.lastProcessedId + 1;
  }

  const startedAt = new Date().toISOString();
  currentState = {
    version: 1,
    startedAt,
    updatedAt: startedAt,
    startFromId: resumeFrom,
    lastProcessedId: prior?.lastProcessedId ?? -1,
    processed: 0,
    hashed: 0,
    skipped: 0,
    errors: 0,
    error: undefined,
  };

  // Query tape-image downloads from the resume point onwards.
  const placeholders = TAPE_FILETYPE_IDS.map(() => "?").join(", ");
  const [rows] = await pool.query(
    `SELECT id, file_link, file_size FROM downloads
     WHERE filetype_id IN (${placeholders}) AND id >= ?
     ORDER BY id ASC`,
    [...TAPE_FILETYPE_IDS, resumeFrom]
  );

  // Also get total count for progress display.
  const [totalRows] = await pool.query(
    `SELECT COUNT(*) as cnt FROM downloads WHERE filetype_id IN (${placeholders})`,
    TAPE_FILETYPE_IDS
  );
  const total = totalRows[0].cnt;

  logInfo(`Processing ${rows.length} tape-image downloads (total in DB: ${total}, starting from id >= ${resumeFrom})`);

  let processed = 0;
  let hashed = 0;
  let skipped = 0;
  let errors = 0;

  // FIX: mirror the local counters into currentState on EVERY loop path (the
  // original only synced fully on the hashed path), so the SIGINT / fatal
  // handlers never persist a stale snapshot.
  const syncState = (id) => {
    currentState.lastProcessedId = id;
    currentState.processed = processed;
    currentState.hashed = hashed;
    currentState.skipped = skipped;
    currentState.errors = errors;
    currentState.updatedAt = new Date().toISOString();
  };

  // Best-effort state write; a failed write must not abort the run.
  const checkpoint = async () => {
    try {
      await saveStateAtomic(currentState);
    } catch (e) {
      logError(`Failed to write state: ${e?.message || e}`);
    }
  };

  for (const row of rows) {
    const { id, file_link: fileLink } = row;

    try {
      const localZip = toLocalPath(fileLink);
      if (!localZip) {
        logWarn(` [${id}] Unsupported file_link prefix: ${fileLink}`);
        errors++;
        processed++;
        syncState(id);
        continue;
      }

      // Zip not synced locally yet — skip silently.
      try {
        await fs.access(localZip);
      } catch {
        skipped++;
        processed++;
        syncState(id);
        if (processed % 500 === 0) {
          await checkpoint();
        }
        continue;
      }

      // Reuse an existing _CONTENTS extraction from a prior run when present.
      const contentsDir = localZip + "_CONTENTS";
      let contentsExisted = false;
      try {
        await fs.access(contentsDir);
        contentsExisted = true;
      } catch {
        // Need to extract below.
      }

      if (!contentsExisted) {
        try {
          await extractZipContents(localZip, contentsDir);
        } catch (err) {
          logWarn(` [${id}] Extract failed: ${err.message}`);
          errors++;
          processed++;
          syncState(id);
          continue;
        }
      }

      // Find the tape file inside the extracted contents.
      const tapeFile = await findTapeFile(contentsDir);
      if (!tapeFile) {
        // No tape file found inside zip — unusual but not fatal.
        if (VERBOSE) logWarn(` [${id}] No tape file in ${contentsDir}`);
        skipped++;
        processed++;
        syncState(id);
        continue;
      }

      // Compute hashes over the inner tape file.
      const hashes = await computeHashes(tapeFile.path);

      // Relative path inside _CONTENTS for the inner_path column.
      const innerPath = path.relative(contentsDir, tapeFile.path);

      // Upsert
      await pool.query(
        `INSERT INTO software_hashes (download_id, md5, crc32, size_bytes, inner_path, updated_at)
         VALUES (?, ?, ?, ?, ?, NOW())
         ON DUPLICATE KEY UPDATE
           md5 = VALUES(md5),
           crc32 = VALUES(crc32),
           size_bytes = VALUES(size_bytes),
           inner_path = VALUES(inner_path),
           updated_at = NOW()`,
        [id, hashes.md5, hashes.crc32, hashes.sizeBytes, innerPath]
      );

      hashed++;
      processed++;
      syncState(id);

      if (processed % 100 === 0) {
        await checkpoint();
        logInfo(`... processed=${processed}/${rows.length}, hashed=${hashed}, skipped=${skipped}, errors=${errors}`);
      }
    } catch (err) {
      logError(` [${id}] Unexpected error: ${err.message}`);
      errors++;
      processed++;
      syncState(id);
    }
  }

  // Final state save (counters are already in sync; refresh the timestamp).
  syncState(currentState.lastProcessedId);
  await saveStateAtomic(currentState);

  logInfo(`\nProcessing complete: processed=${processed}, hashed=${hashed}, skipped=${skipped}, errors=${errors}`);

  // Export snapshot
  logInfo("\nExporting JSON snapshot...");
  await exportSnapshot();

  await pool.end();
  logInfo("Done.");
}

// ---- Graceful shutdown ----
// Persist progress on Ctrl-C; counters are kept current by syncState above.
process.on("SIGINT", async () => {
  logWarn("\nInterrupted (SIGINT). Writing state...");
  try {
    if (currentState) {
      currentState.updatedAt = new Date().toISOString();
      await saveStateAtomic(currentState);
      logWarn(`State saved at: ${STATE_FILE}`);
    }
  } catch (e) {
    logError(`Failed to write state on SIGINT: ${e?.message || e}`);
  }
  try { await pool.end(); } catch {}
  process.exit(130); // 128 + SIGINT
});

// Run — record the failure in the state file before exiting non-zero.
main().catch(async (err) => {
  logError(`Fatal error: ${err.message}\n${err.stack || ""}`);
  try {
    if (currentState) {
      currentState.updatedAt = new Date().toISOString();
      currentState.error = { message: err.message, stack: err.stack };
      await saveStateAtomic(currentState);
    }
  } catch (e) {
    logError(`Failed to write state on fatal: ${e?.message || e}`);
  }
  try { await pool.end(); } catch {}
  process.exit(1);
});