{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Concepto de *data pipeline*\n", "\n", "Una tubería de datos es una secuencia de acciones representadas por módulos. Cada módulo tiene un objetivo único donde transforma la información de entrada en otra de salida que a su vez puede ser la entrada a uno o más módulos. Y al mismo tiempo, un módulo puede englobar un pipeline.\n", "\n", "Las tuberías son flexibles pues permiten extraer y recolocar módulos. A su vez, los módulos se pueden escalar o ejecutar paralalemente. Cada módulo puede tener unos requisitos independientes de librerías, de recursos computacionales, de seguridad, de coste, etc. \n", "\n", "El diseño de un *data pipeline* es un enfoque multidisciplinar para afrontar problemáticas del análisis de datos en los sistemas de producción. Un diseño correcto de pipeline proporciona agilidad y flexibilidad para el tratamiento de datos. En definitiva, afecta a recursos y características como: tiempo, coste, despliegue, mantenimiento, recursos humanos, adaptatibilidad, seguridad, reproducilibidad, entre otras.\n", "\n", "Existen herramientas que permiten gestionar el diseño y la ejecución de estas tuberías. Además, estas herramientas permiten detectar y notificar inciedencias con la aparición de anómalias, errores o detenciones de módulos. \n", "\n", "Por lo *general* - y en un mundo de competencias bien delimitadas-, un **ingenier@ de datos** suele diseñar este tipo de procedimientos. Conoce formatos de datos, transformaciones necesarias, estructuras y modelos de comunicación para intercambiar datos entre diferentes almacenajes. Un **científic@ de datos** extrae los datos y los analiza. Ha de conocer el pipeline creado por los ingeniero de datos para incluir procedimientos y readaptar el propio pipeline." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Ejemplos\n", "\n", " Fuente [1]\n", "\n", " Fuente Google" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Recursos por categoría\n", "Lenguajes de programación:\n", " - Python\n", " - Java\n", " - Javascript\n", " - Scala\n", "\n", "Base de datos:\n", "- *Lenguaje SQL\n", "- Relacionales: MySQL, PostgreSQL, Oracle, Microsoft SQL Server\n", "- No relacionales: Mongodb, Redis, Apache Lucene, Big Table (Google), Apacha Cassandra, Elastic Search, ...\n", "\n", "Infraestructura\n", "- Centralizada\n", "- Distribuida mediante Clusters: Apache Spark, Apache Kafka\n", "\n", "Data pipelines:\n", "- Necesitan un planificador (scheduler): crontab, Apache Airflow, Apache Nifi" ] }, { "cell_type": "markdown", "metadata": {}, "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Actividad\n", "\n", "Un primer \"pipeline\"\n", "\n", "**Vamos a capturar datos financieros del NASDAQ.**\n", "- Herramientas: Nasdaq, Yahoo finance, Google finance, Thomsom Reuters\n", "\n", "Pasos:\n", "- Registrarse: https://data.nasdaq.com/ \n", "- Obtener una key para poder acceder al API: *¿Key?¿Api?¿Cuota gratuita?* > https://data.nasdaq.com/search?query=tesla
\n", " \n", "\n", "- Instalar librería especifica de python: https://data.nasdaq.com/tools/python\n", " ```bash\n", " pip install nasdaq-data-link\n", " ```\n", "
\n", "\n", "- Ya tenemos un primer ejemplo:
\n", " \n", " ```python\n", " import nasdaqdatalink as ndl\n", " ndl.read_key(\"tufichero.env\") # ATENCION: guardar siempre el key fuera del código!\n", " # Una manera de guardar tokens y keys: https://pypi.org/project/python-dotenv/ \n", " timeseries_data = ndl.get(\"WIKI/GOOGL\", collapse=\"monthly\")\n", " ```\n", "\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: nasdaq-data-link in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (1.0.4)\n", "Requirement already satisfied: python-dateutil in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (2.8.2)\n", "Requirement already satisfied: pandas>=0.14 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (1.4.2)\n", "Requirement already satisfied: requests>=2.7.0 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (2.28.0)\n", "Requirement already satisfied: numpy>=1.8 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (1.22.3)\n", "Requirement already satisfied: inflection>=0.3.1 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (0.5.1)\n", "Requirement already satisfied: six in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (1.16.0)\n", "Requirement already satisfied: more-itertools in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (8.14.0)\n", "Requirement already satisfied: pytz>=2020.1 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from pandas>=0.14->nasdaq-data-link) (2022.1)\n", "Requirement already satisfied: urllib3<1.27,>=1.21.1 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (1.26.9)\n", "Requirement already satisfied: charset-normalizer~=2.0.0 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (2.0.12)\n", "Requirement already satisfied: idna<4,>=2.5 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (3.3)\n", "Requirement already satisfied: certifi>=2017.4.17 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (2022.6.15)\n", "\n", "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip available: \u001b[0m\u001b[31;49m22.3\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m22.3.1\u001b[0m\n", "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n", "Note: you may need to restart the kernel to use updated packages.\n" ] } ], "source": [ "%pip install nasdaq-data-link" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " Value\n", "Date \n", "1986-01-06 26.53\n", "\n", "26.53\n" ] } ], "source": [ "\n", "\n", "import nasdaqdatalink as ndl # Note for Isaac: run on multipass master \n", "\n", "ndl.read_key(\"K4yp4pkfSEyhTG8Hi-hm\")\n", "\n", "# VALOR : gets the WTI Crude Oil price from the US Department of Energy: EIA/PET_RWTC_D\n", "\n", "#¿Cómo obtener el último valor?\n", "try:\n", " \n", " mydata = ndl.get(\"EIA/PET_RWTC_D\",start_date=\"1986-01-05\",end_date=\"1986-01-06\")\n", " print(mydata)\n", " print(type(mydata))\n", " print(mydata.Value.values[0])\n", "except :\n", " print(\"C\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Suponemos que debemos registrar periodicamente el último valor del mercado. Necesitamos obtener la última muestra y guardarla. Esa última muestra, vamos a imaginar, que corresponde al valor de cierre diario de ese valor. En nuestro caso al valor de crudo.\n", "\n", "- Paso 1. Realiza una adaptación del código anterior para que guarde el último valor de la serie (Delta Time). Como tenemos que recorrer la serie temporal que está guardada en NASDAQ y tenemos un nivel que solo nos permite ver cierta información. Equipararemos fechas para poder \"simular\" el consumo de datos en tiempo real. Por tanto, nuestra fecha actual es la primera fecha de la serie: 1986-01-02 \n", "- Paso 2. ¿Cómo implementar la periocidad de ejecución?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Referencias\n", "- [1] Data Engineering with Python Work with massive datasets to design data models and automate data pipelines using Python (Paul Crickard) Packt Publishing; 2020\n", "\n", "\n", "Nota: referencias donde las infraestructuras están basadas en recursos del Cloud.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.7 64-bit ('my397')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.7" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "8b38137d60f5ef6101ebd11fd805c6415d52a5c999d13278488bced8392256b3" } } }, "nbformat": 4, "nbformat_minor": 2 }