Skip to content

Commit

Permalink
Merge pull request #77 from negbie/master
Browse files Browse the repository at this point in the history
Only create partitions for new tables
  • Loading branch information
negbie authored Jun 9, 2018
2 parents a2b364d + 861bf44 commit 885f343
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 241 deletions.
6 changes: 3 additions & 3 deletions database/database-packr.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion database/files/homer5/mysql/tbldatalog.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- name: create-logs-table
CREATE TABLE IF NOT EXISTS `logs_capture_all_{{date}}` (
CREATE TABLE `logs_capture_all_{{date}}` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`micro_ts` bigint(18) NOT NULL DEFAULT '0',
Expand Down
4 changes: 2 additions & 2 deletions database/files/homer5/mysql/tbldataqos.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- name: create-report-table
CREATE TABLE IF NOT EXISTS `report_capture_all_{{date}}` (
CREATE TABLE `report_capture_all_{{date}}` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`micro_ts` bigint(18) NOT NULL DEFAULT '0',
Expand All @@ -22,7 +22,7 @@ PARTITION BY RANGE ( UNIX_TIMESTAMP(`date`) ) (
);

-- name: create-rtcp-table
CREATE TABLE IF NOT EXISTS `rtcp_capture_all_{{date}}` (
CREATE TABLE `rtcp_capture_all_{{date}}` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`micro_ts` bigint(18) NOT NULL DEFAULT '0',
Expand Down
6 changes: 3 additions & 3 deletions database/files/homer5/mysql/tbldatasip.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- name: create-call-table
CREATE TABLE IF NOT EXISTS `sip_capture_call_{{date}}` (
CREATE TABLE `sip_capture_call_{{date}}` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`micro_ts` bigint(18) NOT NULL DEFAULT '0',
Expand Down Expand Up @@ -64,7 +64,7 @@ PARTITION BY RANGE ( UNIX_TIMESTAMP(`date`) ) (
);

-- name: create-registration-table
CREATE TABLE IF NOT EXISTS `sip_capture_registration_{{date}}` (
CREATE TABLE `sip_capture_registration_{{date}}` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`micro_ts` bigint(18) NOT NULL DEFAULT '0',
Expand Down Expand Up @@ -129,7 +129,7 @@ PARTITION BY RANGE ( UNIX_TIMESTAMP(`date`) ) (
);

-- name: create-rest-table
CREATE TABLE IF NOT EXISTS `sip_capture_rest_{{date}}` (
CREATE TABLE `sip_capture_rest_{{date}}` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`micro_ts` bigint(18) NOT NULL DEFAULT '0',
Expand Down
152 changes: 77 additions & 75 deletions database/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,29 @@ func (r *Rotator) CreateDataTables(duration int) (err error) {
defer db.Close()
// Set this connection to UTC time and create the partitions with it.
r.dbExec(db, "SET time_zone = \"+00:00\";")
r.dbExecFileMYSQL(db, r.box.String("mysql/tbldatalog.sql"), suffix, duration, r.logStep)
r.dbExecFileMYSQL(db, r.box.String("mysql/tbldataqos.sql"), suffix, duration, r.qosStep)
r.dbExecFileMYSQL(db, r.box.String("mysql/tbldatasip.sql"), suffix, duration, r.sipStep)
r.dbExecPartitionFile(db, r.box.String("mysql/parlog.sql"), suffix, duration, r.logStep)
r.dbExecPartitionFile(db, r.box.String("mysql/parqos.sql"), suffix, duration, r.qosStep)
r.dbExecPartitionFile(db, r.box.String("mysql/parsip.sql"), suffix, duration, r.sipStep)
r.dbExecFile(db, r.box.String("mysql/parmax.sql"), suffix)
if err := r.dbExecFile(db, r.box.String("mysql/tbldatalog.sql"), suffix, duration, r.logStep); err == nil {
r.dbExecFileLoop(db, r.box.String("mysql/parlog.sql"), suffix, duration, r.logStep)
}
if err := r.dbExecFile(db, r.box.String("mysql/tbldataqos.sql"), suffix, duration, r.qosStep); err == nil {
r.dbExecFileLoop(db, r.box.String("mysql/parqos.sql"), suffix, duration, r.qosStep)
}
if err := r.dbExecFile(db, r.box.String("mysql/tbldatasip.sql"), suffix, duration, r.sipStep); err == nil {
r.dbExecFileLoop(db, r.box.String("mysql/parsip.sql"), suffix, duration, r.sipStep)
}
r.dbExecFile(db, r.box.String("mysql/parmax.sql"), suffix, 0, 0)
} else if config.Setting.DBDriver == "postgres" {
db, err := dbr.Open(config.Setting.DBDriver, " host="+r.addr[0]+" port="+r.addr[1]+" dbname="+config.Setting.DBDataTable+" user="+config.Setting.DBUser+" password="+config.Setting.DBPass+" sslmode=disable", nil)
if err != nil {
return err
}
defer db.Close()
r.dbExecFile(db, r.box.String("pgsql/tbldata.sql"), suffix)
r.dbExecPartitionFile(db, r.box.String("pgsql/parlog.sql"), suffix, duration, r.logStep)
r.dbExecPartitionFile(db, r.box.String("pgsql/parqos.sql"), suffix, duration, r.qosStep)
r.dbExecPartitionFile(db, r.box.String("pgsql/parsip.sql"), suffix, duration, r.sipStep)
r.dbExecPartitionFile(db, r.box.String("pgsql/idxlog.sql"), suffix, duration, r.logStep)
r.dbExecPartitionFile(db, r.box.String("pgsql/idxqos.sql"), suffix, duration, r.qosStep)
r.dbExecPartitionFile(db, r.box.String("pgsql/idxsip.sql"), suffix, duration, r.sipStep)
r.dbExecFile(db, r.box.String("pgsql/tbldata.sql"), suffix, 0, 0)
r.dbExecFileLoop(db, r.box.String("pgsql/parlog.sql"), suffix, duration, r.logStep)
r.dbExecFileLoop(db, r.box.String("pgsql/parqos.sql"), suffix, duration, r.qosStep)
r.dbExecFileLoop(db, r.box.String("pgsql/parsip.sql"), suffix, duration, r.sipStep)
r.dbExecFileLoop(db, r.box.String("pgsql/idxlog.sql"), suffix, duration, r.logStep)
r.dbExecFileLoop(db, r.box.String("pgsql/idxqos.sql"), suffix, duration, r.qosStep)
r.dbExecFileLoop(db, r.box.String("pgsql/idxsip.sql"), suffix, duration, r.sipStep)
}
return nil
}
Expand All @@ -123,18 +126,18 @@ func (r *Rotator) CreateConfTables(duration int) (err error) {
return err
}
defer db.Close()
r.dbExecFile(db, r.box.String("mysql/tblconf.sql"), suffix)
r.dbExecFile(db, r.box.String("mysql/insconf.sql"), suffix)
r.dbExecFile(db, r.box.String("mysql/tblconf.sql"), suffix, 0, 0)
r.dbExecFile(db, r.box.String("mysql/insconf.sql"), suffix, 0, 0)
} else if config.Setting.DBDriver == "postgres" {
db, err := dbr.Open(config.Setting.DBDriver, " host="+r.addr[0]+" port="+r.addr[1]+" dbname="+config.Setting.DBConfTable+" user="+config.Setting.DBUser+" password="+config.Setting.DBPass+" sslmode=disable", nil)
if err != nil {
return err
}
defer db.Close()
r.dbExec(db, "CREATE EXTENSION pgcrypto;")
r.dbExecFile(db, r.box.String("pgsql/tblconf.sql"), suffix)
r.dbExecFile(db, r.box.String("pgsql/idxconf.sql"), suffix)
r.dbExecFile(db, r.box.String("pgsql/insconf.sql"), suffix)
r.dbExecFile(db, r.box.String("pgsql/tblconf.sql"), suffix, 0, 0)
r.dbExecFile(db, r.box.String("pgsql/idxconf.sql"), suffix, 0, 0)
r.dbExecFile(db, r.box.String("pgsql/insconf.sql"), suffix, 0, 0)
}
return nil
}
Expand All @@ -147,21 +150,26 @@ func (r *Rotator) DropTables(duration int) (err error) {
return err
}
defer db.Close()
r.dbExecFile(db, r.box.String("mysql/droptbl.sql"), suffix)
r.dbExecFile(db, r.box.String("mysql/droptbl.sql"), suffix, 0, 0)
} else if config.Setting.DBDriver == "postgres" {
db, err := dbr.Open(config.Setting.DBDriver, " host="+r.addr[0]+" port="+r.addr[1]+" dbname="+config.Setting.DBDataTable+" user="+config.Setting.DBUser+" password="+config.Setting.DBPass+" sslmode=disable", nil)
if err != nil {
return err
}
defer db.Close()
r.dbExecPartitionFile(db, r.box.String("pgsql/droplog.sql"), suffix, duration, r.logStep)
r.dbExecPartitionFile(db, r.box.String("pgsql/dropqos.sql"), suffix, duration, r.qosStep)
r.dbExecPartitionFile(db, r.box.String("pgsql/dropsip.sql"), suffix, duration, r.sipStep)
r.dbExecFileLoop(db, r.box.String("pgsql/droplog.sql"), suffix, duration, r.logStep)
r.dbExecFileLoop(db, r.box.String("pgsql/dropqos.sql"), suffix, duration, r.qosStep)
r.dbExecFileLoop(db, r.box.String("pgsql/dropsip.sql"), suffix, duration, r.sipStep)
}
return nil
}

func (r *Rotator) dbExecFileMYSQL(db *dbr.Connection, file string, pattern strings.Replacer, d, p int) {
func (r *Rotator) dbExec(db *dbr.Connection, query string) {
_, err := db.Exec(query)
checkDBErr(err)
}

func (r *Rotator) dbExecFile(db *dbr.Connection, file string, pattern strings.Replacer, d, p int) error {
t := time.Now().Add(time.Hour * time.Duration(24*d))
tt := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
newMinTime := tt.Format("1504")
Expand All @@ -172,69 +180,80 @@ func (r *Rotator) dbExecFileMYSQL(db *dbr.Connection, file string, pattern strin
logp.Err("%s\n\n", err)
}

var lastErr error
for _, query := range dot.QueryMap() {
query = strings.Replace(query, partitionMinTime, newMinTime, -1)
query = strings.Replace(query, partitionEndTime, newEndTime, -1)
if p != 0 {
query = strings.Replace(query, partitionMinTime, newMinTime, -1)
query = strings.Replace(query, partitionEndTime, newEndTime, -1)
}

logp.Debug("rotator", "db query:\n%s\n\n", query)
_, err := db.Exec(query)
checkDBErr(err)
_, lastErr = db.Exec(query)
checkDBErr(lastErr)
}
return lastErr
}

func (r *Rotator) dbExecFile(db *dbr.Connection, file string, pattern strings.Replacer) {
func (r *Rotator) dbExecFileLoop(db *dbr.Connection, file string, pattern strings.Replacer, d, p int) {
dot, err := dotsql.LoadFromString(pattern.Replace(file))
if err != nil {
logp.Err("%s\n\n", err)
}

for _, query := range dot.QueryMap() {
logp.Debug("rotator", "db query:\n%s\n\n", query)
_, err := db.Exec(query)
checkDBErr(err)
for _, q := range dot.QueryMap() {
fileLoop(db, q, d, p)
}
}

func (r *Rotator) dbExecPartitionFile(db *dbr.Connection, file string, pattern strings.Replacer, d, p int) {
dot, err := dotsql.LoadFromString(pattern.Replace(file))
if err != nil {
logp.Err("%s\n\n", err)
}
func fileLoop(db *dbr.Connection, query string, d, p int) {
var newStartTime, newEndTime, newPartTime string
oriQuery := query

for _, q := range dot.QueryMap() {
rotatePartitions(db, q, d, p)
}
}
t := time.Now().Add(time.Hour * time.Duration(24*d))
startTime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
endTime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())

func (r *Rotator) dbExec(db *dbr.Connection, query string) {
_, err := db.Exec(query)
checkDBErr(err)
for i := 0; i < 1440/p; i++ {
query := oriQuery

newPartTime = startTime.Add(time.Minute * time.Duration(i*p)).Format("1504")
newStartTime = startTime.Add(time.Minute * time.Duration(i*p)).Format("2006-01-02 15:04:05")
newEndTime = endTime.Add(time.Minute * time.Duration(i*p+p)).Format("2006-01-02 15:04:05")

query = strings.Replace(query, partitionTime, newPartTime, -1)
query = strings.Replace(query, partitionStartTime, newStartTime, -1)
query = strings.Replace(query, partitionEndTime, newEndTime, -1)

logp.Debug("rotator", "db query:\n%s\n\n", query)
_, err := db.Exec(query)
checkDBErr(err)
}
}

func (r *Rotator) Rotate() (err error) {
r.createTables()
createJob := cron.New()

logp.Info("Start daily create data table job at 03:15:00\n")
createJob.AddFunc("0 15 03 * * *", func() {
logp.Info("schedule daily rotate job at 03:30:00\n")
createJob.AddFunc("0 30 03 * * *", func() {
if err := r.CreateDataTables(1); err != nil {
logp.Err("%v", err)
}
if err := r.CreateDataTables(2); err != nil {
logp.Err("%v", err)
}
logp.Info("Finished create data table job next will run at %v\n", time.Now().Add(time.Hour*24+1))
logp.Info("finished rotate job next will run at %v\n", time.Now().Add(time.Hour*24+1))
})
createJob.Start()

if config.Setting.DBDropDays > 0 {
dropJob := cron.New()
logp.Info("Start daily drop data table job at 03:45:00\n")
logp.Info("schedule daily drop job at 03:45:00\n")
dropJob.AddFunc("0 45 03 * * *", func() {
if err := r.DropTables(config.Setting.DBDropDays); err != nil {
logp.Err("%v", err)
}
logp.Info("Finished drop data table job next will run at %v\n", time.Now().Add(time.Hour*24+1))
logp.Info("finished drop job next will run at %v\n", time.Now().Add(time.Hour*24+1))
})
dropJob.Start()
}
Expand All @@ -247,44 +266,27 @@ func (r *Rotator) createTables() {
logp.Err("%v", err)
}
}
logp.Info("start creating tables (%v)\n", time.Now())
if err := r.CreateConfTables(0); err != nil {
logp.Err("%v", err)
}
if err := r.CreateDataTables(-1); err != nil {
logp.Err("%v", err)
}
if err := r.CreateDataTables(0); err != nil {
logp.Err("%v", err)
}
if err := r.CreateDataTables(1); err != nil {
logp.Err("%v", err)
}
logp.Info("end creating tables (%v)\n", time.Now())
if config.Setting.DBDropOnStart && config.Setting.DBDropDays != 0 {
if err := r.DropTables(config.Setting.DBDropDays); err != nil {
logp.Err("%v", err)
}
}
}

func rotatePartitions(db *dbr.Connection, query string, d, p int) {
var newStartTime, newEndTime, newPartTime string
oriQuery := query

t := time.Now().Add(time.Hour * time.Duration(24*d))
startTime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
endTime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())

for i := 0; i < 1440/p; i++ {
query := oriQuery

newPartTime = startTime.Add(time.Minute * time.Duration(i*p)).Format("1504")
newStartTime = startTime.Add(time.Minute * time.Duration(i*p)).Format("2006-01-02 15:04:05")
newEndTime = endTime.Add(time.Minute * time.Duration(i*p+p)).Format("2006-01-02 15:04:05")

query = strings.Replace(query, partitionTime, newPartTime, -1)
query = strings.Replace(query, partitionStartTime, newStartTime, -1)
query = strings.Replace(query, partitionEndTime, newEndTime, -1)

logp.Debug("rotator", "db query:\n%s\n\n", query)
_, err := db.Exec(query)
checkDBErr(err)
if config.Setting.DBDropDays == 0 {
logp.Warn("don't schedule daily drop job because config.Setting.DBDropDays is 0\n")
}
}

Expand Down Expand Up @@ -336,7 +338,7 @@ func setStep(name string) (step int) {
func checkDBErr(err error) {
if err != nil {
if config.Setting.DBDriver == "mysql" {
if mErr, ok := err.(*mysql.MySQLError); ok && mErr.Number != 1062 && mErr.Number != 1481 && mErr.Number != 1517 {
if mErr, ok := err.(*mysql.MySQLError); ok && mErr.Number != 1050 && mErr.Number != 1062 && mErr.Number != 1481 && mErr.Number != 1517 {
logp.Warn("%s\n\n", err)
}
} else if config.Setting.DBDriver == "postgres" {
Expand Down
Loading

0 comments on commit 885f343

Please sign in to comment.