用 docker-compose 部署 MongoDB Replica Set 开发环境
前言
话不多说,直接进正题
如何搭建和配置
把以下内容写入 `docker-compose.yml` 文件中，然后运行 `docker-compose up -d --scale mongo=3`，就可以启动三个 MongoDB 服务。接下来我们要把这三个 MongoDB 节点配置成一组 Replica Set。
version: '3.5'
services:
  mongo:
    image: mongo
    ports:
      # Each scaled replica is published on its own free host port taken
      # from this range, mapped to mongod's port 27017 inside the container.
      - "27017-27118:27017"
    # Every node joins the same replica set, named "main".
    command: mongod --replSet main
把以下内容写入 `utils.py` 文件中：
import subprocess as sp
def get_node_ports():
    """Return the host ports published for port 27017 of every ``mongo`` container.

    Runs ``docker-compose port --index N mongo 27017`` for N = 1, 2, ... and
    stops at the first index for which the command fails or prints nothing.

    Returns:
        list[int]: the published host ports, ordered by container index.

    Raises:
        RuntimeError: if no ``mongo`` container reports a published port.
    """
    ports = []
    idx = 1
    while True:
        try:
            output = sp.check_output(
                ["docker-compose", "port", "--index", str(idx), "mongo", "27017"],
                encoding="utf-8",
                stderr=sp.DEVNULL,
            )
        except (sp.CalledProcessError, FileNotFoundError):
            # Non-zero exit: no container with this index.
            # FileNotFoundError: docker-compose is not installed. The old
            # shell=True version also saw this as a failed command.
            break
        lines = output.strip().splitlines()
        if not lines:
            break
        # Output looks like "0.0.0.0:27025" or "[::]:27025" (IPv6). Take the
        # text after the LAST colon so IPv6 addresses are handled too.
        port = lines[0].rsplit(":", 1)[-1]
        ports.append(int(port))
        idx += 1
    if not ports:
        raise RuntimeError("no node is running.")
    return ports
把以下内容写入 `init.py` 文件中，再执行 `python init.py` 命令来配置这组 Replica Set：
import json
import subprocess as sp
import tempfile
from utils import get_node_ports
def generate_init_js(node_ports, *, replica_set="main", host="host.docker.internal"):
    """Build the mongo-shell statement that initiates the replica set.

    Args:
        node_ports: host ports of the nodes. Member ``_id`` values are
            assigned 0, 1, 2, ... in this order.
        replica_set: replica-set name. It must match the ``--replSet``
            argument of ``mongod`` (``main`` in docker-compose.yml).
        host: hostname advertised for every member. The default is the name
            containers use to reach the Docker host. NOTE(review): it
            presumably needs an ``extra_hosts`` mapping outside Docker
            Desktop; verify on Linux.

    Returns:
        str: ``rs.initiate({...})`` with the config rendered as JSON
        indented by 2 spaces.
    """
    members = [
        {"_id": idx, "host": f"{host}:{port}"}
        for idx, port in enumerate(node_ports)
    ]
    initiate_parameters = {"_id": replica_set, "members": members}
    return f"rs.initiate({json.dumps(initiate_parameters, indent=2)})"
def main():
    """Initiate the replica set from inside one of the ``mongo`` containers.

    Steps:
    1. Discover the nodes' published ports.
    2. Render ``init.js``.
    3. Copy it into a ``mongo`` container with ``docker cp``.
    4. Run it with the MongoDB shell available in that container.
    """
    ports = get_node_ports()
    init_js = generate_init_js(ports)
    print("\x1b[32m$\x1b[0m cat init.js")
    print(init_js)

    # `docker-compose ps -q mongo` prints one container ID per line for the
    # service. The old `grep mongo_1` also matched mongo_10, mongo_11, ...
    # and missed Compose v2 names such as "project-mongo-1". Every
    # discovered node is a member of the config, so any one can initiate.
    container_ids = sp.check_output(
        ["docker-compose", "ps", "-q", "mongo"],
        encoding="utf-8",
    ).split()
    if not container_ids:
        raise RuntimeError("no mongo container found.")
    container_name = container_ids[0]

    # The temp file must still exist while `docker cp` reads it, so copy
    # inside the `with` block.
    with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", suffix=".js") as f:
        f.write(init_js)
        f.flush()
        sp.check_output(["docker", "cp", f.name, f"{container_name}:/init.js"])
    print("\x1b[32m$\x1b[0m copy init.js")

    # The legacy `mongo` shell was removed in MongoDB 6.0 in favour of
    # `mongosh`. docker-compose.yml uses the unpinned `mongo` image, so use
    # whichever shell the container provides.
    shell_cmd = (
        "if command -v mongosh >/dev/null 2>&1; "
        "then mongosh --port 27017 < /init.js; "
        "else mongo --port 27017 < /init.js; fi"
    )
    output = sp.check_output(
        ["docker", "exec", container_name, "sh", "-c", shell_cmd],
        encoding="utf-8",
    )
    print("\x1b[32m$\x1b[0m initiate")
    print(output)


if __name__ == "__main__":
    main()
执行结果如下：
$ cat init.js
rs.initiate({
  "_id": "main",
  "members": [
    {
      "_id": 0,
      "host": "host.docker.internal:27025"
    },
    {
      "_id": 1,
      "host": "host.docker.internal:27023"
    },
    {
      "_id": 2,
      "host": "host.docker.internal:27024"
    }
  ]
})
$ copy init.js
$ initiate
MongoDB shell version v4.0.11
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("af1e1d7a-69aa-406d-abb3-73b422f3f093") }
MongoDB server version: 4.0.11
{
    "ok" : 1,
    "operationTime" : Timestamp(1565015581, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1565015581, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
bye
测试
搭建完成了，接下来用以下代码测试一下：
import asyncio
import logging
import socket
from unittest import mock
import pytest
from motor.motor_asyncio import AsyncIOMotorClient
from utils import get_node_ports
@pytest.fixture(scope="session")
def node_addresses():
    """Session-wide list of ``(host, port)`` pairs, one per replica-set node.

    Every node is addressed through ``localhost`` on its published port.
    """
    return [("localhost", published) for published in get_node_ports()]
@pytest.fixture(autouse=True)
def hook_getaddrinfo():
    """Patch ``socket.getaddrinfo`` for each test so container hostnames resolve.

    MongoDB hands out each replica's in-container address,
    ``host.docker.internal``. On the host machine that name has to be
    replaced by ``localhost`` before resolution.
    """
    real_getaddrinfo = socket.getaddrinfo

    # Parameter names mirror socket.getaddrinfo so keyword callers keep working.
    def container_2_host_address(host, port, family=0, type=0, proto=0, flags=0):
        from_container = host == "host.docker.internal"
        if not from_container:
            logging.debug("no change %r", (host, port))
        else:
            logging.debug("container 2 host")
        target = "localhost" if from_container else host
        return real_getaddrinfo(target, port, family, type, proto, flags)

    patcher = mock.patch("socket.getaddrinfo", new=container_2_host_address)
    with patcher:
        yield
@pytest.fixture
def uri(node_addresses):
    """MongoDB connection string seeding every node, bound to replica set ``main``."""
    seeds = []
    for host, port in node_addresses:
        seeds.append(f"{host}:{port}")
    return "mongodb://" + ",".join(seeds) + "/?replicaSet=main"
@pytest.fixture
async def create_db(uri):
    """Provide a coroutine factory returning the ``default`` database.

    Each call to the factory opens a new ``AsyncIOMotorClient`` on ``uri``.
    Before the test runs, every collection in ``default`` is dropped so the
    test starts from an empty database. On teardown, all clients opened
    through the factory are closed. Otherwise their background monitoring
    would outlive the per-test ``getaddrinfo`` patch and keep trying to
    resolve ``host.docker.internal``.
    """
    clients = []

    async def creator():
        client = AsyncIOMotorClient(uri)
        clients.append(client)
        return client["default"]

    try:
        db = await creator()
        # Drop all collections so every test starts clean.
        for col_name in await db.list_collection_names():
            await db.drop_collection(col_name)
        yield creator
    finally:
        for client in clients:
            client.close()
async def get_latest_oplog_timestamp(client):
    """Return the ``ts`` field of the newest entry in ``local.oplog.rs``.

    The oplog is read in reverse ``$natural`` order and only the first
    document is fetched. This replaces the ``fetch_next``/``next_object``
    cursor API, which Motor deprecated and later removed.

    Raises:
        RuntimeError: if the oplog has no entries.
    """
    entry = await client.local.oplog.rs.find_one(sort=[("$natural", -1)])
    if entry is None:
        raise RuntimeError("oplog is empty.")
    return entry["ts"]
@pytest.mark.asyncio
async def test_change_stream(create_db):
    """An insert into a fresh collection is delivered on a change stream.

    The stream starts at the newest oplog timestamp recorded before the
    insert, so the event is not lost even if the watcher task opens the
    stream only after the insert has happened.
    """
    db = await create_db()
    col_name = "boo"
    await db.create_collection(col_name)
    logging.debug("create collection")
    current_timestamp = await get_latest_oplog_timestamp(db.client)
    logging.debug("latest oplog timestamp %r", current_timestamp)

    async def wait_for_change():
        # Use startAtOperationTime so the insert event cannot be missed:
        # https://docs.mongodb.com/manual/changeStreams/#start-time
        # start_at_operation_time expects a bson.Timestamp. The oplog `ts`
        # value is already one, so nothing needs to be constructed by hand.
        async with db[col_name].watch(
            pipeline=[{"$match": {"operationType": "insert"}}],
            start_at_operation_time=current_timestamp,
        ) as change_stream:
            logging.debug("wait for change")
            async for change in change_stream:
                return change

    fut = asyncio.ensure_future(wait_for_change())
    try:
        data = {"username": "black"}
        # insert_one adds the generated `_id` to `data`, so it can be
        # compared to `fullDocument` as a whole below.
        await db[col_name].insert_one(data)
        logging.debug("insert new document %r", data)
        # Bound the wait so a missing event fails the test instead of hanging it.
        change = await asyncio.wait_for(fut, timeout=10)
    finally:
        # Never leave the watcher task running past the test (no-op if done).
        fut.cancel()
    logging.debug("received change %r", change)
    assert data == change["fullDocument"]
从以上代码中你可能会发现，代码利用 `unittest.mock.patch` 修改了 `socket.getaddrinfo` 函数的行为，目的是替换 MongoDB Replica Set 下发的节点地址。这个地址是容器内访问宿主机所用的地址 `host.docker.internal`，需要替换成本地主机地址 `localhost`，以免因地址无法解析而导致测试代码运行失败。如果把程序打包成 docker 镜像、在 docker 容器中开发测试，就可以去掉这一部分代码了。
pytest test.py --log-cli-level debug
把代码写入 `test.py`，再执行以上命令，输出如下：
============================= test session starts ==============================
platform darwin -- Python 3.7.4, pytest-5.0.1, py-1.8.0, pluggy-0.12.0
rootdir: /Users/linw1995/Documents/me/tmp
plugins: asyncio-0.10.0
collected 1 item

test.py::test_change_stream
-------------------------------- live log setup --------------------------------
DEBUG    asyncio:selector_events.py:53 Using selector: KqueueSelector
DEBUG    asyncio:selector_events.py:53 Using selector: KqueueSelector
DEBUG    root:test.py:33 no change ('localhost', 27025)
DEBUG    root:test.py:33 no change ('localhost', 27024)
DEBUG    root:test.py:33 no change ('localhost', 27023)
DEBUG    root:test.py:30 container 2 host
DEBUG    root:test.py:30 container 2 host
DEBUG    root:test.py:30 container 2 host
DEBUG    root:test.py:30 container 2 host
-------------------------------- live log call ---------------------------------
DEBUG    root:test.py:33 no change ('localhost', 27025)
DEBUG    root:test.py:33 no change ('localhost', 27024)
DEBUG    root:test.py:33 no change ('localhost', 27023)
DEBUG    root:test.py:30 container 2 host
DEBUG    root:test.py:30 container 2 host
DEBUG    root:test.py:30 container 2 host
DEBUG    root:test.py:30 container 2 host
DEBUG    root:test.py:78 create collection
DEBUG    root:test.py:81 latest oplog timestamp Timestamp(1565015602, 1)
DEBUG    root:test.py:91 wait for change
DEBUG    root:test.py:30 container 2 host
DEBUG    root:test.py:99 insert new document {'username': 'black', '_id': ObjectId('5d483e325c87feb3e9b4bf9d')}
DEBUG    root:test.py:102 received change {'_id': {'_data': '825D483E32000000022B022C0100296E5A10044B41D3216978416C8D3372D0F6FCEA4046645F696400645D483E325C87FEB3E9B4BF9D0004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1565015602, 2), 'fullDocument': {'_id': ObjectId('5d483e325c87feb3e9b4bf9d'), 'username': 'black'}, 'ns': {'db': 'default', 'coll': 'boo'}, 'documentKey': {'_id': ObjectId('5d483e325c87feb3e9b4bf9d')}}
PASSED                                                                   [100%]

=========================== 1 passed in 2.64 seconds ===========================
成功了~这样用来做开发测试真是十分的方便呢