Skip to content

Commit

Permalink
Create generic upload function, clean up uploader.go
Browse files Browse the repository at this point in the history
Created a generic function for uploading JSON files to Mongo, and added some comments for clarification. Currently the function requires that the file's base name (without the .json extension) match the name of the collection its contents are uploaded to. Only courses, professors, and sections are supported.
  • Loading branch information
mohammadmehrab committed Apr 25, 2024
1 parent 75eaabb commit 89d4556
Showing 1 changed file with 43 additions and 198 deletions.
241 changes: 43 additions & 198 deletions uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"log"
"os"
"strings"

"time"

Expand Down Expand Up @@ -57,222 +58,52 @@ func Upload(inDir string, replace bool) {

switch path {
case "courses.json":
uploadCourses(client, ctx, fptr, replace)
UploadData[schema.Course](client, ctx, fptr, replace)
case "professors.json":
uploadProfessors(client, ctx, fptr, replace)
UploadData[schema.Professor](client, ctx, fptr, replace)
case "sections.json":
uploadSections(client, ctx, fptr, replace)
UploadData[schema.Section](client, ctx, fptr, replace)
}
}

}

func uploadCourses(client *mongo.Client, ctx context.Context, fptr *os.File, replace bool) {
log.Println("Uploading courses.json ...")
// UploadData is a generic function that decodes a JSON array of T from fptr and uploads it to the Mongo database.
// The target collection is derived from the file name (base name without the ".json" extension),
// so the file name must match the collection you are uploading to: name the file courses.json to upload courses.
// As of right now, only courses, professors, and sections can be uploaded.
func UploadData[T any](client *mongo.Client, ctx context.Context, fptr *os.File, replace bool) {
fileName := fptr.Name()[strings.LastIndex(fptr.Name(), "/")+1 : len(fptr.Name())-5]
log.Println("Uploading " + fileName + ".json ...")

// Decode courses from courses.json
var courses []schema.Course
// Decode documents from file
var docs []T
decoder := json.NewDecoder(fptr)
err := decoder.Decode(&courses)
err := decoder.Decode(&docs)
if err != nil {
log.Panic(err)
}

if replace {

// Get collection
collection := getCollection(client, "courses")
collection := getCollection(client, fileName)

// Delete all documents from collection
_, err := collection.DeleteMany(ctx, bson.D{})
if err != nil {
log.Panic(err)
}

// Convert your courses to []interface{}
courseDocs := make([]interface{}, len(courses))
for i := range courses {
courseDocs[i] = courses[i]
// Convert your documents to []interface{}
docsInterface := make([]interface{}, len(docs))
for i := range docs {
docsInterface[i] = docs[i]
}

// Add all documents decoded from courses.json into the collection
// Add all documents decoded from the file into the collection
opts := options.InsertMany().SetOrdered(false)
_, err = collection.InsertMany(ctx, courseDocs, opts)
if err != nil {
log.Panic(err)
}

} else {

// If a temp collection already exists, drop it
tempCollection := getCollection(client, "temp")
err = tempCollection.Drop(ctx)
if err != nil {
log.Panic(err)
}

// Create a temporary collection
err := client.Database("combinedDB").CreateCollection(ctx, "temp")
if err != nil {
log.Panic(err)
}

// Get the temporary collection
tempCollection = getCollection(client, "temp")

// Convert your courses to []interface{}
courseDocs := make([]interface{}, len(courses))
for i := range courses {
courseDocs[i] = courses[i]
}

// Add all documents decoded from courses.json into the temporary collection
opts := options.InsertMany().SetOrdered(false)
_, err = tempCollection.InsertMany(ctx, courseDocs, opts)

if err != nil {
log.Panic(err)
}

// Create a merge aggregate pipeline
// Matched documents from the temporary collection will replace matched documents from the Mongo collection
// Unmatched documents from the temporary collection will be inserted into the Mongo collection
mergeStage := bson.D{primitive.E{Key: "$merge", Value: bson.D{primitive.E{Key: "into", Value: "courses"}, primitive.E{Key: "on", Value: [3]string{"catalog_year", "course_number", "subject_prefix"}}, primitive.E{Key: "whenMatched", Value: "replace"}, primitive.E{Key: "whenNotMatched", Value: "insert"}}}}

// Execute aggregate pipeline
_, err = tempCollection.Aggregate(ctx, mongo.Pipeline{mergeStage})
if err != nil {
log.Panic(err)
}

// Drop the temporary collection
err = tempCollection.Drop(ctx)
if err != nil {
log.Panic(err)
}
}

log.Println("Done uploading courses.json!")
}

// uploadProfessors decodes a JSON array of schema.Professor values from fptr
// and writes them to the "professors" collection obtained via getCollection.
//
// When replace is true, all documents in the collection are deleted and the
// decoded documents are inserted in their place. Otherwise the decoded
// documents are staged in a "temp" collection and merged into "professors"
// with a $merge stage keyed on first_name and last_name: matched documents
// are replaced and unmatched documents are inserted.
//
// Any decode or database error is reported through log.Panic.
func uploadProfessors(client *mongo.Client, ctx context.Context, fptr *os.File, replace bool) {
	log.Println("Uploading professors.json ...")

	// check logs and panics on the first non-nil error it is handed.
	check := func(e error) {
		if e != nil {
			log.Panic(e)
		}
	}

	// Decode professors from professors.json
	var decoded []schema.Professor
	check(json.NewDecoder(fptr).Decode(&decoded))

	// InsertMany takes []interface{}, so box each professor once up front;
	// both upload modes insert the same payload with the same options.
	payload := make([]interface{}, 0, len(decoded))
	for _, p := range decoded {
		payload = append(payload, p)
	}
	unordered := options.InsertMany().SetOrdered(false)

	if replace {
		// Empty the collection, then load the freshly decoded documents.
		target := getCollection(client, "professors")

		_, delErr := target.DeleteMany(ctx, bson.D{})
		check(delErr)

		_, insErr := target.InsertMany(ctx, payload, unordered)
		check(insErr)
	} else {
		// Start from a clean staging area: drop any leftover temp collection,
		// recreate it, and fetch a fresh handle to it.
		staging := getCollection(client, "temp")
		check(staging.Drop(ctx))
		check(client.Database("combinedDB").CreateCollection(ctx, "temp"))
		staging = getCollection(client, "temp")

		// Load the decoded documents into the temporary collection.
		_, insErr := staging.InsertMany(ctx, payload, unordered)
		check(insErr)

		// Merge stage: documents matched on (first_name, last_name) replace
		// their counterparts in "professors"; unmatched ones are inserted.
		mergeStage := bson.D{primitive.E{Key: "$merge", Value: bson.D{
			primitive.E{Key: "into", Value: "professors"},
			primitive.E{Key: "on", Value: [2]string{"first_name", "last_name"}},
			primitive.E{Key: "whenMatched", Value: "replace"},
			primitive.E{Key: "whenNotMatched", Value: "insert"},
		}}}

		// Run the pipeline from the temporary collection.
		_, aggErr := staging.Aggregate(ctx, mongo.Pipeline{mergeStage})
		check(aggErr)

		// The staged copy is no longer needed once the merge has run.
		check(staging.Drop(ctx))
	}

	log.Println("Done uploading professors.json!")
}

func uploadSections(client *mongo.Client, ctx context.Context, fptr *os.File, replace bool) {
log.Println("Uploading sections.json ...")

// Decode sections from sections.json
var sections []schema.Section
decoder := json.NewDecoder(fptr)
err := decoder.Decode(&sections)
if err != nil {
log.Panic(err)
}

if replace {

// Get collection
collection := getCollection(client, "sections")

// Delete all documents from collection
_, err := collection.DeleteMany(ctx, bson.D{})
if err != nil {
log.Panic(err)
}

// Convert your sections to []interface{}
sectionsDocs := make([]interface{}, len(sections))
for i := range sections {
sectionsDocs[i] = sections[i]
}

// Add all documents decoded from sections.json into the collection
opts := options.InsertMany().SetOrdered(false)
_, err = collection.InsertMany(ctx, sectionsDocs, opts)
_, err = collection.InsertMany(ctx, docsInterface, opts)
if err != nil {
log.Panic(err)
}
Expand All @@ -294,23 +125,37 @@ func uploadSections(client *mongo.Client, ctx context.Context, fptr *os.File, re
// Get the temporary collection
tempCollection = getCollection(client, "temp")

// Convert your sections to []interface{}
sectionsDocs := make([]interface{}, len(sections))
for i := range sections {
sectionsDocs[i] = sections[i]
// Convert your documents to []interface{}
docsInterface := make([]interface{}, len(docs))
for i := range docs {
docsInterface[i] = docs[i]
}

// Add all documents decoded from sections.json into the temporary collection
// Add all documents decoded from the file into the temporary collection
opts := options.InsertMany().SetOrdered(false)
_, err = tempCollection.InsertMany(ctx, sectionsDocs, opts)
_, err = tempCollection.InsertMany(ctx, docsInterface, opts)
if err != nil {
log.Panic(err)
}

// Create a merge aggregate pipeline
// Matched documents from the temporary collection will replace matched documents from the Mongo collection
// Unmatched documents from the temporary collection will be inserted into the Mongo collection
mergeStage := bson.D{primitive.E{Key: "$merge", Value: bson.D{primitive.E{Key: "into", Value: "sections"}, primitive.E{Key: "on", Value: [3]string{"section_number", "course_reference", "academic_session"}}, primitive.E{Key: "whenMatched", Value: "replace"}, primitive.E{Key: "whenNotMatched", Value: "insert"}}}}
var matchFilters []string
switch fileName {
case "courses":
matchFilters = []string{"catalog_year", "course_number", "subject_prefix"}
case "professors":
matchFilters = []string{"first_name", "last_name"}
case "sections":
matchFilters = []string{"section_number", "course_reference", "academic_session"}
default:
log.Panic("Unrecognizable filename: " + fileName)
}

// The documents will be added/merged into the collection with the same name as the file
// The filters for the merge aggregate pipeline are based on the file name
mergeStage := bson.D{primitive.E{Key: "$merge", Value: bson.D{primitive.E{Key: "into", Value: fileName}, primitive.E{Key: "on", Value: matchFilters}, primitive.E{Key: "whenMatched", Value: "replace"}, primitive.E{Key: "whenNotMatched", Value: "insert"}}}}

// Execute aggregate pipeline
_, err = tempCollection.Aggregate(ctx, mongo.Pipeline{mergeStage})
Expand All @@ -325,7 +170,7 @@ func uploadSections(client *mongo.Client, ctx context.Context, fptr *os.File, re
}
}

log.Println("Done uploading sections.json!")
log.Println("Done uploading " + fileName + ".json!")

defer fptr.Close()
}

0 comments on commit 89d4556

Please sign in to comment.