Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- This code provides basic functionality to work with a database
- This code is published under the MIT license.
- Posted: 17 Oct 2024
- Author: Dmitry Anderson
- Git: https://github.com/4nd3r5on
- */
- import * as pg from "https://deno.land/x/postgres/mod.ts";
- import * as path from "jsr:@std/path"
/** Bundle passed to the migration helpers: a connection pool plus the locally known migrations. */
export interface MigrationsConfig {
  /** Pool that database clients are borrowed from. */
  pool: pg.Pool;
  /** Up and down migrations available locally. */
  mLocal: LocalMigrations;
}
/** Locally available migrations, indexed by version. */
export interface LocalMigrations {
  /** Versions that have an up migration, ordered from the smallest ID to the biggest ID. */
  versionsUP: bigint[];
  /** Up migrations keyed by version. */
  migrationsUP: Map<bigint, Migration>;
  /** Down migrations keyed by version. */
  migrationsDown: Map<bigint, Migration>;
}
/** A migration version together with its optional human-readable label. */
export interface VerAndLabel {
  /** Numeric migration version. */
  version: bigint;
  /** Label attached to the version, or `null` when there is none. */
  label: string | null;
}
/** A versioned, optionally labelled migration and the SQL text it runs. */
export interface Migration extends VerAndLabel {
  /** SQL executed when the migration is applied (or rolled back, for a down migration). */
  query: string;
}
/**
 * Borrows a client from the pool, runs `callback` with it and hands the client back.
 *
 * The callback is awaited before the client is released, so asynchronous work never
 * runs on a client that has already been returned to the pool. The client is released
 * even when the callback throws or rejects.
 *
 * @param pool Pool to take the client from.
 * @param callback Work to perform with the borrowed client; may be sync or async.
 * @returns A promise that settles once the callback has finished and the client is
 *          released. It rejects with whatever `pool.connect()` or the callback threw.
 */
const withClient = async (
  pool: pg.Pool,
  callback: (client: pg.PoolClient) => Promise<void> | void,
): Promise<void> => {
  const client = await pool.connect();
  try {
    await callback(client);
  } finally {
    client.release();
  }
};
/**
 * Runs `callback` inside a database transaction.
 *
 * Issues `BEGIN`, awaits the callback and then issues `COMMIT`. If any of those steps
 * fails, `ROLLBACK` is issued and the original error is re-thrown. The client is always
 * released back to the pool.
 *
 * @param pool Pool to take the client from.
 * @param callback Work to perform on the transaction's client.
 * @throws Whatever `BEGIN`, the callback or `COMMIT` threw.
 */
const tx = async (pool: pg.Pool, callback: (client: pg.PoolClient) => Promise<void>): Promise<void> => {
  const client = await pool.connect();
  try {
    await client.queryArray("BEGIN");
    await callback(client);
    await client.queryArray("COMMIT");
  } catch (err) {
    try {
      await client.queryArray("ROLLBACK");
    } catch (rollbackErr) {
      // A failed rollback must not hide the error that triggered it; report it and
      // keep propagating the root cause.
      console.error("Failed to roll back transaction:", rollbackErr);
    }
    throw err;
  } finally {
    client.release();
  }
};
/**
 * Splits a `<version>_<label>` string (the label part is optional) into its pieces.
 *
 * Everything before the first underscore is the version; everything after it is the
 * label. A missing or empty label yields `null`.
 *
 * @param verAndLabelStr Text such as `"12_create_users"` or `"12"`.
 * @returns The parsed version and label. If the version is empty, not an integer or
 *          negative, the result is `{ version: -1n, label: null }`.
 */
const parseIdAndLabel = (verAndLabelStr: string): VerAndLabel => {
  const invalid: VerAndLabel = { version: -1n, label: null };

  const underscoreIdx = verAndLabelStr.indexOf("_");
  const hasUnderscore = underscoreIdx !== -1;
  const idStr = hasUnderscore ? verAndLabelStr.substring(0, underscoreIdx) : verAndLabelStr;

  // BigInt("") and BigInt("  ") evaluate to 0n, so blank ids must be rejected explicitly.
  if (idStr.trim() === "") {
    return invalid;
  }

  let version: bigint;
  try {
    // BigInt() throws a SyntaxError for text that is not an integer.
    version = BigInt(idStr);
  } catch {
    return invalid;
  }
  if (version < 0n) {
    return invalid;
  }

  const labelText = hasUnderscore ? verAndLabelStr.substring(underscoreIdx + 1) : "";
  return {
    version,
    label: labelText === "" ? null : labelText,
  };
};
/**
 * Loads every `<version>[_<label>].<action>.sql` file found directly inside `dirPath`.
 *
 * Files whose third dot-separated part is not `sql` are ignored. A file whose action is
 * `up` becomes an up migration; a file with any other action is stored as a down
 * migration (kept from the original behaviour).
 *
 * @param dirPath Directory that holds the migration files.
 * @returns The loaded migrations, with `versionsUP` sorted from the smallest to the
 *          biggest version.
 * @throws A string message when a file name carries an invalid version, or when two
 *         files of the same direction share a version.
 */
const loadMigrationDir = (dirPath: string): LocalMigrations => {
  const result: LocalMigrations = {
    versionsUP: [],
    migrationsUP: new Map<bigint, Migration>(),
    migrationsDown: new Map<bigint, Migration>(),
  };
  // File name each stored migration came from, so duplicate errors can name both files.
  const upSources = new Map<bigint, string>();
  const downSources = new Map<bigint, string>();
  const decoder = new TextDecoder("utf-8");

  for (const dirEntry of Deno.readDirSync(dirPath)) {
    const fname = dirEntry.name;
    const [idAndLabel, action, ext] = fname.split(".", 3);
    if (ext !== "sql") {
      continue;
    }

    const { version, label } = parseIdAndLabel(idAndLabel);
    if (version < 0n) {
      throw `Invalid migration version in file name: ${fname}`;
    }

    const isUp = action === "up";
    const migrations = isUp ? result.migrationsUP : result.migrationsDown;
    const sources = isUp ? upSources : downSources;

    const existingFile = sources.get(version);
    if (existingFile !== undefined) {
      throw `Duplicate migration version files: ${existingFile} and ${fname}`;
    }

    const query = decoder.decode(Deno.readFileSync(path.join(dirPath, fname)));
    migrations.set(version, { version, label, query });
    sources.set(version, fname);
    if (isUp) {
      result.versionsUP.push(version);
    }
  }

  // Directory listing order is not guaranteed, but versionsUP must be ascending.
  result.versionsUP.sort((a, b) => (a < b ? -1 : a > b ? 1 : 0));
  return result;
};
/**
 * Creates the `applied_migrations` table when it does not exist yet.
 *
 * The statement uses `CREATE TABLE IF NOT EXISTS`, so it is safe to call this every
 * time you are not sure whether the table exists.
 *
 * @param pool Pool to take the client from.
 * @returns A promise that resolves once the statement has finished; await it before
 *          reading from or writing to the table.
 */
const createMigrationTable = async (pool: pg.Pool): Promise<void> => {
  const client = await pool.connect();
  try {
    await client.queryArray(`
      CREATE TABLE IF NOT EXISTS applied_migrations (
        version BIGINT UNIQUE NOT NULL,
        label TEXT
      )`);
  } finally {
    client.release();
  }
};
/**
 * Reads the versions and labels recorded in `applied_migrations`, ordered from the
 * smallest to the biggest version.
 *
 * Make sure the migrations table exists before calling this function.
 *
 * The function is asynchronous because the query is. The earlier synchronous form
 * returned before the query had run and therefore could never produce rows.
 *
 * @param cfg Migration config; only `cfg.pool` is used.
 * @returns The applied migrations in ascending version order.
 */
export const getAppliedMigrations = async (cfg: MigrationsConfig): Promise<VerAndLabel[]> => {
  const client = await cfg.pool.connect();
  try {
    const queryResult = await client.queryObject<VerAndLabel>(
      "SELECT version, label FROM applied_migrations ORDER BY version",
    );
    return queryResult.rows;
  } finally {
    client.release();
  }
};
/**
 * Rolls back a single migration using the down migration stored in a file.
 *
 * The file's SQL and the removal of the `applied_migrations` row run in one transaction.
 *
 * @param pool Pool to take the client from.
 * @param fpath Path of the file that contains the down migration SQL.
 * @param version Version to remove from `applied_migrations`. Accepts `bigint` as well
 *                as `number`, because the rest of this module represents versions as
 *                `bigint`.
 * @throws If the file cannot be read or any statement fails; the transaction is rolled
 *         back in the latter case.
 */
export const rollbackMigration = async (
  pool: pg.Pool,
  fpath: string,
  version: number | bigint,
): Promise<void> => {
  // Read the file first so an unreadable file never opens a transaction.
  const downSql = new TextDecoder("utf-8").decode(await Deno.readFile(fpath));
  await tx(pool, async (client) => {
    await client.queryArray(downSql);
    await client.queryArray("DELETE FROM applied_migrations WHERE version=$1", [version]);
  });
};
/**
 * Rolls the database back so that `targetVer` becomes the newest applied version.
 *
 * Every applied migration with a version greater than `targetVer` is rolled back,
 * newest first, inside one transaction. The target version itself stays applied.
 * Before rolling back, be sure that the applied migrations contain the target version.
 *
 * @param pool Pool to take the client from.
 * @param mLocal Local migrations; supplies the down migrations.
 * @param mApplied Applied migrations in ascending version order.
 * @param targetVer Version to end up on. A value below every applied version (for
 *                  example `-1n`) rolls everything back.
 * @throws A string message when an affected version has no local down migration or its
 *         label differs from the applied one.
 */
export const rollbackToVer = async (
  pool: pg.Pool,
  mLocal: LocalMigrations,
  mApplied: VerAndLabel[],
  targetVer: bigint,
): Promise<void> => {
  const toRollBack: Migration[] = [];
  for (let i = mApplied.length - 1; i >= 0; i--) {
    const applied = mApplied[i];
    // Stop at the target: it must remain applied after the rollback.
    if (applied.version <= targetVer) {
      break;
    }
    const mDown = mLocal.migrationsDown.get(applied.version);
    if (!mDown) {
      throw `No down migration for version ${applied.version}`;
    }
    // Loose comparison on purpose: a missing label may be null or undefined.
    if (mDown.label != applied.label) {
      throw `Label of the applied migration (${applied.label}) doesn't match ` +
        `with local down migration (${mDown.label}). Migration version: ${applied.version}`;
    }
    toRollBack.push(mDown);
  }

  if (toRollBack.length === 0) {
    return;
  }

  await tx(pool, async (client) => {
    for (const migration of toRollBack) {
      await client.queryArray(migration.query);
      await client.queryArray("DELETE FROM applied_migrations WHERE version=$1", [migration.version]);
    }
  });
};
/**
 * Finds the newest applied version up to which the database matches the local migrations.
 *
 * Both lists are compared position by position, so both are expected in ascending
 * version order.
 *
 * @param localMigrationIDs Local up-migration versions.
 * @param appliedMigrations Applied migrations.
 * @returns `null` when every applied version is clean (including when nothing is
 *          applied); `-1n` when not even the first applied version is clean;
 *          otherwise the last version that matches.
 */
export const findLastCleanVer = (localMigrationIDs: bigint[], appliedMigrations: VerAndLabel[]): bigint | null => {
  // Count how many applied versions, from the start, match the local ones.
  let matching = 0;
  while (
    matching < appliedMigrations.length &&
    matching < localMigrationIDs.length &&
    localMigrationIDs[matching] === appliedMigrations[matching].version
  ) {
    matching++;
  }

  if (matching === appliedMigrations.length) {
    return null;
  }
  // Reached on a version mismatch and also when the database holds more versions than
  // exist locally; the versions before `matching` are the clean ones.
  return matching === 0 ? -1n : localMigrationIDs[matching - 1];
};
/**
 * Rolls the database back to the last clean version, if it is dirty.
 *
 * Make sure the migrations table exists before calling this function.
 *
 * The function is asynchronous: it has to wait for the applied migrations to be read
 * and for the rollback to finish before it can report the resulting version.
 *
 * @param cfg Pool and local migrations.
 * @returns `-1n` when nothing is applied; the newest applied version when the database
 *          is already clean; otherwise the version reported by `findLastCleanVer`,
 *          which is passed to `rollbackToVer` as the rollback target.
 */
export const rollbackToCleanVer = async (cfg: MigrationsConfig): Promise<bigint> => {
  const mApplied = await getAppliedMigrations(cfg);
  if (mApplied.length < 1) {
    return -1n;
  }

  const cleanVer = findLastCleanVer(cfg.mLocal.versionsUP, mApplied);
  if (cleanVer === null) {
    // Nothing to roll back: report the version the database is on.
    return mApplied[mApplied.length - 1].version;
  }

  await rollbackToVer(cfg.pool, cfg.mLocal, mApplied, cleanVer);
  return cleanVer;
};
/**
 * Applies a single migration from a file.
 *
 * The file's SQL and the insert into `applied_migrations` run in one transaction.
 *
 * @param pool Pool to take the client from.
 * @param fpath Path of the file that contains the migration SQL.
 * @param version Version to record. Accepts `bigint` as well as `number`, because the
 *                rest of this module represents versions as `bigint`.
 * @param label Label to record, or `null` for none (the `label` column is nullable).
 * @throws If the file cannot be read or any statement fails; the transaction is rolled
 *         back in the latter case.
 */
export const applyMigration = async (
  pool: pg.Pool,
  fpath: string,
  version: number | bigint,
  label: string | null,
): Promise<void> => {
  // Read the file first so an unreadable file never opens a transaction.
  const upSql = new TextDecoder("utf-8").decode(await Deno.readFile(fpath));
  await tx(pool, async (client) => {
    await client.queryArray(upSql);
    await client.queryArray("INSERT INTO applied_migrations (version, label) VALUES ($1, $2)", [version, label]);
  });
};
/**
 * Applies the local up migrations that lie after `currentVer`, up to and including
 * `targetVer`, inside one transaction.
 *
 * Before upgrading you must be sure that the DB version is clean (the current version
 * and all versions below it exist in the local up migrations).
 *
 * @param pool Pool to take the client from.
 * @param localMigrations Local migrations; `versionsUP` must be in ascending order.
 * @param currentVer Version the database is on, or a negative value when nothing is applied.
 * @param targetVer Version to upgrade to.
 * @throws A string message when the current or target version is not listed locally,
 *         or when a listed version has no up migration.
 */
export const upgradeToVer = async (
  pool: pg.Pool,
  localMigrations: LocalMigrations,
  currentVer: bigint,
  targetVer: bigint,
): Promise<void> => {
  let idxCurrent = -1;
  if (currentVer > -1n) {
    idxCurrent = localMigrations.versionsUP.indexOf(currentVer);
    if (idxCurrent < 0) { throw "Current version isn't listed in local migrations"; }
  }
  const idxTarget = localMigrations.versionsUP.indexOf(targetVer);
  if (idxTarget < 0) { throw "Target version isn't listed in local migrations"; }

  // slice() returns a new array and its end index is exclusive, hence the `+ 1` to
  // include the target version.
  const pending = localMigrations.versionsUP
    .slice(idxCurrent + 1, idxTarget + 1)
    .map((mVer) => {
      const migration = localMigrations.migrationsUP.get(mVer);
      if (!migration) {
        throw `Failed to get migration version ${mVer}. Probably error in migrations loading`;
      }
      return migration;
    });

  if (pending.length === 0) {
    return;
  }

  await tx(pool, async (client) => {
    for (const migration of pending) {
      await client.queryArray(migration.query);
      await client.queryArray("INSERT INTO applied_migrations (version, label) " +
        "VALUES ($1, $2)", [migration.version, migration.label]);
    }
  });
};
/**
 * Migrates the database to a specific version, upgrading or rolling back as needed.
 *
 * Make sure the migrations table exists before calling this function.
 *
 * If you're migrating down, make sure a local down migration exists for every version
 * that has to be rolled back; `rollbackToVer` throws otherwise.
 *
 * @param cfg Pool and local migrations.
 * @param targetVer Version to migrate to.
 * @throws A string message when an upgrade is requested while the database is dirty.
 */
export const migrateTo = async (cfg: MigrationsConfig, targetVer: bigint): Promise<void> => {
  const { pool, mLocal } = cfg;
  const mApplied = await getAppliedMigrations(cfg);
  const mCurrentVer = mApplied.length >= 1 ? mApplied[mApplied.length - 1].version : null;

  if (mCurrentVer === targetVer) {
    // Already up to date
    return;
  }

  // Compare against null explicitly: version 0n is falsy but is a real version.
  if (mCurrentVer === null || mCurrentVer < targetVer) {
    const cleanVer = findLastCleanVer(mLocal.versionsUP, mApplied);
    if (cleanVer !== null) {
      throw "DB is dirty. Migrate to a clean version before upgrading DB";
    }
    await upgradeToVer(pool, mLocal, mCurrentVer ?? -1n, targetVer);
    return;
  }

  await rollbackToVer(pool, mLocal, mApplied, targetVer);
};
/** Input of `doMigrate`. */
interface DoMigrateConfig {
  /** Pool the migration queries run on. */
  pool: pg.Pool;
  /** Directory the migration files are loaded from. */
  migrationsDir: string;
  /** Version to migrate to; `null` means the biggest local version. */
  versionLimit: bigint | null;
}
/**
 * Automated migration script meant to run on app start.
 *
 * Loads the local migrations, makes sure the migrations table exists, rolls a dirty
 * database back to its last clean version, migrates to the requested version and
 * prints a summary. Every step is awaited, so each one sees the result of the previous
 * one and no fixed delay is needed.
 *
 * @param cfg Pool, migrations directory and optional version limit.
 * @throws A string message when the directory contains no up migrations; also
 *         propagates anything thrown by the individual steps.
 */
const doMigrate = async (cfg: DoMigrateConfig): Promise<void> => {
  const mLocal = loadMigrationDir(cfg.migrationsDir);
  if (mLocal.versionsUP.length < 1) {
    throw `No migrations were found in ${cfg.migrationsDir}`;
  }
  const mCfg: MigrationsConfig = {
    pool: cfg.pool,
    mLocal: mLocal,
  };
  const maxVersion = mLocal.versionsUP[mLocal.versionsUP.length - 1];
  // `??` rather than `||`: a limit of 0n is falsy but still a real limit.
  const migrateToVer = cfg.versionLimit ?? maxVersion;

  // If you're not sure whether the migration table exists, just call this function.
  await createMigrationTable(cfg.pool);
  await rollbackToCleanVer(mCfg);
  await migrateTo(mCfg, migrateToVer);

  const mApplied = await getAppliedMigrations(mCfg);
  const currentVer = mApplied.length > 0 ? mApplied[mApplied.length - 1].version : null;
  const mLastClean = findLastCleanVer(mLocal.versionsUP, mApplied);
  console.log(
    `DB Info:\n` +
    `\tDB Version Limit: ${cfg.versionLimit}\n` +
    `\tCurrent DB Version: ${currentVer}/${maxVersion}\n` +
    `\tDirty: ${mLastClean !== null}\n`
  );
};
/** Input of `setupDB`. */
interface SetupDbCfg {
  /** Directory the migration files are loaded from. */
  migrationsDir: string;
  /** Version to migrate to; `null` means the biggest local version. */
  versionLimit: bigint | null;
  /** Connection options or connection string handed to `pg.Pool`. */
  poolCfg: pg.ClientOptions | string;
  /** Size of the connection pool. */
  poolSize: number;
}
/**
 * Creates the connection pool and runs the automated migrations on it.
 *
 * @param cfg Pool options, pool size, migrations directory and optional version limit.
 * @returns The pool, ready for use once the migrations have finished.
 * @throws Whatever the migration run throws; the pool is closed first so its
 *         connections are not leaked.
 */
const setupDB = async (cfg: SetupDbCfg): Promise<pg.Pool> => {
  const { versionLimit, migrationsDir, poolCfg, poolSize } = cfg;
  // Honour the configured size instead of a hard-coded pool of one connection.
  const pool = new pg.Pool(poolCfg, poolSize);
  try {
    await doMigrate({ pool, versionLimit, migrationsDir });
  } catch (err) {
    try {
      await pool.end();
    } catch (closeErr) {
      // Closing is best effort; the migration error is the one callers need to see.
      console.error("Failed to close the pool after a migration error:", closeErr);
    }
    throw err;
  }
  return pool;
};
- export default setupDB
- export { tx, withClient, createMigrationTable }
- export type { SetupDbCfg }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement