MongoDB
 sql >> база данни >  >> NoSQL >> MongoDB

Потопете Kafka Stream към MongoDB с помощта на PySpark Structured Streaming

Намерих решение. Тъй като не можах да намеря подходящия драйвер за Mongo за структурирано поточно предаване, работих върху друго решение. Сега използвам директната връзка към mongoDb и използвам „foreach(...)“ вместо foreachbatch(. ..). Моят код изглежда така във файла testSpark.py:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Mongodb няма да стартира

  2. Неуспешна заявка за диапазон от дати за MongoDB база данни с туитове

  3. MongoDB изглежда избира грешен индекс, когато прави агрегат

  4. Mongoose - RangeError:Максималният размер на стека от обаждания е превишен

  5. (възел:3341) Предупреждение за прекратяване:Mongoose:mpromise