repos / pico

pico services - prose.sh, pastes.sh, imgs.sh, feeds.sh, pgs.sh
git clone https://github.com/picosh/pico.git

pico / cmd / scripts / migrate
Eric Bower · 08 Apr 24

migrate.go

  1package main
  2
  3import (
  4	"context"
  5	"database/sql"
  6	"fmt"
  7	"log/slog"
  8	"os"
  9
 10	"github.com/picosh/pico/db"
 11	"github.com/picosh/pico/db/postgres"
 12	"github.com/picosh/pico/shared"
 13)
 14
 15func findPosts(dbpool *sql.DB) ([]*db.Post, error) {
 16	var posts []*db.Post
 17	rs, err := dbpool.Query(`SELECT
 18		posts.id, user_id, filename, title, text, description,
 19		posts.created_at, publish_at, posts.updated_at, hidden, COALESCE(views, 0) as views
 20		FROM posts
 21		LEFT OUTER JOIN post_analytics ON post_analytics.post_id = posts.id
 22	`)
 23	if err != nil {
 24		return posts, err
 25	}
 26	for rs.Next() {
 27		post := &db.Post{}
 28		err := rs.Scan(
 29			&post.ID,
 30			&post.UserID,
 31			&post.Filename,
 32			&post.Title,
 33			&post.Text,
 34			&post.Description,
 35			&post.CreatedAt,
 36			&post.PublishAt,
 37			&post.UpdatedAt,
 38			&post.Hidden,
 39			&post.Views,
 40		)
 41		if err != nil {
 42			return posts, err
 43		}
 44
 45		posts = append(posts, post)
 46	}
 47	if rs.Err() != nil {
 48		return posts, rs.Err()
 49	}
 50	return posts, nil
 51}
 52
 53func insertUser(tx *sql.Tx, user *db.User) error {
 54	_, err := tx.Exec(
 55		"INSERT INTO app_users (id, name, created_at) VALUES($1, $2, $3)",
 56		user.ID,
 57		user.Name,
 58		user.CreatedAt,
 59	)
 60	return err
 61}
 62
 63func insertPublicKey(tx *sql.Tx, pk *db.PublicKey) error {
 64	_, err := tx.Exec(
 65		"INSERT INTO public_keys (id, user_id, public_key, created_at) VALUES ($1, $2, $3, $4)",
 66		pk.ID,
 67		pk.UserID,
 68		pk.Key,
 69		pk.CreatedAt,
 70	)
 71	return err
 72}
 73
 74func insertPost(tx *sql.Tx, post *db.Post) error {
 75	_, err := tx.Exec(
 76		`INSERT INTO posts
 77			(id, user_id, title, text, created_at, publish_at, updated_at, description, filename, hidden, cur_space, views)
 78			VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
 79		post.ID,
 80		post.UserID,
 81		post.Title,
 82		post.Text,
 83		post.CreatedAt,
 84		post.PublishAt,
 85		post.UpdatedAt,
 86		post.Description,
 87		post.Filename,
 88		post.Hidden,
 89		post.Space,
 90		post.Views,
 91	)
 92	return err
 93}
 94
 95type ConflictData struct {
 96	User          *db.User
 97	Pks           []*db.PublicKey
 98	ReplaceWithID string
 99}
100
101func main() {
102	logger := slog.Default()
103
104	listsCfg := shared.NewConfigSite()
105	listsCfg.Logger = logger
106	listsCfg.DbURL = os.Getenv("LISTS_DB_URL")
107	listsDb := postgres.NewDB(listsCfg.DbURL, listsCfg.Logger)
108
109	proseCfg := shared.NewConfigSite()
110	proseCfg.DbURL = os.Getenv("PROSE_DB_URL")
111	proseCfg.Logger = logger
112	proseDb := postgres.NewDB(proseCfg.DbURL, proseCfg.Logger)
113
114	picoCfg := shared.NewConfigSite()
115	picoCfg.Logger = logger
116	picoCfg.DbURL = os.Getenv("PICO_DB_URL")
117	picoDb := postgres.NewDB(picoCfg.DbURL, picoCfg.Logger)
118
119	ctx := context.Background()
120	tx, err := picoDb.Db.BeginTx(ctx, nil)
121	if err != nil {
122		panic(err)
123	}
124	defer func() {
125		err = tx.Rollback()
126	}()
127
128	logger.Info("Finding prose users")
129	proseUsers, err := proseDb.FindUsers()
130	if err != nil {
131		panic(err)
132	}
133
134	logger.Info("Finding lists users")
135	listUsers, err := listsDb.FindUsers()
136	if err != nil {
137		panic(err)
138	}
139
140	logger.Info("Adding prose users and public keys to PICO db")
141	userMap := map[string]*db.User{}
142	for _, proseUser := range proseUsers {
143		userMap[proseUser.Name] = proseUser
144
145		err = insertUser(tx, proseUser)
146		if err != nil {
147			panic(err)
148		}
149
150		proseKeys, err := proseDb.FindKeysForUser(proseUser)
151		if err != nil {
152			panic(err)
153		}
154
155		for _, prosePK := range proseKeys {
156			err = insertPublicKey(tx, prosePK)
157			if err != nil {
158				panic(err)
159			}
160		}
161	}
162
163	noconflicts := []*ConflictData{}
164	conflicts := []*ConflictData{}
165	updateIDs := []*ConflictData{}
166	logger.Info("Finding conflicts")
167	for _, listUser := range listUsers {
168		listKeys, err := listsDb.FindKeysForUser(listUser)
169		if err != nil {
170			panic(err)
171		}
172
173		data := &ConflictData{
174			User: listUser,
175			Pks:  listKeys,
176		}
177
178		if userMap[listUser.Name] == nil {
179			noconflicts = append(noconflicts, data)
180			continue
181		} else {
182			proseUser := userMap[listUser.Name]
183			proseKeys, err := proseDb.FindKeysForUser(proseUser)
184			if err != nil {
185				panic(err)
186			}
187
188			if len(listKeys) != len(proseKeys) {
189				conflicts = append(conflicts, data)
190				continue
191			}
192
193			pkMap := map[string]bool{}
194			for _, prosePK := range proseKeys {
195				pkMap[prosePK.Key] = true
196			}
197
198			conflicted := false
199			for _, listPK := range listKeys {
200				if !pkMap[listPK.Key] {
201					conflicted = true
202					conflicts = append(conflicts, data)
203					break
204				}
205			}
206
207			if !conflicted {
208				data.ReplaceWithID = proseUser.ID
209				updateIDs = append(updateIDs, data)
210			}
211		}
212	}
213
214	logger.Info("adding records with no conflicts", "len", len(noconflicts))
215	for _, data := range noconflicts {
216		err = insertUser(tx, data.User)
217		if err != nil {
218			panic(err)
219		}
220
221		for _, pk := range data.Pks {
222			err = insertPublicKey(tx, pk)
223			if err != nil {
224				panic(err)
225			}
226		}
227	}
228
229	logger.Info("adding records with conflicts", "len", len(conflicts))
230	for _, data := range conflicts {
231		data.User.Name = fmt.Sprintf("%stmp", data.User.Name)
232		err = insertUser(tx, data.User)
233		if err != nil {
234			panic(err)
235		}
236
237		for _, pk := range data.Pks {
238			err = insertPublicKey(tx, pk)
239			if err != nil {
240				panic(err)
241			}
242		}
243	}
244
245	prosePosts, err := findPosts(proseDb.Db)
246	if err != nil {
247		panic(err)
248	}
249
250	logger.Info("Adding posts from prose.sh")
251	for _, post := range prosePosts {
252		post.Space = "prose"
253		err = insertPost(tx, post)
254		if err != nil {
255			panic(err)
256		}
257	}
258
259	listPosts, err := findPosts(listsDb.Db)
260	if err != nil {
261		panic(err)
262	}
263
264	logger.Info("Adding posts from lists.sh")
265	for _, post := range listPosts {
266		updated := false
267		for _, alreadyAdded := range updateIDs {
268			if post.UserID == alreadyAdded.User.ID {
269				// we need to change the ID for these posts to the prose user id
270				// because we were able to determine it was the same user
271				post.UserID = alreadyAdded.ReplaceWithID
272				post.Space = "lists"
273				err = insertPost(tx, post)
274				if err != nil {
275					panic(err)
276				}
277				updated = true
278				break
279			}
280		}
281
282		if updated {
283			continue
284		}
285
286		post.Space = "lists"
287		err = insertPost(tx, post)
288		if err != nil {
289			panic(err)
290		}
291	}
292
293	logger.Info("Committing transactions to PICO db")
294	// Commit the transaction.
295	if err = tx.Commit(); err != nil {
296		panic(err)
297	}
298}