osm2mongo/main.go
Alexander NeonXP Kiryukhin 97fd40df4d Fix indexes (dups osm_id)
2019-05-28 17:15:22 +03:00

326 lines
8.5 KiB
Go

package main
import (
"context"
"flag"
"log"
"os"
"time"
"github.com/paulmach/osm"
"github.com/paulmach/osm/osmpbf"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
)
// main parses the command-line flags, connects to MongoDB and then either
// creates the indexes (-indexes) or imports the OSM file named by -osmfile.
// Any failure terminates the process via log.Fatal.
func main() {
	dbconnection := flag.String("dbconnection", "mongodb://localhost:27017", "Mongo connection URI")
	dbname := flag.String("dbname", "map", "Mongo database name")
	osmfile := flag.String("osmfile", "", "OSM file")
	initial := flag.Bool("initial", false, "Is initial import")
	indexes := flag.Bool("indexes", false, "Just create indexes")
	concurrency := flag.Int("concurrency", 16, "Concurrency")
	blockSize := flag.Int("block", 1000, "Block size to bulk write")
	flag.Parse()

	// Fail early with a clear message instead of a bare "open :" error later.
	if !*indexes && *osmfile == "" {
		log.Fatal("osmfile is required")
	}

	// Keep the cancel func so the timeout's resources are released.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	client, err := mongo.Connect(ctx, options.Client().ApplyURI(*dbconnection))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(context.Background())

	db := client.Database(*dbname)

	if *indexes {
		if err := createIndexes(db); err != nil {
			log.Fatal(err)
		}
		log.Println("Indexes created")
		return
	}

	log.Printf("Started import file %s to db %s", *osmfile, *dbname)
	if err := read(db, *osmfile, *initial, *concurrency, *blockSize); err != nil {
		log.Fatal(err)
	}
}
// read scans the OSM PBF file at path file and bulk-writes its objects into
// the "nodes", "ways" and "relations" collections of db.
//
// When initial is true every object becomes an insert; otherwise it becomes
// an upsert matched on the "osm_id" field. concurrency is passed to the PBF
// scanner. blockSize is the number of buffered write models per collection
// that triggers a bulk write; values below 1 are treated as 1.
//
// It returns the first error from opening the file, from a bulk write, or
// from the scanner.
func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error {
	// A negative size would panic in make; zero would never trigger a flush.
	if blockSize < 1 {
		blockSize = 1
	}

	f, err := os.Open(file)
	if err != nil {
		return err
	}
	defer f.Close()

	ctx := context.Background()
	nodes := db.Collection("nodes")
	ways := db.Collection("ways")
	relations := db.Collection("relations")
	opts := (new(options.BulkWriteOptions)).SetOrdered(false)

	// Running totals of scanned objects, used for progress logging.
	var nc, wc, rc int

	// model wraps doc in the write model that matches the import mode.
	model := func(osmID int64, doc interface{}) mongo.WriteModel {
		if initial {
			im := mongo.NewInsertOneModel()
			im.SetDocument(doc)
			return im
		}
		um := mongo.NewUpdateOneModel()
		um.SetUpsert(true)
		um.SetFilter(bson.M{"osm_id": osmID})
		// An update must be an operator document, so wrap the fields in $set
		// rather than passing the bare struct.
		um.SetUpdate(bson.M{"$set": doc})
		return um
	}

	// flush writes buf to col and logs the current progress counters.
	flush := func(col *mongo.Collection, buf []mongo.WriteModel) error {
		if _, err := col.BulkWrite(ctx, buf, opts); err != nil {
			return err
		}
		log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
		return nil
	}

	scanner := osmpbf.New(ctx, f, concurrency)
	defer scanner.Close()

	bufferNodes := make([]mongo.WriteModel, 0, blockSize)
	bufferWays := make([]mongo.WriteModel, 0, blockSize)
	bufferRelations := make([]mongo.WriteModel, 0, blockSize)

	for scanner.Scan() {
		switch o := scanner.Object().(type) {
		case *osm.Way:
			nodeIDs := make([]int64, 0, len(o.Nodes))
			for _, v := range o.Nodes {
				nodeIDs = append(nodeIDs, int64(v.ID))
			}
			w := Way{
				OsmID:     int64(o.ID),
				Tags:      convertTags(o.Tags),
				Nodes:     nodeIDs,
				Timestamp: o.Timestamp,
				Version:   o.Version,
				Visible:   o.Visible,
			}
			bufferWays = append(bufferWays, model(w.OsmID, w))
			wc++
			if len(bufferWays) >= blockSize {
				// Ways go to the ways collection, not to nodes.
				if err := flush(ways, bufferWays); err != nil {
					return err
				}
				bufferWays = bufferWays[:0]
			}

		case *osm.Node:
			n := Node{
				OsmID: int64(o.ID),
				Location: Coords{
					Type: "Point",
					Coordinates: []float64{
						o.Lon,
						o.Lat,
					}},
				Tags:      convertTags(o.Tags),
				Version:   o.Version,
				Timestamp: o.Timestamp,
				Visible:   o.Visible,
			}
			bufferNodes = append(bufferNodes, model(n.OsmID, n))
			nc++
			if len(bufferNodes) >= blockSize {
				if err := flush(nodes, bufferNodes); err != nil {
					return err
				}
				bufferNodes = bufferNodes[:0]
			}

		case *osm.Relation:
			// Length 0 with capacity: append must not leave zero-valued
			// members at the front of the slice.
			members := make([]Member, 0, len(o.Members))
			for _, v := range o.Members {
				members = append(members, Member{
					Type:        v.Type,
					Version:     v.Version,
					Orientation: v.Orientation,
					Ref:         v.Ref,
					Role:        v.Role,
					Location: Coords{
						Type: "Point",
						Coordinates: []float64{
							v.Lon,
							v.Lat,
						}},
				})
			}
			r := Relation{
				OsmID:     int64(o.ID),
				Tags:      convertTags(o.Tags),
				Version:   o.Version,
				Timestamp: o.Timestamp,
				Visible:   o.Visible,
				Members:   members,
			}
			bufferRelations = append(bufferRelations, model(r.OsmID, r))
			rc++
			if len(bufferRelations) >= blockSize {
				// Relations go to the relations collection, not to nodes.
				if err := flush(relations, bufferRelations); err != nil {
					return err
				}
				bufferRelations = bufferRelations[:0]
			}
		}
	}

	// Write whatever is left in the partially filled buffers.
	if len(bufferNodes) != 0 {
		if err := flush(nodes, bufferNodes); err != nil {
			return err
		}
	}
	if len(bufferWays) != 0 {
		if err := flush(ways, bufferWays); err != nil {
			return err
		}
	}
	if len(bufferRelations) != 0 {
		if err := flush(relations, bufferRelations); err != nil {
			return err
		}
	}

	// Report success only if the scanner finished without an error.
	if err := scanner.Err(); err != nil {
		return err
	}
	log.Println("Import done")
	return nil
}
// createIndexes creates the indexes on the "nodes", "ways" and "relations"
// collections of db: an index on osm_id and a compound index on
// tags.key/tags.value for each collection, an extra index on members.ref for
// relations, and the indexes built by geoIndex on nodes ("location") and
// relations ("members.coords"). It stops at and returns the first error.
func createIndexes(db *mongo.Database) error {
	// NOTE(review): SetMaxTime takes a time.Duration, so the bare 1000 is
	// 1000 nanoseconds; a larger unit was presumably intended — confirm
	// before changing, since a real limit could abort long index builds.
	opts := options.CreateIndexes().SetMaxTime(1000)
	ctx := context.Background()

	// baseIndexes returns fresh models for the indexes shared by all three
	// collections. osm_id is deliberately non-unique.
	baseIndexes := func() []mongo.IndexModel {
		return []mongo.IndexModel{
			{
				Keys:    bsonx.Doc{{Key: "osm_id", Value: bsonx.Int32(-1)}},
				Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
			},
			{
				Keys: bsonx.Doc{
					{Key: "tags.key", Value: bsonx.Int32(-1)},
					{Key: "tags.value", Value: bsonx.Int32(-1)},
				},
				Options: (options.Index()).SetBackground(true).SetSparse(true),
			},
		}
	}

	nodes := db.Collection("nodes")
	log.Println("creating indexes for nodes")
	created, err := nodes.Indexes().CreateMany(ctx, baseIndexes(), opts)
	if err != nil {
		return err
	}
	log.Println(created)

	log.Println("creating geoindexes for nodes")
	if err := geoIndex(nodes, "location"); err != nil {
		return err
	}

	ways := db.Collection("ways")
	log.Println("creating indexes for ways")
	created, err = ways.Indexes().CreateMany(ctx, baseIndexes(), opts)
	if err != nil {
		return err
	}
	log.Println(created)

	relations := db.Collection("relations")
	log.Println("creating indexes for relations")
	relationIndexes := append(baseIndexes(), mongo.IndexModel{
		Keys:    bsonx.Doc{{Key: "members.ref", Value: bsonx.Int32(-1)}},
		Options: (options.Index()).SetBackground(true).SetSparse(true),
	})
	created, err = relations.Indexes().CreateMany(ctx, relationIndexes, opts)
	if err != nil {
		return err
	}
	log.Println(created)

	log.Println("creating geoindexes for relations")
	// NOTE(review): read fills a Location field on each member; whether it is
	// stored under "coords" depends on Member's bson tags, which are not
	// visible here — confirm the key.
	if err := geoIndex(relations, "members.coords"); err != nil {
		return err
	}

	log.Println("indexes created")
	return nil
}
// convertTags copies each OSM tag's key and value into a Tag, preserving
// order. The returned slice is never nil, even when tags is empty.
func convertTags(tags osm.Tags) []Tag {
	converted := make([]Tag, len(tags))
	for i := range tags {
		converted[i] = Tag{
			Key:   tags[i].Key,
			Value: tags[i].Value,
		}
	}
	return converted
}
// simpleIndex creates one sparse, background-built index on col covering the
// given keys in order, each with value 1 (ascending). unique controls whether
// the index enforces uniqueness. It returns the error from CreateOne, if any.
func simpleIndex(col *mongo.Collection, keys []string, unique bool) error {
	idxKeys := make(bsonx.Doc, 0, len(keys))
	for _, e := range keys {
		// Doc is a slice type, so the appended result must be kept;
		// discarding it left the key document empty.
		idxKeys = append(idxKeys, bsonx.Elem{Key: e, Value: bsonx.Int32(1)})
	}
	_, err := col.Indexes().CreateOne(
		context.Background(),
		mongo.IndexModel{
			Keys:    idxKeys,
			Options: options.Index().SetUnique(unique).SetSparse(true).SetBackground(true),
		},
	)
	return err
}
// geoIndex creates a sparse, background-built 2dsphere (version 2) index on
// the field key of col. It returns the error from CreateOne, if any.
func geoIndex(col *mongo.Collection, key string) error {
	_, err := col.Indexes().CreateOne(
		context.Background(),
		mongo.IndexModel{
			Keys: bsonx.Doc{{
				// The index type is selected by the key's value: "2dsphere"
				// makes it geospatial, whereas 1 is a plain ascending index.
				Key: key, Value: bsonx.String("2dsphere"),
			}},
			Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
		},
	)
	return err
}