Spaces:
Running
Running
File size: 4,006 Bytes
5c2ed06 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
/**
* Library made to simplify accessing / connecting to postgres databases,
* and to cleanly handle when the pg module isn't installed.
* @author mia-pi-git
*/
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore in case module doesn't exist
import type * as PG from 'pg';
import type { SQLStatement } from 'sql-template-strings';
import * as Streams from './streams';
import { FS } from './fs';
import * as Utils from './utils';
/** Settings consumed by `PostgresDatabase#ensureMigrated`. */
interface MigrationOptions {
	/**
	 * File holding the base schema; it is executed when the stored version
	 * is 0 and selecting from `table` fails.
	 */
	baseSchemaFile: string;
	/**
	 * Folder scanned for `.sql` migration files. Each migration is read back
	 * as `v<number>.sql` from this folder.
	 */
	migrationsFolder: string;
	/**
	 * Name of the table being migrated; also the `name` under which its
	 * version row is stored in `db_info`.
	 */
	table: string;
}
/**
 * Thin wrapper around a `pg` connection pool.
 *
 * Constructing an instance never throws: if the `pg` module cannot be loaded
 * (or the pool cannot be created), the instance is left without a pool and
 * `query` / `transaction` throw a descriptive error when used.
 */
export class PostgresDatabase {
	/** Underlying pool, or `null` when creating it failed (e.g. `pg` is not installed). */
	private pool: PG.Pool | null;

	/**
	 * @param config Options handed straight to `new pg.Pool(...)`.
	 * Defaults to `PostgresDatabase.getConfig()`.
	 */
	constructor(config = PostgresDatabase.getConfig()) {
		try {
			this.pool = new (require('pg').Pool)(config);
		} catch {
			this.pool = null;
		}
	}

	/** Returns the pool, or throws if it could not be created. */
	private getPool(): PG.Pool {
		if (!this.pool) {
			throw new Error(`Attempting to use postgres without 'pg' installed`);
		}
		return this.pool;
	}

	/**
	 * Shuts the pool down. Resolves immediately when there is no pool,
	 * so cleanup code does not have to care whether `pg` was available.
	 */
	destroy() {
		if (!this.pool) return Promise.resolve();
		return this.pool.end();
	}

	/**
	 * Runs a single statement on the pool.
	 *
	 * @param statement SQL text or an `sql-template-strings` statement.
	 * @param values Positional parameters for `statement`.
	 * @returns The result rows, or `[]` when the result has none.
	 * @throws Error if there is no pool, or a fresh `Error` carrying the
	 * driver's message if the query fails.
	 */
	async query(statement: string | SQLStatement, values?: any[]) {
		const pool = this.getPool();
		let result;
		try {
			result = await pool.query(statement, values);
		} catch (e: any) {
			// postgres won't give accurate stacks unless we do this
			throw new Error(e.message);
		}
		return result?.rows || [];
	}

	/**
	 * Reads the `usepostgres` entry of `<FS.ROOT_PATH>/config/config`.
	 *
	 * Returns `{}` if the config module cannot be loaded. If the module loads,
	 * the `usepostgres` value is returned as-is, even when it is missing.
	 */
	static getConfig() {
		let config: AnyObject = {};
		try {
			config = require(FS.ROOT_PATH + '/config/config').usepostgres;
		} catch {}
		return config;
	}

	/**
	 * Runs `callback` between `BEGIN` and `COMMIT` on a dedicated client.
	 *
	 * If `callback` throws, the transaction is rolled back. Errors with code
	 * `40001` are retried while `depth <= 10`; errors with code `23505` are
	 * retried once (only when `depth` is 0). Anything else is rethrown.
	 * The client is always released back to the pool, including before a retry.
	 *
	 * @param callback Receives the client the transaction runs on.
	 * @param depth Retry counter; callers normally leave this at its default.
	 * @returns Whatever `callback` resolved to.
	 * @throws Error if there is no pool.
	 */
	async transaction(callback: (conn: PG.PoolClient) => any, depth = 0): Promise<any> {
		const conn = await this.getPool().connect();
		let result;
		try {
			await conn.query(`BEGIN`);
			let shouldRetry = false;
			try {
				result = await callback(conn);
			} catch (e: any) {
				await conn.query(`ROLLBACK`);
				if (e.code === '40001' && depth <= 10) {
					// two concurrent transactions conflicted, try again
					shouldRetry = true;
				} else if (e.code === '23505' && !depth) {
					// There is a bug in Postgres that causes some
					// serialization failures to be reported as failed
					// unique constraint checks. Only retrying once since
					// it could be our fault (thanks chaos for this info / the first half of this comment)
					shouldRetry = true;
				} else {
					throw e;
				}
			}
			if (!shouldRetry) {
				await conn.query(`COMMIT`);
				return result;
			}
		} finally {
			// Without this the client is never returned and the pool eventually runs dry.
			conn.release();
		}
		// Retry only after the client above has been released, so that
		// repeated attempts never hold more than one client at a time.
		return this.transaction(callback, depth + 1);
	}

	/**
	 * Wraps `query` in an object read stream.
	 *
	 * Every `read` runs `query` again and buffers all returned rows; the
	 * stream ends the first time the query returns no rows.
	 * NOTE(review): a query whose result set never becomes empty will keep
	 * this stream producing rows forever - confirm that callers only pass
	 * queries that eventually return nothing.
	 */
	stream<T = any>(query: string) {
		// eslint-disable-next-line @typescript-eslint/no-this-alias
		const db = this;
		return new Streams.ObjectReadStream<T>({
			async read(this: Streams.ObjectReadStream<T>) {
				const result = await db.query(query) as T[];
				if (!result.length) return this.pushEnd();
				// buffer the whole batch at once rather than one row per read
				this.buf.push(...result);
			},
		});
	}

	/**
	 * Brings `opts.table` up to the newest migration found in
	 * `opts.migrationsFolder`.
	 *
	 * The current version is tracked as a row in `db_info` (created on
	 * demand). When the stored version is 0 and selecting from the table
	 * fails, the base schema file is run first. Then every `v<n>.sql` with
	 * `n` greater than the stored version is run in ascending order, and the
	 * stored version is updated after each one.
	 */
	async ensureMigrated(opts: MigrationOptions) {
		let value;
		try {
			const stored = await this.query(
				`SELECT value FROM db_info WHERE key = 'version' AND name = $1`, [opts.table]
			);
			if (stored.length) {
				value = stored[0].value || "0";
			}
		} catch {
			// the lookup failed - treated as "db_info does not exist yet"
			await this.query(`CREATE TABLE db_info (name TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL)`);
		}
		if (!value) { // means nothing inserted - create row
			value = "0";
			await this.query('INSERT INTO db_info (name, key, value) VALUES ($1, $2, $3)', [opts.table, 'version', value]);
		}
		const version = Number(value);
		const files = FS(opts.migrationsFolder)
			.readdirSync()
			.filter(f => f.endsWith('.sql'))
			// file names look like `v12.sql`: drop the first character, keep the part before the first dot
			.map(f => Number(f.slice(1).split('.')[0]))
			// skip `.sql` files whose name doesn't yield a number, rather than trying to read `vNaN.sql`
			.filter(n => !Number.isNaN(n));
		Utils.sortBy(files, f => f);
		const curVer = files[files.length - 1] || 0;
		if (curVer !== version) {
			if (!version) {
				try {
					await this.query(`SELECT * FROM ${opts.table} LIMIT 1`);
				} catch {
					// selecting failed - treated as "table is missing", so create it from the base schema
					await this.query(FS(opts.baseSchemaFile).readSync());
				}
			}
			for (const n of files) {
				if (n <= version) continue;
				await this.query(FS(`${opts.migrationsFolder}/v${n}.sql`).readSync());
				await this.query(
					`UPDATE db_info SET value = $1 WHERE key = 'version' AND name = $2`, [`${n}`, opts.table]
				);
			}
		}
	}
}
|