Mergulhe no lago Delta: aplicação e evolução

Olá Habr! Apresento a você a tradução do artigo “Diving Into Delta Lake: Schema Enforcement & Evolution”, de Burak Yavuz, Brenner Heintz e Denny Lee, que foi preparado antes do início do curso de Data Engineer da OTUS.





Dados, como nossa experiência, estão constantemente se acumulando e se desenvolvendo. Para acompanhar, nossos modelos mentais do mundo devem se adaptar a novos dados, alguns dos quais contêm novas dimensões - novas maneiras de observar coisas sobre as quais não tínhamos idéia antes. Esses modelos mentais não são muito diferentes dos esquemas de tabela que determinam como classificamos e processamos novas informações.

Isso nos leva à questão do gerenciamento de circuitos. À medida que as tarefas e os requisitos de negócios mudam com o tempo, a estrutura dos seus dados muda. O Delta Lake facilita a implementação de novas medições quando os dados são alterados. Os usuários têm acesso a semânticas simples para gerenciar os layouts de suas tabelas. Essas ferramentas incluem o Schema Enforcement, que protege os usuários de obstruir inadvertidamente suas tabelas com erros ou dados desnecessários, além do Schema Evolution, que adiciona automaticamente novas colunas com dados valiosos aos locais apropriados. Neste artigo, vamos nos aprofundar no uso dessas ferramentas.

Noções básicas sobre esquemas de tabelas


Cada DataFrame no Apache Spark contém um esquema que define um formulário de dados, como tipos de dados, colunas e metadados. Usando o Delta Lake, o esquema da tabela é salvo no formato JSON dentro do log de transações.

O que é impor esquema?


A aplicação de esquema, também conhecida como validação de esquema, é um mecanismo de defesa no Delta Lake que garante a qualidade dos dados rejeitando registros que não correspondem ao esquema da tabela. Como a anfitriã na recepção de um restaurante popular, que aceita apenas mediante reserva prévia, ele verifica se cada coluna de dados inserida na tabela está na lista correspondente de colunas esperadas (em outras palavras, há uma "reserva" para cada uma delas), e rejeita todas as entradas com colunas que não estão na lista.

Como a imposição de um circuito funciona?


O Delta Lake usa uma verificação de esquema ao gravar, o que significa que todos os novos registros na tabela são verificados quanto à compatibilidade com o esquema da tabela de destino durante a gravação. Se o esquema for incompatível, o Delta Lake cancelará completamente a transação (os dados não serão gravados) e lançará uma exceção para informar o usuário sobre a discrepância.
Para determinar a compatibilidade do registro com uma tabela, o Delta Lake usa as seguintes regras. DataFrame gravável:

  • não pode conter colunas adicionais que não estão no esquema da tabela de destino. E vice-versa, tudo estará em ordem se os dados de entrada não contiverem absolutamente todas as colunas da tabela - essas colunas receberão simplesmente zero valores.
  • , . StringType, DataFrame IntegerType, .
  • não pode conter nomes de colunas que diferem apenas no caso. Isso significa que você não pode ter colunas com os nomes 'Foo' e 'foo' definidos na mesma tabela. Embora o Spark possa ser usado com distinção entre maiúsculas e minúsculas ou sem distinção entre maiúsculas e minúsculas (padrão), o Delta Lake faz distinção entre maiúsculas e minúsculas, mas sem distinção entre maiúsculas e minúsculas. O parquet diferencia maiúsculas de minúsculas ao armazenar e retornar informações da coluna. Para evitar possíveis erros, corrupção de dados ou perda de dados (que encontramos pessoalmente nos Databricks), decidimos adicionar essa restrição.

Para ilustrar isso, vamos dar uma olhada no que acontece no código abaixo ao tentar adicionar algumas colunas geradas recentemente a uma tabela do Delta Lake que ainda não está configurada para aceitá-las.

#  DataFrame ,       Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

#    DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
#    DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
#    DataFrame (  )   
loans.write.format("delta") \
           .mode("append") \
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")\'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Em vez de adicionar automaticamente novas colunas, o Delta Lake impõe um gráfico e para a gravação. Para ajudar a determinar qual coluna (ou várias delas) é a causa da incompatibilidade, o Spark extrai os dois esquemas da pilha de pilhas para comparação.

Qual é a utilidade de impor um esquema?


Como impor um esquema é um teste bastante rigoroso, é uma excelente ferramenta para usar um conjunto de dados limpo e totalmente transformado, pronto para produção ou consumo como gatekeeper. Como regra, é aplicada a tabelas que fornecem dados diretamente:

  • Algoritmos de aprendizado de máquina
  • Painéis de BI
  • Análise de dados e ferramentas de visualização
  • Qualquer sistema de produção que exija esquemas semânticos estritamente estruturados e fortemente tipados.

Para preparar seus dados para essa barreira final, muitos usuários usam uma arquitetura "multi-hop" simples, que gradualmente traz estrutura às suas tabelas. Para saber mais sobre isso, consulte o artigo Machine Learning no nível de máquina da Delta Lake.

Obviamente, você pode usar o aplicativo forçado do esquema em qualquer lugar do seu pipeline, mas lembre-se de que a transmissão para uma tabela nesse caso pode ser frustrante, porque, por exemplo, você esqueceu que adicionou outra coluna aos dados de entrada.

Prevenção de redução de dados


Neste ponto, você pode estar se perguntando por que tanta agitação? Afinal, às vezes um erro inesperado de “incompatibilidade de esquema” pode configurá-lo no movimento no seu fluxo de trabalho, especialmente se você é novo no Delta Lake. Por que não deixar o esquema mudar conforme necessário para que eu possa escrever meu DataFrame, não importa o quê?

Como diz o velho ditado, "um grama de prevenção vale um quilo de cura". Em algum momento, se você não cuidar da aplicação do seu esquema, os problemas de compatibilidade do tipo de dados aumentarão sua cabeça nojenta - à primeira vista, fontes homogêneas de dados brutos podem conter casos limítrofes, colunas danificadas, mapeamentos malformados ou outras coisas assustadoras que sonham em pesadelos. A melhor abordagem é parar esses inimigos no portão - aplicando o esquema - e lidar com eles à luz, o mais tardar, quando eles começarem a vasculhar as profundezas escuras do seu código de trabalho.

A imposição de um esquema dá a você a confiança de que o esquema da sua tabela não será alterado, a menos que você mesmo confirme a opção de alteração. Isso evita a diluição de dados, que pode ocorrer quando novas colunas são adicionadas com tanta frequência que as tabelas compactadas anteriormente valiosas perdem seu valor e utilidade devido a uma inundação de dados. Incentivando você a ser intencional, estabelecer altos padrões e esperar alta qualidade, a imposição do esquema faz exatamente o que se destina a ajudá-lo a permanecer honesto e a manter suas mesas limpas.

Se, após uma análise mais aprofundada, você decidir que realmente precisa adicionar uma nova coluna - não há problema, abaixo está uma correção de linha única. A solução é a evolução do circuito!

O que é evolução de circuitos?


A evolução do esquema é um recurso que permite aos usuários alterar facilmente o esquema da tabela atual de acordo com os dados que são alterados ao longo do tempo. Na maioria das vezes, é usado ao executar uma operação de adição ou reescrita para adaptar automaticamente o layout para incluir uma ou mais novas colunas.

Como funciona a evolução do circuito?


Seguindo o exemplo da seção anterior, os desenvolvedores podem usar facilmente a evolução do esquema para adicionar novas colunas que foram rejeitadas anteriormente devido à incompatibilidade do esquema. A evolução do esquema é ativada adicionando .option('mergeSchema', 'true')à sua equipe do Spark..write .writeStream.

#   mergeSchema
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)

Para visualizar o gráfico, execute a seguinte consulta Spark SQL

#     ,  ,    
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

imagem
Como alternativa, você pode definir esta opção para toda a sessão do spark.databricks.delta.schema.autoMerge = TrueSpark adicionando o Spark à configuração. Mas use isso com cautela, porque a aplicação de um esquema não o alertará mais sobre inconsistências inadvertidas com o esquema.

Ao incluir um parâmetro na consulta mergeSchema, todas as colunas presentes no DataFrame, mas que não estão na tabela de destino, são automaticamente adicionadas ao final do esquema como parte da transação de gravação. Campos aninhados também podem ser adicionados e também serão adicionados ao final das colunas da estrutura correspondentes.

Os engenheiros e cientistas da data podem usar essa opção para adicionar novas colunas (talvez uma métrica rastreada recentemente ou uma coluna de métricas de vendas este mês) às tabelas de produção de aprendizado de máquina existentes sem quebrar os modelos existentes com base em colunas antigas.

Os seguintes tipos de alterações de esquema são permitidos como parte da evolução do esquema ao adicionar ou reescrever uma tabela:

  • Adicionando novas colunas (este é o cenário mais comum)
  • Alterar tipos de dados de NullType -> qualquer outro tipo ou aumentar de ByteType -> ShortType -> IntegerType

Outras alterações que são inaceitáveis ​​como parte da evolução de um esquema exigem que o esquema e os dados sejam substituídos pela adição .option("overwriteSchema", "true"). Por exemplo, no caso em que a coluna “Foo” era originalmente um número inteiro, e o novo esquema fosse um tipo de dados string, todos os arquivos Parquet (dados) precisariam ser reescritos. Essas mudanças incluem:

  • excluir coluna
  • alterar o tipo de dados de uma coluna existente (no local)
  • renomear colunas que diferenciam maiúsculas de minúsculas (por exemplo, "Foo" e "foo")

Finalmente, com a próxima versão do Spark 3.0, o DDL explícito (usando ALTER TABLE) será totalmente suportado, o que permitirá aos usuários executar as seguintes ações nos esquemas de tabela:

  • adicionando colunas
  • alterar comentários da coluna
  • definindo propriedades da tabela que determinam o comportamento da tabela, por exemplo, definindo a duração do armazenamento do log de transações.

Qual é a utilidade da evolução do circuito?


Você sempre pode usar a evolução do esquema quando pretende alterar o esquema da sua tabela (ao contrário dos casos em que acidentalmente adiciona colunas ao DataFrame que não deveriam estar lá). Essa é a maneira mais fácil de migrar seu esquema, pois adiciona automaticamente os nomes de colunas e tipos de dados corretos, sem a necessidade de declará-los explicitamente.

Conclusão


Forçar um esquema rejeita quaisquer novas colunas ou outras alterações de esquema que não sejam compatíveis com sua tabela. Ao definir e manter esses altos padrões, os analistas e engenheiros podem confiar no fato de que seus dados têm o mais alto nível de integridade, raciocinando com clareza e clareza, permitindo que eles tomem decisões comerciais mais eficazes.

Por outro lado, a evolução do circuito complementa a aplicação, simplificando as supostas alterações automáticas no circuito. No final, não deve ser difícil adicionar uma coluna.

A aplicação forçada do circuito é yang, onde a evolução do circuito é yin. Quando usados ​​em conjunto, esses recursos facilitam mais do que nunca a redução de ruído e a sintonia de sinal.

Também gostaríamos de agradecer a Mukul Murti e Pranava Ananda por suas contribuições para este artigo.

Outros artigos da série:

Mergulhe no Delta Lake: Descompactando um log de transações



Artigos relacionados


Aprendizado de máquina no nível de produção Delta Lake

O que é um data lake?



Saiba mais sobre o curso



All Articles