Detector avançado de anomalias de séries temporais no Microsoft Fabric
O Anomaly Detector , um dos serviços de IA do Azure, permite que você monitore e detecte anomalias em seus dados de séries temporais. Este serviço é baseado em algoritmos avançados, SR-CNN para análise univariada e MTAD-GAT para análise multivariada e será descontinuado em outubro de 2026. Nesta postagem do blog, apresentaremos uma estratégia de migração para o Microsoft Fabric, permitindo que você detecte anomalias idênticas, usando os mesmos algoritmos do serviço antigo e muito mais.
Aqui estão alguns dos benefícios da estratégia que estamos prestes a apresentar para você:
- Gerenciamento mais fácil do ciclo de vida dos modelos treinados usando o Fabric ML.
- Não há necessidade de enviar seus dados para uma conta de armazenamento externo, basta transmitir seus dados para o Fabric Eventhouse com disponibilidade do OneLake e você pode usá-los para treinamento e pontuação.
- Você pode usar seus dados por qualquer mecanismo Fabric (KQL DB, Fabric ML Notebook, PBI e mais)
- Os algoritmos são de código aberto e publicados pelo novo pacote time-series-anomaly-detector · PyPI , portanto você pode revisá-los e ajustá-los conforme necessário.
Detecção de anomalias de séries temporais em RTI de tecido
Existem poucas opções para detecção de anomalias de séries temporais no Fabric RTI (Real Time Intelligence):
- Para análise univariada , o KQL contém a função nativa series_decompose_anomalies() que pode executar detecção de anomalias em milhares de séries temporais em segundos. Para mais informações sobre o uso desta função, dê uma olhada em Detecção e previsão de anomalias de séries temporais no Azure Data Explorer .
- Para análise multivariada , há poucas funções de biblioteca KQL que aproveitam algoritmos de análise multivariada conhecidos no scikit-learn , aproveitando a capacidade do ADX para executar Python inline como parte da consulta KQL . Para mais informações, consulte Detecção de anomalias multivariadas no Azure Data Explorer – Microsoft Community Hub .
- Para análises univariadas e multivariadas, agora você pode usar o novo fluxo de trabalho, que é baseado no pacote time-series-anomaly-detector, conforme descrito abaixo.
Usando detector de anomalias de séries temporais no Fabric
No exemplo a seguir, iremos
- Carregar tabela de alterações de ações para o Fabric
- Treine o modelo de detecção de anomalias multivariadas em um notebook Python usando o mecanismo Spark
- Prever anomalias aplicando o modelo treinado a novos dados usando o mecanismo Eventhouse (Kusto)
Observe que, para o modelo univariado, não há necessidade de treinar o modelo em uma etapa separada (pois o treinamento é rápido e feito internamente) e podemos apenas prever.
Abaixo apresentamos brevemente as etapas, veja Detecção de anomalias multivariadas – Microsoft Fabric | Microsoft Learn para o tutorial detalhado.
Criando os ambientes
- Criar um espaço de trabalho
- Criar Eventhouse – para armazenar os dados de streaming recebidos
- Habilitar a disponibilidade do OneLake – para que os dados mais antigos que foram ingeridos no Eventhouse possam ser acessados perfeitamente pelo Spark Notebook para treinar o modelo de detecção de anomalias
- Habilitar plugin KQL Python – para ser usado para previsões em tempo real de anomalias nos novos dados de streaming. Selecione a imagem 3.11.7 DL que contém o pacote time-series-anomaly-detector
- Crie um ambiente Spark que inclua o pacote time-series-anomaly-detector
Treinamento e armazenamento do modelo de detecção de anomalias
- Carregue os dados de ações para o Eventhouse
- Crie um notebook para treinar o modelo
- Carregue os dados do Eventhouse usando o caminho OneLake:
onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI
abfss_uri = convert_onelake_to_abfss(onelake_uri)
df = spark.read.format('delta').load(abfss_uri)
df = df.toPandas().set_index('Date')
- Veja os dados:
import plotly.graph_objects as go
fig = go.Figure()
fig.add_trace(go.Scatter(x=df.index, y=df['AAPL'], mode='lines', name='AAPL'))
fig.add_trace(go.Scatter(x=df.index, y=df['AMZN'], mode='lines', name='AMZN'))
fig.add_trace(go.Scatter(x=df.index, y=df['GOOG'], mode='lines', name='GOOG'))
fig.add_trace(go.Scatter(x=df.index, y=df['MSFT'], mode='lines', name='MSFT'))
fig.add_trace(go.Scatter(x=df.index, y=df['SPY'], mode='lines', name='SPY'))
fig.update_layout(
title='Stock Prices change',
xaxis_title='Date',
yaxis_title='Change %',
legend_title='Tickers'
)
fig.show()
- Prepare os dados para treinamento:
features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
cutoff_date = pd.to_datetime('2023-01-01')
train_df = df[df.Date < cutoff_date]
- Treine o modelo:
import mlflow
from anomaly_detector import MultivariateAnomalyDetector
model = MultivariateAnomalyDetector()
sliding_window = 200
param s = {"sliding_window": sliding_window}
model.fit(train_df, params=params)
- Salvar o modelo no registro do modelo Fabric ML
with mlflow.start_run():
mlflow.log_params(params)
mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")
model_info = mlflow.pyfunc.log_model(
python_model=model,
artifact_path="mvad_artifacts",
registered_model_name="mvad_5_stocks_model",
)
- Extraia o caminho do modo (a ser usado pelo Eventhouse para a previsão):
mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
model_abfss = mi.latest_versions[0].source
print(model_abfss)
- Crie um conjunto de consultas e anexe o Eventhouse a ele
- Execute a consulta ‘.create-or-alter function’ para definir a função armazenada predict_fabric_mvad_fl():
.create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
{
let s = artifacts_uri;
let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
let code = ```if 1:
import os
import shutil
import mlflow
model_dir = 'C:/Temp/mvad_model'
model_data_dir = model_dir + '/data'
os.mkdir(model_dir)
shutil.move('C:/Temp/MLmodel', model_dir)
shutil.move('C:/Temp/conda.yaml', model_dir)
shutil.move('C:/Temp/requirements.txt', model_dir)
shutil.move('C:/Temp/python_env.yaml', model_dir)
shutil.move('C:/Temp/python_model.pkl', model_dir)
features_cols = kargs["features_cols"]
trim_result = kargs["trim_result"]
test_data = df[features_cols]
model = mlflow.pyfunc.load_model(model_dir)
predictions = model.predict(test_data)
predict_result = pd.DataFrame(predictions)
samples_offset = len(df) - len(predict_result) # this model doesn't output predictions for the first sliding_window-1 samples
if trim_result: # trim the prefix samples
result = df[samples_offset:]
result.iloc[:,-4:] = predict_result.iloc[:, 1:] # no need to copy 1st column which is the timestamp index
else:
result = df # output all samples
result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
```;
samples
| evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
}
- Execute a consulta de previsão que detectará anomalias multivariadas nas 5 ações, com base no modelo treinado, e renderize-a como anomalychart . Observe que os pontos anômalos são renderizados na primeira ação (AAPL), embora representem anomalias multivariadas, ou seja, anomalias do vetor das 5 ações na data específica.
let cutoff_date=datetime(2023-01-01);
let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count); // number of latest points to predict
let sliding_window=200; // should match the window that was set for model training
let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
let num_samples = prefix_score_len + num_predictions;
demo_stocks_change
| top num_samples by Date desc
| order by Date asc
| extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
| invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
// NOTE: Update artifacts_uri to model path
artifacts_uri='enter your model URI here',
trim_result=true)
| summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
| render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
Resumo
A adição do pacote time-series-anomaly-detector ao Fabric o torna a plataforma principal para detecção de anomalias de séries temporais univariadas e multivariadas. Escolha o método de detecção de anomalias que melhor se adapta ao seu cenário – desde a função KQL nativa para análise univariada em escala, passando por técnicas de análise multivariada padrão e até os melhores algoritmos de detecção de anomalias de séries temporais implementados no pacote time-series-anomaly-detector. Para obter mais informações, consulte a visão geral e o tutorial .
Arbit: 25 anos transformando tecnologia em valor
Pode não ser parecer fácil gerar valor para seus dados, mas a Arbit, pode ajudá-lo. Há 25 anos atuando com inteligência de dados, a Arbit possui especialistas para implementar as melhores soluções ao seu ambiente de negócios. Fale conosco agora mesmo
Fonte: Microsoft