Speed up import

This commit is contained in:
Alexander NeonXP Kiryukhin 2019-05-28 01:34:40 +03:00
parent a523b63679
commit 2afa4e76fb
2 changed files with 99 additions and 64 deletions

View file

@ -8,11 +8,14 @@ Simple loader from osm dump file to mongodb. Based on https://github.com/paulmac
## Usage
`./osm2go -osmfile PATH_TO_OSM_FILE [-dbconnection mongodb://localhost:27017] [-dbname osm]`
`./osm2go -osmfile PATH_TO_OSM_FILE [-dbconnection mongodb://localhost:27017] [-dbname osm] [-initial=true] [-concurrency=16] [-block=1000]`
* `osmfile` required, path to *.osm or *.osm.pbf file
* `dbconnection` optional, mongodb connection string (default: `mongodb://localhost:27017`)
* `dbname` optional, mongodb database name (default: `osm`)
* `initial` optional, use insert instead of upsert. Faster, but does not check whether an item already exists (default: `false`)
* `concurrency` optional, number of parallel read processes (default: `16`)
* `block` optional, block size to bulk write (default: `1000`)
## Example

158
main.go
View file

@ -3,7 +3,6 @@ package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"time"
@ -18,8 +17,11 @@ import (
func main() {
dbconnection := flag.String("dbconnection", "mongodb://localhost:27017", "Mongo database name")
dbname := flag.String("dbname", "osm", "Mongo database name")
dbname := flag.String("dbname", "map", "Mongo database name")
osmfile := flag.String("osmfile", "", "OSM file")
initial := flag.Bool("initial", false, "Is initial import")
concurrency := flag.Int("concurrency", 16, "Concurrency")
blockSize := flag.Int("block", 1000, "Block size to bulk write")
flag.Parse()
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
client, err := mongo.Connect(ctx, options.Client().ApplyURI(*dbconnection))
@ -28,67 +30,33 @@ func main() {
}
defer client.Disconnect(context.Background())
db := client.Database(*dbname)
if err := read(db, *osmfile); err != nil {
log.Printf("Started import file %s to db %s", *osmfile, *dbname)
if *initial {
log.Println("Initial import")
} else {
log.Println("Diff import")
}
if err := read(db, *osmfile, *initial, *concurrency, *blockSize); err != nil {
log.Fatal(err)
}
}
func read(db *mongo.Database, file string) error {
func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error {
nodes := db.Collection("nodes")
_, _ = nodes.Indexes().CreateOne(
context.Background(),
mongo.IndexModel{
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(1)}},
Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true),
},
)
_, _ = nodes.Indexes().CreateOne(
context.Background(),
mongo.IndexModel{
Keys: bsonx.Doc{{"coords", bsonx.Int32(1)}},
Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
},
)
simpleIndex(nodes, []string{"osm_id"}, true)
simpleIndex(nodes, []string{"tags"}, false)
geoIndex(nodes, "location")
ways := db.Collection("ways")
_, _ = ways.Indexes().CreateOne(
context.Background(),
mongo.IndexModel{
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(1)}},
Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true),
},
)
_, _ = ways.Indexes().CreateOne(
context.Background(),
mongo.IndexModel{
Keys: bsonx.Doc{{"nodes", bsonx.Int32(1)}},
Options: options.Index().SetSparse(true).SetBackground(true),
},
)
simpleIndex(ways, []string{"osm_id"}, true)
simpleIndex(ways, []string{"tags"}, false)
relations := db.Collection("relations")
_, _ = relations.Indexes().CreateOne(
context.Background(),
mongo.IndexModel{
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(1)}},
Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true),
},
)
_, _ = relations.Indexes().CreateOne(
context.Background(),
mongo.IndexModel{
Keys: bsonx.Doc{{"members.ref", bsonx.Int32(1)}},
Options: options.Index().SetUnique(true).SetSparse(true).SetBackground(true),
},
)
_, _ = relations.Indexes().CreateOne(
context.Background(),
mongo.IndexModel{
Keys: bsonx.Doc{{"members.coords", bsonx.Int32(1)}},
Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
},
)
simpleIndex(relations, []string{"osm_id"}, true)
simpleIndex(relations, []string{"tags"}, false)
simpleIndex(relations, []string{"members.ref"}, false)
geoIndex(relations, "members.coords")
f, err := os.Open(file)
if err != nil {
@ -96,14 +64,15 @@ func read(db *mongo.Database, file string) error {
}
defer f.Close()
opts := (new(options.ReplaceOptions)).SetUpsert(true)
opts := (new(options.BulkWriteOptions)).SetOrdered(false)
nc := 0
wc := 0
rc := 0
scanner := osmpbf.New(context.Background(), f, 3)
scanner := osmpbf.New(context.Background(), f, concurrency)
defer scanner.Close()
buffer := make([]mongo.WriteModel, 0, blockSize)
for scanner.Scan() {
o := scanner.Object()
switch o := o.(type) {
@ -112,6 +81,7 @@ func read(db *mongo.Database, file string) error {
for _, v := range o.Nodes {
nodes = append(nodes, int64(v.ID))
}
w := Way{
OsmID: int64(o.ID),
Tags: convertTags(o.Tags),
@ -120,8 +90,16 @@ func read(db *mongo.Database, file string) error {
Version: o.Version,
Visible: o.Visible,
}
if _, err = ways.ReplaceOne(context.Background(), bson.M{"osm_id": int64(o.ID)}, w, opts); err != nil {
return err
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(w)
buffer = append(buffer, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(w)
um.SetFilter(bson.M{"osm_id": w.OsmID})
buffer = append(buffer, um)
}
wc++
case *osm.Node:
@ -138,8 +116,16 @@ func read(db *mongo.Database, file string) error {
Timestamp: o.Timestamp,
Visible: o.Visible,
}
if _, err = nodes.ReplaceOne(context.Background(), bson.M{"osm_id": int64(o.ID)}, n, opts); err != nil {
return err
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(n)
buffer = append(buffer, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(n)
um.SetFilter(bson.M{"osm_id": n.OsmID})
buffer = append(buffer, um)
}
nc++
case *osm.Relation:
@ -167,14 +153,34 @@ func read(db *mongo.Database, file string) error {
Visible: o.Visible,
Members: members,
}
if _, err = relations.ReplaceOne(context.Background(), bson.M{"osm_id": int64(o.ID)}, r, opts); err != nil {
return err
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(r)
buffer = append(buffer, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(r)
um.SetFilter(bson.M{"osm_id": r.OsmID})
buffer = append(buffer, um)
}
rc++
}
fmt.Printf("\rNodes: %d Ways: %d Relations: %d", nc, wc, rc)
if len(buffer) == blockSize {
if _, err := nodes.BulkWrite(context.Background(), buffer, opts); err != nil {
return err
}
buffer = make([]mongo.WriteModel, 0, blockSize)
log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
}
}
if len(buffer) != 0 {
if _, err := nodes.BulkWrite(context.Background(), buffer, opts); err != nil {
return err
}
log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
}
log.Println("Import done")
scanErr := scanner.Err()
if scanErr != nil {
return scanErr
@ -189,3 +195,29 @@ func convertTags(tags osm.Tags) map[string]string {
}
return result
}
// simpleIndex creates a sparse, background-built ascending index on col
// covering keys, in the order given. When unique is true the index also
// enforces uniqueness.
//
// Index creation is best-effort: a failure is logged and the function
// returns normally.
func simpleIndex(col *mongo.Collection, keys []string, unique bool) {
	idxKeys := make(bsonx.Doc, 0, len(keys))
	for _, e := range keys {
		// Doc.Append returns the extended document rather than mutating its
		// receiver, so the result has to be assigned back; otherwise the
		// index would be requested with an empty key document.
		idxKeys = idxKeys.Append(e, bsonx.Int32(1))
	}
	_, err := col.Indexes().CreateOne(
		context.Background(),
		mongo.IndexModel{
			Keys:    idxKeys,
			Options: options.Index().SetUnique(unique).SetSparse(true).SetBackground(true),
		},
	)
	if err != nil {
		log.Printf("creating index on %v: %v", keys, err)
	}
}
// geoIndex creates a sparse, background-built 2dsphere (version 2) index on
// the field named key in col.
//
// Index creation is best-effort: a failure is logged and the function
// returns normally.
//
// NOTE(review): a 2dsphere index requires the indexed field to hold GeoJSON
// objects or legacy coordinate pairs; the stored document layout is not
// visible from this function, so confirm it against the Node/Relation types.
func geoIndex(col *mongo.Collection, key string) {
	_, err := col.Indexes().CreateOne(
		context.Background(),
		mongo.IndexModel{
			Keys: bsonx.Doc{{
				// The index type must be the string "2dsphere"; a numeric 1
				// would request an ordinary ascending index instead.
				Key: key, Value: bsonx.String("2dsphere"),
			}},
			Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
		},
	)
	if err != nil {
		log.Printf("creating 2dsphere index on %q: %v", key, err)
	}
}