<tfoot id='afz0X'></tfoot>

    <small id='afz0X'></small><noframes id='afz0X'>

      <i id='afz0X'><tr id='afz0X'><dt id='afz0X'><q id='afz0X'><span id='afz0X'><b id='afz0X'><form id='afz0X'><ins id='afz0X'></ins><ul id='afz0X'></ul><sub id='afz0X'></sub></form><legend id='afz0X'></legend><bdo id='afz0X'><pre id='afz0X'><center id='afz0X'></center></pre></bdo></b><th id='afz0X'></th></span></q></dt></tr></i><div id='afz0X'><tfoot id='afz0X'></tfoot><dl id='afz0X'><fieldset id='afz0X'></fieldset></dl></div>

        <bdo id='afz0X'></bdo><ul id='afz0X'></ul>
    1. <legend id='afz0X'><style id='afz0X'><dir id='afz0X'><q id='afz0X'></q></dir></style></legend>
    2. 如何使用插入/修改/删除将 dynamodb 设计为弹性搜索

      时间:2023-07-05
    3. <small id='vlRvb'></small><noframes id='vlRvb'>

        <tfoot id='vlRvb'></tfoot>
          <tbody id='vlRvb'></tbody>

          <legend id='vlRvb'><style id='vlRvb'><dir id='vlRvb'><q id='vlRvb'></q></dir></style></legend>
            • <bdo id='vlRvb'></bdo><ul id='vlRvb'></ul>

              <i id='vlRvb'><tr id='vlRvb'><dt id='vlRvb'><q id='vlRvb'><span id='vlRvb'><b id='vlRvb'><form id='vlRvb'><ins id='vlRvb'></ins><ul id='vlRvb'></ul><sub id='vlRvb'></sub></form><legend id='vlRvb'></legend><bdo id='vlRvb'><pre id='vlRvb'><center id='vlRvb'></center></pre></bdo></b><th id='vlRvb'></th></span></q></dt></tr></i><div id='vlRvb'><tfoot id='vlRvb'></tfoot><dl id='vlRvb'><fieldset id='vlRvb'></fieldset></dl></div>
              • 本文介绍了如何使用插入/修改/删除将 dynamodb 设计为弹性搜索的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

                问题描述

                如何使用 Python 将整个文档传递到弹性搜索中?这是放入弹性搜索的正确方法吗?

                How to pass this entire document into elastic search using Python? Is this this the right way to put into elastic search?

                在dynamodb中id是主键

                In dynamodb id is the primary key

                如何插入dynamodb 下面是代码

                How to insert in to dynamodb Below is the code

                import boto3
                from boto3.dynamodb.conditions import Key, And, Attr
                def lambda_handler(event, context):
                    dynamodb = boto3.resource ('dynamodb')
                    table =dynamodb.Table('newtable')
                    with table.batch_writer(overwrite_by_pkeys=['id']) as batch:
                            batch.put_item(
                                Item={
                                    'id': '1',
                                    'last_name': 'V',
                                    'age': '2',
                                }
                            )
                            batch.put_item(
                                Item={
                                    'id': '2',
                                    'last_name': 'JJ',
                                    'age': '7',
                                }
                            )
                            batch.put_item(
                                Item={
                                    'id': '9',
                                    'last_name': 'ADD',
                                    'age': '95',
                                }
                            )
                            batch.put_item(
                                Item={
                                    'id': '10',
                                    'last_name': 'ADD',
                                    'age': '95',
                                }
                            )
                

                • 如何将期望推送到 Elastic Search 中

                  • How to push expected out into Elastic Search

                    dynamodb 内容发生变化时如何在 ES 中自动反映

                    How to reflect automatically in ES if dynamodb content changes

                    我已经浏览了链接 https://aws.amazon.com/blogs/compute/indexing-amazon-dynamodb-content-with-amazon-elasticsearch-service-using-aws-lambda/

                    下面是我收到错误的代码 ERROR: NameError("name 'event' is not defined")

                    Below is code I am getting error ERROR: NameError("name 'event' is not defined")

                    代码.* 在此之前从 dynamodb 表中触发以下 lambda 函数

                    Code. * Before that trigger the below lambda function from the dynamodb table

                    import boto3
                    import json
                    import re
                    from requests_aws4auth import AWS4Auth
                    from elasticsearch import Elasticsearch, RequestsHttpConnection
                    
                    session = boto3.session.Session()
                    credentials = session.get_credentials()
                    # s3 = session.resource('s3')
                    awsauth = AWS4Auth(credentials.access_key,
                                       credentials.secret_key,
                                       session.region_name, 'es',
                                       session_token=credentials.token)
                    es = Elasticsearch(
                        ['https://xx-east-1.es.amazonaws.com'],
                        http_auth=awsauth,
                        use_ssl=True,
                        verify_certs=True,
                        connection_class=RequestsHttpConnection
                    )
                    reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
                                       "_timestamp", "_ttl"]
                    
                    
                    def lambda_handler(event, context):
                        print(event)
                        dynamodb = boto3.resource('dynamodb')
                    
                        # Loop over the DynamoDB Stream records
                        for record in event['Records']:
                    
                            try:
                    
                                if record['eventName'] == "INSERT":
                                    insert_document(es, record)
                                elif record['eventName'] == "REMOVE":
                                    remove_document(es, record)
                                elif record['eventName'] == "MODIFY":
                                    modify_document(es, record)
                    
                            except Exception as e:
                                print("Failed to process:")
                                print(json.dumps(record))
                                print("ERROR: " + repr(e))
                                continue
                    
                    
                    # Process MODIFY events
                    def modify_document(es, record):
                        table = getTable(record)
                        print("Dynamo Table: " + table)
                    
                        docId = docid(record)
                        print("KEY")
                        print(docId)
                    
                        # Unmarshal the DynamoDB JSON to a normal JSON
                        doc = json.dumps(document())
                    
                        print("Updated document:")
                        print(doc)
                    
                        # We reindex the whole document as ES accepts partial docs
                        es.index(index=table,
                                 body=doc,
                                 id=docId,
                                 doc_type=table,
                                 refresh=True)
                    
                        print("Successly modified - Index: " + table + " - Document ID: " + docId)
                    
                    
                    def remove_document(es, record):
                        table = getTable(record)
                        print("Dynamo Table: " + table)
                    
                        docId = docid(record)
                        print("Deleting document ID: " + docId)
                    
                        es.delete(index=table,
                                  id=docId,
                                  doc_type=table,
                                  refresh=True)
                    
                        print("Successly removed - Index: " + table + " - Document ID: " + docId)
                    
                    
                    # Process INSERT events
                    def insert_document(es, record):
                        table = getTable(record)
                        print("Dynamo Table: " + table)
                    
                        # Create index if missing
                        if es.indices.exists(table) == False:
                            print("Create missing index: " + table)
                    
                            es.indices.create(table,
                                              body='{"settings": { "index.mapping.coerce": true } }')
                    
                            print("Index created: " + table)
                    
                        # Unmarshal the DynamoDB JSON to a normal JSON
                        doc = json.dumps(document())
                    
                        print("New document to Index:")
                        print(doc)
                    
                        newId = docid(record)
                        es.index(index=table,
                                 body=doc,
                                 id=newId,
                                 doc_type=table,
                                 refresh=True)
                    
                        print("Successly inserted - Index: " + table + " - Document ID: " + newId)
                    
                    
                    def getTable(record):
                        p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
                        m = p.match(record['eventSourceARN'])
                        if m is None:
                            raise Exception("Table not found in SourceARN")
                        return m.group(1).lower()
                    
                    
                    def document(event):
                        result = []
                        for r in event['Records']:
                            tmp = {}
                            for k, v in r['dynamodb']['NewImage'].items():
                                if "S" in v.keys() or "BOOL" in v.keys():
                                    tmp[k] = v.get('S', v.get('BOOL', False))
                                elif 'NULL' in v:
                                    tmp[k] = None
                            result.append(tmp)
                            for i in result:
                                return i
                    
                    
                    def docid(event):
                        result = []
                        for r in event['Records']:
                            tmp = {}
                            for k, v in r['dynamodb']['Keys'].items():
                                if "S" in v.keys() or "BOOL" in v.keys():
                                    tmp[k] = v.get('S', v.get('BOOL', False))
                                elif 'NULL' in v:
                                    tmp[k] = None
                            result.append(tmp)
                        for newId in result:
                            return newId
                    

                    在文档和文档中出现错误

                    Getting error at document and docid

                    各自都有输出

                    result = []
                    for r in event['Records']:
                        tmp = {}
                    
                        for k, v in r['dynamodb']['NewImage'].items():
                        #for k, v in r['dynamodb']['Keys'].items():
                            if "S" in v.keys() or "BOOL" in v.keys():
                                tmp[k] = v.get('S', v.get('BOOL', False))
                            elif 'NULL' in v:
                                tmp[k] = None
                    
                        result.append(tmp)
                    for i in result:
                        print (i)
                    
                    event = {'Records': [{'eventID': '2339bc590c21035b84f8cc602b12c1d2', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '9'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '9'}, 'age': {'S': '95'}}, 'SequenceNumber': '3100000000035684810908', 'SizeBytes': 23, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'xxxx', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '2'}}, 'NewImage': {'last_name': {'S': 'JJ'}, 'id': {'S': '2'}, 'age': {'S': '5'}}, 'SequenceNumber': '3200000000035684810954', 'SizeBytes': 21, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'a9c90c0c4a5a4b64d0314c4557e94e28', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '10'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '10'}, 'age': {'S': '95'}}, 'SequenceNumber': '3300000000035684810956', 'SizeBytes': 25, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': '288f4a424992e5917af0350b53f754dc', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '1'}}, 'NewImage': {'last_name': {'S': 'V'}, 'id': {'S': '1'}, 'age': {'S': '2'}}, 'SequenceNumber': '3400000000035684810957', 'SizeBytes': 20, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}]}
                    

                    推荐答案

                    您可以检查以下内容.我尝试复制该问题并且可以确认错误

                    You can check the following. I tried to replicate the issue and can confirm the error of

                    ERROR: NameError("name 'event' is not defined")
                    

                    我使用了 simulated INSERT event 来自 DynamoDb 流,基于您的示例和 我自己的表:

                    I used simulated INSERT event from DynamoDb stream, based on your example and my own table:

                    {
                      "Records": [
                        {
                          "eventID": "b8b993cf16d1aacb61b40411b39e0b1f",
                          "eventName": "INSERT",
                          "eventVersion": "1.1",
                          "eventSource": "aws:dynamodb",
                          "awsRegion": "us-east-1",
                          "dynamodb": {
                            "ApproximateCreationDateTime": 1595922821.0,
                            "Keys": {
                              "id": {
                                "N": "1"
                              }
                            },
                            "NewImage": {
                              "last_name": {
                                "S": "V"
                              },
                              "id": {
                                "N": "1"
                              },
                              "age": {
                                "S": "2"
                              }
                            },
                            "SequenceNumber": "25200000000020406897812",
                            "SizeBytes": 22,
                            "StreamViewType": "NEW_AND_OLD_IMAGES"
                          },
                          "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
                        },
                        {
                          "eventID": "e5d5bec988945c06ffc879cf16b89bf7",
                          "eventName": "INSERT",
                          "eventVersion": "1.1",
                          "eventSource": "aws:dynamodb",
                          "awsRegion": "us-east-1",
                          "dynamodb": {
                            "ApproximateCreationDateTime": 1595922821.0,
                            "Keys": {
                              "id": {
                                "N": "9"
                              }
                            },
                            "NewImage": {
                              "last_name": {
                                "S": "ADD"
                              },
                              "id": {
                                "N": "9"
                              },
                              "age": {
                                "S": "95"
                              }
                            },
                            "SequenceNumber": "25300000000020406897813",
                            "SizeBytes": 25,
                            "StreamViewType": "NEW_AND_OLD_IMAGES"
                          },
                          "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
                        },
                        {
                          "eventID": "f1a7c9736253b5ef28ced38ed5ff645b",
                          "eventName": "INSERT",
                          "eventVersion": "1.1",
                          "eventSource": "aws:dynamodb",
                          "awsRegion": "us-east-1",
                          "dynamodb": {
                            "ApproximateCreationDateTime": 1595922821.0,
                            "Keys": {
                              "id": {
                                "N": "2"
                              }
                            },
                            "NewImage": {
                              "last_name": {
                                "S": "JJ"
                              },
                              "id": {
                                "N": "2"
                              },
                              "age": {
                                "S": "7"
                              }
                            },
                            "SequenceNumber": "25400000000020406897819",
                            "SizeBytes": 23,
                            "StreamViewType": "NEW_AND_OLD_IMAGES"
                          },
                          "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
                        },
                        {
                          "eventID": "bfcbad9dc19883e4172e6dc25e66637b",
                          "eventName": "INSERT",
                          "eventVersion": "1.1",
                          "eventSource": "aws:dynamodb",
                          "awsRegion": "us-east-1",
                          "dynamodb": {
                            "ApproximateCreationDateTime": 1595922821.0,
                            "Keys": {
                              "id": {
                                "N": "10"
                              }
                            },
                            "NewImage": {
                              "last_name": {
                                "S": "ADD"
                              },
                              "id": {
                                "N": "10"
                              },
                              "age": {
                                "S": "95"
                              }
                            },
                            "SequenceNumber": "25500000000020406897820",
                            "SizeBytes": 25,
                            "StreamViewType": "NEW_AND_OLD_IMAGES"
                          },
                          "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
                        }
                      ]
                    }
                    

                    示例修改事件:

                    {
                      "Records": [
                        {
                          "eventID": "4e4629c88aa00e366c89a293d9c82d54",
                          "eventName": "MODIFY",
                          "eventVersion": "1.1",
                          "eventSource": "aws:dynamodb",
                          "awsRegion": "us-east-1",
                          "dynamodb": {
                            "ApproximateCreationDateTime": 1595924589.0,
                            "Keys": {
                              "id": {
                                "N": "2"
                              }
                            },
                            "NewImage": {
                              "last_name": {
                                "S": "zhgdhfgdh"
                              },
                              "id": {
                                "N": "2"
                              },
                              "age": {
                                "S": "7"
                              }
                            },
                            "OldImage": {
                              "last_name": {
                                "S": "JJ"
                              },
                              "id": {
                                "N": "2"
                              },
                              "age": {
                                "S": "7"
                              }
                            },
                            "SequenceNumber": "25600000000020408264140",
                            "SizeBytes": 49,
                            "StreamViewType": "NEW_AND_OLD_IMAGES"
                          },
                          "eventSourceARN": "arn:aws:dynamodb:us-east-1:34234:table/newtable/stream/2020-07-28T06:59:38.569"
                        }
                      ]
                    }
                    

                    lambda函数的修改代码,我现在可以确认不会产生错误:

                    Modified code of lambda function, which I can confirm does not produce errors now:

                    import boto3
                    import json
                    import re
                    
                    from requests_aws4auth import AWS4Auth
                    from elasticsearch import Elasticsearch, RequestsHttpConnection
                    
                    session = boto3.session.Session()
                    credentials = session.get_credentials()
                    
                    s3 = session.resource('s3')
                    
                    awsauth = AWS4Auth(credentials.access_key,
                                      credentials.secret_key,
                                      session.region_name, 'es',
                                      session_token=credentials.token)
                    
                        
                    es = Elasticsearch(
                        ['https://vpc-test-dmamain-452frn764ggb4a.us-east-1.es.amazonaws.com'],
                        use_ssl=True,
                        verify_certs=True,
                        http_auth=awsauth,
                        connection_class=RequestsHttpConnection
                    )
                    reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
                                       "_timestamp", "_ttl"]
                    
                    
                    def lambda_handler(event, context):
                        print(event)
                        #dynamodb = boto3.resource('dynamodb')
                    
                        # Loop over the DynamoDB Stream records
                        for record in event['Records']:
                                
                            if record['eventName'] == "INSERT":
                                insert_document(event, es, record)
                            elif record['eventName'] == "REMOVE":
                                remove_document(event, es, record)
                            elif record['eventName'] == "MODIFY":
                                modify_document(event, es, record)
                    
                    
                    # Process MODIFY events
                    def modify_document(event, es, record):
                        table = getTable(record)
                        print("Dynamo Table: " + table)
                    
                        docId = docid(event, event)
                        print("KEY")
                        print(docId)
                    
                        # Unmarshal the DynamoDB JSON to a normal JSON
                        doc = json.dumps(document(event))
                    
                        print("Updated document:")
                        print(doc)
                    
                        # We reindex the whole document as ES accepts partial docs
                        es.index(index=table,
                                 body=doc,
                                 id=docId,
                                 doc_type=table,
                                 refresh=True)
                    
                        print("Successly modified - Index: " , table , " - Document ID: " , docId)
                    
                    
                    def remove_document(event, es, record):
                        
                        table = getTable(record)
                        
                        print("Dynamo Table: " + table)
                    
                        docId = docid(event, event)
                        print("Deleting document ID: ", docId)
                    
                        es.delete(index=table,
                                  id=docId,
                                  doc_type=table,
                                  refresh=True)
                    
                        print("Successly removed - Index: ", table, " - Document ID: " , docId)
                    
                    
                    # Process INSERT events
                    def insert_document(event, es, record):
                        table = getTable(record)
                        print("Dynamo Table: " + table)
                    
                        # Create index if missing
                        if es.indices.exists(table) == False:
                            print("Create missing index: " + table)
                    
                            es.indices.create(table,
                                              body='{"settings": { "index.mapping.coerce": true } }')
                    
                            print("Index created: " + table)
                    
                        # Unmarshal the DynamoDB JSON to a normal JSON
                        doc = json.dumps(document(event))
                    
                        print("New document to Index:")
                        print(doc)
                    
                        newId = docid(event, record)
                        
                        es.index(index=table,
                                 body=doc,
                                 id=newId,
                                 doc_type=table,
                                 refresh=True)
                    
                        print("Successly inserted - Index: " , table + " - Document ID: " , newId)
                    
                    
                    def getTable(record):
                        p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
                        m = p.match(record['eventSourceARN'])
                        if m is None:
                            raise Exception("Table not found in SourceARN")
                        return m.group(1).lower()
                    
                    
                    def document(event):
                        result = []
                        for r in event['Records']:
                            tmp = {}
                            for k, v in r['dynamodb']['NewImage'].items():
                                if "S" in v.keys() or "BOOL" in v.keys():
                                    tmp[k] = v.get('S', v.get('BOOL', False))
                                elif 'NULL' in v:
                                    tmp[k] = None
                            result.append(tmp)
                            for i in result:
                                return i
                    
                    
                    def docid(event, record):
                        result = []
                        for r in event['Records']:
                            tmp = {}
                            for k, v in r['dynamodb']['Keys'].items():
                                if "S" in v.keys() or "BOOL" in v.keys():
                                    tmp[k] = v.get('S', v.get('BOOL', False))
                                elif 'NULL' in v:
                                    tmp[k] = None
                            result.append(tmp)
                        for newId in result:
                            return newId
                    

                    尚未验证数据是否正确写入、修改或插入 ElasticSearch.但是我运行了 ES 域,并在 lambda 中使用它来验证 lambda 是否可以连接到它并运行查询.

                    I haven't verified if data is correctly written, modified or inserted into ElasticSearch. But I had ES domain running and used in the lambda to verify if lambda can connect to it and run the queries.

                    插入事件的 lambda 输出示例:

                    Example output from lambda for INSERT event:

                    Dynamo Table: newtable
                    New document to Index:
                    {"last_name": "V", "age": "2"}
                    Successly inserted - Index:  newtable - Document ID:  {}
                    Dynamo Table: newtable
                    New document to Index:
                    {"last_name": "V", "age": "2"}
                    Successly inserted - Index:  newtable - Document ID:  {}
                    Dynamo Table: newtable
                    New document to Index:
                    {"last_name": "V", "age": "2"}
                    Successly inserted - Index:  newtable - Document ID:  {}
                    Dynamo Table: newtable
                    New document to Index:
                    {"last_name": "V", "age": "2"}
                    Successly inserted - Index:  newtable - Document ID:  {}
                    
                    Example output from lambda from MODIFY event:
                    
                    

                    更新文档:

                    {
                        "last_name": "zhgdhfgdh",
                        "age": "7"
                    }
                    Successly modified - Index:  newtable  - Document ID:  
                    {}
                    

                    我认为 docid 是否正常工作需要进一步调查,因为它似乎返回空字典:

                    I think docid requires further investigation if it works correctly as it seems to return empty dict:

                     Document ID:  {}
                    

                    这篇关于如何使用插入/修改/删除将 dynamodb 设计为弹性搜索的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

                上一篇:如何模拟 AWS DynamoDB 服务? 下一篇:如何通过流从 dynamodb 推送数据

                相关文章

                1. <small id='IycFq'></small><noframes id='IycFq'>

                  <i id='IycFq'><tr id='IycFq'><dt id='IycFq'><q id='IycFq'><span id='IycFq'><b id='IycFq'><form id='IycFq'><ins id='IycFq'></ins><ul id='IycFq'></ul><sub id='IycFq'></sub></form><legend id='IycFq'></legend><bdo id='IycFq'><pre id='IycFq'><center id='IycFq'></center></pre></bdo></b><th id='IycFq'></th></span></q></dt></tr></i><div id='IycFq'><tfoot id='IycFq'></tfoot><dl id='IycFq'><fieldset id='IycFq'></fieldset></dl></div>
                  <tfoot id='IycFq'></tfoot>
                    <bdo id='IycFq'></bdo><ul id='IycFq'></ul>
                  <legend id='IycFq'><style id='IycFq'><dir id='IycFq'><q id='IycFq'></q></dir></style></legend>