main.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package main
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/csv"
  6. "fmt"
  7. "io"
  8. "log"
  9. "os"
  10. "os/signal"
  11. "path/filepath"
  12. "regexp"
  13. "strings"
  14. _ "github.com/mattn/go-sqlite3"
  15. "golang.org/x/term"
  16. )
  // MaxRecords is a safety ceiling on the number of CSV records handled in a
  // single run: the import loop in main aborts once its record counter reaches
  // this value. It is a constant because nothing in the program reassigns it.
  const MaxRecords = 1_000_000_000
  18. func main() {
  19. if len(os.Args) != 2 {
  20. log.Printf("usage: %s <csv_path>", os.Args[0])
  21. os.Exit(1)
  22. }
  23. csvPath, err := filepath.Abs(os.Args[1])
  24. if err != nil {
  25. log.Fatal(fmt.Sprintf("resolve absolute path for '%s': %v", os.Args[1], err))
  26. }
  27. ctx, cancel := context.WithCancel(context.Background())
  28. go func() {
  29. _, stop := signal.NotifyContext(
  30. ctx,
  31. os.Interrupt,
  32. os.Kill,
  33. )
  34. sig := make(chan os.Signal, 1)
  35. <-sig
  36. stop()
  37. log.Print("caught signal; shutting down")
  38. cancel()
  39. }()
  40. // open csv for read
  41. f, err := os.Open(os.Args[1])
  42. if err != nil {
  43. log.Fatal(fmt.Sprintf("open csv: %v", err))
  44. }
  45. defer func() {
  46. if err := f.Close(); err != nil {
  47. log.Printf("close csv '%s': %v", csvPath, err)
  48. }
  49. }()
  50. r := csv.NewReader(f)
  51. r.ReuseRecord = true
  52. // open db
  53. tmp := strings.TrimSuffix(csvPath, ".csv")
  54. dbPath := tmp + ".db"
  55. name := scrubName(filepath.Base(tmp))
  56. // no need to rollback or be crash resistant
  57. params := "?_synchronous=0&_journal_mode=OFF&_temp_store=2"
  58. db, err := sql.Open("sqlite3", dbPath+params)
  59. if err != nil {
  60. log.Fatal(fmt.Sprintf("open db '%s': %v", dbPath, err))
  61. }
  62. defer func() {
  63. if err := db.Close(); err != nil {
  64. log.Printf("close db '%s': %v", dbPath, err)
  65. }
  66. }()
  67. // create table, prepare insert
  68. var insert *sql.Stmt
  69. headers := make([]string, 0, 32)
  70. rec, err := r.Read()
  71. if err != nil && err != io.EOF {
  72. log.Fatal(fmt.Sprintf("read csv '%s': %v", csvPath, err))
  73. }
  74. for _, f := range rec {
  75. f = scrubName(f)
  76. headers = append(headers, f)
  77. }
  78. _, err = createTable(ctx, db, name, headers)
  79. if err != nil {
  80. log.Fatal(fmt.Sprintf("create table '%s': %v", name, err))
  81. }
  82. insert, err = genInsert(ctx, db, name, headers)
  83. if err != nil {
  84. log.Fatal(fmt.Sprintf("prepare insert: %v", err))
  85. }
  86. defer func() {
  87. if err := insert.Close(); err != nil {
  88. log.Printf("close prepared insert: %v", err)
  89. }
  90. }()
  91. // insert records
  92. tx, err := db.BeginTx(ctx, nil)
  93. if err != nil {
  94. log.Fatal(fmt.Sprintf("begin transaction: %v", err))
  95. }
  96. i := 0
  97. for {
  98. i++
  99. if i >= MaxRecords {
  100. panic(fmt.Sprintf("BUG: max record limit reached: %d", MaxRecords))
  101. }
  102. if ctx.Err() != nil {
  103. break
  104. }
  105. // Print count for every thousand-ish records
  106. if i&4095 == 0 {
  107. printStatus(i)
  108. }
  109. if i&65535 == 0 {
  110. if err := tx.Commit(); err != nil {
  111. log.Fatal(fmt.Printf("commit transaction: %v", err))
  112. }
  113. tx, err = db.BeginTx(ctx, nil)
  114. if err != nil {
  115. log.Fatal(fmt.Sprintf("begin transaction: %v", err))
  116. }
  117. }
  118. rec, err := r.Read()
  119. if err != nil {
  120. fmt.Println()
  121. if err == io.EOF {
  122. log.Printf("read %d records", i)
  123. } else {
  124. log.Printf("read csv '%s': %v", csvPath, err)
  125. }
  126. break
  127. }
  128. // Rather slow, doing this for every record, but since
  129. // we cannot know how many fields will exist ahead of
  130. // time (we compile *now*, not after we've read the
  131. // headers), we cannot simply enter, say, 27 arguments,
  132. // each converted to `any`, by hand. It may be faster to
  133. // simply print the SQL statements and pipe them to
  134. // sqlite. Should check this.
  135. args := make([]any, len(rec))
  136. for j := 0; j < len(rec); j++ {
  137. args[j] = any(rec[j])
  138. }
  139. _, err = insert.ExecContext(ctx, args...)
  140. if err != nil {
  141. log.Fatal(fmt.Sprintf("insert record '%#v': %v", rec, err))
  142. }
  143. }
  144. if err := tx.Commit(); err != nil {
  145. log.Fatal(fmt.Sprintf("commit transaction: %v", err))
  146. }
  147. }
  148. func printStatus(n int) {
  149. if term.IsTerminal(int(os.Stdout.Fd())) {
  150. fmt.Printf("\x1b[%dG", 1)
  151. fmt.Printf("%d", n)
  152. }
  153. }
  // genInsert builds and prepares the INSERT statement for table name with one
  // positional placeholder ($1, $2, ...) per entry of headers, in order. The
  // generated SQL is logged as a debug line before it is prepared.
  //
  // It returns the prepared statement, which the caller must close, or an
  // error when headers is empty or the database rejects the statement.
  func genInsert(
  	ctx context.Context,
  	db *sql.DB,
  	name string,
  	headers []string,
  ) (*sql.Stmt, error) {
  	if len(headers) == 0 {
  		return nil, fmt.Errorf("no columns for table '%s'", name)
  	}
  	// Identifiers are double-quoted (with embedded quotes doubled) so that
  	// names which are SQL keywords or contain punctuation still parse.
  	quote := func(id string) string {
  		return `"` + strings.ReplaceAll(id, `"`, `""`) + `"`
  	}

  	var b strings.Builder
  	fmt.Fprintf(&b, "INSERT INTO %s (", quote(name))
  	for i, h := range headers {
  		if i != 0 {
  			b.WriteString(", ")
  		}
  		b.WriteString(quote(h))
  	}
  	b.WriteString(") VALUES (")
  	for i := range headers {
  		if i != 0 {
  			b.WriteString(", ")
  		}
  		fmt.Fprintf(&b, "$%d", i+1)
  	}
  	b.WriteString(");")

  	query := b.String()
  	log.Printf("debug: prepare insert: %s\n", query)
  	return db.PrepareContext(ctx, query)
  }
  // createTable executes CREATE TABLE IF NOT EXISTS for table name with one
  // column of type text per entry of headers, in order.
  //
  // It returns the result of the statement, or an error when headers is empty
  // or the database rejects the statement.
  func createTable(
  	ctx context.Context,
  	db *sql.DB,
  	name string,
  	headers []string,
  ) (sql.Result, error) {
  	if len(headers) == 0 {
  		return nil, fmt.Errorf("no columns for table '%s'", name)
  	}
  	// Identifiers are double-quoted (with embedded quotes doubled) so that
  	// names which are SQL keywords or contain punctuation still parse.
  	quote := func(id string) string {
  		return `"` + strings.ReplaceAll(id, `"`, `""`) + `"`
  	}

  	var b strings.Builder
  	fmt.Fprintf(&b, "CREATE TABLE IF NOT EXISTS %s (", quote(name))
  	for i, h := range headers {
  		if i != 0 {
  			b.WriteString(", ")
  		}
  		b.WriteString(quote(h))
  		b.WriteString(" text")
  	}
  	b.WriteString(");")
  	return db.ExecContext(ctx, b.String())
  }
  // Patterns used by scrubName. They are applied to already lower-cased input.
  var (
  	// parens matches a parenthesised span such as "(usd)".
  	parens = regexp.MustCompile(`\([^\)]*\)`)
  	// badRunes matches any run of characters other than a-z and 0-9,
  	// including whitespace, hyphens, dots and underscores.
  	badRunes = regexp.MustCompile(`[^a-z0-9]+`)
  	// badFirst matches a leading run of anything that is not a letter.
  	badFirst = regexp.MustCompile(`^[^a-z]+`)
  )

  // scrubName reduces s to a lower-case name made only of a-z, 0-9 and single
  // underscores, beginning with a letter and not ending in an underscore.
  //
  // Parenthesised spans are dropped first, while their opening parenthesis is
  // still present; every other run of disallowed characters becomes one
  // underscore; finally a leading non-letter run and a trailing underscore are
  // removed. The result is empty when s contains no letter outside
  // parentheses, so callers that need a usable name must check for that.
  func scrubName(s string) string {
  	s = strings.ToLower(s)
  	s = parens.ReplaceAllLiteralString(s, "")
  	s = badRunes.ReplaceAllLiteralString(s, "_")
  	s = badFirst.ReplaceAllLiteralString(s, "")
  	return strings.TrimSuffix(s, "_")
  }
  215. }