Le format libre Parquet présente certaines caractéristiques intéressantes pour le stockage de données optimisé (notamment avec la compression ZSTD et le chiffrement). Beaucoup de langages et de solutions comme Spark prennent en charge ces fichiers mais pas toutes. Pour rendre les données qu'ils contiennent disponibles à d'autres plateforme, il peut être necéssaire de charger ces données dans un SGBD standard qui sera intérrogé via SQL.
Nous utiliserons ici Python et PostgreSQL avec les librairies Psycopg
(Psycopg2 via SQLAlchemy) et pyArrow. La technique consiste à utiliser
le fonction COPY de PostgreSQL qui utilise le format
CSV.
Avec Psycopg2 via SQLAlchemy
SQLAlchemy ne support que Psycopg dans sa version 2 pour le moment. La table de destination "importparquet" doit être créée au préalable avec les mêmes colonnes que le fichier Parquet.
import sqlalchemy, pyarrow.parquet, pyarrow.csv, io
# Connection à PostgreSQL
engine = sqlalchemy.create_engine("postgresql://login:mot2passe@serveur:5432/Base")
connection = engine.raw_connection()
# Lecture du fichier Parquet et conversion en CSV dans un tampon
buf = io.BytesIO()
table = pyarrow.parquet.read_table('fichier01.parquet')
pyarrow.csv.write_csv(table, buf, pyarrow.csv.WriteOptions(include_header=False, delimiter=','))
buf.seek(0)
# Lancement de COPY
cursor = connection.cursor()
cursor.copy_expert("""COPY "importparquet" ({cols}) FROM stdin WITH (FORMAT CSV)""".format(cols=','.join(table.column_names)), buf)
connection.commit()
# Fermeture des tampons et de la connexion
buf.close()
cursor.close()
connection.close()
engine.dispose()Avec Psycopg3
import psycopg, pyarrow.parquet, pyarrow.csv, io
connection = psycopg.connect(host='serveur', port='5432', dbname='base', user='postgres', password='postgres')
buf = io.BytesIO()
table = pyarrow.parquet.read_table('fichier01.parquet')
pyarrow.csv.write_csv(table, buf, pyarrow.csv.WriteOptions(include_header=False, delimiter=','))
buf.seek(0)
with connection.cursor().copy("""COPY "importparquet" ({cols}) FROM STDIN WITH (FORMAT CSV)""".format(cols=','.join(table.column_names))) as copy:
copy.write(buf.getvalue())
connection.commit()
connection.close()
buf.close()Modifier les données depuis le tampon
Le caractère nul \0 qui peut être stocké dans un fichier
Parquet ne peut l'être dans PostgreSQL. Une ligne vide contenant
uniquement le caractère \n provoquera une erreur lors de la
copie. Pour filtrer ces caractères, il suffit de copier le contenu de
buf vers un nouveau tampon buf_clean qui sera
utilisé avec COPY. Cette technique n'est pas nécessairement
optimale car elle duplique les données en mémoire.
buf_clean = io.BytesIO()
buf_clean.seek(0)
for line in buf:
if b'\0' in line:
buf_clean.write(line.replace(b'\0', b''))
elif line != b'\n':
buf_clean.write(line)
buf.close()
buf_clean.seek(0)