目录
介绍
环形缓冲区代码
初始化
一个真实的例子
一个简单的页面
单元测试
Q测试中的事件
两个连续Q检验中的事件
清除Frame测试
结论
在本文中,您将了解一个环形缓冲区,该缓冲区跟踪一个时间帧内的事件计数。这是量化事件以减少内存使用并创建所需持续时间的样本帧大小的简单但有用的方法。
- 下载源代码3.4 MB
我需要一个环形缓冲区来跟踪一个时间帧(例如一分钟)内的事件计数。由于帧中可能有数以万计的事件,因此理想的是将计数量化为帧的某些样本大小。例如,如果帧是一分钟,则对每秒计数进行量化意味着环形缓冲区仅需要管理60个样本。作为环形缓冲区,帧外的所有样本都将被丢弃并添加新的样本——这意味着如果帧为1分钟,则在1分钟后记录的所有样本将归零。出于我们的目的,将保留总样本计数,即在任何给定时间点帧内量化计数的总和。
上面的屏幕快照显示了事件计数快照的条形图,在这种情况下,来自不同客户端的Web API请求——水平条是每分钟的请求数,垂直轴(已删除名称)是客户端的名称请求。
环形缓冲区代码整体而言:
using System;
using System.Collections.Generic;
using System.Linq;
namespace QuantizedTemporalFrameRingBuffer
{
public class QBuffer
{
protected int runningCount = 0;
protected int[] qFrameCounts;
protected int sampleSize;
protected Func calcQ;
protected Func resetPastSamples;
protected DateTime lastEvent;
///
/// Get the frame as copy of the internal frame, mainly for unit tests.
///
public int[] Frame => qFrameCounts.ToArray();
public int GetRunningCount()
{
lock (locker)
{
return runningCount;
}
}
// Used to allow Capture to be run on a separate thread from CountEvent.
// Particularly useful when the counting HTTP events and
// separately monitoring the sample set.
protected object locker = new object();
public QBuffer(int frameSize, int quantization,
Func calcQ, Func resetPastSamples)
{
Assertion.That(frameSize > 0, "frameSize cannot be negative or 0.");
Assertion.That(quantization > 0, "quantization cannot be negative or 0.");
Assertion.That(frameSize % quantization == 0,
"frameSize must be divisible by quantization without a remainder.");
Assertion.NotNull(calcQ, "calculation of Q cannot be null.");
Assertion.NotNull(resetPastSamples, "reset of past samples cannot be null.");
lastEvent = DateTime.Now;
sampleSize = frameSize / quantization;
qFrameCounts = new int[sampleSize];
this.calcQ = calcQ;
this.resetPastSamples = resetPastSamples;
}
public void Reset(DateTime dt)
{
int q = calcQ(dt);
int resetCount = Math.Min(sampleSize, resetPastSamples(lastEvent, dt));
// We only reset up to the sample size.
// This handles situations where the time elapsed between events
// is greater than the frame size for the specified quantization.
// We effectively drop off the last n quantized samples,
// where n is the quantized frame size.
// We limit this to the sample size, in case the current event
// occurs at some point > frame size.
// At all times, the "past samples" are the very next samples.
while (resetCount > 0)
{
int pastQ = (q + resetCount) % sampleSize;
runningCount -= qFrameCounts[pastQ];
qFrameCounts[pastQ] = 0;
--resetCount;
}
}
public void CountEvent(DateTime dt)
{
lock (locker)
{
int q = calcQ(dt);
++runningCount;
++qFrameCounts[q];
lastEvent = dt;
}
}
public (int total, List samples) Capture()
{
lock (locker)
{
var ret = (runningCount, qFrameCounts.ToList());
return ret;
}
}
}
}
初始化
该用法要求定义一个回调,用于确定量化索引(Q)并确定有多少过去事件不在环形缓冲区量化的样本大小范围内。一个简单的例子是一个60秒的环形缓冲区,具有1秒的量化:
using System;
namespace QuantizedTemporalFrameRingBuffer
{
class Program
{
static void Main(string[] args)
{
// Create a 1 minute ring buffer quantized at one second.
Buffer buffer = new Buffer(60, 1, CalcQ, ResetQ);
}
static int CalcQ(DateTime dt)
{
// This is easy, because we're sampling one for 1 minute and quantized at one second,
// so Q is by its very nature simply the Second within the time component.
return dt.Second;
}
static int ResetQ(DateTime lastEvent, DateTime now)
{
// Again, straightforward because we're quantizing per second.
return (int)(now - lastEvent).TotalSeconds;
}
}
}
进行回调的原因是环形缓冲区不需要知道如何计算量化指数以及需要重置的“插槽”数量。例如,您可能需要一个环形缓冲区来跟踪24小时内的事件,并在一小时进行量化。构造函数的前两个参数:
public Buffer(int frameSize, int quantization,
与时间“单位”不相关的是帧大小和期望的量化。用于计算的回调Q和需要重置的插槽数确定所需的用法。
在我将其连接到的WCF应用程序中,每分钟跟踪客户端的Web请求是这样的:
public static Dictionary buffers = new Dictionary();
...
QBuffer qbuffer;
if (!buffers.TryGetValue(client, out qbuffer))
{
qbuffer = new QBuffer(60, 1, dt => dt.Second,
(lastEvent, eventTime) => (int)(eventTime - lastEvent).TotalSeconds);
buffers[tenant] = qbuffer;
}
DateTime now = DateTime.Now;
qbuffer.Reset(now);
qbuffer.CountEvent(now);
Reset调用一开始可能看起来很奇怪。这样做是将帧外的事件插槽清零,在这种情况下为一分钟。它执行回调:
(lastEvent, eventTime) => (int)(eventTime - lastEvent).TotalSeconds
从而:
- 如果我们仍在当前位置(上一个事件与当前事件发生在同一秒之内),则没有任何动静。
- 如果经过的秒数大于0,则这些插槽将归零:
while (resetCount > 0)
{
int pastQ = (q + resetCount) % sampleSize;
runningCount -= qFrameCounts[pastQ];
qFrameCounts[pastQ] = 0;
--resetCount;
}
并且无论每个插槽中的事件计数如何,运行计数都会减少。
类似地,在检索事件的当前帧时,“当前”表示在请求获取事件时。因此,在用于检索事件的API端点中——请记住,这是我正在使用的WCF应用程序。
public class ClientHitCount
{
public string Client { get; set; }
public int Count { get; set; }
}
public Stream GetQBufferRunningCounts()
{
var clients = GetClientList();
var samples = clients.Select(c =>
{
var buffer = buffers[c];
buffer.Reset(DateTime.Now);
return new ClientHitCount() { Client = c, Count = buffers[c].GetRunningCount() };
});
return Response.AsJson(samples);
}
同样,请注意Reset首先被调用。假设最后一个事件发生在两分钟前——当进行调用以获取帧中发生的事件数时,我们两分钟“为时已晚”,因此该Reset调用的目的是同步环形缓冲区与当前时间帧。
网页上的非常简单的渲染可以通过AnyChart完成:
Events
function getData(callback) {
var xhttp = new XMLHttpRequest();
xhttp.onreadystatechange = function() {
if (this.readyState == 4 && this.status == 200) {
callback(this.responseText);
}
};
xhttp.open("GET", "[your endpoint]", true);
xhttp.send();
}
function updateChart(chart, strJson) {
console.log();
let json = JSON.parse(strJson);
let fjson = json.filter(r => r.Count > 0);
// let fjson = json; // use this line to include 0 counts
let data = {rows: fjson.map(r => [r.Client, r.Count])};
chart.data(data);
chart.draw();
}
anychart.onDocumentReady(function() {
var chart = anychart.bar();
chart.container('container');
chart.barsPadding(10);
setInterval(() => {
let data = getData(json => updateChart(chart, json));
}, 1000);
});
单元测试
一些有用的单元测试。
[TestMethod]
public void EventsInOneQIndexTest()
{
QBuffer buffer = new QBuffer(60, 1, dt => dt.Second,
(lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);
DateTime now = DateTime.Now;
buffer.Reset(now);
buffer.CountEvent(now);
buffer.Reset(now);
buffer.CountEvent(now);
buffer.Reset(now);
buffer.CountEvent(now);
// Note that the count is in the current "second" slot.
int index = now.Second;
Assert.IsTrue(buffer.Frame[index] == 3, "Expected a count of 3.");
Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}
两个连续Q检验中的事件
[TestMethod]
public void EventsInTwoQIndicesTest()
{
QBuffer buffer = new QBuffer(60, 1, dt => dt.Second,
(lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);
DateTime now = DateTime.Now;
DateTime next = now.AddSeconds(1);
buffer.Reset(now);
buffer.CountEvent(now);
buffer.Reset(next);
buffer.CountEvent(next);
buffer.Reset(next);
buffer.CountEvent(next);
int index = now.Second;
Assert.IsTrue(buffer.Frame[index] == 1, "Expected a count of 1.");
Assert.IsTrue(buffer.Frame[index + 1] == 2, "Expected a count of 2.");
Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}
清除Frame测试
[TestMethod]
public void FrameClearedTest()
{
QBuffer buffer = new QBuffer(60, 1, dt => dt.Second,
(lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);
DateTime now = DateTime.Now;
DateTime next = now.AddSeconds(1);
DateTime frameNext = next.AddSeconds(60);
buffer.Reset(now);
buffer.CountEvent(now); // 1 at now
buffer.Reset(next);
buffer.CountEvent(next); // 2 at now + 1
buffer.Reset(next);
buffer.CountEvent(next);
buffer.Reset(frameNext);
buffer.CountEvent(frameNext); // 3 at now + 61
buffer.Reset(frameNext);
buffer.CountEvent(frameNext);
buffer.Reset(frameNext);
buffer.CountEvent(frameNext);
int index = frameNext.Second;
Assert.IsTrue(buffer.Frame[index] == 3, "Expected a count of 3.");
Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}
结论
在这里无需多说——这是量化事件以减少内存使用并创建所需持续时间的样本帧大小的简单但有用的方法。