Eric Bower
·
08 Apr 24
migrate.go
1package main
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log/slog"
8 "os"
9
10 "github.com/picosh/pico/db"
11 "github.com/picosh/pico/db/postgres"
12 "github.com/picosh/pico/shared"
13)
14
15func findPosts(dbpool *sql.DB) ([]*db.Post, error) {
16 var posts []*db.Post
17 rs, err := dbpool.Query(`SELECT
18 posts.id, user_id, filename, title, text, description,
19 posts.created_at, publish_at, posts.updated_at, hidden, COALESCE(views, 0) as views
20 FROM posts
21 LEFT OUTER JOIN post_analytics ON post_analytics.post_id = posts.id
22 `)
23 if err != nil {
24 return posts, err
25 }
26 for rs.Next() {
27 post := &db.Post{}
28 err := rs.Scan(
29 &post.ID,
30 &post.UserID,
31 &post.Filename,
32 &post.Title,
33 &post.Text,
34 &post.Description,
35 &post.CreatedAt,
36 &post.PublishAt,
37 &post.UpdatedAt,
38 &post.Hidden,
39 &post.Views,
40 )
41 if err != nil {
42 return posts, err
43 }
44
45 posts = append(posts, post)
46 }
47 if rs.Err() != nil {
48 return posts, rs.Err()
49 }
50 return posts, nil
51}
52
53func insertUser(tx *sql.Tx, user *db.User) error {
54 _, err := tx.Exec(
55 "INSERT INTO app_users (id, name, created_at) VALUES($1, $2, $3)",
56 user.ID,
57 user.Name,
58 user.CreatedAt,
59 )
60 return err
61}
62
63func insertPublicKey(tx *sql.Tx, pk *db.PublicKey) error {
64 _, err := tx.Exec(
65 "INSERT INTO public_keys (id, user_id, public_key, created_at) VALUES ($1, $2, $3, $4)",
66 pk.ID,
67 pk.UserID,
68 pk.Key,
69 pk.CreatedAt,
70 )
71 return err
72}
73
74func insertPost(tx *sql.Tx, post *db.Post) error {
75 _, err := tx.Exec(
76 `INSERT INTO posts
77 (id, user_id, title, text, created_at, publish_at, updated_at, description, filename, hidden, cur_space, views)
78 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
79 post.ID,
80 post.UserID,
81 post.Title,
82 post.Text,
83 post.CreatedAt,
84 post.PublishAt,
85 post.UpdatedAt,
86 post.Description,
87 post.Filename,
88 post.Hidden,
89 post.Space,
90 post.Views,
91 )
92 return err
93}
94
95type ConflictData struct {
96 User *db.User
97 Pks []*db.PublicKey
98 ReplaceWithID string
99}
100
101func main() {
102 logger := slog.Default()
103
104 listsCfg := shared.NewConfigSite()
105 listsCfg.Logger = logger
106 listsCfg.DbURL = os.Getenv("LISTS_DB_URL")
107 listsDb := postgres.NewDB(listsCfg.DbURL, listsCfg.Logger)
108
109 proseCfg := shared.NewConfigSite()
110 proseCfg.DbURL = os.Getenv("PROSE_DB_URL")
111 proseCfg.Logger = logger
112 proseDb := postgres.NewDB(proseCfg.DbURL, proseCfg.Logger)
113
114 picoCfg := shared.NewConfigSite()
115 picoCfg.Logger = logger
116 picoCfg.DbURL = os.Getenv("PICO_DB_URL")
117 picoDb := postgres.NewDB(picoCfg.DbURL, picoCfg.Logger)
118
119 ctx := context.Background()
120 tx, err := picoDb.Db.BeginTx(ctx, nil)
121 if err != nil {
122 panic(err)
123 }
124 defer func() {
125 err = tx.Rollback()
126 }()
127
128 logger.Info("Finding prose users")
129 proseUsers, err := proseDb.FindUsers()
130 if err != nil {
131 panic(err)
132 }
133
134 logger.Info("Finding lists users")
135 listUsers, err := listsDb.FindUsers()
136 if err != nil {
137 panic(err)
138 }
139
140 logger.Info("Adding prose users and public keys to PICO db")
141 userMap := map[string]*db.User{}
142 for _, proseUser := range proseUsers {
143 userMap[proseUser.Name] = proseUser
144
145 err = insertUser(tx, proseUser)
146 if err != nil {
147 panic(err)
148 }
149
150 proseKeys, err := proseDb.FindKeysForUser(proseUser)
151 if err != nil {
152 panic(err)
153 }
154
155 for _, prosePK := range proseKeys {
156 err = insertPublicKey(tx, prosePK)
157 if err != nil {
158 panic(err)
159 }
160 }
161 }
162
163 noconflicts := []*ConflictData{}
164 conflicts := []*ConflictData{}
165 updateIDs := []*ConflictData{}
166 logger.Info("Finding conflicts")
167 for _, listUser := range listUsers {
168 listKeys, err := listsDb.FindKeysForUser(listUser)
169 if err != nil {
170 panic(err)
171 }
172
173 data := &ConflictData{
174 User: listUser,
175 Pks: listKeys,
176 }
177
178 if userMap[listUser.Name] == nil {
179 noconflicts = append(noconflicts, data)
180 continue
181 } else {
182 proseUser := userMap[listUser.Name]
183 proseKeys, err := proseDb.FindKeysForUser(proseUser)
184 if err != nil {
185 panic(err)
186 }
187
188 if len(listKeys) != len(proseKeys) {
189 conflicts = append(conflicts, data)
190 continue
191 }
192
193 pkMap := map[string]bool{}
194 for _, prosePK := range proseKeys {
195 pkMap[prosePK.Key] = true
196 }
197
198 conflicted := false
199 for _, listPK := range listKeys {
200 if !pkMap[listPK.Key] {
201 conflicted = true
202 conflicts = append(conflicts, data)
203 break
204 }
205 }
206
207 if !conflicted {
208 data.ReplaceWithID = proseUser.ID
209 updateIDs = append(updateIDs, data)
210 }
211 }
212 }
213
214 logger.Info("adding records with no conflicts", "len", len(noconflicts))
215 for _, data := range noconflicts {
216 err = insertUser(tx, data.User)
217 if err != nil {
218 panic(err)
219 }
220
221 for _, pk := range data.Pks {
222 err = insertPublicKey(tx, pk)
223 if err != nil {
224 panic(err)
225 }
226 }
227 }
228
229 logger.Info("adding records with conflicts", "len", len(conflicts))
230 for _, data := range conflicts {
231 data.User.Name = fmt.Sprintf("%stmp", data.User.Name)
232 err = insertUser(tx, data.User)
233 if err != nil {
234 panic(err)
235 }
236
237 for _, pk := range data.Pks {
238 err = insertPublicKey(tx, pk)
239 if err != nil {
240 panic(err)
241 }
242 }
243 }
244
245 prosePosts, err := findPosts(proseDb.Db)
246 if err != nil {
247 panic(err)
248 }
249
250 logger.Info("Adding posts from prose.sh")
251 for _, post := range prosePosts {
252 post.Space = "prose"
253 err = insertPost(tx, post)
254 if err != nil {
255 panic(err)
256 }
257 }
258
259 listPosts, err := findPosts(listsDb.Db)
260 if err != nil {
261 panic(err)
262 }
263
264 logger.Info("Adding posts from lists.sh")
265 for _, post := range listPosts {
266 updated := false
267 for _, alreadyAdded := range updateIDs {
268 if post.UserID == alreadyAdded.User.ID {
269 // we need to change the ID for these posts to the prose user id
270 // because we were able to determine it was the same user
271 post.UserID = alreadyAdded.ReplaceWithID
272 post.Space = "lists"
273 err = insertPost(tx, post)
274 if err != nil {
275 panic(err)
276 }
277 updated = true
278 break
279 }
280 }
281
282 if updated {
283 continue
284 }
285
286 post.Space = "lists"
287 err = insertPost(tx, post)
288 if err != nil {
289 panic(err)
290 }
291 }
292
293 logger.Info("Committing transactions to PICO db")
294 // Commit the transaction.
295 if err = tx.Commit(); err != nil {
296 panic(err)
297 }
298}