Skip to content

Commit

Permalink
Merge pull request #65 from github/caol-ila-constinuous-export
Browse files Browse the repository at this point in the history
Add continuous exports
  • Loading branch information
caol-ila authored Mar 11, 2024
2 parents 0d3e211 + 3964f68 commit df8912f
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 2 deletions.
34 changes: 32 additions & 2 deletions KustoSchemaTools/Changes/DatabaseChanges.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,38 @@ public static List<IChange> GenerateChanges(Database oldState, Database newState
var changes = tmp.Where(itm => itm.Scripts.Any()).ToList();
if (changes.Any())
{
log.LogInformation($"Detected changes for MaterializedViews: {changes.Count} changes with {changes.SelectMany(itm => itm.Scripts).Count()} scripts");
result.Add(new Heading("MaterializedViews"));
log.LogInformation($"Detected changes for Materialized Views: {changes.Count} changes with {changes.SelectMany(itm => itm.Scripts).Count()} scripts");
result.Add(new Heading("Materialized Views"));
result.AddRange(changes);
}
}

if (newState.ContinuousExports.Any())
{
var tmp = new List<IChange>();
var existingContinuousExports = oldState?.ContinuousExports ?? new Dictionary<string, ContinuousExport>();
log.LogInformation($"Existing materialized views: {string.Join(", ", existingContinuousExports.Keys)}");

foreach (var view in newState.ContinuousExports)
{
if (existingContinuousExports.ContainsKey(view.Key))
{
var change = new ScriptCompareChange(view.Key, existingContinuousExports[view.Key], view.Value);
log.LogInformation($"Continuous Exports {view.Key} exists, created {change.Scripts.Count} script to apply the diffs");
tmp.Add(change);
}
else
{
var change = new ScriptCompareChange(view.Key, null, view.Value);
log.LogInformation($"Continuous Exports {view.Key} doesn't exist, created {change.Scripts.Count} scripts to create the view");
tmp.Add(change);
}
}
var changes = tmp.Where(itm => itm.Scripts.Any()).ToList();
if (changes.Any())
{
log.LogInformation($"Detected changes for Continuous Exports: {changes.Count} changes with {changes.SelectMany(itm => itm.Scripts).Count()} scripts");
result.Add(new Heading("Continuous Exports "));
result.AddRange(changes);
}
}
Expand Down
25 changes: 25 additions & 0 deletions KustoSchemaTools/Model/ContinuousExport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using KustoSchemaTools.Changes;
using YamlDotNet.Serialization;

namespace KustoSchemaTools.Model
{
public class ContinuousExport : IKustoBaseEntity
{
public string ExternalTable { get; set; }
public int ForcedLatencyInMinutes { get; set; }
public int IntervalBetweenRuns { get; set; }
public long SizeLimit { get; set; }
public bool Distributed { get; set; }
public string ManagedIdentity { get; set; }
[YamlMember(ScalarStyle = YamlDotNet.Core.ScalarStyle.Literal)]
public string Query { get; set; }

public List<DatabaseScriptContainer> CreateScripts(string name)
{
return new List<DatabaseScriptContainer>
{
new DatabaseScriptContainer("ContinuousExport",120,@$".create-or-alter continuous-export {name} to table {ExternalTable} with (forcedLatency={ForcedLatencyInMinutes}m, intervalBetweenRuns={ForcedLatencyInMinutes}m, sizeLimit={SizeLimit}, distributed={Distributed}, managedIdentity='{ManagedIdentity}') <| {Query}")
};
}
}
}
1 change: 1 addition & 0 deletions KustoSchemaTools/Model/Database.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class Database
public Dictionary<string, MaterializedView> MaterializedViews { get; set; } = new Dictionary<string, MaterializedView>();

public Dictionary<string, Function> Functions { get; set; } = new Dictionary<string, Function>();
public Dictionary<string, ContinuousExport> ContinuousExports { get; set; } = new Dictionary<string, ContinuousExport>();

public List<DatabaseScript> Scripts { get; set; } = new List<DatabaseScript>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using KustoSchemaTools.Model;

namespace KustoSchemaTools.Parser.KustoLoader
{
public class KustoContinuousExportBulkLoader : KustoBulkEntityLoader<ContinuousExport>
{
const string LoadContinuousExports = @".show database hydro cslschema script
| parse-where DatabaseSchemaScript with '.create-or-alter continuous-export ' EntityName:string ' to table ' ExternalTable:string ' with (forcedLatency=time(' ForcedLatency:timespan '), intervalBetweenRuns=time('IntervalBetweenRuns:timespan '), sizeLimit='SizeLimit:long', distributed='Distributed:bool', managedIdentity='ManagedIdentity:string') <| ' Query:string
| project EntityName=trim("" "",EntityName), Body = bag_pack(
'ExternalTable', ExternalTable,
'ForcedLatencyInMinutes',toint(ForcedLatency /1m),
'IntervalBetweenRuns',toint(IntervalBetweenRuns /1m),
'SizeLimit',SizeLimit,
'Distributed', Distributed,
'ManagedIdentity',ManagedIdentity,
'Query',Query)";

public KustoContinuousExportBulkLoader() : base(d => d.ContinuousExports) { }

protected override IEnumerable<string> EnumerateScripts()
{
yield return LoadContinuousExports;
}
}
}
11 changes: 11 additions & 0 deletions KustoSchemaTools/Plugins/ContinuousExportPlugin.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using KustoSchemaTools.Model;

namespace KustoSchemaTools.Plugins
{
public class ContinuousExportPlugin : EntityPlugin<ContinuousExport>
{
public ContinuousExportPlugin(string subFolder = "continuous-exports", int minRowLength = 5) : base(db => db.ContinuousExports, subFolder, minRowLength)
{
}
}
}

0 comments on commit df8912f

Please sign in to comment.